Posted to issues@spark.apache.org by "Dongjoon Hyun (Jira)" <ji...@apache.org> on 2020/03/16 22:52:06 UTC

[jira] [Updated] (SPARK-23544) Remove redundancy ShuffleExchange in the planner

     [ https://issues.apache.org/jira/browse/SPARK-23544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-23544:
----------------------------------
    Affects Version/s:     (was: 3.0.0)
                       3.1.0

> Remove redundancy ShuffleExchange in the planner
> ------------------------------------------------
>
>                 Key: SPARK-23544
>                 URL: https://issues.apache.org/jira/browse/SPARK-23544
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.1.0
>            Reporter: caoxuewen
>            Priority: Major
>
> Currently, we execute the SQL statement:
> select MDTTemp.* from (select * from distinctAgg where a > 2 distribute by a, b,c) MDTTemp 
> left join (select * from testData5 where f > 1) ProjData 
> on MDTTemp.b = ProjData.g and 
>  MDTTemp.c = ProjData.h and 
>  MDTTemp.d < (ProjData.j - 3) and 
>  MDTTemp.d >= (ProjData.j + 3)
> The physical plan reported by EXPLAIN looks like this:
> == Physical Plan ==
> *Project [a#203, b#204, c#205, d#206, e#207]
> +- SortMergeJoin [b#204, c#205], [g#222, h#223], LeftOuter, ((d#206 < (j#224 - 3)) && (d#206 >= (j#224 + 3)))
>    :- *Sort [b#204 ASC, c#205 ASC], false, 0
>    :  +- Exchange hashpartitioning(b#204, c#205, 5)
>    :     +- Exchange hashpartitioning(a#203, b#204, c#205, 5)
>    :        +- *Filter (a#203 > 2)
>    :           +- Scan ExistingRDD[a#203,b#204,c#205,d#206,e#207]
>    +- *Sort [g#222 ASC, h#223 ASC], false, 0
>       +- Exchange hashpartitioning(g#222, h#223, 5)
>          +- *Project [g#222, h#223, j#224]
>             +- *Filter (f#221 > 1)
>                +- Scan ExistingRDD[f#221,g#222,h#223,j#224,k#225]
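> The plan contains a redundant ShuffleExchange: the output of Exchange hashpartitioning(a#203, b#204, c#205, 5) is immediately re-shuffled by Exchange hashpartitioning(b#204, c#205, 5). This PR adds a planner rule that removes such redundant ShuffleExchange nodes. With the rule applied, the plan looks like this: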
> == Physical Plan ==
> *Project [a#203, b#204, c#205, d#206, e#207]
> +- SortMergeJoin [b#204, c#205], [g#222, h#223], LeftOuter, ((d#206 < (j#224 - 3)) && (d#206 >= (j#224 + 3)))
>    :- *Sort [b#204 ASC, c#205 ASC], false, 0
>    :  +- Exchange hashpartitioning(b#204, c#205, 5)
>    :     +- *Filter (a#203 > 2)
>    :        +- Scan ExistingRDD[a#203,b#204,c#205,d#206,e#207]
>    +- *Sort [g#222 ASC, h#223 ASC], false, 0
>       +- Exchange hashpartitioning(g#222, h#223, 5)
>          +- *Project [g#222, h#223, j#224]
>             +- *Filter (f#221 > 1)
>                +- Scan ExistingRDD[f#221,g#222,h#223,j#224,k#225]
>  
> I have also added a test case:
> val N = 2 << 20
> runJoinBenchmark("sort merge join", N) {
>   val df1 = sparkSession.range(N)
>     .selectExpr(s"(id * 15485863) % ${N*10} as k1")
>   val df2 = sparkSession.range(N)
>     .selectExpr(s"(id * 15485867) % ${N*10} as k2")
>   val df = df1.join(df2.repartition(20), col("k1") === col("k2"))
>   assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[SortMergeJoinExec]).isDefined)
>   df.count()
> }
>  
> The benchmark results are as follows:
> Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Windows 7 6.1
> Intel64 Family 6 Model 94 Stepping 3, GenuineIntel
> sort merge join:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
> ------------------------------------------------------------------------------------------------
> sort merge join Repartition off         3520 / 4364          0.6        1678.5       1.0X
> sort merge join Repartition on          1946 / 2203          1.1         927.9       1.8X
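
The idea behind the rule described in the issue can be sketched on a toy plan model. This is only an illustration under stated assumptions: the types Plan, Scan, Filter, Sort and Exchange and the object RemoveRedundantExchange are hypothetical names invented here, not Spark classes and not the code from the PR. It also assumes that the outer exchange alone determines the output partitioning, so an exchange sitting directly beneath it can be dropped.

```scala
// Toy physical-plan model. These types are illustrative only and are
// NOT Spark classes; all names here are hypothetical.
sealed trait Plan
case class Scan(name: String) extends Plan
case class Filter(condition: String, child: Plan) extends Plan
case class Sort(keys: Seq[String], child: Plan) extends Plan
case class Exchange(keys: Seq[String], numPartitions: Int, child: Plan) extends Plan

object RemoveRedundantExchange {
  // Rewrites the tree bottom-up. When an Exchange sits directly on top of
  // another Exchange, the inner one is dropped, on the assumption that the
  // outer shuffle replaces its partitioning anyway.
  def apply(plan: Plan): Plan = plan match {
    case Exchange(keys, n, child) =>
      apply(child) match {
        case Exchange(_, _, grandChild) => Exchange(keys, n, grandChild)
        case other                      => Exchange(keys, n, other)
      }
    case Filter(cond, child) => Filter(cond, apply(child))
    case Sort(keys, child)   => Sort(keys, apply(child))
    case scan: Scan          => scan
  }
}

object Demo {
  // Left branch of the join from the issue, before the rule is applied.
  val before: Plan =
    Sort(Seq("b", "c"),
      Exchange(Seq("b", "c"), 5,
        Exchange(Seq("a", "b", "c"), 5,
          Filter("a > 2", Scan("distinctAgg")))))

  // Only the outer Exchange on (b, c) remains.
  val after: Plan = RemoveRedundantExchange(before)
}
```

Here Demo.after has the same shape as the left branch of the second plan in the issue: the Exchange on (a, b, c) is gone and the Exchange on (b, c) feeds the Sort directly. Exchanges separated by another operator are left untouched by this sketch.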