Posted to commits@beam.apache.org by lo...@apache.org on 2023/02/20 16:57:09 UTC
[beam] branch master updated: [#24789][prism] Handlers for combine, ParDo, GBK, Flatten (#25558)
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 85ebc2f46b0 [#24789][prism] Handlers for combine, ParDo, GBK, Flatten (#25558)
85ebc2f46b0 is described below
commit 85ebc2f46b07ed245c6b6094781e4b512aeeb5c7
Author: Robert Burke <lo...@users.noreply.github.com>
AuthorDate: Mon Feb 20 08:57:02 2023 -0800
[#24789][prism] Handlers for combine, ParDo, GBK, Flatten (#25558)
* [prism] add in handlers
* [prism] executor interface
* delint - rm processor
* [prism] comments
---
sdks/go/pkg/beam/runners/prism/internal/execute.go | 12 +
.../beam/runners/prism/internal/handlecombine.go | 209 +++++++++++++++
.../pkg/beam/runners/prism/internal/handlepardo.go | 244 +++++++++++++++++
.../beam/runners/prism/internal/handlerunner.go | 298 +++++++++++++++++++++
4 files changed, 763 insertions(+)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index b685df63cf6..7c979ebf730 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -15,8 +15,20 @@
package internal
+import (
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+ pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker"
+)
+
// stage represents a fused subgraph.
// temporary implementation to break up PRs.
type stage struct {
transforms []string
}
+
+type transformExecuter interface {
+ ExecuteUrns() []string
+ ExecuteWith(t *pipepb.PTransform) string
+ ExecuteTransform(tid string, t *pipepb.PTransform, comps *pipepb.Components, watermark mtime.Time, data [][]byte) *worker.B
+}
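To make the contract above concrete, here is a hedged sketch of how an executor loop might route a single leaf transform through a transformExecuter. The executeLeaf function and the executors slice are hypothetical (not part of this change), URN routing via ExecuteUrns is elided, and the snippet assumes the imports added to execute.go above.

func executeLeaf(tid string, comps *pipepb.Components, executors []transformExecuter, watermark mtime.Time, data [][]byte) *worker.B {
	t := comps.GetTransforms()[tid]
	for _, ex := range executors {
		// ExecuteWith reports which environment should run the transform;
		// an empty string means the runner executes it directly.
		if ex.ExecuteWith(t) == "" {
			return ex.ExecuteTransform(tid, t, comps, watermark, data)
		}
	}
	return nil // Otherwise the transform is handed to an SDK worker (not shown).
}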
diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlecombine.go b/sdks/go/pkg/beam/runners/prism/internal/handlecombine.go
new file mode 100644
index 00000000000..ff9bd1e1c88
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/handlecombine.go
@@ -0,0 +1,209 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+ "fmt"
+ "reflect"
+
+ pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
+ "google.golang.org/protobuf/proto"
+)
+
+// This file retains the logic for the combine handler
+
+// CombineCharacteristic holds the configuration for Combines.
+type CombineCharacteristic struct {
+ EnableLifting bool // Sets whether a combine composite does combiner lifting or not.
+}
+
+// TODO figure out the factory we'd like.
+
+func Combine(config any) *combine {
+ return &combine{config: config.(CombineCharacteristic)}
+}
+
+// combine represents an instance of the combine handler.
+type combine struct {
+ config CombineCharacteristic
+}
+
+// ConfigURN returns the name for combine in the configuration file.
+func (*combine) ConfigURN() string {
+ return "combine"
+}
+
+func (*combine) ConfigCharacteristic() reflect.Type {
+ return reflect.TypeOf((*CombineCharacteristic)(nil)).Elem()
+}
+
+var _ transformPreparer = (*combine)(nil)
+
+func (*combine) PrepareUrns() []string {
+ return []string{urns.TransformCombinePerKey}
+}
+
+// PrepareTransform returns the synthesized lifted combine stages, and the composite's sub-transforms to remove, if lifting is enabled. Otherwise it returns nothing.
+func (h *combine) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipepb.Components) (*pipepb.Components, []string) {
+ // If we aren't lifting, the "default impl" for combines should be sufficient.
+ if !h.config.EnableLifting {
+ return nil, nil
+ }
+
+ // To lift a combine, the spec should contain a CombinePayload.
+ // That contains the actual FunctionSpec for the DoFn, and the
+ // id for the accumulator coder.
+ // We can synthetically produce/determine the remaining coders for
+ // the Input and Output types from the existing PCollections.
+ //
+ // This means we also need to synthesize PCollections with the accumulator coder.
+
+ // What we have:
+ // Input PCol: KV<K, I> -- INPUT
+ // -> GBK := KV<K, Iter<I>> -- GROUPED_I
+ // -> Combine := KV<K, O> -- OUTPUT
+ //
+ // What we want:
+ // Input PCol: KV<K, I> -- INPUT
+ // -> PreCombine := KV<K, A> -- LIFTED
+ // -> GBK -> KV<K, Iter<A>> -- GROUPED_A
+ // -> MergeAccumulators := KV<K, A> -- MERGED_A
+ // -> ExtractOutput -> KV<K, O> -- OUTPUT
+ //
+ // First we need to produce new coders for Iter<A>, KV<K, Iter<A>>, and KV<K, A>.
+ // The A coder ID is in the combine payload.
+ //
+ // Then we can produce the PCollections.
+ // We can reuse the INPUT and OUTPUT PCollections.
+ // We need LIFTED to have KV<K, A> kv_k_a
+ // We need GROUPED_A to have KV<K, Iter<A>> kv_k_iter_a
+ // We need MERGED_A to have KV<K, A> kv_k_a
+ //
+ // GROUPED_I ends up unused.
+ //
+ // The PCollections inherit the properties of the Input PCollection
+ // such as Boundedness, and Windowing Strategy.
+ //
+ // With these, we can produce the PTransforms with the appropriate URNs for the
+ // different parts of the composite, and return the new components.
+
+ cmbPayload := t.GetSpec().GetPayload()
+ cmb := &pipepb.CombinePayload{}
+ if err := (proto.UnmarshalOptions{}).Unmarshal(cmbPayload, cmb); err != nil {
+ panic(fmt.Sprintf("unable to decode ParDoPayload for transform[%v]", t.GetUniqueName()))
+ }
+
+ // First let's get the key coder ID.
+ var pcolInID string
+ // There's only one input.
+ for _, pcol := range t.GetInputs() {
+ pcolInID = pcol
+ }
+ inputPCol := comps.GetPcollections()[pcolInID]
+ kvkiID := inputPCol.GetCoderId()
+ kID := comps.GetCoders()[kvkiID].GetComponentCoderIds()[0]
+
+ // Now we can start synthesis!
+ // Coder IDs
+ aID := cmb.AccumulatorCoderId
+
+ ckvprefix := "c" + tid + "_kv_"
+
+ iterACID := "c" + tid + "_iter_" + aID
+ kvkaCID := ckvprefix + kID + "_" + aID
+ kvkIterACID := ckvprefix + kID + "_iter" + aID
+
+ // PCollection IDs
+ nprefix := "n" + tid + "_"
+ liftedNID := nprefix + "lifted"
+ groupedNID := nprefix + "grouped"
+ mergedNID := nprefix + "merged"
+
+ // Now we need the output collection ID
+ var pcolOutID string
+ // There's only one output.
+ for _, pcol := range t.GetOutputs() {
+ pcolOutID = pcol
+ }
+
+ // Transform IDs
+ eprefix := "e" + tid + "_"
+ liftEID := eprefix + "lift"
+ gbkEID := eprefix + "gbk"
+ mergeEID := eprefix + "merge"
+ extractEID := eprefix + "extract"
+
+ coder := func(urn string, componentIDs ...string) *pipepb.Coder {
+ return &pipepb.Coder{
+ Spec: &pipepb.FunctionSpec{
+ Urn: urn,
+ },
+ ComponentCoderIds: componentIDs,
+ }
+ }
+
+ pcol := func(name, coderID string) *pipepb.PCollection {
+ return &pipepb.PCollection{
+ UniqueName: name,
+ CoderId: coderID,
+ IsBounded: inputPCol.GetIsBounded(),
+ WindowingStrategyId: inputPCol.GetWindowingStrategyId(),
+ }
+ }
+
+ tform := func(name, urn, in, out, env string) *pipepb.PTransform {
+ return &pipepb.PTransform{
+ UniqueName: name,
+ Spec: &pipepb.FunctionSpec{
+ Urn: urn,
+ Payload: cmbPayload,
+ },
+ Inputs: map[string]string{
+ "i0": in,
+ },
+ Outputs: map[string]string{
+ "i0": out,
+ },
+ EnvironmentId: env,
+ }
+ }
+
+ newComps := &pipepb.Components{
+ Coders: map[string]*pipepb.Coder{
+ iterACID: coder(urns.CoderIterable, aID),
+ kvkaCID: coder(urns.CoderKV, kID, aID),
+ kvkIterACID: coder(urns.CoderKV, kID, iterACID),
+ },
+ Pcollections: map[string]*pipepb.PCollection{
+ liftedNID: pcol(liftedNID, kvkaCID),
+ groupedNID: pcol(groupedNID, kvkIterACID),
+ mergedNID: pcol(mergedNID, kvkaCID),
+ },
+ Transforms: map[string]*pipepb.PTransform{
+ liftEID: tform(liftEID, urns.TransformPreCombine, pcolInID, liftedNID, t.GetEnvironmentId()),
+ gbkEID: tform(gbkEID, urns.TransformGBK, liftedNID, groupedNID, ""),
+ mergeEID: tform(mergeEID, urns.TransformMerge, groupedNID, mergedNID, t.GetEnvironmentId()),
+ extractEID: tform(extractEID, urns.TransformExtract, mergedNID, pcolOutID, t.GetEnvironmentId()),
+ },
+ }
+
+ // Now we return everything!
+ // TODO recurse through sub transforms to remove?
+ // We don't need to remove the composite, since we don't add it in
+ // when we return the new transforms, so it's not in the topology.
+ return newComps, t.GetSubtransforms()
+}
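As a usage sketch (not part of this commit), the combine handler above would be constructed from its characteristic and applied during pre-processing roughly as follows; prepareCombine and its arguments are placeholders for whatever the preprocessor actually passes.

func prepareCombine(tid string, t *pipepb.PTransform, comps *pipepb.Components) (*pipepb.Components, []string) {
	// With lifting enabled, PrepareTransform returns the synthesized
	// lift/gbk/merge/extract stages plus the composite's sub-transforms to drop.
	// With lifting disabled, it returns (nil, nil) and the default expansion is kept.
	h := Combine(CombineCharacteristic{EnableLifting: true})
	return h.PrepareTransform(tid, t, comps)
}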
diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go b/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go
new file mode 100644
index 00000000000..2ac5ca5bbf5
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go
@@ -0,0 +1,244 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+ "fmt"
+ "reflect"
+
+ pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
+ "golang.org/x/exp/maps"
+ "google.golang.org/protobuf/proto"
+)
+
+// This file retains the logic for the pardo handler
+
+// ParDoCharacteristic holds the configuration for ParDos.
+type ParDoCharacteristic struct {
+ DisableSDF bool // Sets whether SDF support is disabled for ParDos.
+}
+
+func ParDo(config any) *pardo {
+ return &pardo{config: config.(ParDoCharacteristic)}
+}
+
+// pardo represents an instance of the pardo handler.
+type pardo struct {
+ config ParDoCharacteristic
+}
+
+// ConfigURN returns the name for pardo in the configuration file.
+func (*pardo) ConfigURN() string {
+ return "pardo"
+}
+
+func (*pardo) ConfigCharacteristic() reflect.Type {
+ return reflect.TypeOf((*ParDoCharacteristic)(nil)).Elem()
+}
+
+var _ transformPreparer = (*pardo)(nil)
+
+func (*pardo) PrepareUrns() []string {
+ return []string{urns.TransformParDo}
+}
+
+// PrepareTransform handles special processing with respect to ParDos, since their handling is dependent on supported features
+// and requirements.
+func (h *pardo) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipepb.Components) (*pipepb.Components, []string) {
+
+ // ParDos are a pain in the butt.
+ // Combines, by comparison, are dramatically simpler.
+ // This is because the actual shape of the graph changes depending on how the
+ // ParDo is handled, and on what kinds of transforms are in and around it.
+ // At their simplest, it's something the SDK worker handles on its own.
+ // At their most complex, they require intimate interaction with the subgraph
+ // bundling process, the data layer, state layers, and control layers.
+ // But unlike combines, which have a clear URN for the composite plus a special payload,
+ // ParDos use the standard URN for composites with the standard payload.
+ // So we always need to unmarshal the payload first.
+
+ pardoPayload := t.GetSpec().GetPayload()
+ pdo := &pipepb.ParDoPayload{}
+ if err := (proto.UnmarshalOptions{}).Unmarshal(pardoPayload, pdo); err != nil {
+ panic(fmt.Sprintf("unable to decode ParDoPayload for transform[%v]", t.GetUniqueName()))
+ }
+
+ // Let's check for anything that would make things less simple.
+ if pdo.OnWindowExpirationTimerFamilySpec == "" &&
+ !pdo.RequestsFinalization &&
+ !pdo.RequiresStableInput &&
+ !pdo.RequiresTimeSortedInput &&
+ len(pdo.StateSpecs) == 0 &&
+ len(pdo.TimerFamilySpecs) == 0 &&
+ pdo.RestrictionCoderId == "" {
+ // Which inputs are side inputs doesn't change the graph further,
+ // so that's not checked here. Nearly any ParDo can have them.
+
+ // At their simplest, we don't need to do anything special at pre-processing time, and simply pass through as normal.
+ return &pipepb.Components{
+ Transforms: map[string]*pipepb.PTransform{
+ tid: t,
+ },
+ }, nil
+ }
+
+ // Side inputs add to the topology and make fusion harder to deal with
+ // (side input producers can't be in the same stage as their consumers),
+ // but we don't have fusion yet, so no worries.
+
+ // State, Timers, Stable Input, Time Sorted Input, and some parts of SDFs
+ // are easier to deal with by including a fusion break. We can do that with a
+ // runner-specific transform for stable input, and another for time-sorted
+ // input.
+
+ // SplittableDoFns have 3 required phases and a 4th optional phase.
+ //
+ // PAIR_WITH_RESTRICTION which pairs elements with their restrictions
+ // Input: element; := INPUT
+ // Output: KV(element, restriction) := PWR
+ //
+ // SPLIT_AND_SIZE_RESTRICTIONS splits the pairs into sub element ranges
+ // and a relative size for each, in a float64 format.
+ // Input: KV(element, restriction) := PWR
+ // Output: KV(KV(element, restriction), float64) := SPLITnSIZED
+ //
+ // PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS actually processes the
+ // elements. This is also where splits need to be handled.
+ // In particular, primary and residual splits have the same format as the input.
+ // Input: KV(KV(element, restriction), size) := SPLITnSIZED
+ // Output: DoFn's output. := OUTPUT
+ //
+ // TRUNCATE_SIZED_RESTRICTION is how the runner has an SDK turn an
+ // unbounded transform into a bounded one. Not needed until the pipeline
+ // is told to drain.
+ // Input: KV(KV(element, restriction), float64) := synthetic split results from above
+ // Output: KV(KV(element, restriction), float64). := synthetic, truncated results sent as Split n Sized
+ //
+ // So with that, we can figure out the coders we need.
+ //
+ // cE - Element Coder (same as input coder)
+ // cR - Restriction Coder
+ // cS - Size Coder (float64)
+ // ckvER - KV<Element, Restriction>
+ // ckvERS - KV<KV<Element, Restriction>, Size>
+ //
+ // There could be a few output coders, but the outputs can be copied from
+ // the original transform directly.
+
+ // First let's get the parallel input coder ID.
+ var pcolInID, inputLocalID string
+ for localID, globalID := range t.GetInputs() {
+ // The parallel input is the one that isn't a side input.
+ if _, ok := pdo.SideInputs[localID]; !ok {
+ inputLocalID = localID
+ pcolInID = globalID
+ break
+ }
+ }
+ inputPCol := comps.GetPcollections()[pcolInID]
+ cEID := inputPCol.GetCoderId()
+ cRID := pdo.RestrictionCoderId
+ cSID := "c" + tid + "size"
+ ckvERID := "c" + tid + "kv_ele_rest"
+ ckvERSID := ckvERID + "_size"
+
+ coder := func(urn string, componentIDs ...string) *pipepb.Coder {
+ return &pipepb.Coder{
+ Spec: &pipepb.FunctionSpec{
+ Urn: urn,
+ },
+ ComponentCoderIds: componentIDs,
+ }
+ }
+
+ coders := map[string]*pipepb.Coder{
+ ckvERID: coder(urns.CoderKV, cEID, cRID),
+ cSID: coder(urns.CoderDouble),
+ ckvERSID: coder(urns.CoderKV, ckvERID, cSID),
+ }
+
+ // Only two PCollections are new:
+ // INPUT -> same as ordinary DoFn
+ // PWR, uses ckvER
+ // SPLITnSIZED, uses ckvERS
+ // OUTPUT -> same as ordinary outputs
+
+ nPWRID := "n" + tid + "_pwr"
+ nSPLITnSIZEDID := "n" + tid + "_splitnsized"
+
+ pcol := func(name, coderID string) *pipepb.PCollection {
+ return &pipepb.PCollection{
+ UniqueName: name,
+ CoderId: coderID,
+ IsBounded: inputPCol.GetIsBounded(),
+ WindowingStrategyId: inputPCol.GetWindowingStrategyId(),
+ }
+ }
+
+ pcols := map[string]*pipepb.PCollection{
+ nPWRID: pcol(nPWRID, ckvERID),
+ nSPLITnSIZEDID: pcol(nSPLITnSIZEDID, ckvERSID),
+ }
+
+ // There are 3 new PTransforms, with ProcessSizedElementsAndRestrictions
+ // taking the brunt of the complexity and consuming the inputs.
+
+ ePWRID := "e" + tid + "_pwr"
+ eSPLITnSIZEDID := "e" + tid + "_splitnsize"
+ eProcessID := "e" + tid + "_processandsplit"
+
+ tform := func(name, urn, in, out string) *pipepb.PTransform {
+ return &pipepb.PTransform{
+ UniqueName: name,
+ Spec: &pipepb.FunctionSpec{
+ Urn: urn,
+ Payload: pardoPayload,
+ },
+ Inputs: map[string]string{
+ inputLocalID: in,
+ },
+ Outputs: map[string]string{
+ "i0": out,
+ },
+ EnvironmentId: t.GetEnvironmentId(),
+ }
+ }
+
+ newInputs := maps.Clone(t.GetInputs())
+ newInputs[inputLocalID] = nSPLITnSIZEDID
+
+ tforms := map[string]*pipepb.PTransform{
+ ePWRID: tform(ePWRID, urns.TransformPairWithRestriction, pcolInID, nPWRID),
+ eSPLITnSIZEDID: tform(eSPLITnSIZEDID, urns.TransformSplitAndSize, nPWRID, nSPLITnSIZEDID),
+ eProcessID: {
+ UniqueName: eProcessID,
+ Spec: &pipepb.FunctionSpec{
+ Urn: urns.TransformProcessSizedElements,
+ Payload: pardoPayload,
+ },
+ Inputs: newInputs,
+ Outputs: t.GetOutputs(),
+ EnvironmentId: t.GetEnvironmentId(),
+ },
+ }
+
+ return &pipepb.Components{
+ Coders: coders,
+ Pcollections: pcols,
+ Transforms: tforms,
+ }, t.GetSubtransforms()
+}
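The pass-through check near the top of PrepareTransform can be read as a predicate over the ParDoPayload. Here is a hedged restatement of that condition as a standalone helper; isSimpleParDo is hypothetical and not part of this change.

func isSimpleParDo(pdo *pipepb.ParDoPayload) bool {
	// A ParDo with none of these features can be passed through unchanged;
	// anything else (state, timers, restrictions, ...) needs the expansion above
	// or dedicated runner support.
	return pdo.GetOnWindowExpirationTimerFamilySpec() == "" &&
		!pdo.GetRequestsFinalization() &&
		!pdo.GetRequiresStableInput() &&
		!pdo.GetRequiresTimeSortedInput() &&
		len(pdo.GetStateSpecs()) == 0 &&
		len(pdo.GetTimerFamilySpecs()) == 0 &&
		pdo.GetRestrictionCoderId() == ""
}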
diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
new file mode 100644
index 00000000000..e841620625e
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
@@ -0,0 +1,298 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "reflect"
+ "sort"
+
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+ pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
+ "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker"
+ "golang.org/x/exp/slog"
+ "google.golang.org/protobuf/encoding/prototext"
+ "google.golang.org/protobuf/proto"
+)
+
+// This file retains the logic for the runner handler
+
+// RunnerCharacteristic holds the configuration for Runner based transforms,
+// such as GBKs, Flattens.
+type RunnerCharacteristic struct {
+ SDKFlatten bool // Sets whether we should force an SDK side flatten.
+ SDKGBK bool // Sets whether the GBK should be handled by the SDK, if possible.
+}
+
+func Runner(config any) *runner {
+ return &runner{config: config.(RunnerCharacteristic)}
+}
+
+// runner represents an instance of the runner transform handler.
+type runner struct {
+ config RunnerCharacteristic
+}
+
+// ConfigURN returns the name for runner in the configuration file.
+func (*runner) ConfigURN() string {
+ return "runner"
+}
+
+func (*runner) ConfigCharacteristic() reflect.Type {
+ return reflect.TypeOf((*RunnerCharacteristic)(nil)).Elem()
+}
+
+var _ transformExecuter = (*runner)(nil)
+
+func (*runner) ExecuteUrns() []string {
+ return []string{urns.TransformFlatten, urns.TransformGBK}
+}
+
+// ExecuteWith returns the environment ID the given transform should execute in,
+// or the empty string if the runner should execute it directly.
+func (h *runner) ExecuteWith(t *pipepb.PTransform) string {
+ urn := t.GetSpec().GetUrn()
+ if urn == urns.TransformFlatten && !h.config.SDKFlatten {
+ return ""
+ }
+ if urn == urns.TransformGBK && !h.config.SDKGBK {
+ return ""
+ }
+ return t.GetEnvironmentId()
+}
+
+// ExecuteTransform handles special processing with respect to runner-specific transforms.
+func (h *runner) ExecuteTransform(tid string, t *pipepb.PTransform, comps *pipepb.Components, watermark mtime.Time, inputData [][]byte) *worker.B {
+ urn := t.GetSpec().GetUrn()
+ var data [][]byte
+ var onlyOut string
+ for _, out := range t.GetOutputs() {
+ onlyOut = out
+ }
+
+ switch urn {
+ case urns.TransformFlatten:
+ // Already done and collated.
+ data = inputData
+
+ case urns.TransformGBK:
+ ws := windowingStrategy(comps, tid)
+ kvc := onlyInputCoderForTransform(comps, tid)
+
+ coders := map[string]*pipepb.Coder{}
+
+ // TODO assert this is a KV. It's probably fine, but we should fail loudly if it isn't.
+ wcID := lpUnknownCoders(ws.GetWindowCoderId(), coders, comps.GetCoders())
+ kcID := lpUnknownCoders(kvc.GetComponentCoderIds()[0], coders, comps.GetCoders())
+ ecID := lpUnknownCoders(kvc.GetComponentCoderIds()[1], coders, comps.GetCoders())
+ reconcileCoders(coders, comps.GetCoders())
+
+ wc := coders[wcID]
+ kc := coders[kcID]
+ ec := coders[ecID]
+
+ data = append(data, gbkBytes(ws, wc, kc, ec, inputData, coders, watermark))
+ if len(data[0]) == 0 {
+ panic("no data for GBK")
+ }
+ default:
+ panic(fmt.Sprintf("unimplemented runner transform[%v]", urn))
+ }
+
+ // To avoid conflicts with these single transform
+ // bundles, we suffix the transform IDs.
+ var localID string
+ for key := range t.GetOutputs() {
+ localID = key
+ }
+
+ if localID == "" {
+ panic(fmt.Sprintf("bad transform: %v", prototext.Format(t)))
+ }
+ output := engine.TentativeData{}
+ for _, d := range data {
+ output.WriteData(onlyOut, d)
+ }
+
+ dataID := tid + "_" + localID // The ID the consumer will read from.
+ b := &worker.B{
+ InputTransformID: dataID,
+ SinkToPCollection: map[string]string{
+ dataID: onlyOut,
+ },
+ OutputData: output,
+ }
+ return b
+}
+
+// windowingStrategy sources the transform's windowing strategy from a single parallel input.
+func windowingStrategy(comps *pipepb.Components, tid string) *pipepb.WindowingStrategy {
+ t := comps.GetTransforms()[tid]
+ var inputPColID string
+ for _, pcolID := range t.GetInputs() {
+ inputPColID = pcolID
+ }
+ pcol := comps.GetPcollections()[inputPColID]
+ return comps.GetWindowingStrategies()[pcol.GetWindowingStrategyId()]
+}
+
+// gbkBytes re-encodes GBK inputs into a GBK result.
+func gbkBytes(ws *pipepb.WindowingStrategy, wc, kc, vc *pipepb.Coder, toAggregate [][]byte, coders map[string]*pipepb.Coder, watermark mtime.Time) []byte {
+ var outputTime func(typex.Window, mtime.Time) mtime.Time
+ switch ws.GetOutputTime() {
+ case pipepb.OutputTime_END_OF_WINDOW:
+ outputTime = func(w typex.Window, et mtime.Time) mtime.Time {
+ return w.MaxTimestamp()
+ }
+ default:
+ // TODO need to correct session logic if output time is different.
+ panic(fmt.Sprintf("unsupported OutputTime behavior: %v", ws.GetOutputTime()))
+ }
+ wDec, wEnc := makeWindowCoders(wc)
+
+ type keyTime struct {
+ key []byte
+ w typex.Window
+ time mtime.Time
+ values [][]byte
+ }
+ // Map windows to a map of keys to their aggregated keyTime state.
+ // We ultimately emit the window, the key, the time, and the iterable of elements,
+ // all contained in the final value.
+ windows := map[typex.Window]map[string]keyTime{}
+
+ kd := pullDecoder(kc, coders)
+ vd := pullDecoder(vc, coders)
+
+ // We need the key coder and the element coder,
+ // since we'll need to pull out anything the runner knows how to deal with,
+ // and repeat that for each element.
+ for _, data := range toAggregate {
+ // Parse out each element's data, and repeat.
+ buf := bytes.NewBuffer(data)
+ for {
+ ws, tm, _, err := exec.DecodeWindowedValueHeader(wDec, buf)
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ panic(fmt.Sprintf("can't decode windowed value header with %v: %v", wc, err))
+ }
+
+ keyByt := kd(buf)
+ key := string(keyByt)
+ value := vd(buf)
+ for _, w := range ws {
+ ft := outputTime(w, tm)
+ wk, ok := windows[w]
+ if !ok {
+ wk = make(map[string]keyTime)
+ windows[w] = wk
+ }
+ kt := wk[key]
+ kt.time = ft
+ kt.key = keyByt
+ kt.w = w
+ kt.values = append(kt.values, value)
+ wk[key] = kt
+ }
+ }
+ }
+
+ // If the strategy is session windows, then we need to get all the windows, sort them
+ // and see which ones need to be merged together.
+ if ws.GetWindowFn().GetUrn() == urns.WindowFnSession {
+ slog.Debug("sorting by session window")
+ session := &pipepb.SessionWindowsPayload{}
+ if err := (proto.UnmarshalOptions{}).Unmarshal(ws.GetWindowFn().GetPayload(), session); err != nil {
+ panic("unable to decode SessionWindowsPayload")
+ }
+ gapSize := mtime.Time(session.GetGapSize().AsDuration())
+
+ ordered := make([]window.IntervalWindow, 0, len(windows))
+ for k := range windows {
+ ordered = append(ordered, k.(window.IntervalWindow))
+ }
+ // Use a decreasing sort (latest to earliest) so we can correct
+ // the output timestamp to the new end of window immediately.
+ // TODO need to correct this if output time is different.
+ sort.Slice(ordered, func(i, j int) bool {
+ return ordered[i].MaxTimestamp() > ordered[j].MaxTimestamp()
+ })
+
+ cur := ordered[0]
+ sessionData := windows[cur]
+ for _, iw := range ordered[1:] {
+ // If the windows don't overlap (even allowing for the gap), the current session is complete.
+ if iw.End+gapSize < cur.Start {
+ // Start a new session.
+ windows[cur] = sessionData
+ cur = iw
+ sessionData = windows[iw]
+ continue
+ }
+ // Extend the session
+ cur.Start = iw.Start
+ toMerge := windows[iw]
+ delete(windows, iw)
+ for k, kt := range toMerge {
+ skt := sessionData[k]
+ skt.key = kt.key
+ skt.w = cur
+ skt.values = append(skt.values, kt.values...)
+ sessionData[k] = skt
+ }
+ }
+ }
+ // Everything's aggregated!
+ // Time to turn things into a windowed KV<K, Iterable<V>>
+
+ var buf bytes.Buffer
+ for _, w := range windows {
+ for _, kt := range w {
+ exec.EncodeWindowedValueHeader(
+ wEnc,
+ []typex.Window{kt.w},
+ kt.time,
+ typex.NoFiringPane(),
+ &buf,
+ )
+ buf.Write(kt.key)
+ coder.EncodeInt32(int32(len(kt.values)), &buf)
+ for _, value := range kt.values {
+ buf.Write(value)
+ }
+ }
+ }
+ return buf.Bytes()
+}
+
+func onlyInputCoderForTransform(comps *pipepb.Components, tid string) *pipepb.Coder {
+ t := comps.GetTransforms()[tid]
+ var inputPColID string
+ for _, pcolID := range t.GetInputs() {
+ inputPColID = pcolID
+ }
+ pcol := comps.GetPcollections()[inputPColID]
+ return comps.GetCoders()[pcol.GetCoderId()]
+}
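Finally, a hedged sketch of how the RunnerCharacteristic toggles interact with ExecuteWith; whoRuns is a hypothetical helper, but the decision it wraps is the one implemented above: an empty string keeps the Flatten or GBK on the runner, and a non-empty environment ID hands it back to the SDK.

func whoRuns(t *pipepb.PTransform) string {
	// Zero-value characteristics: the runner handles Flattens and GBKs itself.
	h := Runner(RunnerCharacteristic{})
	if env := h.ExecuteWith(t); env != "" {
		return "SDK environment " + env
	}
	return "runner"
}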