Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/13 20:12:39 UTC

[GitHub] [beam] iemejia opened a new pull request #11406: [BEAM-9748] Move Reparallelize transform to Reshuffle

iemejia opened a new pull request #11406: [BEAM-9748] Move Reparallelize transform to Reshuffle
URL: https://github.com/apache/beam/pull/11406
 
 
   Some DoFn-based IOs like JdbcIO and RedisIO rely on the Reparallelize transform, a combination of an empty PCollectionView and Reshuffle, to force materialization and reparallelize a PCollection. The idea of this issue is to extract this transform and expose it as part of the internal Reshuffle transform, so that transforms (notably IOs) that need to reparallelize their output do not have to repeat the code.
   
   R: @lukecwik 
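   As a rough, in-memory illustration of the two steps described above, the sketch below models materialization as taking a snapshot of everything a single worker produced, and `Reshuffle.viaRandomKey()` as routing each element to a uniformly random group. This is not Beam code: `ReparallelizeModel` and its methods are hypothetical, and the fixed seed is only there to make the output reproducible.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Conceptual model of the Reparallelize expansion; NOT Beam code.
final class ReparallelizeModel {

  // A single worker emits many outputs for one input element
  // (e.g. one JDBC query streaming back a large result set).
  static List<Integer> generateSequentially(int count) {
    List<Integer> out = new ArrayList<>(count);
    for (int i = 0; i < count; i++) {
      out.add(i);
    }
    return out;
  }

  // Step 1: materialize. In the real transform this barrier comes from an empty
  // PCollectionView used as a side input; here it is just an immutable snapshot.
  static List<Integer> materialize(List<Integer> produced) {
    return Collections.unmodifiableList(new ArrayList<>(produced));
  }

  // Step 2: reshuffle via random keys. Each element is assigned a random key in
  // [0, workers) and grouped by it, spreading downstream work across groups.
  static List<List<Integer>> reshuffleViaRandomKey(
      List<Integer> materialized, int workers, long seed) {
    Random random = new Random(seed);
    List<List<Integer>> groups = new ArrayList<>(workers);
    for (int w = 0; w < workers; w++) {
      groups.add(new ArrayList<>());
    }
    for (Integer element : materialized) {
      groups.get(random.nextInt(workers)).add(element);
    }
    return groups;
  }

  // Convenience: sizes of the groups produced by the whole model.
  static int[] groupSizes(int count, int workers, long seed) {
    List<List<Integer>> groups =
        reshuffleViaRandomKey(materialize(generateSequentially(count)), workers, seed);
    int[] sizes = new int[workers];
    for (int w = 0; w < workers; w++) {
      sizes[w] = groups.get(w).size();
    }
    return sizes;
  }

  public static void main(String[] args) {
    int[] sizes = groupSizes(10_000, 8, 42L);
    for (int w = 0; w < sizes.length; w++) {
      System.out.println("worker " + w + " receives " + sizes[w] + " elements");
    }
  }
}
```

   Every element survives the round trip; only its placement changes.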

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] iemejia commented on a change in pull request #11406: [BEAM-9748] Move Reparallelize transform to Reshuffle

Posted by GitBox <gi...@apache.org>.
iemejia commented on a change in pull request #11406: [BEAM-9748] Move Reparallelize transform to Reshuffle
URL: https://github.com/apache/beam/pull/11406#discussion_r407778080
 
 

 ##########
 File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
 ##########
 @@ -309,7 +305,7 @@ public ReadAll withOutputParallelization(boolean outputParallelization) {
               .apply(ParDo.of(new ReadFn(connectionConfiguration(), batchSize())))
               .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
       if (outputParallelization()) {
-        output = output.apply(new Reparallelize());
+        output = (PCollection<KV<String, String>>) output.apply(Reshuffle.reparallelize());
 
 Review comment:
   Not necessary, I will remove it.
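   A minimal sketch of why the cast is redundant, assuming `Reparallelize<T>` is typed as a transform from `PCollection<T>` to `PCollection<T>` (as the factory signature `public static <T> Reparallelize<T> reparallelize()` in the diff suggests) and that `apply` has the same generic shape as Beam's `PCollection.apply`, i.e. `<OutputT extends POutput> OutputT apply(PTransform<? super PCollection<T>, OutputT> t)`. The `Mini*` types and `CastDemo` are hypothetical stand-ins, not Beam classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins that only mirror the generic shape of Beam's signatures.
interface MiniTransform<InputT, OutputT> {
  OutputT expand(InputT input);
}

final class MiniPCollection<T> {
  final List<T> elements;

  MiniPCollection(List<T> elements) {
    this.elements = elements;
  }

  // Same shape as PCollection.apply: OutputT is inferred from the transform.
  <OutputT> OutputT apply(MiniTransform<? super MiniPCollection<T>, OutputT> transform) {
    return transform.expand(this);
  }
}

// Element-preserving transform from MiniPCollection<T> to MiniPCollection<T>.
final class MiniReparallelize<T>
    implements MiniTransform<MiniPCollection<T>, MiniPCollection<T>> {
  @Override
  public MiniPCollection<T> expand(MiniPCollection<T> input) {
    return new MiniPCollection<>(new ArrayList<>(input.elements));
  }
}

final class CastDemo {
  // Generic static factory, analogous to Reshuffle.reparallelize() in the diff.
  static <T> MiniReparallelize<T> reparallelize() {
    return new MiniReparallelize<>();
  }

  static MiniPCollection<String> run(MiniPCollection<String> output) {
    // No cast: javac infers T = String and OutputT = MiniPCollection<String>.
    output = output.apply(reparallelize());
    return output;
  }

  public static void main(String[] args) {
    MiniPCollection<String> in = new MiniPCollection<>(Arrays.asList("k1=v1", "k2=v2"));
    System.out.println(run(in).elements);
  }
}
```

   A cast only becomes necessary if the transform is passed as a raw type, because an unchecked invocation erases the inferred return type.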


[GitHub] [beam] jkff commented on issue #11406: [BEAM-9748] Move Reparallelize transform to Reshuffle

Posted by GitBox <gi...@apache.org>.
jkff commented on issue #11406: [BEAM-9748] Move Reparallelize transform to Reshuffle
URL: https://github.com/apache/beam/pull/11406#issuecomment-613147215
 
 
   The comments inside Reparallelize explain how this transform differs from Reshuffle.viaRandomKey(): it performs dramatically better on Dataflow in case the input PCollection is generated highly sequentially, as in the case of reading several GB of JDBC results. It almost certainly performs somewhat worse if the input PCollection is generated in a well-parallelized way, but I haven't measured that; I haven't measured the former case for non-Dataflow runners either.
   
   I think it's reasonable to move this to Reshuffle, but rename it to something more clear: maybe Reshuffle.forSequentiallyGeneratedInput()?


[GitHub] [beam] lukecwik edited a comment on issue #11406: [BEAM-9748] Move Reparallelize transform to Reshuffle

Posted by GitBox <gi...@apache.org>.
lukecwik edited a comment on issue #11406: [BEAM-9748] Move Reparallelize transform to Reshuffle
URL: https://github.com/apache/beam/pull/11406#issuecomment-614121831
 
 
   > The comments inside Reparallelize explain how this transform differs from Reshuffle.viaRandomKey(): it performs dramatically better on Dataflow in case the input PCollection is generated highly sequentially, as in the case of reading several GB of JDBC results. It almost certainly performs somewhat worse if the input PCollection is generated in a well-parallelized way, but I haven't measured that; I haven't measured the former case for non-Dataflow runners either.
   > 
   > I think it's reasonable to move this to Reshuffle, but rename it to something more clear: maybe Reshuffle.forSequentiallyGeneratedInput()?
   
   I see. You would imagine that two materializations, one for the side-input fusion break and another for the reshuffle, would be more expensive overall.


[GitHub] [beam] iemejia commented on issue #11406: [BEAM-9748] Move Reparallelize transform to Reshuffle

Posted by GitBox <gi...@apache.org>.
iemejia commented on issue #11406: [BEAM-9748] Move Reparallelize transform to Reshuffle
URL: https://github.com/apache/beam/pull/11406#issuecomment-613706077
 
 
   Addressed the changes and renamed the transform to the name Eugene suggested. PTAL again @lukecwik 


[GitHub] [beam] iemejia commented on a change in pull request #11406: [BEAM-9748] Move Reparallelize transform to Reshuffle

Posted by GitBox <gi...@apache.org>.
iemejia commented on a change in pull request #11406: [BEAM-9748] Move Reparallelize transform to Reshuffle
URL: https://github.com/apache/beam/pull/11406#discussion_r407778247
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
 ##########
 @@ -65,6 +66,11 @@ private Reshuffle() {}
     return new ViaRandomKey<>();
   }
 
+  @Experimental
+  public static <T> Reparallelize<T> reparallelize() {
 
 Review comment:
   Yes, I will improve the Javadoc, and maybe move some of the details into the implementation comment below.


[GitHub] [beam] iemejia edited a comment on issue #11406: [BEAM-9748] Move Reparallelize transform to Reshuffle

Posted by GitBox <gi...@apache.org>.
iemejia edited a comment on issue #11406: [BEAM-9748] Move Reparallelize transform to Reshuffle
URL: https://github.com/apache/beam/pull/11406#issuecomment-613144797
 
 
   > Why do we want to do this over Reshuffle.viaRandomKey(), which should give us the output parallelization we want?
   
   I think that's the case; maybe @jkff, who wrote that code, can confirm.
   There are more details about the implementation choice in the original ticket [BEAM-2803](https://issues.apache.org/jira/browse/BEAM-2803)


[GitHub] [beam] lukecwik commented on a change in pull request #11406: [BEAM-9748] Move Reparallelize transform to Reshuffle

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11406: [BEAM-9748] Move Reparallelize transform to Reshuffle
URL: https://github.com/apache/beam/pull/11406#discussion_r407698258
 
 

 ##########
 File path: sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
 ##########
 @@ -309,7 +305,7 @@ public ReadAll withOutputParallelization(boolean outputParallelization) {
               .apply(ParDo.of(new ReadFn(connectionConfiguration(), batchSize())))
               .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
       if (outputParallelization()) {
-        output = output.apply(new Reparallelize());
+        output = (PCollection<KV<String, String>>) output.apply(Reshuffle.reparallelize());
 
 Review comment:
   nit: Why is the cast necessary here?


[GitHub] [beam] lukecwik edited a comment on issue #11406: [BEAM-9748] Move Reparallelize transform to Reshuffle

Posted by GitBox <gi...@apache.org>.
lukecwik edited a comment on issue #11406: [BEAM-9748] Move Reparallelize transform to Reshuffle
URL: https://github.com/apache/beam/pull/11406#issuecomment-614121831
 
 
   > The comments inside Reparallelize explain how this transform differs from Reshuffle.viaRandomKey(): it performs dramatically better on Dataflow in case the input PCollection is generated highly sequentially, as in the case of reading several GB of JDBC results. It almost certainly performs somewhat worse if the input PCollection is generated in a well-parallelized way, but I haven't measured that; I haven't measured the former case for non-Dataflow runners either.
   > 
   > I think it's reasonable to move this to Reshuffle, but rename it to something more clear: maybe Reshuffle.forSequentiallyGeneratedInput()?
   
   This was true at some point for Dataflow, but I don't think we have rerun the benchmarks to see how it performs now. Shuffle performance has improved a lot, while side input performance has remained relatively flat for the iterable use case.


[GitHub] [beam] lukecwik commented on a change in pull request #11406: [BEAM-9748] Move Reparallelize transform to Reshuffle

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11406: [BEAM-9748] Move Reparallelize transform to Reshuffle
URL: https://github.com/apache/beam/pull/11406#discussion_r408960107
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
 ##########
 @@ -65,6 +66,16 @@ private Reshuffle() {}
     return new ViaRandomKey<>();
   }
 
+  @Experimental
 
 Review comment:
   I think the key point is that the input was generated with limited parallelism. I suggest we update `ViaRandomKey` to support both expansions, adding the side-input fusion break only when `withHintLimitedInputParallelism()` is specified.


[GitHub] [beam] iemejia commented on issue #11406: [BEAM-9748] Move Reparallelize transform to Reshuffle

Posted by GitBox <gi...@apache.org>.
iemejia commented on issue #11406: [BEAM-9748] Move Reparallelize transform to Reshuffle
URL: https://github.com/apache/beam/pull/11406#issuecomment-613144797
 
 
   > Why do we want to do this over Reshuffle.viaRandomKey(), which should give us the output parallelization we want?
   I think that's the case; maybe @jkff, who wrote that code, can confirm.


[GitHub] [beam] lukecwik commented on a change in pull request #11406: [BEAM-9748] Move Reparallelize transform to Reshuffle

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11406: [BEAM-9748] Move Reparallelize transform to Reshuffle
URL: https://github.com/apache/beam/pull/11406#discussion_r408960107
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
 ##########
 @@ -65,6 +66,16 @@ private Reshuffle() {}
     return new ViaRandomKey<>();
   }
 
+  @Experimental
 
 Review comment:
   I think the key point is that the large input was generated with limited parallelism. I suggest we update `ViaRandomKey` to support both expansions, adding the side-input fusion break only when `withHintHighFanoutAndLimitedInputParallelism()` is specified.
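   The proposal can be sketched as a single builder-style transform whose expansion depends on a hint. Everything here is hypothetical: `ViaRandomKeySketch` is not a Beam class, the hint method name follows the one suggested in the comment above, and the step labels are simplified descriptions rather than real Beam step names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: one ViaRandomKey-style transform whose expansion
// changes when a hint is set. Not Beam API.
final class ViaRandomKeySketch {
  private final boolean highFanoutAndLimitedInputParallelism;

  private ViaRandomKeySketch(boolean hint) {
    this.highFanoutAndLimitedInputParallelism = hint;
  }

  static ViaRandomKeySketch viaRandomKey() {
    return new ViaRandomKeySketch(false);
  }

  // Returns a new immutable instance with the hint set (builder style).
  ViaRandomKeySketch withHintHighFanoutAndLimitedInputParallelism() {
    return new ViaRandomKeySketch(true);
  }

  // Describes, as simplified labels, the expansion this configuration would use.
  List<String> expansionSteps() {
    List<String> steps = new ArrayList<>();
    if (highFanoutAndLimitedInputParallelism) {
      // Extra fusion break: identity ParDo with an empty side input.
      steps.add("IdentityWithEmptySideInput");
    }
    steps.add("PairWithRandomKey");
    steps.add("ReshuffleByKey");
    steps.add("DropKey");
    return Collections.unmodifiableList(steps);
  }

  public static void main(String[] args) {
    System.out.println(viaRandomKey().expansionSteps());
    System.out.println(
        viaRandomKey().withHintHighFanoutAndLimitedInputParallelism().expansionSteps());
  }
}
```

   Keeping the default expansion unchanged means only callers that opt in pay for the extra materialization.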


[GitHub] [beam] lukecwik commented on issue #11406: [BEAM-9748] Move Reparallelize transform to Reshuffle

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #11406: [BEAM-9748] Move Reparallelize transform to Reshuffle
URL: https://github.com/apache/beam/pull/11406#issuecomment-614121831
 
 
   > The comments inside Reparallelize explain how this transform differs from Reshuffle.viaRandomKey(): it performs dramatically better on Dataflow in case the input PCollection is generated highly sequentially, as in the case of reading several GB of JDBC results. It almost certainly performs somewhat worse if the input PCollection is generated in a well-parallelized way, but I haven't measured that; I haven't measured the former case for non-Dataflow runners either.
   > 
   > I think it's reasonable to move this to Reshuffle, but rename it to something more clear: maybe Reshuffle.forSequentiallyGeneratedInput()?
   
   This was true at some point for Dataflow, but I don't think we have rerun the benchmarks to see how it performs now.


[GitHub] [beam] iemejia edited a comment on issue #11406: [BEAM-9748] Move Reparallelize transform to Reshuffle

Posted by GitBox <gi...@apache.org>.
iemejia edited a comment on issue #11406: [BEAM-9748] Move Reparallelize transform to Reshuffle
URL: https://github.com/apache/beam/pull/11406#issuecomment-613144797
 
 
   > Why do we want to do this over Reshuffle.viaRandomKey(), which should give us the output parallelization we want?
   
   I think that's the case; maybe @jkff, who wrote that code, can confirm.


[GitHub] [beam] jkff commented on a change in pull request #11406: [BEAM-9748] Move Reparallelize transform to Reshuffle

Posted by GitBox <gi...@apache.org>.
jkff commented on a change in pull request #11406: [BEAM-9748] Move Reparallelize transform to Reshuffle
URL: https://github.com/apache/beam/pull/11406#discussion_r408998366
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
 ##########
 @@ -65,6 +66,16 @@ private Reshuffle() {}
     return new ViaRandomKey<>();
   }
 
+  @Experimental
 
 Review comment:
   I think this (`withHint...`) is a great idea, thanks Luke. It also wouldn't hurt to rerun the benchmarks (and to check them in - sadly I didn't do it back then, and probably don't have them anymore, but it only took like 15 min to write).


[GitHub] [beam] jkff commented on a change in pull request #11406: [BEAM-9748] Move Reparallelize transform to Reshuffle

Posted by GitBox <gi...@apache.org>.
jkff commented on a change in pull request #11406: [BEAM-9748] Move Reparallelize transform to Reshuffle
URL: https://github.com/apache/beam/pull/11406#discussion_r408484775
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
 ##########
 @@ -65,6 +66,16 @@ private Reshuffle() {}
     return new ViaRandomKey<>();
   }
 
+  @Experimental
 
 Review comment:
   Annotation should probably be below the comment.
   
   Also suggest rephrasing a bit to explain when one should use one or the other, and what the consequences of choosing wrong are. I'm not sure that "sequentially generated" is clear enough to a casual user. Maybe something like this:
   
   ```
   Materializes the input and prepares it to be consumed in a highly parallel fashion.
   
   This version is tailored to the case when input was produced in an extremely sequential way - typically by a ParDo that emits millions of outputs _per input element_, e.g., executing a large database query or a large simulation and emitting all of their results.
   
   Internally, this version first materializes the input at a moderate cost before reshuffling it internally using viaRandomKey(), making the reshuffling itself significantly cheaper in these extreme cases on some runners. Use this over viaRandomKey() only if your benchmarks show an improvement.
   ```
   
   And mention this at the class-level documentation of Reshuffle for visibility.
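   To illustrate the case the suggested Javadoc describes (one input element fanning out into a huge number of outputs on a single worker), here is a toy load model comparing the busiest worker's load when the outputs stay fused to the producing worker versus after each output is routed to a random worker. `FanoutLoadModel` is hypothetical and ignores materialization and shuffle costs entirely, so it says nothing about which variant is faster on a real runner; that still needs benchmarks.

```java
import java.util.Random;

// Hypothetical toy model of downstream load per worker; NOT Beam code.
final class FanoutLoadModel {

  // Without a fusion break, every output of the single input element is
  // processed downstream on the worker that produced it.
  static int maxLoadFused(int outputs, int workers) {
    int[] load = new int[workers];
    load[0] += outputs; // the one worker that executed the expensive query
    return max(load);
  }

  // After materializing and reshuffling via random keys, each output is
  // routed to a uniformly random worker.
  static int maxLoadReshuffled(int outputs, int workers, long seed) {
    Random random = new Random(seed);
    int[] load = new int[workers];
    for (int i = 0; i < outputs; i++) {
      load[random.nextInt(workers)]++;
    }
    return max(load);
  }

  private static int max(int[] values) {
    int best = Integer.MIN_VALUE;
    for (int v : values) {
      best = Math.max(best, v);
    }
    return best;
  }

  public static void main(String[] args) {
    int outputs = 1_000_000;
    int workers = 10;
    System.out.println("fused max load:      " + maxLoadFused(outputs, workers));
    System.out.println("reshuffled max load: " + maxLoadReshuffled(outputs, workers, 7L));
  }
}
```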


[GitHub] [beam] lukecwik commented on a change in pull request #11406: [BEAM-9748] Move Reparallelize transform to Reshuffle

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11406: [BEAM-9748] Move Reparallelize transform to Reshuffle
URL: https://github.com/apache/beam/pull/11406#discussion_r407698827
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
 ##########
 @@ -65,6 +66,11 @@ private Reshuffle() {}
     return new ViaRandomKey<>();
   }
 
+  @Experimental
+  public static <T> Reparallelize<T> reparallelize() {
 
 Review comment:
   It is unclear to a user why they would choose this implementation over viaRandomKey().
