You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/04/07 17:33:03 UTC
[beam] branch master updated: [BEAM-11104] Add ProcessContinuation type to Go SDK (#17265)
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 9e4c288627c [BEAM-11104] Add ProcessContinuation type to Go SDK (#17265)
9e4c288627c is described below
commit 9e4c288627cf4af4e2397f565d3e2f847e2f4900
Author: Jack McCluskey <34...@users.noreply.github.com>
AuthorDate: Thu Apr 7 13:32:56 2022 -0400
[BEAM-11104] Add ProcessContinuation type to Go SDK (#17265)
---
sdks/go/pkg/beam/core/funcx/fn.go | 32 ++++++++++---
sdks/go/pkg/beam/core/funcx/fn_test.go | 7 +++
sdks/go/pkg/beam/core/sdf/continuation.go | 62 ++++++++++++++++++++++++++
sdks/go/pkg/beam/core/sdf/continuation_test.go | 39 ++++++++++++++++
4 files changed, 134 insertions(+), 6 deletions(-)
diff --git a/sdks/go/pkg/beam/core/funcx/fn.go b/sdks/go/pkg/beam/core/funcx/fn.go
index b2f31cc1d18..213d423ba02 100644
--- a/sdks/go/pkg/beam/core/funcx/fn.go
+++ b/sdks/go/pkg/beam/core/funcx/fn.go
@@ -124,11 +124,12 @@ type ReturnKind int
// The supported types of ReturnKind.
const (
- RetIllegal ReturnKind = 0x0
- RetEventTime ReturnKind = 0x1
- RetValue ReturnKind = 0x2
- RetError ReturnKind = 0x4
- RetRTracker ReturnKind = 0x8
+ RetIllegal ReturnKind = 0x0
+ RetEventTime ReturnKind = 0x1
+ RetValue ReturnKind = 0x2
+ RetError ReturnKind = 0x4
+ RetRTracker ReturnKind = 0x8
+ RetProcessContinuation ReturnKind = 0x10
)
func (k ReturnKind) String() string {
@@ -141,6 +142,8 @@ func (k ReturnKind) String() string {
return "EventTime"
case RetValue:
return "Value"
+ case RetProcessContinuation:
+ return "ProcessContinuation"
default:
return fmt.Sprintf("%v", int(k))
}
@@ -302,6 +305,16 @@ func (u *Fn) OutEventTime() (pos int, exists bool) {
return -1, false
}
+// ProcessContinuation returns (index, true) iff the function returns a process continuation.
+func (u *Fn) ProcessContinuation() (pos int, exists bool) {
+ for i, p := range u.Ret {
+ if p.Kind == RetProcessContinuation {
+ return i, true
+ }
+ }
+ return -1, false
+}
+
// Params returns the parameter indices that matches the given mask.
func (u *Fn) Params(mask FnParamKind) []int {
var ret []int
@@ -392,6 +405,8 @@ func New(fn reflectx.Func) (*Fn, error) {
kind = RetError
case t.Implements(reflect.TypeOf((*sdf.RTracker)(nil)).Elem()):
kind = RetRTracker
+ case t.Implements(reflect.TypeOf((*sdf.ProcessContinuation)(nil)).Elem()):
+ kind = RetProcessContinuation
case t == typex.EventTimeType:
kind = RetEventTime
case typex.IsContainer(t), typex.IsConcrete(t), typex.IsUniversal(t):
@@ -601,6 +616,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error)
var (
errEventTimeRetPrecedence = errors.New("beam.EventTime must be first return parameter")
errErrorPrecedence = errors.New("error must be the final return parameter")
+ // TODO(BEAM-11104): Enable process continuations as a valid return value.
+ errContinuationSupport = errors.New("process continuations are not supported in this SDK release; see https://issues.apache.org/jira/browse/BEAM-11104 for the feature's current status")
)
type retState int
@@ -610,6 +627,7 @@ const (
rsEventTime
rsOutput
rsError
+ rsProcessContinuation
)
func nextRetState(cur retState, transition ReturnKind) (retState, error) {
@@ -619,7 +637,7 @@ func nextRetState(cur retState, transition ReturnKind) (retState, error) {
case RetEventTime:
return rsEventTime, nil
}
- case rsEventTime, rsOutput:
+ case rsEventTime, rsOutput, rsProcessContinuation:
// Identical to the default cases.
case rsError:
// This is a terminal state. No valid transitions. error must be the final return value.
@@ -631,6 +649,8 @@ func nextRetState(cur retState, transition ReturnKind) (retState, error) {
return -1, errEventTimeRetPrecedence
case RetValue, RetRTracker:
return rsOutput, nil
+ case RetProcessContinuation:
+ return -1, errContinuationSupport
case RetError:
return rsError, nil
default:
diff --git a/sdks/go/pkg/beam/core/funcx/fn_test.go b/sdks/go/pkg/beam/core/funcx/fn_test.go
index 6616796af8e..96509c6a063 100644
--- a/sdks/go/pkg/beam/core/funcx/fn_test.go
+++ b/sdks/go/pkg/beam/core/funcx/fn_test.go
@@ -23,6 +23,7 @@ import (
"testing"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
)
@@ -112,6 +113,12 @@ func TestNew(t *testing.T) {
Param: []FnParamKind{FnContext, FnEventTime, FnType, FnEmit},
Ret: []ReturnKind{RetError},
},
+ {
+ // TODO(BEAM-11104): Replace with a functioning test case once E2E support is finished.
+ Name: "sdf",
+ Fn: func(sdf.RTracker, func(int)) (sdf.ProcessContinuation, error) { return nil, nil },
+ Err: errContinuationSupport,
+ },
{
Name: "errContextParam: after input",
Fn: func(string, context.Context) {},
diff --git a/sdks/go/pkg/beam/core/sdf/continuation.go b/sdks/go/pkg/beam/core/sdf/continuation.go
new file mode 100644
index 00000000000..c1107b5b816
--- /dev/null
+++ b/sdks/go/pkg/beam/core/sdf/continuation.go
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sdf
+
+import "time"
+
+// ProcessContinuation is an interface used to signal whether a splittable DoFn should be
+// split and resumed at a later time. A DoFn may return a ProcessContinuation to report
+// either that it has finished processing or that it needs to be resumed.
+type ProcessContinuation interface {
+ // ShouldResume returns a boolean indicating whether the process should be
+ // resumed at a later time.
+ ShouldResume() bool
+
+ // ResumeDelay returns a suggested time.Duration to wait before resuming the
+ // process. The runner is not guaranteed to follow this suggestion.
+ ResumeDelay() time.Duration
+}
+
+// defaultProcessContinuation is the SDK-default implementation of the ProcessContinuation
+// interface, encapsulating the basic behavior necessary to resume a process later.
+type defaultProcessContinuation struct {
+ resumes bool
+ resumeDelay time.Duration
+}
+
+// ShouldResume returns whether or not the defaultProcessContinuation should lead to the
+// process being resumed.
+func (p *defaultProcessContinuation) ShouldResume() bool {
+ return p.resumes
+}
+
+// ResumeDelay returns the suggested duration that should pass before the process is resumed.
+// If the process should not be resumed, the value returned here does not matter.
+func (p *defaultProcessContinuation) ResumeDelay() time.Duration {
+ return p.resumeDelay
+}
+
+// StopProcessing returns a ProcessContinuation that will not resume the process
+// later.
+func StopProcessing() ProcessContinuation {
+ return &defaultProcessContinuation{resumes: false, resumeDelay: 0 * time.Second}
+}
+
+// ResumeProcessingIn returns a ProcessContinuation that will resume the process
+// later with a suggested delay passed as a time.Duration.
+func ResumeProcessingIn(delay time.Duration) ProcessContinuation {
+ return &defaultProcessContinuation{resumes: true, resumeDelay: delay}
+}
diff --git a/sdks/go/pkg/beam/core/sdf/continuation_test.go b/sdks/go/pkg/beam/core/sdf/continuation_test.go
new file mode 100644
index 00000000000..7f7970d4785
--- /dev/null
+++ b/sdks/go/pkg/beam/core/sdf/continuation_test.go
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sdf
+
+import (
+ "testing"
+ "time"
+)
+
+func TestStopProcessing(t *testing.T) {
+ pc := StopProcessing()
+ if pc.ShouldResume() {
+ t.Error("ShouldResume() got true, want false")
+ }
+}
+
+func TestResumeProcessingIn(t *testing.T) {
+ dur := 10 * time.Second
+ pc := ResumeProcessingIn(dur)
+ if !pc.ShouldResume() {
+ t.Error("ShouldResume() got false, want true")
+ }
+ if got, want := pc.ResumeDelay(), dur; got != want {
+ t.Errorf("got ResumeDelay %v, want %v", got, want)
+ }
+}