You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/05/10 19:28:41 UTC

[GitHub] [beam] lostluck commented on a change in pull request #11645: [BEAM-9642] Adding Go SDF fallback for unexpanded SDFs.

lostluck commented on a change in pull request #11645:
URL: https://github.com/apache/beam/pull/11645#discussion_r422682835



##########
File path: sdks/go/pkg/beam/core/runtime/exec/sdf.go
##########
@@ -313,3 +313,94 @@ func (n *ProcessSizedElementsAndRestrictions) Down(ctx context.Context) error {
 func (n *ProcessSizedElementsAndRestrictions) String() string {
 	return fmt.Sprintf("SDF.ProcessSizedElementsAndRestrictions[%v] Out:%v", path.Base(n.PDo.Fn.Name()), IDs(n.PDo.Out...))
 }
+
+// SdfFallback is an executor used when an SDF isn't expanded into steps by the
+// runner, indicating that the runner doesn't support splitting. It executes all
+// the SDF steps together in one unit.
+type SdfFallback struct {
+	PDo *ParDo
+
+	initRestInv *cirInvoker
+	splitInv    *srInvoker
+	trackerInv  *ctInvoker
+}
+
+// ID just defers to the ParDo's ID method.
+func (n *SdfFallback) ID() UnitID {
+	return n.PDo.UID
+}
+
+// Up performs some one-time setup and then defers to the ParDo's Up method.
+func (n *SdfFallback) Up(ctx context.Context) error {
+	sdf := (*graph.SplittableDoFn)(n.PDo.Fn)
+	addContext := func(err error) error {
+		return errors.WithContextf(err, "SdfFallback transform with UID %v", n.ID())

Review comment:
       Consider using the node's String() output as the error context instead of adding extra words, e.g. errors.WithContextf(err, "%v", n).

##########
File path: sdks/go/pkg/beam/core/runtime/exec/sdf.go
##########
@@ -313,3 +313,94 @@ func (n *ProcessSizedElementsAndRestrictions) Down(ctx context.Context) error {
 func (n *ProcessSizedElementsAndRestrictions) String() string {
 	return fmt.Sprintf("SDF.ProcessSizedElementsAndRestrictions[%v] Out:%v", path.Base(n.PDo.Fn.Name()), IDs(n.PDo.Out...))
 }
+
+// SdfFallback is an executor used when an SDF isn't expanded into steps by the
+// runner, indicating that the runner doesn't support splitting. It executes all
+// the SDF steps together in one unit.
+type SdfFallback struct {
+	PDo *ParDo
+
+	initRestInv *cirInvoker
+	splitInv    *srInvoker
+	trackerInv  *ctInvoker
+}
+
+// ID just defers to the ParDo's ID method.
+func (n *SdfFallback) ID() UnitID {
+	return n.PDo.UID
+}
+
+// Up performs some one-time setup and then defers to the ParDo's Up method.
+func (n *SdfFallback) Up(ctx context.Context) error {
+	sdf := (*graph.SplittableDoFn)(n.PDo.Fn)
+	addContext := func(err error) error {
+		return errors.WithContextf(err, "SdfFallback transform with UID %v", n.ID())
+	}
+	var err error
+	if n.initRestInv, err = newCreateInitialRestrictionInvoker(sdf.CreateInitialRestrictionFn()); err != nil {
+		return addContext(err)
+	}
+	if n.splitInv, err = newSplitRestrictionInvoker(sdf.SplitRestrictionFn()); err != nil {
+		return addContext(err)
+	}
+	if n.trackerInv, err = newCreateTrackerInvoker(sdf.CreateTrackerFn()); err != nil {
+		return addContext(err)
+	}
+	return n.PDo.Up(ctx)
+}
+
+// StartBundle just defers to the ParDo's StartBundle method.

Review comment:
       Editorial nits:
   We can probably remove the word "just".
   Since "defer" is a keyword in Go, the word "defers" might be misread. Consider replacing it with "calls".
   
   ```suggestion
   // StartBundle calls the ParDo's StartBundle method.
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/exec/sdf.go
##########
@@ -313,3 +313,94 @@ func (n *ProcessSizedElementsAndRestrictions) Down(ctx context.Context) error {
 func (n *ProcessSizedElementsAndRestrictions) String() string {
 	return fmt.Sprintf("SDF.ProcessSizedElementsAndRestrictions[%v] Out:%v", path.Base(n.PDo.Fn.Name()), IDs(n.PDo.Out...))
 }
+
+// SdfFallback is an executor used when an SDF isn't expanded into steps by the
+// runner, indicating that the runner doesn't support splitting. It executes all
+// the SDF steps together in one unit.
+type SdfFallback struct {
+	PDo *ParDo
+
+	initRestInv *cirInvoker
+	splitInv    *srInvoker
+	trackerInv  *ctInvoker
+}
+
+// ID just defers to the ParDo's ID method.
+func (n *SdfFallback) ID() UnitID {
+	return n.PDo.UID
+}
+
+// Up performs some one-time setup and then defers to the ParDo's Up method.
+func (n *SdfFallback) Up(ctx context.Context) error {
+	sdf := (*graph.SplittableDoFn)(n.PDo.Fn)
+	addContext := func(err error) error {
+		return errors.WithContextf(err, "SdfFallback transform with UID %v", n.ID())
+	}
+	var err error
+	if n.initRestInv, err = newCreateInitialRestrictionInvoker(sdf.CreateInitialRestrictionFn()); err != nil {
+		return addContext(err)
+	}
+	if n.splitInv, err = newSplitRestrictionInvoker(sdf.SplitRestrictionFn()); err != nil {
+		return addContext(err)
+	}
+	if n.trackerInv, err = newCreateTrackerInvoker(sdf.CreateTrackerFn()); err != nil {
+		return addContext(err)
+	}
+	return n.PDo.Up(ctx)
+}
+
+// StartBundle just defers to the ParDo's StartBundle method.
+func (n *SdfFallback) StartBundle(ctx context.Context, id string, data DataContext) error {
+	return n.PDo.StartBundle(ctx, id, data)
+}
+
+// ProcessElement performs all the work from the steps above in one transform.
+// This means creating initial restrictions, performing initial splits on those
+// restrictions, and then creating restriction trackers and processing each
+// restriction with the underlying ParDo. This executor skips the sizing step
+// because sizing information is unnecessary for unexpanded SDFs.
+func (n *SdfFallback) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error {
+	if n.PDo.status != Active {
+		return errors.Errorf("invalid status for ParDo %v: %v, want Active", n.PDo.UID, n.PDo.status)
+	}
+
+	rest := n.initRestInv.Invoke(elm)
+	splitRests := n.splitInv.Invoke(elm, rest)
+	if len(splitRests) == 0 {
+		err := errors.Errorf("initial splitting returned 0 restrictions.")
+		return errors.WithContextf(err, "SdfFallback transform with UID %v", n.ID())

Review comment:
       Same comment here regarding the error context: consider using the node's String() output instead of the extra wording.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/sdf.go
##########
@@ -313,3 +313,94 @@ func (n *ProcessSizedElementsAndRestrictions) Down(ctx context.Context) error {
 func (n *ProcessSizedElementsAndRestrictions) String() string {
 	return fmt.Sprintf("SDF.ProcessSizedElementsAndRestrictions[%v] Out:%v", path.Base(n.PDo.Fn.Name()), IDs(n.PDo.Out...))
 }
+
+// SdfFallback is an executor used when an SDF isn't expanded into steps by the
+// runner, indicating that the runner doesn't support splitting. It executes all
+// the SDF steps together in one unit.
+type SdfFallback struct {
+	PDo *ParDo
+
+	initRestInv *cirInvoker
+	splitInv    *srInvoker
+	trackerInv  *ctInvoker
+}
+
+// ID just defers to the ParDo's ID method.
+func (n *SdfFallback) ID() UnitID {
+	return n.PDo.UID
+}
+
+// Up performs some one-time setup and then defers to the ParDo's Up method.
+func (n *SdfFallback) Up(ctx context.Context) error {
+	sdf := (*graph.SplittableDoFn)(n.PDo.Fn)
+	addContext := func(err error) error {
+		return errors.WithContextf(err, "SdfFallback transform with UID %v", n.ID())
+	}
+	var err error
+	if n.initRestInv, err = newCreateInitialRestrictionInvoker(sdf.CreateInitialRestrictionFn()); err != nil {
+		return addContext(err)
+	}
+	if n.splitInv, err = newSplitRestrictionInvoker(sdf.SplitRestrictionFn()); err != nil {
+		return addContext(err)
+	}
+	if n.trackerInv, err = newCreateTrackerInvoker(sdf.CreateTrackerFn()); err != nil {
+		return addContext(err)
+	}
+	return n.PDo.Up(ctx)
+}
+
+// StartBundle just defers to the ParDo's StartBundle method.
+func (n *SdfFallback) StartBundle(ctx context.Context, id string, data DataContext) error {
+	return n.PDo.StartBundle(ctx, id, data)
+}
+
+// ProcessElement performs all the work from the steps above in one transform.
+// This means creating initial restrictions, performing initial splits on those
+// restrictions, and then creating restriction trackers and processing each
+// restriction with the underlying ParDo. This executor skips the sizing step
+// because sizing information is unnecessary for unexpanded SDFs.
+func (n *SdfFallback) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error {
+	if n.PDo.status != Active {
+		return errors.Errorf("invalid status for ParDo %v: %v, want Active", n.PDo.UID, n.PDo.status)
+	}
+
+	rest := n.initRestInv.Invoke(elm)
+	splitRests := n.splitInv.Invoke(elm, rest)
+	if len(splitRests) == 0 {
+		err := errors.Errorf("initial splitting returned 0 restrictions.")
+		return errors.WithContextf(err, "SdfFallback transform with UID %v", n.ID())
+	}
+
+	for _, splitRest := range splitRests {
+		rt := n.trackerInv.Invoke(splitRest)
+		mainIn := &MainInput{
+			Key:      *elm,
+			Values:   values,
+			RTracker: rt,
+		}
+		if err := n.PDo.processMainInput(mainIn); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// FinishBundle does some teardown for the end of the bundle and then defers to
+// the ParDo's FinishBundle method.

Review comment:
       ```suggestion
   // FinishBundle resets the invokers and then calls the ParDo's FinishBundle method.
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org