Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/08/26 16:31:54 UTC

[GitHub] [beam] lostluck commented on a diff in pull request #22904: [Go SDK] Stream decode values in single iterations

lostluck commented on code in PR #22904:
URL: https://github.com/apache/beam/pull/22904#discussion_r956214350


##########
sdks/go/pkg/beam/core/runtime/exec/fullvalue.go:
##########
@@ -189,3 +191,173 @@ func ReadAll(rs ReStream) ([]FullValue, error) {
 		ret = append(ret, *elm)
 	}
 }
+
+// decodeReStream is a decode on demand ReStream.
+// Can only produce a single Stream because it consumes the reader.
+// Must not be used for streams that might be re-iterated, causing Open
+// to be called twice.
+type decodeReStream struct {

Review Comment:
   I'm going to drop "decode" from the ReStream-level names, so this becomes singleUseRestream. It's simpler to understand that way.
   
   The Stream-level types are pretty standard decode streams, at least.
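
To make the single-use contract concrete, here is a minimal sketch of a ReStream that can only be opened once. It is not the PR's code: the `Stream` and `ReStream` interfaces are simplified stand-ins that yield bytes rather than FullValues, and `byteStream`, `newDemo`, and `openTwice` are hypothetical helpers added for illustration. Only the `singleUseRestream` name comes from the comment above.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

// Stream and ReStream are simplified stand-ins for the exec package's
// interfaces (assumption: the real ones deal in FullValue, not byte).
type Stream interface {
	Read() (byte, error)
	Close() error
}

type ReStream interface {
	Open() (Stream, error)
}

// singleUseRestream hands out at most one Stream, because that Stream
// consumes the underlying reader.
type singleUseRestream struct {
	r    io.Reader
	size int // number of elements in the stream
}

// Open returns the only Stream this ReStream can produce and errors afterwards.
func (n *singleUseRestream) Open() (Stream, error) {
	if n.r == nil {
		return nil, errors.New("singleUseRestream opened twice")
	}
	s := &byteStream{r: n.r, size: n.size}
	n.r = nil // mark as consumed so a second Open fails
	return s, nil
}

// byteStream is a hypothetical Stream that yields size single-byte elements.
type byteStream struct {
	r          io.Reader
	next, size int
}

func (s *byteStream) Read() (byte, error) {
	if s.r == nil || s.next == s.size {
		return 0, io.EOF
	}
	var buf [1]byte
	if _, err := io.ReadFull(s.r, buf[:]); err != nil {
		return 0, err
	}
	s.next++
	return buf[0], nil
}

func (s *byteStream) Close() error {
	s.r = nil
	return nil
}

// newDemo builds a single-use ReStream with one element per byte of data.
func newDemo(data string) *singleUseRestream {
	return &singleUseRestream{r: strings.NewReader(data), size: len(data)}
}

// openTwice calls Open twice and returns both errors.
func openTwice(rs ReStream) (firstErr, secondErr error) {
	_, firstErr = rs.Open()
	_, secondErr = rs.Open()
	return firstErr, secondErr
}

func main() {
	first, second := openTwice(newDemo("abc"))
	fmt.Println("first Open error:", first)
	fmt.Println("second Open error:", second)
}
```

Clearing the reader in `Open` is what enforces the contract: any caller that re-iterates gets an error instead of silently reading from a half-consumed reader.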
   
   



##########
sdks/go/pkg/beam/core/runtime/exec/fullvalue.go:
##########
@@ -189,3 +191,173 @@ func ReadAll(rs ReStream) ([]FullValue, error) {
 		ret = append(ret, *elm)
 	}
 }
+
+// decodeReStream is a decode on demand ReStream.
+// Can only produce a single Stream because it consumes the reader.
+// Must not be used for streams that might be re-iterated, causing Open
+// to be called twice.
+type decodeReStream struct {
+	r    io.Reader
+	d    ElementDecoder
+	size int // The number of elements in this stream.
+}
+
+// Open returns the Stream from the start of the in-memory reader. Returns error if called twice.
+func (n *decodeReStream) Open() (Stream, error) {
+	if n.r == nil {
+		return nil, errors.New("decodeReStream opened twice!")
+	}
+	ret := &decodeStream{r: n.r, d: n.d, size: n.size}
+	n.r = nil
+	n.d = nil
+	return ret, nil
+}
+
+// decodeStream is a decode on demand Stream, that decodes size elements from the provided
+// io.Reader.
+type decodeStream struct {
+	r          io.Reader
+	d          ElementDecoder
+	next, size int
+	ret        FullValue
+}
+
+// Close causes subsequent calls to Read to return io.EOF, and drains the remaining element count
+// from the reader.
+func (s *decodeStream) Close() error {
+	// On close, if next != size, we must iterate through the rest of the decoding
+	// until the reader is drained. Otherwise we corrupt the read for the next element.
+	// TODO: Optimize the case where we have length prefixed values

Review Comment:
   We do, I missed it here. Same as the other one.
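
For context on the TODO in the hunk above, here is a rough sketch of how Close could drain unread elements when every value carries a length prefix, skipping payload bytes instead of decoding them. The wire format (one length byte followed by the payload) and the names `lpStream`, `readLen`, and `remainingAfterClose` are assumptions made up for this illustration; this is not Beam's length-prefix coder and not the PR's implementation.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// lpStream reads size elements, each encoded as one length byte followed
// by that many payload bytes (hypothetical format, for illustration only).
type lpStream struct {
	r          io.Reader
	next, size int
}

// readLen reads the one-byte length prefix of the next element.
func (s *lpStream) readLen() (int, error) {
	var b [1]byte
	if _, err := io.ReadFull(s.r, b[:]); err != nil {
		return 0, err
	}
	return int(b[0]), nil
}

// Read decodes the next element, or returns io.EOF once size elements
// have been produced or the stream has been closed.
func (s *lpStream) Read() (string, error) {
	if s.r == nil || s.next == s.size {
		return "", io.EOF
	}
	n, err := s.readLen()
	if err != nil {
		return "", err
	}
	buf := make([]byte, n)
	if _, err := io.ReadFull(s.r, buf); err != nil {
		return "", err
	}
	s.next++
	return string(buf), nil
}

// Close skips the payload of every unread element without decoding it,
// leaving the reader positioned at whatever follows this stream.
func (s *lpStream) Close() error {
	if s.r == nil {
		return nil
	}
	defer func() { s.r = nil }()
	for ; s.next < s.size; s.next++ {
		n, err := s.readLen()
		if err != nil {
			return err
		}
		if _, err := io.CopyN(io.Discard, s.r, int64(n)); err != nil {
			return err
		}
	}
	return nil
}

// remainingAfterClose reads one of three elements, closes the stream, and
// returns that element plus the bytes left in the reader after the stream.
func remainingAfterClose() (string, string, error) {
	r := bytes.NewReader([]byte("\x01a\x02bc\x03defNEXT"))
	s := &lpStream{r: r, size: 3}
	first, err := s.Read()
	if err != nil {
		return "", "", err
	}
	if err := s.Close(); err != nil {
		return "", "", err
	}
	rest, err := io.ReadAll(r)
	return first, string(rest), err
}

func main() {
	first, rest, err := remainingAfterClose()
	fmt.Println(first, rest, err)
}
```

The point of the sketch is the loop in `Close`: with a known length per element, the remaining bytes can be discarded with `io.CopyN` rather than run through the element decoder, and the reader still ends up at the start of the next value.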


