You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/11/20 05:29:10 UTC

[GitHub] [beam] nehsyc commented on a change in pull request #13208: [BEAM-10703, BEAM-10475] Add an option to GroupIntoBatches to output ShardedKeys. Update Dataflow pipeline translation accordingly.

nehsyc commented on a change in pull request #13208:
URL: https://github.com/apache/beam/pull/13208#discussion_r527410523



##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java
##########
@@ -125,6 +126,45 @@ public Void apply(Iterable<KV<String, Iterable<String>>> input) {
     pipeline.run();
   }
 
+  @Test
+  @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesStatefulParDo.class})
+  public void testWithShardedKeyInGlobalWindow() {
+    // Since with default sharding, the number of subshards of a key is nondeterministic, create

Review comment:
       Done.

##########
File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/GroupIntoBatchesOverride.java
##########
@@ -92,54 +98,141 @@ public void process(ProcessContext c) {
     }
   }
 
-  static class StreamingGroupIntoBatchesOverrideFactory<K, V>
+  static class BatchGroupIntoBatchesWithShardedKeyOverrideFactory<K, V>
       implements PTransformOverrideFactory<
-          PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupIntoBatches<K, V>> {
+          PCollection<KV<K, V>>,
+          PCollection<KV<ShardedKey<K>, Iterable<V>>>,
+          GroupIntoBatches<K, V>.WithShardedKey> {
+
+    @Override
+    public PTransformReplacement<PCollection<KV<K, V>>, PCollection<KV<ShardedKey<K>, Iterable<V>>>>
+        getReplacementTransform(
+            AppliedPTransform<
+                    PCollection<KV<K, V>>,
+                    PCollection<KV<ShardedKey<K>, Iterable<V>>>,
+                    GroupIntoBatches<K, V>.WithShardedKey>
+                transform) {
+      return PTransformReplacement.of(
+          PTransformReplacements.getSingletonMainInput(transform),
+          new BatchGroupIntoBatchesWithShardedKey<>(transform.getTransform().getBatchSize()));
+    }
+
+    @Override
+    public Map<PCollection<?>, ReplacementOutput> mapOutputs(
+        Map<TupleTag<?>, PCollection<?>> outputs,
+        PCollection<KV<ShardedKey<K>, Iterable<V>>> newOutput) {
+      return ReplacementOutputs.singleton(outputs, newOutput);
+    }
+  }
+
+  /**
+   * Specialized implementation of {@link GroupIntoBatches.WithShardedKey} for bounded Dataflow
+   * pipelines.
+   */
+  static class BatchGroupIntoBatchesWithShardedKey<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<ShardedKey<K>, Iterable<V>>>> {
+
+    private final long batchSize;
+
+    private BatchGroupIntoBatchesWithShardedKey(long batchSize) {
+      this.batchSize = batchSize;
+    }
+
+    @Override
+    public PCollection<KV<ShardedKey<K>, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
+      PCollection<KV<ShardedKey<K>, V>> intermediate_input = shardKeys(input);
+      return intermediate_input.apply(new BatchGroupIntoBatches<>(batchSize));
+    }
+  }
+
+  static class StreamingGroupIntoBatchesWithShardedKeyOverrideFactory<K, V>
+      implements PTransformOverrideFactory<
+          PCollection<KV<K, V>>,
+          PCollection<KV<ShardedKey<K>, Iterable<V>>>,
+          GroupIntoBatches<K, V>.WithShardedKey> {
 
     private final DataflowRunner runner;
 
-    StreamingGroupIntoBatchesOverrideFactory(DataflowRunner runner) {
+    StreamingGroupIntoBatchesWithShardedKeyOverrideFactory(DataflowRunner runner) {
       this.runner = runner;
     }
 
     @Override
-    public PTransformReplacement<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>
+    public PTransformReplacement<PCollection<KV<K, V>>, PCollection<KV<ShardedKey<K>, Iterable<V>>>>
         getReplacementTransform(
             AppliedPTransform<
-                    PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupIntoBatches<K, V>>
+                    PCollection<KV<K, V>>,
+                    PCollection<KV<ShardedKey<K>, Iterable<V>>>,
+                    GroupIntoBatches<K, V>.WithShardedKey>
                 transform) {
       return PTransformReplacement.of(
           PTransformReplacements.getSingletonMainInput(transform),
-          new StreamingGroupIntoBatches(runner, transform.getTransform()));
+          new StreamingGroupIntoBatchesWithShardedKey<>(
+              runner,
+              transform.getTransform(),
+              PTransformReplacements.getSingletonMainOutput(transform)));
     }
 
     @Override
     public Map<PCollection<?>, ReplacementOutput> mapOutputs(
-        Map<TupleTag<?>, PCollection<?>> outputs, PCollection<KV<K, Iterable<V>>> newOutput) {
+        Map<TupleTag<?>, PCollection<?>> outputs,
+        PCollection<KV<ShardedKey<K>, Iterable<V>>> newOutput) {
       return ReplacementOutputs.singleton(outputs, newOutput);
     }
   }
 
   /**
-   * Specialized implementation of {@link GroupIntoBatches} for unbounded Dataflow pipelines. The
-   * override does the same thing as the original transform but additionally record the input to add
-   * corresponding properties during the graph translation.
+   * Specialized implementation of {@link GroupIntoBatches.WithShardedKey} for unbounded Dataflow
+   * pipelines. The override does the same thing as the original transform but additionally records
+   * the output in order to append required step properties during the graph translation.
    */
-  static class StreamingGroupIntoBatches<K, V>
-      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
+  static class StreamingGroupIntoBatchesWithShardedKey<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<ShardedKey<K>, Iterable<V>>>> {
 
     private final transient DataflowRunner runner;
-    private final GroupIntoBatches<K, V> original;
+    private final GroupIntoBatches<K, V>.WithShardedKey original_transform;
+    private final PCollection<KV<ShardedKey<K>, Iterable<V>>> original_output;
 
-    public StreamingGroupIntoBatches(DataflowRunner runner, GroupIntoBatches<K, V> original) {
+    public StreamingGroupIntoBatchesWithShardedKey(
+        DataflowRunner runner,
+        GroupIntoBatches<K, V>.WithShardedKey original,
+        PCollection<KV<ShardedKey<K>, Iterable<V>>> output) {
       this.runner = runner;
-      this.original = original;
+      this.original_transform = original;
+      this.original_output = output;
     }
 
     @Override
-    public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
-      runner.maybeRecordPCollectionWithAutoSharding(input);
-      return input.apply(original);
+    public PCollection<KV<ShardedKey<K>, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
+      // Record the output PCollection of the original transform since the new output will be
+      // replaced by the original one when the replacement transform is wired to other nodes in the
+      // graph, although the old and the new outputs are effectively the same.
+      runner.maybeRecordPCollectionWithAutoSharding(original_output);
+      return input.apply(original_transform);
     }
   }
+
+  private static final long uuid = UUID.randomUUID().getMostSignificantBits();

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org