You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Wenchen Fan (JIRA)" <ji...@apache.org> on 2018/06/20 05:30:00 UTC

[jira] [Assigned] (SPARK-23778) SparkContext.emptyRDD confuses SparkContext.union

     [ https://issues.apache.org/jira/browse/SPARK-23778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenchen Fan reassigned SPARK-23778:
-----------------------------------

    Assignee: Marco Gaido

> SparkContext.emptyRDD confuses SparkContext.union
> -------------------------------------------------
>
>                 Key: SPARK-23778
>                 URL: https://issues.apache.org/jira/browse/SPARK-23778
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.3.0, 2.3.0
>            Reporter: Stefano Pettini
>            Assignee: Marco Gaido
>            Priority: Trivial
>             Fix For: 2.4.0
>
>         Attachments: as_it_should_be.png, partitioner_lost_and_unneeded_extra_stage.png
>
>
> SparkContext.emptyRDD is an unpartitioned RDD. Clearly it's empty so whether it's partitioned or not should be just a academic debate. Unfortunately it doesn't seem to be like this and the issue has side effects.
> Namely, it confuses the RDD union.
> When there are N classic RDDs partitioned the same way, the union is implemented with the optimized PartitionerAwareUnionRDD, that retains the common partitioner in the result. If one of the N RDDs happens to be an emptyRDD, as it doesn't have a partitioner, the union is implemented by just appending all the partitions of the N RDDs, dropping the partitioner. But there's no need for this, as the emptyRDD contains no elements. This results in further unneeded shuffles once the result of the union is used.
> See for example:
> {{val p = new HashPartitioner(3)}}
> {{val a = context.parallelize(List(10, 20, 30, 40, 50)).keyBy(_ / 10).partitionBy(p)}}
> {{val b1 = a.mapValues(_ + 1)}}
> {{val b2 = a.mapValues(_ - 1)}}
> {{val e = context.emptyRDD[(Int, Int)]}}
> {{val x = context.union(a, b1, b2, e)}}
> {{val y = x.reduceByKey(_ + _)}}
> {{assert(x.partitioner.contains(p))}}
> {{y.collect()}}
> The assert fails. Disabling it, it's possible to see that reduceByKey introduced a shuffles, although all the input RDDs are already partitioned the same way, but the emptyRDD.
> Forcing a partitioner on the emptyRDD:
> {{val e = context.emptyRDD[(Int, Int)].partitionBy(p)}}
> solves the problem with the assert and doesn't introduce the unneeded extra stage and shuffle.
> Union implementation should be changed to ignore the partitioner of emptyRDDs and consider those as _partitioned in a way compatible with any partitioner_, basically ignoring them.
> Present since 1.3 at least.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org