Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:37:31 UTC
[jira] [Resolved] (SPARK-9563) Remove repartition operators when they are the child of Exchange and shuffle=True
[ https://issues.apache.org/jira/browse/SPARK-9563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon resolved SPARK-9563.
---------------------------------
Resolution: Incomplete
> Remove repartition operators when they are the child of Exchange and shuffle=True
> ---------------------------------------------------------------------------------
>
> Key: SPARK-9563
> URL: https://issues.apache.org/jira/browse/SPARK-9563
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Reporter: Josh Rosen
> Priority: Major
> Labels: bulk-closed
>
> Consider the following query:
> {code}
> val df1 = sqlContext.createDataFrame(sc.parallelize(1 to 100, 100).map(x => (x, x)))
> val df2 = sqlContext.createDataFrame(sc.parallelize(1 to 100, 100).map(x => (x, x)))
> df1.repartition(1000).join(df2, "_1").explain(true)
> {code}
> Here's the plan for this query as of Spark 1.4.1:
> {code}
> == Parsed Logical Plan ==
> Project [_1#68991,_2#68992,_2#68994]
>  Join Inner, Some((_1#68991 = _1#68993))
>   Repartition 1000, true
>    LogicalRDD [_1#68991,_2#68992], MapPartitionsRDD[82530] at createDataFrame at <console>:29
>   LogicalRDD [_1#68993,_2#68994], MapPartitionsRDD[82533] at createDataFrame at <console>:30
> == Analyzed Logical Plan ==
> _1: int, _2: int, _2: int
> Project [_1#68991,_2#68992,_2#68994]
>  Join Inner, Some((_1#68991 = _1#68993))
>   Repartition 1000, true
>    LogicalRDD [_1#68991,_2#68992], MapPartitionsRDD[82530] at createDataFrame at <console>:29
>   LogicalRDD [_1#68993,_2#68994], MapPartitionsRDD[82533] at createDataFrame at <console>:30
> == Optimized Logical Plan ==
> Project [_1#68991,_2#68992,_2#68994]
>  Join Inner, Some((_1#68991 = _1#68993))
>   Repartition 1000, true
>    LogicalRDD [_1#68991,_2#68992], MapPartitionsRDD[82530] at createDataFrame at <console>:29
>   LogicalRDD [_1#68993,_2#68994], MapPartitionsRDD[82533] at createDataFrame at <console>:30
> == Physical Plan ==
> Project [_1#68991,_2#68992,_2#68994]
>  ShuffledHashJoin [_1#68991], [_1#68993], BuildRight
>   Exchange (HashPartitioning 200)
>    Repartition 1000, true
>     PhysicalRDD [_1#68991,_2#68992], MapPartitionsRDD[82530] at createDataFrame at <console>:29
>   Exchange (HashPartitioning 200)
>    PhysicalRDD [_1#68993,_2#68994], MapPartitionsRDD[82533] at createDataFrame at <console>:30
> {code}
> In this plan, we end up repartitioning {{df1}} to have 1000 partitions, which involves a shuffle, only to turn around and shuffle again as part of the exchange.
> To avoid this extra shuffle, I think that we should remove the Repartition when the following condition holds:
> - Exchange's child is a repartition operator where shuffle=True.
> We should not perform this collapsing when shuffle=False, since there might be a legitimate reason to coalesce before shuffling (reducing the number of map outputs that need to be tracked, for instance).
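The proposed rule can be sketched with a toy plan model. This is a simplified, self-contained illustration using plain case classes, not the real Catalyst `LogicalPlan`/`Rule` API; the node names (`Scan`, `Repartition`, `Exchange`) and the `collapseRepartition` function are hypothetical stand-ins:

```scala
// Toy plan nodes (illustrative only, not Spark's Catalyst classes).
sealed trait Plan
case class Scan(name: String) extends Plan
case class Repartition(numPartitions: Int, shuffle: Boolean, child: Plan) extends Plan
case class Exchange(numPartitions: Int, child: Plan) extends Plan

// Drop a Repartition that sits directly under an Exchange, but only
// when shuffle = true: the Exchange re-shuffles the data anyway, so
// the Repartition's shuffle is pure overhead. A coalesce
// (shuffle = false) is kept, since it may legitimately reduce the
// number of map outputs before the shuffle.
def collapseRepartition(plan: Plan): Plan = plan match {
  case Exchange(n, Repartition(_, true, grandchild)) =>
    collapseRepartition(Exchange(n, grandchild))
  case Exchange(n, child) =>
    Exchange(n, collapseRepartition(child))
  case Repartition(n, s, child) =>
    Repartition(n, s, collapseRepartition(child))
  case leaf => leaf
}
```

Applied to the physical plan above, this would rewrite `Exchange(200, Repartition(1000, shuffle = true, df1))` into `Exchange(200, df1)`, eliminating the extra shuffle, while leaving a `shuffle = false` coalesce in place.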
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org