You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to commits@beam.apache.org by lo...@apache.org on 2022/05/16 16:33:40 UTC

[beam] branch master updated: [BEAM-14473] Throw error if using globally windowed, unbounded side input (#17681)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9eb86446eb4 [BEAM-14473] Throw error if using globally windowed, unbounded side input (#17681)
9eb86446eb4 is described below

commit 9eb86446eb4c609138e29ead4617331918e120f4
Author: Jack McCluskey <34...@users.noreply.github.com>
AuthorDate: Mon May 16 12:33:33 2022 -0400

    [BEAM-14473] Throw error if using globally windowed, unbounded side input (#17681)
---
 sdks/go/pkg/beam/pardo.go      |  5 ++++-
 sdks/go/pkg/beam/pardo_test.go | 47 ++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 51 insertions(+), 1 deletion(-)

diff --git a/sdks/go/pkg/beam/pardo.go b/sdks/go/pkg/beam/pardo.go
index aad86b6a02e..e2d536cb4f0 100644
--- a/sdks/go/pkg/beam/pardo.go
+++ b/sdks/go/pkg/beam/pardo.go
@@ -60,7 +60,10 @@ func TryParDo(s Scope, dofn interface{}, col PCollection, opts ...Option) ([]PCo
 			return nil, fmt.Errorf("error with side input %d in DoFn %v: PCollections using merging WindowFns are not supported as side inputs. Consider re-windowing the side input PCollection before use", i, fn)
 		}
 		if (inWfn.Kind == window.GlobalWindows) && (sideWfn.Kind != window.GlobalWindows) {
-			return nil, fmt.Errorf("main input is global windowed in DoFn %v but side input %v is not, cannot map windows correctly. Consider re-windowing the side input PCOllection before use", fn, i)
+			return nil, fmt.Errorf("main input is global windowed in DoFn %v but side input %v is not, cannot map windows correctly. Consider re-windowing the side input PCollection before use", fn, i)
+		}
+		if (sideWfn.Kind == window.GlobalWindows) && !sideNode.Bounded() {
+			return nil, fmt.Errorf("side input %v is global windowed in DoFn %v but is unbounded, DoFn will block until end of Global Window. Consider windowing your unbounded side input PCollection before use", i, fn)
 		}
 		in = append(in, s.Input.n)
 	}
diff --git a/sdks/go/pkg/beam/pardo_test.go b/sdks/go/pkg/beam/pardo_test.go
index f4dd8e96b1f..b15cecbf5a1 100644
--- a/sdks/go/pkg/beam/pardo_test.go
+++ b/sdks/go/pkg/beam/pardo_test.go
@@ -21,8 +21,14 @@ import (
 	"reflect"
 	"strings"
 	"testing"
+	"time"
 
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window/trigger"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/reflectx"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts"
 )
 
@@ -132,3 +138,44 @@ type AnnotationsFn struct {
 func (fn *AnnotationsFn) ProcessElement(v int) int {
 	return v
 }
+
+func doNothing(_ []byte, _ int) {}
+func TestParDoSideInputValdiation(t *testing.T) {
+	var tests = []struct {
+		name      string
+		wFn       *window.Fn
+		isBounded bool
+	}{
+		{
+			"global window unbounded",
+			window.NewGlobalWindows(),
+			false,
+		},
+		{
+			"side input session windowed",
+			window.NewSessions(1 * time.Minute),
+			true,
+		},
+		{
+			"global main, interval side",
+			window.NewFixedWindows(10 * time.Second),
+			true,
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			p := NewPipeline()
+			s := p.Root()
+
+			strat := &window.WindowingStrategy{Fn: test.wFn, Trigger: trigger.Default(), AccumulationMode: window.Discarding, AllowedLateness: 0}
+			sideCol := PCollection{n: graph.New().NewNode(typex.New(reflectx.Int), strat, test.isBounded)}
+			outCol, err := TryParDo(s, doNothing, Impulse(s), SideInput{Input: sideCol})
+			if outCol != nil {
+				t.Errorf("TryParDo() produced an output PCollection when it should have failed, got %v", outCol)
+			}
+			if err == nil {
+				t.Errorf("TryParDo() did not return an error when it should have")
+			}
+		})
+	}
+}