Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/08/26 14:16:25 UTC

[GitHub] [beam] damccorm commented on a diff in pull request #22904: [Go SDK] Stream decode values in single iterations

damccorm commented on code in PR #22904:
URL: https://github.com/apache/beam/pull/22904#discussion_r956081165


##########
sdks/go/pkg/beam/core/runtime/exec/fullvalue.go:
##########
@@ -189,3 +191,173 @@ func ReadAll(rs ReStream) ([]FullValue, error) {
 		ret = append(ret, *elm)
 	}
 }
+
+// decodeReStream is a decode on demand ReStream.
+// Can only produce a single Stream because it consumes the reader.
+// Must not be used for streams that might be re-iterated, causing Open
+// to be called twice.
+type decodeReStream struct {
+	r    io.Reader
+	d    ElementDecoder
+	size int // The number of elements in this stream.
+}
+
+// Open returns the Stream from the start of the in-memory reader. Returns error if called twice.
+func (n *decodeReStream) Open() (Stream, error) {
+	if n.r == nil {
+		return nil, errors.New("decodeReStream opened twice!")
+	}
+	ret := &decodeStream{r: n.r, d: n.d, size: n.size}
+	n.r = nil
+	n.d = nil
+	return ret, nil
+}
+
+// decodeStream is a decode on demand Stream, that decodes size elements from the provided
+// io.Reader.
+type decodeStream struct {
+	r          io.Reader
+	d          ElementDecoder
+	next, size int
+	ret        FullValue
+}
+
+// Close causes subsequent calls to Read to return io.EOF, and drains the remaining element count
+// from the reader.
+func (s *decodeStream) Close() error {
+	// On close, if next != size, we must iterate through the rest of the decoding
+	// until the reader is drained. Otherwise we corrupt the read for the next element.
+	// TODO: Optimize the case where we have length prefixed values

Review Comment:
   Nit: should we file a tracking issue for this TODO?
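
To make the TODO concrete, here is a rough sketch of what the length-prefixed fast path could look like: instead of decoding every remaining element on Close, read each length prefix and discard that many bytes. Everything here is an assumption for illustration. The uvarint prefix from `encoding/binary` stands in for whatever a length-prefixed coder actually writes, and `writeElems`, `readElem`, `skipElems`, and `demo` are hypothetical helpers, not part of the exec package.

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// writeElems prefixes each element with its length as a uvarint.
// This is an assumed wire format, used only for this sketch.
func writeElems(elems ...string) []byte {
	var buf bytes.Buffer
	var lenBuf [binary.MaxVarintLen64]byte
	for _, e := range elems {
		n := binary.PutUvarint(lenBuf[:], uint64(len(e)))
		buf.Write(lenBuf[:n])
		buf.WriteString(e)
	}
	return buf.Bytes()
}

// readElem fully decodes one length-prefixed element.
func readElem(r *bufio.Reader) (string, error) {
	size, err := binary.ReadUvarint(r)
	if err != nil {
		return "", err
	}
	p := make([]byte, size)
	if _, err := io.ReadFull(r, p); err != nil {
		return "", err
	}
	return string(p), nil
}

// skipElems advances r past n elements without decoding their payloads:
// it reads each length prefix and discards that many bytes.
func skipElems(r *bufio.Reader, n int) error {
	for i := 0; i < n; i++ {
		size, err := binary.ReadUvarint(r)
		if err != nil {
			return err
		}
		if _, err := io.CopyN(io.Discard, r, int64(size)); err != nil {
			return err
		}
	}
	return nil
}

// demo reads one of three elements, skips the other two, and then reads
// the value that follows them on the same reader.
func demo() (first, after string, err error) {
	data := append(writeElems("a", "bb", "ccc"), writeElems("next")...)
	r := bufio.NewReader(bytes.NewReader(data))
	if first, err = readElem(r); err != nil {
		return "", "", err
	}
	if err = skipElems(r, 2); err != nil {
		return "", "", err
	}
	after, err = readElem(r)
	return first, after, err
}

func main() {
	fmt.Println(demo())
}
```

The point of the sketch is only that the reader ends up positioned at the next value either way. Whether the real coders expose enough information to skip like this is exactly what a tracking issue could capture.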



##########
sdks/go/pkg/beam/core/runtime/exec/fullvalue.go:
##########
@@ -189,3 +191,173 @@ func ReadAll(rs ReStream) ([]FullValue, error) {
 		ret = append(ret, *elm)
 	}
 }
+
+// decodeReStream is a decode on demand ReStream.
+// Can only produce a single Stream because it consumes the reader.
+// Must not be used for streams that might be re-iterated, causing Open
+// to be called twice.
+type decodeReStream struct {

Review Comment:
   One (unfortunately verbose) naming option would be `singleUseDecodeReStream`.



##########
sdks/go/pkg/beam/core/runtime/exec/fullvalue.go:
##########
@@ -189,3 +191,173 @@ func ReadAll(rs ReStream) ([]FullValue, error) {
 		ret = append(ret, *elm)
 	}
 }
+
+// decodeReStream is a decode on demand ReStream.
+// Can only produce a single Stream because it consumes the reader.
+// Must not be used for streams that might be re-iterated, causing Open
+// to be called twice.
+type decodeReStream struct {

Review Comment:
   Naming nit: does it still make sense to call this a ReStream? To me, ReStream implies that it can be iterated multiple times, and I think the interface itself suggests the same: https://github.com/apache/beam/blob/48bad7d966a583055669850eb9fb558782f636a8/sdks/go/pkg/beam/core/runtime/exec/fullvalue.go#L60
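
To illustrate the mismatch, here is a minimal sketch of a type that satisfies a ReStream-shaped interface but fails on the second Open. The `Stream` and `ReStream` interfaces below are simplified stand-ins (the real ones deal in FullValue rather than strings), and `singleUseReStream`, `byteStream`, and `newSingleUse` are hypothetical names made up for this example.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

// Stream and ReStream are simplified stand-ins for the exec interfaces.
type Stream interface {
	io.Closer
	Read() (string, error)
}

type ReStream interface {
	Open() (Stream, error)
}

// singleUseReStream hands its reader to the first Stream it opens,
// so a second Open has nothing left to read from.
type singleUseReStream struct {
	r io.Reader
}

func (s *singleUseReStream) Open() (Stream, error) {
	if s.r == nil {
		return nil, errors.New("singleUseReStream opened twice")
	}
	st := &byteStream{r: s.r}
	s.r = nil // Ownership of the reader moves to the returned Stream.
	return st, nil
}

// byteStream yields one byte of the underlying reader per Read call.
type byteStream struct {
	r io.Reader
}

func (b *byteStream) Read() (string, error) {
	if b.r == nil {
		return "", io.EOF
	}
	var buf [1]byte
	if _, err := io.ReadFull(b.r, buf[:]); err != nil {
		return "", err
	}
	return string(buf[:]), nil
}

// Close makes subsequent Read calls return io.EOF.
func (b *byteStream) Close() error {
	b.r = nil
	return nil
}

// newSingleUse wraps a string in the single-use ReStream.
func newSingleUse(data string) ReStream {
	return &singleUseReStream{r: strings.NewReader(data)}
}

func main() {
	rs := newSingleUse("ab")
	_, err1 := rs.Open()
	_, err2 := rs.Open()
	fmt.Println(err1, err2)
}
```

A caller holding only the ReStream interface has no way to tell that the second Open will fail, which is why a name that signals single use seems worth the extra characters.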


