Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/11/16 17:31:04 UTC

[GitHub] [beam] kw2542 commented on a change in pull request #15665: [BEAM-12999] Add ReshufflePerKey and ReshufflePerRandomKey Transform

kw2542 commented on a change in pull request #15665:
URL: https://github.com/apache/beam/pull/15665#discussion_r750508469



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
##########
@@ -36,102 +36,121 @@
 /**
  * <b>For internal use only; no backwards compatibility guarantees.</b>
  *
- * <p>A {@link PTransform} that returns a {@link PCollection} equivalent to its input but
+ * <p>{@link PTransform}(s) that returns a {@link PCollection} equivalent to its input but
  * operationally provides some of the side effects of a {@link GroupByKey}, in particular
  * checkpointing, and preventing fusion of the surrounding transforms.
  *
  * <p>Performs a {@link GroupByKey} so that the data is key-partitioned. Configures the {@link
  * WindowingStrategy} so that no data is dropped, but doesn't affect the need for the user to
  * specify allowed lateness and accumulation mode before a user-inserted GroupByKey.
- *
- * @param <K> The type of key being reshuffled on.
- * @param <V> The type of value being reshuffled.
- * @deprecated this transform's intended side effects are not portable; it will likely be removed
  */
 @Internal
-@Deprecated
-public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {
+public class Reshuffle {
 
   private Reshuffle() {}
 
-  public static <K, V> Reshuffle<K, V> of() {
-    return new Reshuffle<>();
+  /** @deprecated use {@link #perKey()}. */
+  @Deprecated
+  public static <K, V> PerKey<K, V> of() {
+    return new PerKey<>();
+  }
+
+  public static <K, V> PerKey<K, V> perKey() {
+    return new PerKey<>();
+  }
+
+  /** @deprecated use {@link #perRandomKey()}. */
+  @Deprecated
+  public static <T> PerRandomKey<T> viaRandomKey() {
+    return perRandomKey();
   }
 
   /**
-   * Encapsulates the sequence "pair input with unique key, apply {@link Reshuffle#of}, drop the
-   * key" commonly used to break fusion.
+   * Encapsulates the sequence "pair input with unique key, apply {@link Reshuffle#perKey()}, drop
+   * the key" commonly used to break fusion.
    */
   @Experimental
-  public static <T> ViaRandomKey<T> viaRandomKey() {
-    return new ViaRandomKey<>();
-  }
-
-  @Override
-  public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
-    WindowingStrategy<?, ?> originalStrategy = input.getWindowingStrategy();
-    // If the input has already had its windows merged, then the GBK that performed the merge
-    // will have set originalStrategy.getWindowFn() to InvalidWindows, causing the GBK contained
-    // here to fail. Instead, we install a valid WindowFn that leaves all windows unchanged.
-    // The TimestampCombiner is set to ensure the GroupByKey does not shift elements forwards in
-    // time.
-    // Because this outputs as fast as possible, this should not hold the watermark.
-    Window<KV<K, V>> rewindow =
-        Window.<KV<K, V>>into(new IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder()))
-            .triggering(new ReshuffleTrigger<>())
-            .discardingFiredPanes()
-            .withTimestampCombiner(TimestampCombiner.EARLIEST)
-            .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
-
-    return input
-        .apply(rewindow)
-        .apply("ReifyOriginalTimestamps", Reify.timestampsInValue())
-        .apply(GroupByKey.create())
-        // Set the windowing strategy directly, so that it doesn't get counted as the user having
-        // set allowed lateness.
-        .setWindowingStrategyInternal(originalStrategy)
-        .apply(
-            "ExpandIterable",
-            ParDo.of(
-                new DoFn<KV<K, Iterable<TimestampedValue<V>>>, KV<K, TimestampedValue<V>>>() {
-                  @ProcessElement
-                  public void processElement(
-                      @Element KV<K, Iterable<TimestampedValue<V>>> element,
-                      OutputReceiver<KV<K, TimestampedValue<V>>> r) {
-                    K key = element.getKey();
-                    for (TimestampedValue<V> value : element.getValue()) {
-                      r.output(KV.of(key, value));
-                    }
-                  }
-                }))
-        .apply("RestoreOriginalTimestamps", ReifyTimestamps.extractFromValues());
+  public static <T> PerRandomKey<T> perRandomKey() {
+    return new PerRandomKey<>();
   }
 
   /** Implementation of {@link #viaRandomKey()}. */
-  public static class ViaRandomKey<T> extends PTransform<PCollection<T>, PCollection<T>> {
-    private ViaRandomKey() {}
+  public static class PerRandomKey<T> extends PTransform<PCollection<T>, PCollection<T>> {
+    private PerRandomKey() {}
 
-    private ViaRandomKey(@Nullable Integer numBuckets) {
+    private PerRandomKey(@Nullable Integer numBuckets) {
       this.numBuckets = numBuckets;
     }
 
     // The number of buckets to shard into. This is a performance optimization to prevent having
     // unit sized bundles on the output. If unset, uses a random integer key.
     private @Nullable Integer numBuckets;
 
-    public ViaRandomKey<T> withNumBuckets(@Nullable Integer numBuckets) {
-      return new ViaRandomKey<>(numBuckets);
+    public PerRandomKey<T> withNumBuckets(@Nullable Integer numBuckets) {
+      return new PerRandomKey<>(numBuckets);
     }
 
     @Override
     public PCollection<T> expand(PCollection<T> input) {
       return input
           .apply("Pair with random key", ParDo.of(new AssignShardFn<>(numBuckets)))
-          .apply(Reshuffle.of())
+          .apply(Reshuffle.perKey())
           .apply(Values.create());
     }
   }
 
+  /**
+   * Implementation of {@link #perKey*()}.
+   *
+   * @param <K> The type of key being reshuffled on.
+   * @param <V> The type of value being reshuffled.
+   */
+  public static class PerKey<K, V>

Review comment:
       Updated the PR to follow the same pattern: I kept the old PTransform, but it now expands into the new PTransform. I think this should help address the backward-compatibility concern.
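
       For readers unfamiliar with the pattern, here is a minimal sketch of "keep the old transform, but have it expand into the new one". It deliberately avoids the Beam API: `Transform`, `PerKey`, and `Legacy` below are hypothetical stand-ins, and lists stand in for PCollections. It is an illustration under those assumptions, not the code in this PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: all names here are hypothetical and none of this is Beam API.
public class DelegationSketch {

  /** Hypothetical stand-in for a transform from an input I to an output O. */
  public interface Transform<I, O> {
    O expand(I input);
  }

  /** The new transform; it owns the real logic (here just an identity copy). */
  public static final class PerKey<T> implements Transform<List<T>, List<T>> {
    @Override
    public List<T> expand(List<T> input) {
      return new ArrayList<>(input);
    }
  }

  /**
   * The old transform, kept so existing callers still compile.
   *
   * @deprecated use {@link PerKey}.
   */
  @Deprecated
  public static final class Legacy<T> implements Transform<List<T>, List<T>> {
    @Override
    public List<T> expand(List<T> input) {
      // No logic of its own: delegate to the new transform.
      return new PerKey<T>().expand(input);
    }
  }

  public static void main(String[] args) {
    List<String> input = Arrays.asList("a", "b");
    // Old and new entry points produce the same result.
    System.out.println(new Legacy<String>().expand(input).equals(new PerKey<String>().expand(input)));
  }
}
```

       Because the deprecated class only forwards to the new one, behavior lives in one place, and the old type can be removed later without touching the logic.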



