You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/04/21 19:58:26 UTC

[beam] branch master updated: [BEAM-14346] Fix incorrect error case index in ret2() (#17425)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f4456636ab [BEAM-14346] Fix incorrect error case index in ret2() (#17425)
8f4456636ab is described below

commit 8f4456636abaa2444234626b37be8c4d0dcc14ef
Author: Jack McCluskey <34...@users.noreply.github.com>
AuthorDate: Thu Apr 21 15:58:19 2022 -0400

    [BEAM-14346] Fix incorrect error case index in ret2() (#17425)
---
 sdks/go/pkg/beam/core/runtime/exec/fn.go         |  2 +-
 sdks/go/pkg/beam/core/runtime/exec/fn_arity.go   |  3 --
 sdks/go/pkg/beam/core/runtime/exec/fn_arity.tmpl |  3 --
 sdks/go/pkg/beam/core/runtime/exec/fn_test.go    | 62 ++++++++++++++++++------
 4 files changed, 49 insertions(+), 21 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn.go b/sdks/go/pkg/beam/core/runtime/exec/fn.go
index 494a27598dc..13ceb3f20e1 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/fn.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/fn.go
@@ -252,7 +252,7 @@ func (n *invoker) ret1(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime,
 // ret2 handles processing of a pair of return values.
 func (n *invoker) ret2(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime, r0, r1 interface{}) (*FullValue, error) {
 	switch {
-	case n.outErrIdx == 2:
+	case n.outErrIdx == 1:
 		if r1 != nil {
 			return nil, r1.(error)
 		}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn_arity.go b/sdks/go/pkg/beam/core/runtime/exec/fn_arity.go
index 3e34442a7de..d2ab194db83 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/fn_arity.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/fn_arity.go
@@ -305,9 +305,6 @@ func (n *invoker) initCall() {
 	default:
 		n.call = func(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime) (*FullValue, error) {
 			ret := n.fn.Fn.Call(n.args)
-			if n.outErrIdx >= 0 && ret[n.outErrIdx] != nil {
-				return nil, ret[n.outErrIdx].(error)
-			}
 
 			// (5) Return direct output, if any. Input timestamp and windows are implicitly
 			// propagated.
diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn_arity.tmpl b/sdks/go/pkg/beam/core/runtime/exec/fn_arity.tmpl
index 6737e3f7891..25f1a2ff04d 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/fn_arity.tmpl
+++ b/sdks/go/pkg/beam/core/runtime/exec/fn_arity.tmpl
@@ -45,9 +45,6 @@ func (n *invoker) initCall() {
 	default:
 		n.call = func(pn typex.PaneInfo, ws []typex.Window, ts typex.EventTime) (*FullValue, error) {
 			ret := n.fn.Fn.Call(n.args)
-			if n.outErrIdx >= 0 && ret[n.outErrIdx] != nil {
-				return nil, ret[n.outErrIdx].(error)
-			}
 
 			// (5) Return direct output, if any. Input timestamp and windows are implicitly
 			// propagated.
diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn_test.go b/sdks/go/pkg/beam/core/runtime/exec/fn_test.go
index f63a0f2c077..80cceb40882 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/fn_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/fn_test.go
@@ -31,6 +31,8 @@ import (
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
 )
 
+var errGeneric = errors.New("generic error")
+
 // TestInvoke verifies the the various forms of input to Invoke are handled correctly.
 func TestInvoke(t *testing.T) {
 	tests := []struct {
@@ -40,6 +42,7 @@ func TestInvoke(t *testing.T) {
 		Expected, Expected2  interface{}
 		ExpectedTime         typex.EventTime
 		ExpectedContinuation sdf.ProcessContinuation
+		ExpectedError        error
 	}{
 		{
 			// Void function
@@ -159,6 +162,30 @@ func TestInvoke(t *testing.T) {
 			Opt:      &MainInput{Key: FullValue{Elm: "basketball", Elm2: []typex.T{23}}},
 			Expected: "basketball", Expected2: 1,
 		},
+		{
+			// ret1() error check
+			Fn:            func(a int) error { return errGeneric },
+			Opt:           &MainInput{Key: FullValue{Elm: 1}},
+			ExpectedError: errGeneric,
+		},
+		{
+			// ret2() error check
+			Fn:            func(a int) (int, error) { return 0, errGeneric },
+			Opt:           &MainInput{Key: FullValue{Elm: 1}},
+			ExpectedError: errGeneric,
+		},
+		{
+			// ret3() error check
+			Fn:            func(a int) (typex.EventTime, int, error) { return 0, 0, errGeneric },
+			Opt:           &MainInput{Key: FullValue{Elm: 1}},
+			ExpectedError: errGeneric,
+		},
+		{
+			// ret4() error check
+			Fn:            func(a int) (typex.EventTime, string, int, error) { return 0, "", 0, errGeneric },
+			Opt:           &MainInput{Key: FullValue{Elm: 1}},
+			ExpectedError: errGeneric,
+		},
 		// TODO(BEAM-11104): Add unit test cases for ProcessContinuations once they are enabled for use.
 	}
 
@@ -181,20 +208,27 @@ func TestInvoke(t *testing.T) {
 			}
 
 			val, err := Invoke(context.Background(), typex.NoFiringPane(), window.SingleGlobalWindow, ts, fn, test.Opt, nil, test.Args...)
-			if err != nil {
-				t.Fatalf("Invoke(%v,%v) failed: %v", fn.Fn.Name(), test.Args, err)
-			}
-			if val != nil && val.Elm != test.Expected {
-				t.Errorf("Invoke(%v,%v) = %v, want %v", fn.Fn.Name(), test.Args, val.Elm, test.Expected)
-			}
-			if val != nil && val.Elm2 != test.Expected2 {
-				t.Errorf("Elm2: Invoke(%v,%v) = %v, want %v", fn.Fn.Name(), test.Args, val.Elm2, test.Expected2)
-			}
-			if val != nil && val.Timestamp != test.ExpectedTime {
-				t.Errorf("EventTime: Invoke(%v,%v) = %v, want %v", fn.Fn.Name(), test.Args, val.Timestamp, test.ExpectedTime)
-			}
-			if val != nil && val.Continuation != test.ExpectedContinuation {
-				t.Errorf("EventTime: Invoke(%v,%v) = %v, want %v", fn.Fn.Name(), test.Args, val.Continuation, test.ExpectedContinuation)
+
+			if test.ExpectedError != nil {
+				if err == nil {
+					t.Fatalf("Invoke(%v, %v) succeeded when it should have failed, want %v", fn.Fn.Name(), test.Args, test.ExpectedError)
+				}
+				if err != test.ExpectedError {
+					t.Errorf("Invoke(%v, %v) returned unexpected error, got %v, want %v", fn.Fn.Name(), test.Args, err, test.ExpectedError)
+				}
+			} else {
+				if val != nil && val.Elm != test.Expected {
+					t.Errorf("Invoke(%v,%v) = %v, want %v", fn.Fn.Name(), test.Args, val.Elm, test.Expected)
+				}
+				if val != nil && val.Elm2 != test.Expected2 {
+					t.Errorf("Elm2: Invoke(%v,%v) = %v, want %v", fn.Fn.Name(), test.Args, val.Elm2, test.Expected2)
+				}
+				if val != nil && val.Timestamp != test.ExpectedTime {
+					t.Errorf("EventTime: Invoke(%v,%v) = %v, want %v", fn.Fn.Name(), test.Args, val.Timestamp, test.ExpectedTime)
+				}
+				if val != nil && val.Continuation != test.ExpectedContinuation {
+					t.Errorf("EventTime: Invoke(%v,%v) = %v, want %v", fn.Fn.Name(), test.Args, val.Continuation, test.ExpectedContinuation)
+				}
 			}
 		})
 	}