You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/04/26 14:36:56 UTC

[GitHub] [beam] damccorm commented on a diff in pull request #17432: [BEAM-11106] Support drain in Go SDK

damccorm commented on code in PR #17432:
URL: https://github.com/apache/beam/pull/17432#discussion_r858778358


##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -212,6 +212,113 @@ func (n *SplitAndSizeRestrictions) String() string {
 	return fmt.Sprintf("SDF.SplitAndSizeRestrictions[%v] UID:%v Out:%v", path.Base(n.Fn.Name()), n.UID, IDs(n.Out))
 }
 
+// TruncateSizedRestriction is an executor for the expanded SDF step of the
+// same name. This step is added to the expanded SDF when the runner signals to drain
+// the pipeline. This step is followed by ProcessSizedElementsAndRestrictions.
+type TruncateSizedRestriction struct {
+	UID         UnitID
+	Fn          *graph.DoFn
+	Out         Node
+	truncateInv *trInvoker
+	sizeInv     *rsInvoker
+	ctInv       *ctInvoker
+}
+
+// ID return the UnitID for this unit.
+func (n *TruncateSizedRestriction) ID() UnitID {
+	return n.UID
+}
+
+// Up performs one-time setup for this executor.
+func (n *TruncateSizedRestriction) Up(ctx context.Context) error {
+	fn := (*graph.SplittableDoFn)(n.Fn).CreateTrackerFn()
+	var err error
+	if n.ctInv, err = newCreateTrackerInvoker(fn); err != nil {
+		return errors.WithContextf(err, "%v", n)
+	}
+
+	fn = (*graph.SplittableDoFn)(n.Fn).TruncateRestrictionFn()
+	if fn != nil {
+		if n.truncateInv, err = newTruncateRestrictionInvoker(fn); err != nil {
+			return err
+		}
+	}
+	fn = (*graph.SplittableDoFn)(n.Fn).RestrictionSizeFn()
+	if n.sizeInv, err = newRestrictionSizeInvoker(fn); err != nil {
+		return err
+	}
+	return nil
+}
+
+// StartBundle currently does nothing.
+func (n *TruncateSizedRestriction) StartBundle(ctx context.Context, id string, data DataContext) error {
+	return n.Out.StartBundle(ctx, id, data)
+}
+
+// ProcessElement gets input elm as:
+// Input Diagram:
+//   *FullValue {
+//     Elm: *FullValue {
+//       Elm:  *FullValue (original input)
+//       Elm2: Restriction
+//     }
+//     Elm2: float64 (size)
+//     Windows
+//     Timestamps
+//    }
+//
+// Output Diagram:
+//   *FullValue {
+//     Elm: *FullValue {
+//       Elm:  *FullValue (original input)
+//       Elm2: Restriction
+//     }
+//     Elm2: float64 (size)
+//     Windows
+//     Timestamps
+//    }
+func (n *TruncateSizedRestriction) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error {
+	mainElm := elm.Elm.(*FullValue).Elm.(*FullValue)
+	// TODO: change restriction extraction to consider watermark estimator after BEAM-11105 is merged.
+	rest := elm.Elm.(*FullValue).Elm2
+	rt := n.ctInv.Invoke(rest)
+	var err error
+	var newRest interface{}
+	if n.truncateInv == nil {
+		newRest = DefaultTruncateRestriction(rt)

Review Comment:
   Rather than having this nil check, could we set `n.truncateInv` to default to `DefaultTruncateRestriction` in `Up`? That way we spend less time on the hot ProcessElement path, and it should simplify some of the logic here.



##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -308,6 +314,17 @@ func (f *SplittableDoFn) RestrictionT() reflect.Type {
 	return f.CreateInitialRestrictionFn().Ret[0].T
 }
 
+// HasTruncateRestriction returns whether the DoFn implements a custom truncate restriction function.
+func (f *SplittableDoFn) HasTruncateRestriction() bool {
+	_, ok := f.methods[truncateRestrictionName]
+	return ok
+}
+
+// TruncateRestrictionFn returns the "TruncateRestriction" function, if present.
+func (f *SplittableDoFn) TruncateRestrictionFn() *funcx.Fn {
+	return f.methods[truncateRestrictionName]
+}
+

Review Comment:
   We should also add validation code (and corresponding tests) to make sure that the supplied TruncateRestriction function is valid (similar to https://github.com/apache/beam/blob/15a064433a363f4c5443b55a43fc29dff836872c/sdks/go/pkg/beam/core/graph/fn.go#L526). This would include validating that the input element matches the input element type used in ProcessElement, and that the function has the correct number and types of parameters.



##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -212,6 +212,113 @@ func (n *SplitAndSizeRestrictions) String() string {
 	return fmt.Sprintf("SDF.SplitAndSizeRestrictions[%v] UID:%v Out:%v", path.Base(n.Fn.Name()), n.UID, IDs(n.Out))
 }
 
+// TruncateSizedRestriction is an executor for the expanded SDF step of the
+// same name. This step is added to the expanded SDF when the runner signals to drain
+// the pipeline. This step is followed by ProcessSizedElementsAndRestrictions.
+type TruncateSizedRestriction struct {
+	UID         UnitID
+	Fn          *graph.DoFn
+	Out         Node
+	truncateInv *trInvoker
+	sizeInv     *rsInvoker
+	ctInv       *ctInvoker
+}
+
+// ID return the UnitID for this unit.
+func (n *TruncateSizedRestriction) ID() UnitID {
+	return n.UID
+}
+
+// Up performs one-time setup for this executor.
+func (n *TruncateSizedRestriction) Up(ctx context.Context) error {
+	fn := (*graph.SplittableDoFn)(n.Fn).CreateTrackerFn()
+	var err error
+	if n.ctInv, err = newCreateTrackerInvoker(fn); err != nil {
+		return errors.WithContextf(err, "%v", n)
+	}
+
+	fn = (*graph.SplittableDoFn)(n.Fn).TruncateRestrictionFn()
+	if fn != nil {
+		if n.truncateInv, err = newTruncateRestrictionInvoker(fn); err != nil {
+			return err
+		}
+	}
+	fn = (*graph.SplittableDoFn)(n.Fn).RestrictionSizeFn()
+	if n.sizeInv, err = newRestrictionSizeInvoker(fn); err != nil {
+		return err
+	}
+	return nil
+}
+
+// StartBundle currently does nothing.
+func (n *TruncateSizedRestriction) StartBundle(ctx context.Context, id string, data DataContext) error {
+	return n.Out.StartBundle(ctx, id, data)
+}
+
+// ProcessElement gets input elm as:
+// Input Diagram:
+//   *FullValue {
+//     Elm: *FullValue {
+//       Elm:  *FullValue (original input)
+//       Elm2: Restriction
+//     }
+//     Elm2: float64 (size)
+//     Windows
+//     Timestamps
+//    }
+//
+// Output Diagram:
+//   *FullValue {
+//     Elm: *FullValue {
+//       Elm:  *FullValue (original input)
+//       Elm2: Restriction
+//     }
+//     Elm2: float64 (size)
+//     Windows
+//     Timestamps
+//    }
+func (n *TruncateSizedRestriction) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error {
+	mainElm := elm.Elm.(*FullValue).Elm.(*FullValue)
+	// TODO: change restriction extraction to consider watermark estimator after BEAM-11105 is merged.
+	rest := elm.Elm.(*FullValue).Elm2
+	rt := n.ctInv.Invoke(rest)
+	var err error
+	var newRest interface{}
+	if n.truncateInv == nil {
+		newRest = DefaultTruncateRestriction(rt)
+	} else {
+		newRest = n.truncateInv.Invoke(rt, mainElm)
+	}
+	size := n.sizeInv.Invoke(mainElm, newRest)
+	output := &FullValue{}
+	output.Timestamp = elm.Timestamp
+	output.Windows = elm.Windows
+	output.Elm = &FullValue{Elm: mainElm, Elm2: newRest}
+	output.Elm2 = size
+
+	if err = n.Out.ProcessElement(ctx, output, values...); err != nil {
+		return err
+	}
+	return nil
+}
+
+// FinishBundle resets the invokers.
+func (n *TruncateSizedRestriction) FinishBundle(ctx context.Context) error {
+	n.truncateInv.Reset()

Review Comment:
   We need to reset `n.ctInv` here as well.



##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -212,6 +212,113 @@ func (n *SplitAndSizeRestrictions) String() string {
 	return fmt.Sprintf("SDF.SplitAndSizeRestrictions[%v] UID:%v Out:%v", path.Base(n.Fn.Name()), n.UID, IDs(n.Out))
 }
 
+// TruncateSizedRestriction is an executor for the expanded SDF step of the
+// same name. This step is added to the expanded SDF when the runner signals to drain
+// the pipeline. This step is followed by ProcessSizedElementsAndRestrictions.
+type TruncateSizedRestriction struct {
+	UID         UnitID
+	Fn          *graph.DoFn
+	Out         Node
+	truncateInv *trInvoker
+	sizeInv     *rsInvoker
+	ctInv       *ctInvoker
+}
+
+// ID return the UnitID for this unit.
+func (n *TruncateSizedRestriction) ID() UnitID {
+	return n.UID
+}
+
+// Up performs one-time setup for this executor.
+func (n *TruncateSizedRestriction) Up(ctx context.Context) error {
+	fn := (*graph.SplittableDoFn)(n.Fn).CreateTrackerFn()
+	var err error
+	if n.ctInv, err = newCreateTrackerInvoker(fn); err != nil {
+		return errors.WithContextf(err, "%v", n)
+	}
+
+	fn = (*graph.SplittableDoFn)(n.Fn).TruncateRestrictionFn()
+	if fn != nil {
+		if n.truncateInv, err = newTruncateRestrictionInvoker(fn); err != nil {
+			return err
+		}
+	}
+	fn = (*graph.SplittableDoFn)(n.Fn).RestrictionSizeFn()
+	if n.sizeInv, err = newRestrictionSizeInvoker(fn); err != nil {
+		return err
+	}
+	return nil
+}
+
+// StartBundle currently does nothing.
+func (n *TruncateSizedRestriction) StartBundle(ctx context.Context, id string, data DataContext) error {
+	return n.Out.StartBundle(ctx, id, data)
+}
+
+// ProcessElement gets input elm as:
+// Input Diagram:
+//   *FullValue {
+//     Elm: *FullValue {
+//       Elm:  *FullValue (original input)
+//       Elm2: Restriction
+//     }
+//     Elm2: float64 (size)
+//     Windows
+//     Timestamps
+//    }
+//
+// Output Diagram:
+//   *FullValue {
+//     Elm: *FullValue {
+//       Elm:  *FullValue (original input)
+//       Elm2: Restriction
+//     }
+//     Elm2: float64 (size)
+//     Windows
+//     Timestamps
+//    }
+func (n *TruncateSizedRestriction) ProcessElement(ctx context.Context, elm *FullValue, values ...ReStream) error {
+	mainElm := elm.Elm.(*FullValue).Elm.(*FullValue)
+	// TODO: change restriction extraction to consider watermark estimator after BEAM-11105 is merged.
+	rest := elm.Elm.(*FullValue).Elm2
+	rt := n.ctInv.Invoke(rest)
+	var err error
+	var newRest interface{}
+	if n.truncateInv == nil {
+		newRest = DefaultTruncateRestriction(rt)
+	} else {
+		newRest = n.truncateInv.Invoke(rt, mainElm)
+	}
+	size := n.sizeInv.Invoke(mainElm, newRest)
+	output := &FullValue{}
+	output.Timestamp = elm.Timestamp
+	output.Windows = elm.Windows
+	output.Elm = &FullValue{Elm: mainElm, Elm2: newRest}
+	output.Elm2 = size
+
+	if err = n.Out.ProcessElement(ctx, output, values...); err != nil {
+		return err
+	}
+	return nil
+}
+
+// FinishBundle resets the invokers.
+func (n *TruncateSizedRestriction) FinishBundle(ctx context.Context) error {
+	n.truncateInv.Reset()

Review Comment:
   If `truncateInv` is nil, this will panic with a nil pointer dereference (another reason the approach I suggested above will help :) ).



##########
sdks/go/pkg/beam/core/sdf/sdf.go:
##########
@@ -87,6 +87,10 @@ type RTracker interface {
 	// GetRestriction returns the restriction this tracker is tracking, or nil if the restriction
 	// is unavailable for some reason.
 	GetRestriction() interface{}
+
+	// IsBounded returns the boundedness of the current restriction. If the current restriction represents a
+	// finite amount of work, it should return sdf.Bounded. Otherwise, it should return sdf.Unbounded.
+	IsBounded() bool

Review Comment:
   This would be a breaking change for anyone who implements the RTracker interface (which is also why you needed to update all existing Trackers). One way to make this non-breaking would be to introduce a new interface that you can compose with this one, something like:
   
   ```
   type UnboundableRTracker interface {
      IsBounded() bool
   }
   ```
   (UnboundableRTracker might be a bad name, feel free to come up with something better 😄 ). Then whenever we want to check boundedness, we can call something like:
   
   ```
   func isRtrackerBound(tracker RTracker) bool {
      if uTracker, ok := tracker.(UnboundableRTracker); ok && uTracker.IsBounded() {
         return true
      }
      return false
   }
   ```



##########
sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:
##########
@@ -351,3 +352,84 @@ func (n *cweInvoker) Reset() {
 		n.args[i] = nil
 	}
 }
+
+// trInvoker is an invoker for TruncateRestriction.
+type trInvoker struct {
+	fn   *funcx.Fn
+	args []interface{}
+	call func(elms *FullValue, rest interface{}) (pair interface{})
+}
+
+var offsetrangeTracker = reflect.TypeOf((*offsetrange.Tracker)(nil)).Elem()
+
+// var growableOffsetRangeTracker = reflect.TypeOf((*growable_offsetrange.Tracker)(nil))
+
+func DefaultTruncateRestriction(restTracker interface{}) (newRest interface{}) {
+	switch restTracker.(type) {
+	case *offsetrange.Tracker:
+		return restTracker.(*offsetrange.Tracker).GetRestriction().(offsetrange.Restriction) // since offsetrange has a bounded restriction
+	default:
+		return nil

Review Comment:
   This default is confusing to me: why do we only handle the offsetrange case? Shouldn't we check whether the restriction is bounded, and then return either the original restriction or nil?



##########
sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:
##########
@@ -351,3 +352,84 @@ func (n *cweInvoker) Reset() {
 		n.args[i] = nil
 	}
 }
+
+// trInvoker is an invoker for TruncateRestriction.
+type trInvoker struct {
+	fn   *funcx.Fn
+	args []interface{}
+	call func(elms *FullValue, rest interface{}) (pair interface{})
+}
+
+var offsetrangeTracker = reflect.TypeOf((*offsetrange.Tracker)(nil)).Elem()
+
+// var growableOffsetRangeTracker = reflect.TypeOf((*growable_offsetrange.Tracker)(nil))
+
+func DefaultTruncateRestriction(restTracker interface{}) (newRest interface{}) {
+	switch restTracker.(type) {
+	case *offsetrange.Tracker:
+		return restTracker.(*offsetrange.Tracker).GetRestriction().(offsetrange.Restriction) // since offsetrange has a bounded restriction
+	default:
+		return nil
+	}
+}
+
+func newTruncateRestrictionInvoker(fn *funcx.Fn) (*trInvoker, error) {
+	n := &trInvoker{
+		fn:   fn,
+		args: make([]interface{}, len(fn.Param)),
+	}
+	if err := n.initCallFn(); err != nil {
+		return nil, errors.WithContext(err, "sdf TruncateRestriction invoker")
+	}
+	return n, nil
+}
+
+func (n *trInvoker) initCallFn() error {
+	// Expects a signature of the form:
+	// (key?, value, restriction) []restriction
+	// TODO(BEAM-9643): Link to full documentation.
+	switch fnT := n.fn.Fn.(type) {
+	case reflectx.Func2x1:
+		n.call = func(elms *FullValue, rest interface{}) interface{} {
+			return fnT.Call2x1(rest, elms.Elm)

Review Comment:
   Putting the restriction before the element is the opposite of what was described in the design doc. I actually think it's the right ordering, though, since it's what was done in ProcessElement (and it matches my watermark changes). Could you update the doc?



##########
sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:
##########
@@ -351,3 +352,84 @@ func (n *cweInvoker) Reset() {
 		n.args[i] = nil
 	}
 }
+
+// trInvoker is an invoker for TruncateRestriction.
+type trInvoker struct {
+	fn   *funcx.Fn
+	args []interface{}
+	call func(elms *FullValue, rest interface{}) (pair interface{})
+}
+
+var offsetrangeTracker = reflect.TypeOf((*offsetrange.Tracker)(nil)).Elem()
+
+// var growableOffsetRangeTracker = reflect.TypeOf((*growable_offsetrange.Tracker)(nil))
+
+func DefaultTruncateRestriction(restTracker interface{}) (newRest interface{}) {
+	switch restTracker.(type) {
+	case *offsetrange.Tracker:
+		return restTracker.(*offsetrange.Tracker).GetRestriction().(offsetrange.Restriction) // since offsetrange has a bounded restriction
+	default:
+		return nil
+	}
+}
+
+func newTruncateRestrictionInvoker(fn *funcx.Fn) (*trInvoker, error) {
+	n := &trInvoker{
+		fn:   fn,
+		args: make([]interface{}, len(fn.Param)),
+	}
+	if err := n.initCallFn(); err != nil {
+		return nil, errors.WithContext(err, "sdf TruncateRestriction invoker")
+	}
+	return n, nil
+}
+
+func (n *trInvoker) initCallFn() error {
+	// Expects a signature of the form:
+	// (key?, value, restriction) []restriction
+	// TODO(BEAM-9643): Link to full documentation.
+	switch fnT := n.fn.Fn.(type) {
+	case reflectx.Func2x1:
+		n.call = func(elms *FullValue, rest interface{}) interface{} {
+			return fnT.Call2x1(rest, elms.Elm)
+		}
+	case reflectx.Func3x1:
+		n.call = func(elms *FullValue, rest interface{}) interface{} {
+			return fnT.Call3x1(rest, elms.Elm, elms.Elm2)
+		}
+	default:
+		switch len(n.fn.Param) {
+		case 2:
+			n.call = func(elms *FullValue, rest interface{}) interface{} {
+				n.args[0] = rest
+				n.args[1] = elms.Elm
+				return n.fn.Fn.Call(n.args)[0]
+			}
+		case 3:
+			n.call = func(elms *FullValue, rest interface{}) interface{} {
+				n.args[1] = elms.Elm
+				n.args[2] = elms.Elm2
+				n.args[0] = rest

Review Comment:
   ```suggestion
   				n.args[0] = rest
   				n.args[1] = elms.Elm
   				n.args[2] = elms.Elm2
   ```
   
   Nit: in the case above you assign the arguments in lowest-to-highest index order; could you do the same here?



##########
sdks/go/pkg/beam/core/sdf/sdf.go:
##########
@@ -87,6 +87,10 @@ type RTracker interface {
 	// GetRestriction returns the restriction this tracker is tracking, or nil if the restriction
 	// is unavailable for some reason.
 	GetRestriction() interface{}
+
+	// IsBounded returns the boundedness of the current restriction. If the current restriction represents a
+	// finite amount of work, it should return sdf.Bounded. Otherwise, it should return sdf.Unbounded.
+	IsBounded() bool

Review Comment:
   Note: this implicitly treats all existing restrictions as bounded (which IMO is the right thing, since we're introducing new behaviors here for unbounded restrictions).



##########
sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:
##########
@@ -351,3 +352,84 @@ func (n *cweInvoker) Reset() {
 		n.args[i] = nil
 	}
 }
+
+// trInvoker is an invoker for TruncateRestriction.
+type trInvoker struct {
+	fn   *funcx.Fn
+	args []interface{}
+	call func(elms *FullValue, rest interface{}) (pair interface{})
+}
+
+var offsetrangeTracker = reflect.TypeOf((*offsetrange.Tracker)(nil)).Elem()
+
+// var growableOffsetRangeTracker = reflect.TypeOf((*growable_offsetrange.Tracker)(nil))
+
+func DefaultTruncateRestriction(restTracker interface{}) (newRest interface{}) {
+	switch restTracker.(type) {
+	case *offsetrange.Tracker:
+		return restTracker.(*offsetrange.Tracker).GetRestriction().(offsetrange.Restriction) // since offsetrange has a bounded restriction
+	default:
+		return nil
+	}
+}
+
+func newTruncateRestrictionInvoker(fn *funcx.Fn) (*trInvoker, error) {
+	n := &trInvoker{
+		fn:   fn,
+		args: make([]interface{}, len(fn.Param)),
+	}
+	if err := n.initCallFn(); err != nil {
+		return nil, errors.WithContext(err, "sdf TruncateRestriction invoker")
+	}
+	return n, nil
+}
+
+func (n *trInvoker) initCallFn() error {
+	// Expects a signature of the form:
+	// (key?, value, restriction) []restriction
+	// TODO(BEAM-9643): Link to full documentation.
+	switch fnT := n.fn.Fn.(type) {
+	case reflectx.Func2x1:
+		n.call = func(elms *FullValue, rest interface{}) interface{} {
+			return fnT.Call2x1(rest, elms.Elm)
+		}
+	case reflectx.Func3x1:
+		n.call = func(elms *FullValue, rest interface{}) interface{} {
+			return fnT.Call3x1(rest, elms.Elm, elms.Elm2)
+		}
+	default:
+		switch len(n.fn.Param) {
+		case 2:
+			n.call = func(elms *FullValue, rest interface{}) interface{} {
+				n.args[0] = rest
+				n.args[1] = elms.Elm
+				return n.fn.Fn.Call(n.args)[0]
+			}
+		case 3:
+			n.call = func(elms *FullValue, rest interface{}) interface{} {
+				n.args[1] = elms.Elm
+				n.args[2] = elms.Elm2
+				n.args[0] = rest
+				return n.fn.Fn.Call(n.args)[0]
+			}
+		default:
+			return errors.Errorf("TruncateRestriction fn %v has unexpected number of parameters: %v",
+				n.fn.Fn.Name(), len(n.fn.Param))
+		}
+	}
+	return nil
+}
+
+// Invoke calls TruncateRestriction given a FullValue containing an element and
+// the associated restriction tracker, and returns a truncated restriction.
+func (n *trInvoker) Invoke(rt interface{}, elms *FullValue) (rest interface{}) {
+	return n.call(elms, rt)

Review Comment:
   Nit: there's no real reason to define `Invoke(rest, elms)` and then define the invoker's call function as `(elms, rest)`. Could you flip the parameter order of the call function to match all the other orderings here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org