You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2023/02/20 00:43:55 UTC
[beam] branch master updated: [#24789][prism] add preprocessor and test (#25520)
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6667eb4741b [#24789][prism] add preprocessor and test (#25520)
6667eb4741b is described below
commit 6667eb4741bad1bad199b61012476d62ea3d7e27
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Sun Feb 19 16:43:48 2023 -0800
[#24789][prism] add preprocessor and test (#25520)
* [prism] add preprocessor and test
* [prism] preparer comment
* [prism] move preparer
---------
Co-authored-by: lostluck <13...@users.noreply.github.com>
---
sdks/go/pkg/beam/runners/prism/internal/execute.go | 22 +++
.../pkg/beam/runners/prism/internal/preprocess.go | 148 +++++++++++++++++
.../beam/runners/prism/internal/preprocess_test.go | 181 +++++++++++++++++++++
3 files changed, 351 insertions(+)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go
new file mode 100644
index 00000000000..b685df63cf6
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -0,0 +1,22 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+// stage represents a fused subgraph of the pipeline.
+//
+// This is a temporary, minimal implementation that exists so the change
+// can be broken up across several PRs.
+type stage struct {
+ // transforms holds the IDs of the transforms that make up this stage.
+ transforms []string
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go
new file mode 100644
index 00000000000..8769a05d38f
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+ "sort"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/pipelinex"
+ pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+ "golang.org/x/exp/maps"
+ "golang.org/x/exp/slog"
+)
+
+// transformPreparer is an interface for handling different URNs in the preprocessor,
+// largely for exchanging transforms for others, which are then added to the complete
+// set of components in the pipeline.
+type transformPreparer interface {
+ // PrepareUrns returns the Beam URNs that this handler deals with for preprocessing.
+ PrepareUrns() []string
+ // PrepareTransform takes the ID and proto of a PTransform, along with the pipeline's
+ // Components, and returns a set of new Components plus a list of transform IDs to
+ // remove from the leaves and ignore during graph processing.
+ PrepareTransform(tid string, t *pipepb.PTransform, comps *pipepb.Components) (*pipepb.Components, []string)
+}
+
+// preprocessor retains configuration for preprocessing the
+// graph, such as special handling for lifted combiners or
+// other configuration.
+type preprocessor struct {
+ // transformPreparers maps a transform URN to the preparer registered for it.
+ transformPreparers map[string]transformPreparer
+}
+
+// newPreprocessor builds a preprocessor whose URN lookup table is filled from
+// the given preparers. Each preparer is registered under every URN it reports
+// via PrepareUrns; if two preparers report the same URN, the one appearing
+// later in preps is the one retained.
+func newPreprocessor(preps []transformPreparer) *preprocessor {
+	pre := &preprocessor{
+		transformPreparers: make(map[string]transformPreparer),
+	}
+	for i := range preps {
+		handler := preps[i]
+		for _, u := range handler.PrepareUrns() {
+			pre.transformPreparers[u] = handler
+		}
+	}
+	return pre
+}
+
+// preProcessGraph takes the graph and preprocesses for consumption in bundles.
+// The output is one single-transform stage per kept leaf transform, in the
+// order produced by the topological sort of the transform ids.
+//
+// These are how transforms are related in graph form, but not the specific bundles themselves, which will come later.
+//
+// Handles awareness of composite transforms and similar. Ultimately, after this point
+// the graph stops being a hypergraph, with composite transforms being treated as
+// "leaves" downstream as needed.
+//
+// This is where Combines become lifted (if it makes sense, or is configured), and similar behaviors.
+//
+// comps is modified in place: every component returned by a transformPreparer
+// is merged into the matching map of comps. Transforms replaced by a preparer
+// are dropped from the returned stages but remain in comps.
+func (p *preprocessor) preProcessGraph(comps *pipepb.Components) []*stage {
+	ts := comps.GetTransforms()
+
+	// Walk a sorted snapshot of the transform IDs rather than ranging over ts
+	// directly. Preparers insert new transforms into ts below, and inserting
+	// into a map while ranging over it leaves it unspecified whether the new
+	// entries are visited. The snapshot keeps preprocessing deterministic.
+	tids := maps.Keys(ts)
+	sort.Strings(tids)
+
+	// TODO move this out of this part of the pre-processor?
+	leaves := map[string]struct{}{}
+	ignore := map[string]struct{}{}
+	for _, tid := range tids {
+		if _, ok := ignore[tid]; ok {
+			continue
+		}
+		t := ts[tid]
+
+		spec := t.GetSpec()
+		if spec == nil {
+			// Most composites don't have specs.
+			slog.Debug("transform is missing a spec",
+				slog.Group("transform", slog.String("ID", tid), slog.String("name", t.GetUniqueName())))
+			continue
+		}
+
+		// Composite Transforms basically means needing to remove the "leaves" from the
+		// handling set, and producing the new sub component transforms. The top level
+		// composite should have enough information to produce the new sub transforms.
+		// In particular, the inputs and outputs need to all be connected and matched up
+		// so the topological sort still works out.
+		h := p.transformPreparers[spec.GetUrn()]
+		if h == nil {
+			// If there's an unknown urn, and it's not composite, simply add it to the leaves.
+			if len(t.GetSubtransforms()) == 0 {
+				leaves[tid] = struct{}{}
+			} else {
+				slog.Info("composite transform has unknown urn",
+					slog.Group("transform", slog.String("ID", tid),
+						slog.String("name", t.GetUniqueName()),
+						slog.String("urn", spec.GetUrn())))
+			}
+			continue
+		}
+
+		subs, toRemove := h.PrepareTransform(tid, t, comps)
+
+		// Clear out unnecessary leaves from this composite for topological sort handling.
+		for _, key := range toRemove {
+			ignore[key] = struct{}{}
+			delete(leaves, key)
+		}
+
+		// ts is the live transform map of comps (not a copy), so the new
+		// transforms become part of the pipeline components as well.
+		for subID, sub := range subs.GetTransforms() {
+			leaves[subID] = struct{}{}
+			ts[subID] = sub
+		}
+		mergeComponentMap(&comps.Coders, subs.GetCoders())
+		mergeComponentMap(&comps.Pcollections, subs.GetPcollections())
+		// It's unlikely for these to change, but better to handle them now, to save a headache later.
+		mergeComponentMap(&comps.WindowingStrategies, subs.GetWindowingStrategies())
+		mergeComponentMap(&comps.Environments, subs.GetEnvironments())
+	}
+
+	keptLeaves := maps.Keys(leaves)
+	sort.Strings(keptLeaves)
+	topological := pipelinex.TopologicalSort(ts, keptLeaves)
+	// Log the ordering under an explicit key; a bare value would be reported as !BADKEY.
+	slog.Debug("topological transform ordering", slog.Any("topological", topological))
+
+	var stages []*stage
+	for _, tid := range topological {
+		stages = append(stages, &stage{
+			transforms: []string{tid},
+		})
+	}
+	return stages
+}
+
+// mergeComponentMap copies every entry of src into *dst, overwriting entries
+// with the same key. *dst is allocated first if it is nil and src has entries,
+// so merging into an uninitialized component map does not panic.
+func mergeComponentMap[K comparable, V any](dst *map[K]V, src map[K]V) {
+	if len(src) == 0 {
+		return
+	}
+	if *dst == nil {
+		*dst = make(map[K]V, len(src))
+	}
+	for k, v := range src {
+		(*dst)[k] = v
+	}
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go b/sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go
new file mode 100644
index 00000000000..add69a7c767
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/preprocess_test.go
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+ "testing"
+
+ pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+ "github.com/google/go-cmp/cmp"
+ "google.golang.org/protobuf/testing/protocmp"
+)
+
+// Test_preprocessor_preProcessGraph checks both results of preProcessGraph:
+// the returned stages, and the in-place changes made to the input components.
+// The preprocessor under test is configured with a single testPreparer.
+func Test_preprocessor_preProcessGraph(t *testing.T) {
+	tests := []struct {
+		name  string
+		input *pipepb.Components
+
+		wantComponents *pipepb.Components
+		wantStages     []*stage
+	}{
+		{
+			name: "noPreparer",
+			input: &pipepb.Components{
+				Transforms: map[string]*pipepb.PTransform{
+					"e1": {
+						UniqueName: "e1",
+						Spec: &pipepb.FunctionSpec{
+							Urn: "defaultUrn",
+						},
+					},
+				},
+			},
+
+			wantStages: []*stage{{transforms: []string{"e1"}}},
+			wantComponents: &pipepb.Components{
+				Transforms: map[string]*pipepb.PTransform{
+					"e1": {
+						UniqueName: "e1",
+						Spec: &pipepb.FunctionSpec{
+							Urn: "defaultUrn",
+						},
+					},
+				},
+			},
+		}, {
+			name: "preparer",
+			input: &pipepb.Components{
+				Transforms: map[string]*pipepb.PTransform{
+					"e1": {
+						UniqueName: "e1",
+						Spec: &pipepb.FunctionSpec{
+							Urn: "test_urn",
+						},
+					},
+				},
+				// Initialize maps because they always are by proto unmarshallers.
+				Pcollections:        map[string]*pipepb.PCollection{},
+				WindowingStrategies: map[string]*pipepb.WindowingStrategy{},
+				Coders:              map[string]*pipepb.Coder{},
+				Environments:        map[string]*pipepb.Environment{},
+			},
+
+			wantStages: []*stage{{transforms: []string{"e1_early"}}, {transforms: []string{"e1_late"}}},
+			wantComponents: &pipepb.Components{
+				Transforms: map[string]*pipepb.PTransform{
+					// Original is always kept
+					"e1": {
+						UniqueName: "e1",
+						Spec: &pipepb.FunctionSpec{
+							Urn: "test_urn",
+						},
+					},
+					"e1_early": {
+						UniqueName: "e1_early",
+						Spec: &pipepb.FunctionSpec{
+							Urn: "defaultUrn",
+						},
+						Outputs:       map[string]string{"i0": "pcol1"},
+						EnvironmentId: "env1",
+					},
+					"e1_late": {
+						UniqueName: "e1_late",
+						Spec: &pipepb.FunctionSpec{
+							Urn: "defaultUrn",
+						},
+						Inputs:        map[string]string{"i0": "pcol1"},
+						EnvironmentId: "env1",
+					},
+				},
+				Pcollections: map[string]*pipepb.PCollection{
+					"pcol1": {
+						UniqueName:          "pcol1",
+						CoderId:             "coder1",
+						WindowingStrategyId: "ws1",
+					},
+				},
+				Coders: map[string]*pipepb.Coder{
+					"coder1": {Spec: &pipepb.FunctionSpec{Urn: "coder1"}},
+				},
+				WindowingStrategies: map[string]*pipepb.WindowingStrategy{
+					"ws1": {WindowCoderId: "global"},
+				},
+				Environments: map[string]*pipepb.Environment{
+					"env1": {Urn: "env1"},
+				},
+			},
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			pre := newPreprocessor([]transformPreparer{&testPreparer{}})
+
+			gotStages := pre.preProcessGraph(test.input)
+			if diff := cmp.Diff(test.wantStages, gotStages, cmp.AllowUnexported(stage{})); diff != "" {
+				t.Errorf("preProcessGraph(%q) stages diff (-want,+got)\n%v", test.name, diff)
+			}
+
+			// preProcessGraph updates its input in place, so test.input is the "got" value.
+			// want goes first so the diff signs match the (-want,+got) label.
+			if diff := cmp.Diff(test.wantComponents, test.input, protocmp.Transform()); diff != "" {
+				t.Errorf("preProcessGraph(%q) components diff (-want,+got)\n%v", test.name, diff)
+			}
+		})
+	}
+}
+
+// testPreparer is a stateless transformPreparer used as a test fixture:
+// it handles the "test_urn" URN and always returns the same fixed expansion.
+type testPreparer struct{}
+
+// PrepareUrns reports the single URN this fixture handles.
+func (p *testPreparer) PrepareUrns() []string {
+	handled := []string{"test_urn"}
+	return handled
+}
+
+// PrepareTransform ignores its arguments and returns a fixed expansion:
+// two transforms, "e1_early" and "e1_late", connected through the PCollection
+// "pcol1", together with the coder, windowing strategy and environment they
+// reference. The transform "e1" is reported as the one to remove.
+func (p *testPreparer) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipepb.Components) (*pipepb.Components, []string) {
+	// Producer side of pcol1.
+	early := &pipepb.PTransform{
+		UniqueName:    "e1_early",
+		Spec:          &pipepb.FunctionSpec{Urn: "defaultUrn"},
+		Outputs:       map[string]string{"i0": "pcol1"},
+		EnvironmentId: "env1",
+	}
+	// Consumer side of pcol1.
+	late := &pipepb.PTransform{
+		UniqueName:    "e1_late",
+		Spec:          &pipepb.FunctionSpec{Urn: "defaultUrn"},
+		Inputs:        map[string]string{"i0": "pcol1"},
+		EnvironmentId: "env1",
+	}
+	link := &pipepb.PCollection{
+		UniqueName:          "pcol1",
+		CoderId:             "coder1",
+		WindowingStrategyId: "ws1",
+	}
+
+	expansion := &pipepb.Components{
+		Transforms: map[string]*pipepb.PTransform{
+			"e1_early": early,
+			"e1_late":  late,
+		},
+		Pcollections: map[string]*pipepb.PCollection{
+			"pcol1": link,
+		},
+		Coders: map[string]*pipepb.Coder{
+			"coder1": {Spec: &pipepb.FunctionSpec{Urn: "coder1"}},
+		},
+		WindowingStrategies: map[string]*pipepb.WindowingStrategy{
+			"ws1": {WindowCoderId: "global"},
+		},
+		Environments: map[string]*pipepb.Environment{
+			"env1": {Urn: "env1"},
+		},
+	}
+	replaced := []string{"e1"}
+	return expansion, replaced
+}