You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/05/29 17:13:54 UTC
[beam] branch master updated: [BEAM-5354] Add side input nodes to their consumer's parent-scope.
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 1c26009 [BEAM-5354] Add side input nodes to their consumer's parent-scope.
new 9647808 Merge pull request #8713 from lostluck/sideforms
1c26009 is described below
commit 1c26009cf9154ff337ab997b84b404e29e877f89
Author: Robert Burke <ro...@frantil.com>
AuthorDate: Wed May 29 09:34:48 2019 -0700
[BEAM-5354] Add side input nodes to their consumer's parent-scope.
---
sdks/go/pkg/beam/core/runtime/graphx/translate.go | 14 ++-
.../pkg/beam/core/runtime/graphx/translate_test.go | 130 ++++++++++++++++-----
sdks/go/test/integration/primitives/pardo.go | 3 +-
3 files changed, 109 insertions(+), 38 deletions(-)
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index 99a3b8f..17dc253 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -128,7 +128,7 @@ func (m *marshaller) addScopeTree(s *ScopeTree) string {
var subtransforms []string
for _, edge := range s.Edges {
- subtransforms = append(subtransforms, m.addMultiEdge(edge))
+ subtransforms = append(subtransforms, m.addMultiEdge(edge)...)
}
for _, tree := range s.Children {
subtransforms = append(subtransforms, m.addScopeTree(tree))
@@ -173,14 +173,14 @@ func (m *marshaller) updateIfCombineComposite(s *ScopeTree, transform *pb.PTrans
transform.Spec = &pb.FunctionSpec{Urn: URNCombinePerKey, Payload: protox.MustEncode(payload)}
}
-func (m *marshaller) addMultiEdge(edge NamedEdge) string {
+func (m *marshaller) addMultiEdge(edge NamedEdge) []string {
id := edgeID(edge.Edge)
if _, exists := m.transforms[id]; exists {
- return id
+ return []string{id}
}
if edge.Edge.Op == graph.CoGBK && len(edge.Edge.Input) > 1 {
- return m.expandCoGBK(edge)
+ return []string{m.expandCoGBK(edge)}
}
inputs := make(map[string]string)
@@ -194,6 +194,8 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) string {
outputs[fmt.Sprintf("i%v", i)] = nodeID(out.To)
}
+ // allPIds collects every PTransform ID produced for this edge (any extra transforms created below, then the edge's own ID) so the caller receives all of them.
+ var allPIds []string
var spec *pb.FunctionSpec
switch edge.Edge.Op {
case graph.Impulse:
@@ -238,6 +240,7 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) string {
Outputs: map[string]string{"i0": out},
}
m.transforms[keyedID] = keyed
+ allPIds = append(allPIds, keyedID)
// Fixup input map
inputs[fmt.Sprintf("i%v", i)] = out
@@ -320,7 +323,8 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) string {
Outputs: outputs,
}
m.transforms[id] = transform
- return id
+ allPIds = append(allPIds, id)
+ return allPIds
}
func (m *marshaller) expandCoGBK(edge NamedEdge) string {
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go b/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
index ec2a8b7..0026745 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate_test.go
@@ -41,22 +41,36 @@ func pickFn(a int, small, big func(int)) {
}
}
-func pick(t *testing.T, g *graph.Graph) *graph.MultiEdge {
- dofn, err := graph.NewDoFn(pickFn)
+func pickSideFn(a, side int, small, big func(int)) {
+ if a < side {
+ small(a)
+ } else {
+ big(a)
+ }
+}
+
+func addDoFn(t *testing.T, g *graph.Graph, fn interface{}, scope *graph.Scope, inputs []*graph.Node, outputCoders []*coder.Coder) {
+ t.Helper()
+ dofn, err := graph.NewDoFn(fn)
if err != nil {
t.Fatal(err)
}
-
- in := g.NewNode(intT(), window.DefaultWindowingStrategy(), true)
- in.Coder = intCoder()
-
- e, err := graph.NewParDo(g, g.Root(), dofn, []*graph.Node{in}, nil)
+ e, err := graph.NewParDo(g, scope, dofn, inputs, nil)
if err != nil {
t.Fatal(err)
}
- e.Output[0].To.Coder = intCoder()
- e.Output[1].To.Coder = intCoder()
- return e
+ if len(outputCoders) != len(e.Output) {
+ t.Fatalf("%v has %d outputs, but only got %d coders", dofn.Name(), len(e.Output), len(outputCoders))
+ }
+ for i, c := range outputCoders {
+ e.Output[i].To.Coder = c
+ }
+}
+
+func newIntInput(g *graph.Graph) *graph.Node {
+ in := g.NewNode(intT(), window.DefaultWindowingStrategy(), true)
+ in.Coder = intCoder()
+ return in
}
func intT() typex.FullType {
@@ -67,30 +81,82 @@ func intCoder() *coder.Coder {
return custom("int", reflectx.Int)
}
-// TestParDo verifies that ParDo can be serialized.
-func TestParDo(t *testing.T) {
- g := graph.New()
- pick(t, g)
-
- edges, _, err := g.Build()
- if err != nil {
- t.Fatal(err)
- }
- if len(edges) != 1 {
- t.Fatal("expected a single edge")
+// TestMarshal verifies that ParDos, with and without sub-scopes and side inputs, marshal to the expected number of transforms and root transforms.
+func TestMarshal(t *testing.T) {
+ tests := []struct {
+ name string
+ makeGraph func(t *testing.T, g *graph.Graph)
+ edges, transforms, roots int
+ }{
+ {
+ name: "ParDo",
+ makeGraph: func(t *testing.T, g *graph.Graph) {
+ addDoFn(t, g, pickFn, g.Root(), []*graph.Node{newIntInput(g)}, []*coder.Coder{intCoder(), intCoder()})
+ },
+ edges: 1,
+ transforms: 1,
+ roots: 1,
+ }, {
+ name: "ScopedParDo",
+ makeGraph: func(t *testing.T, g *graph.Graph) {
+ addDoFn(t, g, pickFn, g.NewScope(g.Root(), "sub"), []*graph.Node{newIntInput(g)}, []*coder.Coder{intCoder(), intCoder()})
+ },
+ edges: 1,
+ transforms: 2,
+ roots: 1,
+ }, {
+ name: "SideInput",
+ makeGraph: func(t *testing.T, g *graph.Graph) {
+ in := newIntInput(g)
+ side := newIntInput(g)
+ addDoFn(t, g, pickSideFn, g.Root(), []*graph.Node{in, side}, []*coder.Coder{intCoder(), intCoder()})
+ },
+ edges: 1,
+ transforms: 2,
+ roots: 2,
+ }, {
+ name: "ScopedSideInput",
+ makeGraph: func(t *testing.T, g *graph.Graph) {
+ in := newIntInput(g)
+ side := newIntInput(g)
+ addDoFn(t, g, pickSideFn, g.NewScope(g.Root(), "sub"), []*graph.Node{in, side}, []*coder.Coder{intCoder(), intCoder()})
+ },
+ edges: 1,
+ transforms: 3,
+ roots: 1,
+ },
}
+ for _, test := range tests {
+ test := test
+ t.Run(test.name, func(t *testing.T) {
- payload, err := proto.Marshal(&pb.DockerPayload{ContainerImage: "foo"})
- if err != nil {
- t.Fatal(err)
- }
- p, err := graphx.Marshal(edges,
- &graphx.Options{Environment: pb.Environment{Urn: "beam:env:docker:v1", Payload: payload}})
- if err != nil {
- t.Fatal(err)
- }
+ g := graph.New()
+ test.makeGraph(t, g)
+
+ edges, _, err := g.Build()
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(edges) != test.edges {
+ t.Fatal("expected a single edge")
+ }
+
+ payload, err := proto.Marshal(&pb.DockerPayload{ContainerImage: "foo"})
+ if err != nil {
+ t.Fatal(err)
+ }
+ p, err := graphx.Marshal(edges,
+ &graphx.Options{Environment: pb.Environment{Urn: "beam:env:docker:v1", Payload: payload}})
+ if err != nil {
+ t.Fatal(err)
+ }
- if len(p.GetComponents().GetTransforms()) != 1 {
- t.Errorf("bad ParDo translation: %v", proto.MarshalTextString(p))
+ if got, want := len(p.GetComponents().GetTransforms()), test.transforms; got != want {
+ t.Errorf("got %d transforms, want %d : %v", got, want, proto.MarshalTextString(p))
+ }
+ if got, want := len(p.GetRootTransformIds()), test.roots; got != want {
+ t.Errorf("got %d roots, want %d : %v", got, want, proto.MarshalTextString(p))
+ }
+ })
}
}
diff --git a/sdks/go/test/integration/primitives/pardo.go b/sdks/go/test/integration/primitives/pardo.go
index ae7d9b2..62654e8 100644
--- a/sdks/go/test/integration/primitives/pardo.go
+++ b/sdks/go/test/integration/primitives/pardo.go
@@ -53,7 +53,8 @@ func ParDoSideInput() *beam.Pipeline {
p, s := beam.NewPipelineWithRoot()
in := beam.Create(s, 1, 2, 3, 4, 5, 6, 7, 8, 9)
- out := beam.ParDo(s, sumValuesFn, beam.Impulse(s), beam.SideInput{Input: in})
+ sub := s.Scope("subscope") // Ensure scoping works with side inputs. See: BEAM-5354
+ out := beam.ParDo(sub, sumValuesFn, beam.Impulse(s), beam.SideInput{Input: in})
passert.Sum(s, out, "out", 1, 45)
return p