You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/07/20 22:36:12 UTC

[GitHub] [beam] robertwb commented on a diff in pull request #22330: Support combiner lifting.

robertwb commented on code in PR #22330:
URL: https://github.com/apache/beam/pull/22330#discussion_r926107561


##########
sdks/typescript/src/apache_beam/worker/operators.ts:
##########
@@ -305,6 +307,253 @@ class FlattenOperator implements IOperator {
 
 registerOperator("beam:transform:flatten:v1", FlattenOperator);
 
+// CombinePerKey operators.
+
+abstract class CombineOperator<I, A, O> {
+  receiver: Receiver;
+  combineFn: CombineFn<I, A, O>;
+
+  constructor(
+    transformId: string,
+    transform: PTransform,
+    context: OperatorContext
+  ) {
+    this.receiver = context.getReceiver(
+      onlyElement(Object.values(transform.outputs))
+    );
+    const spec = runnerApi.CombinePayload.fromBinary(transform.spec!.payload);
+    this.combineFn = deserializeFn(spec.combineFn!.payload).combineFn;
+  }
+}
+
+export class CombinePerKeyPrecombineOperator<I, A, O>
+  extends CombineOperator<I, A, O>
+  implements IOperator
+{
+  keyCoder: Coder<unknown>;
+  windowCoder: Coder<Window>;
+
+  groups: Map<string, A>;
+  maxKeys: number = 10000;
+
+  static checkSupportsWindowing(
+    windowingStrategy: runnerApi.WindowingStrategy
+  ) {
+    if (
+      windowingStrategy.mergeStatus !== runnerApi.MergeStatus_Enum.NON_MERGING
+    ) {
+      throw new Error("Unsupported non-merging WindowFn: " + windowingStrategy);
+    }
+    if (
+      windowingStrategy.outputTime !== runnerApi.OutputTime_Enum.END_OF_WINDOW
+    ) {
+      throw new Error(
+        "Unsupported windowing output time: " + windowingStrategy
+      );
+    }
+  }
+
+  constructor(
+    transformId: string,
+    transform: PTransform,
+    context: OperatorContext
+  ) {
+    super(transformId, transform, context);
+    const inputPc =
+      context.descriptor.pcollections[
+        onlyElement(Object.values(transform.inputs))
+      ];
+    this.keyCoder = context.pipelineContext.getCoder(
+      context.descriptor.coders[inputPc.coderId].componentCoderIds[0]
+    );
+    const windowingStrategy =
+      context.descriptor.windowingStrategies[inputPc.windowingStrategyId];
+    CombinePerKeyPrecombineOperator.checkSupportsWindowing(windowingStrategy);
+    this.windowCoder = context.pipelineContext.getCoder(
+      windowingStrategy.windowCoderId
+    );
+  }
+
+  process(wvalue: WindowedValue<any>) {
+    for (const window of wvalue.windows) {
+      const wkey =
+        encodeToBase64(window, this.windowCoder) +
+        " " +
+        encodeToBase64(wvalue.value.key, this.keyCoder);
+      if (!this.groups.has(wkey)) {
+        this.groups.set(wkey, this.combineFn.createAccumulator());
+      }
+      this.groups.set(
+        wkey,
+        this.combineFn.addInput(this.groups.get(wkey), wvalue.value.value)
+      );
+    }
+    if (this.groups.size > this.maxKeys) {
+      return this.flush(this.maxKeys * 0.9);

Review Comment:
   Exactly. Added a TODO here.



##########
sdks/typescript/src/apache_beam/runners/direct_runner.ts:
##########
@@ -194,24 +197,29 @@ class DirectGbkOperator implements operators.IOperator {
     );
     const windowingStrategy =
       context.descriptor.windowingStrategies[inputPc.windowingStrategyId];
-    // TODO: (Cleanup) Check or implement triggers, etc.
     if (
       windowingStrategy.mergeStatus !== runnerApi.MergeStatus_Enum.NON_MERGING
     ) {
-      throw new Error("Non-merging WindowFn: " + windowingStrategy);
+      throw new Error("Unsupported non-merging WindowFn: " + windowingStrategy);
+    }
+    if (
+      windowingStrategy.outputTime !== runnerApi.OutputTime_Enum.END_OF_WINDOW
+    ) {
+      throw new Error(
+        "Unsupported windowing output time: " + windowingStrategy
+      );
     }

Review Comment:
   No, as these may evolve separately. (On that note, I started merging these two implementations, but doing so muddied what the pure GBK one was doing, so I left it here for pedagogical purposes.)



##########
sdks/typescript/src/apache_beam/transforms/internal.ts:
##########
@@ -170,21 +172,84 @@ groupByKey.urn = "beam:transform:group_by_key:v1";
 export function combinePerKey<K, InputT, AccT, OutputT>(
   combineFn: CombineFn<InputT, AccT, OutputT>
 ): PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> {
-  function expandInternal(input: PCollection<KV<any, InputT>>) {
+  function expandInternal(
+    input: PCollection<KV<any, InputT>>,
+    pipeline: Pipeline,
+    transformProto: runnerApi.PTransform
+  ) {
+    const pipelineComponents: runnerApi.Components =
+      pipeline.getProto().components!;
+    const inputProto = pipelineComponents.pcollections[input.getId()];
+
+    try {
+      // If this fails, we cannot lift, so we skip setting the liftable URN.
+      CombinePerKeyPrecombineOperator.checkSupportsWindowing(
+        pipelineComponents.windowingStrategies[inputProto.windowingStrategyId]
+      );
+
+      // Ensure the input is using the KV coder.
+      const inputCoderProto = pipelineComponents.coders[inputProto.coderId];
+      if (inputCoderProto.spec!.urn !== KVCoder.URN) {
+        return input
+          .apply(
+            withCoderInternal(
+              new KVCoder(new GeneralObjectCoder(), new GeneralObjectCoder())
+            )
+          )
+          .apply(combinePerKey(combineFn));
+      }
+
+      const inputValueCoder = pipeline.context.getCoder<InputT>(
+        inputCoderProto.componentCoderIds[1]
+      );
+
+      transformProto.spec = runnerApi.FunctionSpec.create({
+        urn: combinePerKey.urn,
+        payload: runnerApi.CombinePayload.toBinary({
+          combineFn: {
+            urn: urns.SERIALIZED_JS_COMBINEFN_INFO,
+            payload: serializeFn({ combineFn }),
+          },
+          accumulatorCoderId: pipeline.context.getCoderId(
+            combineFn.accumulatorCoder
+              ? combineFn.accumulatorCoder(inputValueCoder)
+              : new GeneralObjectCoder()
+          ),
+        }),
+      });
+    } catch (err) {
+      // Execute this as an unlifted combine.
+    }
+
     return input //
       .apply(groupByKey())
       .map(
-        withName("applyCombine", (kv) => ({
-          key: kv.key,
-          value: combineFn.extractOutput(
-            kv.value.reduce(
-              combineFn.addInput.bind(combineFn),
-              combineFn.createAccumulator()
-            )
-          ),
-        }))
+        withName("applyCombine", (kv) => {
+          // Artificially use multiple accumulators to emulate what would
+          // happen in a distributed combine.

Review Comment:
   Just to actually test `mergeAccumulators` in unit tests (e.g. I found a bug in `binaryCombineFn` that would have been caught by this).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org