Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/11/28 21:50:30 UTC

[GitHub] [beam] cozos commented on issue #24365: [Bug]: No parallelism using WriteToParquet in Apache Spark

cozos commented on issue #24365:
URL: https://github.com/apache/beam/issues/24365#issuecomment-1329802297

   Hi @Abacn, thanks for your response. 
   
   Upon a closer reading of [iobase._WriteBundleDoFn](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/iobase.py#L1190-L1203), I realized that it does not actually return a PCollection of all elements, but rather a PCollection of the file paths that the elements were written to. This makes the `GroupByKey` on the `None` key less of a concern, since the shuffle skew only applies to the number of files (several hundred or thousands), which is orders of magnitude smaller than the number of elements/rows (millions). 
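   To make the scale difference concrete, here is a plain-Python model (not Beam code) of the write step as I understand it: each bundle of rows becomes one file, only the file path flows downstream, and every path is then grouped under the single `None` key. The row count, bundle size, and `/tmp/beam-temp/...` paths are hypothetical values chosen for illustration.
   
   ```python
   import math
   from collections import defaultdict
   
   
   def write_bundles(num_rows, rows_per_bundle):
       """Model of the write step: each bundle of rows is written to one
       (hypothetical) temp file, and only that file's path is emitted."""
       num_bundles = math.ceil(num_rows / rows_per_bundle)
       return [f"/tmp/beam-temp/part-{i:05d}.parquet" for i in range(num_bundles)]
   
   
   def group_by_key(pairs):
       """Model of GroupByKey: collect all values that share a key."""
       groups = defaultdict(list)
       for key, value in pairs:
           groups[key].append(value)
       return dict(groups)
   
   
   num_rows = 5_000_000
   paths = write_bundles(num_rows, rows_per_bundle=10_000)
   
   # Every path is keyed with None, so one group (one worker) receives them all,
   # but it receives 500 paths rather than 5,000,000 rows.
   grouped = group_by_key((None, p) for p in paths)
   print(len(grouped), len(grouped[None]))
   ```
   
   The skew is still total (a single key), but the volume being shuffled is the file count, not the row count.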
   
   With this in mind, the poor performance of the `GroupByKey` is perplexing, especially since it seems to work fine on GCP Dataflow but not on Spark. Any ideas? 
   
   Here is where my Spark job is stuck on the Beam-to-Spark translation:
   
   ```
   org.apache.spark.api.java.AbstractJavaRDDLike.mapToPair(JavaRDDLike.scala:45)
   org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions.groupByKeyInGlobalWindow(GroupNonMergingWindowsFunctions.java:272)
   org.apache.beam.runners.spark.translation.SparkBatchPortablePipelineTranslator.translateGroupByKey(SparkBatchPortablePipelineTranslator.java:203)
   org.apache.beam.runners.spark.translation.SparkBatchPortablePipelineTranslator.translate(SparkBatchPortablePipelineTranslator.java:158)
   org.apache.beam.runners.spark.SparkPipelineRunner.lambda$run$2(SparkPipelineRunner.java:189)
   java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
   java.util.concurrent.FutureTask.run(FutureTask.java:266)
   java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   java.lang.Thread.run(Thread.java:748)
   ```
   
   > We should be able to replace the change in https://github.com/apache/beam/pull/958 to a ReShuffle(). Would you mind testing if it resolves your issue and appreciate if opening a PR?
   
   I will give this a try.
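   For my own reference, this is a rough model of why a `Reshuffle()` could change the key distribution compared with the `None`-keyed `GroupByKey`. It is an assumption-laden sketch in plain Python, not Beam's actual `Reshuffle` implementation: it treats a shuffle as routing each record by `hash(key) % num_workers`, and the worker count and file paths are hypothetical.
   
   ```python
   import random
   from collections import Counter
   
   
   def partition_counts(keys, num_workers):
       """Count records per worker when routed by hash(key) % num_workers
       (a simplified shuffle model, not how Spark or Beam partition)."""
       return Counter(hash(k) % num_workers for k in keys)
   
   
   # Hypothetical temp-file paths emitted by the write step.
   paths = [f"/tmp/beam-temp/part-{i:05d}.parquet" for i in range(500)]
   num_workers = 8
   
   # GroupByKey on a constant None key: every record lands on the same worker.
   none_keyed = partition_counts((None for _ in paths), num_workers)
   
   # Reshuffle-style: attach a random key to each record before the shuffle,
   # then drop the key afterwards, so records spread across workers.
   rng = random.Random(42)
   random_keyed_pairs = [(rng.randrange(num_workers), p) for p in paths]
   random_keyed = partition_counts((k for k, _ in random_keyed_pairs), num_workers)
   reshuffled = [p for _, p in random_keyed_pairs]  # keys dropped, elements kept
   
   print(len(none_keyed), len(random_keyed))
   ```
   
   Whether this actually helps on the Spark runner is exactly what I still need to test.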
   
   Thanks
   
   


-- 
This is an automated message from the Apache Git Service.