Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/05/20 18:59:09 UTC

[GitHub] [beam] lukecwik commented on a change in pull request #11406: [BEAM-9748] Refactor Reparallelize as an alternative Reshuffle implementation

lukecwik commented on a change in pull request #11406:
URL: https://github.com/apache/beam/pull/11406#discussion_r428239285



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
##########
@@ -107,10 +108,57 @@ public void processElement(
 
   /** Implementation of {@link #viaRandomKey()}. */
   public static class ViaRandomKey<T> extends PTransform<PCollection<T>, PCollection<T>> {
+    private boolean isHighFanoutAndLimitedInputParallelism;
+
     private ViaRandomKey() {}
 
+    /**
+     * Use a different strategy that materializes the input and prepares it to be consumed in a
+     * highly parallel fashion.
+     *
+     * <p>It is tailored to the case when input was produced in an extremely sequential way -
+     * typically by a ParDo that emits millions of outputs _per input element_, e.g., executing a
+     * large database query or a large simulation and emitting all of their results.
+     *
+     * <p>Internally, it materializes the input at a moderate cost before reshuffling it, making the
+     * reshuffling itself significantly cheaper in these extreme cases on some runners. Use this
+     * only if your benchmarks show an improvement.
+     */
+    public ViaRandomKey<T> withHintHighFanoutAndLimitedInputParallelism() {
+      this.isHighFanoutAndLimitedInputParallelism = true;

Review comment:
       Let's return a new PTransform, following our other PTransform builder patterns, instead of mutating the state of the current transform.
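       A minimal, Beam-free sketch of the immutable builder style requested above. It uses a hypothetical stand-in class, `ViaRandomKeySketch`, rather than the real `Reshuffle.ViaRandomKey` (which extends `PTransform` and is not reproduced here). The flag is a final field, and `withHintHighFanoutAndLimitedInputParallelism()` returns a fresh instance instead of setting a field on `this` and returning it.

```java
/**
 * Hypothetical sketch of the immutable "with*" builder pattern.
 * ViaRandomKeySketch stands in for Reshuffle.ViaRandomKey; it is NOT the Beam class
 * and has no expand() logic.
 */
final class ViaRandomKeySketch<T> {
  // Final field: once an instance is constructed, its configuration never changes.
  private final boolean isHighFanoutAndLimitedInputParallelism;

  private ViaRandomKeySketch(boolean isHighFanoutAndLimitedInputParallelism) {
    this.isHighFanoutAndLimitedInputParallelism = isHighFanoutAndLimitedInputParallelism;
  }

  /** Default strategy: plain random-key reshuffle (hint not set). */
  static <T> ViaRandomKeySketch<T> create() {
    return new ViaRandomKeySketch<>(false);
  }

  /** Returns a NEW instance with the hint set; the receiver is left untouched. */
  ViaRandomKeySketch<T> withHintHighFanoutAndLimitedInputParallelism() {
    return new ViaRandomKeySketch<>(true);
  }

  boolean isHighFanoutAndLimitedInputParallelism() {
    return isHighFanoutAndLimitedInputParallelism;
  }

  public static void main(String[] args) {
    ViaRandomKeySketch<String> base = ViaRandomKeySketch.create();
    ViaRandomKeySketch<String> hinted = base.withHintHighFanoutAndLimitedInputParallelism();
    // The original object keeps its default configuration.
    System.out.println(base.isHighFanoutAndLimitedInputParallelism());
    System.out.println(hinted.isHighFanoutAndLimitedInputParallelism());
  }
}
```

       With this shape, calling the hint method on a transform instance that other code also holds cannot silently change that code's behavior, which is the hazard with the `this.isHighFanoutAndLimitedInputParallelism = true; return this;` version in the diff.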

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
##########
@@ -107,10 +108,57 @@ public void processElement(
 
   /** Implementation of {@link #viaRandomKey()}. */
   public static class ViaRandomKey<T> extends PTransform<PCollection<T>, PCollection<T>> {
+    private boolean isHighFanoutAndLimitedInputParallelism;
+
     private ViaRandomKey() {}
 
+    /**
+     * Use a different strategy that materializes the input and prepares it to be consumed in a
+     * highly parallel fashion.
+     *
+     * <p>It is tailored to the case when input was produced in an extremely sequential way -
+     * typically by a ParDo that emits millions of outputs _per input element_, e.g., executing a
+     * large database query or a large simulation and emitting all of their results.
+     *
+     * <p>Internally, it materializes the input at a moderate cost before reshuffling it, making the
+     * reshuffling itself significantly cheaper in these extreme cases on some runners. Use this
+     * only if your benchmarks show an improvement.
+     */
+    public ViaRandomKey<T> withHintHighFanoutAndLimitedInputParallelism() {
+      this.isHighFanoutAndLimitedInputParallelism = true;
+      return this;
+    }
+
     @Override
     public PCollection<T> expand(PCollection<T> input) {
+      if (isHighFanoutAndLimitedInputParallelism) {
+        // See https://issues.apache.org/jira/browse/BEAM-2803
+        // We use a combined approach to "break fusion" here:
+        // (see https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion)
+        // 1) force the data to be materialized by passing it as a side input to an identity fn,
+        // then 2) reshuffle it with a random key. Initial materialization provides some parallelism
+        // and ensures that data to be shuffled can be generated in parallel, while reshuffling
+        // provides perfect parallelism.
+        // In most cases where a "fusion break" is needed, a simple reshuffle would be sufficient.
+        // The current approach is necessary only to support the particular case of JdbcIO where

Review comment:
       ```suggestion
           // The current approach is necessary to support use cases such as JdbcIO where
       ```



