Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2021/11/03 23:00:03 UTC

[jira] [Work logged] (BEAM-12999) Improve Reshuffle Transform

     [ https://issues.apache.org/jira/browse/BEAM-12999?focusedWorklogId=675018&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-675018 ]

ASF GitHub Bot logged work on BEAM-12999:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/Nov/21 22:59
            Start Date: 03/Nov/21 22:59
    Worklog Time Spent: 10m 
      Work Description: robertwb commented on a change in pull request #15665:
URL: https://github.com/apache/beam/pull/15665#discussion_r742406333



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
##########
@@ -36,102 +36,121 @@
 /**
  * <b>For internal use only; no backwards compatibility guarantees.</b>
  *
- * <p>A {@link PTransform} that returns a {@link PCollection} equivalent to its input but
+ * <p>{@link PTransform}(s) that returns a {@link PCollection} equivalent to its input but
  * operationally provides some of the side effects of a {@link GroupByKey}, in particular
  * checkpointing, and preventing fusion of the surrounding transforms.
  *
  * <p>Performs a {@link GroupByKey} so that the data is key-partitioned. Configures the {@link
  * WindowingStrategy} so that no data is dropped, but doesn't affect the need for the user to
  * specify allowed lateness and accumulation mode before a user-inserted GroupByKey.
- *
- * @param <K> The type of key being reshuffled on.
- * @param <V> The type of value being reshuffled.
- * @deprecated this transform's intended side effects are not portable; it will likely be removed
  */
 @Internal
-@Deprecated
-public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {
+public class Reshuffle {
 
   private Reshuffle() {}
 
-  public static <K, V> Reshuffle<K, V> of() {
-    return new Reshuffle<>();
+  /** @deprecated use {@link #perKey()}. */
+  @Deprecated
+  public static <K, V> PerKey<K, V> of() {
+    return new PerKey<>();
+  }
+
+  public static <K, V> PerKey<K, V> perKey() {
+    return new PerKey<>();
+  }
+
+  /** @deprecated use {@link #perRandomKey()}. */
+  @Deprecated
+  public static <T> PerRandomKey<T> viaRandomKey() {
+    return perRandomKey();
   }
 
   /**
-   * Encapsulates the sequence "pair input with unique key, apply {@link Reshuffle#of}, drop the
-   * key" commonly used to break fusion.
+   * Encapsulates the sequence "pair input with unique key, apply {@link Reshuffle#perKey()}, drop
+   * the key" commonly used to break fusion.
    */
   @Experimental
-  public static <T> ViaRandomKey<T> viaRandomKey() {
-    return new ViaRandomKey<>();
-  }
-
-  @Override
-  public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
-    WindowingStrategy<?, ?> originalStrategy = input.getWindowingStrategy();
-    // If the input has already had its windows merged, then the GBK that performed the merge
-    // will have set originalStrategy.getWindowFn() to InvalidWindows, causing the GBK contained
-    // here to fail. Instead, we install a valid WindowFn that leaves all windows unchanged.
-    // The TimestampCombiner is set to ensure the GroupByKey does not shift elements forwards in
-    // time.
-    // Because this outputs as fast as possible, this should not hold the watermark.
-    Window<KV<K, V>> rewindow =
-        Window.<KV<K, V>>into(new IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder()))
-            .triggering(new ReshuffleTrigger<>())
-            .discardingFiredPanes()
-            .withTimestampCombiner(TimestampCombiner.EARLIEST)
-            .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
-
-    return input
-        .apply(rewindow)
-        .apply("ReifyOriginalTimestamps", Reify.timestampsInValue())
-        .apply(GroupByKey.create())
-        // Set the windowing strategy directly, so that it doesn't get counted as the user having
-        // set allowed lateness.
-        .setWindowingStrategyInternal(originalStrategy)
-        .apply(
-            "ExpandIterable",
-            ParDo.of(
-                new DoFn<KV<K, Iterable<TimestampedValue<V>>>, KV<K, TimestampedValue<V>>>() {
-                  @ProcessElement
-                  public void processElement(
-                      @Element KV<K, Iterable<TimestampedValue<V>>> element,
-                      OutputReceiver<KV<K, TimestampedValue<V>>> r) {
-                    K key = element.getKey();
-                    for (TimestampedValue<V> value : element.getValue()) {
-                      r.output(KV.of(key, value));
-                    }
-                  }
-                }))
-        .apply("RestoreOriginalTimestamps", ReifyTimestamps.extractFromValues());
+  public static <T> PerRandomKey<T> perRandomKey() {
+    return new PerRandomKey<>();
   }
 
   /** Implementation of {@link #viaRandomKey()}. */
-  public static class ViaRandomKey<T> extends PTransform<PCollection<T>, PCollection<T>> {
-    private ViaRandomKey() {}
+  public static class PerRandomKey<T> extends PTransform<PCollection<T>, PCollection<T>> {
+    private PerRandomKey() {}
 
-    private ViaRandomKey(@Nullable Integer numBuckets) {
+    private PerRandomKey(@Nullable Integer numBuckets) {
       this.numBuckets = numBuckets;
     }
 
     // The number of buckets to shard into. This is a performance optimization to prevent having
     // unit sized bundles on the output. If unset, uses a random integer key.
     private @Nullable Integer numBuckets;
 
-    public ViaRandomKey<T> withNumBuckets(@Nullable Integer numBuckets) {
-      return new ViaRandomKey<>(numBuckets);
+    public PerRandomKey<T> withNumBuckets(@Nullable Integer numBuckets) {
+      return new PerRandomKey<>(numBuckets);
     }
 
     @Override
     public PCollection<T> expand(PCollection<T> input) {
       return input
           .apply("Pair with random key", ParDo.of(new AssignShardFn<>(numBuckets)))
-          .apply(Reshuffle.of())
+          .apply(Reshuffle.perKey())
           .apply(Values.create());
     }
   }
 
+  /**
+   * Implementation of {@link #perKey*()}.
+   *
+   * @param <K> The type of key being reshuffled on.
+   * @param <V> The type of value being reshuffled.
+   */
+  public static class PerKey<K, V>

Review comment:
       No, we no longer need to change this, but we may need to implement `getKindString` for `PerRandomKey`, which was renamed.

##########
File path: model/pipeline/src/main/proto/beam_runner_api.proto
##########
@@ -365,6 +365,12 @@ message StandardPTransforms {
     // Represents the GroupIntoBatches.WithShardedKey operation.
     // Payload: GroupIntoBatchesPayload
     GROUP_INTO_BATCHES_WITH_SHARDED_KEY = 6 [(beam_urn) = "beam:transform:group_into_batches_with_sharded_key:v1"];
+
+    // Represents the Reshuffle.perKey() operation.
+    RESHUFFLE_PER_KEY = 7 [(beam_urn) = "beam:transform:reshuffle_per_key:v1"];
+
+    // Represents the Reshuffle.perRandomKey() operation.
+    RESHUFFLE_PER_RANDOM_KEY = 8 [(beam_urn) = "beam:transform:reshuffle_per_random_key:v1"];

Review comment:
       Yes. We could rename `Reshuffle.perRandomKey()` to `Reshuffle.elements()` as well.
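
For readers unfamiliar with the two operations that the new URNs distinguish, the following is a minimal, Beam-free sketch of their semantics on in-memory lists. It is only an illustration under stated assumptions, not the code in this pull request: the class name `ReshuffleSketch`, both method signatures, and the explicit `Random` argument are hypothetical, and windowing, timestamps, and triggers are ignored entirely. `perKey` groups key/value pairs by the key they already carry, while `perRandomKey` follows the sequence quoted in the Javadoc above: pair each element with a random key, reshuffle per key, drop the key.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical illustration only; this is not part of the Beam SDK.
public class ReshuffleSketch {

  /** Groups key/value pairs by their existing key, then flattens the groups again. */
  public static <K, V> List<Map.Entry<K, V>> perKey(List<Map.Entry<K, V>> input) {
    // The "group by key" step: all values that share a key end up together.
    Map<K, List<V>> grouped = new LinkedHashMap<>();
    for (Map.Entry<K, V> e : input) {
      grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
    }
    // The "expand iterable" step: emit one key/value pair per grouped value.
    List<Map.Entry<K, V>> output = new ArrayList<>();
    for (Map.Entry<K, List<V>> group : grouped.entrySet()) {
      for (V value : group.getValue()) {
        output.add(new SimpleImmutableEntry<>(group.getKey(), value));
      }
    }
    return output;
  }

  /** Pairs each element with a random bucket key, reshuffles per key, then drops the key. */
  public static <T> List<T> perRandomKey(List<T> input, int numBuckets, Random random) {
    List<Map.Entry<Integer, T>> keyed = new ArrayList<>();
    for (T element : input) {
      keyed.add(new SimpleImmutableEntry<>(random.nextInt(numBuckets), element));
    }
    List<T> output = new ArrayList<>();
    for (Map.Entry<Integer, T> e : perKey(keyed)) {
      output.add(e.getValue());
    }
    return output;
  }

  public static void main(String[] args) {
    List<String> elements = List.of("a", "b", "c", "d");
    // Same elements come back, possibly in a different order.
    System.out.println(perRandomKey(elements, 2, new Random()));
  }
}
```

In this sketch both operations return exactly the elements they were given; only the grouping, and therefore possibly the order, changes. That mirrors the "equivalent to its input" wording in the Javadoc, and it also shows why a single URN cannot describe both: one operation requires keyed input and the other accepts any element type.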






Issue Time Tracking
-------------------

    Worklog Id:     (was: 675018)
    Time Spent: 14h 50m  (was: 14h 40m)

> Improve Reshuffle Transform
> ---------------------------
>
>                 Key: BEAM-12999
>                 URL: https://issues.apache.org/jira/browse/BEAM-12999
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-go, sdk-java-core, sdk-py-core
>            Reporter: Ke Wu
>            Assignee: Ke Wu
>            Priority: P2
>          Time Spent: 14h 50m
>  Remaining Estimate: 0h
>
> See discussion [https://lists.apache.org/thread.html/r83adaad3a512ad186f2f9dc9dc4bec2a789070677c07cdcaad6fcfa5%40%3Cdev.beam.apache.org%3E] 
>  
> The "beam:transform:reshuffle:v1" transform represents semantically different transforms in different SDKs. The proposal is to replace "beam:transform:reshuffle:v1" with two new URNs: one to reshuffle a KV PCollection using its key K, and the other to reshuffle based on a random key.
>  


