You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/04/12 08:43:43 UTC

[GitHub] [beam] youngoli commented on a diff in pull request #17267: [BEAM-11105] Basic Watermark Estimation (Wall Clock Observing)

youngoli commented on code in PR #17267:
URL: https://github.com/apache/beam/pull/17267#discussion_r848100363


##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -913,6 +950,64 @@ func validateSdfElementT(fn *Fn, name string, method *funcx.Fn, num int) error {
 	return nil
 }
 
+// validateIsWatermarkEstimating returns true if watermark estimator methods are present on the DoFn, returns
+// false if they aren't, and returns an error if they are present but the function isn't an sdf and thus doesn't
+// support watermark estimation
+func validateIsWatermarkEstimating(fn *Fn, isSdf bool) (bool, error) {
+	var isWatermarkEstimating bool
+	if _, ok := fn.methods[createWatermarkEstimatorName]; ok {
+		isWatermarkEstimating = true
+	}
+	if !isSdf && isWatermarkEstimating {
+		return false, errors.Errorf("Watermark estimation method %v is defined on non-splittable DoFn. Watermark"+
+			"estimation is only valid on splittable DoFns", createWatermarkEstimatorName)
+	}
+	return isWatermarkEstimating, nil
+}
+
+// validateWatermarkSig validates that all watermark related functions are valid
+func validateWatermarkSig(fn *Fn) error {
+	paramRange := map[string][]int{

Review Comment:
   Nit: While I see that you're future-proofing this validation, I think simplifying it would make it much more legible (that is, instead of having a map and everything, since we only have one method with 0 params we can just store that in a variable like `paramsNum := 0`.
   
   I'd make an exception if you're already sure that additional methods or params will be added, like if those are already in the design doc. If that's the case then future-proofing is fine and you can disregard this nit.



##########
sdks/go/pkg/beam/core/graph/fn_test.go:
##########
@@ -227,6 +228,59 @@ func TestNewDoFnSdf(t *testing.T) {
 	})
 }
 
+func TestNewDoFnWatermarkEstimating(t *testing.T) {
+	t.Run("valid", func(t *testing.T) {
+		tests := []struct {
+			dfn  interface{}
+			main mainInputs
+		}{
+			{dfn: &GoodWatermarkEstimating{}, main: MainSingle},
+		}
+
+		for _, test := range tests {
+			t.Run(reflect.TypeOf(test.dfn).String(), func(t *testing.T) {
+				// Valid DoFns should pass validation with and without KV info.
+				if _, err := NewDoFn(test.dfn); err != nil {
+					t.Fatalf("NewDoFn with Watermark Estimation failed: %v", err)
+				}
+				if _, err := NewDoFn(test.dfn, NumMainInputs(test.main)); err != nil {
+					t.Fatalf("NewDoFn(NumMainInputs(%v)) with Watermark Estimation failed: %v", test.main, err)
+				}
+			})
+		}
+	})
+	t.Run("invalid", func(t *testing.T) {
+		tests := []struct {
+			dfn interface{}
+		}{
+			{dfn: &BadWatermarkEstimatingNonSdf{}},
+			{dfn: &BadWatermarkEstimatingCreateWatermarkEstimatorReturnType{}},
+		}
+		for _, test := range tests {
+			t.Run(reflect.TypeOf(test.dfn).String(), func(t *testing.T) {
+				if cfn, err := NewDoFn(test.dfn); err != nil {
+					t.Logf("NewDoFn with SDF failed as expected:\n%v", err)
+				} else {
+					t.Errorf("NewDoFn(%v) = %v, want failure", cfn.Name(), cfn)
+				}
+				// If validation fails with unknown main inputs, then it should
+				// always fail for any known number of main inputs, so test them
+				// all. Error messages won't necessarily match.
+				if cfn, err := NewDoFn(test.dfn, NumMainInputs(MainSingle)); err != nil {
+					t.Logf("NewDoFn(NumMainInputs(MainSingle)) with SDF failed as expected:\n%v", err)
+				} else {
+					t.Errorf("NewDoFn(%v, NumMainInputs(MainSingle)) = %v, want failure", cfn.Name(), cfn)
+				}
+				if cfn, err := NewDoFn(test.dfn, NumMainInputs(MainKv)); err != nil {

Review Comment:
   Nit: This KV section can be removed since the watermark estimator validation code isn't affected in any way by whether the main input is a KV or not.



##########
sdks/go/pkg/beam/core/graph/fn.go:
##########
@@ -301,6 +308,24 @@ func (f *SplittableDoFn) RestrictionT() reflect.Type {
 	return f.CreateInitialRestrictionFn().Ret[0].T
 }
 
+// IsWatermarkEstimating returns whether the DoFn implements a custom watermark estimator.
+func (f *SplittableDoFn) IsWatermarkEstimating() bool {
+	// Validation already passed, so if one SDF method is present they should
+	// all be present.

Review Comment:
   Nit: This looks like a copy-pasted comment.



##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -327,6 +338,10 @@ func (n *ProcessSizedElementsAndRestrictions) ProcessElement(_ context.Context,
 		}
 	}
 
+	if n.cweInv != nil {
+		n.PDo.we = n.cweInv.Invoke()

Review Comment:
   Creating a watermark estimator doesn't need to be done for each element, does it? If not, this should go in StartBundle.



##########
sdks/go/pkg/beam/core/runtime/exec/plan.go:
##########
@@ -24,6 +24,7 @@ import (
 	"time"
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+	"google.golang.org/protobuf/types/known/timestamppb"

Review Comment:
   Is timestamppb the type we already represent watermarks as internally? From a cursory look it feels weird to represent it with a protobuf type, but I'm also not sure what we currently use so I just want to check.



##########
sdks/go/pkg/beam/core/runtime/exec/sdf.go:
##########
@@ -664,6 +686,21 @@ func (n *ProcessSizedElementsAndRestrictions) GetInputId() string {
 	return indexToInputId(0)
 }
 
+// GetOutputWatermark gets the current output watermark of the splittable unit
+// if one is defined, or returns nil otherwise.
+func (n *ProcessSizedElementsAndRestrictions) GetOutputWatermark() map[string]*timestamppb.Timestamp {
+	if n.PDo.we != nil {
+		ow := timestamppb.New(n.PDo.we.CurrentWatermark())
+		owMap := make(map[string]*timestamppb.Timestamp)
+		for _, out := range n.outputs {
+			owMap[out] = ow

Review Comment:
   Maybe it's because I'm not well-versed in how watermarks work, but why store output watermarks in a map like this if we set the same value for every single output? Why not just store the single timestamp? Is it because of some upcoming work that's going to build on this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org