Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/10/27 00:26:19 UTC

[GitHub] [beam] chamikaramj commented on a change in pull request #15731: [BEAM-13067] Mark GroupIntoBatches output as preserving keys

chamikaramj commented on a change in pull request #15731:
URL: https://github.com/apache/beam/pull/15731#discussion_r737015128



##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
##########
@@ -716,13 +716,13 @@ private void addOutput(String name, PValue value, Coder<?> valueCoder) {
       translator.registerOutputName(value, name);
 
       // If the output requires runner determined sharding, also append necessary input properties.
-      if (value instanceof PCollection
-          && translator.runner.doesPCollectionRequireAutoSharding((PCollection<?>) value)) {
-        addInput(PropertyNames.ALLOWS_SHARDABLE_STATE, "true");
-        // Currently we only allow auto-sharding to be enabled through the GroupIntoBatches
-        // transform. So we also add the following property which GroupIntoBatchesDoFn has, to allow
-        // the backend to perform graph optimization.
-        addInput(PropertyNames.PRESERVES_KEYS, "true");
+      if (value instanceof PCollection) {
+        if (translator.runner.doesPCollectionRequireAutoSharding((PCollection<?>) value)) {
+          addInput(PropertyNames.ALLOWS_SHARDABLE_STATE, "true");
+        }
+        if (translator.runner.doesPCollectionPreserveKeys((PCollection<?>) value)) {
+          addInput(PropertyNames.PRESERVES_KEYS, "true");

Review comment:
       I think we need a similar update in the Dataflow service to support Portable Job Submission.
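
   For context, the change above decouples the two step properties: per this PR, every GroupIntoBatches output preserves keys, while only outputs that use runner-determined auto-sharding (the withShardedKey() variant) also get ALLOWS_SHARDABLE_STATE. A minimal pipeline sketch of the two cases (class and step names here are illustrative, not from the PR):

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class GroupIntoBatchesPropertiesSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    PCollection<KV<String, String>> input =
        p.apply(Create.of(KV.of("k", "v1"), KV.of("k", "v2")));

    // Plain GroupIntoBatches: the output preserves keys, but runner-determined
    // auto-sharding is not required, so only PRESERVES_KEYS applies.
    PCollection<KV<String, Iterable<String>>> batched =
        input.apply("Batch", GroupIntoBatches.<String, String>ofSize(2));

    // withShardedKey(): auto-sharding is required as well, so the translated
    // step carries both ALLOWS_SHARDABLE_STATE and PRESERVES_KEYS.
    PCollection<KV<ShardedKey<String>, Iterable<String>>> sharded =
        input.apply(
            "BatchSharded", GroupIntoBatches.<String, String>ofSize(2).withShardedKey());

    p.run().waitUntilFinish();
  }
}
```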

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/GroupIntoBatchesOverride.java
##########
@@ -152,14 +180,84 @@ public void process(ProcessContext c) {
       extends PTransform<PCollection<KV<K, V>>, PCollection<KV<ShardedKey<K>, Iterable<V>>>> {
 
     private final BatchingParams<V> batchingParams;
+    private final transient DataflowRunner runner;
+    private final transient PCollection<KV<ShardedKey<K>, Iterable<V>>> originalOutput;
 
-    private BatchGroupIntoBatchesWithShardedKey(BatchingParams<V> batchingParams) {
+    private BatchGroupIntoBatchesWithShardedKey(
+        BatchingParams<V> batchingParams,
+        DataflowRunner runner,
+        PCollection<KV<ShardedKey<K>, Iterable<V>>> originalOutput) {
       this.batchingParams = batchingParams;
+      this.runner = runner;
+      this.originalOutput = originalOutput;
     }
 
     @Override
     public PCollection<KV<ShardedKey<K>, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
-      return shardKeys(input).apply(new BatchGroupIntoBatches<>(batchingParams));
+      return shardKeys(input)
+          .apply(new BatchGroupIntoBatches<>(batchingParams, runner, originalOutput));
+    }
+  }
+
+  static class StreamingGroupIntoBatchesOverrideFactory<K, V>
+      implements PTransformOverrideFactory<
+          PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupIntoBatches<K, V>> {
+
+    private final DataflowRunner runner;
+
+    StreamingGroupIntoBatchesOverrideFactory(DataflowRunner runner) {
+      this.runner = runner;
+    }
+
+    @Override
+    public PTransformReplacement<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>
+        getReplacementTransform(
+            AppliedPTransform<
+                    PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupIntoBatches<K, V>>
+                transform) {
+      return PTransformReplacement.of(
+          PTransformReplacements.getSingletonMainInput(transform),
+          new StreamingGroupIntoBatches<>(
+              runner,
+              transform.getTransform(),
+              PTransformReplacements.getSingletonMainOutput(transform)));
+    }
+
+    @Override
+    public Map<PCollection<?>, ReplacementOutput> mapOutputs(
+        Map<TupleTag<?>, PCollection<?>> outputs, PCollection<KV<K, Iterable<V>>> newOutput) {
+      return ReplacementOutputs.singleton(outputs, newOutput);
+    }
+  }
+
+  /**
+   * Specialized implementation of {@link GroupIntoBatches} for unbounded Dataflow pipelines. The
+   * override does the same thing as the original transform but additionally records the output in
+   * order to append required step properties during the graph translation.
+   */
+  static class StreamingGroupIntoBatches<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
+
+    private final transient DataflowRunner runner;

Review comment:
       Any downside to keeping a pointer to the runner object in the PTransform? I think this might be OK since this is a Dataflow-specific transform, but I wanted to point it out.
   
   cc: @kennknowles
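
   On the serialization side: because the field is transient (and DataflowRunner is not Serializable), the runner pointer exists only during pipeline construction and translation, and is dropped if the transform is ever serialized. A standalone sketch with stand-in classes (FakeRunner and SketchTransform are hypothetical, not Beam types) demonstrating that behavior:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Stand-in for DataflowRunner: construction-time-only state, not Serializable.
class FakeRunner {}

// Stand-in for a Dataflow-specific transform holding a transient runner pointer.
class SketchTransform implements Serializable {
  private final transient FakeRunner runner;

  SketchTransform(FakeRunner runner) {
    this.runner = runner;
  }

  FakeRunner runner() {
    return runner;
  }
}

public class TransientRunnerSketch {
  public static void main(String[] args) throws Exception {
    SketchTransform before = new SketchTransform(new FakeRunner());

    // Round-trip through Java serialization, as could happen to a transform.
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    ObjectOutputStream oos = new ObjectOutputStream(bos);
    oos.writeObject(before);
    oos.close();
    SketchTransform after =
        (SketchTransform)
            new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray())).readObject();

    System.out.println(before.runner() != null); // true: usable during translation
    System.out.println(after.runner() == null);  // true: dropped on serialization
  }
}
```

   So the pattern appears safe as long as the pointer is only dereferenced at graph-construction time, which is all this override needs.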

##########
File path: runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
##########
@@ -1133,6 +1157,55 @@ private JobSpecification runStreamingGroupIntoBatchesAndGetJobSpec(
         pipeline, pipelineProto, sdkComponents, runner, Collections.emptyList());
   }
 
+  @Test
+  public void testBatchGroupIntoBatchesTranslation() throws Exception {
+    JobSpecification jobSpec =
+        runBatchGroupIntoBatchesAndGetJobSpec(false, Collections.emptyList());
+    List<Step> steps = jobSpec.getJob().getSteps();
+    Step shardedStateStep = steps.get(steps.size() - 1);
+    Map<String, Object> properties = shardedStateStep.getProperties();
+    assertTrue(properties.containsKey(PropertyNames.PRESERVES_KEYS));
+    assertEquals("true", getString(properties, PropertyNames.PRESERVES_KEYS));
+  }
+
+  @Test
+  public void testBatchGroupIntoBatchesWithShardedKeyTranslation() throws Exception {
+    List<String> experiments = Collections.emptyList();
+    JobSpecification jobSpec = runBatchGroupIntoBatchesAndGetJobSpec(true, experiments);
+    List<Step> steps = jobSpec.getJob().getSteps();
+    Step shardedStateStep = steps.get(steps.size() - 1);
+    Map<String, Object> properties = shardedStateStep.getProperties();
+    assertTrue(properties.containsKey(PropertyNames.PRESERVES_KEYS));
+    assertEquals("true", getString(properties, PropertyNames.PRESERVES_KEYS));
+  }
+
+  @Test
+  public void testBatchGroupIntoBatchesTranslationUnifiedWorker() throws Exception {

Review comment:
       Note that Java UW uses Portable Job Submission by default, so you probably need to update the Dataflow service (beam.cc) as well.
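
   For readers unfamiliar with the unified worker: it is selected through pipeline experiments, so the service-side handling of these step properties has to cover jobs submitted through the portable path as well. A small sketch of toggling the experiment from user code (the experiment name follows the public Dataflow Runner v2 docs; treat it as an assumption, since the exact set used by the truncated test above is not shown here):

```java
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class UnifiedWorkerExperimentSketch {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    // Runner v2 / unified worker submits the job through the portable
    // (Runner API) path, which is why the service-side property handling
    // mentioned in the review needs a matching update.
    ExperimentalOptions.addExperiment(options.as(ExperimentalOptions.class), "use_runner_v2");
    System.out.println(ExperimentalOptions.hasExperiment(options, "use_runner_v2")); // true
  }
}
```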




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org