Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/07/20 18:27:55 UTC

[GitHub] [beam] damccorm commented on a diff in pull request #22330: Support combiner lifting.

damccorm commented on code in PR #22330:
URL: https://github.com/apache/beam/pull/22330#discussion_r925931970


##########
sdks/typescript/src/apache_beam/worker/operators.ts:
##########
@@ -305,6 +307,253 @@ class FlattenOperator implements IOperator {
 
 registerOperator("beam:transform:flatten:v1", FlattenOperator);
 
+// CombinePerKey operators.
+
+abstract class CombineOperator<I, A, O> {
+  receiver: Receiver;
+  combineFn: CombineFn<I, A, O>;
+
+  constructor(
+    transformId: string,
+    transform: PTransform,
+    context: OperatorContext
+  ) {
+    this.receiver = context.getReceiver(
+      onlyElement(Object.values(transform.outputs))
+    );
+    const spec = runnerApi.CombinePayload.fromBinary(transform.spec!.payload);
+    this.combineFn = deserializeFn(spec.combineFn!.payload).combineFn;
+  }
+}
+
+export class CombinePerKeyPrecombineOperator<I, A, O>
+  extends CombineOperator<I, A, O>
+  implements IOperator
+{
+  keyCoder: Coder<unknown>;
+  windowCoder: Coder<Window>;
+
+  groups: Map<string, A>;
+  maxKeys: number = 10000;
+
+  static checkSupportsWindowing(
+    windowingStrategy: runnerApi.WindowingStrategy
+  ) {
+    if (
+      windowingStrategy.mergeStatus !== runnerApi.MergeStatus_Enum.NON_MERGING
+    ) {
+      throw new Error("Unsupported non-merging WindowFn: " + windowingStrategy);
+    }
+    if (
+      windowingStrategy.outputTime !== runnerApi.OutputTime_Enum.END_OF_WINDOW
+    ) {
+      throw new Error(
+        "Unsupported windowing output time: " + windowingStrategy
+      );
+    }
+  }
+
+  constructor(
+    transformId: string,
+    transform: PTransform,
+    context: OperatorContext
+  ) {
+    super(transformId, transform, context);
+    const inputPc =
+      context.descriptor.pcollections[
+        onlyElement(Object.values(transform.inputs))
+      ];
+    this.keyCoder = context.pipelineContext.getCoder(
+      context.descriptor.coders[inputPc.coderId].componentCoderIds[0]
+    );
+    const windowingStrategy =
+      context.descriptor.windowingStrategies[inputPc.windowingStrategyId];
+    CombinePerKeyPrecombineOperator.checkSupportsWindowing(windowingStrategy);
+    this.windowCoder = context.pipelineContext.getCoder(
+      windowingStrategy.windowCoderId
+    );
+  }
+
+  process(wvalue: WindowedValue<any>) {
+    for (const window of wvalue.windows) {
+      const wkey =
+        encodeToBase64(window, this.windowCoder) +
+        " " +
+        encodeToBase64(wvalue.value.key, this.keyCoder);
+      if (!this.groups.has(wkey)) {
+        this.groups.set(wkey, this.combineFn.createAccumulator());
+      }
+      this.groups.set(
+        wkey,
+        this.combineFn.addInput(this.groups.get(wkey), wvalue.value.value)
+      );
+    }
+    if (this.groups.size > this.maxKeys) {
+      return this.flush(this.maxKeys * 0.9);

Review Comment:
   Where does 0.9 come from? I'm guessing you're trying to balance a tradeoff: keeping more keys cached is better in case you see the same key again, vs. not incurring the overhead of sending a message for every single key. If so, is it worth adding a TODO to either do tuning or make this tunable in the future (we're definitely not at the fine performance tuning point in the SDK)?
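For illustration, a minimal sketch of what a tunable flush threshold could look like; the `PrecombineCacheOptions` interface and `flushFraction` field are hypothetical and not part of this PR:

```typescript
// Hypothetical sketch only: a tunable pre-combine cache policy.
// The option names are illustrative, not the PR's actual API.
interface PrecombineCacheOptions {
  maxKeys: number; // cap on distinct (window, key) entries held in memory
  flushFraction: number; // fraction of maxKeys to retain after a flush
}

const defaultCacheOptions: PrecombineCacheOptions = {
  maxKeys: 10000,
  flushFraction: 0.9, // TODO: tune empirically or expose via pipeline options
};

// Number of entries to keep when the cache overflows.
function flushTarget(opts: PrecombineCacheOptions): number {
  return Math.floor(opts.maxKeys * opts.flushFraction);
}
```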



##########
sdks/typescript/src/apache_beam/transforms/internal.ts:
##########
@@ -170,21 +172,84 @@ groupByKey.urn = "beam:transform:group_by_key:v1";
 export function combinePerKey<K, InputT, AccT, OutputT>(
   combineFn: CombineFn<InputT, AccT, OutputT>
 ): PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> {
-  function expandInternal(input: PCollection<KV<any, InputT>>) {
+  function expandInternal(
+    input: PCollection<KV<any, InputT>>,
+    pipeline: Pipeline,
+    transformProto: runnerApi.PTransform
+  ) {
+    const pipelineComponents: runnerApi.Components =
+      pipeline.getProto().components!;
+    const inputProto = pipelineComponents.pcollections[input.getId()];
+
+    try {
+      // If this fails, we cannot lift, so we skip setting the liftable URN.
+      CombinePerKeyPrecombineOperator.checkSupportsWindowing(
+        pipelineComponents.windowingStrategies[inputProto.windowingStrategyId]
+      );
+
+      // Ensure the input is using the KV coder.
+      const inputCoderProto = pipelineComponents.coders[inputProto.coderId];
+      if (inputCoderProto.spec!.urn !== KVCoder.URN) {
+        return input
+          .apply(
+            withCoderInternal(
+              new KVCoder(new GeneralObjectCoder(), new GeneralObjectCoder())
+            )
+          )
+          .apply(combinePerKey(combineFn));
+      }
+
+      const inputValueCoder = pipeline.context.getCoder<InputT>(
+        inputCoderProto.componentCoderIds[1]
+      );
+
+      transformProto.spec = runnerApi.FunctionSpec.create({
+        urn: combinePerKey.urn,
+        payload: runnerApi.CombinePayload.toBinary({
+          combineFn: {
+            urn: urns.SERIALIZED_JS_COMBINEFN_INFO,
+            payload: serializeFn({ combineFn }),
+          },
+          accumulatorCoderId: pipeline.context.getCoderId(
+            combineFn.accumulatorCoder
+              ? combineFn.accumulatorCoder(inputValueCoder)
+              : new GeneralObjectCoder()
+          ),
+        }),
+      });
+    } catch (err) {
+      // Execute this as an unlifted combine.
+    }
+
     return input //
       .apply(groupByKey())
       .map(
-        withName("applyCombine", (kv) => ({
-          key: kv.key,
-          value: combineFn.extractOutput(
-            kv.value.reduce(
-              combineFn.addInput.bind(combineFn),
-              combineFn.createAccumulator()
-            )
-          ),
-        }))
+        withName("applyCombine", (kv) => {
+          // Artificially use multiple accumulators to emulate what would
+          // happen in a distributed combine.

Review Comment:
   I don't quite follow this - why do we need to use multiple accumulators here?
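One way to read the comment (a sketch, assuming the intent is to exercise `mergeAccumulators` even on a single worker, as a distributed runner would): split the grouped values into chunks, build one accumulator per chunk, then merge. The `combineInChunks` helper and the locally declared `CombineFn` interface below are illustrative, not the PR's code:

```typescript
// Local interface mirroring the shape of the SDK's CombineFn, for the sketch.
interface CombineFn<I, A, O> {
  createAccumulator(): A;
  addInput(accumulator: A, input: I): A;
  mergeAccumulators(accumulators: A[]): A;
  extractOutput(accumulator: A): O;
}

// Combine a list of values through several accumulators so that
// mergeAccumulators is exercised, as it would be in a distributed run.
function combineInChunks<I, A, O>(
  combineFn: CombineFn<I, A, O>,
  values: I[],
  chunkSize = 2
): O {
  // Build one accumulator per chunk of inputs (at least one, for empty input).
  const accumulators: A[] = [combineFn.createAccumulator()];
  for (let i = 0; i < values.length; i += chunkSize) {
    let acc = combineFn.createAccumulator();
    for (const v of values.slice(i, i + chunkSize)) {
      acc = combineFn.addInput(acc, v);
    }
    accumulators.push(acc);
  }
  // Merging is the step a single-accumulator fold would never exercise.
  return combineFn.extractOutput(combineFn.mergeAccumulators(accumulators));
}
```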



##########
sdks/typescript/src/apache_beam/runners/direct_runner.ts:
##########
@@ -194,24 +197,29 @@ class DirectGbkOperator implements operators.IOperator {
     );
     const windowingStrategy =
       context.descriptor.windowingStrategies[inputPc.windowingStrategyId];
-    // TODO: (Cleanup) Check or implement triggers, etc.
     if (
       windowingStrategy.mergeStatus !== runnerApi.MergeStatus_Enum.NON_MERGING
     ) {
-      throw new Error("Non-merging WindowFn: " + windowingStrategy);
+      throw new Error("Unsupported non-merging WindowFn: " + windowingStrategy);
+    }
+    if (
+      windowingStrategy.outputTime !== runnerApi.OutputTime_Enum.END_OF_WINDOW
+    ) {
+      throw new Error(
+        "Unsupported windowing output time: " + windowingStrategy
+      );
     }

Review Comment:
   Should we use `CombinePerKeyPrecombineOperator.checkSupportsWindowing` here?
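If it were reused, the two inlined checks above could delegate to the shared helper; a sketch (relative import paths are assumed, not verified against the PR):

```typescript
// Hypothetical refactor sketch: reuse the static helper added in this PR
// instead of duplicating the merge-status and output-time checks here.
// Import paths are assumed relative to runners/direct_runner.ts.
import * as runnerApi from "../proto/beam_runner_api";
import { CombinePerKeyPrecombineOperator } from "../worker/operators";

function checkDirectGbkWindowing(
  windowingStrategy: runnerApi.WindowingStrategy
): void {
  // Throws for merging WindowFns and for output times other than
  // END_OF_WINDOW, matching the two branches in DirectGbkOperator above.
  CombinePerKeyPrecombineOperator.checkSupportsWindowing(windowingStrategy);
}
```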



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org