Posted to commits@beam.apache.org by "Amit Sela (JIRA)" <ji...@apache.org> on 2016/12/02 17:04:58 UTC

[jira] [Updated] (BEAM-848) Make post-read (unbounded) shuffle use coders instead of Kryo.

     [ https://issues.apache.org/jira/browse/BEAM-848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Amit Sela updated BEAM-848:
---------------------------
    Description: 
The SparkRunner uses {{mapWithState}} to read and manage CheckpointMarks, and this stateful operation will be followed by a shuffle: 
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala#L159

Because the stateful read maps each "splitSource" to a "partition of a list of read values", the shuffle that follows brings no benefit (the list of read values has not been flatMapped yet). To avoid this shuffle, we need to set the partitioner of the input RDD ({{SourceRDD.Unbounded}}) to a default {{HashPartitioner}}: {{mapWithState}} uses the same partitioner and skips the shuffle when the partitioners match.
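
To illustrate the partitioner-matching argument, here is a minimal, self-contained sketch in plain Java. It is a simplified model and does not use Spark's API: {{HashPartitionerModel}} and {{needsShuffle}} are hypothetical names introduced only for this example, and the equality-by-partition-count rule is an assumption about how a default hash partitioner is compared.

```java
import java.util.Objects;
import java.util.Optional;

// Simplified model, NOT Spark's API: shows why an input that already declares
// an equal partitioner lets a keyed, stateful step skip the shuffle.
class PartitionerSketch {

    /** Hypothetical stand-in for a hash partitioner; equality is by partition count. */
    static final class HashPartitionerModel {
        final int numPartitions;

        HashPartitionerModel(int numPartitions) {
            this.numPartitions = numPartitions;
        }

        /** Non-negative modulo of the key's hash code. */
        int getPartition(Object key) {
            return Math.floorMod(Objects.hashCode(key), numPartitions);
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof HashPartitionerModel
                && ((HashPartitionerModel) o).numPartitions == numPartitions;
        }

        @Override
        public int hashCode() {
            return numPartitions;
        }
    }

    /** A shuffle is needed unless the input already declares an equal partitioner. */
    static boolean needsShuffle(Optional<HashPartitionerModel> inputPartitioner,
                                HashPartitionerModel statePartitioner) {
        return !inputPartitioner.isPresent()
            || !inputPartitioner.get().equals(statePartitioner);
    }

    public static void main(String[] args) {
        HashPartitionerModel statePartitioner = new HashPartitionerModel(4);
        // Input without a declared partitioner: the stateful step has to shuffle.
        System.out.println(needsShuffle(Optional.empty(), statePartitioner));
        // Input with an equal partitioner: the shuffle can be skipped.
        System.out.println(
            needsShuffle(Optional.of(new HashPartitionerModel(4)), statePartitioner));
    }
}
```

In this model, the input without a partitioner corresponds to {{SourceRDD.Unbounded}} as it is today, and the input with an equal partitioner corresponds to the proposed change.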

It would be wise to shuffle the read values _after_ flatmap.

I will break this into two tasks:
# Set a default partitioner on the input RDD.
# Shuffle (using Coders) the input.
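
The second task can be pictured with the following sketch in plain Java: flatten the read batches, encode each value to {{byte[]}} with a coder, and decode afterwards. {{SimpleCoder}}, {{UTF8_CODER}} and {{roundTrip}} are hypothetical names for illustration only; they are not Beam's {{Coder}} API, and the actual shuffle is only marked by a comment.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a tiny coder abstraction, not Beam's Coder API.
class CoderShuffleSketch {

    /** Hypothetical minimal coder: writes a value to a stream and reads it back. */
    interface SimpleCoder<T> {
        void encode(T value, DataOutputStream out) throws IOException;
        T decode(DataInputStream in) throws IOException;
    }

    /** Example coder for strings, based on DataOutputStream.writeUTF. */
    static final SimpleCoder<String> UTF8_CODER = new SimpleCoder<String>() {
        @Override
        public void encode(String value, DataOutputStream out) throws IOException {
            out.writeUTF(value);
        }

        @Override
        public String decode(DataInputStream in) throws IOException {
            return in.readUTF();
        }
    };

    static <T> byte[] toBytes(T value, SimpleCoder<T> coder) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            coder.encode(value, out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static <T> T fromBytes(byte[] encoded, SimpleCoder<T> coder) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded))) {
            return coder.decode(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Flattens the read batches, encodes every value, then decodes them again. */
    static <T> List<T> roundTrip(List<List<T>> readBatches, SimpleCoder<T> coder) {
        List<byte[]> encoded = new ArrayList<>();
        for (List<T> batch : readBatches) {
            for (T value : batch) {
                encoded.add(toBytes(value, coder));
            }
        }
        // A real runner would shuffle the byte[] records at this point.
        List<T> decoded = new ArrayList<>();
        for (byte[] record : encoded) {
            decoded.add(fromBytes(record, coder));
        }
        return decoded;
    }
}
```

Shuffling already-encoded {{byte[]}} records means the values themselves never need a serializer registered beyond the coder the pipeline author has already provided.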

  was:
The SparkRunner uses {{mapWithState}} to read and manage CheckpointMarks, and this stateful operation will be followed by a shuffle: 
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala#L159

It would be best to use coders here: https://github.com/apache/incubator-beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java#L71 in order to get an Iterator<byte[]> and decode afterwards.

This method is preferred for two reasons:
1. Known coding should be faster than Kryo.
2. Users are already required to explicitly use coders when authoring a Pipeline, and the runner should avoid adding the complexity of "registering" additional serializers.


> Make post-read (unbounded) shuffle use coders instead of Kryo.
> --------------------------------------------------------------
>
>                 Key: BEAM-848
>                 URL: https://issues.apache.org/jira/browse/BEAM-848
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>            Reporter: Amit Sela
>            Assignee: Amit Sela
>
> The SparkRunner uses {{mapWithState}} to read and manage CheckpointMarks, and this stateful operation will be followed by a shuffle: 
> https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/MapWithStateDStream.scala#L159
> Because the stateful read maps each "splitSource" to a "partition of a list of read values", the shuffle that follows brings no benefit (the list of read values has not been flatMapped yet). To avoid this shuffle, we need to set the partitioner of the input RDD ({{SourceRDD.Unbounded}}) to a default {{HashPartitioner}}: {{mapWithState}} uses the same partitioner and skips the shuffle when the partitioners match.
> It would be wise to shuffle the read values _after_ flatmap.
> I will break this into two tasks:
> # Set a default partitioner on the input RDD.
> # Shuffle (using Coders) the input.


