You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2020/05/13 01:40:43 UTC
[beam] branch master updated: [BEAM-9959] Root Transform fixes (#11686)
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 29c9733 [BEAM-9959] Root Transform fixes (#11686)
29c9733 is described below
commit 29c9733d9a64ac5abaf7386d41bf04bbd2b74b08
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Tue May 12 18:40:28 2020 -0700
[BEAM-9959] Root Transform fixes (#11686)
* Correctly handle output PCollections that are consumed outside of their composite.
* Previously, a composite did not list a PCollection it produced as one of its outputs if that PCollection was also consumed inside the composite, even when a transform outside the composite consumed it as well.
* Make TopologicalSort deterministic (to simplify debugging in the future).
* Tests for the above.
* Minor: Use pipepb as the import alias for the pipeline proto package.
---
sdks/go/pkg/beam/core/runtime/pipelinex/replace.go | 40 +++-
.../beam/core/runtime/pipelinex/replace_test.go | 94 ++++++++-
sdks/go/pkg/beam/core/runtime/pipelinex/util.go | 35 +++-
.../pkg/beam/core/runtime/pipelinex/util_test.go | 231 +++++++++++++++++++++
4 files changed, 382 insertions(+), 18 deletions(-)
diff --git a/sdks/go/pkg/beam/core/runtime/pipelinex/replace.go b/sdks/go/pkg/beam/core/runtime/pipelinex/replace.go
index cce53aa..8a0a3b9 100644
--- a/sdks/go/pkg/beam/core/runtime/pipelinex/replace.go
+++ b/sdks/go/pkg/beam/core/runtime/pipelinex/replace.go
@@ -126,14 +126,23 @@ func walk(id string, ret map[string]*pipepb.PTransform, seen map[string]bool) {
in := make(map[string]bool)
out := make(map[string]bool)
+ local := map[string]bool{id: true}
for _, sid := range t.Subtransforms {
walk(sid, ret, seen)
inout(ret[sid], in, out)
+ local[sid] = true
}
+ // At this point, we know all the inputs and outputs of this composite.
+ // However, outputs in this PTransform can also be used by PTransforms
+ // external to this composite. So we must check the inputs in the rest of
+ // the graph, and ensure they're counted.
+ extIn := make(map[string]bool)
+ externalIns(local, ret, extIn, out)
+
upd := ShallowClonePTransform(t)
upd.Inputs = diff(in, out)
- upd.Outputs = diff(out, in)
+ upd.Outputs = diffAndMerge(out, in, extIn)
upd.Subtransforms = TopologicalSort(ret, upd.Subtransforms)
ret[id] = upd
@@ -166,6 +175,35 @@ func inout(transform *pipepb.PTransform, in, out map[string]bool) {
out[col] = true
}
}
+// diffAndMerge builds a composite's Outputs map. It starts from diff(out, in)
+// and then adds every PCollection id in extIn, mapped to itself, so that
+// PCollections consumed outside the composite are exposed even if they are
+// also consumed inside it.
+func diffAndMerge(out, in, extIn map[string]bool) map[string]string {
+	merged := diff(out, in)
+	if len(extIn) == 0 {
+		return merged
+	}
+	if merged == nil {
+		merged = make(map[string]string, len(extIn))
+	}
+	for col := range extIn {
+		merged[col] = col
+	}
+	return merged
+}
+
+// externalIns adds to extIn every PCollection in out that is consumed as an
+// input by a leaf (non-composite) PTransform outside the current composite.
+//
+// counted holds the ids of the current composite and its direct
+// subtransforms. Leaves nested more deeply inside those subtransforms are
+// also internal to the composite, so the set is expanded transitively (into a
+// fresh map; counted itself is not modified) before the graph is scanned.
+// Otherwise a nested leaf that consumes a PCollection produced elsewhere in
+// the same composite would be mistaken for an external consumer, and that
+// PCollection would be wrongly exposed as a composite output.
+func externalIns(counted map[string]bool, xforms map[string]*pipepb.PTransform, extIn, out map[string]bool) {
+	internal := make(map[string]bool, len(counted))
+	var markInternal func(id string)
+	markInternal = func(id string) {
+		if internal[id] {
+			return
+		}
+		internal[id] = true
+		for _, sid := range xforms[id].GetSubtransforms() {
+			markInternal(sid)
+		}
+	}
+	for id, ok := range counted {
+		if ok {
+			markInternal(id)
+		}
+	}
+
+	for id, pt := range xforms {
+		// Only leaf transforms outside the current composite are checked;
+		// composites and internal transforms are skipped.
+		if internal[id] || len(pt.GetSubtransforms()) != 0 {
+			continue
+		}
+		// Record any input that this composite produced.
+		for _, incol := range pt.GetInputs() {
+			if _, ok := out[incol]; ok {
+				extIn[incol] = true
+			}
+		}
+	}
+}
// ensureUniqueNames ensures that each name is unique. Any conflict is
// resolved by adding '1, '2, etc to the name.
diff --git a/sdks/go/pkg/beam/core/runtime/pipelinex/replace_test.go b/sdks/go/pkg/beam/core/runtime/pipelinex/replace_test.go
index a4fdb3b..7bb395a 100644
--- a/sdks/go/pkg/beam/core/runtime/pipelinex/replace_test.go
+++ b/sdks/go/pkg/beam/core/runtime/pipelinex/replace_test.go
@@ -63,9 +63,11 @@ func TestEnsureUniqueName(t *testing.T) {
func TestComputeInputOutput(t *testing.T) {
tests := []struct {
+ name string
in, exp map[string]*pipepb.PTransform
}{
- { // singleton composite
+ {
+ name: "singleton composite",
in: map[string]*pipepb.PTransform{
"1": {
UniqueName: "a",
@@ -91,7 +93,8 @@ func TestComputeInputOutput(t *testing.T) {
},
},
},
- { // closed composite
+ {
+ name: "closed composite",
in: map[string]*pipepb.PTransform{
"1": {
UniqueName: "a",
@@ -109,12 +112,91 @@ func TestComputeInputOutput(t *testing.T) {
"3": {UniqueName: "c", Inputs: map[string]string{"i0": "p1"}},
},
},
+ {
+ name: "nested composites",
+ in: map[string]*pipepb.PTransform{
+ "1": {
+ UniqueName: "a",
+ Subtransforms: []string{"2"},
+ },
+ "2": {
+ UniqueName: "b",
+ Subtransforms: []string{"3", "7", "8"},
+ },
+ "3": {
+ UniqueName: "c",
+ Subtransforms: []string{"4", "5", "6"},
+ },
+ "4": {UniqueName: "d", Inputs: map[string]string{"i0": "p1"}, Outputs: map[string]string{"i0": "p2"}},
+ "5": {UniqueName: "e", Inputs: map[string]string{"i0": "p2"}, Outputs: map[string]string{"i0": "p3", "i1": "p4"}},
+ "6": {UniqueName: "f", Inputs: map[string]string{"i0": "p2", "i1": "p5"}, Outputs: map[string]string{"i0": "p6"}},
+ "7": {UniqueName: "g", Inputs: map[string]string{"i0": "p4", "i1": "p6", "i2": "p8"}, Outputs: map[string]string{"i0": "p7"}},
+ "8": {UniqueName: "h", Inputs: map[string]string{"i0": "p7"}},
+ },
+ exp: map[string]*pipepb.PTransform{
+ "1": {
+ UniqueName: "a",
+ Subtransforms: []string{"2"},
+ Inputs: map[string]string{"p1": "p1", "p5": "p5", "p8": "p8"},
+ Outputs: map[string]string{"p3": "p3"},
+ },
+ "2": {
+ UniqueName: "b",
+ Subtransforms: []string{"3", "7", "8"},
+ Inputs: map[string]string{"p1": "p1", "p5": "p5", "p8": "p8"},
+ Outputs: map[string]string{"p3": "p3"},
+ },
+ "3": {
+ UniqueName: "c",
+ Subtransforms: []string{"4", "6", "5"}, // topologically sorted.
+ Inputs: map[string]string{"p1": "p1", "p5": "p5"},
+ Outputs: map[string]string{"p4": "p4", "p6": "p6", "p3": "p3"},
+ },
+ "4": {UniqueName: "d", Inputs: map[string]string{"i0": "p1"}, Outputs: map[string]string{"i0": "p2"}},
+ "5": {UniqueName: "e", Inputs: map[string]string{"i0": "p2"}, Outputs: map[string]string{"i0": "p3", "i1": "p4"}},
+ "6": {UniqueName: "f", Inputs: map[string]string{"i0": "p2", "i1": "p5"}, Outputs: map[string]string{"i0": "p6"}},
+ "7": {UniqueName: "g", Inputs: map[string]string{"i0": "p4", "i1": "p6", "i2": "p8"}, Outputs: map[string]string{"i0": "p7"}},
+ "8": {UniqueName: "h", Inputs: map[string]string{"i0": "p7"}},
+ },
+ }, {
+ name: "sibling composite",
+ in: map[string]*pipepb.PTransform{
+ "1": {
+ UniqueName: "a",
+ Subtransforms: []string{"3", "4"},
+ },
+ "2": {
+ UniqueName: "b",
+ Subtransforms: []string{"5"},
+ },
+ "3": {UniqueName: "c", Outputs: map[string]string{"i0": "p1"}},
+ "4": {UniqueName: "d", Inputs: map[string]string{"i0": "p1"}},
+ "5": {UniqueName: "e", Inputs: map[string]string{"i0": "p1"}},
+ },
+ exp: map[string]*pipepb.PTransform{
+ "1": {
+ UniqueName: "a",
+ Subtransforms: []string{"3", "4"},
+ Outputs: map[string]string{"p1": "p1"},
+ },
+ "2": {
+ UniqueName: "b",
+ Subtransforms: []string{"5"},
+ Inputs: map[string]string{"p1": "p1"},
+ },
+ "3": {UniqueName: "c", Outputs: map[string]string{"i0": "p1"}},
+ "4": {UniqueName: "d", Inputs: map[string]string{"i0": "p1"}},
+ "5": {UniqueName: "e", Inputs: map[string]string{"i0": "p1"}},
+ },
+ },
}
for _, test := range tests {
- actual := computeCompositeInputOutput(test.in)
- if !cmp.Equal(actual, test.exp, cmp.Comparer(proto.Equal)) {
- t.Errorf("coimputeInputOutput(%v) = %v, want %v", test.in, actual, test.exp)
- }
+ t.Run(test.name, func(t *testing.T) {
+ actual := computeCompositeInputOutput(test.in)
+ if diff := cmp.Diff(actual, test.exp, cmp.Comparer(proto.Equal)); diff != "" {
+ t.Errorf("computeInputOutput(%v)\ndiff: %v", test.in, diff)
+ }
+ })
}
}
diff --git a/sdks/go/pkg/beam/core/runtime/pipelinex/util.go b/sdks/go/pkg/beam/core/runtime/pipelinex/util.go
index f859d95..9aae045 100644
--- a/sdks/go/pkg/beam/core/runtime/pipelinex/util.go
+++ b/sdks/go/pkg/beam/core/runtime/pipelinex/util.go
@@ -15,13 +15,17 @@
package pipelinex
-import pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
-import "github.com/golang/protobuf/proto"
+import (
+ "sort"
+
+ pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+ "github.com/golang/protobuf/proto"
+)
// Bounded returns true iff all PCollections are bounded.
-func Bounded(p *pb.Pipeline) bool {
+func Bounded(p *pipepb.Pipeline) bool {
for _, col := range p.GetComponents().GetPcollections() {
- if col.IsBounded == pb.IsBounded_UNBOUNDED {
+ if col.IsBounded == pipepb.IsBounded_UNBOUNDED {
return false
}
}
@@ -30,11 +34,11 @@ func Bounded(p *pb.Pipeline) bool {
// ContainerImages returns the set of container images used
// in the given pipeline.
-func ContainerImages(p *pb.Pipeline) []string {
+func ContainerImages(p *pipepb.Pipeline) []string {
var ret []string
for _, t := range p.GetComponents().GetEnvironments() {
// TODO(angoenka) 09/14/2018 Check t.Urn before parsing the payload.
- var payload pb.DockerPayload
+ var payload pipepb.DockerPayload
proto.Unmarshal(t.GetPayload(), &payload)
ret = append(ret, payload.ContainerImage)
}
@@ -43,7 +47,7 @@ func ContainerImages(p *pb.Pipeline) []string {
// TopologicalSort returns a topologically sorted list of the given
// ids, generally from the same scope/composite. Assumes acyclic graph.
-func TopologicalSort(xforms map[string]*pb.PTransform, ids []string) []string {
+func TopologicalSort(xforms map[string]*pipepb.PTransform, ids []string) []string {
if len(ids) == 0 {
return ids
}
@@ -62,7 +66,7 @@ type visiter struct {
next map[string][]string // collection -> transforms
}
-func newVisiter(xforms map[string]*pb.PTransform, ids []string) *visiter {
+func newVisiter(xforms map[string]*pipepb.PTransform, ids []string) *visiter {
ret := &visiter{
output: make([]string, len(ids), len(ids)),
index: len(ids) - 1,
@@ -74,20 +78,29 @@ func newVisiter(xforms map[string]*pb.PTransform, ids []string) *visiter {
ret.next[in] = append(ret.next[in], id)
}
}
+ for _, ns := range ret.next {
+ sort.Strings(ns)
+ }
return ret
}
-func (v *visiter) visit(xforms map[string]*pb.PTransform, id string) {
+// visit does a depth-first walk from id, following each output PCollection
+// to the transforms that consume it (v.next). A transform is written into
+// v.output only after everything reachable from it has been visited, and
+// v.output is filled from the back, so producers precede their consumers.
+func (v *visiter) visit(xforms map[string]*pipepb.PTransform, id string) {
if v.seen[id] {
return
}
v.seen[id] = true
- for _, out := range xforms[id].Outputs {
+	// Map iteration order is randomized, so gather the output PCollection ids
+	// (the values of the Outputs map, not its tag keys) and sort them to make
+	// the traversal order deterministic.
+	outputs := xforms[id].Outputs
+	outCols := make([]string, 0, len(outputs))
+	for _, col := range outputs {
+		outCols = append(outCols, col)
+	}
+	sort.Strings(outCols)
+
+	for _, out := range outCols {
for _, next := range v.next[out] {
v.visit(xforms, next)
}
}
-
v.output[v.index] = id
v.index--
}
diff --git a/sdks/go/pkg/beam/core/runtime/pipelinex/util_test.go b/sdks/go/pkg/beam/core/runtime/pipelinex/util_test.go
new file mode 100644
index 0000000..8f19c12
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/pipelinex/util_test.go
@@ -0,0 +1,231 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pipelinex
+
+import (
+ "fmt"
+ "testing"
+
+ pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+ "github.com/google/go-cmp/cmp"
+)
+
+// TestTopologicalSort checks, for several small graphs and subsets of their
+// transform ids, that TopologicalSort returns the same order on repeated calls
+// and that every PCollection is produced before it is consumed.
+func TestTopologicalSort(t *testing.T) {
+	graphs := map[string]map[string]*pipepb.PTransform{
+		"linkedList": {
+			"p0": ptImpulse("n0"),
+			"p1": ptNoSide("n0", "n1"),
+			"p2": ptNoSide("n1", "n2"),
+			"p3": ptNoSide("n2", "n3"),
+			"p4": ptSink("n3"),
+		},
+		"binarytree": {
+			"p0":   ptImpulse("n0"),
+			"p1a":  ptNoSide("n0", "n1a"),
+			"p1b":  ptNoSide("n0", "n1b"),
+			"p2aa": ptSink("n1a"),
+			"p2ab": ptSink("n1a"),
+			"p2ba": ptSink("n1b"),
+			"p2bb": ptSink("n1b"),
+		},
+		"binarytreeWComps": {
+			"p0":   ptImpulse("n0"),
+			"p1a":  ptNoSide("n0", "n1a"),
+			"p1b":  ptNoSide("n0", "n1b"),
+			"p2aa": ptSink("n1a"),
+			"p2ab": ptSink("n1a"),
+			"p2ba": ptSink("n1b"),
+			"p2bb": ptSink("n1b"),
+			"c1":   ptComp("p0", "p1a", "p1b"),
+			"c2":   ptComp("p2aa", "p2ab"),
+			"c3":   ptComp("p2ba", "p2bb"),
+		},
+		"linkedListWComps": {
+			"p0": ptImpulse("n0"),
+			"p1": ptNoSide("n0", "n1"),
+			"p2": ptNoSide("n1", "n2"),
+			"p3": ptNoSide("n2", "n3"),
+			"p4": ptSink("n3"),
+			"c1": ptComp("p0"),
+			"c2": ptComp("p1", "p2"),
+			"c3": ptComp("c1", "c2", "p3"),
+		},
+		"leafdag": {
+			"A":  ptImpulse("n0"),
+			"B":  ptNoSide("n0", "n1"),
+			"C":  ptSink("n0", "n1"),
+			"c1": ptComp("A", "C"),
+			"c2": ptComp("B"),
+		},
+	}
+
+	// Fill in the composites' Inputs and Outputs from their subtransforms so
+	// composite ids can be sorted alongside leaf ids.
+	for k, g := range graphs {
+		graphs[k] = computeCompositeInputOutput(g)
+	}
+
+	tests := []struct {
+		graph  string
+		toSort []string
+	}{
+		{
+			graph:  "linkedList",
+			toSort: []string{},
+		}, {
+			graph:  "linkedList",
+			toSort: []string{"p0", "p1", "p2", "p3", "p4"},
+		}, {
+			graph:  "linkedList",
+			toSort: []string{"p3", "p4", "p0", "p1", "p2"},
+		}, {
+			graph:  "binarytree",
+			toSort: []string{"p0", "p1a", "p1b", "p2aa", "p2ab", "p2ba", "p2bb"},
+		}, {
+			graph:  "binarytree",
+			toSort: []string{"p0", "p2bb", "p2aa", "p2ba", "p1b", "p1a", "p2ab"},
+		}, {
+			graph:  "binarytree",
+			toSort: []string{"p1b", "p0", "p2ba"},
+		}, {
+			graph:  "binarytreeWComps",
+			toSort: []string{"p0", "p1a", "p1b", "p2aa", "p2ab", "p2ba", "p2bb"},
+		}, {
+			graph:  "binarytreeWComps",
+			toSort: []string{"c1", "p2aa", "p2ab", "p2ba", "p2bb"},
+		}, {
+			graph:  "binarytreeWComps",
+			toSort: []string{"c1", "c2", "c3"},
+		}, {
+			graph:  "linkedListWComps",
+			toSort: []string{"c3", "p4"},
+		}, {
+			graph:  "linkedListWComps",
+			toSort: []string{"c1", "c2", "p3", "p4"},
+		}, {
+			graph:  "linkedListWComps",
+			toSort: []string{"p0", "c2", "p3", "p4"},
+		}, {
+			graph:  "linkedListWComps",
+			toSort: []string{"c1", "p1", "p2", "p3", "p4"},
+		}, {
+			graph:  "leafdag",
+			toSort: []string{"A", "B", "C"},
+		}, {
+			graph:  "leafdag",
+			toSort: []string{"A", "c2", "C"},
+		},
+	}
+	for _, test := range tests {
+		// Include the ids in the subtest name; several cases share a graph,
+		// and the graph name alone would only yield "#01"-style suffixes.
+		name := fmt.Sprintf("%s%v", test.graph, test.toSort)
+		t.Run(name, func(t *testing.T) {
+			xforms := graphs[test.graph]
+			got1 := TopologicalSort(xforms, test.toSort)
+			got2 := TopologicalSort(xforms, test.toSort)
+			if diff := cmp.Diff(got1, got2); diff != "" {
+				t.Errorf("TopologicalSort(%v, %v) not deterministic: %v", test.graph, test.toSort, diff)
+			}
+			validateSortForTest(t, xforms, got1)
+		})
+	}
+}
+
+// ptSink returns a leaf PTransform with no outputs. It consumes input under
+// the tag "i0" and each of sides under "i1", "i2", ... in order.
+func ptSink(input string, sides ...string) *pipepb.PTransform {
+	ins := make(map[string]string, len(sides)+1)
+	ins["i0"] = input
+	for i := range sides {
+		ins[fmt.Sprintf("i%d", i+1)] = sides[i]
+	}
+	return &pipepb.PTransform{Inputs: ins}
+}
+
+// ptComp generates a composite PTransform. Unlike the other helpers, it takes in
+// the *subtransform ids* instead of the input/output pcollections; the
+// composite's own Inputs and Outputs are left empty for
+// computeCompositeInputOutput to fill in.
+func ptComp(subs ...string) *pipepb.PTransform {
+	return &pipepb.PTransform{
+		Subtransforms: subs,
+	}
+}
+
+// ptImpulse returns a root PTransform with no inputs and a single output
+// PCollection tagged "o0".
+func ptImpulse(output string) *pipepb.PTransform {
+	outs := map[string]string{"o0": output}
+	return &pipepb.PTransform{Outputs: outs}
+}
+
+// pt returns a leaf PTransform with the given input and output PCollections.
+// Both maps are tagged "i0", "i1", ... in order; note that outputs also use
+// the "i" prefix here, unlike ptImpulse and ptNoSide, which tag outputs "o%d".
+func pt(inputs []string, outputs ...string) *pipepb.PTransform {
+	tagged := func(cols []string) map[string]string {
+		m := make(map[string]string)
+		for i, c := range cols {
+			m[fmt.Sprintf("i%d", i)] = c
+		}
+		return m
+	}
+	return &pipepb.PTransform{
+		Inputs:  tagged(inputs),
+		Outputs: tagged(outputs),
+	}
+}
+
+// ptNoSide returns a leaf PTransform with a single input, tagged "i0", and the
+// given outputs, tagged "o0", "o1", ... in order.
+func ptNoSide(input string, outputs ...string) *pipepb.PTransform {
+	xform := &pipepb.PTransform{
+		Inputs:  map[string]string{"i0": input},
+		Outputs: make(map[string]string, len(outputs)),
+	}
+	for i, o := range outputs {
+		xform.Outputs[fmt.Sprintf("o%d", i)] = o
+	}
+	return xform
+}
+
+// validateSortForTest ensures that in the sorted ids, outputs are declared before
+// they're used as inputs.
+// Distinct from validateSort to better communicate error details using the test harness,
+// and base things on the generated unique ids, rather than the user readable scopes.
+// Ids missing from xforms are treated as transforms with no inputs or outputs.
+func validateSortForTest(t *testing.T, xforms map[string]*pipepb.PTransform, sorted []string) {
+	t.Helper()
+	seenPCollections := make(map[string]bool)
+	missingPCollections := make(map[string]bool)
+
+	for _, id := range sorted {
+		// Named xform (not pt) to avoid shadowing the package-level pt helper.
+		xform := xforms[id]
+		for _, in := range xform.GetInputs() {
+			if !seenPCollections[in] {
+				t.Errorf("out of order pcollection %v required by %v", in, id)
+				missingPCollections[in] = true
+			}
+		}
+
+		// A composite's subtransforms are not necessarily in sorted, so report
+		// any of their outputs that an earlier transform already needed.
+		for _, sptid := range xform.GetSubtransforms() {
+			for _, out := range xforms[sptid].GetOutputs() {
+				if missingPCollections[out] {
+					t.Errorf("missing pcollection %v created by %v in composite %v", out, sptid, id)
+				}
+			}
+		}
+		for _, out := range xform.GetOutputs() {
+			seenPCollections[out] = true
+			if missingPCollections[out] {
+				t.Errorf("missing pcollection %v created by %v", out, id)
+			}
+		}
+	}
+	if len(missingPCollections) != 0 {
+		t.Logf("sorted order: %v", sorted)
+	}
+}