Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/25 17:52:42 UTC

[GitHub] [beam] gonzojive opened a new pull request, #22057: Go SDK: Allow overriding the default iterable creator.

gonzojive opened a new pull request, #22057:
URL: https://github.com/apache/beam/pull/22057

   This change defines a new exec.ReStreamFactory type for constructing a
   re-iterable stream from a byte stream and coder.Coder. The user may override the
   default behavior by calling exec.SetReStreamFactory.
   
   This customizable factory function allows overriding the default behavior of
   exec.ReStream construction. The default implementation buffers all elements being
   iterated over by a GBK or CoGBK function into an in-memory buffer, which may
   cause OOM errors when a single iterable is large and the runner does not supply
   a state-backed iterable. See https://github.com/apache/beam/issues/21817. The
   user may specify an alternative function that spills elements to disk if a
   memory threshold is exceeded, for instance.
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes. **I did not update this because this change doesn't seem significant enough, but I can update it if requested.**
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] lostluck commented on a diff in pull request #22057: Go SDK: Allow overriding the default iterable creator.

Posted by GitBox <gi...@apache.org>.
lostluck commented on code in PR #22057:
URL: https://github.com/apache/beam/pull/22057#discussion_r919519283


##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -236,24 +249,67 @@ func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv Elemen
 							// We can't re-use the original bcr, since we may get new iterables,
 							// or multiple of them at the same time, but we can re-use the count itself.
 							r = &byteCountReader{reader: r, count: bcr.count}
-							return &elementStream{r: r, ec: cv}, nil
+							return &elementStream{r: r, ec: decoder}, nil
 						},
-					},
-				}, nil
-			default:
-				return nil, errors.Errorf("multi-chunk stream with invalid chunk size of %d", chunk)
+					})
+					return nil
+				default:
+					return errors.Errorf("multi-chunk stream with invalid chunk size of %d", chunkSize)
+				}
 			}
 		}
+		if err := createChunkReStreams(); err != nil {
+			return nil, nopCloser{}, err
+		}
+		closeChunkReStreamsEarly = false
+		return newConcatReStream(chunkReStreams...), chunkReStreamsCloser, nil
 	default:
-		return nil, errors.Errorf("received stream with marker size of %d", size)
+		return nil, nopCloser{}, errors.Errorf("received stream with marker size of %d", size)
 	}
 }
 
-func readStreamToBuffer(cv ElementDecoder, r io.ReadCloser, size int64, buf []FullValue) ([]FullValue, error) {
-	for i := int64(0); i < size; i++ {
-		value, err := cv.Decode(r)
+var readStreamToReStream ReStreamFactory = DefaultReadStreamToReStream
+
+// ReStreamFactory is a function that constructs a ReStream from an io.Reader
+// and a coder for the type of elements that need to be decoded. A ReStreamFactory
+// is used by the SDK harness to transform a byte stream into a stream of
+// FullValues while executing a DoFn that takes an iterator as one of its
+// arguments (GBK and CoGBK DoFns).
+//
+// The factory should return a ReStream that decodes numElements elements from
+// the encodedStream reader. After the DoFn that uses the stream has finished,
+// the second return value will be called to close the ReStream; this provides
+// the factory an opportunity to release any resources associated with the
+// returned ReStream.
+//
+// DefaultReadStreamToReStream is the default ReStreamFactory that is used by
+// the exec package.
+type ReStreamFactory func(ctx context.Context, encodedStream io.Reader, numElements int64, coder *coder.Coder) (ReStream, func() error, error)
+
+// SetReStreamFactory overrides the default behavior for constructing a ReStream
+// for DoFns that iterate over values (GBK and CoGBK).
+//
+// The default factory is DefaultReadStreamToReStream.
+func SetReStreamFactory(fn ReStreamFactory) {

Review Comment:
   Per discussion, please label this as Experimental, and link to the issue we're attempting to solve with this.



##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -164,70 +164,83 @@ func (n *DataSource) Process(ctx context.Context) error {
 		pe.Pane = pn
 
 		var valReStreams []ReStream
-		for _, cv := range cvs {
-			values, err := n.makeReStream(ctx, pe, cv, &bcr)
+		reStreamCloser := &multiOnceCloser{}
+		defer reStreamCloser.Close()
+		for _, cod := range valueCoders {
+			values, closer, err := n.makeReStream(ctx, pe, cod, &dataReaderCounted)
 			if err != nil {
 				return err
 			}
 			valReStreams = append(valReStreams, values)
+			reStreamCloser.children = append(reStreamCloser.children, closer)
 		}
 
 		if err := n.Out.ProcessElement(ctx, pe, valReStreams...); err != nil {
 			return err
 		}
 		// Collect the actual size of the element, and reset the bytecounter reader.
-		n.PCol.addSize(int64(bcr.reset()))
-		bcr.reader = r
+		n.PCol.addSize(int64(dataReaderCounted.reset()))
+		dataReaderCounted.reader = dataReader
+
+		if err := reStreamCloser.Close(); err != nil {
+			return fmt.Errorf("error closing ReStream after processing element: %w", err)
+		}
 	}
 }
 
-func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv ElementDecoder, bcr *byteCountReader) (ReStream, error) {
+func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, elemCoder *coder.Coder, bcr *byteCountReader) (ReStream, io.Closer, error) {
 	// TODO(lostluck) 2020/02/22: Do we include the chunk size, or just the element sizes?
 	size, err := coder.DecodeInt32(bcr.reader)
 	if err != nil {
-		return nil, errors.Wrap(err, "stream size decoding failed")
+		return nil, nopCloser{}, errors.Wrap(err, "stream size decoding failed")
 	}
 
 	switch {
 	case size >= 0:
 		// Single chunk streams are fully read in and buffered in memory.
-		buf := make([]FullValue, 0, size)
-		buf, err = readStreamToBuffer(cv, bcr, int64(size), buf)
-		if err != nil {
-			return nil, err
-		}
-		return &FixedReStream{Buf: buf}, nil
+		stream, cleanupFn, err := readStreamToReStream(ctx, bcr, int64(size), elemCoder)
+		return stream, closeFunc(cleanupFn), err
 	case size == -1:
+		decoder := MakeElementDecoder(elemCoder)
 		// Multi-chunked stream.
-		var buf []FullValue
-		for {
-			chunk, err := coder.DecodeVarInt(bcr.reader)
-			if err != nil {
-				return nil, errors.Wrap(err, "stream chunk size decoding failed")
+		var chunkReStreams []ReStream
+		chunkReStreamsCloser := &multiOnceCloser{}
+		closeChunkReStreamsEarly := true
+		defer func() {
+			if !closeChunkReStreamsEarly {
+				return
 			}
-			// All done, escape out.
-			switch {
-			case chunk == 0: // End of stream, return buffer.
-				return &FixedReStream{Buf: buf}, nil
-			case chunk > 0: // Non-zero chunk, read that many elements from the stream, and buffer them.
-				chunkBuf := make([]FullValue, 0, chunk)
-				chunkBuf, err = readStreamToBuffer(cv, bcr, chunk, chunkBuf)
-				if err != nil {
-					return nil, err
-				}
-				buf = append(buf, chunkBuf...)
-			case chunk == -1: // State backed iterable!
-				chunk, err := coder.DecodeVarInt(bcr.reader)
-				if err != nil {
-					return nil, err
-				}
-				token, err := ioutilx.ReadN(bcr.reader, (int)(chunk))
+			chunkReStreamsCloser.Close() // ignore error because makeReStream is already returning an error in this case.
+		}()
+		// createChunkReStreams appends to chunkReStreams and
+		// chunkReStreamsCloser.children
+		createChunkReStreams := func() error {

Review Comment:
   Indirecting the error through an anon closure doesn't help with code readability here. The closure is too long for a meaningful readability benefit.
   
   I'd strongly prefer that we move this out to a named method and pass parameters in and out, instead of making hard to follow code harder to follow.
   
   And looking at this again, is this just to avoid adding a `nopCloser{}` return parameter on error cases? That would be clearer than the indirection.



##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -619,6 +675,18 @@ type concatReStream struct {
 	first, next ReStream
 }
 
+func newConcatReStream(streams ...ReStream) *concatReStream {
+	if len(streams) == 0 {
+		streams = []ReStream{&FixedReStream{}}
+	}
+	first := streams[0]
+	rest := streams[1:]
+	if len(rest) == 0 {
+		return &concatReStream{first: first, next: nil}
+	}
+	return &concatReStream{first: first, next: newConcatReStream(rest...)}

Review Comment:
   I'm not a huge fan of the recursive linked list construction here, but I think in practice this will end up being minimally used, if ever. But this is probably less fiddly than alternative approaches.



##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -236,24 +249,67 @@ func (n *DataSource) makeReStream(ctx context.Context, key *FullValue, cv Elemen
 							// We can't re-use the original bcr, since we may get new iterables,
 							// or multiple of them at the same time, but we can re-use the count itself.
 							r = &byteCountReader{reader: r, count: bcr.count}
-							return &elementStream{r: r, ec: cv}, nil
+							return &elementStream{r: r, ec: decoder}, nil
 						},
-					},
-				}, nil
-			default:
-				return nil, errors.Errorf("multi-chunk stream with invalid chunk size of %d", chunk)
+					})
+					return nil
+				default:
+					return errors.Errorf("multi-chunk stream with invalid chunk size of %d", chunkSize)
+				}
 			}
 		}
+		if err := createChunkReStreams(); err != nil {
+			return nil, nopCloser{}, err
+		}
+		closeChunkReStreamsEarly = false
+		return newConcatReStream(chunkReStreams...), chunkReStreamsCloser, nil
 	default:
-		return nil, errors.Errorf("received stream with marker size of %d", size)
+		return nil, nopCloser{}, errors.Errorf("received stream with marker size of %d", size)
 	}
 }
 
-func readStreamToBuffer(cv ElementDecoder, r io.ReadCloser, size int64, buf []FullValue) ([]FullValue, error) {
-	for i := int64(0); i < size; i++ {
-		value, err := cv.Decode(r)
+var readStreamToReStream ReStreamFactory = DefaultReadStreamToReStream
+
+// ReStreamFactory is a function that constructs a ReStream from an io.Reader
+// and a coder for the type of elements that need to be decoded. A ReStreamFactory
+// is used by the SDK harness to transform a byte stream into a stream of
+// FullValues while executing a DoFn that takes an iterator as one of its
+// arguments (GBK and CoGBK DoFns).
+//
+// The factory should return a ReStream that decodes numElements elements from
+// the encodedStream reader. After the DoFn that uses the stream has finished,
+// the second return value will be called to close the ReStream; this provides
+// the factory an opportunity to release any resources associated with the
+// returned ReStream.
+//
+// DefaultReadStreamToReStream is the default ReStreamFactory that is used by
+// the exec package.
+type ReStreamFactory func(ctx context.Context, encodedStream io.Reader, numElements int64, coder *coder.Coder) (ReStream, func() error, error)
+
+// SetReStreamFactory overrides the default behavior for constructing a ReStream
+// for DoFns that iterate over values (GBK and CoGBK).
+//
+// The default factory is DefaultReadStreamToReStream.
+func SetReStreamFactory(fn ReStreamFactory) {
+	readStreamToReStream = fn
+}
+
+// DefaultReadStreamToReStream reads numElements elements from encodedStream,
+// decodes them with coder, and returns an in-memory ReStream.
+func DefaultReadStreamToReStream(_ context.Context, encodedStream io.Reader, numElements int64, coder *coder.Coder) (ReStream, func() error, error) {

Review Comment:
   Per discussion, please unexport the default implementation.



[GitHub] [beam] github-actions[bot] commented on pull request #22057: Go SDK: Allow overriding the default iterable creator.

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #22057:
URL: https://github.com/apache/beam/pull/22057#issuecomment-1428312150

   Assigning new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @damccorm for label go.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)


[GitHub] [beam] lostluck commented on pull request #22057: Go SDK: Allow overriding the default iterable creator.

Posted by GitBox <gi...@apache.org>.
lostluck commented on PR #22057:
URL: https://github.com/apache/beam/pull/22057#issuecomment-1182171985

   That feels reasonable, combined with marking the method as experimental and probably pointing it to the linked issue and similar ones.
   
   In practice, it's unlikely that we'll simply remove the entry point code, even if we mark it as experimental. This will allow us to better support this case in the future through alternate means.
   
   I'll try to take another look this afternoon.  (I'm a bit busy preparing for next week's Beam Summit, and it's consuming much of my time.)


[GitHub] [beam] github-actions[bot] commented on pull request #22057: Go SDK: Allow overriding the default iterable creator.

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #22057:
URL: https://github.com/apache/beam/pull/22057#issuecomment-1582512983

   This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.


[GitHub] [beam] gonzojive commented on pull request #22057: Go SDK: Allow overriding the default iterable creator.

Posted by "gonzojive (via GitHub)" <gi...@apache.org>.
gonzojive commented on PR #22057:
URL: https://github.com/apache/beam/pull/22057#issuecomment-1409720043

   I forgot about this but can make the requested changes when I get a chance.


[GitHub] [beam] github-actions[bot] commented on pull request #22057: Go SDK: Allow overriding the default iterable creator.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22057:
URL: https://github.com/apache/beam/pull/22057#issuecomment-1166341171

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @lostluck for label go.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).


[GitHub] [beam] github-actions[bot] commented on pull request #22057: Go SDK: Allow overriding the default iterable creator.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22057:
URL: https://github.com/apache/beam/pull/22057#issuecomment-1178919244

   Reminder, please take a look at this pr: @lostluck 


[GitHub] [beam] asf-ci commented on pull request #22057: Go SDK: Allow overriding the default iterable creator.

Posted by GitBox <gi...@apache.org>.
asf-ci commented on PR #22057:
URL: https://github.com/apache/beam/pull/22057#issuecomment-1166333954

   Can one of the admins verify this patch?


[GitHub] [beam] codecov[bot] commented on pull request #22057: Go SDK: Allow overriding the default iterable creator.

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on PR #22057:
URL: https://github.com/apache/beam/pull/22057#issuecomment-1166334306

   # [Codecov](https://codecov.io/gh/apache/beam/pull/22057?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#22057](https://codecov.io/gh/apache/beam/pull/22057?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (31a40d7) into [master](https://codecov.io/gh/apache/beam/commit/7ad4864b0cb19b6c8405265f84fff24bf5b2c8b3?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (7ad4864) will **decrease** coverage by `0.00%`.
   > The diff coverage is `67.88%`.
   
   > :exclamation: Current head 31a40d7 differs from pull request most recent head 8445f3a. Consider uploading reports for the commit 8445f3a to get more accurate results
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #22057      +/-   ##
   ==========================================
   - Coverage   74.00%   74.00%   -0.01%     
   ==========================================
     Files         703      703              
     Lines       92936    92987      +51     
   ==========================================
   + Hits        68776    68811      +35     
   - Misses      22894    22906      +12     
   - Partials     1266     1270       +4     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | go | `51.00% <67.88%> (+0.03%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/22057?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [sdks/go/pkg/beam/core/runtime/exec/datasource.go](https://codecov.io/gh/apache/beam/pull/22057/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvZXhlYy9kYXRhc291cmNlLmdv) | `65.51% <65.62%> (-0.05%)` | :arrow_down: |
   | [sdks/go/pkg/beam/core/runtime/exec/pardo.go](https://codecov.io/gh/apache/beam/pull/22057/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvZXhlYy9wYXJkby5nbw==) | `60.31% <84.61%> (+0.87%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/22057?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/22057?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [7ad4864...8445f3a](https://codecov.io/gh/apache/beam/pull/22057?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


[GitHub] [beam] lostluck commented on pull request #22057: Go SDK: Allow overriding the default iterable creator.

Posted by GitBox <gi...@apache.org>.
lostluck commented on PR #22057:
URL: https://github.com/apache/beam/pull/22057#issuecomment-1171455914

   Really, what I'd want to tell you instead is to contribute to the Go Direct runner to resolve this for local single-machine runs, but I'm working on a portability-based replacement (in Go).
   
   Unfortunately it's not yet ready for migrating to the Beam repo, and it is very test-focused.
   
   Read: currently everything is in memory, there is no parallelism, and it doesn't do DoFn graph fusion optimizations; each DoFn is executed one at a time. My intent is to make it much more configurable, for robustness, and able to actuate all the various Beam FnAPI features. In this case, that means being able to spill out to disk.


[GitHub] [beam] gonzojive commented on pull request #22057: Go SDK: Allow overriding the default iterable creator.

Posted by GitBox <gi...@apache.org>.
gonzojive commented on PR #22057:
URL: https://github.com/apache/beam/pull/22057#issuecomment-1183478961

   FYI: I'm away for a while but plan to make all the requested changes.
   
   On Wed, Jul 13, 2022, 1:10 PM Robert Burke ***@***.***> wrote:
   
   > waiting on author
   


[GitHub] [beam] lostluck commented on pull request #22057: Go SDK: Allow overriding the default iterable creator.

Posted by GitBox <gi...@apache.org>.
lostluck commented on PR #22057:
URL: https://github.com/apache/beam/pull/22057#issuecomment-1183504634

   That was for the bot. Sorry for the confusion. 


[GitHub] [beam] github-actions[bot] commented on pull request #22057: Go SDK: Allow overriding the default iterable creator.

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #22057:
URL: https://github.com/apache/beam/pull/22057#issuecomment-1592972192

   This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.


[GitHub] [beam] lostluck commented on pull request #22057: Go SDK: Allow overriding the default iterable creator.

Posted by GitBox <gi...@apache.org>.
lostluck commented on PR #22057:
URL: https://github.com/apache/beam/pull/22057#issuecomment-1183475926

   waiting on author


[GitHub] [beam] gonzojive commented on pull request #22057: Go SDK: Allow overriding the default iterable creator.

Posted by GitBox <gi...@apache.org>.
gonzojive commented on PR #22057:
URL: https://github.com/apache/beam/pull/22057#issuecomment-1182042840

   What if this change was pulled in, but the function to override the default
   stream factory is unexported (lower cased) for now. This will make it
   easier for me to patch the package in my fork without a complicated diff
   that requires maintenance.
   
   On Thu, Jun 30, 2022 at 9:49 AM Robert Burke ***@***.***>
   wrote:
   
   > Really, what I'd want to tell you instead is to contribute to the Go
   > Direct runner to resolve this for local single-machine runs, but I'm
   > working on a portability-based replacement (in Go).
   >
   > Unfortunately it's not yet ready for migrating to the beam repo, and is
   > very test focused.
   >
   > Read: currently everything is in memory, there is no parallelism, and it
   > doesn't do DoFn graph fusion optimizations; each DoFn is executed one at a
   > time. My intent is to allow it to be much more configurable, though, for
   > robustness, and to be able to actuate all the various Beam FnAPI features.
   > In this case, that means being able to spill out to disk.
   


[GitHub] [beam] asf-ci commented on pull request #22057: Go SDK: Allow overriding the default iterable creator.

Posted by GitBox <gi...@apache.org>.
asf-ci commented on PR #22057:
URL: https://github.com/apache/beam/pull/22057#issuecomment-1166333957

   Can one of the admins verify this patch?


[GitHub] [beam] github-actions[bot] closed pull request #22057: Go SDK: Allow overriding the default iterable creator.

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed pull request #22057: Go SDK: Allow overriding the default iterable creator.
URL: https://github.com/apache/beam/pull/22057


[GitHub] [beam] lostluck commented on pull request #22057: Go SDK: Allow overriding the default iterable creator.

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #22057:
URL: https://github.com/apache/beam/pull/22057#issuecomment-1439371514

   Good news, @gonzojive!
   
   The Go SDK has a local portable runner, implemented in Go, that would very much welcome support for offloading intermediate datasets to files, compared to the current direct runner or Flink implementations. The v0 ships with the next release (2.46, which is being cut tomorrow).
   
   See the following for the current vision.
   https://github.com/apache/beam/tree/master/sdks/go/pkg/beam/runners/prism
   
   That said, there's other work to be done that may block it for your use case, since Prism is currently at v0 and is intentionally test-skewed for now. But it is moving in the right direction and is intended to ultimately be configurable for local production execution.
   
   Since you're trying to do real work with it, the following currently missing features may affect your usage:
   
   1. Shuffles are currently in memory.
   2. Bundles are currently executed one at a time, and not yet in parallel.
   3. State and pending status of elements are currently in memory.
   4. State-backed iterable support isn't yet implemented.
   5. Splitting/sharding isn't yet implemented; it is blocked on progress tracking and load determination. It will, however, do initial SDF splits.
   6. Fusion isn't yet implemented.
   
   Other than offloading to the file system, most of these will come along in due time.
   If you're aware of some kind of lightweight database or memory store that could change how elements are stored and referred to, and how "shuffles" are handled, that could be handy, though.
   
   As mentioned before, the existing SDK-side implementation for what you've got isn't as desirable as a runner-based solution. But as long as any particular feature is configurable between in-memory and more test-focused execution (closer to one at a time), I'm very amenable to unblocking things.
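   To make "offloading intermediate datasets to files" concrete, here is a minimal sketch of a re-iterable buffer that keeps encoded elements in memory until a byte limit is reached and then moves them to a temporary file. It is a hypothetical, stdlib-only illustration: the `spillBuffer` type, its methods, and the uvarint length-prefix framing are all assumptions, not Prism's or the Go SDK's actual storage layer. It requires Go 1.19+ for `binary.AppendUvarint`.

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
	"os"
)

// spillBuffer is a hypothetical re-iterable element buffer, not a Beam API. Frames
// (<uvarint length><bytes>) stay in memory until limit is exceeded, then go to a temp file.
type spillBuffer struct {
	limit int
	mem   bytes.Buffer
	file  *os.File // non-nil once the buffer has spilled to disk
}

// Add appends one encoded element, spilling to disk first if it would exceed limit.
func (b *spillBuffer) Add(elem []byte) error {
	frame := append(binary.AppendUvarint(nil, uint64(len(elem))), elem...)
	if b.file == nil && b.mem.Len()+len(frame) > b.limit {
		if err := b.spill(); err != nil {
			return err
		}
	}
	if b.file != nil {
		_, err := b.file.Write(frame) // appends: reads never move the file offset
		return err
	}
	b.mem.Write(frame) // bytes.Buffer.Write always returns a nil error
	return nil
}

// spill copies the in-memory frames into a new temporary file.
func (b *spillBuffer) spill() error {
	f, err := os.CreateTemp("", "spill-*")
	if err != nil {
		return err
	}
	b.file = f
	_, err = f.Write(b.mem.Bytes())
	b.mem.Reset()
	return err
}

// Iterate calls fn on every element in insertion order; it can be called repeatedly.
func (b *spillBuffer) Iterate(fn func([]byte) error) error {
	var src io.Reader = bytes.NewReader(b.mem.Bytes())
	if b.file != nil {
		info, err := b.file.Stat()
		if err != nil {
			return err
		}
		src = io.NewSectionReader(b.file, 0, info.Size()) // ReadAt-based: append offset untouched
	}
	r := bufio.NewReader(src)
	for {
		size, err := binary.ReadUvarint(r)
		if err == io.EOF {
			return nil // clean end of stream
		} else if err != nil {
			return err
		}
		elem := make([]byte, size)
		if _, err := io.ReadFull(r, elem); err != nil {
			return err
		}
		if err := fn(elem); err != nil {
			return err
		}
	}
}

// Close removes the temporary file, if one was created.
func (b *spillBuffer) Close() error {
	if b.file == nil {
		return nil
	}
	b.file.Close()
	return os.Remove(b.file.Name())
}

func main() {
	buf := &spillBuffer{limit: 16} // tiny limit so this example spills to disk
	defer buf.Close()
	for _, s := range []string{"alpha", "beta", "gamma", "delta"} {
		if err := buf.Add([]byte(s)); err != nil {
			panic(err)
		}
	}
	for pass := 1; pass <= 2; pass++ { // a second pass sees every element again
		if err := buf.Iterate(func(e []byte) error { fmt.Println(pass, string(e)); return nil }); err != nil {
			panic(err)
		}
	}
}
```

   The in-memory path keeps the small-iterable case cheap, which matches the wish above that each feature stay configurable between in-memory and offloaded behavior; only the `limit` value decides which path is taken.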
   


[GitHub] [beam] asf-ci commented on pull request #22057: Go SDK: Allow overriding the default iterable creator.

Posted by GitBox <gi...@apache.org>.
asf-ci commented on PR #22057:
URL: https://github.com/apache/beam/pull/22057#issuecomment-1166333953

   Can one of the admins verify this patch?


[GitHub] [beam] github-actions[bot] commented on pull request #22057: Go SDK: Allow overriding the default iterable creator.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22057:
URL: https://github.com/apache/beam/pull/22057#issuecomment-1181685487

   Assigning a new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @jrmccluskey for label go.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)


[GitHub] [beam] lostluck commented on pull request #22057: Go SDK: Allow overriding the default iterable creator.

Posted by GitBox <gi...@apache.org>.
lostluck commented on PR #22057:
URL: https://github.com/apache/beam/pull/22057#issuecomment-1181947741

   TIL the bot takes the "review" state pretty seriously, instead of assuming something from the comment. (That's probably the correct approach.)


[GitHub] [beam] github-actions[bot] commented on pull request #22057: Go SDK: Allow overriding the default iterable creator.

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #22057:
URL: https://github.com/apache/beam/pull/22057#issuecomment-1422501739

   Reminder, please take a look at this PR: @jrmccluskey

