You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by yi...@apache.org on 2022/05/05 19:11:50 UTC

[beam] branch release-2.39.0 updated: Revert "[BEAM-11104] Enable ProcessContinuation return values, add unit test cases (#17533)" (#17562)

This is an automated email from the ASF dual-hosted git repository.

yichi pushed a commit to branch release-2.39.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.39.0 by this push:
     new 8aa77e61e5f Revert "[BEAM-11104] Enable ProcessContinuation return values, add unit test cases (#17533)" (#17562)
8aa77e61e5f is described below

commit 8aa77e61e5ffc1efdc07772c591d4ab0a336533d
Author: Jack McCluskey <34...@users.noreply.github.com>
AuthorDate: Thu May 5 15:11:44 2022 -0400

    Revert "[BEAM-11104] Enable ProcessContinuation return values, add unit test cases (#17533)" (#17562)
    
    This reverts commit 6a50718364282b4e0a2fb266a7d6767e50961607.
---
 CHANGES.md                                    |  1 -
 sdks/go/pkg/beam/core/funcx/fn.go             | 18 +++-----
 sdks/go/pkg/beam/core/funcx/fn_test.go        | 15 ++-----
 sdks/go/pkg/beam/core/runtime/exec/fn_test.go | 63 ++-------------------------
 4 files changed, 13 insertions(+), 84 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 8071b24be4a..ffd6cd32024 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -67,7 +67,6 @@
 
 * 'Manage Clusters' JupyterLab extension added for users to configure usage of Dataproc clusters managed by Interactive Beam (Python) ([BEAM-14130](https://issues.apache.org/jira/browse/BEAM-14130)).
 * Pipeline drain support added for Go SDK ([BEAM-11106](https://issues.apache.org/jira/browse/BEAM-11106)). **Note: this feature is not yet fully validated and should be treated as experimental in this release.**
-* Go SDK users may now write self-checkpointing Splittable DoFns to read from streaming sources. **Note: this feature is not yet fully validated and should be treated as experimental in this release.** ([BEAM-11104](https://issues.apache.org/jira/browse/BEAM-11104))
 
 ## Breaking Changes
 
diff --git a/sdks/go/pkg/beam/core/funcx/fn.go b/sdks/go/pkg/beam/core/funcx/fn.go
index 7d39aa1ce88..2580b9bbd9f 100644
--- a/sdks/go/pkg/beam/core/funcx/fn.go
+++ b/sdks/go/pkg/beam/core/funcx/fn.go
@@ -654,9 +654,10 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error)
 }
 
 var (
-	errEventTimeRetPrecedence        = errors.New("beam.EventTime must be first return parameter")
-	errErrorPrecedence               = errors.New("error must be the final return parameter")
-	errProcessContinuationPrecedence = errors.New("ProcessContinuation must be the final non-error return parameter")
+	errEventTimeRetPrecedence = errors.New("beam.EventTime must be first return parameter")
+	errErrorPrecedence        = errors.New("error must be the final return parameter")
+	// TODO(BEAM-11104): Enable process continuations as a valid return value.
+	errContinuationSupport = errors.New("process continuations are not supported in this SDK release; see https://issues.apache.org/jira/browse/BEAM-11104 for the feature's current status")
 )
 
 type retState int
@@ -676,15 +677,8 @@ func nextRetState(cur retState, transition ReturnKind) (retState, error) {
 		case RetEventTime:
 			return rsEventTime, nil
 		}
-	case rsEventTime, rsOutput:
+	case rsEventTime, rsOutput, rsProcessContinuation:
 		// Identical to the default cases.
-	case rsProcessContinuation:
-		switch transition {
-		case RetError:
-			return rsError, nil
-		default:
-			return -1, errProcessContinuationPrecedence
-		}
 	case rsError:
 		// This is a terminal state. No valid transitions. error must be the final return value.
 		return -1, errErrorPrecedence
@@ -696,7 +690,7 @@ func nextRetState(cur retState, transition ReturnKind) (retState, error) {
 	case RetValue, RetRTracker:
 		return rsOutput, nil
 	case RetProcessContinuation:
-		return rsProcessContinuation, nil
+		return -1, errContinuationSupport
 	case RetError:
 		return rsError, nil
 	default:
diff --git a/sdks/go/pkg/beam/core/funcx/fn_test.go b/sdks/go/pkg/beam/core/funcx/fn_test.go
index 202de64b19d..60b97f2080c 100644
--- a/sdks/go/pkg/beam/core/funcx/fn_test.go
+++ b/sdks/go/pkg/beam/core/funcx/fn_test.go
@@ -121,10 +121,10 @@ func TestNew(t *testing.T) {
 			Ret:   []ReturnKind{RetError},
 		},
 		{
-			Name:  "sdf",
-			Fn:    func(sdf.RTracker, func(int)) (sdf.ProcessContinuation, error) { return nil, nil },
-			Param: []FnParamKind{FnRTracker, FnEmit},
-			Ret:   []ReturnKind{RetProcessContinuation, RetError},
+			// TODO(BEAM-11104): Replace with a functioning test case once E2E support is finished.
+			Name: "sdf",
+			Fn:   func(sdf.RTracker, func(int)) (sdf.ProcessContinuation, error) { return nil, nil },
+			Err:  errContinuationSupport,
 		},
 		{
 			Name: "errContextParam: after input",
@@ -247,13 +247,6 @@ func TestNew(t *testing.T) {
 			},
 			Err: errEventTimeRetPrecedence,
 		},
-		{
-			Name: "errProcessContinuationPrecedence",
-			Fn: func() (string, sdf.ProcessContinuation, int, error) {
-				return "", nil, 0, nil
-			},
-			Err: errProcessContinuationPrecedence,
-		},
 		{
 			Name: "errIllegalParametersInEmit - malformed emit struct",
 			Fn:   func(context.Context, typex.EventTime, reflect.Type, func(nonConcreteType)) error { return nil },
diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn_test.go b/sdks/go/pkg/beam/core/runtime/exec/fn_test.go
index d4e4a091229..ee0b9ab5b98 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/fn_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/fn_test.go
@@ -33,13 +33,6 @@ import (
 
 var errGeneric = errors.New("generic error")
 
-func continuationsEqual(first, second sdf.ProcessContinuation) bool {
-	if first.ShouldResume() {
-		return first.ShouldResume() == second.ShouldResume() && first.ResumeDelay() == second.ResumeDelay()
-	}
-	return first.ShouldResume() == second.ShouldResume()
-}
-
 // TestInvoke verifies the the various forms of input to Invoke are handled correctly.
 func TestInvoke(t *testing.T) {
 	tests := []struct {
@@ -133,12 +126,6 @@ func TestInvoke(t *testing.T) {
 			Opt:      &MainInput{Key: FullValue{Elm: 1}},
 			Expected: mtime.EndOfGlobalWindowTime.Milliseconds(),
 		},
-		{
-			// (Return check) ProcessContinuation
-			Fn:                   func(a string) sdf.ProcessContinuation { return sdf.ResumeProcessingIn(1 * time.Second) },
-			Opt:                  &MainInput{Key: FullValue{Elm: "some string"}},
-			ExpectedContinuation: sdf.ResumeProcessingIn(1 * time.Second),
-		},
 		{
 			// (Return check) K, V
 			Fn:        func(a int) (int64, int) { return int64(a), 2 * a },
@@ -146,13 +133,6 @@ func TestInvoke(t *testing.T) {
 			Expected:  int64(1),
 			Expected2: 2,
 		},
-		{
-			// (Return check) V, ProcessContinuation
-			Fn:                   func(a int) (int, sdf.ProcessContinuation) { return 2 * a, sdf.StopProcessing() },
-			Opt:                  &MainInput{Key: FullValue{Elm: 1}},
-			Expected:             2,
-			ExpectedContinuation: sdf.StopProcessing(),
-		},
 		{
 			// (Return check)  K, V, Error
 			Fn:        func(a int) (int64, int, error) { return int64(a), 2 * a, nil },
@@ -160,14 +140,6 @@ func TestInvoke(t *testing.T) {
 			Expected:  int64(1),
 			Expected2: 2,
 		},
-		{
-			// (Return check) K, V, ProcessContinuation
-			Fn:                   func(a int) (int64, int, sdf.ProcessContinuation) { return int64(a), 2 * a, sdf.StopProcessing() },
-			Opt:                  &MainInput{Key: FullValue{Elm: 1}},
-			Expected:             int64(1),
-			Expected2:            2,
-			ExpectedContinuation: sdf.StopProcessing(),
-		},
 		{
 			// (Return check) EventTime, K, V
 			Fn:           func(a int) (typex.EventTime, int64, int) { return 42, int64(a), 3 * a },
@@ -176,17 +148,6 @@ func TestInvoke(t *testing.T) {
 			Expected2:    3,
 			ExpectedTime: 42,
 		},
-		{
-			// (Return check) EventTime, K, V, ProcessContinuation
-			Fn: func(a int) (typex.EventTime, int64, int, sdf.ProcessContinuation) {
-				return 42, int64(a), 3 * a, sdf.StopProcessing()
-			},
-			Opt:                  &MainInput{Key: FullValue{Elm: 1}},
-			Expected:             int64(1),
-			Expected2:            3,
-			ExpectedTime:         42,
-			ExpectedContinuation: sdf.StopProcessing(),
-		},
 		{
 			// (Return check) EventTime, K, V, Error
 			Fn:           func(a int) (typex.EventTime, int64, int, error) { return 47, int64(a), 3 * a, nil },
@@ -195,17 +156,6 @@ func TestInvoke(t *testing.T) {
 			Expected2:    3,
 			ExpectedTime: 47,
 		},
-		{
-			// (Return check) EventTime, K, V, ProcessContinuation, Error
-			Fn: func(a int) (typex.EventTime, int64, int, sdf.ProcessContinuation, error) {
-				return 47, int64(a), 3 * a, sdf.StopProcessing(), nil
-			},
-			Opt:                  &MainInput{Key: FullValue{Elm: 1}},
-			Expected:             int64(1),
-			Expected2:            3,
-			ExpectedTime:         47,
-			ExpectedContinuation: sdf.StopProcessing(),
-		},
 		{
 			// (Return check) EventTime, V, Error
 			Fn:           func(a int) (typex.EventTime, int, error) { return 10, 3 * a, nil },
@@ -250,14 +200,7 @@ func TestInvoke(t *testing.T) {
 			Opt:           &MainInput{Key: FullValue{Elm: 1}},
 			ExpectedError: errGeneric,
 		},
-		{
-			// ret5() error check
-			Fn: func(a int) (typex.EventTime, string, int, sdf.ProcessContinuation, error) {
-				return 0, "", 0, nil, errGeneric
-			},
-			Opt:           &MainInput{Key: FullValue{Elm: 1}},
-			ExpectedError: errGeneric,
-		},
+		// TODO(BEAM-11104): Add unit test cases for ProcessContinuations once they are enabled for use.
 	}
 
 	for i, test := range tests {
@@ -300,8 +243,8 @@ func TestInvoke(t *testing.T) {
 				if val != nil && val.Timestamp != test.ExpectedTime {
 					t.Errorf("EventTime: Invoke(%v,%v) = %v, want %v", fn.Fn.Name(), test.Args, val.Timestamp, test.ExpectedTime)
 				}
-				if val != nil && test.ExpectedContinuation != nil && !continuationsEqual(val.Continuation, test.ExpectedContinuation) {
-					t.Errorf("Continuation: Invoke(%v,%v) = %v, want %v", fn.Fn.Name(), test.Args, val.Continuation, test.ExpectedContinuation)
+				if val != nil && val.Continuation != test.ExpectedContinuation {
+					t.Errorf("EventTime: Invoke(%v,%v) = %v, want %v", fn.Fn.Name(), test.Args, val.Continuation, test.ExpectedContinuation)
 				}
 			}
 		})