You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2023/02/20 00:43:55 UTC

[beam] branch master updated: [#24789][prism] add preprocessor and test (#25520)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6667eb4741b [#24789][prism] add preprocessor and test (#25520)
6667eb4741b is described below

commit 6667eb4741bad1bad199b61012476d62ea3d7e27
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Sun Feb 19 16:43:48 2023 -0800

    [#24789][prism] add preprocessor and test (#25520)
    
    * [prism] add preprocessor and test
    
    * [prism] preparer comment
    
    * [prism] move preparer
    
    ---------
    
    Co-authored-by: lostluck <13...@users.noreply.github.com>
---
 sdks/go/pkg/beam/runners/prism/internal/execute.go |  22 +++
 .../pkg/beam/runners/prism/internal/preprocess.go  | 148 +++++++++++++++++
 .../beam/runners/prism/internal/preprocess_test.go | 181 +++++++++++++++++++++
 3 files changed, 351 insertions(+)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go
new file mode 100644
index 00000000000..b685df63cf6
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -0,0 +1,22 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+// stage represents a fused subgraph, recorded as the IDs of the transforms it contains.
+// This is a temporary, minimal implementation that exists only to break up PRs.
+type stage struct {
+	transforms []string // IDs of the transforms that make up this stage.
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go
new file mode 100644
index 00000000000..8769a05d38f
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+	"sort"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/pipelinex"
+	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+	"golang.org/x/exp/maps"
+	"golang.org/x/exp/slog"
+)
+
+// transformPreparer is an interface for handling different URNs in the preprocessor,
+// largely for exchanging transforms for others, to be added to the complete set of
+// components in the pipeline.
+type transformPreparer interface {
+	// PrepareUrns returns the Beam URNs that this handler deals with for preprocessing.
+	PrepareUrns() []string
+	// PrepareTransform takes the ID and proto of a PTransform, along with the pipeline's
+	// components, and returns a set of new Components to merge into the pipeline, and a
+	// list of transform IDs to remove from the leaves and ignore during graph processing.
+	PrepareTransform(tid string, t *pipepb.PTransform, comps *pipepb.Components) (*pipepb.Components, []string)
+}
+
+// preprocessor retains configuration for preprocessing the
+// graph, such as special handling for lifted combiners or
+// other configuration.
+type preprocessor struct {
+	// transformPreparers maps a transform URN to the preparer registered for it.
+	transformPreparers map[string]transformPreparer
+}
+
+// newPreprocessor builds a preprocessor from the given preparers, indexing each
+// one under every URN it reports from PrepareUrns. When more than one preparer
+// reports the same URN, the one appearing later in preps is the one retained.
+func newPreprocessor(preps []transformPreparer) *preprocessor {
+	byUrn := make(map[string]transformPreparer)
+	for i := range preps {
+		handler := preps[i]
+		urns := handler.PrepareUrns()
+		for j := range urns {
+			byUrn[urns[j]] = handler
+		}
+	}
+	return &preprocessor{transformPreparers: byUrn}
+}
+
+// preProcessGraph takes the graph and preprocesses for consumption in bundles.
+// The output is one single-transform stage per kept leaf, in the order produced
+// by the topological sort of the transform IDs.
+//
+// These are how transforms are related in graph form, but not the specific bundles themselves, which will come later.
+//
+// Handles awareness of composite transforms and similar. Ultimately, after this point
+// the graph stops being a hypergraph, with composite transforms being treated as
+// "leaves" downstream as needed.
+//
+// This is where Combines become lifted (if it makes sense, or is configured), and similar behaviors.
+//
+// comps is modified in place: every component returned by a transform preparer
+// is merged into it.
+func (p *preprocessor) preProcessGraph(comps *pipepb.Components) []*stage {
+	ts := comps.GetTransforms()
+
+	// TODO move this out of this part of the pre-processor?
+	leaves := map[string]struct{}{}
+	ignore := map[string]struct{}{}
+	// NOTE(review): ts gains entries inside this loop. Go permits inserting into a
+	// map while ranging over it, but whether a newly inserted entry is visited by
+	// the same loop is unspecified, so prepared sub-transforms may or may not be
+	// examined here themselves.
+	for tid, t := range ts {
+		if _, ok := ignore[tid]; ok {
+			continue
+		}
+
+		spec := t.GetSpec()
+		if spec == nil {
+			// Most composites don't have specs.
+			slog.Debug("transform is missing a spec",
+				slog.Group("transform", slog.String("ID", tid), slog.String("name", t.GetUniqueName())))
+			continue
+		}
+
+		// Composite Transforms basically means needing to remove the "leaves" from the
+		// handling set, and producing the new sub component transforms. The top level
+		// composite should have enough information to produce the new sub transforms.
+		// In particular, the inputs and outputs need to all be connected and matched up
+		// so the topological sort still works out.
+		h := p.transformPreparers[spec.GetUrn()]
+		if h == nil {
+			// If there's an unknown urn, and it's not composite, simply add it to the leaves.
+			if len(t.GetSubtransforms()) == 0 {
+				leaves[tid] = struct{}{}
+			} else {
+				slog.Info("composite transform has unknown urn",
+					slog.Group("transform", slog.String("ID", tid),
+						slog.String("name", t.GetUniqueName()),
+						slog.String("urn", spec.GetUrn())))
+			}
+			continue
+		}
+
+		subs, toRemove := h.PrepareTransform(tid, t, comps)
+
+		// Clear out unnecessary leaves from this composite for topological sort handling.
+		for _, key := range toRemove {
+			ignore[key] = struct{}{}
+			delete(leaves, key)
+		}
+
+		// ts is the transform map owned by comps (not a copy), so adding the new
+		// transforms here also adds them to the pipeline's components.
+		for subID, sub := range subs.GetTransforms() {
+			leaves[subID] = struct{}{}
+			ts[subID] = sub
+		}
+		mergeComponentMap(&comps.Coders, subs.GetCoders())
+		mergeComponentMap(&comps.Pcollections, subs.GetPcollections())
+		// It's unlikely for these to change, but better to handle them now, to save a headache later.
+		mergeComponentMap(&comps.WindowingStrategies, subs.GetWindowingStrategies())
+		mergeComponentMap(&comps.Environments, subs.GetEnvironments())
+	}
+
+	// Sort the kept leaf IDs so the topological sort receives them in a
+	// deterministic order, independent of map iteration order.
+	keptLeaves := maps.Keys(leaves)
+	sort.Strings(keptLeaves)
+	topological := pipelinex.TopologicalSort(ts, keptLeaves)
+	// Log the ordering as an explicit attribute; a bare value without a key
+	// would be reported under "!BADKEY".
+	slog.Debug("topological transform ordering", slog.Any("topological", topological))
+
+	var stages []*stage
+	for _, tid := range topological {
+		stages = append(stages, &stage{
+			transforms: []string{tid},
+		})
+	}
+	return stages
+}
+
+// mergeComponentMap copies every entry of src into *dst, overwriting entries
+// that share a key. If *dst is nil and src has entries, *dst is allocated
+// first, since writing to a nil map panics. An empty src leaves *dst untouched.
+func mergeComponentMap[V any](dst *map[string]V, src map[string]V) {
+	if len(src) == 0 {
+		return
+	}
+	if *dst == nil {
+		*dst = make(map[string]V, len(src))
+	}
+	for id, v := range src {
+		(*dst)[id] = v
+	}
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go b/sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go
new file mode 100644
index 00000000000..add69a7c767
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+	"testing"
+
+	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+	"github.com/google/go-cmp/cmp"
+	"google.golang.org/protobuf/testing/protocmp"
+)
+
+// Test_preprocessor_preProcessGraph checks both results of preProcessGraph:
+// the stages it returns, and the in-place changes it makes to the input
+// components. Every case runs against a preprocessor configured with a single
+// testPreparer.
+func Test_preprocessor_preProcessGraph(t *testing.T) {
+	tests := []struct {
+		name  string
+		input *pipepb.Components
+
+		wantComponents *pipepb.Components
+		wantStages     []*stage
+	}{
+		{
+			// No preparer is registered for "defaultUrn", so the transform is
+			// kept as a leaf and the components are left unchanged.
+			name: "noPreparer",
+			input: &pipepb.Components{
+				Transforms: map[string]*pipepb.PTransform{
+					"e1": {
+						UniqueName: "e1",
+						Spec: &pipepb.FunctionSpec{
+							Urn: "defaultUrn",
+						},
+					},
+				},
+			},
+
+			wantStages: []*stage{{transforms: []string{"e1"}}},
+			wantComponents: &pipepb.Components{
+				Transforms: map[string]*pipepb.PTransform{
+					"e1": {
+						UniqueName: "e1",
+						Spec: &pipepb.FunctionSpec{
+							Urn: "defaultUrn",
+						},
+					},
+				},
+			},
+		}, {
+			// testPreparer handles "test_urn": "e1" is dropped from the stages
+			// and replaced by the two transforms the preparer returns.
+			name: "preparer",
+			input: &pipepb.Components{
+				Transforms: map[string]*pipepb.PTransform{
+					"e1": {
+						UniqueName: "e1",
+						Spec: &pipepb.FunctionSpec{
+							Urn: "test_urn",
+						},
+					},
+				},
+				// Initialize maps because they always are by proto unmarshallers.
+				Pcollections:        map[string]*pipepb.PCollection{},
+				WindowingStrategies: map[string]*pipepb.WindowingStrategy{},
+				Coders:              map[string]*pipepb.Coder{},
+				Environments:        map[string]*pipepb.Environment{},
+			},
+
+			wantStages: []*stage{{transforms: []string{"e1_early"}}, {transforms: []string{"e1_late"}}},
+			wantComponents: &pipepb.Components{
+				Transforms: map[string]*pipepb.PTransform{
+					// Original is always kept
+					"e1": {
+						UniqueName: "e1",
+						Spec: &pipepb.FunctionSpec{
+							Urn: "test_urn",
+						},
+					},
+					"e1_early": {
+						UniqueName: "e1_early",
+						Spec: &pipepb.FunctionSpec{
+							Urn: "defaultUrn",
+						},
+						Outputs:       map[string]string{"i0": "pcol1"},
+						EnvironmentId: "env1",
+					},
+					"e1_late": {
+						UniqueName: "e1_late",
+						Spec: &pipepb.FunctionSpec{
+							Urn: "defaultUrn",
+						},
+						Inputs:        map[string]string{"i0": "pcol1"},
+						EnvironmentId: "env1",
+					},
+				},
+				Pcollections: map[string]*pipepb.PCollection{
+					"pcol1": {
+						UniqueName:          "pcol1",
+						CoderId:             "coder1",
+						WindowingStrategyId: "ws1",
+					},
+				},
+				Coders: map[string]*pipepb.Coder{
+					"coder1": {Spec: &pipepb.FunctionSpec{Urn: "coder1"}},
+				},
+				WindowingStrategies: map[string]*pipepb.WindowingStrategy{
+					"ws1": {WindowCoderId: "global"},
+				},
+				Environments: map[string]*pipepb.Environment{
+					"env1": {Urn: "env1"},
+				},
+			},
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			pre := newPreprocessor([]transformPreparer{&testPreparer{}})
+
+			gotStages := pre.preProcessGraph(test.input)
+			if diff := cmp.Diff(test.wantStages, gotStages, cmp.AllowUnexported(stage{})); diff != "" {
+				t.Errorf("preProcessGraph(%q) stages diff (-want,+got)\n%v", test.name, diff)
+			}
+
+			// preProcessGraph mutates its input, so test.input holds the "got"
+			// components. want goes first so the diff matches the (-want,+got) label.
+			if diff := cmp.Diff(test.wantComponents, test.input, protocmp.Transform()); diff != "" {
+				t.Errorf("preProcessGraph(%q) components diff (-want,+got)\n%v", test.name, diff)
+			}
+		})
+	}
+}
+
+// testPreparer is a transformPreparer stub for tests. It claims the URN
+// "test_urn" and always answers with the same fixed replacement components.
+type testPreparer struct{}
+
+// PrepareUrns reports the single URN this stub handles.
+func (p *testPreparer) PrepareUrns() []string {
+	return []string{"test_urn"}
+}
+
+// PrepareTransform ignores its arguments and returns a fixed set of components:
+// two transforms ("e1_early" writing "pcol1" and "e1_late" reading it) plus the
+// PCollection, coder, windowing strategy and environment they refer to. It also
+// returns "e1" as the only transform ID to remove.
+func (p *testPreparer) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipepb.Components) (*pipepb.Components, []string) {
+	early := &pipepb.PTransform{
+		UniqueName:    "e1_early",
+		Spec:          &pipepb.FunctionSpec{Urn: "defaultUrn"},
+		Outputs:       map[string]string{"i0": "pcol1"},
+		EnvironmentId: "env1",
+	}
+	late := &pipepb.PTransform{
+		UniqueName:    "e1_late",
+		Spec:          &pipepb.FunctionSpec{Urn: "defaultUrn"},
+		Inputs:        map[string]string{"i0": "pcol1"},
+		EnvironmentId: "env1",
+	}
+	pcol := &pipepb.PCollection{
+		UniqueName:          "pcol1",
+		CoderId:             "coder1",
+		WindowingStrategyId: "ws1",
+	}
+
+	replacement := &pipepb.Components{
+		Transforms:          map[string]*pipepb.PTransform{"e1_early": early, "e1_late": late},
+		Pcollections:        map[string]*pipepb.PCollection{"pcol1": pcol},
+		Coders:              map[string]*pipepb.Coder{"coder1": {Spec: &pipepb.FunctionSpec{Urn: "coder1"}}},
+		WindowingStrategies: map[string]*pipepb.WindowingStrategy{"ws1": {WindowCoderId: "global"}},
+		Environments:        map[string]*pipepb.Environment{"env1": {Urn: "env1"}},
+	}
+	toRemove := []string{"e1"}
+	return replacement, toRemove
+}