You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2020/05/13 01:40:43 UTC

[beam] branch master updated: [BEAM-9959] Root Transform fixes (#11686)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 29c9733  [BEAM-9959] Root Transform fixes (#11686)
29c9733 is described below

commit 29c9733d9a64ac5abaf7386d41bf04bbd2b74b08
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Tue May 12 18:40:28 2020 -0700

    [BEAM-9959] Root Transform fixes (#11686)
    
    * Correctly handle Output PCollections being used outside of a composite.
    * Previously Composites that used an output pcollection wouldn't include that pcollection as an output even if that pcollection was an input to a transform outside the composite.
    * Make TopologicalSort deterministic (to simplify debugging in the future).
    * Tests for the above.
    * Minor: Use pipepb as the package shortname for the pipeline proto package
---
 sdks/go/pkg/beam/core/runtime/pipelinex/replace.go |  40 +++-
 .../beam/core/runtime/pipelinex/replace_test.go    |  94 ++++++++-
 sdks/go/pkg/beam/core/runtime/pipelinex/util.go    |  35 +++-
 .../pkg/beam/core/runtime/pipelinex/util_test.go   | 231 +++++++++++++++++++++
 4 files changed, 382 insertions(+), 18 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/pipelinex/replace.go b/sdks/go/pkg/beam/core/runtime/pipelinex/replace.go
index cce53aa..8a0a3b9 100644
--- a/sdks/go/pkg/beam/core/runtime/pipelinex/replace.go
+++ b/sdks/go/pkg/beam/core/runtime/pipelinex/replace.go
@@ -126,14 +126,23 @@ func walk(id string, ret map[string]*pipepb.PTransform, seen map[string]bool) {
 
 	in := make(map[string]bool)
 	out := make(map[string]bool)
+	local := map[string]bool{id: true}
 	for _, sid := range t.Subtransforms {
 		walk(sid, ret, seen)
 		inout(ret[sid], in, out)
+		local[sid] = true
 	}
 
+	// At this point, we know all the inputs and outputs of this composite.
+	// However, outputs in this PTransform can also be used by PTransforms
+	// external to this composite. So we must check the inputs in the rest of
+	// the graph, and ensure they're counted.
+	extIn := make(map[string]bool)
+	externalIns(local, ret, extIn, out)
+
 	upd := ShallowClonePTransform(t)
 	upd.Inputs = diff(in, out)
-	upd.Outputs = diff(out, in)
+	upd.Outputs = diffAndMerge(out, in, extIn)
 	upd.Subtransforms = TopologicalSort(ret, upd.Subtransforms)
 
 	ret[id] = upd
@@ -166,6 +175,35 @@ func inout(transform *pipepb.PTransform, in, out map[string]bool) {
 		out[col] = true
 	}
 }
+func diffAndMerge(out, in, extIn map[string]bool) map[string]string {
+	ret := diff(out, in)
+	for key := range extIn {
+		if ret == nil {
+			ret = make(map[string]string)
+		}
+		ret[key] = key
+	}
+	return ret
+}
+
+// externalIns marks in extIn each PCollection in out that is an input to an uncounted, non-composite transform in xforms.
+func externalIns(counted map[string]bool, xforms map[string]*pipepb.PTransform, extIn, out map[string]bool) {
+	for id, pt := range xforms {
+		// Ignore other composites or already counted transforms.
+		if counted[id] || len(pt.GetSubtransforms()) != 0 {
+			continue
+		}
+		// Flag every PCollection in out (produced within the current
+		// composite) that this external PTransform consumes as an input.
+		for col := range out {
+			for _, incol := range pt.GetInputs() {
+				if col == incol {
+					extIn[col] = true
+				}
+			}
+		}
+	}
+}
 
 // ensureUniqueNames ensures that each name is unique. Any conflict is
 // resolved by adding '1, '2, etc to the name.
diff --git a/sdks/go/pkg/beam/core/runtime/pipelinex/replace_test.go b/sdks/go/pkg/beam/core/runtime/pipelinex/replace_test.go
index a4fdb3b..7bb395a 100644
--- a/sdks/go/pkg/beam/core/runtime/pipelinex/replace_test.go
+++ b/sdks/go/pkg/beam/core/runtime/pipelinex/replace_test.go
@@ -63,9 +63,11 @@ func TestEnsureUniqueName(t *testing.T) {
 
 func TestComputeInputOutput(t *testing.T) {
 	tests := []struct {
+		name    string
 		in, exp map[string]*pipepb.PTransform
 	}{
-		{ // singleton composite
+		{
+			name: "singleton composite",
 			in: map[string]*pipepb.PTransform{
 				"1": {
 					UniqueName:    "a",
@@ -91,7 +93,8 @@ func TestComputeInputOutput(t *testing.T) {
 				},
 			},
 		},
-		{ // closed composite
+		{
+			name: "closed composite",
 			in: map[string]*pipepb.PTransform{
 				"1": {
 					UniqueName:    "a",
@@ -109,12 +112,91 @@ func TestComputeInputOutput(t *testing.T) {
 				"3": {UniqueName: "c", Inputs: map[string]string{"i0": "p1"}},
 			},
 		},
+		{
+			name: "nested composites",
+			in: map[string]*pipepb.PTransform{
+				"1": {
+					UniqueName:    "a",
+					Subtransforms: []string{"2"},
+				},
+				"2": {
+					UniqueName:    "b",
+					Subtransforms: []string{"3", "7", "8"},
+				},
+				"3": {
+					UniqueName:    "c",
+					Subtransforms: []string{"4", "5", "6"},
+				},
+				"4": {UniqueName: "d", Inputs: map[string]string{"i0": "p1"}, Outputs: map[string]string{"i0": "p2"}},
+				"5": {UniqueName: "e", Inputs: map[string]string{"i0": "p2"}, Outputs: map[string]string{"i0": "p3", "i1": "p4"}},
+				"6": {UniqueName: "f", Inputs: map[string]string{"i0": "p2", "i1": "p5"}, Outputs: map[string]string{"i0": "p6"}},
+				"7": {UniqueName: "g", Inputs: map[string]string{"i0": "p4", "i1": "p6", "i2": "p8"}, Outputs: map[string]string{"i0": "p7"}},
+				"8": {UniqueName: "h", Inputs: map[string]string{"i0": "p7"}},
+			},
+			exp: map[string]*pipepb.PTransform{
+				"1": {
+					UniqueName:    "a",
+					Subtransforms: []string{"2"},
+					Inputs:        map[string]string{"p1": "p1", "p5": "p5", "p8": "p8"},
+					Outputs:       map[string]string{"p3": "p3"},
+				},
+				"2": {
+					UniqueName:    "b",
+					Subtransforms: []string{"3", "7", "8"},
+					Inputs:        map[string]string{"p1": "p1", "p5": "p5", "p8": "p8"},
+					Outputs:       map[string]string{"p3": "p3"},
+				},
+				"3": {
+					UniqueName:    "c",
+					Subtransforms: []string{"4", "6", "5"}, // topologically sorted.
+					Inputs:        map[string]string{"p1": "p1", "p5": "p5"},
+					Outputs:       map[string]string{"p4": "p4", "p6": "p6", "p3": "p3"},
+				},
+				"4": {UniqueName: "d", Inputs: map[string]string{"i0": "p1"}, Outputs: map[string]string{"i0": "p2"}},
+				"5": {UniqueName: "e", Inputs: map[string]string{"i0": "p2"}, Outputs: map[string]string{"i0": "p3", "i1": "p4"}},
+				"6": {UniqueName: "f", Inputs: map[string]string{"i0": "p2", "i1": "p5"}, Outputs: map[string]string{"i0": "p6"}},
+				"7": {UniqueName: "g", Inputs: map[string]string{"i0": "p4", "i1": "p6", "i2": "p8"}, Outputs: map[string]string{"i0": "p7"}},
+				"8": {UniqueName: "h", Inputs: map[string]string{"i0": "p7"}},
+			},
+		}, {
+			name: "sibling composite",
+			in: map[string]*pipepb.PTransform{
+				"1": {
+					UniqueName:    "a",
+					Subtransforms: []string{"3", "4"},
+				},
+				"2": {
+					UniqueName:    "b",
+					Subtransforms: []string{"5"},
+				},
+				"3": {UniqueName: "c", Outputs: map[string]string{"i0": "p1"}},
+				"4": {UniqueName: "d", Inputs: map[string]string{"i0": "p1"}},
+				"5": {UniqueName: "e", Inputs: map[string]string{"i0": "p1"}},
+			},
+			exp: map[string]*pipepb.PTransform{
+				"1": {
+					UniqueName:    "a",
+					Subtransforms: []string{"3", "4"},
+					Outputs:       map[string]string{"p1": "p1"},
+				},
+				"2": {
+					UniqueName:    "b",
+					Subtransforms: []string{"5"},
+					Inputs:        map[string]string{"p1": "p1"},
+				},
+				"3": {UniqueName: "c", Outputs: map[string]string{"i0": "p1"}},
+				"4": {UniqueName: "d", Inputs: map[string]string{"i0": "p1"}},
+				"5": {UniqueName: "e", Inputs: map[string]string{"i0": "p1"}},
+			},
+		},
 	}
 
 	for _, test := range tests {
-		actual := computeCompositeInputOutput(test.in)
-		if !cmp.Equal(actual, test.exp, cmp.Comparer(proto.Equal)) {
-			t.Errorf("coimputeInputOutput(%v) = %v, want %v", test.in, actual, test.exp)
-		}
+		t.Run(test.name, func(t *testing.T) {
+			actual := computeCompositeInputOutput(test.in)
+			if diff := cmp.Diff(actual, test.exp, cmp.Comparer(proto.Equal)); diff != "" {
+				t.Errorf("computeInputOutput(%v)\ndiff: %v", test.in, diff)
+			}
+		})
 	}
 }
diff --git a/sdks/go/pkg/beam/core/runtime/pipelinex/util.go b/sdks/go/pkg/beam/core/runtime/pipelinex/util.go
index f859d95..9aae045 100644
--- a/sdks/go/pkg/beam/core/runtime/pipelinex/util.go
+++ b/sdks/go/pkg/beam/core/runtime/pipelinex/util.go
@@ -15,13 +15,17 @@
 
 package pipelinex
 
-import pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
-import "github.com/golang/protobuf/proto"
+import (
+	"sort"
+
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"github.com/golang/protobuf/proto"
+)
 
 // Bounded returns true iff all PCollections are bounded.
-func Bounded(p *pb.Pipeline) bool {
+func Bounded(p *pipepb.Pipeline) bool {
 	for _, col := range p.GetComponents().GetPcollections() {
-		if col.IsBounded == pb.IsBounded_UNBOUNDED {
+		if col.IsBounded == pipepb.IsBounded_UNBOUNDED {
 			return false
 		}
 	}
@@ -30,11 +34,11 @@ func Bounded(p *pb.Pipeline) bool {
 
 // ContainerImages returns the set of container images used
 // in the given pipeline.
-func ContainerImages(p *pb.Pipeline) []string {
+func ContainerImages(p *pipepb.Pipeline) []string {
 	var ret []string
 	for _, t := range p.GetComponents().GetEnvironments() {
 		// TODO(angoenka) 09/14/2018 Check t.Urn before parsing the payload.
-		var payload pb.DockerPayload
+		var payload pipepb.DockerPayload
 		proto.Unmarshal(t.GetPayload(), &payload)
 		ret = append(ret, payload.ContainerImage)
 	}
@@ -43,7 +47,7 @@ func ContainerImages(p *pb.Pipeline) []string {
 
 // TopologicalSort returns a topologically sorted list of the given
 // ids, generally from the same scope/composite. Assumes acyclic graph.
-func TopologicalSort(xforms map[string]*pb.PTransform, ids []string) []string {
+func TopologicalSort(xforms map[string]*pipepb.PTransform, ids []string) []string {
 	if len(ids) == 0 {
 		return ids
 	}
@@ -62,7 +66,7 @@ type visiter struct {
 	next   map[string][]string // collection -> transforms
 }
 
-func newVisiter(xforms map[string]*pb.PTransform, ids []string) *visiter {
+func newVisiter(xforms map[string]*pipepb.PTransform, ids []string) *visiter {
 	ret := &visiter{
 		output: make([]string, len(ids), len(ids)),
 		index:  len(ids) - 1,
@@ -74,20 +78,29 @@ func newVisiter(xforms map[string]*pb.PTransform, ids []string) *visiter {
 			ret.next[in] = append(ret.next[in], id)
 		}
 	}
+	for _, ns := range ret.next {
+		sort.Strings(ns)
+	}
 	return ret
 }
 
-func (v *visiter) visit(xforms map[string]*pb.PTransform, id string) {
+func (v *visiter) visit(xforms map[string]*pipepb.PTransform, id string) {
 	if v.seen[id] {
 		return
 	}
 	v.seen[id] = true
-	for _, out := range xforms[id].Outputs {
+	// Deterministically iterate through the output PCollection ids (the map values).
+	outputKeys := make([]string, 0, len(xforms[id].Outputs))
+	for _, k := range xforms[id].Outputs {
+		outputKeys = append(outputKeys, k)
+	}
+	sort.Strings(outputKeys)
+
+	for _, out := range outputKeys {
 		for _, next := range v.next[out] {
 			v.visit(xforms, next)
 		}
 	}
-
 	v.output[v.index] = id
 	v.index--
 }
diff --git a/sdks/go/pkg/beam/core/runtime/pipelinex/util_test.go b/sdks/go/pkg/beam/core/runtime/pipelinex/util_test.go
new file mode 100644
index 0000000..8f19c12
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/pipelinex/util_test.go
@@ -0,0 +1,231 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pipelinex
+
+import (
+	"fmt"
+	"testing"
+
+	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"github.com/google/go-cmp/cmp"
+)
+
+func TestTopologicalSort(t *testing.T) {
+	graphs := map[string]map[string]*pipepb.PTransform{
+		"linkedList": map[string]*pipepb.PTransform{
+			"p0": ptImpulse("n0"),
+			"p1": ptNoSide("n0", "n1"),
+			"p2": ptNoSide("n1", "n2"),
+			"p3": ptNoSide("n2", "n3"),
+			"p4": ptSink("n3"),
+		},
+		"binarytree": map[string]*pipepb.PTransform{
+			"p0":   ptImpulse("n0"),
+			"p1a":  ptNoSide("n0", "n1a"),
+			"p1b":  ptNoSide("n0", "n1b"),
+			"p2aa": ptSink("n1a"),
+			"p2ab": ptSink("n1a"),
+			"p2ba": ptSink("n1b"),
+			"p2bb": ptSink("n1b"),
+		},
+		"binarytreeWComps": map[string]*pipepb.PTransform{
+			"p0":   ptImpulse("n0"),
+			"p1a":  ptNoSide("n0", "n1a"),
+			"p1b":  ptNoSide("n0", "n1b"),
+			"p2aa": ptSink("n1a"),
+			"p2ab": ptSink("n1a"),
+			"p2ba": ptSink("n1b"),
+			"p2bb": ptSink("n1b"),
+			"c1":   ptComp("p0", "p1a", "p1b"),
+			"c2":   ptComp("p2aa", "p2ab"),
+			"c3":   ptComp("p2ba", "p2bb"),
+		},
+		"linkedListWComps": map[string]*pipepb.PTransform{
+			"p0": ptImpulse("n0"),
+			"p1": ptNoSide("n0", "n1"),
+			"p2": ptNoSide("n1", "n2"),
+			"p3": ptNoSide("n2", "n3"),
+			"p4": ptSink("n3"),
+			"c1": ptComp("p0"),
+			"c2": ptComp("p1", "p2"),
+			"c3": ptComp("c1", "c2", "p3"),
+		},
+		"leafdag": map[string]*pipepb.PTransform{
+			"A":  ptImpulse("n0"),
+			"B":  ptNoSide("n0", "n1"),
+			"C":  ptSink("n0", "n1"),
+			"c1": ptComp("A", "C"),
+			"c2": ptComp("B"),
+		},
+	}
+
+	for k, g := range graphs {
+		graphs[k] = computeCompositeInputOutput(g)
+	}
+
+	tests := []struct {
+		graph  string
+		toSort []string
+	}{
+		{
+			graph:  "linkedList",
+			toSort: []string{},
+		}, {
+			graph:  "linkedList",
+			toSort: []string{"p0", "p1", "p2", "p3", "p4"},
+		}, {
+			graph:  "linkedList",
+			toSort: []string{"p3", "p4", "p0", "p1", "p2"},
+		}, {
+			graph:  "binarytree",
+			toSort: []string{"p0", "p1a", "p1b", "p2aa", "p2ab", "p2ba", "p2bb"},
+		}, {
+			graph:  "binarytree",
+			toSort: []string{"p0", "p2bb", "p2aa", "p2ba", "p1b", "p1a", "p2ab"},
+		}, {
+			graph:  "binarytree",
+			toSort: []string{"p1b", "p0", "p2ba"},
+		}, {
+			graph:  "binarytreeWComps",
+			toSort: []string{"p0", "p1a", "p1b", "p2aa", "p2ab", "p2ba", "p2bb"},
+		}, {
+			graph:  "binarytreeWComps",
+			toSort: []string{"c1", "p2aa", "p2ab", "p2ba", "p2bb"},
+		}, {
+			graph:  "binarytreeWComps",
+			toSort: []string{"c1", "c2", "c3"},
+		}, {
+			graph:  "linkedListWComps",
+			toSort: []string{"c3", "p4"},
+		}, {
+			graph:  "linkedListWComps",
+			toSort: []string{"c1", "c2", "p3", "p4"},
+		}, {
+			graph:  "linkedListWComps",
+			toSort: []string{"p0", "c2", "p3", "p4"},
+		}, {
+			graph:  "linkedListWComps",
+			toSort: []string{"c1", "p1", "p2", "p3", "p4"},
+		}, {
+			graph:  "leafdag",
+			toSort: []string{"A", "B", "C"},
+		}, {
+			graph:  "leafdag",
+			toSort: []string{"A", "c2", "C"},
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.graph, func(t *testing.T) {
+			xforms := graphs[test.graph]
+			got1 := TopologicalSort(xforms, test.toSort)
+			got2 := TopologicalSort(xforms, test.toSort)
+			if diff := cmp.Diff(got1, got2); diff != "" {
+				t.Errorf("TopologicalSort(%v, %v) not deterministic: %v", test.graph, test.toSort, diff)
+			}
+			validateSortForTest(t, xforms, got1)
+		})
+	}
+}
+
+func ptSink(input string, sides ...string) *pipepb.PTransform {
+	ins := map[string]string{"i0": input}
+	for i, s := range sides {
+		ins[fmt.Sprintf("i%d", i+1)] = s
+	}
+	return &pipepb.PTransform{
+		Inputs: ins,
+	}
+}
+
+// ptComp generates a composite PTransform. Unlike the other helpers, it takes in
+// the *subtransform ids* instead of the input/output pcollections.
+func ptComp(subs ...string) *pipepb.PTransform {
+	return &pipepb.PTransform{
+		Subtransforms: subs,
+	}
+}
+
+func ptImpulse(output string) *pipepb.PTransform {
+	return &pipepb.PTransform{
+		Outputs: map[string]string{"o0": output},
+	}
+}
+
+func pt(inputs []string, outputs ...string) *pipepb.PTransform {
+	ins := make(map[string]string)
+	for i, in := range inputs {
+		ins[fmt.Sprintf("i%d", i)] = in
+	}
+	outs := make(map[string]string)
+	for i, out := range outputs {
+		outs[fmt.Sprintf("i%d", i)] = out
+	}
+	return &pipepb.PTransform{
+		Inputs:  ins,
+		Outputs: outs,
+	}
+}
+
+func ptNoSide(input string, outputs ...string) *pipepb.PTransform {
+	outs := make(map[string]string)
+	for i, o := range outputs {
+		outs[fmt.Sprintf("o%d", i)] = o
+	}
+	return &pipepb.PTransform{
+		Inputs:  map[string]string{"i0": input},
+		Outputs: outs,
+	}
+}
+
+// validateSortForTest ensures that in the sorted ids, outputs are declared before
+// they're used as inputs.
+// Distinct from validateSort to better communicate error details using the test harness,
+// and base things on the generated unique ids, rather than the user readable scopes.
+func validateSortForTest(t *testing.T, xforms map[string]*pipepb.PTransform, sorted []string) {
+	t.Helper()
+	seenPCollections := make(map[string]bool)
+	missingPCollections := make(map[string]bool)
+
+	for _, id := range sorted {
+		pt := xforms[id]
+		for _, in := range pt.Inputs {
+			if !seenPCollections[in] {
+				t.Errorf("out of order pcollection %v required by %v", in, id)
+				missingPCollections[in] = true
+			}
+		}
+
+		if len(pt.GetSubtransforms()) != 0 {
+			for _, sptid := range pt.GetSubtransforms() {
+				spt := xforms[sptid]
+				for _, out := range spt.GetOutputs() {
+					if missingPCollections[out] {
+						t.Errorf("missing pcollection %v created by %v in composite %v", out, sptid, id)
+					}
+				}
+			}
+		}
+		for _, out := range pt.GetOutputs() {
+			seenPCollections[out] = true
+			if missingPCollections[out] {
+				t.Errorf("missing pcollection %v created by %v", out, id)
+			}
+		}
+	}
+	if len(missingPCollections) != 0 {
+		t.Log(sorted)
+	}
+}