You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2023/02/20 01:49:25 UTC

[beam] 01/02: [prism] add in handlers

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch prism-handlers
in repository https://gitbox.apache.org/repos/asf/beam.git

commit ca36093e625e848bb8f569349c25d6e53167dd3e
Author: Robert Burke <ro...@frantil.com>
AuthorDate: Sun Feb 19 07:18:43 2023 -0800

    [prism] add in handlers
---
 .../beam/runners/prism/internal/handlecombine.go   | 209 +++++++++++++++
 .../pkg/beam/runners/prism/internal/handlepardo.go | 242 +++++++++++++++++
 .../beam/runners/prism/internal/handlerunner.go    | 298 +++++++++++++++++++++
 3 files changed, 749 insertions(+)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlecombine.go b/sdks/go/pkg/beam/runners/prism/internal/handlecombine.go
new file mode 100644
index 00000000000..ff9bd1e1c88
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/handlecombine.go
@@ -0,0 +1,209 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+	"fmt"
+	"reflect"
+
+	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
+	"google.golang.org/protobuf/proto"
+)
+
+// This file retains the logic for the combine handler
+
+// CombineCharacteristic holds the configuration for the combine handler.
+type CombineCharacteristic struct {
+	EnableLifting bool // When true, PrepareTransform replaces a combine composite with lifted stages; when false it does nothing.
+}
+
+// TODO figure out the factory we'd like.
+
+// Combine constructs a combine handler from the provided configuration.
+// config must hold a CombineCharacteristic value; any other dynamic type
+// makes the type assertion panic.
+func Combine(config any) *combine {
+	characteristic := config.(CombineCharacteristic)
+	return &combine{config: characteristic}
+}
+
+// combine represents an instance of the combine handler.
+type combine struct {
+	config CombineCharacteristic // Settings supplied to Combine; PrepareTransform reads EnableLifting.
+}
+
+// ConfigURN returns the name for combine in the configuration file.
+func (*combine) ConfigURN() string {
+	return "combine"
+}
+
+func (*combine) ConfigCharacteristic() reflect.Type {
+	return reflect.TypeOf((*CombineCharacteristic)(nil)).Elem()
+}
+
+var _ transformPreparer = (*combine)(nil) // Compile-time check that *combine satisfies transformPreparer.
+
+// PrepareUrns lists the transform URNs this handler prepares:
+// only the CombinePerKey composite.
+func (*combine) PrepareUrns() []string {
+	handled := []string{urns.TransformCombinePerKey}
+	return handled
+}
+
+// PrepareTransform returns lifted combines and removes the leaves if enabled. Otherwise returns nothing.
+//
+// tid is the ID of the combine composite t, and comps holds the pipeline
+// components used to look up the input PCollection and its coder.
+//
+// When lifting is enabled, the returned Components contain the synthesized
+// coders, PCollections, and the four replacement transforms (lift, GBK, merge,
+// extract), and the returned slice is the composite's subtransform IDs.
+// When lifting is disabled both results are nil.
+//
+// Panics if the transform's payload can't be decoded as a CombinePayload.
+func (h *combine) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipepb.Components) (*pipepb.Components, []string) {
+	// If we aren't lifting, the "default impl" for combines should be sufficient.
+	if !h.config.EnableLifting {
+		return nil, nil
+	}
+
+	// To lift a combine, the spec should contain a CombinePayload.
+	// That contains the actual FunctionSpec for the DoFn, and the
+	// id for the accumulator coder.
+	// We can synthetically produce/determine the remaining coders for
+	// the Input and Output types from the existing PCollections.
+	//
+	// This means we also need to synthesize pcollections with the accumulator coder too.
+
+	// What we have:
+	//  Input PCol: KV<K, I>      -- INPUT
+	//  -> GBK := KV<K, Iter<I>>  -- GROUPED_I
+	//  -> Combine := KV<K, O>    -- OUTPUT
+	//
+	// What we want:
+	//  Input PCol: KV<K, I>             -- INPUT
+	//  -> PreCombine := KV<K, A>        -- LIFTED
+	//  -> GBK -> KV<K, Iter<A>>         -- GROUPED_A
+	//  -> MergeAccumulators := KV<K, A> -- MERGED_A
+	//  -> ExtractOutput -> KV<K, O>     -- OUTPUT
+	//
+	// First we need to produce new coders for Iter<A>, KV<K, Iter<A>>, and KV<K, A>.
+	// The A coder ID is in the combine payload.
+	//
+	// Then we can produce the PCollections.
+	// We can reuse the INPUT and OUTPUT PCollections.
+	// We need LIFTED to have KV<K, A>  kv_k_a
+	// We need GROUPED_A to have KV<K, Iter<A>> kv_k_iter_a
+	// We need MERGED_A to have KV<K, A> kv_k_a
+	//
+	// GROUPED_I ends up unused.
+	//
+	// The PCollections inherit the properties of the Input PCollection
+	// such as Boundedness, and Windowing Strategy.
+	//
+	// With these, we can produce the PTransforms with the appropriate URNs for the
+	// different parts of the composite, and return the new components.
+
+	cmbPayload := t.GetSpec().GetPayload()
+	cmb := &pipepb.CombinePayload{}
+	if err := (proto.UnmarshalOptions{}).Unmarshal(cmbPayload, cmb); err != nil {
+		// Report the payload type actually being decoded, and keep the cause.
+		panic(fmt.Sprintf("unable to decode CombinePayload for transform[%v]: %v", t.GetUniqueName(), err))
+	}
+
+	// First lets get the key coder ID.
+	var pcolInID string
+	// There's only one input.
+	for _, pcol := range t.GetInputs() {
+		pcolInID = pcol
+	}
+	inputPCol := comps.GetPcollections()[pcolInID]
+	kvkiID := inputPCol.GetCoderId()
+	// The key coder is the first component of the input's KV coder.
+	kID := comps.GetCoders()[kvkiID].GetComponentCoderIds()[0]
+
+	// Now we can start synthesis!
+	// Coder IDs
+	aID := cmb.AccumulatorCoderId
+
+	ckvprefix := "c" + tid + "_kv_"
+
+	iterACID := "c" + tid + "_iter_" + aID
+	kvkaCID := ckvprefix + kID + "_" + aID
+	kvkIterACID := ckvprefix + kID + "_iter" + aID
+
+	// PCollection IDs
+	nprefix := "n" + tid + "_"
+	liftedNID := nprefix + "lifted"
+	groupedNID := nprefix + "grouped"
+	mergedNID := nprefix + "merged"
+
+	// Now we need the output collection ID
+	var pcolOutID string
+	// There's only one output.
+	for _, pcol := range t.GetOutputs() {
+		pcolOutID = pcol
+	}
+
+	// Transform IDs
+	eprefix := "e" + tid + "_"
+	liftEID := eprefix + "lift"
+	gbkEID := eprefix + "gbk"
+	mergeEID := eprefix + "merge"
+	extractEID := eprefix + "extract"
+
+	// coder builds a Coder proto with the given URN and component coder IDs.
+	coder := func(urn string, componentIDs ...string) *pipepb.Coder {
+		return &pipepb.Coder{
+			Spec: &pipepb.FunctionSpec{
+				Urn: urn,
+			},
+			ComponentCoderIds: componentIDs,
+		}
+	}
+
+	// pcol builds a PCollection that inherits boundedness and windowing
+	// strategy from the composite's input PCollection.
+	pcol := func(name, coderID string) *pipepb.PCollection {
+		return &pipepb.PCollection{
+			UniqueName:          name,
+			CoderId:             coderID,
+			IsBounded:           inputPCol.GetIsBounded(),
+			WindowingStrategyId: inputPCol.GetWindowingStrategyId(),
+		}
+	}
+
+	// tform builds a single-input, single-output transform that carries
+	// the original combine payload.
+	tform := func(name, urn, in, out, env string) *pipepb.PTransform {
+		return &pipepb.PTransform{
+			UniqueName: name,
+			Spec: &pipepb.FunctionSpec{
+				Urn:     urn,
+				Payload: cmbPayload,
+			},
+			Inputs: map[string]string{
+				"i0": in,
+			},
+			Outputs: map[string]string{
+				"i0": out,
+			},
+			EnvironmentId: env,
+		}
+	}
+
+	newComps := &pipepb.Components{
+		Coders: map[string]*pipepb.Coder{
+			iterACID:    coder(urns.CoderIterable, aID),
+			kvkaCID:     coder(urns.CoderKV, kID, aID),
+			kvkIterACID: coder(urns.CoderKV, kID, iterACID),
+		},
+		Pcollections: map[string]*pipepb.PCollection{
+			liftedNID:  pcol(liftedNID, kvkaCID),
+			groupedNID: pcol(groupedNID, kvkIterACID),
+			mergedNID:  pcol(mergedNID, kvkaCID),
+		},
+		Transforms: map[string]*pipepb.PTransform{
+			liftEID: tform(liftEID, urns.TransformPreCombine, pcolInID, liftedNID, t.GetEnvironmentId()),
+			// The GBK is given no environment.
+			gbkEID:   tform(gbkEID, urns.TransformGBK, liftedNID, groupedNID, ""),
+			mergeEID: tform(mergeEID, urns.TransformMerge, groupedNID, mergedNID, t.GetEnvironmentId()),
+			// Each transform's unique name matches its own ID.
+			extractEID: tform(extractEID, urns.TransformExtract, mergedNID, pcolOutID, t.GetEnvironmentId()),
+		},
+	}
+
+	// Now we return everything!
+	// TODO recurse through sub transforms to remove?
+	// We don't need to remove the composite, since we don't add it in
+	// when we return the new transforms, so it's not in the topology.
+	return newComps, t.GetSubtransforms()
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go b/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go
new file mode 100644
index 00000000000..384a5d93014
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go
@@ -0,0 +1,242 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+	"fmt"
+	"reflect"
+
+	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
+	"golang.org/x/exp/maps"
+	"google.golang.org/protobuf/proto"
+)
+
+// This file retains the logic for the pardo handler
+
+// ParDoCharacteristic holds the configuration for the pardo handler.
+type ParDoCharacteristic struct {
+	DisableSDF bool // Sets whether a pardo supports SDFs or not. NOTE(review): not read by the handler code in this file.
+}
+
+// ParDo constructs a pardo handler from the provided configuration.
+// config must hold a ParDoCharacteristic value; any other dynamic type
+// makes the type assertion panic.
+func ParDo(config any) *pardo {
+	characteristic := config.(ParDoCharacteristic)
+	return &pardo{config: characteristic}
+}
+
+// pardo represents an instance of the pardo handler.
+type pardo struct {
+	config ParDoCharacteristic // Settings supplied to ParDo.
+}
+
+// ConfigURN returns the name for combine in the configuration file.
+func (*pardo) ConfigURN() string {
+	return "pardo"
+}
+
+func (*pardo) ConfigCharacteristic() reflect.Type {
+	return reflect.TypeOf((*ParDoCharacteristic)(nil)).Elem()
+}
+
+var _ transformPreparer = (*pardo)(nil) // Compile-time check that *pardo satisfies transformPreparer.
+
+// PrepareUrns lists the transform URNs this handler prepares:
+// only the ParDo URN.
+func (*pardo) PrepareUrns() []string {
+	handled := []string{urns.TransformParDo}
+	return handled
+}
+
+// PrepareTransform handles special processing with respect to ParDos, since their handling is dependent on supported features
+// and requirements.
+//
+// tid is the ID of the ParDo transform t, and comps holds the pipeline
+// components used to look up the parallel input PCollection.
+//
+// A ParDo whose payload uses none of the checked features (window expiration
+// timers, finalization, stable or time sorted input, state, timers, or a
+// restriction coder) is returned unchanged as the only transform, with no
+// transforms to remove. Any other ParDo is expanded into the splittable DoFn
+// phases (pair with restriction, split and size, process sized elements),
+// together with the coders and PCollections they need; the transform's
+// subtransform IDs are returned alongside.
+//
+// Panics if the transform's payload can't be decoded as a ParDoPayload.
+func (h *pardo) PrepareTransform(tid string, t *pipepb.PTransform, comps *pipepb.Components) (*pipepb.Components, []string) {
+
+	// ParDos are a pain in the butt.
+	// Combines, by comparison, are dramatically simpler.
+	// This is because for ParDos, how they are handled, and what kinds of transforms are in
+	// and around the ParDo, the actual shape of the graph will change.
+	// At their simplest, it's something a DoFn will handle on their own.
+	// At their most complex, they require intimate interaction with the subgraph
+	// bundling process, the data layer, state layers, and control layers.
+	// But unlike combines, which have a clear urn for composite + special payload,
+	// ParDos have the standard URN for composites with the standard payload.
+	// So always, we need to first unmarshal the payload.
+
+	pardoPayload := t.GetSpec().GetPayload()
+	pdo := &pipepb.ParDoPayload{}
+	if err := (proto.UnmarshalOptions{}).Unmarshal(pardoPayload, pdo); err != nil {
+		// Keep the underlying cause so a bad payload can be diagnosed.
+		panic(fmt.Sprintf("unable to decode ParDoPayload for transform[%v]: %v", t.GetUniqueName(), err))
+	}
+
+	// Lets check for and remove anything that makes things less simple.
+	if pdo.OnWindowExpirationTimerFamilySpec == "" &&
+		!pdo.RequestsFinalization &&
+		!pdo.RequiresStableInput &&
+		!pdo.RequiresTimeSortedInput &&
+		//	len(pdo.SideInputs) == 0 &&
+		len(pdo.StateSpecs) == 0 &&
+		len(pdo.TimerFamilySpecs) == 0 &&
+		pdo.RestrictionCoderId == "" {
+		// At their simplest, we don't need to do anything special at pre-processing time, and simply pass through as normal.
+		return &pipepb.Components{
+			Transforms: map[string]*pipepb.PTransform{
+				tid: t,
+			},
+		}, nil
+	}
+
+	// Side inputs add to topology and make fusion harder to deal with
+	// (side input producers can't be in the same subgraph as their consumers)
+	// But we don't have fusion yet, so no worries.
+
+	// State, Timers, Stable Input, Time Sorted Input, and some parts of SDF
+	// are easier to deal with by including a fusion break. But we can do that
+	// with a runner specific transform for stable input, and another for
+	// time sorted input.
+
+	// SplittableDoFns have 3 required phases and a 4th optional phase.
+	//
+	// PAIR_WITH_RESTRICTION which pairs elements with their restrictions
+	// Input: element;   := INPUT
+	// Output: KV(element, restriction)  := PWR
+	//
+	// SPLIT_AND_SIZE_RESTRICTIONS splits the pairs into sub element ranges
+	// and a relative size for each, in a float64 format.
+	// Input: KV(element, restriction) := PWR
+	// Output: KV(KV(element, restriction), float64)  := SPLITnSIZED
+	//
+	// PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS actually processes the
+	// elements. This is also where splits need to be handled.
+	// In particular, primary and residual splits have the same format as the input.
+	// Input: KV(KV(element, restriction), size) := SPLITnSIZED
+	// Output: DoFn's output.  := OUTPUT
+	//
+	// TRUNCATE_SIZED_RESTRICTION is how the runner has an SDK turn an
+	// unbounded transform into a bound one. Not needed until the pipeline
+	// is told to drain.
+	// Input: KV(KV(element, restriction), float64) := synthetic split results from above
+	// Output: KV(KV(element, restriction), float64). := synthetic, truncated results sent as Split n Sized
+	//
+	// So with that, we can figure out the coders we need.
+	//
+	// cE - Element Coder  (same as input coder)
+	// cR - Restriction Coder
+	// cS - Size Coder (float64)
+	// ckvER - KV<Element, Restriction>
+	// ckvERS - KV<KV<Element, Restriction>, Size>
+	//
+	// There could be a few output coders, but the outputs can be copied from
+	// the original transform directly.
+
+	// First lets get the parallel input coder ID.
+	var pcolInID, inputLocalID string
+	for localID, globalID := range t.GetInputs() {
+		// The parallel input is the one that isn't a side input.
+		if _, ok := pdo.SideInputs[localID]; !ok {
+			inputLocalID = localID
+			pcolInID = globalID
+			break
+		}
+	}
+	inputPCol := comps.GetPcollections()[pcolInID]
+	cEID := inputPCol.GetCoderId()
+	cRID := pdo.RestrictionCoderId
+	cSID := "c" + tid + "size"
+	ckvERID := "c" + tid + "kv_ele_rest"
+	ckvERSID := ckvERID + "_size"
+
+	// coder builds a Coder proto with the given URN and component coder IDs.
+	coder := func(urn string, componentIDs ...string) *pipepb.Coder {
+		return &pipepb.Coder{
+			Spec: &pipepb.FunctionSpec{
+				Urn: urn,
+			},
+			ComponentCoderIds: componentIDs,
+		}
+	}
+
+	coders := map[string]*pipepb.Coder{
+		ckvERID:  coder(urns.CoderKV, cEID, cRID),
+		cSID:     coder(urns.CoderDouble),
+		ckvERSID: coder(urns.CoderKV, ckvERID, cSID),
+	}
+
+	// PCollections only have two new ones.
+	// INPUT -> same as ordinary DoFn
+	// PWR, uses ckvER
+	// SPLITnSIZED, uses ckvERS
+	// OUTPUT -> same as ordinary outputs
+
+	nPWRID := "n" + tid + "_pwr"
+	nSPLITnSIZEDID := "n" + tid + "_splitnsized"
+
+	// pcol builds a PCollection that inherits boundedness and windowing
+	// strategy from the parallel input PCollection.
+	pcol := func(name, coderID string) *pipepb.PCollection {
+		return &pipepb.PCollection{
+			UniqueName:          name,
+			CoderId:             coderID,
+			IsBounded:           inputPCol.GetIsBounded(),
+			WindowingStrategyId: inputPCol.GetWindowingStrategyId(),
+		}
+	}
+
+	pcols := map[string]*pipepb.PCollection{
+		nPWRID:         pcol(nPWRID, ckvERID),
+		nSPLITnSIZEDID: pcol(nSPLITnSIZEDID, ckvERSID),
+	}
+
+	// PTransforms have 3 new ones, with process sized elements and restrictions
+	// taking the brunt of the complexity, consuming the inputs
+
+	ePWRID := "e" + tid + "_pwr"
+	eSPLITnSIZEDID := "e" + tid + "_splitnsize"
+	eProcessID := "e" + tid + "_processandsplit"
+
+	// tform builds a transform with only the parallel input and a single
+	// output, carrying the original ParDo payload and environment.
+	tform := func(name, urn, in, out string) *pipepb.PTransform {
+		return &pipepb.PTransform{
+			UniqueName: name,
+			Spec: &pipepb.FunctionSpec{
+				Urn:     urn,
+				Payload: pardoPayload,
+			},
+			Inputs: map[string]string{
+				inputLocalID: in,
+			},
+			Outputs: map[string]string{
+				"i0": out,
+			},
+			EnvironmentId: t.GetEnvironmentId(),
+		}
+	}
+
+	// The processing transform keeps every original input (side inputs
+	// included), but reads its parallel input from the split and sized
+	// PCollection. Clone so the original transform's map isn't modified.
+	newInputs := maps.Clone(t.GetInputs())
+	newInputs[inputLocalID] = nSPLITnSIZEDID
+
+	tforms := map[string]*pipepb.PTransform{
+		ePWRID:         tform(ePWRID, urns.TransformPairWithRestriction, pcolInID, nPWRID),
+		eSPLITnSIZEDID: tform(eSPLITnSIZEDID, urns.TransformSplitAndSize, nPWRID, nSPLITnSIZEDID),
+		eProcessID: {
+			UniqueName: eProcessID,
+			Spec: &pipepb.FunctionSpec{
+				Urn:     urns.TransformProcessSizedElements,
+				Payload: pardoPayload,
+			},
+			Inputs:        newInputs,
+			Outputs:       t.GetOutputs(),
+			EnvironmentId: t.GetEnvironmentId(),
+		},
+	}
+
+	return &pipepb.Components{
+		Coders:       coders,
+		Pcollections: pcols,
+		Transforms:   tforms,
+	}, t.GetSubtransforms()
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
new file mode 100644
index 00000000000..b0fc60aad29
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
@@ -0,0 +1,298 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package internal
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"reflect"
+	"sort"
+
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
+	pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker"
+	"golang.org/x/exp/slog"
+	"google.golang.org/protobuf/encoding/prototext"
+	"google.golang.org/protobuf/proto"
+)
+
+// This file retains the logic for the runner handler
+
+// RunnerCharacteristic holds the configuration for Runner based transforms,
+// such as GBKs and Flattens.
+type RunnerCharacteristic struct {
+	SDKFlatten bool // When true, ExecuteWith reports the Flatten's own environment ID; when false it reports "".
+	SDKGBK     bool // When true, ExecuteWith reports the GBK's own environment ID; when false it reports "".
+}
+
+// Runner constructs a runner transform handler from the provided configuration.
+// config must hold a RunnerCharacteristic value; any other dynamic type
+// makes the type assertion panic.
+func Runner(config any) *runner {
+	characteristic := config.(RunnerCharacteristic)
+	return &runner{config: characteristic}
+}
+
+// runner represents an instance of the runner transform handler.
+type runner struct {
+	config RunnerCharacteristic // Settings supplied to Runner; ExecuteWith reads SDKFlatten and SDKGBK.
+}
+
+// ConfigURN returns the name for combine in the configuration file.
+func (*runner) ConfigURN() string {
+	return "runner"
+}
+
+func (*runner) ConfigCharacteristic() reflect.Type {
+	return reflect.TypeOf((*RunnerCharacteristic)(nil)).Elem()
+}
+
+var _ transformExecuter = (*runner)(nil) // Compile-time check that *runner satisfies transformExecuter.
+
+// ExecuteUrns lists the transform URNs this handler executes:
+// Flatten and GroupByKey.
+func (*runner) ExecuteUrns() []string {
+	handled := []string{
+		urns.TransformFlatten,
+		urns.TransformGBK,
+	}
+	return handled
+}
+
+// ExecuteWith returns the ID of the environment the transform should execute in.
+//
+// The empty string is returned for a Flatten unless SDKFlatten is set, and for
+// a GBK unless SDKGBK is set. In every other case the transform's own
+// environment ID is returned.
+// NOTE(review): the empty ID presumably means "handled by the runner itself" — confirm against the caller.
+func (h *runner) ExecuteWith(t *pipepb.PTransform) string {
+	switch t.GetSpec().GetUrn() {
+	case urns.TransformFlatten:
+		if !h.config.SDKFlatten {
+			return ""
+		}
+	case urns.TransformGBK:
+		if !h.config.SDKGBK {
+			return ""
+		}
+	}
+	return t.GetEnvironmentId()
+}
+
+// ExecuteTransform handles special processing with respect to runner specific transforms.
+//
+// tid is the ID of transform t, comps holds the pipeline components,
+// watermark is forwarded to the GBK aggregation, and inputData is the encoded
+// input for the transform.
+//
+// A Flatten passes inputData through unchanged. A GBK re-encodes inputData
+// into a single grouped block using the window, key, and value coders of its
+// input. The result is a bundle whose OutputData holds the produced data for
+// the transform's output PCollection.
+//
+// Panics for any other URN, when a GBK produces no data, or when the
+// transform has no usable output local name.
+func (h *runner) ExecuteTransform(tid string, t *pipepb.PTransform, comps *pipepb.Components, watermark mtime.Time, inputData [][]byte) *worker.B {
+	urn := t.GetSpec().GetUrn()
+	var data [][]byte
+
+	// These transforms have a single output. Read the local name and the
+	// PCollection ID from the same map entry so the pair always agrees,
+	// rather than relying on two independent map iterations.
+	var localID, onlyOut string
+	for key, out := range t.GetOutputs() {
+		localID, onlyOut = key, out
+	}
+
+	switch urn {
+	case urns.TransformFlatten:
+		// Already done and collated.
+		data = inputData
+
+	case urns.TransformGBK:
+		ws := windowingStrategy(comps, tid)
+		kvc := onlyInputCoderForTransform(comps, tid)
+
+		coders := map[string]*pipepb.Coder{}
+
+		// TODO assert this is a KV. It's probably fine, but we should fail anyway.
+		wcID := lpUnknownCoders(ws.GetWindowCoderId(), coders, comps.GetCoders())
+		kcID := lpUnknownCoders(kvc.GetComponentCoderIds()[0], coders, comps.GetCoders())
+		ecID := lpUnknownCoders(kvc.GetComponentCoderIds()[1], coders, comps.GetCoders())
+		reconcileCoders(coders, comps.GetCoders())
+
+		wc := coders[wcID]
+		kc := coders[kcID]
+		ec := coders[ecID]
+
+		data = append(data, gbkBytes(ws, wc, kc, ec, inputData, coders, watermark))
+		if len(data[0]) == 0 {
+			panic("no data for GBK")
+		}
+	default:
+		panic(fmt.Sprintf("unimplemented runner transform[%v]", urn))
+	}
+
+	if localID == "" {
+		panic(fmt.Sprintf("bad transform: %v", prototext.Format(t)))
+	}
+	output := engine.TentativeData{}
+	for _, d := range data {
+		output.WriteData(onlyOut, d)
+	}
+
+	// To avoid conflicts with these single transform
+	// bundles, we suffix the transform IDs.
+	dataID := tid + "_" + localID // The ID from which the consumer will read from.
+	b := &worker.B{
+		InputTransformID: dataID,
+		SinkToPCollection: map[string]string{
+			dataID: onlyOut,
+		},
+		OutputData: output,
+	}
+	return b
+}
+
+// windowingStrategy looks up the windowing strategy of the PCollection that
+// feeds transform tid. The transform is expected to have a single parallel
+// input; with several inputs, the one visited last by map iteration is used.
+func windowingStrategy(comps *pipepb.Components, tid string) *pipepb.WindowingStrategy {
+	var pcolID string
+	for _, id := range comps.GetTransforms()[tid].GetInputs() {
+		pcolID = id
+	}
+	wsID := comps.GetPcollections()[pcolID].GetWindowingStrategyId()
+	return comps.GetWindowingStrategies()[wsID]
+}
+
+// gbkBytes re-encodes gbk inputs in a gbk result.
+//
+// toAggregate holds blocks of windowed KV elements, decoded here with the
+// window coder wc, key coder kc, and value coder vc (coders supplies any
+// component coders they reference). Elements are grouped per window and per
+// encoded key. If ws uses the session window fn, windows are then merged into
+// sessions. Each group is written as a windowed value header followed by the
+// key bytes, an int32 value count, and the value bytes.
+//
+// Panics when the strategy's OutputTime is not END_OF_WINDOW, when a windowed
+// value header can't be decoded, or when the SessionWindowsPayload can't be
+// decoded. The watermark parameter is currently unused.
+func gbkBytes(ws *pipepb.WindowingStrategy, wc, kc, vc *pipepb.Coder, toAggregate [][]byte, coders map[string]*pipepb.Coder, watermark mtime.Time) []byte {
+	var outputTime func(typex.Window, mtime.Time) mtime.Time
+	switch ws.GetOutputTime() {
+	case pipepb.OutputTime_END_OF_WINDOW:
+		outputTime = func(w typex.Window, et mtime.Time) mtime.Time {
+			return w.MaxTimestamp()
+		}
+	default:
+		// TODO need to correct session logic if output time is different.
+		panic(fmt.Sprintf("unsupported OutputTime behavior: %v", ws.GetOutputTime()))
+	}
+	wDec, wEnc := makeWindowCoders(wc)
+
+	// keyTime is the aggregate for a single key within a single window.
+	// The window itself is the key of the enclosing map.
+	type keyTime struct {
+		key    []byte
+		time   mtime.Time
+		values [][]byte
+	}
+	// Map windows to a map of encoded keys to their aggregate.
+	// We ultimately emit the window, the key, the time, and the iterable of elements,
+	// all contained in the final value.
+	windows := map[typex.Window]map[string]keyTime{}
+
+	kd := pullDecoder(kc, coders)
+	vd := pullDecoder(vc, coders)
+
+	// Right, need to get the key coder, and the element coder.
+	// Cus I'll need to pull out anything the runner knows how to deal with.
+	// And repeat.
+	for _, data := range toAggregate {
+		// Parse out each element's data, and repeat.
+		buf := bytes.NewBuffer(data)
+		for {
+			elmWindows, tm, _, err := exec.DecodeWindowedValueHeader(wDec, buf)
+			if err == io.EOF {
+				break
+			}
+			if err != nil {
+				panic(fmt.Sprintf("can't decode windowed value header with %v: %v", wc, err))
+			}
+
+			keyByt := kd(buf)
+			key := string(keyByt)
+			value := vd(buf)
+			// An element in several windows is added to each of them.
+			for _, w := range elmWindows {
+				ft := outputTime(w, tm)
+				wk, ok := windows[w]
+				if !ok {
+					wk = make(map[string]keyTime)
+					windows[w] = wk
+				}
+				kt := wk[key]
+				kt.time = ft
+				kt.key = keyByt
+				kt.values = append(kt.values, value)
+				wk[key] = kt
+			}
+		}
+	}
+
+	// If the strategy is session windows, then we need to get all the windows, sort them
+	// and see which ones need to be merged together.
+	// With no windows there is nothing to merge, and nothing to index into.
+	if ws.GetWindowFn().GetUrn() == urns.WindowFnSession && len(windows) > 0 {
+		slog.Debug("sorting by session window")
+		session := &pipepb.SessionWindowsPayload{}
+		if err := (proto.UnmarshalOptions{}).Unmarshal(ws.GetWindowFn().GetPayload(), session); err != nil {
+			panic("unable to decode SessionWindowsPayload")
+		}
+		// NOTE(review): AsDuration yields a time.Duration (nanoseconds); confirm
+		// that converting it directly matches the unit mtime.Time uses.
+		gapSize := mtime.Time(session.GetGapSize().AsDuration())
+
+		ordered := make([]window.IntervalWindow, 0, len(windows))
+		for k := range windows {
+			ordered = append(ordered, k.(window.IntervalWindow))
+		}
+		// Use a decreasing sort (latest to earliest) so we can correct
+		// the output timestamp to the new end of window immediately.
+		// TODO need to correct this if output time is different.
+		sort.Slice(ordered, func(i, j int) bool {
+			return ordered[i].MaxTimestamp() > ordered[j].MaxTimestamp()
+		})
+
+		// cur is the session being built, and sessionData is its aggregate.
+		// Both are held outside the map until the session is complete, since
+		// extending the session changes the window used as the map key.
+		cur := ordered[0]
+		sessionData := windows[cur]
+		delete(windows, cur)
+		for _, iw := range ordered[1:] {
+			// If they overlap, then we merge the data.
+			// NOTE(review): confirm whether gapSize belongs in this comparison,
+			// or whether the windows already include the gap.
+			if iw.End+gapSize < cur.Start {
+				// Store the finished session under its merged window,
+				// then start a new session.
+				windows[cur] = sessionData
+				cur = iw
+				sessionData = windows[iw]
+				delete(windows, iw)
+				continue
+			}
+			// Extend the session. Never move the start later, in case
+			// the incoming window is contained in the current session.
+			if iw.Start < cur.Start {
+				cur.Start = iw.Start
+			}
+			toMerge := windows[iw]
+			delete(windows, iw)
+			for k, kt := range toMerge {
+				skt := sessionData[k]
+				// A key first seen in a merged window would otherwise keep a
+				// zero time. The session keeps the latest end, so this is
+				// the end-of-window output time.
+				skt.time = cur.MaxTimestamp()
+				skt.key = kt.key
+				skt.values = append(skt.values, kt.values...)
+				sessionData[k] = skt
+			}
+		}
+		// Store the last session, which the loop never gets to write back.
+		windows[cur] = sessionData
+	}
+	// Everything's aggregated!
+	// Time to turn things into a windowed KV<K, Iterable<V>>
+
+	var buf bytes.Buffer
+	for w, keys := range windows {
+		for _, kt := range keys {
+			// The map key is the authoritative (possibly merged) window.
+			exec.EncodeWindowedValueHeader(
+				wEnc,
+				[]typex.Window{w},
+				kt.time,
+				typex.NoFiringPane(),
+				&buf,
+			)
+			buf.Write(kt.key)
+			coder.EncodeInt32(int32(len(kt.values)), &buf)
+			for _, value := range kt.values {
+				buf.Write(value)
+			}
+		}
+	}
+	return buf.Bytes()
+}
+
+// onlyInputCoderForTransform looks up the coder of the PCollection that feeds
+// transform tid. The transform is expected to have a single input; with
+// several inputs, the one visited last by map iteration is used.
+func onlyInputCoderForTransform(comps *pipepb.Components, tid string) *pipepb.Coder {
+	var pcolID string
+	for _, id := range comps.GetTransforms()[tid].GetInputs() {
+		pcolID = id
+	}
+	coderID := comps.GetPcollections()[pcolID].GetCoderId()
+	return comps.GetCoders()[coderID]
+}