Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/04/11 15:12:22 UTC

[GitHub] [beam] jrmccluskey opened a new pull request, #17334: [BEAM-11104] Pipe Continuation to DataSource level

jrmccluskey opened a new pull request, #17334:
URL: https://github.com/apache/beam/pull/17334

   Pipes returned ProcessContinuations from the DoFn level up to the execution plan's DataSource level. This involves changes to two .tmpl files and the generated code they produce. As of this PR, users are still not allowed to return ProcessContinuations.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] jrmccluskey commented on a diff in pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r850586608


##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -261,21 +274,34 @@ func (n *invoker) ret2(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime,
 // ret3 handles processing of a trio of return values.
 func (n *invoker) ret3(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, r0, r1, r2 interface{}) (*FullValue, error) {
 	switch {
-	case n.outErrIdx >= 0:
-		if r2 != nil {
-			return nil, r2.(error)
+	case n.outEtIdx >= 0:
+		if n.outErrIdx == 2 {
+			if r2 != nil {
+				return nil, r2.(error)
+			}
+			n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn}
+			return &n.ret, nil
 		}
-		if n.outEtIdx < 0 {
-			n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Elm2: r1, Pane: pn}
+		if n.outPcIdx >= 0 {
+			n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn, Continuation: r2.(sdf.ProcessContinuation)}
 			return &n.ret, nil
 		}
-		n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn}
-		return &n.ret, nil
-	case n.outEtIdx == 0:
 		n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Elm2: r2, Pane: pn}
 		return &n.ret, nil
 	default:
-		panic(fmt.Sprintf("invoker.ret3: %T, %T, and %T don't match permitted return values.", r0, r1, r2))
+		if n.outErrIdx == 2 {

Review Comment:
   Are you asking for this to be changed back? Or left as-is?





[GitHub] [beam] lostluck commented on a diff in pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
lostluck commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r850595991


##########
sdks/go/pkg/beam/core/graph/edge.go:
##########
@@ -426,8 +426,14 @@ func newDoFnNode(op Opcode, g *Graph, s *Scope, u *DoFn, in []*Node, rc *coder.C
 	for i := 0; i < len(in); i++ {
 		edge.Input = append(edge.Input, &Inbound{Kind: kinds[i], From: in[i], Type: inbound[i]})
 	}
+	continuation := false
+	// u.Fn is not guaranteed to be non-nil
+	if u.Fn != nil {

Review Comment:
   When Fn is nil, u.Recv is not nil, because [the Fn field is for functional DoFns and Recv is for structural DoFns.](https://github.com/apache/beam/blob/bc54bf1cecda957fb63217ce846163fbca4de2bb/sdks/go/pkg/beam/core/graph/fn.go#L30)
   
   So as written, this only does the check for functional DoFns, not structural ones.
   You should be able to handle both if you call `u.ProcessElementFn()` and then call `ProcessContinuation()` on the result.
   
   (See handling difference between the two [here](https://github.com/apache/beam/blob/bc54bf1cecda957fb63217ce846163fbca4de2bb/sdks/go/pkg/beam/core/graph/fn.go#L380))
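
The suggestion above can be sketched in isolation. This is a minimal model, not Beam's code: `fn`, `doFn`, `processElementFn`, and `processContinuation` are hypothetical stand-ins for the real graph types, and the string-based `retKinds` field is an assumption made only to keep the example self-contained. It shows why going through one accessor covers both functional and structural DoFns.

```go
package main

import "fmt"

// fn is a hypothetical stand-in for a reflected function signature.
// retKinds lists the kinds of its return values in order.
type fn struct {
	retKinds []string
}

// processContinuation reports the index of a "continuation" return
// value, if the signature has one.
func (f *fn) processContinuation() (int, bool) {
	for i, k := range f.retKinds {
		if k == "continuation" {
			return i, true
		}
	}
	return -1, false
}

// doFn is a hypothetical stand-in for a DoFn wrapper. Either fnField
// (functional DoFn) or methods (structural DoFn) is populated.
type doFn struct {
	fnField *fn
	methods map[string]*fn
}

// processElementFn returns the ProcessElement function for either flavor.
func (d *doFn) processElementFn() *fn {
	if d.fnField != nil {
		return d.fnField
	}
	return d.methods["ProcessElement"]
}

// hasContinuation goes through processElementFn instead of reading
// fnField directly, so structural DoFns are checked too.
func hasContinuation(d *doFn) bool {
	f := d.processElementFn()
	if f == nil {
		return false
	}
	_, ok := f.processContinuation()
	return ok
}

func main() {
	functional := &doFn{fnField: &fn{retKinds: []string{"value", "error"}}}
	structural := &doFn{methods: map[string]*fn{
		"ProcessElement": {retKinds: []string{"continuation", "error"}},
	}}
	fmt.Println(hasContinuation(functional), hasContinuation(structural))
}
```

A check written only against `fnField` would silently skip the structural case, which is the gap the comment points out.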





[GitHub] [beam] github-actions[bot] commented on pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #17334:
URL: https://github.com/apache/beam/pull/17334#issuecomment-1095229396

   Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:
   
   R: @damccorm for label go.
   
   Available commands:
   - `stop reviewer notifications` - opt out of the automated review tooling
   - `remind me after tests pass` - tag the comment author after tests pass
   - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
   
   The PR bot will only process comments in the main thread (not review comments).




[GitHub] [beam] jrmccluskey commented on a diff in pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r847566145


##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -261,21 +274,34 @@ func (n *invoker) ret2(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime,
 // ret3 handles processing of a trio of return values.
 func (n *invoker) ret3(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, r0, r1, r2 interface{}) (*FullValue, error) {
 	switch {
-	case n.outErrIdx >= 0:
-		if r2 != nil {
-			return nil, r2.(error)
+	case n.outEtIdx >= 0:
+		if n.outErrIdx == 2 {
+			if r2 != nil {
+				return nil, r2.(error)
+			}
+			n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn}
+			return &n.ret, nil
 		}
-		if n.outEtIdx < 0 {
-			n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Elm2: r1, Pane: pn}
+		if n.outPcIdx >= 0 {

Review Comment:
   Switched to explicit `==` checks, since we know which indices they should be at.



##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -247,9 +253,16 @@ func (n *invoker) ret2(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime,
 		if r1 != nil {
 			return nil, r1.(error)
 		}
+		if n.outPcIdx >= 0 {
+			n.ret = FullValue{Windows: ws, Timestamp: ts, Pane: pn, Continuation: r0.(sdf.ProcessContinuation)}
+			return &n.ret, nil
+		}
 		n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Pane: pn}
 		return &n.ret, nil
 	case n.outEtIdx == 0:
+		if n.outPcIdx >= 0 {
+			panic("invoker.ret2: cannot return event time without a value")
+		}
 		n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn}
 		return &n.ret, nil
 	default:

Review Comment:
   Good catch! Missed that one. 





[GitHub] [beam] jrmccluskey commented on a diff in pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r847565613


##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -261,21 +274,34 @@ func (n *invoker) ret2(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime,
 // ret3 handles processing of a trio of return values.
 func (n *invoker) ret3(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, r0, r1, r2 interface{}) (*FullValue, error) {
 	switch {
-	case n.outErrIdx >= 0:
-		if r2 != nil {
-			return nil, r2.(error)
+	case n.outEtIdx >= 0:
+		if n.outErrIdx == 2 {
+			if r2 != nil {
+				return nil, r2.(error)
+			}
+			n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn}
+			return &n.ret, nil
 		}
-		if n.outEtIdx < 0 {
-			n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Elm2: r1, Pane: pn}
+		if n.outPcIdx >= 0 {
+			n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn, Continuation: r2.(sdf.ProcessContinuation)}
 			return &n.ret, nil
 		}
-		n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn}
-		return &n.ret, nil
-	case n.outEtIdx == 0:
 		n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Elm2: r2, Pane: pn}
 		return &n.ret, nil
 	default:
-		panic(fmt.Sprintf("invoker.ret3: %T, %T, and %T don't match permitted return values.", r0, r1, r2))
+		if n.outErrIdx == 2 {

Review Comment:
   Good catch, changed





[GitHub] [beam] damccorm commented on a diff in pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
damccorm commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r847589600


##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -224,14 +227,17 @@ func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Wind
 }
 
 // ret1 handles processing of a single return value.
-// Errors or single values are the only options.
+// Errors, single values, or a ProcessContinuation are the only options.
 func (n *invoker) ret1(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, r0 interface{}) (*FullValue, error) {
 	switch {
 	case n.outErrIdx >= 0:
 		if r0 != nil {
 			return nil, r0.(error)
 		}
 		return nil, nil
+	case n.outPcIdx >= 0:
+		n.ret = FullValue{Windows: ws, Timestamp: ts, Pane: pn, Continuation: r0.(sdf.ProcessContinuation)}

Review Comment:
   Ah, I was thinking of n.Fn.Fn.Name() 🙃 using the full object works for me





[GitHub] [beam] jrmccluskey commented on a diff in pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r847549140


##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -284,7 +310,24 @@ func (n *invoker) ret4(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime,
 	if r3 != nil {

Review Comment:
   Yeah this will need to match the other cases a little more closely. I'll get this tweaked.





[GitHub] [beam] jrmccluskey commented on a diff in pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r847591201


##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -274,52 +286,60 @@ func (n *invoker) ret2(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime,
 // ret3 handles processing of a trio of return values.
 func (n *invoker) ret3(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, r0, r1, r2 interface{}) (*FullValue, error) {
 	switch {
-	case n.outEtIdx >= 0:
+	case n.outEtIdx == 0:
 		if n.outErrIdx == 2 {
 			if r2 != nil {
 				return nil, r2.(error)
 			}
 			n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn}
 			return &n.ret, nil
 		}
-		if n.outPcIdx >= 0 {
+		if n.outPcIdx == 2 {
 			n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn, Continuation: r2.(sdf.ProcessContinuation)}
 			return &n.ret, nil
 		}
 		n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Elm2: r2, Pane: pn}
 		return &n.ret, nil
-	default:
-		if n.outErrIdx == 2 {
-			if r2 != nil {
-				return nil, r2.(error)
-			}
-			if n.outPcIdx == 1 {
-				n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Pane: pn, Continuation: r1.(sdf.ProcessContinuation)}
-				return &n.ret, nil
-			}
-			n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Elm2: r1, Pane: pn}
+	case n.outErrIdx == 2:
+		if r2 != nil {
+			return nil, r2.(error)
+		}
+		if n.outPcIdx == 1 {
+			n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Pane: pn, Continuation: r1.(sdf.ProcessContinuation)}
 			return &n.ret, nil
 		}
+		n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Elm2: r1, Pane: pn}
+		return &n.ret, nil
+	default:
 		n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Elm2: r1, Pane: pn, Continuation: r2.(sdf.ProcessContinuation)}
 		return &n.ret, nil
 	}
 }
 
 // ret4 handles processing of a quad of return values.
 func (n *invoker) ret4(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, r0, r1, r2, r3 interface{}) (*FullValue, error) {
-	if r3 != nil {
-		return nil, r3.(error)
-	}
-	if n.outEtIdx == 0 {
-		if n.outPcIdx >= 0 {
-			n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn, Continuation: r2.(sdf.ProcessContinuation)}
+	switch {

Review Comment:
   That's fair. I suppose the switch format only really makes sense to keep here if we anticipate adding another return type soon and would have to handle it, but it's a small enough change that it's easier to go with the simple option.





[GitHub] [beam] damccorm commented on a diff in pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
damccorm commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r847581310


##########
sdks/go/pkg/beam/core/runtime/exec/fullvalue.go:
##########
@@ -35,9 +36,10 @@ type FullValue struct {
 	Elm  interface{} // Element or KV key.
 	Elm2 interface{} // KV value, if not invalid
 
-	Timestamp typex.EventTime
-	Windows   []typex.Window
-	Pane      typex.PaneInfo
+	Timestamp    typex.EventTime
+	Windows      []typex.Window
+	Pane         typex.PaneInfo
+	Continuation sdf.ProcessContinuation

Review Comment:
   Ok - I'd like to at least verify that before making this change (@lostluck might know offhand)





[GitHub] [beam] jrmccluskey commented on a diff in pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r847533568


##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -361,7 +371,7 @@ func (n *ProcessSizedElementsAndRestrictions) ProcessElement(_ context.Context,
 			n.rt = rt
 			n.elm = elm
 			n.SU <- n
-			err := n.PDo.processSingleWindow(&MainInput{Key: wElm, Values: mainIn.Values, RTracker: rt})
+			_, err := n.PDo.processSingleWindow(&MainInput{Key: wElm, Values: mainIn.Values, RTracker: rt})

Review Comment:
   This is a placeholder as we determined that an SDF shouldn't be allowed to explode windows. For now I just blank-assigned the field but in a later PR I believe we're going to remove that case altogether and return an error. 





[GitHub] [beam] lostluck commented on a diff in pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
lostluck commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r850581653


##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -261,21 +274,34 @@ func (n *invoker) ret2(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime,
 // ret3 handles processing of a trio of return values.
 func (n *invoker) ret3(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, r0, r1, r2 interface{}) (*FullValue, error) {
 	switch {
-	case n.outErrIdx >= 0:
-		if r2 != nil {
-			return nil, r2.(error)
+	case n.outEtIdx >= 0:
+		if n.outErrIdx == 2 {
+			if r2 != nil {
+				return nil, r2.(error)
+			}
+			n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn}
+			return &n.ret, nil
 		}
-		if n.outEtIdx < 0 {
-			n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Elm2: r1, Pane: pn}
+		if n.outPcIdx >= 0 {
+			n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn, Continuation: r2.(sdf.ProcessContinuation)}
 			return &n.ret, nil
 		}
-		n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn}
-		return &n.ret, nil
-	case n.outEtIdx == 0:
 		n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Elm2: r2, Pane: pn}
 		return &n.ret, nil
 	default:
-		panic(fmt.Sprintf("invoker.ret3: %T, %T, and %T don't match permitted return values.", r0, r1, r2))
+		if n.outErrIdx == 2 {

Review Comment:
   Style-wise (and compiler-wise) they're identical, so no change should occur from this comment.
   
   The only reason to prefer an if-else ladder is that variables defined in earlier conditional clauses are in scope for the remaining branches, e.g.:
   
   ```
   if foo, ok := getFoo(); !ok {
     // ...
   } else if bar := foo.DoBar(); bar.Frobbled() {
     // ... can use foo and bar. ...
   } else {
     // ... can use foo and bar. ...
   }
   ```
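
For a version of the snippet above that compiles on its own, here is a small sketch using only the standard library. The function name `describe` and the even/odd classification are made up for illustration; the point is only the scoping rule.

```go
package main

import (
	"fmt"
	"strconv"
)

// describe parses s and classifies it. n is declared in the first
// clause and half in the second; both remain in scope for every
// later branch of the ladder.
func describe(s string) string {
	if n, err := strconv.Atoi(s); err != nil {
		return "not a number"
	} else if half := n / 2; n%2 == 0 {
		return fmt.Sprintf("even, half is %d", half)
	} else {
		return fmt.Sprintf("odd, n is %d and half rounds to %d", n, half)
	}
}

func main() {
	for _, s := range []string{"8", "7", "x"} {
		fmt.Println(describe(s))
	}
}
```

A `switch` with the same conditions would need `n` and `half` declared before it, since a case clause cannot introduce a variable for later cases.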





[GitHub] [beam] jrmccluskey commented on a diff in pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r847539904


##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -224,14 +227,17 @@ func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Wind
 }
 
 // ret1 handles processing of a single return value.
-// Errors or single values are the only options.
+// Errors, single values, or a ProcessContinuation are the only options.
 func (n *invoker) ret1(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, r0 interface{}) (*FullValue, error) {
 	switch {
 	case n.outErrIdx >= 0:
 		if r0 != nil {
 			return nil, r0.(error)
 		}
 		return nil, nil
+	case n.outPcIdx >= 0:
+		n.ret = FullValue{Windows: ws, Timestamp: ts, Pane: pn, Continuation: r0.(sdf.ProcessContinuation)}

Review Comment:
   Interesting question. We've established that we want an explicit stop value from the DoFn in the interface, so we'd probably not want to treat a nil value as a stop. A nil value here isn't technically problematic, since we'd ignore it when we pick up the FullValue again and treat it as if the process completed successfully, but you're right that we shouldn't allow users to do that.





[GitHub] [beam] damccorm commented on a diff in pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
damccorm commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r847497405


##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -247,9 +253,16 @@ func (n *invoker) ret2(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime,
 		if r1 != nil {
 			return nil, r1.(error)
 		}
+		if n.outPcIdx >= 0 {
+			n.ret = FullValue{Windows: ws, Timestamp: ts, Pane: pn, Continuation: r0.(sdf.ProcessContinuation)}
+			return &n.ret, nil
+		}
 		n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Pane: pn}
 		return &n.ret, nil
 	case n.outEtIdx == 0:
+		if n.outPcIdx >= 0 {
+			panic("invoker.ret2: cannot return event time without a value")
+		}
 		n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn}
 		return &n.ret, nil
 	default:

Review Comment:
   I think we're missing a case where outPcIdx >= 0 and neither outEtIdx nor outErrIdx is >= 0 (i.e. the case where a single element and a ProcessContinuation are emitted together).
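
To make the missing shape concrete, here is a simplified sketch of a two-value handler. It is not the PR's code: `continuation`, `result`, and this `invoker` are hypothetical stand-ins for `sdf.ProcessContinuation`, `FullValue`, and the real invoker, and the event-time cases are left out to keep it short.

```go
package main

import "fmt"

// continuation is a hypothetical stand-in for sdf.ProcessContinuation.
type continuation interface{ ShouldResume() bool }

type stop struct{}

func (stop) ShouldResume() bool { return false }

// result is a trimmed-down, hypothetical stand-in for FullValue.
type result struct {
	Elm, Elm2    interface{}
	Continuation continuation
}

// invoker holds return-value indices; -1 means "not present".
type invoker struct{ outErrIdx, outPcIdx int }

// ret2 handles two return values (event-time shapes omitted).
func (n *invoker) ret2(r0, r1 interface{}) (*result, error) {
	switch {
	case n.outErrIdx == 1:
		if r1 != nil {
			return nil, r1.(error)
		}
		if n.outPcIdx == 0 {
			// (continuation, error)
			return &result{Continuation: r0.(continuation)}, nil
		}
		// (element, error)
		return &result{Elm: r0}, nil
	case n.outPcIdx == 1:
		// (element, continuation): no event time and no error.
		return &result{Elm: r0, Continuation: r1.(continuation)}, nil
	default:
		// (key, value)
		return &result{Elm: r0, Elm2: r1}, nil
	}
}

func main() {
	n := &invoker{outErrIdx: -1, outPcIdx: 1}
	res, err := n.ret2("elem", stop{})
	fmt.Println(res.Elm, res.Continuation.ShouldResume(), err)
}
```

Without the `outPcIdx == 1` case, the continuation would fall through to the default branch and be stored as a KV value.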



##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -224,14 +227,17 @@ func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Wind
 }
 
 // ret1 handles processing of a single return value.
-// Errors or single values are the only options.
+// Errors, single values, or a ProcessContinuation are the only options.
 func (n *invoker) ret1(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, r0 interface{}) (*FullValue, error) {
 	switch {
 	case n.outErrIdx >= 0:
 		if r0 != nil {
 			return nil, r0.(error)
 		}
 		return nil, nil
+	case n.outPcIdx >= 0:
+		n.ret = FullValue{Windows: ws, Timestamp: ts, Pane: pn, Continuation: r0.(sdf.ProcessContinuation)}

Review Comment:
   Should we guard against r0 being nil here and either panic with a nicer exception or treat it as a Stop (I'd probably prefer panic)? Same question below
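
One way the "panic with a nicer exception" option could look is sketched below. The names `continuation`, `asContinuation`, and `check` are hypothetical and not part of the Beam SDK; the sketch only relies on the fact that a comma-ok type assertion on a nil interface value reports false instead of panicking.

```go
package main

import "fmt"

// continuation is a hypothetical stand-in for sdf.ProcessContinuation.
type continuation interface{ ShouldResume() bool }

type resume struct{}

func (resume) ShouldResume() bool { return true }

// asContinuation converts a raw return value, panicking with a
// descriptive message instead of the runtime's generic
// interface-conversion panic.
func asContinuation(fnName string, r interface{}) continuation {
	pc, ok := r.(continuation)
	if !ok {
		panic(fmt.Sprintf("%s: expected a non-nil ProcessContinuation, got %T", fnName, r))
	}
	return pc
}

// check runs asContinuation and reports the panic message, if any.
func check(r interface{}) (msg string) {
	defer func() {
		if p := recover(); p != nil {
			msg = fmt.Sprint(p)
		}
	}()
	asContinuation("myDoFn", r)
	return "ok"
}

func main() {
	fmt.Println(check(resume{}))
	fmt.Println(check(nil))
}
```

Note that this only catches an untyped nil; a nil pointer of a type that implements the interface would still pass the assertion.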



##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -343,7 +348,12 @@ func (n *ProcessSizedElementsAndRestrictions) ProcessElement(_ context.Context,
 		defer func() {
 			<-n.SU
 		}()
-		return n.PDo.processSingleWindow(mainIn)
+		continuation, processResult := n.PDo.processSingleWindow(mainIn)
+		if continuation != nil {
+			n.source.pc = continuation
+			n.source.selfSu = n

Review Comment:
   I think I missed the discussion on this in the design doc (I think maybe it got changed after I first looked, or I just missed it), but I don't love this mechanism of talking back to the datasource. This creates a weird backwards dependency and makes both classes harder to update as a result.
   
   Is there any reason we can't try to infer a `ProcessSizedElementsAndRestrictions` type on the out element and, if it's there, check for a ProcessContinuation using some built-in method on `ProcessSizedElementsAndRestrictions`? That would avoid the circular dependency here. https://github.com/apache/beam/blob/9bb766b02fbd371b66221f8d3ed1e1228e7a9588/sdks/go/pkg/beam/core/runtime/exec/datasource.go#L185
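
The alternative described above can be sketched with an optional interface. Everything here is hypothetical (`unit`, `continuationProvider`, `sdfUnit`, `plainUnit`, `dataSource`) and much simpler than the real exec package; it only illustrates the direction of the dependency, where the source asks its output node rather than the node writing into the source.

```go
package main

import "fmt"

// continuation is a hypothetical stand-in for sdf.ProcessContinuation.
type continuation interface{ ShouldResume() bool }

type resume struct{}

func (resume) ShouldResume() bool { return true }

// unit is a hypothetical stand-in for an execution-plan node.
type unit interface {
	ProcessElement(elm string) error
}

// continuationProvider is an optional interface a node implements
// when it can report a continuation after processing.
type continuationProvider interface {
	Continuation() continuation
}

// sdfUnit records its continuation locally; it knows nothing about
// the data source.
type sdfUnit struct{ pc continuation }

func (u *sdfUnit) ProcessElement(elm string) error {
	u.pc = resume{}
	return nil
}

func (u *sdfUnit) Continuation() continuation { return u.pc }

// plainUnit has no continuation to report.
type plainUnit struct{}

func (plainUnit) ProcessElement(string) error { return nil }

// dataSource depends only on the interfaces above.
type dataSource struct{ out unit }

// process forwards the element, then asks the output node for a
// continuation if the node can provide one.
func (s *dataSource) process(elm string) (continuation, error) {
	if err := s.out.ProcessElement(elm); err != nil {
		return nil, err
	}
	if p, ok := s.out.(continuationProvider); ok {
		return p.Continuation(), nil
	}
	return nil, nil
}

func main() {
	for _, out := range []unit{&sdfUnit{}, plainUnit{}} {
		src := &dataSource{out: out}
		pc, err := src.process("x")
		fmt.Println(pc != nil, err)
	}
}
```

With this shape neither type holds a reference back to the other, so each can change without touching its counterpart.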



##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -361,7 +371,7 @@ func (n *ProcessSizedElementsAndRestrictions) ProcessElement(_ context.Context,
 			n.rt = rt
 			n.elm = elm
 			n.SU <- n
-			err := n.PDo.processSingleWindow(&MainInput{Key: wElm, Values: mainIn.Values, RTracker: rt})
+			_, err := n.PDo.processSingleWindow(&MainInput{Key: wElm, Values: mainIn.Values, RTracker: rt})

Review Comment:
   A comment explaining why we don't need to worry about ProcessContinuation here might be helpful



##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -261,21 +274,34 @@ func (n *invoker) ret2(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime,
 // ret3 handles processing of a trio of return values.
 func (n *invoker) ret3(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, r0, r1, r2 interface{}) (*FullValue, error) {
 	switch {
-	case n.outErrIdx >= 0:
-		if r2 != nil {
-			return nil, r2.(error)
+	case n.outEtIdx >= 0:
+		if n.outErrIdx == 2 {
+			if r2 != nil {
+				return nil, r2.(error)
+			}
+			n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn}
+			return &n.ret, nil
 		}
-		if n.outEtIdx < 0 {
-			n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Elm2: r1, Pane: pn}
+		if n.outPcIdx >= 0 {
+			n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn, Continuation: r2.(sdf.ProcessContinuation)}
 			return &n.ret, nil
 		}
-		n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn}
-		return &n.ret, nil
-	case n.outEtIdx == 0:
 		n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Elm2: r2, Pane: pn}
 		return &n.ret, nil
 	default:
-		panic(fmt.Sprintf("invoker.ret3: %T, %T, and %T don't match permitted return values.", r0, r1, r2))
+		if n.outErrIdx == 2 {

Review Comment:
   I think this reads cleaner if you move it up to a condition in the outer switch:
   ```
   switch {
   case n.outEtIdx >= 0:
      // Do stuff
   case n.outErrIdx == 2:
      // Do stuff
   default:
      n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Elm2: r1, Pane: pn, Continuation: r2.(sdf.ProcessContinuation)}
      return &n.ret, nil
   }
   ```



##########
sdks/go/pkg/beam/core/runtime/exec/fullvalue.go:
##########
@@ -35,9 +36,10 @@ type FullValue struct {
 	Elm  interface{} // Element or KV key.
 	Elm2 interface{} // KV value, if not invalid
 
-	Timestamp typex.EventTime
-	Windows   []typex.Window
-	Pane      typex.PaneInfo
+	Timestamp    typex.EventTime
+	Windows      []typex.Window
+	Pane         typex.PaneInfo
+	Continuation sdf.ProcessContinuation

Review Comment:
   Genuine (and possibly dumb) question - I'm not totally clear on how our encoding works, but will this cause the whole ProcessContinuation object to get encoded as part of our responses to the runner?



##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -261,21 +274,34 @@ func (n *invoker) ret2(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime,
 // ret3 handles processing of a trio of return values.
 func (n *invoker) ret3(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, r0, r1, r2 interface{}) (*FullValue, error) {
 	switch {
-	case n.outErrIdx >= 0:
-		if r2 != nil {
-			return nil, r2.(error)
+	case n.outEtIdx >= 0:
+		if n.outErrIdx == 2 {
+			if r2 != nil {
+				return nil, r2.(error)
+			}
+			n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn}
+			return &n.ret, nil
 		}
-		if n.outEtIdx < 0 {
-			n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Elm2: r1, Pane: pn}
+		if n.outPcIdx >= 0 {

Review Comment:
   There's a lot of mixing `>=` conditions and `==` conditions in how we check these indices - we should coalesce to one approach. As a reader, the back and forth makes it harder to follow. When possible, I'd vote to prefer the exact `==` check, since validation of signatures should be taken care of on the front end, and if we don't have the indices right we'll struggle later on anyway.
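
The difference between the two styles is small but visible when an index is wrong. The helpers below, `pcPresent` and `pcAt`, are made-up names for illustration, and the "bug produced index 1" scenario is an assumption, not something observed in the PR.

```go
package main

import "fmt"

// pcPresent is the loose check: a continuation at any position passes.
func pcPresent(outPcIdx int) bool { return outPcIdx >= 0 }

// pcAt is the exact check: only the one position the signature
// allows passes.
func pcAt(outPcIdx, want int) bool { return outPcIdx == want }

func main() {
	// In a three-value return that starts with an event time, the
	// continuation can only sit at index 2. Suppose a bug produced 1.
	bad := 1
	fmt.Println(pcPresent(bad), pcAt(bad, 2))
}
```

The loose check would go on to type-assert the wrong return value, while the exact check falls through to whatever case handles unexpected shapes.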



##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -261,21 +274,34 @@ func (n *invoker) ret2(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime,
 // ret3 handles processing of a trio of return values.
 func (n *invoker) ret3(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, r0, r1, r2 interface{}) (*FullValue, error) {
 	switch {
-	case n.outErrIdx >= 0:
-		if r2 != nil {
-			return nil, r2.(error)
+	case n.outEtIdx >= 0:
+		if n.outErrIdx == 2 {
+			if r2 != nil {
+				return nil, r2.(error)
+			}
+			n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn}
+			return &n.ret, nil
 		}
-		if n.outEtIdx < 0 {
-			n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Elm2: r1, Pane: pn}
+		if n.outPcIdx >= 0 {
+			n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn, Continuation: r2.(sdf.ProcessContinuation)}
 			return &n.ret, nil
 		}
-		n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn}
-		return &n.ret, nil
-	case n.outEtIdx == 0:
 		n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Elm2: r2, Pane: pn}
 		return &n.ret, nil
 	default:
-		panic(fmt.Sprintf("invoker.ret3: %T, %T, and %T don't match permitted return values.", r0, r1, r2))
+		if n.outErrIdx == 2 {

Review Comment:
   If you don't do that, there's no reason to use a switch instead of a simple if/else here



##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -284,7 +310,24 @@ func (n *invoker) ret4(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime,
 	if r3 != nil {

Review Comment:
   I don't think this check is valid anymore. Couldn't you have something with 4 returns and no errors (1 event time, 2 kv elements, and 1 process continuation)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] jrmccluskey commented on pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on PR #17334:
URL: https://github.com/apache/beam/pull/17334#issuecomment-1095383723

   R: @lostluck 




[GitHub] [beam] codecov[bot] commented on pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on PR #17334:
URL: https://github.com/apache/beam/pull/17334#issuecomment-1095336956

   # [Codecov](https://codecov.io/gh/apache/beam/pull/17334?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#17334](https://codecov.io/gh/apache/beam/pull/17334?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (2016d74) into [master](https://codecov.io/gh/apache/beam/commit/9e4c288627cf4af4e2397f565d3e2f847e2f4900?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (9e4c288) will **decrease** coverage by `0.22%`.
   > The diff coverage is `7.18%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #17334      +/-   ##
   ==========================================
   - Coverage   74.10%   73.87%   -0.23%     
   ==========================================
     Files         679      680       +1     
     Lines       89115    89453     +338     
   ==========================================
   + Hits        66036    66087      +51     
   - Misses      21928    22203     +275     
   - Partials     1151     1163      +12     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | go | `49.60% <7.18%> (-0.46%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/17334?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [sdks/go/pkg/beam/core/graph/edge.go](https://codecov.io/gh/apache/beam/pull/17334/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL2dyYXBoL2VkZ2UuZ28=) | `3.32% <0.00%> (-0.07%)` | :arrow_down: |
   | [sdks/go/pkg/beam/core/runtime/exec/fn\_arity.go](https://codecov.io/gh/apache/beam/pull/17334/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvZXhlYy9mbl9hcml0eS5nbw==) | `6.47% <0.00%> (-1.53%)` | :arrow_down: |
   | [sdks/go/pkg/beam/core/runtime/exec/fullvalue.go](https://codecov.io/gh/apache/beam/pull/17334/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvZXhlYy9mdWxsdmFsdWUuZ28=) | `84.70% <ø> (ø)` | |
   | [sdks/go/pkg/beam/core/util/reflectx/calls.go](https://codecov.io/gh/apache/beam/pull/17334/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3V0aWwvcmVmbGVjdHgvY2FsbHMuZ28=) | `0.00% <0.00%> (ø)` | |
   | [sdks/go/pkg/beam/core/runtime/exec/fn.go](https://codecov.io/gh/apache/beam/pull/17334/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvZXhlYy9mbi5nbw==) | `57.57% <21.21%> (-11.82%)` | :arrow_down: |
   | [sdks/go/pkg/beam/core/runtime/exec/sdf.go](https://codecov.io/gh/apache/beam/pull/17334/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvZXhlYy9zZGYuZ28=) | `70.50% <50.00%> (-0.50%)` | :arrow_down: |
   | [sdks/go/pkg/beam/core/runtime/exec/pardo.go](https://codecov.io/gh/apache/beam/pull/17334/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvZXhlYy9wYXJkby5nbw==) | `50.67% <62.50%> (+0.22%)` | :arrow_up: |
   | [sdks/go/pkg/beam/core/runtime/pipelinex/replace.go](https://codecov.io/gh/apache/beam/pull/17334/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS9jb3JlL3J1bnRpbWUvcGlwZWxpbmV4L3JlcGxhY2UuZ28=) | `66.31% <0.00%> (-0.36%)` | :arrow_down: |
   | [sdks/go/pkg/beam/util/pubsubx/pubsub.go](https://codecov.io/gh/apache/beam/pull/17334/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9nby9wa2cvYmVhbS91dGlsL3B1YnN1YngvcHVic3ViLmdv) | `50.76% <0.00%> (ø)` | |
   | ... and [2 more](https://codecov.io/gh/apache/beam/pull/17334/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/17334?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/17334?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [9e4c288...2016d74](https://codecov.io/gh/apache/beam/pull/17334?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [beam] jrmccluskey commented on a diff in pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r850564778


##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -247,9 +253,16 @@ func (n *invoker) ret2(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime,
 		if r1 != nil {
 			return nil, r1.(error)
 		}
+		if n.outPcIdx >= 0 {
+			n.ret = FullValue{Windows: ws, Timestamp: ts, Pane: pn, Continuation: r0.(sdf.ProcessContinuation)}
+			return &n.ret, nil
+		}
 		n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Pane: pn}
 		return &n.ret, nil
 	case n.outEtIdx == 0:
+		if n.outPcIdx >= 0 {
+			panic("invoker.ret2: cannot return event time without a value")
+		}
 		n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn}
 		return &n.ret, nil
 	default:

Review Comment:
   I'm planning on going over the order validation code again when we enable returning process continuations, so we can hopefully have stronger guarantees for what makes it here and simplify the block. Knowing that we 100% do not have an invalid construction here simplifies a lot of these error routes, but that seems worthy of its own PR. 
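
One way such up-front order validation could look is sketched below. Everything here is an assumption made for illustration: the `kind` enum, the `validateOrder` helper, and the ordering rule itself (optional event time, then up to two values, then an optional process continuation, then an optional error) are inferred from the return shapes discussed in this thread, not taken from the SDK.

```go
package main

import (
	"errors"
	"fmt"
)

// kind is a hypothetical classification of a DoFn return value.
// The constants are declared in the order they may appear.
type kind int

const (
	eventTime kind = iota
	value
	continuation
	errKind
)

// validateOrder checks the assumed ordering
// [EventTime] value{0,2} [ProcessContinuation] [error].
func validateOrder(rets []kind) error {
	next := eventTime // earliest kind still allowed at this position
	values := 0
	sawEventTime := false
	for i, k := range rets {
		if k < next {
			return fmt.Errorf("return value %d is out of order", i)
		}
		switch k {
		case eventTime:
			sawEventTime = true
			next = value
		case value:
			values++
			if values > 2 {
				return fmt.Errorf("return value %d: at most two values are allowed", i)
			}
			next = value // a second value may follow
		default:
			next = k + 1 // continuation and error may each appear once
		}
	}
	if sawEventTime && values == 0 {
		return errors.New("cannot return event time without a value")
	}
	return nil
}

func main() {
	fmt.Println(validateOrder([]kind{eventTime, value, continuation}))
	fmt.Println(validateOrder([]kind{eventTime, continuation}))
}
```

If a check like this ran before execution, the `retN` helpers could rely on exact indices and drop their defensive panic paths.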





[GitHub] [beam] lostluck merged pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
lostluck merged PR #17334:
URL: https://github.com/apache/beam/pull/17334




[GitHub] [beam] damccorm commented on a diff in pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
damccorm commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r847587000


##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -274,52 +286,60 @@ func (n *invoker) ret2(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime,
 // ret3 handles processing of a trio of return values.
 func (n *invoker) ret3(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, r0, r1, r2 interface{}) (*FullValue, error) {
 	switch {
-	case n.outEtIdx >= 0:
+	case n.outEtIdx == 0:
 		if n.outErrIdx == 2 {
 			if r2 != nil {
 				return nil, r2.(error)
 			}
 			n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn}
 			return &n.ret, nil
 		}
-		if n.outPcIdx >= 0 {
+		if n.outPcIdx == 2 {
 			n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn, Continuation: r2.(sdf.ProcessContinuation)}
 			return &n.ret, nil
 		}
 		n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Elm2: r2, Pane: pn}
 		return &n.ret, nil
-	default:
-		if n.outErrIdx == 2 {
-			if r2 != nil {
-				return nil, r2.(error)
-			}
-			if n.outPcIdx == 1 {
-				n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Pane: pn, Continuation: r1.(sdf.ProcessContinuation)}
-				return &n.ret, nil
-			}
-			n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Elm2: r1, Pane: pn}
+	case n.outErrIdx == 2:
+		if r2 != nil {
+			return nil, r2.(error)
+		}
+		if n.outPcIdx == 1 {
+			n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Pane: pn, Continuation: r1.(sdf.ProcessContinuation)}
 			return &n.ret, nil
 		}
+		n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Elm2: r1, Pane: pn}
+		return &n.ret, nil
+	default:
 		n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Elm2: r1, Pane: pn, Continuation: r2.(sdf.ProcessContinuation)}
 		return &n.ret, nil
 	}
 }
 
 // ret4 handles processing of a quad of return values.
 func (n *invoker) ret4(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, r0, r1, r2, r3 interface{}) (*FullValue, error) {
-	if r3 != nil {
-		return nil, r3.(error)
-	}
-	if n.outEtIdx == 0 {
-		if n.outPcIdx >= 0 {
-			n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn, Continuation: r2.(sdf.ProcessContinuation)}
+	switch {

Review Comment:
   I don't really mind it too much here since it's consistent with the other functions, but in general I don't love one-condition switch statements (with or without a default) since they're basically just an if. Consider changing it to an if.
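
As a small illustration of that point, the two hypothetical functions below make the same decision; the names and return strings are invented for the example.

```go
package main

import "fmt"

// classifySwitch uses a switch with a single case plus a default.
func classifySwitch(etIdx int) string {
	switch {
	case etIdx == 0:
		return "event time first"
	default:
		return "no event time"
	}
}

// classifyIf expresses the same decision as a plain if.
func classifyIf(etIdx int) string {
	if etIdx == 0 {
		return "event time first"
	}
	return "no event time"
}

func main() {
	for _, idx := range []int{0, -1} {
		fmt.Println(classifySwitch(idx) == classifyIf(idx))
	}
}
```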





[GitHub] [beam] jrmccluskey commented on a diff in pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r847585397


##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -224,14 +227,17 @@ func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Wind
 }
 
 // ret1 handles processing of a single return value.
-// Errors or single values are the only options.
+// Errors, single values, or a ProcessContinuation are the only options.
 func (n *invoker) ret1(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, r0 interface{}) (*FullValue, error) {
 	switch {
 	case n.outErrIdx >= 0:
 		if r0 != nil {
 			return nil, r0.(error)
 		}
 		return nil, nil
+	case n.outPcIdx >= 0:
+		n.ret = FullValue{Windows: ws, Timestamp: ts, Pane: pn, Continuation: r0.(sdf.ProcessContinuation)}

Review Comment:
   Fixed (there's no Fn.Name() field so I opted just to use the string output for the Fn object)





[GitHub] [beam] jrmccluskey commented on a diff in pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r849696541


##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -361,7 +371,7 @@ func (n *ProcessSizedElementsAndRestrictions) ProcessElement(_ context.Context,
 			n.rt = rt
 			n.elm = elm
 			n.SU <- n
-			err := n.PDo.processSingleWindow(&MainInput{Key: wElm, Values: mainIn.Values, RTracker: rt})
+			_, err := n.PDo.processSingleWindow(&MainInput{Key: wElm, Values: mainIn.Values, RTracker: rt})

Review Comment:
   Done





[GitHub] [beam] asf-ci commented on pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
asf-ci commented on PR #17334:
URL: https://github.com/apache/beam/pull/17334#issuecomment-1095178117

   Can one of the admins verify this patch?




[GitHub] [beam] asf-ci commented on pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
asf-ci commented on PR #17334:
URL: https://github.com/apache/beam/pull/17334#issuecomment-1095178115

   Can one of the admins verify this patch?




[GitHub] [beam] jrmccluskey commented on a diff in pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r847536784


##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -343,7 +348,12 @@ func (n *ProcessSizedElementsAndRestrictions) ProcessElement(_ context.Context,
 		defer func() {
 			<-n.SU
 		}()
-		return n.PDo.processSingleWindow(mainIn)
+		continuation, processResult := n.PDo.processSingleWindow(mainIn)
+		if continuation != nil {
+			n.source.pc = continuation
+			n.source.selfSu = n

Review Comment:
   I think that works up at the DataSource level if we wanted to (that's how we init the SplittableUnit.) That's cleaner with how I was thinking about doing the actual split, too (trying to keep all of the logic within DataSource and just returning the split to the harness if we do so.) I'll mock up the code for that after I go through the rest of the comments





[GitHub] [beam] jrmccluskey commented on a diff in pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r850588991


##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -343,7 +348,12 @@ func (n *ProcessSizedElementsAndRestrictions) ProcessElement(_ context.Context,
 		defer func() {
 			<-n.SU
 		}()
-		return n.PDo.processSingleWindow(mainIn)
+		continuation, processResult := n.PDo.processSingleWindow(mainIn)
+		if continuation != nil {
+			n.source.pc = continuation
+			n.source.selfSu = n

Review Comment:
   If I understand this comment correctly, the channel isn't being used; that was an approach discussed in the design doc and rejected.





[GitHub] [beam] jrmccluskey commented on a diff in pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r849696275


##########
sdks/go/pkg/beam/core/runtime/exec/pardo.go:
##########
@@ -144,12 +145,13 @@ func (n *ParDo) processMainInput(mainIn *MainInput) error {
 	// is that either there is a single window or the function doesn't observe windows, so we can
 	// optimize it by treating all windows as a single one.
 	if !mustExplodeWindows(n.inv.fn, elm, len(n.Side) > 0) {
-		return n.processSingleWindow(mainIn)
+		_, processResult := n.processSingleWindow(mainIn)

Review Comment:
   Done



##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -343,7 +347,11 @@ func (n *ProcessSizedElementsAndRestrictions) ProcessElement(_ context.Context,
 		defer func() {
 			<-n.SU
 		}()
-		return n.PDo.processSingleWindow(mainIn)
+		continuation, processResult := n.PDo.processSingleWindow(mainIn)
+		if continuation != nil {
+			n.continuation = continuation
+		}

Review Comment:
   That's fair. 





[GitHub] [beam] jrmccluskey commented on a diff in pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r847551453


##########
sdks/go/pkg/beam/core/runtime/exec/fullvalue.go:
##########
@@ -35,9 +36,10 @@ type FullValue struct {
 	Elm  interface{} // Element or KV key.
 	Elm2 interface{} // KV value, if not invalid
 
-	Timestamp typex.EventTime
-	Windows   []typex.Window
-	Pane      typex.PaneInfo
+	Timestamp    typex.EventTime
+	Windows      []typex.Window
+	Pane         typex.PaneInfo
+	Continuation sdf.ProcessContinuation

Review Comment:
   I'm not 100% sure, but I don't think so? We could pull the continuation out and set the field in the FullValue back to nil after we retrieve it though. 
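
The "pull it out and reset to nil" idea could look roughly like this. The `fullValue` struct, the `processContinuation` interface, and `takeContinuation` are hypothetical stand-ins written for this sketch; they only mirror the shape of `FullValue` and its new `Continuation` field.

```go
package main

import "fmt"

// processContinuation is a hypothetical stand-in for sdf.ProcessContinuation.
type processContinuation interface {
	ShouldResume() bool
}

// resume is a trivial continuation that always asks to resume.
type resume struct{}

func (resume) ShouldResume() bool { return true }

// fullValue mirrors only the fields needed for this sketch.
type fullValue struct {
	Elm          interface{}
	Continuation processContinuation
}

// takeContinuation returns the continuation carried by fv and clears
// the field, so the value does not carry it further downstream.
func takeContinuation(fv *fullValue) processContinuation {
	if fv == nil {
		return nil
	}
	pc := fv.Continuation
	fv.Continuation = nil
	return pc
}

func main() {
	fv := &fullValue{Elm: "a", Continuation: resume{}}
	pc := takeContinuation(fv)
	fmt.Println(pc != nil, fv.Continuation == nil)
}
```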





[GitHub] [beam] asf-ci commented on pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
asf-ci commented on PR #17334:
URL: https://github.com/apache/beam/pull/17334#issuecomment-1095178118

   Can one of the admins verify this patch?




[GitHub] [beam] jrmccluskey commented on a diff in pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r850604956


##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -261,21 +274,34 @@ func (n *invoker) ret2(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime,
 // ret3 handles processing of a trio of return values.
 func (n *invoker) ret3(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, r0, r1, r2 interface{}) (*FullValue, error) {
 	switch {
-	case n.outErrIdx >= 0:
-		if r2 != nil {
-			return nil, r2.(error)
+	case n.outEtIdx >= 0:
+		if n.outErrIdx == 2 {
+			if r2 != nil {
+				return nil, r2.(error)
+			}
+			n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn}
+			return &n.ret, nil
 		}
-		if n.outEtIdx < 0 {
-			n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Elm2: r1, Pane: pn}
+		if n.outPcIdx >= 0 {
+			n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn, Continuation: r2.(sdf.ProcessContinuation)}
 			return &n.ret, nil
 		}
-		n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn}
-		return &n.ret, nil
-	case n.outEtIdx == 0:
 		n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Elm2: r2, Pane: pn}
 		return &n.ret, nil
 	default:
-		panic(fmt.Sprintf("invoker.ret3: %T, %T, and %T don't match permitted return values.", r0, r1, r2))
+		if n.outErrIdx == 2 {

Review Comment:
   Ah I misunderstood the statement, I interpreted it as "Danny's comment should not have resulted in a change"





[GitHub] [beam] jrmccluskey commented on a diff in pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r847566145


##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -261,21 +274,34 @@ func (n *invoker) ret2(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime,
 // ret3 handles processing of a trio of return values.
 func (n *invoker) ret3(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, r0, r1, r2 interface{}) (*FullValue, error) {
 	switch {
-	case n.outErrIdx >= 0:
-		if r2 != nil {
-			return nil, r2.(error)
+	case n.outEtIdx >= 0:
+		if n.outErrIdx == 2 {
+			if r2 != nil {
+				return nil, r2.(error)
+			}
+			n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn}
+			return &n.ret, nil
 		}
-		if n.outEtIdx < 0 {
-			n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Elm2: r1, Pane: pn}
+		if n.outPcIdx >= 0 {

Review Comment:
   Committed to explicit `==` checks since we know what indices they should be at





[GitHub] [beam] github-actions[bot] commented on pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #17334:
URL: https://github.com/apache/beam/pull/17334#issuecomment-1095385883

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control




[GitHub] [beam] github-actions[bot] commented on pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #17334:
URL: https://github.com/apache/beam/pull/17334#issuecomment-1095374410

   R: @youngoli for final approval




[GitHub] [beam] jrmccluskey commented on a diff in pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r847576863


##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -343,7 +348,12 @@ func (n *ProcessSizedElementsAndRestrictions) ProcessElement(_ context.Context,
 		defer func() {
 			<-n.SU
 		}()
-		return n.PDo.processSingleWindow(mainIn)
+		continuation, processResult := n.PDo.processSingleWindow(mainIn)
+		if continuation != nil {
+			n.source.pc = continuation
+			n.source.selfSu = n

Review Comment:
   Changed, PTAL





[GitHub] [beam] lostluck commented on a diff in pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
lostluck commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r850600397


##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -261,21 +274,34 @@ func (n *invoker) ret2(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime,
 // ret3 handles processing of a trio of return values.
 func (n *invoker) ret3(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, r0, r1, r2 interface{}) (*FullValue, error) {
 	switch {
-	case n.outErrIdx >= 0:
-		if r2 != nil {
-			return nil, r2.(error)
+	case n.outEtIdx >= 0:
+		if n.outErrIdx == 2 {
+			if r2 != nil {
+				return nil, r2.(error)
+			}
+			n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn}
+			return &n.ret, nil
 		}
-		if n.outEtIdx < 0 {
-			n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Elm2: r1, Pane: pn}
+		if n.outPcIdx >= 0 {
+			n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn, Continuation: r2.(sdf.ProcessContinuation)}
 			return &n.ret, nil
 		}
-		n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn}
-		return &n.ret, nil
-	case n.outEtIdx == 0:
 		n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Elm2: r2, Pane: pn}
 		return &n.ret, nil
 	default:
-		panic(fmt.Sprintf("invoker.ret3: %T, %T, and %T don't match permitted return values.", r0, r1, r2))
+		if n.outErrIdx == 2 {

Review Comment:
   As I said...
   > Style wise (and compiler wise) they're identical so no change should occur from this comment.
   
   It's also my job to tell you these sorts of things about Go, and the opportunity is during relevant context, even if it's not changing the code.





[GitHub] [beam] lostluck commented on a diff in pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
lostluck commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r850599583


##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -343,7 +348,12 @@ func (n *ProcessSizedElementsAndRestrictions) ProcessElement(_ context.Context,
 		defer func() {
 			<-n.SU
 		}()
-		return n.PDo.processSingleWindow(mainIn)
+		continuation, processResult := n.PDo.processSingleWindow(mainIn)
+		if continuation != nil {
+			n.source.pc = continuation
+			n.source.selfSu = n

Review Comment:
   You've read it more recently than I have.
   We can check whether it's required in an independent PR, to vet that and possibly get rid of it. Do not
   combine it with the current work. That's how mistakes happen.





[GitHub] [beam] lostluck commented on a diff in pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
lostluck commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r850585916


##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -343,7 +348,12 @@ func (n *ProcessSizedElementsAndRestrictions) ProcessElement(_ context.Context,
 		defer func() {
 			<-n.SU
 		}()
-		return n.PDo.processSingleWindow(mainIn)
+		continuation, processResult := n.PDo.processSingleWindow(mainIn)
+		if continuation != nil {
+			n.source.pc = continuation
+			n.source.selfSu = n

Review Comment:
   FYI I do like leveraging the existing "ownership" mechanism using the channel, as the changed approach seems to be doing.
   As long as we clearly consider who is writing and who is reading, and when, it's fine. We want to avoid heavy mutexes on a per-element basis if we can manage it (that can get very expensive very quickly).
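
As an illustration of the channel "ownership" idea mentioned above, here is a minimal, generic sketch. It is not the Beam implementation: `state`, `newOwned`, `increment`, and `read` are hypothetical names, and the assumption is simply that a capacity-1 channel holds the shared value, so whoever has received it is the only goroutine allowed to touch it.

```go
package main

import (
	"fmt"
	"sync"
)

// state is a hypothetical piece of shared data; it is not a Beam type.
type state struct{ count int }

// newOwned returns a capacity-1 channel that starts out holding the state.
func newOwned() chan *state {
	ch := make(chan *state, 1)
	ch <- &state{}
	return ch
}

// increment takes ownership by receiving, mutates, then hands the state back.
func increment(ch chan *state) {
	s := <-ch
	s.count++
	ch <- s
}

// read borrows the state just long enough to copy the counter out.
func read(ch chan *state) int {
	s := <-ch
	defer func() { ch <- s }()
	return s.count
}

func main() {
	ch := newOwned()
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			increment(ch)
		}()
	}
	wg.Wait()
	fmt.Println(read(ch)) // prints 100
}
```

The point of the sketch is that the reader/writer question is answered by who currently holds the value, so no separate mutex has to be acquired for every element.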





[GitHub] [beam] jrmccluskey commented on a diff in pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r847567115


##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -224,14 +227,17 @@ func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Wind
 }
 
 // ret1 handles processing of a single return value.
-// Errors or single values are the only options.
+// Errors, single values, or a ProcessContinuation are the only options.
 func (n *invoker) ret1(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, r0 interface{}) (*FullValue, error) {
 	switch {
 	case n.outErrIdx >= 0:
 		if r0 != nil {
 			return nil, r0.(error)
 		}
 		return nil, nil
+	case n.outPcIdx >= 0:
+		n.ret = FullValue{Windows: ws, Timestamp: ts, Pane: pn, Continuation: r0.(sdf.ProcessContinuation)}

Review Comment:
   I dropped the panics into ret1 and ret2 to start; I may add them to ret3+ if we're happy with how it looks.





[GitHub] [beam] jrmccluskey commented on a diff in pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r847546136


##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -261,21 +274,34 @@ func (n *invoker) ret2(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime,
 // ret3 handles processing of a trio of return values.
 func (n *invoker) ret3(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, r0, r1, r2 interface{}) (*FullValue, error) {
 	switch {
-	case n.outErrIdx >= 0:
-		if r2 != nil {
-			return nil, r2.(error)
+	case n.outEtIdx >= 0:
+		if n.outErrIdx == 2 {
+			if r2 != nil {
+				return nil, r2.(error)
+			}
+			n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn}
+			return &n.ret, nil
 		}
-		if n.outEtIdx < 0 {
-			n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Elm2: r1, Pane: pn}
+		if n.outPcIdx >= 0 {

Review Comment:
   You're right, this could use some cleaning up. I think the major juggling comes down to "do we have a V or a KV?" and "do we have an error to check?", and those cases can get messy. But we should be able to do a little more with exact indices. That's another thing to tweak.





[GitHub] [beam] damccorm commented on a diff in pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
damccorm commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r847579844


##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -224,15 +227,21 @@ func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Wind
 }
 
 // ret1 handles processing of a single return value.
-// Errors or single values are the only options.
+// Errors, single values, or a ProcessContinuation are the only options.
 func (n *invoker) ret1(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, r0 interface{}) (*FullValue, error) {
 	switch {
-	case n.outErrIdx >= 0:
+	case n.outErrIdx == 0:
 		if r0 != nil {
 			return nil, r0.(error)
 		}
 		return nil, nil
-	case n.outEtIdx >= 0:
+	case n.outPcIdx == 0:
+		if r0 == nil {
+			panic("invoker.ret1: cannot return a nil process continuation")

Review Comment:
   It would be good to hydrate this panic (and our other panics) with information about which function is actually causing the problem:
   
   ```suggestion
   			panic(fmt.Sprintf("invoker.ret1: cannot return a nil process continuation from function %v", n.fn.Name()))
   ```



##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -224,14 +227,17 @@ func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Wind
 }
 
 // ret1 handles processing of a single return value.
-// Errors or single values are the only options.
+// Errors, single values, or a ProcessContinuation are the only options.
 func (n *invoker) ret1(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, r0 interface{}) (*FullValue, error) {
 	switch {
 	case n.outErrIdx >= 0:
 		if r0 != nil {
 			return nil, r0.(error)
 		}
 		return nil, nil
+	case n.outPcIdx >= 0:
+		n.ret = FullValue{Windows: ws, Timestamp: ts, Pane: pn, Continuation: r0.(sdf.ProcessContinuation)}

Review Comment:
   Left a comment about the specific string, but I think that is a good thing and I'd vote we expand to other rets





[GitHub] [beam] damccorm commented on a diff in pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
damccorm commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r847580806


##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -361,7 +371,7 @@ func (n *ProcessSizedElementsAndRestrictions) ProcessElement(_ context.Context,
 			n.rt = rt
 			n.elm = elm
 			n.SU <- n
-			err := n.PDo.processSingleWindow(&MainInput{Key: wElm, Values: mainIn.Values, RTracker: rt})
+			_, err := n.PDo.processSingleWindow(&MainInput{Key: wElm, Values: mainIn.Values, RTracker: rt})

Review Comment:
   SGTM





[GitHub] [beam] lostluck commented on a diff in pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
lostluck commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r848647091


##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -247,9 +253,16 @@ func (n *invoker) ret2(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime,
 		if r1 != nil {
 			return nil, r1.(error)
 		}
+		if n.outPcIdx >= 0 {
+			n.ret = FullValue{Windows: ws, Timestamp: ts, Pane: pn, Continuation: r0.(sdf.ProcessContinuation)}
+			return &n.ret, nil
+		}
 		n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Pane: pn}
 		return &n.ret, nil
 	case n.outEtIdx == 0:
+		if n.outPcIdx >= 0 {
+			panic("invoker.ret2: cannot return event time without a value")
+		}
 		n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn}
 		return &n.ret, nil
 	default:

Review Comment:
   I posit that that's a flawed construction we shouldn't permit. How much simpler does the code become if we avoid that case? (Given the changes required, just guess, ignoring the sunk cost of "we already have this code".)
   
   Returning a single element and always choosing to stop or aborting the bundle feels awkward and it would be a performance pit when used accidentally.



##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -343,7 +347,11 @@ func (n *ProcessSizedElementsAndRestrictions) ProcessElement(_ context.Context,
 		defer func() {
 			<-n.SU
 		}()
-		return n.PDo.processSingleWindow(mainIn)
+		continuation, processResult := n.PDo.processSingleWindow(mainIn)
+		if continuation != nil {
+			n.continuation = continuation
+		}

Review Comment:
   The invoker code already ensured that the continuation is not nil, so why check here? Also, the continuation field would likely *always* be nil to start, so overriding it with a nil doesn't change anything.
   
   I'd rather that it gets overridden all the time, since that avoids cross-bundle contamination to begin with.
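
To make the contamination concern concrete, here is a small sketch under stated assumptions: `continuation`, `resume`, `node`, `storeIfSet`, and `storeAlways` are hypothetical stand-ins, not the real `sdf.ProcessContinuation` or `ProcessSizedElementsAndRestrictions` types. It models a second call that produces no continuation and compares the guarded assignment with the unconditional one.

```go
package main

import "fmt"

// continuation is a hypothetical stand-in for a process continuation.
type continuation interface{ ShouldResume() bool }

// resume is a toy continuation that always asks to be resumed.
type resume struct{}

func (resume) ShouldResume() bool { return true }

// node is a hypothetical stand-in for the unit that caches the continuation.
type node struct{ pc continuation }

// storeIfSet mirrors the guarded form: a nil result leaves the old value in place.
func (n *node) storeIfSet(c continuation) {
	if c != nil {
		n.pc = c
	}
}

// storeAlways mirrors the unconditional form: every result replaces the old value.
func (n *node) storeAlways(c continuation) { n.pc = c }

func main() {
	guarded, plain := &node{}, &node{}

	// First call: both variants record the continuation.
	guarded.storeIfSet(resume{})
	plain.storeAlways(resume{})

	// Later call with no continuation: only the guarded variant keeps stale state.
	guarded.storeIfSet(nil)
	plain.storeAlways(nil)

	fmt.Println(guarded.pc != nil, plain.pc != nil) // prints: true false
}
```

With the unconditional assignment, state left over from an earlier call cannot leak into a later one.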
   
   
   



##########
sdks/go/pkg/beam/core/runtime/exec/pardo.go:
##########
@@ -144,12 +145,13 @@ func (n *ParDo) processMainInput(mainIn *MainInput) error {
 	// is that either there is a single window or the function doesn't observe windows, so we can
 	// optimize it by treating all windows as a single one.
 	if !mustExplodeWindows(n.inv.fn, elm, len(n.Side) > 0) {
-		return n.processSingleWindow(mainIn)
+		_, processResult := n.processSingleWindow(mainIn)

Review Comment:
   Please add comments to both these calls to make it clear that they are ignored because only SDFs should have process continuations.



##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -224,14 +227,17 @@ func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Wind
 }
 
 // ret1 handles processing of a single return value.
-// Errors or single values are the only options.
+// Errors, single values, or a ProcessContinuation are the only options.
 func (n *invoker) ret1(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, r0 interface{}) (*FullValue, error) {
 	switch {
 	case n.outErrIdx >= 0:
 		if r0 != nil {
 			return nil, r0.(error)
 		}
 		return nil, nil
+	case n.outPcIdx >= 0:
+		n.ret = FullValue{Windows: ws, Timestamp: ts, Pane: pn, Continuation: r0.(sdf.ProcessContinuation)}

Review Comment:
   Agreed with this whole discussion, since nil is always an error for a ProcessContinuation, and hopefully it should provide a clear enough error (identifying the malformed DoFn in question.)
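
A minimal sketch of the nil check being discussed, assuming a hypothetical helper `mustContinuation` and a hypothetical `continuation` interface in place of `sdf.ProcessContinuation`. The panic text follows the suggestion earlier in this thread; passing the function name as a plain string is an assumption made for illustration.

```go
package main

import "fmt"

// continuation is a hypothetical stand-in for a process continuation.
type continuation interface{ ShouldResume() bool }

// stop is a toy continuation that never asks to be resumed.
type stop struct{}

func (stop) ShouldResume() bool { return false }

// mustContinuation converts a raw return value into a continuation.
// A nil value is treated as a malformed DoFn, and the panic names it.
func mustContinuation(fnName string, r interface{}) continuation {
	if r == nil {
		panic(fmt.Sprintf("invoker.ret1: cannot return a nil process continuation from function %v", fnName))
	}
	return r.(continuation)
}

func main() {
	pc := mustContinuation("example.ProcessElement", stop{})
	fmt.Println(pc.ShouldResume()) // prints false
}
```

Checking for nil before the type assertion lets the failure message identify the offending function instead of surfacing as a bare interface-conversion panic.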



##########
sdks/go/pkg/beam/core/runtime/exec/fullvalue.go:
##########
@@ -35,9 +36,10 @@ type FullValue struct {
 	Elm  interface{} // Element or KV key.
 	Elm2 interface{} // KV value, if not invalid
 
-	Timestamp typex.EventTime
-	Windows   []typex.Window
-	Pane      typex.PaneInfo
+	Timestamp    typex.EventTime
+	Windows      []typex.Window
+	Pane         typex.PaneInfo
+	Continuation sdf.ProcessContinuation

Review Comment:
   Where FullValue is concerned nothing is "automatic" for safety and performance reasons. Safety, to avoid the situation described, and performance because coders are explicitly expressed by the runner and not determined per element.
   
   Eg. [KVs](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/coder.go#L556) and the [WindowedValueHeaders](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/coder.go#L824)
   
   [PairWithRestrictions](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/sdf.go#L81) explicitly builds up a KV structure with the restriction, 
   
   And [SplitAndSize](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/sdf.go#L170) does the same but with the size.
   
   Continuations add a non-orthogonal handling wrinkle that your code will be handling, but it's not clear that there's a better way to get the user value out of the user function and half computation, so FullValue it is.



##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -361,7 +371,7 @@ func (n *ProcessSizedElementsAndRestrictions) ProcessElement(_ context.Context,
 			n.rt = rt
 			n.elm = elm
 			n.SU <- n
-			err := n.PDo.processSingleWindow(&MainInput{Key: wElm, Values: mainIn.Values, RTracker: rt})
+			_, err := n.PDo.processSingleWindow(&MainInput{Key: wElm, Values: mainIn.Values, RTracker: rt})

Review Comment:
   I agree with Danny that until the placeholder is removed, there should be a comment (referencing the JIRA) that indicates that it's a placeholder. Adding the comment now (even if it's going away in the next PR) is a stopgap in case there are unexpected delays for the PR, and a breadcrumb for someone else to pick it up if necessary.





[GitHub] [beam] lostluck commented on a diff in pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
lostluck commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r850581653


##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -261,21 +274,34 @@ func (n *invoker) ret2(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime,
 // ret3 handles processing of a trio of return values.
 func (n *invoker) ret3(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, r0, r1, r2 interface{}) (*FullValue, error) {
 	switch {
-	case n.outErrIdx >= 0:
-		if r2 != nil {
-			return nil, r2.(error)
+	case n.outEtIdx >= 0:
+		if n.outErrIdx == 2 {
+			if r2 != nil {
+				return nil, r2.(error)
+			}
+			n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn}
+			return &n.ret, nil
 		}
-		if n.outEtIdx < 0 {
-			n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Elm2: r1, Pane: pn}
+		if n.outPcIdx >= 0 {
+			n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn, Continuation: r2.(sdf.ProcessContinuation)}
 			return &n.ret, nil
 		}
-		n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn}
-		return &n.ret, nil
-	case n.outEtIdx == 0:
 		n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Elm2: r2, Pane: pn}
 		return &n.ret, nil
 	default:
-		panic(fmt.Sprintf("invoker.ret3: %T, %T, and %T don't match permitted return values.", r0, r1, r2))
+		if n.outErrIdx == 2 {

Review Comment:
   Style wise (and compiler wise) they're identical so no change should occur from this comment.
   
   The one reason to prefer an if-else ladder is that variables defined in earlier conditional clauses are in scope for the remaining branches, e.g.:
   
   ```
   if foo, ok := getFoo(); !ok {
     // ... can't use foo because it's not OK
   } else if bar := foo.DoBar(); bar.Frobbled() {
     // ... can use foo and bar. ...
   } else {
     // ... can use foo and bar. ...
   }
   ```
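
For readers who want to run the scoping rule described above, here is a self-contained variant. `classify` is a hypothetical function written for this illustration; it only shows that `n` and `half`, declared in the earlier clauses, remain visible in the later branches.

```go
package main

import (
	"fmt"
	"strconv"
)

// classify parses s and reports its parity. The variables n and err come from
// the first clause and half from the second; all stay in scope further down.
func classify(s string) string {
	if n, err := strconv.Atoi(s); err != nil {
		return "not a number"
	} else if half := n / 2; half*2 == n {
		return fmt.Sprintf("%d is even (half is %d)", n, half)
	} else {
		return fmt.Sprintf("%d is odd (half rounds to %d)", n, half)
	}
}

func main() {
	fmt.Println(classify("8")) // prints: 8 is even (half is 4)
	fmt.Println(classify("7")) // prints: 7 is odd (half rounds to 3)
	fmt.Println(classify("x")) // prints: not a number
}
```

An equivalent `switch` would need `n` and `half` declared before the switch statement, which is the scoping difference the comment points out.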





[GitHub] [beam] jrmccluskey commented on a diff in pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

Posted by GitBox <gi...@apache.org>.
jrmccluskey commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r850610142


##########
sdks/go/pkg/beam/core/graph/edge.go:
##########
@@ -426,8 +426,14 @@ func newDoFnNode(op Opcode, g *Graph, s *Scope, u *DoFn, in []*Node, rc *coder.C
 	for i := 0; i < len(in); i++ {
 		edge.Input = append(edge.Input, &Inbound{Kind: kinds[i], From: in[i], Type: inbound[i]})
 	}
+	continuation := false
+	// u.Fn is not guaranteed to be non-nil
+	if u.Fn != nil {

Review Comment:
   Ah, I see; that simplifies things a little bit. That should do it, then.


