You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "yucai (JIRA)" <ji...@apache.org> on 2018/06/14 08:00:00 UTC

[jira] [Updated] (SPARK-24556) ReusedExchange should rewrite output partitioning also when child's partitioning is RangePartitioning

     [ https://issues.apache.org/jira/browse/SPARK-24556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yucai updated SPARK-24556:
--------------------------
    Description: 
Currently, ReusedExchange would rewrite output partitioning if child's partitioning is HashPartitioning, but it does not do the same when child's partitioning is RangePartitioning, sometimes, it could introduce extra shuffle, see:
{code}
val df = Seq(1 -> "a", 3 -> "c", 2 -> "b").toDF("i", "j")
val df1 = df.as("t1")
val df2 = df.as("t2")
val t = df1.orderBy("j").join(df2.orderBy("j"), $"t1.i" === $"t2.i", "right")
t.cache.orderBy($"t2.j").explain
{code}
Before fix:
{code:sql}
== Physical Plan ==
*(1) Sort [j#14 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(j#14 ASC NULLS FIRST, 200)
   +- InMemoryTableScan [i#5, j#6, i#13, j#14]
         +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder...
               +- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft
                  :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
                  :  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
                  :     +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
                  :        +- LocalTableScan [i#5, j#6]
                  +- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
                     +- ReusedExchange [i#13, j#14], Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
{code}
Better plan should avoid "Exchange rangepartitioning(j#14 ASC NULLS FIRST, 200)", like:
{code:sql}
== Physical Plan ==
*(1) Sort [j#14 ASC NULLS FIRST], true, 0
+- InMemoryTableScan [i#5, j#6, i#13, j#14]
      +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder...
            +- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft
               :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
               :  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
               :     +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
               :        +- LocalTableScan [i#5, j#6]
               +- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
                  +- ReusedExchange [i#13, j#14], Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
{code}

  was:
Currently, ReusedExchange would rewrite output partitioning if child's partitioning is HashPartitioning, but it does not do the same when child's partitioning is RangePartitioning, sometimes, it could introduce extra shuffle, see:

{code:scala}
val df = Seq(1 -> "a", 3 -> "c", 2 -> "b").toDF("i", "j")
val df1 = df.as("t1")
val df2 = df.as("t2")
val t = df1.orderBy("j").join(df2.orderBy("j"), $"t1.i" === $"t2.i", "right")
t.cache.orderBy($"t2.j").explain
{code}

Before fix:
{code:sql}
== Physical Plan ==
*(1) Sort [j#14 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(j#14 ASC NULLS FIRST, 200)
   +- InMemoryTableScan [i#5, j#6, i#13, j#14]
         +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft
:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
:  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
:     +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
:        +- LocalTableScan [i#5, j#6]
+- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
   +- ReusedExchange [i#13, j#14], Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
,None)
               +- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft
                  :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
                  :  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
                  :     +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
                  :        +- LocalTableScan [i#5, j#6]
                  +- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
                     +- ReusedExchange [i#13, j#14], Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
{code}

Better plan should avoid "Exchange rangepartitioning(j#14 ASC NULLS FIRST, 200)", like:
{code:sql}
== Physical Plan ==
*(1) Sort [j#14 ASC NULLS FIRST], true, 0
+- InMemoryTableScan [i#5, j#6, i#13, j#14]
      +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder(true,10000,StorageLevel(disk, memory, deserialized, 1 replicas),*(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft
:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
:  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
:     +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
:        +- LocalTableScan [i#5, j#6]
+- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
   +- ReusedExchange [i#13, j#14], Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
,None)
            +- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft
               :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
               :  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
               :     +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
               :        +- LocalTableScan [i#5, j#6]
               +- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
                  +- ReusedExchange [i#13, j#14], Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
{code}


> ReusedExchange should rewrite output partitioning also when child's partitioning is RangePartitioning
> -----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-24556
>                 URL: https://issues.apache.org/jira/browse/SPARK-24556
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.2
>            Reporter: yucai
>            Priority: Major
>
> Currently, ReusedExchange would rewrite output partitioning if child's partitioning is HashPartitioning, but it does not do the same when child's partitioning is RangePartitioning, sometimes, it could introduce extra shuffle, see:
> {code}
> val df = Seq(1 -> "a", 3 -> "c", 2 -> "b").toDF("i", "j")
> val df1 = df.as("t1")
> val df2 = df.as("t2")
> val t = df1.orderBy("j").join(df2.orderBy("j"), $"t1.i" === $"t2.i", "right")
> t.cache.orderBy($"t2.j").explain
> {code}
> Before fix:
> {code:sql}
> == Physical Plan ==
> *(1) Sort [j#14 ASC NULLS FIRST], true, 0
> +- Exchange rangepartitioning(j#14 ASC NULLS FIRST, 200)
>    +- InMemoryTableScan [i#5, j#6, i#13, j#14]
>          +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder...
>                +- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft
>                   :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
>                   :  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
>                   :     +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
>                   :        +- LocalTableScan [i#5, j#6]
>                   +- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
>                      +- ReusedExchange [i#13, j#14], Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
> {code}
> Better plan should avoid "Exchange rangepartitioning(j#14 ASC NULLS FIRST, 200)", like:
> {code:sql}
> == Physical Plan ==
> *(1) Sort [j#14 ASC NULLS FIRST], true, 0
> +- InMemoryTableScan [i#5, j#6, i#13, j#14]
>       +- InMemoryRelation [i#5, j#6, i#13, j#14], CachedRDDBuilder...
>             +- *(2) BroadcastHashJoin [i#5], [i#13], RightOuter, BuildLeft
>                :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
>                :  +- *(1) Sort [j#6 ASC NULLS FIRST], true, 0
>                :     +- Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
>                :        +- LocalTableScan [i#5, j#6]
>                +- *(2) Sort [j#14 ASC NULLS FIRST], true, 0
>                   +- ReusedExchange [i#13, j#14], Exchange rangepartitioning(j#6 ASC NULLS FIRST, 200)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org