Posted to issues@spark.apache.org by "Wenchen Fan (JIRA)" <ji...@apache.org> on 2018/06/20 05:30:00 UTC
[jira] [Assigned] (SPARK-23778) SparkContext.emptyRDD confuses SparkContext.union
[ https://issues.apache.org/jira/browse/SPARK-23778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Wenchen Fan reassigned SPARK-23778:
-----------------------------------
Assignee: Marco Gaido
> SparkContext.emptyRDD confuses SparkContext.union
> -------------------------------------------------
>
> Key: SPARK-23778
> URL: https://issues.apache.org/jira/browse/SPARK-23778
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.3.0, 2.3.0
> Reporter: Stefano Pettini
> Assignee: Marco Gaido
> Priority: Trivial
> Fix For: 2.4.0
>
> Attachments: as_it_should_be.png, partitioner_lost_and_unneeded_extra_stage.png
>
>
> SparkContext.emptyRDD is an unpartitioned RDD. Since it's empty, whether it's partitioned or not should be a purely academic question. Unfortunately that is not the case, and the issue has side effects.
> Namely, it confuses the RDD union.
> When there are N classic RDDs all partitioned the same way, the union is implemented with the optimized PartitionerAwareUnionRDD, which retains the common partitioner in the result. If one of the N RDDs happens to be an emptyRDD, then, since it has no partitioner, the union falls back to simply appending all the partitions of the N RDDs and dropping the partitioner. There is no need for this, because the emptyRDD contributes no elements, and it results in further unneeded shuffles once the result of the union is used.
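> Roughly, the decision works like the following sketch (a simplification of the behaviour described above, not the actual Spark source; PartitionerAwareUnionRDD and UnionRDD are Spark-internal RDD classes, and {{sc}} stands for the SparkContext):
> {code:scala}
> def union[T: ClassTag](sc: SparkContext, rdds: Seq[RDD[T]]): RDD[T] = {
>   val partitioners = rdds.flatMap(_.partitioner).toSet
>   if (rdds.forall(_.partitioner.isDefined) && partitioners.size == 1) {
>     // every input shares the same partitioner: keep it in the result
>     new PartitionerAwareUnionRDD(sc, rdds)
>   } else {
>     // fall back to appending all partitions; the result has no partitioner
>     new UnionRDD(sc, rdds)
>   }
> }
> {code}
> An emptyRDD has {{partitioner == None}}, so a single one among the inputs is enough to push the whole union down the fallback branch.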
> See for example (assuming {{context}} is an existing {{SparkContext}}):
> {{import org.apache.spark.HashPartitioner}}
> {{val p = new HashPartitioner(3)}}
> {{val a = context.parallelize(List(10, 20, 30, 40, 50)).keyBy(_ / 10).partitionBy(p)}}
> {{val b1 = a.mapValues(_ + 1)}}
> {{val b2 = a.mapValues(_ - 1)}}
> {{val e = context.emptyRDD[(Int, Int)]}}
> {{val x = context.union(a, b1, b2, e)}}
> {{val y = x.reduceByKey(_ + _)}}
> {{assert(x.partitioner.contains(p))}}
> {{y.collect()}}
> The assert fails. Removing it, it's possible to see that reduceByKey introduces a shuffle, even though all the input RDDs except the emptyRDD are already partitioned the same way.
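> (One way to see the extra stage in the snippet above is {{println(y.toDebugString)}}: with the unpartitioned emptyRDD included in the union, the lineage shows a ShuffledRDD on top of the union; leaving it out, or giving it a partitioner, the shuffle disappears.)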
> Forcing a partitioner on the emptyRDD:
> {{val e = context.emptyRDD[(Int, Int)].partitionBy(p)}}
> solves the problem with the assert and doesn't introduce the unneeded extra stage and shuffle.
> The union implementation should be changed to ignore the (missing) partitioner of emptyRDDs and treat them as _partitioned in a way compatible with any partitioner_, basically ignoring them.
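> A minimal sketch of that change, on top of the simplified decision logic sketched above (again a sketch, not the actual patch):
> {code:scala}
> def union[T: ClassTag](sc: SparkContext, rdds: Seq[RDD[T]]): RDD[T] = {
>   // RDDs with no partitions contribute no data, so they should not be able
>   // to veto the partitioner-aware union.
>   val nonEmpty = rdds.filter(_.partitions.nonEmpty)
>   val partitioners = nonEmpty.flatMap(_.partitioner).toSet
>   if (nonEmpty.forall(_.partitioner.isDefined) && partitioners.size == 1) {
>     new PartitionerAwareUnionRDD(sc, nonEmpty)
>   } else {
>     new UnionRDD(sc, nonEmpty)
>   }
> }
> {code}
> With a change along these lines, the example above keeps {{p}} as the partitioner of {{x}} and {{reduceByKey}} no longer needs a shuffle.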
> Present since 1.3 at least.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)