Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/11/18 09:35:03 UTC

[GitHub] [beam] je-ik commented on a change in pull request #15665: [BEAM-12999] Add ReshufflePerKey and ReshufflePerRandomKey Transform

je-ik commented on a change in pull request #15665:
URL: https://github.com/apache/beam/pull/15665#discussion_r752053222



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
##########
@@ -422,6 +423,38 @@ public void translateNode(
     }
   }
 
+  private static class ReshuffleKeysTranslatorBatch<K, InputT>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Reshuffle.Keys<K, InputT>> {
+
+    @Override
+    public void translateNode(

Review comment:
       Could we avoid the copy&paste here and wrap the code into a reusable utility class instead?
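
       A minimal sketch of what such a utility could look like. It is self-contained and uses stand-in types only: `Context`, `Translator`, `ReshuffleTranslation` and both translator classes are hypothetical names, not existing Beam or Flink classes. The point is only that both translators delegate to one shared method.

```java
import java.util.ArrayList;
import java.util.List;

// Self-contained sketch; every type here is a hypothetical stand-in, not Beam or Flink API.
final class SharedReshuffleSketch {
  private SharedReshuffleSketch() {}

  /** Stand-in for a runner's translation context; it only records what was applied. */
  static final class Context {
    final List<String> applied = new ArrayList<>();
  }

  /** Stand-in for the runner's translator interface. */
  interface Translator {
    void translateNode(String transformName, Context context);
  }

  /** Hypothetical utility that holds the translation logic in exactly one place. */
  static final class ReshuffleTranslation {
    private ReshuffleTranslation() {}

    static void translate(String transformName, Context context) {
      // In a real runner this would rebalance the input and register the output.
      context.applied.add("rebalance:" + transformName);
    }
  }

  /** Translator for the existing transform: delegates to the shared utility. */
  static final class ReshuffleTranslator implements Translator {
    @Override
    public void translateNode(String transformName, Context context) {
      ReshuffleTranslation.translate(transformName, context);
    }
  }

  /** Translator for the new transform: same delegation, no duplicated body. */
  static final class ReshuffleKeysTranslator implements Translator {
    @Override
    public void translateNode(String transformName, Context context) {
      ReshuffleTranslation.translate(transformName, context);
    }
  }
}
```

       With this shape, a later change to the translation logic only has to be made in `ReshuffleTranslation.translate`.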

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
##########
@@ -123,15 +95,88 @@ private ViaRandomKey(@Nullable Integer numBuckets) {
       return new ViaRandomKey<>(numBuckets);
     }
 
+    @Override
+    public PCollection<T> expand(PCollection<T> input) {
+      return input.apply(new Elements<>(numBuckets));
+    }
+  }
+
+  /** Implementation of {@link #elements()}. */
+  public static class Elements<T> extends PTransform<PCollection<T>, PCollection<T>> {
+    private Elements() {}
+
+    private Elements(@Nullable Integer numBuckets) {
+      this.numBuckets = numBuckets;
+    }
+
+    // The number of buckets to shard into. This is a performance optimization to prevent having
+    // unit sized bundles on the output. If unset, uses a random integer key.
+    private @Nullable Integer numBuckets;
+
+    public Elements<T> withNumBuckets(@Nullable Integer numBuckets) {
+      return new Elements<>(numBuckets);
+    }
+
     @Override
     public PCollection<T> expand(PCollection<T> input) {
       return input
           .apply("Pair with random key", ParDo.of(new AssignShardFn<>(numBuckets)))
-          .apply(Reshuffle.of())
+          .apply(Reshuffle.keys())
           .apply(Values.create());
     }
   }
 
+  /**
+   * Implementation of {@link #keys *()}.
+   *
+   * @param <K> The type of key being reshuffled on.
+   * @param <V> The type of value being reshuffled.
+   */
+  public static class Keys<K, V> extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {

Review comment:
       I'm not 100% sure, but it seems to me that we still need to override `getKindString` to preserve the original state names. Maybe @robertwb can correct me if I'm wrong.
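
       A minimal sketch of the concern, assuming the default transform name is derived from the class name. `NamedTransform` is a simplified, hypothetical stand-in for the base class, not Beam's implementation, and the returned name "Reshuffle" is an assumption about what the original name was.

```java
// Simplified stand-in for a transform base class; hypothetical, not Beam's PTransform.
abstract class NamedTransform {
  /** By default the kind string follows the class name, so a rename changes it. */
  protected String getKindString() {
    return getClass().getSimpleName();
  }

  /** The default name of an unnamed transform is its kind string. */
  public final String getName() {
    return getKindString();
  }
}

final class KindStringSketch {
  private KindStringSketch() {}

  /** Renamed transform without an override: its default name follows the new class name. */
  static final class KeysWithoutOverride extends NamedTransform {}

  /** Renamed transform that pins the previous name (assumed here to be "Reshuffle"). */
  static final class Keys extends NamedTransform {
    @Override
    protected String getKindString() {
      return "Reshuffle";
    }
  }
}
```

       Anything keyed on the default name, such as state names, would stay stable for `Keys` but change for `KeysWithoutOverride`.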

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
##########
@@ -728,6 +728,36 @@ public String toNativeString() {
     };
   }
 
+  private static <K, V, W extends BoundedWindow> TransformEvaluator<Reshuffle.Keys<K, V>> reshuffleKeys() {

Review comment:
       Same here: this looks like copy&paste as well.

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
##########
@@ -530,6 +530,38 @@ public String toNativeString() {
     };
   }
 
+  private static <K, V, W extends BoundedWindow> TransformEvaluator<Reshuffle.Keys<K, V>> reshuffleKeys() {

Review comment:
       The same applies here.



