You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@beam.apache.org by al...@apache.org on 2019/05/29 17:13:54 UTC

[beam] branch master updated: [BEAM-5354] Add side input nodes to their consumer's parent-scope.

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c26009  [BEAM-5354] Add side input nodes to their consumer's parent-scope.
     new 9647808  Merge pull request #8713 from lostluck/sideforms
1c26009 is described below

commit 1c26009cf9154ff337ab997b84b404e29e877f89
Author: Robert Burke <ro...@frantil.com>
AuthorDate: Wed May 29 09:34:48 2019 -0700

    [BEAM-5354] Add side input nodes to their consumer's parent-scope.
---
 sdks/go/pkg/beam/core/runtime/graphx/translate.go  |  14 ++-
 .../pkg/beam/core/runtime/graphx/translate_test.go | 130 ++++++++++++++++-----
 sdks/go/test/integration/primitives/pardo.go       |   3 +-
 3 files changed, 109 insertions(+), 38 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index 99a3b8f..17dc253 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -128,7 +128,7 @@ func (m *marshaller) addScopeTree(s *ScopeTree) string {
 
 	var subtransforms []string
 	for _, edge := range s.Edges {
-		subtransforms = append(subtransforms, m.addMultiEdge(edge))
+		subtransforms = append(subtransforms, m.addMultiEdge(edge)...)
 	}
 	for _, tree := range s.Children {
 		subtransforms = append(subtransforms, m.addScopeTree(tree))
@@ -173,14 +173,14 @@ func (m *marshaller) updateIfCombineComposite(s *ScopeTree, transform *pb.PTrans
 	transform.Spec = &pb.FunctionSpec{Urn: URNCombinePerKey, Payload: protox.MustEncode(payload)}
 }
 
-func (m *marshaller) addMultiEdge(edge NamedEdge) string {
+func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
 	id := edgeID(edge.Edge)
 	if _, exists := m.transforms[id]; exists {
-		return id
+		return []string{id}
 	}
 
 	if edge.Edge.Op == graph.CoGBK && len(edge.Edge.Input) > 1 {
-		return m.expandCoGBK(edge)
+		return []string{m.expandCoGBK(edge)}
 	}
 
 	inputs := make(map[string]string)
@@ -194,6 +194,8 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) string {
 		outputs[fmt.Sprintf("i%v", i)] = nodeID(out.To)
 	}
 
+	// allPIds tracks additional PTransformIDs generated for the pipeline
+	var allPIds []string
 	var spec *pb.FunctionSpec
 	switch edge.Edge.Op {
 	case graph.Impulse:
@@ -238,6 +240,7 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) string {
 					Outputs: map[string]string{"i0": out},
 				}
 				m.transforms[keyedID] = keyed
+				allPIds = append(allPIds, keyedID)
 
 				// Fixup input map
 				inputs[fmt.Sprintf("i%v", i)] = out
@@ -320,7 +323,8 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) string {
 		Outputs:    outputs,
 	}
 	m.transforms[id] = transform
-	return id
+	allPIds = append(allPIds, id)
+	return allPIds
 }
 
 func (m *marshaller) expandCoGBK(edge NamedEdge) string {
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go b/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
index ec2a8b7..0026745 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
@@ -41,22 +41,36 @@ func pickFn(a int, small, big func(int)) {
 	}
 }
 
-func pick(t *testing.T, g *graph.Graph) *graph.MultiEdge {
-	dofn, err := graph.NewDoFn(pickFn)
+func pickSideFn(a, side int, small, big func(int)) {
+	if a < side {
+		small(a)
+	} else {
+		big(a)
+	}
+}
+
+func addDoFn(t *testing.T, g *graph.Graph, fn interface{}, scope *graph.Scope, inputs []*graph.Node, outputCoders []*coder.Coder) {
+	t.Helper()
+	dofn, err := graph.NewDoFn(fn)
 	if err != nil {
 		t.Fatal(err)
 	}
-
-	in := g.NewNode(intT(), window.DefaultWindowingStrategy(), true)
-	in.Coder = intCoder()
-
-	e, err := graph.NewParDo(g, g.Root(), dofn, []*graph.Node{in}, nil)
+	e, err := graph.NewParDo(g, scope, dofn, inputs, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
-	e.Output[0].To.Coder = intCoder()
-	e.Output[1].To.Coder = intCoder()
-	return e
+	if len(outputCoders) != len(e.Output) {
+		t.Fatalf("%v has %d outputs, but only got %d coders", dofn.Name(), len(e.Output), len(outputCoders))
+	}
+	for i, c := range outputCoders {
+		e.Output[i].To.Coder = c
+	}
+}
+
+func newIntInput(g *graph.Graph) *graph.Node {
+	in := g.NewNode(intT(), window.DefaultWindowingStrategy(), true)
+	in.Coder = intCoder()
+	return in
 }
 
 func intT() typex.FullType {
@@ -67,30 +81,82 @@ func intCoder() *coder.Coder {
 	return custom("int", reflectx.Int)
 }
 
-// TestParDo verifies that ParDo can be serialized.
-func TestParDo(t *testing.T) {
-	g := graph.New()
-	pick(t, g)
-
-	edges, _, err := g.Build()
-	if err != nil {
-		t.Fatal(err)
-	}
-	if len(edges) != 1 {
-		t.Fatal("expected a single edge")
+// TestMarshal verifies that ParDo can be serialized.
+func TestMarshal(t *testing.T) {
+	tests := []struct {
+		name                     string
+		makeGraph                func(t *testing.T, g *graph.Graph)
+		edges, transforms, roots int
+	}{
+		{
+			name: "ParDo",
+			makeGraph: func(t *testing.T, g *graph.Graph) {
+				addDoFn(t, g, pickFn, g.Root(), []*graph.Node{newIntInput(g)}, []*coder.Coder{intCoder(), intCoder()})
+			},
+			edges:      1,
+			transforms: 1,
+			roots:      1,
+		}, {
+			name: "ScopedParDo",
+			makeGraph: func(t *testing.T, g *graph.Graph) {
+				addDoFn(t, g, pickFn, g.NewScope(g.Root(), "sub"), []*graph.Node{newIntInput(g)}, []*coder.Coder{intCoder(), intCoder()})
+			},
+			edges:      1,
+			transforms: 2,
+			roots:      1,
+		}, {
+			name: "SideInput",
+			makeGraph: func(t *testing.T, g *graph.Graph) {
+				in := newIntInput(g)
+				side := newIntInput(g)
+				addDoFn(t, g, pickSideFn, g.Root(), []*graph.Node{in, side}, []*coder.Coder{intCoder(), intCoder()})
+			},
+			edges:      1,
+			transforms: 2,
+			roots:      2,
+		}, {
+			name: "ScopedSideInput",
+			makeGraph: func(t *testing.T, g *graph.Graph) {
+				in := newIntInput(g)
+				side := newIntInput(g)
+				addDoFn(t, g, pickSideFn, g.NewScope(g.Root(), "sub"), []*graph.Node{in, side}, []*coder.Coder{intCoder(), intCoder()})
+			},
+			edges:      1,
+			transforms: 3,
+			roots:      1,
+		},
 	}
+	for _, test := range tests {
+		test := test
+		t.Run(test.name, func(t *testing.T) {
 
-	payload, err := proto.Marshal(&pb.DockerPayload{ContainerImage: "foo"})
-	if err != nil {
-		t.Fatal(err)
-	}
-	p, err := graphx.Marshal(edges,
-		&graphx.Options{Environment: pb.Environment{Urn: "beam:env:docker:v1", Payload: payload}})
-	if err != nil {
-		t.Fatal(err)
-	}
+			g := graph.New()
+			test.makeGraph(t, g)
+
+			edges, _, err := g.Build()
+			if err != nil {
+				t.Fatal(err)
+			}
+			if len(edges) != test.edges {
+				t.Fatal("expected a single edge")
+			}
+
+			payload, err := proto.Marshal(&pb.DockerPayload{ContainerImage: "foo"})
+			if err != nil {
+				t.Fatal(err)
+			}
+			p, err := graphx.Marshal(edges,
+				&graphx.Options{Environment: pb.Environment{Urn: "beam:env:docker:v1", Payload: payload}})
+			if err != nil {
+				t.Fatal(err)
+			}
 
-	if len(p.GetComponents().GetTransforms()) != 1 {
-		t.Errorf("bad ParDo translation: %v", proto.MarshalTextString(p))
+			if got, want := len(p.GetComponents().GetTransforms()), test.transforms; got != want {
+				t.Errorf("got %d transforms, want %d : %v", got, want, proto.MarshalTextString(p))
+			}
+			if got, want := len(p.GetRootTransformIds()), test.roots; got != want {
+				t.Errorf("got %d roots, want %d : %v", got, want, proto.MarshalTextString(p))
+			}
+		})
 	}
 }
diff --git a/sdks/go/test/integration/primitives/pardo.go b/sdks/go/test/integration/primitives/pardo.go
index ae7d9b2..62654e8 100644
--- a/sdks/go/test/integration/primitives/pardo.go
+++ b/sdks/go/test/integration/primitives/pardo.go
@@ -53,7 +53,8 @@ func ParDoSideInput() *beam.Pipeline {
 	p, s := beam.NewPipelineWithRoot()
 
 	in := beam.Create(s, 1, 2, 3, 4, 5, 6, 7, 8, 9)
-	out := beam.ParDo(s, sumValuesFn, beam.Impulse(s), beam.SideInput{Input: in})
+	sub := s.Scope("subscope") // Ensure scoping works with side inputs. See: BEAM-5354
+	out := beam.ParDo(sub, sumValuesFn, beam.Impulse(s), beam.SideInput{Input: in})
 	passert.Sum(s, out, "out", 1, 45)
 
 	return p