Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:37:31 UTC

[jira] [Resolved] (SPARK-9563) Remove repartition operators when they are the child of Exchange and shuffle=True

     [ https://issues.apache.org/jira/browse/SPARK-9563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-9563.
---------------------------------
    Resolution: Incomplete

> Remove repartition operators when they are the child of Exchange and shuffle=True
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-9563
>                 URL: https://issues.apache.org/jira/browse/SPARK-9563
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>            Reporter: Josh Rosen
>            Priority: Major
>              Labels: bulk-closed
>
> Consider the following query:
> {code}
> val df1 = sqlContext.createDataFrame(sc.parallelize(1 to 100, 100).map(x => (x, x)))
> val df2 = sqlContext.createDataFrame(sc.parallelize(1 to 100, 100).map(x => (x, x)))
> df1.repartition(1000).join(df2, "_1").explain(true)
> {code}
> Here's the plan for this query as of Spark 1.4.1:
> {code}
> == Parsed Logical Plan ==
> Project [_1#68991,_2#68992,_2#68994]
>  Join Inner, Some((_1#68991 = _1#68993))
>   Repartition 1000, true
>    LogicalRDD [_1#68991,_2#68992], MapPartitionsRDD[82530] at createDataFrame at <console>:29
>   LogicalRDD [_1#68993,_2#68994], MapPartitionsRDD[82533] at createDataFrame at <console>:30
> == Analyzed Logical Plan ==
> _1: int, _2: int, _2: int
> Project [_1#68991,_2#68992,_2#68994]
>  Join Inner, Some((_1#68991 = _1#68993))
>   Repartition 1000, true
>    LogicalRDD [_1#68991,_2#68992], MapPartitionsRDD[82530] at createDataFrame at <console>:29
>   LogicalRDD [_1#68993,_2#68994], MapPartitionsRDD[82533] at createDataFrame at <console>:30
> == Optimized Logical Plan ==
> Project [_1#68991,_2#68992,_2#68994]
>  Join Inner, Some((_1#68991 = _1#68993))
>   Repartition 1000, true
>    LogicalRDD [_1#68991,_2#68992], MapPartitionsRDD[82530] at createDataFrame at <console>:29
>   LogicalRDD [_1#68993,_2#68994], MapPartitionsRDD[82533] at createDataFrame at <console>:30
> == Physical Plan ==
> Project [_1#68991,_2#68992,_2#68994]
>  ShuffledHashJoin [_1#68991], [_1#68993], BuildRight
>   Exchange (HashPartitioning 200)
>    Repartition 1000, true
>     PhysicalRDD [_1#68991,_2#68992], MapPartitionsRDD[82530] at createDataFrame at <console>:29
>   Exchange (HashPartitioning 200)
>    PhysicalRDD [_1#68993,_2#68994], MapPartitionsRDD[82533] at createDataFrame at <console>:30
> {code}
> In this plan, we end up repartitioning {{df1}} to have 1000 partitions, which involves a shuffle, only to turn around and shuffle again as part of the exchange.
> To avoid this extra shuffle, I think that we should remove the Repartition when the following condition holds:
> - The Exchange's child is a Repartition operator with shuffle=True.
> We should not perform this collapsing when shuffle=False, since there might be a legitimate reason to coalesce before shuffling (reducing the number of map outputs that need to be tracked, for instance).
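> To make the proposed collapsing concrete, here is a minimal sketch of such a rule. It uses simplified stand-in plan nodes rather than Spark's actual Catalyst classes (the real node constructors and rule plumbing differ), so treat it as an illustration of the condition, not as the implementation:
> {code}
> // Simplified stand-ins for plan nodes; not Spark's real Catalyst API.
> sealed trait Plan
> case class Scan(name: String) extends Plan
> case class Repartition(numPartitions: Int, shuffle: Boolean, child: Plan) extends Plan
> case class Exchange(partitioning: String, child: Plan) extends Plan
>
> object RemoveRedundantRepartition {
>   // If an Exchange's child is a shuffling Repartition, the Repartition's shuffle
>   // is wasted: the Exchange re-shuffles to its own partitioning anyway, so the
>   // Repartition can be dropped. A coalesce (shuffle = false) is left in place,
>   // since reducing the number of map outputs before the exchange can be deliberate.
>   def apply(plan: Plan): Plan = plan match {
>     case Exchange(p, Repartition(_, true, grandChild)) => Exchange(p, apply(grandChild))
>     case Exchange(p, child)                            => Exchange(p, apply(child))
>     case Repartition(n, s, child)                      => Repartition(n, s, apply(child))
>     case leaf                                          => leaf
>   }
> }
>
> // Applied to the plan from the query above, the extra shuffle disappears:
> //   Exchange("hashpartitioning(_1, 200)", Repartition(1000, shuffle = true, Scan("df1")))
> //     ==> Exchange("hashpartitioning(_1, 200)", Scan("df1"))
> {code}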



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org