Posted to commits@beam.apache.org by "Aviem Zur (JIRA)" <ji...@apache.org> on 2017/03/28 03:02:41 UTC

[jira] [Updated] (BEAM-1074) Set default-partitioner in SourceRDD.Unbounded.

     [ https://issues.apache.org/jira/browse/BEAM-1074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aviem Zur updated BEAM-1074:
----------------------------
    Description: 
The SparkRunner uses {{mapWithState}} to read and manage CheckpointMarks, and this stateful operation will be followed by a shuffle: 
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala#L159

Since the stateful read maps "splitSource" -> "partition of a list of read values", the following shuffle gains nothing (the list of read values has not been flatMapped yet). To avoid the shuffle, the input RDD ({{SourceRDD.Unbounded}}) should use a default {{HashPartitioner}} as its partitioner: {{mapWithState}} uses the same partitioner and skips the shuffle when the partitioners match.
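The shuffle-skipping rule this relies on can be modeled without Spark. The sketch below is a toy Python model, not Spark or Beam code. It assumes the behavior of Spark's {{partitionBy}}: repartitioning is a no-op when the RDD's current partitioner equals the requested one, and two hash partitioners are equal when they have the same number of partitions. It also assumes that, with no partitioner set on the {{StateSpec}}, {{mapWithState}} partitions by a {{HashPartitioner}} over the default parallelism. The names ToyHashPartitioner, ToyPairRDD and map_with_state_input, and the parallelism value of 4, are hypothetical stand-ins.

```python
from dataclasses import dataclass
from typing import Any, List, Optional, Tuple


@dataclass(frozen=True)
class ToyHashPartitioner:
    """Hypothetical stand-in for Spark's HashPartitioner: the generated
    __eq__ compares only the number of partitions."""
    num_partitions: int


@dataclass
class ToyPairRDD:
    """Hypothetical pair 'RDD': (key, value) records plus an optional partitioner."""
    records: List[Tuple[Any, Any]]
    partitioner: Optional[ToyHashPartitioner] = None
    shuffled: bool = False  # True if this RDD was produced by a shuffle

    def partition_by(self, partitioner: ToyHashPartitioner) -> "ToyPairRDD":
        # Models the partitionBy check: an already equally-partitioned RDD
        # is returned unchanged, so no shuffle happens.
        if self.partitioner == partitioner:
            return self
        # Otherwise records must be shuffled into the new partitioning.
        return ToyPairRDD(list(self.records), partitioner, shuffled=True)


def map_with_state_input(rdd: ToyPairRDD, default_parallelism: int) -> ToyPairRDD:
    """Toy model of how mapWithState prepares its input when the StateSpec
    sets no partitioner (assumption): partition by HashPartitioner(parallelism)."""
    return rdd.partition_by(ToyHashPartitioner(default_parallelism))


# Each record is (split id, list of values read from that split).
reads = [(0, ["a", "b"]), (1, ["c"])]

no_partitioner = ToyPairRDD(reads)
print(map_with_state_input(no_partitioner, 4).shuffled)  # True: shuffle needed

with_default = ToyPairRDD(reads, ToyHashPartitioner(4))
print(map_with_state_input(with_default, 4) is with_default)  # True: shuffle skipped
```

Equality, not identity, is what matters here: the source only has to carry a partitioner equal to the one {{mapWithState}} builds (same class, same partition count), which is why a default {{HashPartitioner}} on {{SourceRDD.Unbounded}} is enough.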

  was:This will make sure the following stateful read within {{mapWithState}} won't shuffle the read values as long as they are grouped in a {{List}}.


> Set default-partitioner in SourceRDD.Unbounded.
> -----------------------------------------------
>
>                 Key: BEAM-1074
>                 URL: https://issues.apache.org/jira/browse/BEAM-1074
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-spark
>            Reporter: Amit Sela
>            Assignee: Aviem Zur
>             Fix For: First stable release
>



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)