Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/10/07 23:42:45 UTC

[GitHub] [beam] robertwb commented on a change in pull request #15665: [BEAM-12999] Add ReshufflePerKey and ReshufflePerRandomKey Transform

robertwb commented on a change in pull request #15665:
URL: https://github.com/apache/beam/pull/15665#discussion_r724599383



##########
File path: model/pipeline/src/main/proto/beam_runner_api.proto
##########
@@ -365,6 +365,12 @@ message StandardPTransforms {
     // Represents the GroupIntoBatches.WithShardedKey operation.
     // Payload: GroupIntoBatchesPayload
     GROUP_INTO_BATCHES_WITH_SHARDED_KEY = 6 [(beam_urn) = "beam:transform:group_into_batches_with_sharded_key:v1"];
+
+    // Represents the Reshuffle.perKey() operation.
+    RESHUFFLE_PER_KEY = 7 [(beam_urn) = "beam:transform:reshuffle_per_key:v1"];
+
+    // Represents the Reshuffle.perRandomKey() operation.

Review comment:
       Similarly
   
   Input: T
   Output: T
   
   The output of this operation, including windowing, should be identical to its input, but ideally redistributed evenly among the available workers.
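A minimal in-memory Java model of this proposed contract may help pin it down. It is not Beam code. The class and method names (`RandomKeyReshuffleModel`, `assignKey`, `reshuffle`, `counts`, `flatten`) are hypothetical, and simulated workers are plain lists. The key assignment only loosely follows the `AssignShardFn`/`withNumBuckets` idea from the diff: it uses a random int key, or a random bucket in `[0, numBuckets)` when a bucket count is given. The property it checks is the one stated above: the output is the same multiset of elements as the input, only spread across workers.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

/**
 * Hypothetical in-memory model of the "Input: T, Output: T" reshuffle contract.
 * Not Beam code: "workers" are plain lists, and key assignment only loosely
 * mirrors the random-key / numBuckets idea from the PR.
 */
final class RandomKeyReshuffleModel {
  private RandomKeyReshuffleModel() {}

  /** Picks a shard key for one element; numBuckets == null means "any random int". */
  static int assignKey(Random random, Integer numBuckets) {
    return numBuckets == null ? random.nextInt() : random.nextInt(numBuckets);
  }

  /** Pairs each element with a random key, routes it to a worker by key, then drops the key. */
  static <T> List<List<T>> reshuffle(List<T> input, int numWorkers, Integer numBuckets, long seed) {
    Random random = new Random(seed);
    List<List<T>> workers = new ArrayList<>();
    for (int i = 0; i < numWorkers; i++) {
      workers.add(new ArrayList<>());
    }
    for (T element : input) {
      int key = assignKey(random, numBuckets);
      // floorMod keeps negative random ints in the range [0, numWorkers).
      workers.get(Math.floorMod(key, numWorkers)).add(element);
    }
    return workers;
  }

  /** Counts occurrences so input and output can be compared as multisets. */
  static <T> Map<T, Integer> counts(Collection<T> values) {
    Map<T, Integer> result = new HashMap<>();
    for (T v : values) {
      result.merge(v, 1, Integer::sum);
    }
    return result;
  }

  /** Concatenates all worker contents back into one list. */
  static <T> List<T> flatten(List<List<T>> workers) {
    List<T> all = new ArrayList<>();
    for (List<T> w : workers) {
      all.addAll(w);
    }
    return all;
  }
}
```

The model only guarantees that nothing is added or dropped. How evenly elements land depends on the key distribution and, when `numBuckets` is set, on how many distinct keys it allows.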

##########
File path: model/pipeline/src/main/proto/beam_runner_api.proto
##########
@@ -365,6 +365,12 @@ message StandardPTransforms {
     // Represents the GroupIntoBatches.WithShardedKey operation.
     // Payload: GroupIntoBatchesPayload
     GROUP_INTO_BATCHES_WITH_SHARDED_KEY = 6 [(beam_urn) = "beam:transform:group_into_batches_with_sharded_key:v1"];
+
+    // Represents the Reshuffle.perKey() operation.

Review comment:
       It would be nice to explain what this is beyond a reference to the Java transform, e.g.:
   
   Input: KV<K, V>
   Output: KV<K, V>
   
   The output of this operation, including windowing, should be identical to its input, but ideally redistributed among the available workers, with all data for a given key going to the same worker.
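The per-key variant can be modeled in the same way: route each pair on a hash of its key. The same caveats apply. The names are hypothetical, `Map.Entry` stands in for `KV`, lists stand in for workers, and this is not Beam code. The extra property over the element-wise case is that every key ends up on exactly one worker, which `workersPerKey` lets a test check.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
 * Hypothetical in-memory model of the "Input: KV<K, V>, Output: KV<K, V>" contract.
 * Not Beam code: Map.Entry stands in for KV, and each "worker" is a list.
 */
final class PerKeyReshuffleModel {
  private PerKeyReshuffleModel() {}

  /** Routes every pair by a hash of its key, so all values of a key share a worker. */
  static <K, V> List<List<Map.Entry<K, V>>> reshuffle(List<Map.Entry<K, V>> input, int numWorkers) {
    List<List<Map.Entry<K, V>>> workers = new ArrayList<>();
    for (int i = 0; i < numWorkers; i++) {
      workers.add(new ArrayList<>());
    }
    for (Map.Entry<K, V> kv : input) {
      // Objects.hashCode tolerates null keys; floorMod keeps the index non-negative.
      int worker = Math.floorMod(Objects.hashCode(kv.getKey()), numWorkers);
      workers.get(worker).add(kv);
    }
    return workers;
  }

  /** For each key, returns the set of workers that received it (expected size: 1). */
  static <K, V> Map<K, Set<Integer>> workersPerKey(List<List<Map.Entry<K, V>>> workers) {
    Map<K, Set<Integer>> result = new HashMap<>();
    for (int i = 0; i < workers.size(); i++) {
      for (Map.Entry<K, V> kv : workers.get(i)) {
        result.computeIfAbsent(kv.getKey(), key -> new HashSet<>()).add(i);
      }
    }
    return result;
  }
}
```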

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
##########
@@ -153,6 +153,9 @@ public static FlinkBatchPortablePipelineTranslator createTranslator() {
     translatorMap.put(
         PTransformTranslation.RESHUFFLE_URN,
         FlinkBatchPortablePipelineTranslator::translateReshuffle);

Review comment:
       Should we go ahead and rename the FlinkBatchPortablePipelineTranslator::translateReshuffle method as well? 

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
##########
@@ -36,102 +36,121 @@
 /**
  * <b>For internal use only; no backwards compatibility guarantees.</b>
  *
- * <p>A {@link PTransform} that returns a {@link PCollection} equivalent to its input but
+ * <p>{@link PTransform}s that return a {@link PCollection} equivalent to their input but
  * operationally provides some of the side effects of a {@link GroupByKey}, in particular
  * checkpointing, and preventing fusion of the surrounding transforms.
  *
  * <p>Performs a {@link GroupByKey} so that the data is key-partitioned. Configures the {@link
  * WindowingStrategy} so that no data is dropped, but doesn't affect the need for the user to
  * specify allowed lateness and accumulation mode before a user-inserted GroupByKey.
- *
- * @param <K> The type of key being reshuffled on.
- * @param <V> The type of value being reshuffled.
- * @deprecated this transform's intended side effects are not portable; it will likely be removed
  */
 @Internal
-@Deprecated
-public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {
+public class Reshuffle {
 
   private Reshuffle() {}
 
-  public static <K, V> Reshuffle<K, V> of() {
-    return new Reshuffle<>();
+  /** @deprecated use {@link #perKey()}. */
+  @Deprecated
+  public static <K, V> PerKey<K, V> of() {
+    return new PerKey<>();
+  }
+
+  public static <K, V> PerKey<K, V> perKey() {
+    return new PerKey<>();
+  }
+
+  /** @deprecated use {@link #perRandomKey()}. */
+  @Deprecated
+  public static <T> PerRandomKey<T> viaRandomKey() {
+    return perRandomKey();
   }
 
   /**
-   * Encapsulates the sequence "pair input with unique key, apply {@link Reshuffle#of}, drop the
-   * key" commonly used to break fusion.
+   * Encapsulates the sequence "pair input with unique key, apply {@link Reshuffle#perKey()}, drop
+   * the key" commonly used to break fusion.
    */
   @Experimental
-  public static <T> ViaRandomKey<T> viaRandomKey() {
-    return new ViaRandomKey<>();
-  }
-
-  @Override
-  public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
-    WindowingStrategy<?, ?> originalStrategy = input.getWindowingStrategy();
-    // If the input has already had its windows merged, then the GBK that performed the merge
-    // will have set originalStrategy.getWindowFn() to InvalidWindows, causing the GBK contained
-    // here to fail. Instead, we install a valid WindowFn that leaves all windows unchanged.
-    // The TimestampCombiner is set to ensure the GroupByKey does not shift elements forwards in
-    // time.
-    // Because this outputs as fast as possible, this should not hold the watermark.
-    Window<KV<K, V>> rewindow =
-        Window.<KV<K, V>>into(new IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder()))
-            .triggering(new ReshuffleTrigger<>())
-            .discardingFiredPanes()
-            .withTimestampCombiner(TimestampCombiner.EARLIEST)
-            .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
-
-    return input
-        .apply(rewindow)
-        .apply("ReifyOriginalTimestamps", Reify.timestampsInValue())
-        .apply(GroupByKey.create())
-        // Set the windowing strategy directly, so that it doesn't get counted as the user having
-        // set allowed lateness.
-        .setWindowingStrategyInternal(originalStrategy)
-        .apply(
-            "ExpandIterable",
-            ParDo.of(
-                new DoFn<KV<K, Iterable<TimestampedValue<V>>>, KV<K, TimestampedValue<V>>>() {
-                  @ProcessElement
-                  public void processElement(
-                      @Element KV<K, Iterable<TimestampedValue<V>>> element,
-                      OutputReceiver<KV<K, TimestampedValue<V>>> r) {
-                    K key = element.getKey();
-                    for (TimestampedValue<V> value : element.getValue()) {
-                      r.output(KV.of(key, value));
-                    }
-                  }
-                }))
-        .apply("RestoreOriginalTimestamps", ReifyTimestamps.extractFromValues());
+  public static <T> PerRandomKey<T> perRandomKey() {
+    return new PerRandomKey<>();
   }
 
   /** Implementation of {@link #viaRandomKey()}. */
-  public static class ViaRandomKey<T> extends PTransform<PCollection<T>, PCollection<T>> {
-    private ViaRandomKey() {}
+  public static class PerRandomKey<T> extends PTransform<PCollection<T>, PCollection<T>> {
+    private PerRandomKey() {}
 
-    private ViaRandomKey(@Nullable Integer numBuckets) {
+    private PerRandomKey(@Nullable Integer numBuckets) {
       this.numBuckets = numBuckets;
     }
 
     // The number of buckets to shard into. This is a performance optimization to prevent having
     // unit sized bundles on the output. If unset, uses a random integer key.
     private @Nullable Integer numBuckets;
 
-    public ViaRandomKey<T> withNumBuckets(@Nullable Integer numBuckets) {
-      return new ViaRandomKey<>(numBuckets);
+    public PerRandomKey<T> withNumBuckets(@Nullable Integer numBuckets) {
+      return new PerRandomKey<>(numBuckets);
     }
 
     @Override
     public PCollection<T> expand(PCollection<T> input) {
       return input
           .apply("Pair with random key", ParDo.of(new AssignShardFn<>(numBuckets)))
-          .apply(Reshuffle.of())
+          .apply(Reshuffle.perKey())
           .apply(Values.create());
     }
   }
 
+  /**
+   * Implementation of {@link #perKey()}.
+   *
+   * @param <K> The type of key being reshuffled on.
+   * @param <V> The type of value being reshuffled.
+   */
+  public static class PerKey<K, V>

Review comment:
       +1 to this move, but should we override `getKindString` to return `Reshuffle` for backwards (update) compatibility? 
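Here is why this matters for update compatibility. In Beam, a transform applied without an explicit name takes its default name from `getKindString()`, so moving the logic into a nested class can change existing step names. The sketch below uses a hypothetical stand-in base class (`SketchTransform`, not Beam's `PTransform`) with a deliberately simplified naming rule: `Outer$Inner` becomes `Outer.Inner`. Beam's actual default naming may differ in detail. The `Reshuffle` holder here is likewise hypothetical and is not `org.apache.beam.sdk.transforms.Reshuffle`.

```java
/**
 * Hypothetical stand-in for a transform base class; NOT Beam's PTransform.
 * The default kind string is simplified to "Outer.Inner", built from the class's binary name.
 */
abstract class SketchTransform {
  /** Simplified default: "Reshuffle$PerKey" becomes "Reshuffle.PerKey". */
  protected String getKindString() {
    String binaryName = getClass().getName();
    String simple = binaryName.substring(binaryName.lastIndexOf('.') + 1);
    return simple.replace('$', '.');
  }

  /** Default step name used when the transform is applied without an explicit name. */
  final String defaultName() {
    return getKindString();
  }
}

/** Hypothetical holder mirroring the new nested-class layout from the PR. */
final class Reshuffle {
  private Reshuffle() {}

  /** Without an override, the default name changes to "Reshuffle.PerKeyWithoutOverride". */
  static final class PerKeyWithoutOverride extends SketchTransform {}

  /** Overriding keeps the pre-refactoring name, so existing step names still match on update. */
  static final class PerKey extends SketchTransform {
    @Override
    protected String getKindString() {
      return "Reshuffle";
    }
  }
}
```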

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
##########
@@ -223,7 +223,7 @@ public StreamExecutionEnvironment getExecutionEnvironment() {
     translatorMap.put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, this::translateGroupByKey);
     translatorMap.put(PTransformTranslation.IMPULSE_TRANSFORM_URN, this::translateImpulse);
     translatorMap.put(ExecutableStage.URN, this::translateExecutableStage);
-    translatorMap.put(PTransformTranslation.RESHUFFLE_URN, this::translateReshuffle);

Review comment:
       Similarly. 

##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
##########
@@ -128,7 +128,7 @@
         new NonMergingGroupByKeyTranslatorBatch<>());
     TRANSLATORS.put(
         PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, new GroupByKeyTranslatorBatch<>());
-    TRANSLATORS.put(PTransformTranslation.RESHUFFLE_URN, new ReshuffleTranslatorBatch<>());

Review comment:
       We'll want to keep the old RESHUFFLE_URN translator registered as well, since other SDKs (e.g. Python) still produce that URN.
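Here is a minimal sketch of keeping both registrations, assuming a simple URN-to-translator map. `TranslatorRegistrySketch` is hypothetical and not one of the Flink runner's actual classes. The legacy value `beam:transform:reshuffle:v1` is assumed to be what `PTransformTranslation.RESHUFFLE_URN` resolves to. The per-key URN is the one proposed in this PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical URN-to-translator table; NOT Flink runner code.
 * The legacy reshuffle URN stays registered next to the new per-key URN,
 * because other SDKs may still emit the legacy one.
 */
final class TranslatorRegistrySketch {
  // Assumed value of PTransformTranslation.RESHUFFLE_URN.
  static final String LEGACY_RESHUFFLE_URN = "beam:transform:reshuffle:v1";
  // URN proposed in this PR's proto change.
  static final String RESHUFFLE_PER_KEY_URN = "beam:transform:reshuffle_per_key:v1";

  private final Map<String, Function<String, String>> translators = new HashMap<>();

  TranslatorRegistrySketch() {
    // Both URNs mean "reshuffle KV pairs by key", so they share one translator.
    Function<String, String> reshuffleTranslator = transformId -> "reshuffle(" + transformId + ")";
    translators.put(LEGACY_RESHUFFLE_URN, reshuffleTranslator);
    translators.put(RESHUFFLE_PER_KEY_URN, reshuffleTranslator);
  }

  /** Returns a description of the translation, or throws for an unregistered URN. */
  String translate(String urn, String transformId) {
    Function<String, String> translator = translators.get(urn);
    if (translator == null) {
      throw new IllegalArgumentException("No translator registered for URN " + urn);
    }
    return translator.apply(transformId);
  }
}
```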

##########
File path: model/pipeline/src/main/proto/beam_runner_api.proto
##########
@@ -365,6 +365,12 @@ message StandardPTransforms {
     // Represents the GroupIntoBatches.WithShardedKey operation.
     // Payload: GroupIntoBatchesPayload
     GROUP_INTO_BATCHES_WITH_SHARDED_KEY = 6 [(beam_urn) = "beam:transform:group_into_batches_with_sharded_key:v1"];
+
+    // Represents the Reshuffle.perKey() operation.
+    RESHUFFLE_PER_KEY = 7 [(beam_urn) = "beam:transform:reshuffle_per_key:v1"];
+
+    // Represents the Reshuffle.perRandomKey() operation.
+    RESHUFFLE_PER_RANDOM_KEY = 8 [(beam_urn) = "beam:transform:reshuffle_per_random_key:v1"];

Review comment:
       Should this just be called "RESHUFFLE_ELEMENTS"? Using a random key is one possible implementation, not a necessary one, so it is an implementation detail. The other could be called "RESHUFFLE_KEYS".
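This sketch illustrates the point that a random key is only one possible implementation. A hypothetical round-robin dealer (`RoundRobinReshuffleSketch`, not Beam code) meets the same "Input: T, Output: T, redistributed evenly" contract with no keys at all. Per-worker counts differ by at most one.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch: satisfies "output equals input, spread evenly across workers"
 * without any random key, by dealing elements to workers round-robin. NOT Beam code.
 */
final class RoundRobinReshuffleSketch {
  private RoundRobinReshuffleSketch() {}

  static <T> List<List<T>> reshuffle(List<T> input, int numWorkers) {
    if (numWorkers <= 0) {
      throw new IllegalArgumentException("numWorkers must be positive");
    }
    List<List<T>> workers = new ArrayList<>();
    for (int i = 0; i < numWorkers; i++) {
      workers.add(new ArrayList<>());
    }
    int next = 0;
    for (T element : input) {
      // Deal to the next worker in turn; counts never differ by more than one.
      workers.get(next).add(element);
      next = (next + 1) % numWorkers;
    }
    return workers;
  }
}
```

A distributed runner would need its own way to achieve this balance. The sketch only shows that the contract says nothing about keys, which supports naming the URN after the behavior rather than the mechanism.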




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.