You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/04/26 15:02:53 UTC

[GitHub] [beam] riteshghorse commented on a diff in pull request #17432: [BEAM-11106] Support drain in Go SDK

riteshghorse commented on code in PR #17432:
URL: https://github.com/apache/beam/pull/17432#discussion_r858827902


##########
sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:
##########
@@ -351,3 +352,84 @@ func (n *cweInvoker) Reset() {
 		n.args[i] = nil
 	}
 }
+
+// trInvoker is an invoker for TruncateRestriction.
+type trInvoker struct {
+	fn   *funcx.Fn
+	args []interface{}
+	call func(elms *FullValue, rest interface{}) (pair interface{})
+}
+
+var offsetrangeTracker = reflect.TypeOf((*offsetrange.Tracker)(nil)).Elem()
+
+// var growableOffsetRangeTracker = reflect.TypeOf((*growable_offsetrange.Tracker)(nil))
+
+func DefaultTruncateRestriction(restTracker interface{}) (newRest interface{}) {
+	switch restTracker.(type) {
+	case *offsetrange.Tracker:
+		return restTracker.(*offsetrange.Tracker).GetRestriction().(offsetrange.Restriction) // since offsetrange has a bounded restriction
+	default:
+		return nil
+	}
+}
+
+func newTruncateRestrictionInvoker(fn *funcx.Fn) (*trInvoker, error) {
+	n := &trInvoker{
+		fn:   fn,
+		args: make([]interface{}, len(fn.Param)),
+	}
+	if err := n.initCallFn(); err != nil {
+		return nil, errors.WithContext(err, "sdf TruncateRestriction invoker")
+	}
+	return n, nil
+}
+
+func (n *trInvoker) initCallFn() error {
+	// Expects a signature of the form:
+	// (key?, value, restriction) []restriction
+	// TODO(BEAM-9643): Link to full documentation.
+	switch fnT := n.fn.Fn.(type) {
+	case reflectx.Func2x1:
+		n.call = func(elms *FullValue, rest interface{}) interface{} {
+			return fnT.Call2x1(rest, elms.Elm)

Review Comment:
   Yeah, I figured it out while implementing. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org