Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/08/26 00:13:38 UTC

[GitHub] [beam] lostluck opened a new pull request, #22904: [Go SDK] Stream decode values in single iterations

lostluck opened a new pull request, #22904:
URL: https://github.com/apache/beam/pull/22904

   Fixes #22900 
   
   This adds single-use stream decoders for cases where we know re-iteration won't occur (see the issue).
   
   Adds tests and improves test coverage with datasource tests for the various iterator handlers (on-demand and fixed), unit tests for the on-demand decode streams, a new test for ParDo short reads, and a primitives test for GBK short reads.
   
   Fixes a previously benign issue where value iteration streams were not closed after the processing DoFn finished, and adds a test for it. Because we always fully read the byte stream for a value, we never ran into a short read problem before. With streaming decode, short reads on the data side would cause problems that don't occur with short reads for Side Inputs.
   
   A follow-up task for optimizing short reads when the coder in use is length-prefixed is filed in #22901.
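
To illustrate why a short read has to be drained on close, here is a minimal sketch under stated assumptions. It is not the SDK code: `valueStream`, `encodeGroups`, and `firstOfEach` are hypothetical names, values are plain uvarints rather than coder-encoded elements, and every group is assumed to be non-empty. Each group is written as a count followed by its values on one shared reader, which is roughly the situation the description refers to.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// valueStream decodes up to size uvarint values from r, one per Read.
// It is a simplified stand-in for a decode-on-demand stream.
type valueStream struct {
	r          io.ByteReader
	next, size int
}

// Read decodes the next value, or returns io.EOF once the stream is
// exhausted or closed.
func (s *valueStream) Read() (uint64, error) {
	if s.r == nil || s.next == s.size {
		return 0, io.EOF
	}
	v, err := binary.ReadUvarint(s.r)
	if err != nil {
		return 0, err
	}
	s.next++
	return v, nil
}

// Close decodes and discards any values that were not read, so the shared
// reader ends up positioned at the start of the next group.
func (s *valueStream) Close() error {
	r := s.r
	s.r = nil // Subsequent Reads return io.EOF.
	if r == nil {
		return nil
	}
	for ; s.next < s.size; s.next++ {
		if _, err := binary.ReadUvarint(r); err != nil {
			return err
		}
	}
	return nil
}

// encodeGroups writes each group as a uvarint count followed by its values.
func encodeGroups(groups [][]uint64) []byte {
	var buf bytes.Buffer
	tmp := make([]byte, binary.MaxVarintLen64)
	put := func(v uint64) { buf.Write(tmp[:binary.PutUvarint(tmp, v)]) }
	for _, g := range groups {
		put(uint64(len(g)))
		for _, v := range g {
			put(v)
		}
	}
	return buf.Bytes()
}

// firstOfEach performs a short read: it takes only the first value of each
// group. When drain is true, the stream is closed before moving on.
func firstOfEach(data []byte, groups int, drain bool) ([]uint64, error) {
	r := bytes.NewReader(data)
	var heads []uint64
	for i := 0; i < groups; i++ {
		n, err := binary.ReadUvarint(r)
		if err != nil {
			return nil, err
		}
		s := &valueStream{r: r, size: int(n)}
		v, err := s.Read()
		if err != nil {
			return nil, err
		}
		heads = append(heads, v)
		if drain {
			if err := s.Close(); err != nil {
				return nil, err
			}
		}
	}
	return heads, nil
}

func main() {
	data := encodeGroups([][]uint64{{1, 2, 3}, {10, 20}})
	fmt.Println(firstOfEach(data, 2, true))  // [1 10] <nil>
	fmt.Println(firstOfEach(data, 2, false)) // [1 3] <nil>: leftover bytes misread as the next group
}
```

Without the drain, the leftover `2` from the first group is misread as the second group's count, which is the kind of corruption closing the stream avoids. Decoding and discarding every remaining value is the simple approach; skipping by byte length would need length-prefixed values.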
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] damccorm commented on a diff in pull request #22904: [Go SDK] Stream decode values in single iterations

Posted by GitBox <gi...@apache.org>.
damccorm commented on code in PR #22904:
URL: https://github.com/apache/beam/pull/22904#discussion_r956081165


##########
sdks/go/pkg/beam/core/runtime/exec/fullvalue.go:
##########
@@ -189,3 +191,173 @@ func ReadAll(rs ReStream) ([]FullValue, error) {
 		ret = append(ret, *elm)
 	}
 }
+
+// decodeReStream is a decode on demand ReStream.
+// Can only produce a single Stream because it consumes the reader.
+// Must not be used for streams that might be re-iterated, causing Open
+// to be called twice.
+type decodeReStream struct {
+	r    io.Reader
+	d    ElementDecoder
+	size int // The number of elements in this stream.
+}
+
+// Open returns the Stream from the start of the in-memory reader. Returns error if called twice.
+func (n *decodeReStream) Open() (Stream, error) {
+	if n.r == nil {
+		return nil, errors.New("decodeReStream opened twice!")
+	}
+	ret := &decodeStream{r: n.r, d: n.d, size: n.size}
+	n.r = nil
+	n.d = nil
+	return ret, nil
+}
+
+// decodeStream is a decode on demand Stream, that decodes size elements from the provided
+// io.Reader.
+type decodeStream struct {
+	r          io.Reader
+	d          ElementDecoder
+	next, size int
+	ret        FullValue
+}
+
+// Close causes subsequent calls to Read to return io.EOF, and drains the remaining element count
+// from the reader.
+func (s *decodeStream) Close() error {
+	// On close, if next != size, we must iterate through the rest of the decoding
+	// until the reader is drained. Otherwise we corrupt the read for the next element.
+	// TODO: Optimize the case where we have length prefixed values

Review Comment:
   Nit - should we have an issue for this TODO?



##########
sdks/go/pkg/beam/core/runtime/exec/fullvalue.go:
##########
@@ -189,3 +191,173 @@ func ReadAll(rs ReStream) ([]FullValue, error) {
 		ret = append(ret, *elm)
 	}
 }
+
+// decodeReStream is a decode on demand ReStream.
+// Can only produce a single Stream because it consumes the reader.
+// Must not be used for streams that might be re-iterated, causing Open
+// to be called twice.
+type decodeReStream struct {

Review Comment:
   One (unfortunately verbose) option is `singleUseDecodeReStream`?



##########
sdks/go/pkg/beam/core/runtime/exec/fullvalue.go:
##########
@@ -189,3 +191,173 @@ func ReadAll(rs ReStream) ([]FullValue, error) {
 		ret = append(ret, *elm)
 	}
 }
+
+// decodeReStream is a decode on demand ReStream.
+// Can only produce a single Stream because it consumes the reader.
+// Must not be used for streams that might be re-iterated, causing Open
+// to be called twice.
+type decodeReStream struct {

Review Comment:
   Naming nit - does it still make sense to call this a ReStream? ReStream suggests to me that it can be iterated multiple times, and I think that's what the interface itself suggests - https://github.com/apache/beam/blob/48bad7d966a583055669850eb9fb558782f636a8/sdks/go/pkg/beam/core/runtime/exec/fullvalue.go#L60





[GitHub] [beam] codecov[bot] commented on pull request #22904: [Go SDK] Stream decode values in single iterations

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on PR #22904:
URL: https://github.com/apache/beam/pull/22904#issuecomment-1227877809

   # [Codecov](https://codecov.io/gh/apache/beam/pull/22904?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#22904](https://codecov.io/gh/apache/beam/pull/22904?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (8d30a42) into [master](https://codecov.io/gh/apache/beam/commit/c866771ead49dd03b34c9a7c472c92382bf4b7e2?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (c866771) will **increase** coverage by `0.01%`.
   > The diff coverage is `80.41%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #22904      +/-   ##
   ==========================================
   + Coverage   73.88%   73.90%   +0.01%     
   ==========================================
     Files         713      713              
     Lines       94321    94462     +141     
   ==========================================
   + Hits        69691    69811     +120     
   - Misses      23342    23354      +12     
   - Partials     1288     1297       +9     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | go | `51.35% <80.41%> (+0.16%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/22904?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [sdks/go/pkg/beam/core/runtime/exec/datasource.go](https://codecov.io/gh/apache/beam/pull/22904/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvZXhlYy9kYXRhc291cmNlLmdv) | `66.30% <76.31%> (+0.74%)` | :arrow_up: |
   | [sdks/go/pkg/beam/core/runtime/exec/fullvalue.go](https://codecov.io/gh/apache/beam/pull/22904/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvZXhlYy9mdWxsdmFsdWUuZ28=) | `83.42% <81.37%> (-2.46%)` | :arrow_down: |
   | [sdks/go/pkg/beam/core/runtime/exec/fn.go](https://codecov.io/gh/apache/beam/pull/22904/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvZXhlYy9mbi5nbw==) | `70.76% <100.00%> (+2.44%)` | :arrow_up: |
   
   




[GitHub] [beam] lostluck commented on pull request #22904: [Go SDK] Stream decode values in single iterations

Posted by GitBox <gi...@apache.org>.
lostluck commented on PR #22904:
URL: https://github.com/apache/beam/pull/22904#issuecomment-1228707786

   Thanks for the review!




[GitHub] [beam] github-actions[bot] commented on pull request #22904: [Go SDK] Stream decode values in single iterations

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22904:
URL: https://github.com/apache/beam/pull/22904#issuecomment-1227894368

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @damccorm for label go.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).




[GitHub] [beam] lostluck commented on a diff in pull request #22904: [Go SDK] Stream decode values in single iterations

Posted by GitBox <gi...@apache.org>.
lostluck commented on code in PR #22904:
URL: https://github.com/apache/beam/pull/22904#discussion_r956214350


##########
sdks/go/pkg/beam/core/runtime/exec/fullvalue.go:
##########
@@ -189,3 +191,173 @@ func ReadAll(rs ReStream) ([]FullValue, error) {
 		ret = append(ret, *elm)
 	}
 }
+
+// decodeReStream is a decode on demand ReStream.
+// Can only produce a single Stream because it consumes the reader.
+// Must not be used for streams that might be re-iterated, causing Open
+// to be called twice.
+type decodeReStream struct {

Review Comment:
   I'm going to drop the "decode" from the restream levels, so singleUseRestream. Simpler to understand that way.
   
   The Stream levels are pretty standard decode streams at least.
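
As a rough illustration of the single-use contract discussed in this thread, the sketch below shows an `Open` that hands its reader to the first stream and reports an error on any later call. It is a simplified assumption, not the merged code: the `Stream` and `ReStream` interfaces here are reduced stand-ins (the real `Stream` yields `*FullValue`), and `byteStream` and `newSingleUse` are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

// Stream and ReStream are reduced stand-ins for the exec package interfaces.
type Stream interface {
	io.Closer
	Read() (byte, error)
}

type ReStream interface {
	Open() (Stream, error)
}

// byteStream hands out one byte per Read until the reader is exhausted
// or the stream is closed.
type byteStream struct{ r io.ByteReader }

func (s *byteStream) Read() (byte, error) {
	if s.r == nil {
		return 0, io.EOF
	}
	return s.r.ReadByte()
}

func (s *byteStream) Close() error {
	s.r = nil
	return nil
}

// singleUseRestream gives its reader away on the first Open, so a second
// Open has nothing left to iterate and returns an error instead.
type singleUseRestream struct{ r io.ByteReader }

func (n *singleUseRestream) Open() (Stream, error) {
	if n.r == nil {
		return nil, errors.New("singleUseRestream opened twice")
	}
	ret := &byteStream{r: n.r}
	n.r = nil
	return ret, nil
}

// newSingleUse is a hypothetical constructor used only by this example.
func newSingleUse(data string) ReStream {
	return &singleUseRestream{r: strings.NewReader(data)}
}

func main() {
	rs := newSingleUse("ab")
	s, err := rs.Open()
	if err != nil {
		panic(err)
	}
	b, _ := s.Read()
	fmt.Printf("first open read %q\n", b)
	s.Close()

	_, err = rs.Open()
	fmt.Println("second open:", err)
}
```

Returning an error on the second `Open` makes accidental re-iteration visible immediately rather than silently yielding an empty or misaligned stream.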
   
   



##########
sdks/go/pkg/beam/core/runtime/exec/fullvalue.go:
##########
@@ -189,3 +191,173 @@ func ReadAll(rs ReStream) ([]FullValue, error) {
 		ret = append(ret, *elm)
 	}
 }
+
+// decodeReStream is a decode on demand ReStream.
+// Can only produce a single Stream because it consumes the reader.
+// Must not be used for streams that might be re-iterated, causing Open
+// to be called twice.
+type decodeReStream struct {
+	r    io.Reader
+	d    ElementDecoder
+	size int // The number of elements in this stream.
+}
+
+// Open returns the Stream from the start of the in-memory reader. Returns error if called twice.
+func (n *decodeReStream) Open() (Stream, error) {
+	if n.r == nil {
+		return nil, errors.New("decodeReStream opened twice!")
+	}
+	ret := &decodeStream{r: n.r, d: n.d, size: n.size}
+	n.r = nil
+	n.d = nil
+	return ret, nil
+}
+
+// decodeStream is a decode on demand Stream, that decodes size elements from the provided
+// io.Reader.
+type decodeStream struct {
+	r          io.Reader
+	d          ElementDecoder
+	next, size int
+	ret        FullValue
+}
+
+// Close causes subsequent calls to Read to return io.EOF, and drains the remaining element count
+// from the reader.
+func (s *decodeStream) Close() error {
+	// On close, if next != size, we must iterate through the rest of the decoding
+	// until the reader is drained. Otherwise we corrupt the read for the next element.
+	// TODO: Optimize the case where we have length prefixed values

Review Comment:
   We do, I missed it here. Same as the other one.





[GitHub] [beam] lostluck merged pull request #22904: [Go SDK] Stream decode values in single iterations

Posted by GitBox <gi...@apache.org>.
lostluck merged PR #22904:
URL: https://github.com/apache/beam/pull/22904

