You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/04/11 16:38:29 UTC

[GitHub] [beam] damccorm commented on a diff in pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

damccorm commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r847497405


##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -247,9 +253,16 @@ func (n *invoker) ret2(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime,
 		if r1 != nil {
 			return nil, r1.(error)
 		}
+		if n.outPcIdx >= 0 {
+			n.ret = FullValue{Windows: ws, Timestamp: ts, Pane: pn, Continuation: r0.(sdf.ProcessContinuation)}
+			return &n.ret, nil
+		}
 		n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Pane: pn}
 		return &n.ret, nil
 	case n.outEtIdx == 0:
+		if n.outPcIdx >= 0 {
+			panic("invoker.ret2: cannot return event time without a value")
+		}
 		n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn}
 		return &n.ret, nil
 	default:

Review Comment:
   I think we're missing a case where outPcIdx >= 0 and neither outEtIdx nor outErrIdx is >= 0 (i.e., the case where a single element and a ProcessContinuation are emitted together).



##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -224,14 +227,17 @@ func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Wind
 }
 
 // ret1 handles processing of a single return value.
-// Errors or single values are the only options.
+// Errors, single values, or a ProcessContinuation are the only options.
 func (n *invoker) ret1(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, r0 interface{}) (*FullValue, error) {
 	switch {
 	case n.outErrIdx >= 0:
 		if r0 != nil {
 			return nil, r0.(error)
 		}
 		return nil, nil
+	case n.outPcIdx >= 0:
+		n.ret = FullValue{Windows: ws, Timestamp: ts, Pane: pn, Continuation: r0.(sdf.ProcessContinuation)}

Review Comment:
   Should we guard against r0 being nil here and either panic with a clearer message or treat it as a Stop (I'd probably prefer panic)? Same question below.



##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -343,7 +348,12 @@ func (n *ProcessSizedElementsAndRestrictions) ProcessElement(_ context.Context,
 		defer func() {
 			<-n.SU
 		}()
-		return n.PDo.processSingleWindow(mainIn)
+		continuation, processResult := n.PDo.processSingleWindow(mainIn)
+		if continuation != nil {
+			n.source.pc = continuation
+			n.source.selfSu = n

Review Comment:
   I think I missed the discussion on this in the design doc (I think maybe it got changed after I first looked, or I just missed it), but I don't love this mechanism of talking back to the datasource. This creates a weird backwards dependency and makes both classes harder to update as a result.
   
   Is there any reason we can't try to infer a `ProcessSizedElementsAndRestrictions` type on the out element and, if it's there, check for a ProcessContinuation using a method on `ProcessSizedElementsAndRestrictions`? That would avoid the circular dependency here. https://github.com/apache/beam/blob/9bb766b02fbd371b66221f8d3ed1e1228e7a9588/sdks/go/pkg/beam/core/runtime/exec/datasource.go#L185



##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -361,7 +371,7 @@ func (n *ProcessSizedElementsAndRestrictions) ProcessElement(_ context.Context,
 			n.rt = rt
 			n.elm = elm
 			n.SU <- n
-			err := n.PDo.processSingleWindow(&MainInput{Key: wElm, Values: mainIn.Values, RTracker: rt})
+			_, err := n.PDo.processSingleWindow(&MainInput{Key: wElm, Values: mainIn.Values, RTracker: rt})

Review Comment:
   A comment explaining why we don't need to worry about ProcessContinuation here might be helpful.



##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -261,21 +274,34 @@ func (n *invoker) ret2(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime,
 // ret3 handles processing of a trio of return values.
 func (n *invoker) ret3(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, r0, r1, r2 interface{}) (*FullValue, error) {
 	switch {
-	case n.outErrIdx >= 0:
-		if r2 != nil {
-			return nil, r2.(error)
+	case n.outEtIdx >= 0:
+		if n.outErrIdx == 2 {
+			if r2 != nil {
+				return nil, r2.(error)
+			}
+			n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn}
+			return &n.ret, nil
 		}
-		if n.outEtIdx < 0 {
-			n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Elm2: r1, Pane: pn}
+		if n.outPcIdx >= 0 {
+			n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn, Continuation: r2.(sdf.ProcessContinuation)}
 			return &n.ret, nil
 		}
-		n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn}
-		return &n.ret, nil
-	case n.outEtIdx == 0:
 		n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Elm2: r2, Pane: pn}
 		return &n.ret, nil
 	default:
-		panic(fmt.Sprintf("invoker.ret3: %T, %T, and %T don't match permitted return values.", r0, r1, r2))
+		if n.outErrIdx == 2 {

Review Comment:
   I think this reads cleaner if you move it up to a condition in the outer switch:
   ```
   switch {
   case n.outEtIdx >= 0:
      // Do stuff
   case n.outErrIdx == 2:
      // Do stuff
   default:
      n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Elm2: r1, Pane: pn, Continuation: r2.(sdf.ProcessContinuation)}
      return &n.ret, nil



##########
sdks/go/pkg/beam/core/runtime/exec/fullvalue.go:
##########
@@ -35,9 +36,10 @@ type FullValue struct {
 	Elm  interface{} // Element or KV key.
 	Elm2 interface{} // KV value, if not invalid
 
-	Timestamp typex.EventTime
-	Windows   []typex.Window
-	Pane      typex.PaneInfo
+	Timestamp    typex.EventTime
+	Windows      []typex.Window
+	Pane         typex.PaneInfo
+	Continuation sdf.ProcessContinuation

Review Comment:
   Genuine (and possibly dumb) question - I'm not totally clear on how our encoding works, but will this cause the whole ProcessContinuation object to get encoded as part of our responses to the runner?



##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -261,21 +274,34 @@ func (n *invoker) ret2(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime,
 // ret3 handles processing of a trio of return values.
 func (n *invoker) ret3(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, r0, r1, r2 interface{}) (*FullValue, error) {
 	switch {
-	case n.outErrIdx >= 0:
-		if r2 != nil {
-			return nil, r2.(error)
+	case n.outEtIdx >= 0:
+		if n.outErrIdx == 2 {
+			if r2 != nil {
+				return nil, r2.(error)
+			}
+			n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn}
+			return &n.ret, nil
 		}
-		if n.outEtIdx < 0 {
-			n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Elm2: r1, Pane: pn}
+		if n.outPcIdx >= 0 {

Review Comment:
   There's a lot of mixing `>=` conditions and `==` conditions in how we check these indices; we should coalesce on one approach. As a reader, the back and forth makes it harder to follow. When possible, I'd vote to prefer the exact `==` check, since validation of signatures should be taken care of on the front end, and if we don't have the indices right we'll struggle later on anyway.



##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -261,21 +274,34 @@ func (n *invoker) ret2(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime,
 // ret3 handles processing of a trio of return values.
 func (n *invoker) ret3(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, r0, r1, r2 interface{}) (*FullValue, error) {
 	switch {
-	case n.outErrIdx >= 0:
-		if r2 != nil {
-			return nil, r2.(error)
+	case n.outEtIdx >= 0:
+		if n.outErrIdx == 2 {
+			if r2 != nil {
+				return nil, r2.(error)
+			}
+			n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn}
+			return &n.ret, nil
 		}
-		if n.outEtIdx < 0 {
-			n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Elm2: r1, Pane: pn}
+		if n.outPcIdx >= 0 {
+			n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn, Continuation: r2.(sdf.ProcessContinuation)}
 			return &n.ret, nil
 		}
-		n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn}
-		return &n.ret, nil
-	case n.outEtIdx == 0:
 		n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Elm2: r2, Pane: pn}
 		return &n.ret, nil
 	default:
-		panic(fmt.Sprintf("invoker.ret3: %T, %T, and %T don't match permitted return values.", r0, r1, r2))
+		if n.outErrIdx == 2 {

Review Comment:
   If you don't move that check into the outer switch, there's no reason to use a switch instead of a simple if/else here.



##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -284,7 +310,24 @@ func (n *invoker) ret4(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime,
 	if r3 != nil {

Review Comment:
   I don't think this check is valid anymore. Couldn't you have something with 4 returns and no errors (1 event time, 2 KV elements, and 1 process continuation)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org