You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/03/07 04:51:36 UTC

[GitHub] [beam] youngoli commented on a change in pull request #16980: [BEAM-10976] Bundle finalization: Harness and some exec changes

youngoli commented on a change in pull request #16980:
URL: https://github.com/apache/beam/pull/16980#discussion_r820365091



##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn_test.go
##########
@@ -195,6 +196,50 @@ func TestInvoke(t *testing.T) {
 	}
 }
 
+func TestRegisterCallback(t *testing.T) {
+	bf := bundleFinalizer{
+		callbacks:         []bundleFinalizationCallback{},
+		lastValidCallback: time.Now(),
+	}
+	testVar := 0
+	bf.RegisterCallback(500*time.Minute, func() error {
+		testVar += 5
+		return nil
+	})
+	bf.RegisterCallback(2*time.Minute, func() error {
+		testVar = 25
+		return nil
+	})
+	callbackErr := errors.New("Callback error")
+	bf.RegisterCallback(2*time.Minute, func() error {
+		return callbackErr
+	})
+
+	// We can't do exact equality since this relies on real time, we'll give it a broad range
+	if bf.lastValidCallback.Before(time.Now().Add(400*time.Minute)) || bf.lastValidCallback.After(time.Now().Add(600*time.Minute)) {
+		t.Errorf("RegisterCallback() lastValidCallback set to %v, want about 500 minutes", bf.lastValidCallback)
+	}
+	if got, want := len(bf.callbacks), 3; got != want {
+		t.Fatalf("RegisterCallback() called twice, got %v callbacks, want %v", got, want)

Review comment:
       Nit: It looks like a lot of these error messages were written with the wrong number of callbacks in mind (e.g. this one says RegisterCallback() was called twice when it was called three times). For the most part, though, I think you can avoid hardcoding that count into the error message entirely. Something like "number of callbacks in bundleFinalizer does not match number of calls to RegisterCallback(), got %v callbacks, want %v". It's wordier, but it doesn't need to be changed whenever the test changes.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn_test.go
##########
@@ -195,6 +196,50 @@ func TestInvoke(t *testing.T) {
 	}
 }
 
+func TestRegisterCallback(t *testing.T) {
+	bf := bundleFinalizer{
+		callbacks:         []bundleFinalizationCallback{},
+		lastValidCallback: time.Now(),
+	}
+	testVar := 0
+	bf.RegisterCallback(500*time.Minute, func() error {
+		testVar += 5
+		return nil
+	})
+	bf.RegisterCallback(2*time.Minute, func() error {
+		testVar = 25
+		return nil
+	})
+	callbackErr := errors.New("Callback error")
+	bf.RegisterCallback(2*time.Minute, func() error {
+		return callbackErr
+	})
+
+	// We can't do exact equality since this relies on real time, we'll give it a broad range
+	if bf.lastValidCallback.Before(time.Now().Add(400*time.Minute)) || bf.lastValidCallback.After(time.Now().Add(600*time.Minute)) {
+		t.Errorf("RegisterCallback() lastValidCallback set to %v, want about 500 minutes", bf.lastValidCallback)
+	}
+	if got, want := len(bf.callbacks), 3; got != want {
+		t.Fatalf("RegisterCallback() called twice, got %v callbacks, want %v", got, want)
+	}
+
+	if err := bf.callbacks[0].callback(); err != nil {
+		t.Errorf("RegisterCallback() first callback returned error %v, want nil", err)
+	}
+	if got, want := testVar, 5; got != want {
+		t.Errorf("RegisterCallback() first callback set testvar to %v, want %v", got, want)
+	}
+	if err := bf.callbacks[1].callback(); err != nil {
+		t.Errorf("RegisterCallback() second callback returned error %v, want nil", err)
+	}
+	if got, want := testVar, 25; got != want {
+		t.Errorf("RegisterCallback() second callback set testvar to %v, want %v", got, want)
+	}
+	if err := bf.callbacks[2].callback(); err != callbackErr {
+		t.Errorf("RegisterCallback() second callback returned error %v, want %v", err, callbackErr)

Review comment:
       Nit: This is actually the third callback. It might be easier to include the callback's index in the format string instead, so the message can't drift out of sync with the code.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/plan.go
##########
@@ -131,6 +141,41 @@ func (p *Plan) Execute(ctx context.Context, id string, manager DataContext) erro
 	return nil
 }
 
+func (p *Plan) Finalize(ctx context.Context, id string) error {

Review comment:
       Remember to add doc comments to all exported functions; otherwise we get linter errors.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/plan.go
##########
@@ -131,6 +141,41 @@ func (p *Plan) Execute(ctx context.Context, id string, manager DataContext) erro
 	return nil
 }
 
+func (p *Plan) Finalize(ctx context.Context, id string) error {
+	if p.status != Up {
+		return errors.Errorf("invalid status for plan %v: %v", p.id, p.status)
+	}
+	failedIndices := []int{}
+	for idx, bfc := range p.bf.callbacks {
+		if time.Now().Before(bfc.validUntil) {
+			if err := bfc.callback(); err != nil {
+				failedIndices = append(failedIndices, idx)
+			}
+		}
+	}
+
+	newFinalizer := bundleFinalizer{
+		callbacks:         []bundleFinalizationCallback{},
+		lastValidCallback: time.Now(),
+	}
+
+	for _, idx := range failedIndices {
+		newFinalizer.callbacks = append(newFinalizer.callbacks, p.bf.callbacks[idx])
+		if newFinalizer.lastValidCallback.Before(p.bf.callbacks[idx].validUntil) {
+			newFinalizer.lastValidCallback = p.bf.callbacks[idx].validUntil
+		}
+	}
+
+	if len(failedIndices) > 0 {
+		return errors.Errorf("Plan %v failed %v callbacks", p.ID(), len(failedIndices))
+	}
+	return nil
+}
+
+func (p *Plan) GetExpirationTime(ctx context.Context, id string) time.Time {

Review comment:
       The id parameter isn't used; can we remove it?

##########
File path: sdks/go/pkg/beam/core/runtime/exec/plan.go
##########
@@ -131,6 +141,41 @@ func (p *Plan) Execute(ctx context.Context, id string, manager DataContext) erro
 	return nil
 }
 
+func (p *Plan) Finalize(ctx context.Context, id string) error {
+	if p.status != Up {
+		return errors.Errorf("invalid status for plan %v: %v", p.id, p.status)
+	}
+	failedIndices := []int{}
+	for idx, bfc := range p.bf.callbacks {
+		if time.Now().Before(bfc.validUntil) {
+			if err := bfc.callback(); err != nil {
+				failedIndices = append(failedIndices, idx)
+			}
+		}
+	}
+
+	newFinalizer := bundleFinalizer{

Review comment:
       Unless I missed something, it looks like newFinalizer doesn't do anything. It gets populated with the failed callbacks, but then this function returns without using it or storing it anywhere (e.g. assigning it back to p.bf so the failed callbacks are kept for a later attempt).

##########
File path: sdks/go/pkg/beam/core/runtime/harness/harness.go
##########
@@ -506,6 +563,12 @@ func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRe
 func (c *control) getPlanOrResponse(ctx context.Context, kind string, instID, ref instructionID) (*exec.Plan, *metrics.Store, *fnpb.InstructionResponse) {
 	c.mu.Lock()
 	plan, ok := c.active[ref]
+	if !ok {
+		awaitingFinalization, ok := c.awaitingFinalization[ref]

Review comment:
       In Go, using := inside an inner scope declares a new variable that shadows any outer variable of the same name. So here the inner ok is a separate variable from the outer one. Further down (around line 579), the outer ok is still false, and the function would return a failure.
   
   Regardless of whether this is intended behavior, I think you should rename it for clarity (something like ok2 is fine).

##########
File path: sdks/go/pkg/beam/core/runtime/exec/fn_test.go
##########
@@ -195,6 +196,50 @@ func TestInvoke(t *testing.T) {
 	}
 }
 
+func TestRegisterCallback(t *testing.T) {
+	bf := bundleFinalizer{
+		callbacks:         []bundleFinalizationCallback{},
+		lastValidCallback: time.Now(),
+	}
+	testVar := 0
+	bf.RegisterCallback(500*time.Minute, func() error {
+		testVar += 5
+		return nil
+	})
+	bf.RegisterCallback(2*time.Minute, func() error {
+		testVar = 25
+		return nil
+	})
+	callbackErr := errors.New("Callback error")
+	bf.RegisterCallback(2*time.Minute, func() error {
+		return callbackErr
+	})
+
+	// We can't do exact equality since this relies on real time, we'll give it a broad range
+	if bf.lastValidCallback.Before(time.Now().Add(400*time.Minute)) || bf.lastValidCallback.After(time.Now().Add(600*time.Minute)) {
+		t.Errorf("RegisterCallback() lastValidCallback set to %v, want about 500 minutes", bf.lastValidCallback)
+	}
+	if got, want := len(bf.callbacks), 3; got != want {
+		t.Fatalf("RegisterCallback() called twice, got %v callbacks, want %v", got, want)
+	}
+
+	if err := bf.callbacks[0].callback(); err != nil {
+		t.Errorf("RegisterCallback() first callback returned error %v, want nil", err)

Review comment:
       Whenever possible, the error should go at the end of the message, separated by a colon. For example: "RegisterCallback() first callback returned unexpected error: %v".




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org