You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/04/07 17:33:03 UTC

[beam] branch master updated: [BEAM-11104] Add ProcessContinuation type to Go SDK (#17265)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e4c288627c [BEAM-11104] Add ProcessContinuation type to Go SDK (#17265)
9e4c288627c is described below

commit 9e4c288627cf4af4e2397f565d3e2f847e2f4900
Author: Jack McCluskey <34...@users.noreply.github.com>
AuthorDate: Thu Apr 7 13:32:56 2022 -0400

    [BEAM-11104] Add ProcessContinuation type to Go SDK (#17265)
---
 sdks/go/pkg/beam/core/funcx/fn.go              | 32 ++++++++++---
 sdks/go/pkg/beam/core/funcx/fn_test.go         |  7 +++
 sdks/go/pkg/beam/core/sdf/continuation.go      | 62 ++++++++++++++++++++++++++
 sdks/go/pkg/beam/core/sdf/continuation_test.go | 39 ++++++++++++++++
 4 files changed, 134 insertions(+), 6 deletions(-)

diff --git a/sdks/go/pkg/beam/core/funcx/fn.go b/sdks/go/pkg/beam/core/funcx/fn.go
index b2f31cc1d18..213d423ba02 100644
--- a/sdks/go/pkg/beam/core/funcx/fn.go
+++ b/sdks/go/pkg/beam/core/funcx/fn.go
@@ -124,11 +124,12 @@ type ReturnKind int
 
 // The supported types of ReturnKind.
 const (
-	RetIllegal   ReturnKind = 0x0
-	RetEventTime ReturnKind = 0x1
-	RetValue     ReturnKind = 0x2
-	RetError     ReturnKind = 0x4
-	RetRTracker  ReturnKind = 0x8
+	RetIllegal             ReturnKind = 0x0
+	RetEventTime           ReturnKind = 0x1
+	RetValue               ReturnKind = 0x2
+	RetError               ReturnKind = 0x4
+	RetRTracker            ReturnKind = 0x8
+	RetProcessContinuation ReturnKind = 0x10
 )
 
 func (k ReturnKind) String() string {
@@ -141,6 +142,8 @@ func (k ReturnKind) String() string {
 		return "EventTime"
 	case RetValue:
 		return "Value"
+	case RetProcessContinuation:
+		return "ProcessContinuation"
 	default:
 		return fmt.Sprintf("%v", int(k))
 	}
@@ -302,6 +305,16 @@ func (u *Fn) OutEventTime() (pos int, exists bool) {
 	return -1, false
 }
 
+// ProcessContinuation returns (index, true) iff the function returns a process continuation.
+func (u *Fn) ProcessContinuation() (pos int, exists bool) {
+	for i, p := range u.Ret {
+		if p.Kind == RetProcessContinuation {
+			return i, true
+		}
+	}
+	return -1, false
+}
+
 // Params returns the parameter indices that matches the given mask.
 func (u *Fn) Params(mask FnParamKind) []int {
 	var ret []int
@@ -392,6 +405,8 @@ func New(fn reflectx.Func) (*Fn, error) {
 			kind = RetError
 		case t.Implements(reflect.TypeOf((*sdf.RTracker)(nil)).Elem()):
 			kind = RetRTracker
+		case t.Implements(reflect.TypeOf((*sdf.ProcessContinuation)(nil)).Elem()):
+			kind = RetProcessContinuation
 		case t == typex.EventTimeType:
 			kind = RetEventTime
 		case typex.IsContainer(t), typex.IsConcrete(t), typex.IsUniversal(t):
@@ -601,6 +616,8 @@ func nextParamState(cur paramState, transition FnParamKind) (paramState, error)
 var (
 	errEventTimeRetPrecedence = errors.New("beam.EventTime must be first return parameter")
 	errErrorPrecedence        = errors.New("error must be the final return parameter")
+	// TODO(BEAM-11104): Enable process continuations as a valid return value.
+	errContinuationSupport = errors.New("process continuations are not supported in this SDK release; see https://issues.apache.org/jira/browse/BEAM-11104 for the feature's current status")
 )
 
 type retState int
@@ -610,6 +627,7 @@ const (
 	rsEventTime
 	rsOutput
 	rsError
+	rsProcessContinuation
 )
 
 func nextRetState(cur retState, transition ReturnKind) (retState, error) {
@@ -619,7 +637,7 @@ func nextRetState(cur retState, transition ReturnKind) (retState, error) {
 		case RetEventTime:
 			return rsEventTime, nil
 		}
-	case rsEventTime, rsOutput:
+	case rsEventTime, rsOutput, rsProcessContinuation:
 		// Identical to the default cases.
 	case rsError:
 		// This is a terminal state. No valid transitions. error must be the final return value.
@@ -631,6 +649,8 @@ func nextRetState(cur retState, transition ReturnKind) (retState, error) {
 		return -1, errEventTimeRetPrecedence
 	case RetValue, RetRTracker:
 		return rsOutput, nil
+	case RetProcessContinuation:
+		return -1, errContinuationSupport
 	case RetError:
 		return rsError, nil
 	default:
diff --git a/sdks/go/pkg/beam/core/funcx/fn_test.go b/sdks/go/pkg/beam/core/funcx/fn_test.go
index 6616796af8e..96509c6a063 100644
--- a/sdks/go/pkg/beam/core/funcx/fn_test.go
+++ b/sdks/go/pkg/beam/core/funcx/fn_test.go
@@ -23,6 +23,7 @@ import (
 	"testing"
 
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
 )
@@ -112,6 +113,12 @@ func TestNew(t *testing.T) {
 			Param: []FnParamKind{FnContext, FnEventTime, FnType, FnEmit},
 			Ret:   []ReturnKind{RetError},
 		},
+		{
+			// TODO(BEAM-11104): Replace with a functioning test case once E2E support is finished.
+			Name: "sdf",
+			Fn:   func(sdf.RTracker, func(int)) (sdf.ProcessContinuation, error) { return nil, nil },
+			Err:  errContinuationSupport,
+		},
 		{
 			Name: "errContextParam: after input",
 			Fn:   func(string, context.Context) {},
diff --git a/sdks/go/pkg/beam/core/sdf/continuation.go b/sdks/go/pkg/beam/core/sdf/continuation.go
new file mode 100644
index 00000000000..c1107b5b816
--- /dev/null
+++ b/sdks/go/pkg/beam/core/sdf/continuation.go
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sdf
+
+import "time"
+
+// ProcessContinuation is an interface used to signal whether a splittable DoFn should
+// be split and resumed at a later time. A DoFn can return a ProcessContinuation when
+// its processing returns, whether the work is complete or needs to be resumed.
+type ProcessContinuation interface {
+	// ShouldResume returns a boolean indicating whether the process should be
+	// resumed at a later time.
+	ShouldResume() bool
+
+	// ResumeDelay returns a suggested time.Duration to wait before resuming the
+	// process. The runner is not guaranteed to follow this suggestion.
+	ResumeDelay() time.Duration
+}
+
+// defaultProcessContinuation is the SDK-default implementation of the ProcessContinuation
+// interface, encapsulating the basic behavior necessary to resume a process later.
+type defaultProcessContinuation struct {
+	resumes     bool
+	resumeDelay time.Duration
+}
+
+// ShouldResume returns whether or not the defaultProcessContinuation should lead to the
+// process being resumed.
+func (p *defaultProcessContinuation) ShouldResume() bool {
+	return p.resumes
+}
+
+// ResumeDelay returns the suggested duration that should pass before the process is resumed.
+// If the process should not be resumed, the value returned here does not matter.
+func (p *defaultProcessContinuation) ResumeDelay() time.Duration {
+	return p.resumeDelay
+}
+
+// StopProcessing returns a ProcessContinuation that will not resume the process
+// later.
+func StopProcessing() ProcessContinuation {
+	return &defaultProcessContinuation{resumes: false, resumeDelay: 0 * time.Second}
+}
+
+// ResumeProcessingIn returns a ProcessContinuation that will resume the process
+// later, using the given time.Duration as the suggested delay.
+func ResumeProcessingIn(delay time.Duration) ProcessContinuation {
+	return &defaultProcessContinuation{resumes: true, resumeDelay: delay}
+}
diff --git a/sdks/go/pkg/beam/core/sdf/continuation_test.go b/sdks/go/pkg/beam/core/sdf/continuation_test.go
new file mode 100644
index 00000000000..7f7970d4785
--- /dev/null
+++ b/sdks/go/pkg/beam/core/sdf/continuation_test.go
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sdf
+
+import (
+	"testing"
+	"time"
+)
+
+func TestStopProcessing(t *testing.T) {
+	pc := StopProcessing()
+	if pc.ShouldResume() {
+		t.Error("ShouldResume() got true, want false")
+	}
+}
+
+func TestResumeProcessingIn(t *testing.T) {
+	dur := 10 * time.Second
+	pc := ResumeProcessingIn(dur)
+	if !pc.ShouldResume() {
+		t.Error("ShouldResume() got false, want true")
+	}
+	if got, want := pc.ResumeDelay(), dur; got != want {
+		t.Errorf("got ResumeDelay %v, want %v", got, want)
+	}
+}