Posted to github@beam.apache.org by "lostluck (via GitHub)" <gi...@apache.org> on 2023/03/26 23:05:56 UTC

[GitHub] [beam] lostluck opened a new pull request, #25982: [#22737] Re-write Go SDK data plane to support timers.

lostluck opened a new pull request, #25982:
URL: https://github.com/apache/beam/pull/25982

   Support receiving and directing timers to the *exec.DataSource from the DataManager.
   
   This does not fully implement timers; it comprises only the required data plane changes. It does unblock timer development, though it includes an untested TimerWriter path and currently unused callback invocations in the DataSource to support timers later.
   
   The FnAPI Data Stream multiplexes all data for all instructions onto a single channel from the Runner harness. Each message may contain Data or Timers bytes for one or more instructions executing on the SDK harness, to be decoded by the execution plane's DataSource. The Data or Timers are directed to a specific PTransform.
   
   The data and timers may arrive before or after the associated instruction begins execution. This means that the Data and Timers must be buffered in the SDK until they are consumed or their associated instruction is marked as ended. Since a single instruction may have more than one consuming PTransform, it's important to track the wanted number of "islast" signals, compare it against the number received, and only signal that all byte streams for this instruction are complete when they match.
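
The buffering and "islast" counting described above can be sketched as follows. This is a minimal illustration, not the code in this PR: `element`, `instBuffer`, `demux`, and their methods are hypothetical names, and the sketch assumes a single goroutine feeds `receive`.

```go
package main

import "fmt"

// element is a hypothetical stand-in for one data or timer message taken
// off the multiplexed stream. It is not the SDK's own type.
type element struct {
	instID       string
	ptransformID string
	payload      []byte
	isLast       bool
}

// instBuffer holds the elements for one instruction and counts is-last signals.
type instBuffer struct {
	buffered []element
	want     int // number of consuming PTransforms; 0 until the instruction starts
	got      int // number of is-last signals received so far
}

// done reports whether every expected byte stream has signalled is-last.
func (b *instBuffer) done() bool { return b.want > 0 && b.got == b.want }

// demux routes messages by instruction ID, buffering them even when the
// instruction has not started yet.
type demux struct {
	insts map[string]*instBuffer
}

func newDemux() *demux { return &demux{insts: map[string]*instBuffer{}} }

// get returns the buffer for instID, creating it on first use.
func (d *demux) get(instID string) *instBuffer {
	b, ok := d.insts[instID]
	if !ok {
		b = &instBuffer{}
		d.insts[instID] = b
	}
	return b
}

// receive records one message from the stream.
func (d *demux) receive(e element) {
	b := d.get(e.instID)
	if len(e.payload) > 0 {
		b.buffered = append(b.buffered, e)
	}
	if e.isLast {
		b.got++
	}
}

// start declares how many byte streams the instruction expects to consume.
func (d *demux) start(instID string, consumers int) *instBuffer {
	b := d.get(instID)
	b.want = consumers
	return b
}

func main() {
	d := newDemux()
	// Data arrives before the instruction begins executing.
	d.receive(element{instID: "inst1", ptransformID: "data", payload: []byte("a")})
	d.receive(element{instID: "inst1", ptransformID: "data", isLast: true})
	b := d.start("inst1", 2) // one data transform plus one timer transform
	fmt.Println(b.done())    // false: only one of two is-last signals seen
	d.receive(element{instID: "inst1", ptransformID: "timers", isLast: true})
	fmt.Println(b.done()) // true
}
```

Keeping the wanted count at zero until the instruction starts is one way to avoid declaring an instruction complete before its consumers are known.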
   
   Specific things to note: 
   * In elementsChan we use a mutex as well as the channel to avoid concurrent changes to "want" and "got", but use an atomic to check whether the channel is closed. This is lighter weight than taking the mutex every time the condition is checked.
   * The channel index in DataSource now defaults to 0 instead of -1, and we now do a "post processing check" to validate whether a split has occurred. This ensures that the expected primary and residuals are respected in the same way they were before.
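
The first bullet's combination of a mutex for the counters and an atomic for the closed check might look roughly like this. It is a sketch under assumptions: `countedChan` and its methods are hypothetical names rather than the PR's `elementsChan`, and it assumes a single producer goroutine calls both `send` and `markLast`, so a send can never race with the close.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countedChan is a hypothetical illustration of the pattern only.
type countedChan struct {
	mu        sync.Mutex // guards want and got
	want, got int32
	closed    uint32 // 1 once ch is closed; read and written atomically
	ch        chan []byte
}

func newCountedChan(want int32, buf int) *countedChan {
	return &countedChan{want: want, ch: make(chan []byte, buf)}
}

// isClosed is the cheap check: a single atomic load and no lock.
func (c *countedChan) isClosed() bool { return atomic.LoadUint32(&c.closed) == 1 }

// send delivers b unless the channel has already been closed.
func (c *countedChan) send(b []byte) bool {
	if c.isClosed() {
		return false
	}
	c.ch <- b
	return true
}

// markLast records one is-last signal and closes the channel once all
// expected signals have arrived. It reports whether the channel is closed.
func (c *countedChan) markLast() bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.isClosed() {
		return true
	}
	c.got++
	if c.got >= c.want {
		atomic.StoreUint32(&c.closed, 1)
		close(c.ch)
		return true
	}
	return false
}

func main() {
	c := newCountedChan(2, 4)
	fmt.Println(c.send([]byte("a"))) // true
	fmt.Println(c.markLast())        // false: one of two signals
	fmt.Println(c.markLast())        // true: channel closed
	fmt.Println(c.send([]byte("b"))) // false: dropped after close
}
```

The hot path (`send`) pays only for an atomic load, while the rarer bookkeeping path takes the lock.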
   
   Unit tests were updated to match the current possible cases, and validate the above mentioned semantics. A "read" benchmark was added as well. Additional larger scale testing was also done to ensure continued correctness of the data plane's behavior.
   
   Also Closes #21164, ensuring that the last element is always consumed.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] lostluck commented on pull request #25982: [#22737] Re-write Go SDK data plane to support timers.

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #25982:
URL: https://github.com/apache/beam/pull/25982#issuecomment-1487452736

   Run GoPortable PreCommit




[GitHub] [beam] codecov[bot] commented on pull request #25982: [#22737] Re-write Go SDK data plane to support timers.

Posted by "codecov[bot] (via GitHub)" <gi...@apache.org>.
codecov[bot] commented on PR #25982:
URL: https://github.com/apache/beam/pull/25982#issuecomment-1484255094

   ## [Codecov](https://codecov.io/gh/apache/beam/pull/25982?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#25982](https://codecov.io/gh/apache/beam/pull/25982?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (34efdfb) into [master](https://codecov.io/gh/apache/beam/commit/d80f1ff499a9759aa7529b32e5c4d67fd2e66402?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (d80f1ff) will **decrease** coverage by `0.10%`.
   > The diff coverage is `55.22%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #25982      +/-   ##
   ==========================================
   - Coverage   71.41%   71.32%   -0.10%     
   ==========================================
     Files         779      779              
     Lines      102530   102657     +127     
   ==========================================
   - Hits        73220    73218       -2     
   - Misses      27849    27967     +118     
   - Partials     1461     1472      +11     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | go | `53.74% <55.22%> (-0.21%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/25982?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [sdks/go/pkg/beam/core/runtime/exec/data.go](https://codecov.io/gh/apache/beam/pull/25982?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvZXhlYy9kYXRhLmdv) | `0.00% <ø> (ø)` | |
   | [sdks/go/pkg/beam/core/runtime/harness/datamgr.go](https://codecov.io/gh/apache/beam/pull/25982?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvaGFybmVzcy9kYXRhbWdyLmdv) | `56.58% <50.69%> (-17.20%)` | :arrow_down: |
   | [sdks/go/pkg/beam/core/runtime/exec/datasource.go](https://codecov.io/gh/apache/beam/pull/25982?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvZXhlYy9kYXRhc291cmNlLmdv) | `67.31% <66.29%> (-1.62%)` | :arrow_down: |
   
   ... and [2 files with indirect coverage changes](https://codecov.io/gh/apache/beam/pull/25982/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   




[GitHub] [beam] lostluck commented on pull request #25982: [#22737] Re-write Go SDK data plane to support timers.

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #25982:
URL: https://github.com/apache/beam/pull/25982#issuecomment-1487622029

   Run GoPortable PreCommit




[GitHub] [beam] github-actions[bot] commented on pull request #25982: [#22737] Re-write Go SDK data plane to support timers.

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #25982:
URL: https://github.com/apache/beam/pull/25982#issuecomment-1484254215

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control




[GitHub] [beam] riteshghorse commented on a diff in pull request #25982: [#22737] Re-write Go SDK data plane to support timers.

Posted by "riteshghorse (via GitHub)" <gi...@apache.org>.
riteshghorse commented on code in PR #25982:
URL: https://github.com/apache/beam/pull/25982#discussion_r1150751691


##########
sdks/go/pkg/beam/core/runtime/harness/datamgr.go:
##########
@@ -214,51 +290,88 @@ func (c *DataChannel) terminateStreamOnError(err error) {
 	}
 }
 
-// OpenRead returns an io.ReadCloser of the data elements for the given instruction and ptransform.
-func (c *DataChannel) OpenRead(ctx context.Context, ptransformID string, instID instructionID) io.ReadCloser {
+// OpenWrite returns an io.WriteCloser of the data elements for the given instruction and ptransform.
+func (c *DataChannel) OpenWrite(ctx context.Context, ptransformID string, instID instructionID) io.WriteCloser {
+	return c.makeWriter(ctx, clientID{ptransformID: ptransformID, instID: instID})
+}
+
+// OpenElementChan returns a channel of typex.Elements for the given instruction and ptransform.
+func (c *DataChannel) OpenElementChan(ctx context.Context, ptransformID string, instID instructionID, expectedTimerTransforms []string) (<-chan exec.Elements, error) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 	cid := clientID{ptransformID: ptransformID, instID: instID}
 	if c.readErr != nil {
-		log.Errorf(ctx, "opening a reader %v on a closed channel", cid)
-		return &errReader{c.readErr}
+		return nil, fmt.Errorf("opening a reader %v on a closed channel. Original error: %w", cid, c.readErr)
 	}
-	return c.makeReader(ctx, cid)
+	return c.makeChannel(true, cid, expectedTimerTransforms...).ch, nil
 }
 
-// OpenWrite returns an io.WriteCloser of the data elements for the given instruction and ptransform.
-func (c *DataChannel) OpenWrite(ctx context.Context, ptransformID string, instID instructionID) io.WriteCloser {
-	return c.makeWriter(ctx, clientID{ptransformID: ptransformID, instID: instID})
+// makeChannel creates a channel of exec.Elements. It expects to be called while c.mu is held.
+func (c *DataChannel) makeChannel(fromSource bool, id clientID, additionalTransforms ...string) *elementsChan {
+	if ec, ok := c.channels[id.instID]; ok {
+		ec.mu.Lock()
+		defer ec.mu.Unlock()
+		if fromSource {
+			ec.want = (1 + int32(len(additionalTransforms)))

Review Comment:
   Is the `+1` there to account for the source itself?



##########
sdks/go/pkg/beam/core/runtime/harness/datamgr.go:
##########
@@ -574,3 +620,99 @@ func (w *dataWriter) Write(p []byte) (n int, err error) {
 	w.buf = append(w.buf, p...)
 	return len(p), nil
 }
+
+func (c *DataChannel) makeTimerWriter(ctx context.Context, id clientID, family string) *timerWriter {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	var m map[timerKey]*timerWriter
+	var ok bool
+	if m, ok = c.timerWriters[id.instID]; !ok {
+		m = make(map[timerKey]*timerWriter)
+		c.timerWriters[id.instID] = m
+	}
+	tk := timerKey{ptransformID: id.ptransformID, family: family}
+	if w, ok := m[tk]; ok {
+		return w
+	}
+
+	// We don't check for finished instructions for writers, as writers
+	// can only be created if an instruction is in scope, and aren't
+	// runner or user directed.
+
+	w := &timerWriter{ch: c, id: id, timerFamilyID: family}
+	m[tk] = w
+	return w
+}
+
+type timerWriter struct {
+	id            clientID
+	timerFamilyID string
+	ch            *DataChannel
+}
+
+// send requires the ch.mu lock to be held.
+func (w *timerWriter) send(msg *fnpb.Elements) error {
+	recordStreamSend(msg)
+	if err := w.ch.client.Send(msg); err != nil {
+		if err == io.EOF {
+			log.Warnf(context.TODO(), "dataWriter[%v;%v] EOF on send; fetching real error", w.id, w.ch.id)
+			err = nil
+			for err == nil {
+				// Per GRPC stream documentation, if there's an EOF, we must call Recv
+				// until a non-nil error is returned, to ensure resources are cleaned up.
+				// https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream
+				_, err = w.ch.client.Recv()
+			}
+		}
+		log.Warnf(context.TODO(), "dataWriter[%v;%v] error on send: %v", w.id, w.ch.id, err)

Review Comment:
   ```suggestion
   		log.Warnf(context.TODO(), "timerWriter[%v;%v] error on send: %v", w.id, w.ch.id, err)
   ```



##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -94,20 +98,79 @@ func (n *DataSource) Up(ctx context.Context) error {
 // StartBundle initializes this datasource for the bundle.
 func (n *DataSource) StartBundle(ctx context.Context, id string, data DataContext) error {
 	n.mu.Lock()
+	n.curInst = id
 	n.source = data.Data
 	n.state = data.State
 	n.start = time.Now()
-	n.index = -1
+	n.index = 0
 	n.splitIdx = math.MaxInt64
 	n.mu.Unlock()
 	return n.Out.StartBundle(ctx, id, data)
 }
 
+// splitSuccess is a marker error to indicate we've reached the split index.
+// Akin to io.EOF.
+var splitSuccess = errors.New("split index reached")
+
+// process handles converting elements from the data source to timers.
+//
+// The data and timer callback functions must return an io.EOF if the reader terminates to signal that an additional
+// buffer is desired. On successful splits, [splitSuccess] must be returned to indicate that the
+// PTransform is done processing data for this instruction.
+func (n *DataSource) process(ctx context.Context, data func(bcr *byteCountReader, ptransformID string) error, timer func(bcr *byteCountReader, ptransformID, timerFamilyID string) error) error {

Review Comment:
   Nice, I like this approach of passing in data and timer handling functions
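
For readers following along, the callback-passing shape praised here can be sketched in isolation. Everything below is hypothetical: `chunk`, `errSplit`, and this simplified `process` signature are illustrative stand-ins, not the `DataSource.process` method in the diff, and the sketch treats a `nil` return the same as `io.EOF`.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// errSplit is a hypothetical marker error, playing the role the diff
// assigns to splitSuccess.
var errSplit = errors.New("split index reached")

// chunk is an illustrative buffer of bytes addressed to one PTransform.
type chunk struct {
	ptransformID  string
	timerFamilyID string // empty for data chunks
	payload       []byte
}

// process dispatches each chunk to the matching callback. A callback
// returns io.EOF once it has drained its buffer and wants the next one,
// or errSplit to stop processing early without reporting a failure.
func process(
	chunks []chunk,
	data func(payload []byte, ptransformID string) error,
	timer func(payload []byte, ptransformID, timerFamilyID string) error,
) error {
	for _, c := range chunks {
		var err error
		if c.timerFamilyID != "" {
			err = timer(c.payload, c.ptransformID, c.timerFamilyID)
		} else {
			err = data(c.payload, c.ptransformID)
		}
		switch {
		case errors.Is(err, io.EOF):
			continue // buffer drained; move on to the next chunk
		case errors.Is(err, errSplit):
			return nil // split reached; done with this instruction
		case err != nil:
			return err
		}
	}
	return nil
}

func main() {
	var dataN, timerN int
	chunks := []chunk{
		{ptransformID: "source", payload: []byte("a")},
		{ptransformID: "dofn", timerFamilyID: "fam", payload: []byte("t")},
		{ptransformID: "source", payload: []byte("b")},
		{ptransformID: "source", payload: []byte("c")},
	}
	err := process(chunks,
		func(p []byte, id string) error {
			dataN++
			if dataN == 2 {
				return errSplit
			}
			return io.EOF
		},
		func(p []byte, id, family string) error {
			timerN++
			return io.EOF
		})
	fmt.Println(dataN, timerN, err) // 2 1 <nil>
}
```

Because the loop only knows about two function values, the same dispatch code can serve different callers that handle data and timers differently.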



##########
sdks/go/pkg/beam/core/runtime/harness/datamgr.go:
##########
@@ -574,3 +620,99 @@ func (w *dataWriter) Write(p []byte) (n int, err error) {
 	w.buf = append(w.buf, p...)
 	return len(p), nil
 }
+
+func (c *DataChannel) makeTimerWriter(ctx context.Context, id clientID, family string) *timerWriter {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	var m map[timerKey]*timerWriter
+	var ok bool
+	if m, ok = c.timerWriters[id.instID]; !ok {
+		m = make(map[timerKey]*timerWriter)
+		c.timerWriters[id.instID] = m
+	}
+	tk := timerKey{ptransformID: id.ptransformID, family: family}
+	if w, ok := m[tk]; ok {
+		return w
+	}
+
+	// We don't check for finished instructions for writers, as writers
+	// can only be created if an instruction is in scope, and aren't
+	// runner or user directed.
+
+	w := &timerWriter{ch: c, id: id, timerFamilyID: family}
+	m[tk] = w
+	return w
+}
+
+type timerWriter struct {
+	id            clientID
+	timerFamilyID string
+	ch            *DataChannel
+}
+
+// send requires the ch.mu lock to be held.
+func (w *timerWriter) send(msg *fnpb.Elements) error {
+	recordStreamSend(msg)
+	if err := w.ch.client.Send(msg); err != nil {
+		if err == io.EOF {
+			log.Warnf(context.TODO(), "dataWriter[%v;%v] EOF on send; fetching real error", w.id, w.ch.id)

Review Comment:
   ```suggestion
   			log.Warnf(context.TODO(), "timerWriter[%v;%v] EOF on send; fetching real error", w.id, w.ch.id)
   ```



##########
sdks/go/pkg/beam/core/runtime/harness/datamgr.go:
##########
@@ -124,28 +137,37 @@ func (m *DataChannelManager) Open(ctx context.Context, port exec.Port) (*DataCha
 	return ch, nil
 }
 
-func (m *DataChannelManager) closeInstruction(instID instructionID) {
+func (m *DataChannelManager) closeInstruction(instID instructionID, ports []exec.Port) error {
 	m.mu.Lock()
 	defer m.mu.Unlock()
-	for _, ch := range m.ports {
-		ch.removeInstruction(instID)
+	var firstNonNilError error
+	for _, port := range ports {
+		ch, ok := m.ports[port.URL]
+		if !ok {
+			continue
+		}
+		err := ch.removeInstruction(instID)
+		if err != nil && firstNonNilError == nil {

Review Comment:
   So we are closing all ports here first and then returning the caught error. Nice. 





[GitHub] [beam] lostluck commented on a diff in pull request #25982: [#22737] Re-write Go SDK data plane to support timers.

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on code in PR #25982:
URL: https://github.com/apache/beam/pull/25982#discussion_r1150984329


##########
sdks/go/pkg/beam/core/runtime/harness/datamgr.go:
##########
@@ -574,3 +620,99 @@ func (w *dataWriter) Write(p []byte) (n int, err error) {
 	w.buf = append(w.buf, p...)
 	return len(p), nil
 }
+
+func (c *DataChannel) makeTimerWriter(ctx context.Context, id clientID, family string) *timerWriter {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	var m map[timerKey]*timerWriter
+	var ok bool
+	if m, ok = c.timerWriters[id.instID]; !ok {
+		m = make(map[timerKey]*timerWriter)
+		c.timerWriters[id.instID] = m
+	}
+	tk := timerKey{ptransformID: id.ptransformID, family: family}
+	if w, ok := m[tk]; ok {
+		return w
+	}
+
+	// We don't check for finished instructions for writers, as writers
+	// can only be created if an instruction is in scope, and aren't
+	// runner or user directed.
+
+	w := &timerWriter{ch: c, id: id, timerFamilyID: family}
+	m[tk] = w
+	return w
+}
+
+type timerWriter struct {
+	id            clientID
+	timerFamilyID string
+	ch            *DataChannel
+}
+
+// send requires the ch.mu lock to be held.
+func (w *timerWriter) send(msg *fnpb.Elements) error {
+	recordStreamSend(msg)
+	if err := w.ch.client.Send(msg); err != nil {
+		if err == io.EOF {
+			log.Warnf(context.TODO(), "dataWriter[%v;%v] EOF on send; fetching real error", w.id, w.ch.id)

Review Comment:
   Done!
   
   Basically, if a setting is functionally "permanent" and decided on construction, it's better to have a separate type instead of an if-statement. Fewer ifs == faster execution. See also my long overdue performance work on the DoFn execution hotpath, which is largely blocked until timers are done, so they can be hotpathed if appropriate.
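
As a rough illustration of that point, compare a single type that branches on a construction-time flag with two types chosen once at construction. All names here (`sender`, `flaggedWriter`, `dataW`, `timerW`, `newSender`) are hypothetical, and the sketch shows only the shape of the design; whether it is measurably faster depends on the workload.

```go
package main

import "fmt"

// sender is a hypothetical interface for this illustration.
type sender interface{ label() string }

// flaggedWriter re-checks a flag on every call even though the flag
// never changes after construction.
type flaggedWriter struct{ isTimer bool }

func (w flaggedWriter) label() string {
	if w.isTimer {
		return "timerWriter"
	}
	return "dataWriter"
}

// dataW and timerW bake the decision into the type instead.
type dataW struct{}
type timerW struct{}

func (dataW) label() string  { return "dataWriter" }
func (timerW) label() string { return "timerWriter" }

// newSender makes the choice exactly once, at construction.
func newSender(isTimer bool) sender {
	if isTimer {
		return timerW{}
	}
	return dataW{}
}

func main() {
	fmt.Println(flaggedWriter{isTimer: true}.label()) // timerWriter
	fmt.Println(newSender(true).label())              // timerWriter
	fmt.Println(newSender(false).label())             // dataWriter
}
```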





[GitHub] [beam] lostluck commented on pull request #25982: [#22737] Re-write Go SDK data plane to support timers.

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #25982:
URL: https://github.com/apache/beam/pull/25982#issuecomment-1487622221

   Run Go PostCommit




[GitHub] [beam] lostluck commented on pull request #25982: [#22737] Re-write Go SDK data plane to support timers.

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #25982:
URL: https://github.com/apache/beam/pull/25982#issuecomment-1487452249

   Run Go PostCommit




[GitHub] [beam] lostluck commented on pull request #25982: [#22737] Re-write Go SDK data plane to support timers.

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #25982:
URL: https://github.com/apache/beam/pull/25982#issuecomment-1487368201

   Run Go PostCommit




[GitHub] [beam] lostluck commented on a diff in pull request #25982: [#22737] Re-write Go SDK data plane to support timers.

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on code in PR #25982:
URL: https://github.com/apache/beam/pull/25982#discussion_r1151041222


##########
sdks/go/pkg/beam/core/runtime/harness/datamgr.go:
##########
@@ -124,28 +137,37 @@ func (m *DataChannelManager) Open(ctx context.Context, port exec.Port) (*DataCha
 	return ch, nil
 }
 
-func (m *DataChannelManager) closeInstruction(instID instructionID) {
+func (m *DataChannelManager) closeInstruction(instID instructionID, ports []exec.Port) error {
 	m.mu.Lock()
 	defer m.mu.Unlock()
-	for _, ch := range m.ports {
-		ch.removeInstruction(instID)
+	var firstNonNilError error
+	for _, port := range ports {
+		ch, ok := m.ports[port.URL]
+		if !ok {
+			continue
+		}
+		err := ch.removeInstruction(instID)
+		if err != nil && firstNonNilError == nil {

Review Comment:
   Yup! This is the "paranoid check". Before, with the reader approach, we could just pass a short read / data channel error out with the `io.Reader`. But we can't with the channel being the main pass through, so it needs to be done after the fact via the ScopedDataManager.
   
   I entertained the idea of dramatically changing things so each instruction+PTransform pair would get a Reader, and then "splitting" a bundle into multiple executing goroutines on independent plans... but that would have been much harder to follow than this approach, which keeps to a single Goroutine per ProcessBundle Instruction.
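
The "close everything, then report the first error" behavior discussed in this thread can be sketched on its own. The `closer` interface, `fakePort`, `closeAll`, and `errShortRead` are hypothetical names for illustration; only the loop shape mirrors the quoted `closeInstruction` hunk.

```go
package main

import (
	"errors"
	"fmt"
)

// errShortRead is a hypothetical error used only in this sketch.
var errShortRead = errors.New("short read")

// closer is an illustrative stand-in for a data channel.
type closer interface {
	removeInstruction(instID string) error
}

// fakePort records whether it was closed and returns a preset error.
type fakePort struct {
	err    error
	closed bool
}

func (p *fakePort) removeInstruction(string) error {
	p.closed = true
	return p.err
}

// closeAll removes the instruction from every known port and returns the
// first error seen, without stopping early.
func closeAll(instID string, urls []string, ports map[string]closer) error {
	var firstErr error
	for _, u := range urls {
		ch, ok := ports[u]
		if !ok {
			continue // unknown port: nothing to clean up
		}
		if err := ch.removeInstruction(instID); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

func main() {
	a := &fakePort{err: errShortRead}
	b := &fakePort{}
	ports := map[string]closer{"a": a, "b": b}
	err := closeAll("inst1", []string{"a", "missing", "b"}, ports)
	fmt.Println(err, a.closed, b.closed) // short read true true
}
```

Continuing past the first failure means later ports are still cleaned up, while the caller still learns that something went wrong.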





[GitHub] [beam] lostluck commented on pull request #25982: [#22737] Re-write Go SDK data plane to support timers.

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #25982:
URL: https://github.com/apache/beam/pull/25982#issuecomment-1487499862

   Thanks for the review!
   
   Jenkins is currently down, so I'm going to wait till at most tomorrow before merging, to get those runs in.




[GitHub] [beam] lostluck commented on pull request #25982: [#22737] Re-write Go SDK data plane to support timers.

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #25982:
URL: https://github.com/apache/beam/pull/25982#issuecomment-1484253948

   R: @riteshghorse 




[GitHub] [beam] riteshghorse commented on pull request #25982: [#22737] Re-write Go SDK data plane to support timers.

Posted by "riteshghorse (via GitHub)" <gi...@apache.org>.
riteshghorse commented on PR #25982:
URL: https://github.com/apache/beam/pull/25982#issuecomment-1487234519

   Run GoPortable PreCommit




[GitHub] [beam] lostluck commented on pull request #25982: [#22737] Re-write Go SDK data plane to support timers.

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #25982:
URL: https://github.com/apache/beam/pull/25982#issuecomment-1487377464

   Run Go Portable PreCommit




[GitHub] [beam] lostluck commented on pull request #25982: [#22737] Re-write Go SDK data plane to support timers.

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on PR #25982:
URL: https://github.com/apache/beam/pull/25982#issuecomment-1487377706

   Run Go PostCommit




[GitHub] [beam] lostluck merged pull request #25982: [#22737] Re-write Go SDK data plane to support timers.

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck merged PR #25982:
URL: https://github.com/apache/beam/pull/25982




[GitHub] [beam] lostluck commented on a diff in pull request #25982: [#22737] Re-write Go SDK data plane to support timers.

Posted by "lostluck (via GitHub)" <gi...@apache.org>.
lostluck commented on code in PR #25982:
URL: https://github.com/apache/beam/pull/25982#discussion_r1151037803


##########
sdks/go/pkg/beam/core/runtime/harness/datamgr.go:
##########
@@ -214,51 +290,88 @@ func (c *DataChannel) terminateStreamOnError(err error) {
 	}
 }
 
-// OpenRead returns an io.ReadCloser of the data elements for the given instruction and ptransform.
-func (c *DataChannel) OpenRead(ctx context.Context, ptransformID string, instID instructionID) io.ReadCloser {
+// OpenWrite returns an io.WriteCloser of the data elements for the given instruction and ptransform.
+func (c *DataChannel) OpenWrite(ctx context.Context, ptransformID string, instID instructionID) io.WriteCloser {
+	return c.makeWriter(ctx, clientID{ptransformID: ptransformID, instID: instID})
+}
+
+// OpenElementChan returns a channel of typex.Elements for the given instruction and ptransform.
+func (c *DataChannel) OpenElementChan(ctx context.Context, ptransformID string, instID instructionID, expectedTimerTransforms []string) (<-chan exec.Elements, error) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 	cid := clientID{ptransformID: ptransformID, instID: instID}
 	if c.readErr != nil {
-		log.Errorf(ctx, "opening a reader %v on a closed channel", cid)
-		return &errReader{c.readErr}
+		return nil, fmt.Errorf("opening a reader %v on a closed channel. Original error: %w", cid, c.readErr)
 	}
-	return c.makeReader(ctx, cid)
+	return c.makeChannel(true, cid, expectedTimerTransforms...).ch, nil
 }
 
-// OpenWrite returns an io.WriteCloser of the data elements for the given instruction and ptransform.
-func (c *DataChannel) OpenWrite(ctx context.Context, ptransformID string, instID instructionID) io.WriteCloser {
-	return c.makeWriter(ctx, clientID{ptransformID: ptransformID, instID: instID})
+// makeChannel creates a channel of exec.Elements. It expects to be called while c.mu is held.
+func (c *DataChannel) makeChannel(fromSource bool, id clientID, additionalTransforms ...string) *elementsChan {
+	if ec, ok := c.channels[id.instID]; ok {
+		ec.mu.Lock()
+		defer ec.mu.Unlock()
+		if fromSource {
+			ec.want = (1 + int32(len(additionalTransforms)))

Review Comment:
   There's always a "data" PTransform that isn't part of the "additionalTransforms" list.
   
   This is something we can refactor and clean up at a later time though. Keeping the assumption makes the diff easier to understand and reduces the scope of the change a little.


