You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/05/16 16:33:40 UTC
[beam] branch master updated: [BEAM-14473] Throw error if using globally windowed, unbounded side input (#17681)
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 9eb86446eb4 [BEAM-14473] Throw error if using globally windowed, unbounded side input (#17681)
9eb86446eb4 is described below
commit 9eb86446eb4c609138e29ead4617331918e120f4
Author: Jack McCluskey <34...@users.noreply.github.com>
AuthorDate: Mon May 16 12:33:33 2022 -0400
[BEAM-14473] Throw error if using globally windowed, unbounded side input (#17681)
---
sdks/go/pkg/beam/pardo.go | 5 ++++-
sdks/go/pkg/beam/pardo_test.go | 47 ++++++++++++++++++++++++++++++++++++++++++
2 files changed, 51 insertions(+), 1 deletion(-)
diff --git a/sdks/go/pkg/beam/pardo.go b/sdks/go/pkg/beam/pardo.go
index aad86b6a02e..e2d536cb4f0 100644
--- a/sdks/go/pkg/beam/pardo.go
+++ b/sdks/go/pkg/beam/pardo.go
@@ -60,7 +60,10 @@ func TryParDo(s Scope, dofn interface{}, col PCollection, opts ...Option) ([]PCo
return nil, fmt.Errorf("error with side input %d in DoFn %v: PCollections using merging WindowFns are not supported as side inputs. Consider re-windowing the side input PCollection before use", i, fn)
}
if (inWfn.Kind == window.GlobalWindows) && (sideWfn.Kind != window.GlobalWindows) {
- return nil, fmt.Errorf("main input is global windowed in DoFn %v but side input %v is not, cannot map windows correctly. Consider re-windowing the side input PCOllection before use", fn, i)
+ return nil, fmt.Errorf("main input is global windowed in DoFn %v but side input %v is not, cannot map windows correctly. Consider re-windowing the side input PCollection before use", fn, i)
+ }
+ if (sideWfn.Kind == window.GlobalWindows) && !sideNode.Bounded() {
+ return nil, fmt.Errorf("side input %v is global windowed in DoFn %v but is unbounded, DoFn will block until end of Global Window. Consider windowing your unbounded side input PCollection before use", i, fn)
}
in = append(in, s.Input.n)
}
diff --git a/sdks/go/pkg/beam/pardo_test.go b/sdks/go/pkg/beam/pardo_test.go
index f4dd8e96b1f..b15cecbf5a1 100644
--- a/sdks/go/pkg/beam/pardo_test.go
+++ b/sdks/go/pkg/beam/pardo_test.go
@@ -21,8 +21,14 @@ import (
"reflect"
"strings"
"testing"
+ "time"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window/trigger"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
"github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts"
)
@@ -132,3 +138,44 @@ type AnnotationsFn struct {
func (fn *AnnotationsFn) ProcessElement(v int) int {
return v
}
+
+// doNothing is a no-op DoFn whose signature accepts the []byte elements of an
+// Impulse main input and a single int side input.
+func doNothing(_ []byte, _ int) {}
+
+// TestParDoSideInputValidation checks that TryParDo rejects side inputs whose
+// windowing or boundedness cannot be used with a globally windowed main input,
+// and that it fails with the specific validation error rather than any error.
+func TestParDoSideInputValidation(t *testing.T) {
+	tests := []struct {
+		name      string
+		wFn       *window.Fn
+		isBounded bool
+		wantErr   string // substring expected in the TryParDo error
+	}{
+		{"global window unbounded", window.NewGlobalWindows(), false, "but is unbounded"},
+		{"side input session windowed", window.NewSessions(1 * time.Minute), true, "merging WindowFns"},
+		{"global main, fixed window side", window.NewFixedWindows(10 * time.Second), true, "main input is global windowed"},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			p := NewPipeline()
+			s := p.Root()
+
+			strat := &window.WindowingStrategy{Fn: test.wFn, Trigger: trigger.Default(), AccumulationMode: window.Discarding, AllowedLateness: 0}
+			// Build the side input node directly so windowing and boundedness can be set per case.
+			sideCol := PCollection{n: graph.New().NewNode(typex.New(reflectx.Int), strat, test.isBounded)}
+			// Impulse supplies the (globally windowed) main input in every case.
+			outCol, err := TryParDo(s, doNothing, Impulse(s), SideInput{Input: sideCol})
+			if outCol != nil {
+				t.Errorf("TryParDo() produced an output PCollection when it should have failed, got %v", outCol)
+			}
+			if err == nil {
+				t.Fatalf("TryParDo() did not return an error when it should have")
+			}
+			if !strings.Contains(err.Error(), test.wantErr) {
+				t.Errorf("TryParDo() error = %v, want it to contain %q", err, test.wantErr)
+			}
+		})
+	}
+}