You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/07/13 00:09:16 UTC

[GitHub] [beam] lostluck commented on a diff in pull request #22057: Go SDK: Allow overriding the default iterable creator.

lostluck commented on code in PR #22057:
URL: https://github.com/apache/beam/pull/22057#discussion_r919519283


##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -236,24 +249,67 @@ func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv Elemen
 							// We can't re-use the original bcr, since we may get new iterables,
 							// or multiple of them at the same time, but we can re-use the count itself.
 							r = &byteCountReader{reader: r, count: bcr.count}
-							return &elementStream{r: r, ec: cv}, nil
+							return &elementStream{r: r, ec: decoder}, nil
 						},
-					},
-				}, nil
-			default:
-				return nil, errors.Errorf("multi-chunk stream with invalid chunk size of %d", chunk)
+					})
+					return nil
+				default:
+					return errors.Errorf("multi-chunk stream with invalid chunk size of %d", chunkSize)
+				}
 			}
 		}
+		if err := createChunkReStreams(); err != nil {
+			return nil, nopCloser{}, err
+		}
+		closeChunkReStreamsEarly = false
+		return newConcatReStream(chunkReStreams...), chunkReStreamsCloser, nil
 	default:
-		return nil, errors.Errorf("received stream with marker size of %d", size)
+		return nil, nopCloser{}, errors.Errorf("received stream with marker size of %d", size)
 	}
 }
 
-func readStreamToBuffer(cv ElementDecoder, r io.ReadCloser, size int64, buf []FullValue) ([]FullValue, error) {
-	for i := int64(0); i < size; i++ {
-		value, err := cv.Decode(r)
+var readStreamToReStream ReStreamFactory = DefaultReadStreamToReStream
+
+// ReStreamFactory is a function that constructs a ReStream from an io.Reader
+// and a coder for the type of elements that need to be decoded. A ReStreamFactory
+// is used by the SDK harness to transform a byte stream into a stream of
+// FullValues while executing a DoFn that takes an iterator as one of its
+// arguments (GBK and CoGBK DoFns).
+//
+// The factory should return a ReStream that decodes numElements elements from
+// the encodedStream reader. After the DoFn that uses the stream has finished,
+// the second return value will be called to close the ReStream; this provides
+// the factory an opportunity to release any resources associated with the
+// returned ReStream.
+//
+// DefaultReadStreamToReStream is the default ReStreamFactory that is used by
+// the exec package.
+type ReStreamFactory func(ctx context.Context, encodedStream io.Reader, numElements int64, coder *coder.Coder) (ReStream, func() error, error)
+
+// SetReStreamFactory overrides the default behavior for constructing a ReStream
+// for DoFns that iterate over values (GBK and CoGBK).
+//
+// The default implementation of this function is DefaultReadStreamToReStream.
+func SetReStreamFactory(fn ReStreamFactory) {

Review Comment:
   Per discussion, please label this as Experimental, and link to the issue we're attempting to solve with this.



##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -164,70 +164,83 @@ func (n *DataSource) Process(ctx context.Context) error {
 		pe.Pane = pn
 
 		var valReStreams []ReStream
-		for _, cv := range cvs {
-			values, err := n.makeReStream(ctx, pe, cv, &bcr)
+		reStreamCloser := &multiOnceCloser{}
+		defer reStreamCloser.Close()
+		for _, cod := range valueCoders {
+			values, closer, err := n.makeReStream(ctx, pe, cod, &dataReaderCounted)
 			if err != nil {
 				return err
 			}
 			valReStreams = append(valReStreams, values)
+			reStreamCloser.children = append(reStreamCloser.children, closer)
 		}
 
 		if err := n.Out.ProcessElement(ctx, pe, valReStreams...); err != nil {
 			return err
 		}
 		// Collect the actual size of the element, and reset the bytecounter reader.
-		n.PCol.addSize(int64(bcr.reset()))
-		bcr.reader = r
+		n.PCol.addSize(int64(dataReaderCounted.reset()))
+		dataReaderCounted.reader = dataReader
+
+		if err := reStreamCloser.Close(); err != nil {
+			return fmt.Errorf("error closing ReStream after processing element: %w", err)
+		}
 	}
 }
 
-func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv ElementDecoder, bcr *byteCountReader) (ReStream, error) {
+func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, elemCoder *coder.Coder, bcr *byteCountReader) (ReStream, io.Closer, error) {
 	// TODO(lostluck) 2020/02/22: Do we include the chunk size, or just the element sizes?
 	size, err := coder.DecodeInt32(bcr.reader)
 	if err != nil {
-		return nil, errors.Wrap(err, "stream size decoding failed")
+		return nil, nopCloser{}, errors.Wrap(err, "stream size decoding failed")
 	}
 
 	switch {
 	case size >= 0:
 		// Single chunk streams are fully read in and buffered in memory.
-		buf := make([]FullValue, 0, size)
-		buf, err = readStreamToBuffer(cv, bcr, int64(size), buf)
-		if err != nil {
-			return nil, err
-		}
-		return &FixedReStream{Buf: buf}, nil
+		stream, cleanupFn, err := readStreamToReStream(ctx, bcr, int64(size), elemCoder)
+		return stream, closeFunc(cleanupFn), err
 	case size == -1:
+		decoder := MakeElementDecoder(elemCoder)
 		// Multi-chunked stream.
-		var buf []FullValue
-		for {
-			chunk, err := coder.DecodeVarInt(bcr.reader)
-			if err != nil {
-				return nil, errors.Wrap(err, "stream chunk size decoding failed")
+		var chunkReStreams []ReStream
+		chunkReStreamsCloser := &multiOnceCloser{}
+		closeChunkReStreamsEarly := true
+		defer func() {
+			if !closeChunkReStreamsEarly {
+				return
 			}
-			// All done, escape out.
-			switch {
-			case chunk == 0: // End of stream, return buffer.
-				return &FixedReStream{Buf: buf}, nil
-			case chunk > 0: // Non-zero chunk, read that many elements from the stream, and buffer them.
-				chunkBuf := make([]FullValue, 0, chunk)
-				chunkBuf, err = readStreamToBuffer(cv, bcr, chunk, chunkBuf)
-				if err != nil {
-					return nil, err
-				}
-				buf = append(buf, chunkBuf...)
-			case chunk == -1: // State backed iterable!
-				chunk, err := coder.DecodeVarInt(bcr.reader)
-				if err != nil {
-					return nil, err
-				}
-				token, err := ioutilx.ReadN(bcr.reader, (int)(chunk))
+			chunkReStreamsCloser.Close() // ignore error because makeReStream is already returning an error in this case.
+		}()
+		// createChunkReStreams appends to chunkReStreams and
+		// chunkReStreamsCloser.children
+		createChunkReStreams := func() error {

Review Comment:
   Indirecting the error through an anon closure doesn't help with code readability here. The closure is too long for a meaningful readability benefit.
   
   I'd strongly prefer that we move this out to a named method and pass parameters in and out, instead of making hard to follow code harder to follow.
   
   And looking at this again, is this just to avoid adding a `nopCloser{}` return parameter on error cases? That would be clearer than the indirection.



##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -619,6 +675,18 @@ type concatReStream struct {
 	first, next ReStream
 }
 
+func newConcatReStream(streams ...ReStream) *concatReStream {
+	if len(streams) == 0 {
+		streams = []ReStream{&FixedReStream{}}
+	}
+	first := streams[0]
+	rest := streams[1:]
+	if len(rest) == 0 {
+		return &concatReStream{first: first, next: nil}
+	}
+	return &concatReStream{first: first, next: newConcatReStream(rest...)}

Review Comment:
   I'm not a huge fan of the recursive linked list construction here, but I think in practice this will end up being minimally used, if ever. But this is probably less fiddly than alternative approaches.



##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -236,24 +249,67 @@ func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv Elemen
 							// We can't re-use the original bcr, since we may get new iterables,
 							// or multiple of them at the same time, but we can re-use the count itself.
 							r = &byteCountReader{reader: r, count: bcr.count}
-							return &elementStream{r: r, ec: cv}, nil
+							return &elementStream{r: r, ec: decoder}, nil
 						},
-					},
-				}, nil
-			default:
-				return nil, errors.Errorf("multi-chunk stream with invalid chunk size of %d", chunk)
+					})
+					return nil
+				default:
+					return errors.Errorf("multi-chunk stream with invalid chunk size of %d", chunkSize)
+				}
 			}
 		}
+		if err := createChunkReStreams(); err != nil {
+			return nil, nopCloser{}, err
+		}
+		closeChunkReStreamsEarly = false
+		return newConcatReStream(chunkReStreams...), chunkReStreamsCloser, nil
 	default:
-		return nil, errors.Errorf("received stream with marker size of %d", size)
+		return nil, nopCloser{}, errors.Errorf("received stream with marker size of %d", size)
 	}
 }
 
-func readStreamToBuffer(cv ElementDecoder, r io.ReadCloser, size int64, buf []FullValue) ([]FullValue, error) {
-	for i := int64(0); i < size; i++ {
-		value, err := cv.Decode(r)
+var readStreamToReStream ReStreamFactory = DefaultReadStreamToReStream
+
+// ReStreamFactory is a function that constructs a ReStream from an io.Reader
+// and a coder for the type of elements that need to be decoded. A ReStreamFactory
+// is used by the SDK harness to transform a byte stream into a stream of
+// FullValues while executing a DoFn that takes an iterator as one of its
+// arguments (GBK and CoGBK DoFns).
+//
+// The factory should return a ReStream that decodes numElements elements from
+// the encodedStream reader. After the DoFn that uses the stream has finished,
+// the second return value will be called to close the ReStream; this provides
+// the factory an opportunity to release any resources associated with the
+// returned ReStream.
+//
+// DefaultReadStreamToReStream is the default ReStreamFactory that is used by
+// the exec package.
+type ReStreamFactory func(ctx context.Context, encodedStream io.Reader, numElements int64, coder *coder.Coder) (ReStream, func() error, error)
+
+// SetReStreamFactory overrides the default behavior for constructing a ReStream
+// for DoFns that iterate over values (GBK and CoGBK).
+//
+// The default implementation of this function is DefaultReadStreamToReStream.
+func SetReStreamFactory(fn ReStreamFactory) {
+	readStreamToReStream = fn
+}
+
+// DefaultReadStreamToReStream reads numElements elements from encodedStream,
+// decoding them with coder, and returns an in-memory ReStream.
+func DefaultReadStreamToReStream(_ context.Context, encodedStream io.Reader, numElements int64, coder *coder.Coder) (ReStream, func() error, error) {

Review Comment:
   Per discussion, please unexport the default implementation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org