You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2015/01/21 23:04:35 UTC

[jira] [Commented] (SPARK-5360) For CoGroupedRDD, rdds for narrow dependencies and shuffle handles are included twice in serialized task

    [ https://issues.apache.org/jira/browse/SPARK-5360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14286386#comment-14286386 ] 

Apache Spark commented on SPARK-5360:
-------------------------------------

User 'kayousterhout' has created a pull request for this issue:
https://github.com/apache/spark/pull/4145

> For CoGroupedRDD, rdds for narrow dependencies and shuffle handles are included twice in serialized task
> --------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-5360
>                 URL: https://issues.apache.org/jira/browse/SPARK-5360
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 1.2.0
>            Reporter: Kay Ousterhout
>            Assignee: Kay Ousterhout
>            Priority: Minor
>
> CoGroupPartition, part of CoGroupedRDD, includes references to each RDD that the CoGroupedRDD narrowly depends on, and a reference to the ShuffleHandle.  The partition is serialized separately from the RDD, so when the RDD and partition arrive on the worker, the references in the partition and in the RDD no longer point to the same object.
> This is a relatively minor performance issue (the closure can be 2x larger than it needs to be because the rdds and partitions are serialized twice; see numbers below) but is more annoying as a developer issue (this is where I ran into): if any state is stored in the RDD or ShuffleHandle on the worker side, subtle bugs can appear due to the fact that the references to the RDD / ShuffleHandle in the RDD and in the partition point to separate objects.  I'm not sure if this is enough of a potential future problem to fix this old and central part of the code, so hoping to get input from others here.
> I did some simple experiments to see how much this effects closure size.  For this example: 
> $ val a = sc.parallelize(1 to 10).map((_, 1))
> $ val b = sc.parallelize(1 to 2).map(x => (x, 2*x))
> $ a.cogroup(b).collect()
> the closure was 1902 bytes with current Spark, and 1129 bytes after my change.  The difference comes from eliminating duplicate serialization of the shuffle handle.
> For this example:
> $ val sortedA = a.sortByKey()
> $ val sortedB = b.sortByKey()
> $ sortedA.cogroup(sortedB).collect()
> the closure was 3491 bytes with current Spark, and 1333 bytes after my change. Here, the difference comes from eliminating duplicate serialization of the two RDDs for the narrow dependencies.
> The ShuffleHandle includes the ShuffleDependency, so this difference will get larger if a ShuffleDependency includes a serializer, a key ordering, or an aggregator (all set to None by default).  However, the difference is not affected by the size of the function the user specifies, which (based on my understanding) is typically the source of large task closures.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org