Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/04/13 16:37:28 UTC

[GitHub] [beam] lostluck commented on a diff in pull request #17334: [BEAM-11104] Pipe Continuation to DataSource level

lostluck commented on code in PR #17334:
URL: https://github.com/apache/beam/pull/17334#discussion_r848647091


##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -247,9 +253,16 @@ func (n *invoker) ret2(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime,
 		if r1 != nil {
 			return nil, r1.(error)
 		}
+		if n.outPcIdx >= 0 {
+			n.ret = FullValue{Windows: ws, Timestamp: ts, Pane: pn, Continuation: r0.(sdf.ProcessContinuation)}
+			return &n.ret, nil
+		}
 		n.ret = FullValue{Windows: ws, Timestamp: ts, Elm: r0, Pane: pn}
 		return &n.ret, nil
 	case n.outEtIdx == 0:
+		if n.outPcIdx >= 0 {
+			panic("invoker.ret2: cannot return event time without a value")
+		}
 		n.ret = FullValue{Windows: ws, Timestamp: r0.(typex.EventTime), Elm: r1, Pane: pn}
 		return &n.ret, nil
 	default:

Review Comment:
   I posit that that's a flawed construction we shouldn't permit. How much simpler does the code become if we avoid that case? (Given the changes required, just guess, ignoring the sunk cost of "we already have this code".)
   
   Returning a single element and then always having to either stop or abort the bundle feels awkward, and it would be a performance pitfall if used accidentally.
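To make the proposal concrete, here is a minimal sketch of a signature check that would reject the construction: a ProcessContinuation may be returned on its own or followed by an error, and any element or event time returned alongside it is rejected, so elements must go through an emitter. The `retKind` enum and `validateSDFReturns` are hypothetical names for illustration only, not the SDK's actual signature validation.

```go
package main

import (
	"errors"
	"fmt"
)

// retKind is a hypothetical classification of one ProcessElement return value.
type retKind int

const (
	retValue        retKind = iota // a user data element
	retEventTime                   // an event timestamp
	retContinuation                // an sdf.ProcessContinuation
	retError                       // a trailing error
)

// validateSDFReturns (hypothetical) permits a ProcessContinuation only when it
// is returned alone or followed by an error. Any other combination that
// includes a continuation is rejected.
func validateSDFReturns(kinds []retKind) error {
	hasPC := false
	for _, k := range kinds {
		if k == retContinuation {
			hasPC = true
		}
	}
	if !hasPC {
		return nil // no continuation returned; other validation rules apply
	}
	switch {
	case len(kinds) == 1 && kinds[0] == retContinuation:
		return nil
	case len(kinds) == 2 && kinds[0] == retContinuation && kinds[1] == retError:
		return nil
	default:
		return errors.New("a ProcessContinuation may only be returned alone or with a trailing error; emit elements via an emitter instead")
	}
}

func main() {
	fmt.Println(validateSDFReturns([]retKind{retContinuation, retError}))
	fmt.Println(validateSDFReturns([]retKind{retValue, retContinuation}))
}
```

Disallowing the mixed form keeps the invoker's return handling to two cases and avoids the single-element-per-call performance trap.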



##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -343,7 +347,11 @@ func (n *ProcessSizedElementsAndRestrictions) ProcessElement(_ context.Context,
 		defer func() {
 			<-n.SU
 		}()
-		return n.PDo.processSingleWindow(mainIn)
+		continuation, processResult := n.PDo.processSingleWindow(mainIn)
+		if continuation != nil {
+			n.continuation = continuation
+		}

Review Comment:
   The invoker code already ensures that the continuation is not nil, so why check here? Also, the continuation field would likely *always* be nil to start with, so overwriting it with a nil doesn't change anything.
   
   I'd rather it be overwritten every time, since that avoids cross-bundle contamination to begin with.
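The cross-bundle concern can be shown with a small, self-contained sketch. `processor`, `stop`, `setIfNonNil`, and `set` are hypothetical stand-ins, not the exec package's real types: with the conditional assignment, a continuation stored for an earlier element survives a later call that produced none, while the unconditional assignment always reflects the most recent call.

```go
package main

import "fmt"

// ProcessContinuation is a stand-in for sdf.ProcessContinuation.
type ProcessContinuation interface {
	ShouldResume() bool
}

// stop is a hypothetical continuation meaning "do not resume".
type stop struct{}

func (stop) ShouldResume() bool { return false }

// processor is a hypothetical, simplified stand-in for
// ProcessSizedElementsAndRestrictions. Its field outlives a single
// element (and a single bundle).
type processor struct {
	continuation ProcessContinuation
}

// setIfNonNil mirrors the pattern under review: nil results are skipped,
// so whatever an earlier element stored survives.
func (p *processor) setIfNonNil(pc ProcessContinuation) {
	if pc != nil {
		p.continuation = pc
	}
}

// set overwrites the field on every call, so it only ever reflects the
// element that was just processed.
func (p *processor) set(pc ProcessContinuation) {
	p.continuation = pc
}

func main() {
	stale := &processor{}
	stale.setIfNonNil(stop{})
	stale.setIfNonNil(nil) // e.g. a later call failed before producing a continuation

	fresh := &processor{}
	fresh.set(stop{})
	fresh.set(nil)

	fmt.Println(stale.continuation != nil, fresh.continuation == nil) // true true
}
```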
   
   
   



##########
sdks/go/pkg/beam/core/runtime/exec/pardo.go:
##########
@@ -144,12 +145,13 @@ func (n *ParDo) processMainInput(mainIn *MainInput) error {
 	// is that either there is a single window or the function doesn't observe windows, so we can
 	// optimize it by treating all windows as a single one.
 	if !mustExplodeWindows(n.inv.fn, elm, len(n.Side) > 0) {
-		return n.processSingleWindow(mainIn)
+		_, processResult := n.processSingleWindow(mainIn)

Review Comment:
   Please add comments to both of these calls making it clear that the continuation is ignored because only SDFs should return process continuations.
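One way the requested comment could read, in a minimal sketch that assumes `processSingleWindow` returns `(ProcessContinuation, error)` as in this diff. `parDo`, `MainInput`, and the stubbed body are simplified, hypothetical stand-ins for the exec types, and the comment wording is only a suggestion.

```go
package main

import (
	"errors"
	"fmt"
)

// ProcessContinuation is a stand-in for sdf.ProcessContinuation.
type ProcessContinuation interface{ ShouldResume() bool }

// MainInput is a hypothetical placeholder for the exec package's MainInput.
type MainInput struct{ Key int }

// parDo is a hypothetical stand-in; fail simulates a user DoFn error.
type parDo struct{ fail bool }

// processSingleWindow is a stub with the new (continuation, error) shape.
func (n *parDo) processSingleWindow(mainIn *MainInput) (ProcessContinuation, error) {
	if n.fail {
		return nil, errors.New("user DoFn failed")
	}
	return nil, nil
}

func (n *parDo) processMainInput(mainIn *MainInput) error {
	// The ProcessContinuation is deliberately ignored here: only splittable
	// DoFns return continuations, and a plain ParDo is never an SDF.
	_, processResult := n.processSingleWindow(mainIn)
	return processResult
}

func main() {
	fmt.Println((&parDo{}).processMainInput(&MainInput{}))
	fmt.Println((&parDo{fail: true}).processMainInput(&MainInput{}))
}
```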



##########
sdks/go/pkg/beam/core/runtime/exec/fn.go:
##########
@@ -224,14 +227,17 @@ func (n *invoker) Invoke(ctx context.Context, pn typex.PaneInfo, ws []typex.Wind
 }
 
 // ret1 handles processing of a single return value.
-// Errors or single values are the only options.
+// Errors, single values, or a ProcessContinuation are the only options.
 func (n *invoker) ret1(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, r0 interface{}) (*FullValue, error) {
 	switch {
 	case n.outErrIdx >= 0:
 		if r0 != nil {
 			return nil, r0.(error)
 		}
 		return nil, nil
+	case n.outPcIdx >= 0:
+		n.ret = FullValue{Windows: ws, Timestamp: ts, Pane: pn, Continuation: r0.(sdf.ProcessContinuation)}

Review Comment:
   Agreed with this whole discussion, since nil is always an error for a ProcessContinuation; hopefully the resulting error is clear enough (identifying the malformed DoFn in question).
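For context: in Go, a bare type assertion such as `r0.(sdf.ProcessContinuation)` panics when `r0` is nil, and the panic message itself does not name the DoFn. If a clearer error were wanted, a checked conversion along these lines would provide one. `toContinuation` and the local `ProcessContinuation` interface are hypothetical stand-ins, not part of the diff.

```go
package main

import "fmt"

// ProcessContinuation is a stand-in for sdf.ProcessContinuation.
type ProcessContinuation interface{ ShouldResume() bool }

// resume is a hypothetical continuation meaning "resume later".
type resume struct{}

func (resume) ShouldResume() bool { return true }

// toContinuation (hypothetical) converts the user's raw return value,
// naming the offending DoFn instead of relying on a runtime panic.
func toContinuation(fnName string, r0 interface{}) (ProcessContinuation, error) {
	if r0 == nil {
		return nil, fmt.Errorf("DoFn %s returned a nil ProcessContinuation", fnName)
	}
	pc, ok := r0.(ProcessContinuation)
	if !ok {
		return nil, fmt.Errorf("DoFn %s returned %T, not a ProcessContinuation", fnName, r0)
	}
	return pc, nil
}

func main() {
	if _, err := toContinuation("myDoFn", nil); err != nil {
		fmt.Println(err) // DoFn myDoFn returned a nil ProcessContinuation
	}
}
```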



##########
sdks/go/pkg/beam/core/runtime/exec/fullvalue.go:
##########
@@ -35,9 +36,10 @@ type FullValue struct {
 	Elm  interface{} // Element or KV key.
 	Elm2 interface{} // KV value, if not invalid
 
-	Timestamp typex.EventTime
-	Windows   []typex.Window
-	Pane      typex.PaneInfo
+	Timestamp    typex.EventTime
+	Windows      []typex.Window
+	Pane         typex.PaneInfo
+	Continuation sdf.ProcessContinuation

Review Comment:
   Where FullValue is concerned, nothing is "automatic", for safety and performance reasons: safety, to avoid the situation described; performance, because coders are explicitly specified by the runner rather than determined per element.
   
   E.g., [KVs](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/coder.go#L556) and the [WindowedValueHeaders](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/coder.go#L824).
   
   [PairWithRestrictions](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/sdf.go#L81) explicitly builds up a KV structure with the restriction, and [SplitAndSize](https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/sdf.go#L170) does the same with the size.
   
   Continuations add a non-orthogonal wrinkle that your code will need to handle, but it's not clear that there's a better way to get the value out of the user function and back into the rest of the computation, so FullValue it is.
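A trimmed-down sketch of the explicit style described above: nested KVs are assembled by hand, and the continuation is a separate field that the consumer reads explicitly. This `FullValue` drops timestamps, windows, and panes, and `pairWithRestriction`, `splitAndSize`, and `continuationOf` are hypothetical helpers that only approximate the linked code.

```go
package main

import "fmt"

// ProcessContinuation is a stand-in for sdf.ProcessContinuation.
type ProcessContinuation interface{ ShouldResume() bool }

// stop is a hypothetical continuation meaning "do not resume".
type stop struct{}

func (stop) ShouldResume() bool { return false }

// FullValue is a trimmed-down stand-in for exec.FullValue
// (timestamps, windows, and panes omitted).
type FullValue struct {
	Elm          interface{}
	Elm2         interface{}
	Continuation ProcessContinuation
}

// pairWithRestriction explicitly nests the element with its restriction as a KV.
func pairWithRestriction(elm *FullValue, rest interface{}) *FullValue {
	return &FullValue{Elm: elm, Elm2: rest}
}

// splitAndSize does the same one level up, pairing that KV with its size.
func splitAndSize(elmRest *FullValue, size float64) *FullValue {
	return &FullValue{Elm: elmRest, Elm2: size}
}

// continuationOf reads the continuation back out explicitly; no coder or
// wrapper populates or propagates it automatically.
func continuationOf(ret *FullValue) ProcessContinuation {
	if ret == nil {
		return nil
	}
	return ret.Continuation
}

func main() {
	// [2]int64{0, 10} is a placeholder restriction, not a real offset range.
	sized := splitAndSize(pairWithRestriction(&FullValue{Elm: "key"}, [2]int64{0, 10}), 10)
	fmt.Println(sized.Elm.(*FullValue).Elm.(*FullValue).Elm) // key

	ret := &FullValue{Continuation: stop{}}
	fmt.Println(continuationOf(ret).ShouldResume()) // false
}
```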



##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -361,7 +371,7 @@ func (n *ProcessSizedElementsAndRestrictions) ProcessElement(_ context.Context,
 			n.rt = rt
 			n.elm = elm
 			n.SU <- n
-			err := n.PDo.processSingleWindow(&MainInput{Key: wElm, Values: mainIn.Values, RTracker: rt})
+			_, err := n.PDo.processSingleWindow(&MainInput{Key: wElm, Values: mainIn.Values, RTracker: rt})

Review Comment:
   I agree with Danny that until the placeholder is removed, there should be a comment (referencing the JIRA) indicating that it's a placeholder. Adding the comment now, even if it's going away in the next PR, is a stopgap in case there are unexpected delays with the PR, and a breadcrumb for someone else to pick it up if necessary.
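A minimal sketch of such a placeholder comment, using the JIRA from this PR's title (BEAM-11104). The surrounding types are hypothetical stubs, and the comment wording is only a suggestion.

```go
package main

import "fmt"

// ProcessContinuation is a stand-in for sdf.ProcessContinuation.
type ProcessContinuation interface{ ShouldResume() bool }

// MainInput is a hypothetical placeholder for the exec package's MainInput.
type MainInput struct{ Key string }

// parDo is a hypothetical stub whose processSingleWindow has the new shape.
type parDo struct{}

func (n *parDo) processSingleWindow(mainIn *MainInput) (ProcessContinuation, error) {
	return nil, nil
}

func processWindow(pdo *parDo, mainIn *MainInput) error {
	// TODO(BEAM-11104): Placeholder. The ProcessContinuation returned here is
	// not yet propagated; it is discarded until a follow-up change handles it.
	// Remove this comment once that lands.
	_, err := pdo.processSingleWindow(mainIn)
	return err
}

func main() {
	fmt.Println(processWindow(&parDo{}, &MainInput{Key: "k"}))
}
```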



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.