You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by yi...@apache.org on 2022/05/05 19:11:50 UTC
[beam] branch release-2.39.0 updated: Revert "[BEAM-11104] Enable ProcessContinuation return values, add unit test cases (#17533)" (#17562)
This is an automated email from the ASF dual-hosted git repository.
yichi pushed a commit to branch release-2.39.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.39.0 by this push:
new 8aa77e61e5f Revert "[BEAM-11104] Enable ProcessContinuation return values, add unit test cases (#17533)" (#17562)
8aa77e61e5f is described below
commit 8aa77e61e5ffc1efdc07772c591d4ab0a336533d
Author: Jack McCluskey <34...@users.noreply.github.com>
AuthorDate: Thu May 5 15:11:44 2022 -0400
Revert "[BEAM-11104] Enable ProcessContinuation return values, add unit test cases (#17533)" (#17562)
This reverts commit 6a50718364282b4e0a2fb266a7d6767e50961607.
---
CHANGES.md | 1 -
sdks/go/pkg/beam/core/funcx/fn.go | 18 +++-----
sdks/go/pkg/beam/core/funcx/fn_test.go | 15 ++-----
sdks/go/pkg/beam/core/runtime/exec/fn_test.go | 63 ++-------------------------
4 files changed, 13 insertions(+), 84 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 8071b24be4a..ffd6cd32024 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -67,7 +67,6 @@
* 'Manage Clusters' JupyterLab extension added for users to configure usage of Dataproc clusters managed by Interactive Beam (Python) ([BEAM-14130](https://issues.apache.org/jira/browse/BEAM-14130)).
* Pipeline drain support added for Go SDK ([BEAM-11106](https://issues.apache.org/jira/browse/BEAM-11106)). **Note: this feature is not yet fully validated and should be treated as experimental in this release.**
-* Go SDK users may now write self-checkpointing Splittable DoFns to read from streaming sources. **Note: this feature is not yet fully validated and should be treated as experimental in this release.** ([BEAM-11104](https://issues.apache.org/jira/browse/BEAM-11104))
## Breaking Changes
diff --git a/sdks/go/pkg/beam/core/funcx/fn.go b/sdks/go/pkg/beam/core/funcx/fn.go
index 7d39aa1ce88..2580b9bbd9f 100644
--- a/sdks/go/pkg/beam/core/funcx/fn.go
+++ b/sdks/go/pkg/beam/core/funcx/fn.go
@@ -654,9 +654,10 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error)
}
var (
- errEventTimeRetPrecedence = errors.New("beam.EventTime must be first return parameter")
- errErrorPrecedence = errors.New("error must be the final return parameter")
- errProcessContinuationPrecedence = errors.New("ProcessContinuation must be the final non-error return parameter")
+ errEventTimeRetPrecedence = errors.New("beam.EventTime must be first return parameter")
+ errErrorPrecedence = errors.New("error must be the final return parameter")
+ // TODO(BEAM-11104): Enable process continuations as a valid return value.
+ errContinuationSupport = errors.New("process continuations are not supported in this SDK release; see https://issues.apache.org/jira/browse/BEAM-11104 for the feature's current status")
)
type retState int
@@ -676,15 +677,8 @@ func nextRetState(cur retState, transition ReturnKind) (retState, error) {
case RetEventTime:
return rsEventTime, nil
}
- case rsEventTime, rsOutput:
+ case rsEventTime, rsOutput, rsProcessContinuation:
// Identical to the default cases.
- case rsProcessContinuation:
- switch transition {
- case RetError:
- return rsError, nil
- default:
- return -1, errProcessContinuationPrecedence
- }
case rsError:
// This is a terminal state. No valid transitions. error must be the final return value.
return -1, errErrorPrecedence
@@ -696,7 +690,7 @@ func nextRetState(cur retState, transition ReturnKind) (retState, error) {
case RetValue, RetRTracker:
return rsOutput, nil
case RetProcessContinuation:
- return rsProcessContinuation, nil
+ return -1, errContinuationSupport
case RetError:
return rsError, nil
default:
diff --git a/sdks/go/pkg/beam/core/funcx/fn_test.go b/sdks/go/pkg/beam/core/funcx/fn_test.go
index 202de64b19d..60b97f2080c 100644
--- a/sdks/go/pkg/beam/core/funcx/fn_test.go
+++ b/sdks/go/pkg/beam/core/funcx/fn_test.go
@@ -121,10 +121,10 @@ func TestNew(t *testing.T) {
Ret: []ReturnKind{RetError},
},
{
- Name: "sdf",
- Fn: func(sdf.RTracker, func(int)) (sdf.ProcessContinuation, error) { return nil, nil },
- Param: []FnParamKind{FnRTracker, FnEmit},
- Ret: []ReturnKind{RetProcessContinuation, RetError},
+ // TODO(BEAM-11104): Replace with a functioning test case once E2E support is finished.
+ Name: "sdf",
+ Fn: func(sdf.RTracker, func(int)) (sdf.ProcessContinuation, error) { return nil, nil },
+ Err: errContinuationSupport,
},
{
Name: "errContextParam: after input",
@@ -247,13 +247,6 @@ func TestNew(t *testing.T) {
},
Err: errEventTimeRetPrecedence,
},
- {
- Name: "errProcessContinuationPrecedence",
- Fn: func() (string, sdf.ProcessContinuation, int, error) {
- return "", nil, 0, nil
- },
- Err: errProcessContinuationPrecedence,
- },
{
Name: "errIllegalParametersInEmit - malformed emit struct",
Fn: func(context.Context, typex.EventTime, reflect.Type, func(nonConcreteType)) error { return nil },
diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn_test.go b/sdks/go/pkg/beam/core/runtime/exec/fn_test.go
index d4e4a091229..ee0b9ab5b98 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/fn_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/fn_test.go
@@ -33,13 +33,6 @@ import (
var errGeneric = errors.New("generic error")
-func continuationsEqual(first, second sdf.ProcessContinuation) bool {
- if first.ShouldResume() {
- return first.ShouldResume() == second.ShouldResume() && first.ResumeDelay() == second.ResumeDelay()
- }
- return first.ShouldResume() == second.ShouldResume()
-}
-
// TestInvoke verifies the the various forms of input to Invoke are handled correctly.
func TestInvoke(t *testing.T) {
tests := []struct {
@@ -133,12 +126,6 @@ func TestInvoke(t *testing.T) {
Opt: &MainInput{Key: FullValue{Elm: 1}},
Expected: mtime.EndOfGlobalWindowTime.Milliseconds(),
},
- {
- // (Return check) ProcessContinuation
- Fn: func(a string) sdf.ProcessContinuation { return sdf.ResumeProcessingIn(1 * time.Second) },
- Opt: &MainInput{Key: FullValue{Elm: "some string"}},
- ExpectedContinuation: sdf.ResumeProcessingIn(1 * time.Second),
- },
{
// (Return check) K, V
Fn: func(a int) (int64, int) { return int64(a), 2 * a },
@@ -146,13 +133,6 @@ func TestInvoke(t *testing.T) {
Expected: int64(1),
Expected2: 2,
},
- {
- // (Return check) V, ProcessContinuation
- Fn: func(a int) (int, sdf.ProcessContinuation) { return 2 * a, sdf.StopProcessing() },
- Opt: &MainInput{Key: FullValue{Elm: 1}},
- Expected: 2,
- ExpectedContinuation: sdf.StopProcessing(),
- },
{
// (Return check) K, V, Error
Fn: func(a int) (int64, int, error) { return int64(a), 2 * a, nil },
@@ -160,14 +140,6 @@ func TestInvoke(t *testing.T) {
Expected: int64(1),
Expected2: 2,
},
- {
- // (Return check) K, V, ProcessContinuation
- Fn: func(a int) (int64, int, sdf.ProcessContinuation) { return int64(a), 2 * a, sdf.StopProcessing() },
- Opt: &MainInput{Key: FullValue{Elm: 1}},
- Expected: int64(1),
- Expected2: 2,
- ExpectedContinuation: sdf.StopProcessing(),
- },
{
// (Return check) EventTime, K, V
Fn: func(a int) (typex.EventTime, int64, int) { return 42, int64(a), 3 * a },
@@ -176,17 +148,6 @@ func TestInvoke(t *testing.T) {
Expected2: 3,
ExpectedTime: 42,
},
- {
- // (Return check) EventTime, K, V, ProcessContinuation
- Fn: func(a int) (typex.EventTime, int64, int, sdf.ProcessContinuation) {
- return 42, int64(a), 3 * a, sdf.StopProcessing()
- },
- Opt: &MainInput{Key: FullValue{Elm: 1}},
- Expected: int64(1),
- Expected2: 3,
- ExpectedTime: 42,
- ExpectedContinuation: sdf.StopProcessing(),
- },
{
// (Return check) EventTime, K, V, Error
Fn: func(a int) (typex.EventTime, int64, int, error) { return 47, int64(a), 3 * a, nil },
@@ -195,17 +156,6 @@ func TestInvoke(t *testing.T) {
Expected2: 3,
ExpectedTime: 47,
},
- {
- // (Return check) EventTime, K, V, ProcessContinuation, Error
- Fn: func(a int) (typex.EventTime, int64, int, sdf.ProcessContinuation, error) {
- return 47, int64(a), 3 * a, sdf.StopProcessing(), nil
- },
- Opt: &MainInput{Key: FullValue{Elm: 1}},
- Expected: int64(1),
- Expected2: 3,
- ExpectedTime: 47,
- ExpectedContinuation: sdf.StopProcessing(),
- },
{
// (Return check) EventTime, V, Error
Fn: func(a int) (typex.EventTime, int, error) { return 10, 3 * a, nil },
@@ -250,14 +200,7 @@ func TestInvoke(t *testing.T) {
Opt: &MainInput{Key: FullValue{Elm: 1}},
ExpectedError: errGeneric,
},
- {
- // ret5() error check
- Fn: func(a int) (typex.EventTime, string, int, sdf.ProcessContinuation, error) {
- return 0, "", 0, nil, errGeneric
- },
- Opt: &MainInput{Key: FullValue{Elm: 1}},
- ExpectedError: errGeneric,
- },
+ // TODO(BEAM-11104): Add unit test cases for ProcessContinuations once they are enabled for use.
}
for i, test := range tests {
@@ -300,8 +243,8 @@ func TestInvoke(t *testing.T) {
if val != nil && val.Timestamp != test.ExpectedTime {
t.Errorf("EventTime: Invoke(%v,%v) = %v, want %v", fn.Fn.Name(), test.Args, val.Timestamp, test.ExpectedTime)
}
- if val != nil && test.ExpectedContinuation != nil && !continuationsEqual(val.Continuation, test.ExpectedContinuation) {
- t.Errorf("Continuation: Invoke(%v,%v) = %v, want %v", fn.Fn.Name(), test.Args, val.Continuation, test.ExpectedContinuation)
+ if val != nil && val.Continuation != test.ExpectedContinuation {
+ t.Errorf("EventTime: Invoke(%v,%v) = %v, want %v", fn.Fn.Name(), test.Args, val.Continuation, test.ExpectedContinuation)
}
}
})