Posted to reviews@spark.apache.org by heary-cao <gi...@git.apache.org> on 2018/04/24 09:15:43 UTC

[GitHub] spark pull request #21139: [SPARK-24066][SQL]Add a window exchange rule to e...

GitHub user heary-cao opened a pull request:

    https://github.com/apache/spark/pull/21139

    [SPARK-24066][SQL]Add a window exchange rule to eliminate redundant physical plan SortExec

    ## What changes were proposed in this pull request?
    Currently, when the order fields of one window function are a subset of the order fields of another, Spark SQL nondeterministically generates different physical plans.
    For example:
    
    ```
    case class DistinctAgg(a: Int, b: Float, c: Double, d: Int, e: String)
    val df = spark.sparkContext.parallelize(
          DistinctAgg(8, 2, 3, 4, "a") ::
          DistinctAgg(9, 3, 4, 5, "b") ::
          DistinctAgg(3, 4, 5, 6, "c") ::
          DistinctAgg(3, 4, 5, 7, "c") ::
          DistinctAgg(3, 4, 5, 8, "c") ::
          DistinctAgg(3, 6, 6, 9, "d") ::
          DistinctAgg(30, 40, 50, 60, "e") ::
          DistinctAgg(41, 51, 61, 71, null) ::
          DistinctAgg(42, 52, 62, 72, null) ::
          DistinctAgg(43, 53, 63, 73, "k") ::Nil).toDF()
    df.createOrReplaceTempView("distinctAgg")
    
    select a, b, c, 
    avg(b) over(partition by a order by b) as sumIb, 
    sum(d) over(partition by a order by b, c) as sumId, d 
    from distinctAgg	
    
    ```
    The generated physical plan differs randomly from run to run.
    **One**: the physical plan contains only one Sort
    ```
    == Physical Plan ==
    *(3) Project [a#181, b#182, c#183, sumId#210L, sumIb#209L, d#184]
    +- Window [sum(cast(b#182 as bigint)) windowspecdefinition(a#181, b#182 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS sumIb#209L], [a#181], [b#182 ASC NULLS FIRST]
       +- Window [sum(cast(d#184 as bigint)) windowspecdefinition(a#181, b#182 ASC NULLS FIRST, c#183 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS sumId#210L], [a#181], [b#182 ASC NULLS FIRST, c#183 ASC NULLS FIRST]
          +- *(2) Sort [a#181 ASC NULLS FIRST, b#182 ASC NULLS FIRST, c#183 ASC NULLS FIRST], false, 0
             +- Exchange hashpartitioning(a#181, 5)
                +- *(1) Project [a#181, b#182, c#183, d#184]
                   +- *(1) SerializeFromObject [assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).a AS a#181, assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).b AS b#182, assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).c AS c#183, assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).d AS d#184, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).e, true, false) AS e#185]
                      +- Scan ExternalRDDScan[obj#180]
    
    ```
    **Another one**: the physical plan contains two Sorts
    ```
    == Physical Plan ==
    *(4) Project [a#181, b#182, c#183, sumId#210L, sumIb#209L, d#184]
    +- Window [sum(cast(d#184 as bigint)) windowspecdefinition(a#181, b#182 ASC NULLS FIRST, c#183 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS sumId#210L], [a#181], [b#182 ASC NULLS FIRST, c#183 ASC NULLS FIRST]
       +- *(3) Sort [a#181 ASC NULLS FIRST, b#182 ASC NULLS FIRST, c#183 ASC NULLS FIRST], false, 0
          +- Window [sum(cast(b#182 as bigint)) windowspecdefinition(a#181, b#182 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS sumIb#209L], [a#181], [b#182 ASC NULLS FIRST]
             +- *(2) Sort [a#181 ASC NULLS FIRST, b#182 ASC NULLS FIRST], false, 0
                +- Exchange hashpartitioning(a#181, 5)
                   +- *(1) Project [a#181, b#182, c#183, d#184]
                      +- *(1) SerializeFromObject [assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).a AS a#181, assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).b AS b#182, assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).c AS c#183, assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).d AS d#184, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$DistinctAgg, true]).e, true, false) AS e#185]
                         +- Scan ExternalRDDScan[obj#180]
    
    ```
    This PR adds an exchange rule to ensure that no redundant SortExec is generated in the physical plan.
    
    ## How was this patch tested?
    
    Added new unit tests.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/heary-cao/spark ExchangeWindow

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21139.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21139
    
----
commit 4011b08aa7228195779a80eb66c4fe6dbff3352f
Author: caoxuewen <ca...@...>
Date:   2018-04-24T08:44:36Z

    Add a window exchange rule to eliminate redundant physical plan SortExec

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21139: [SPARK-24066][SQL]Add a window exchange rule to eliminat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21139
  
    Can one of the admins verify this patch?


---





[GitHub] spark pull request #21139: [SPARK-24066][SQL]Add a window exchange rule to e...

Posted by hvanhovell <gi...@git.apache.org>.
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21139#discussion_r183893505
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -618,6 +619,18 @@ object CollapseRepartition extends Rule[LogicalPlan] {
       }
     }
     
    +/**
    + * Exchanged the adjacent logical window operator according to the order field of window.
    + */
    +object ExchangeWindowWithOrderField extends Rule[LogicalPlan] {
    +  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    +    case w1 @ Window(_, _, orderSpec1, w2 @ Window(_, _, orderSpec2, child2))
    +      if orderSpec1.size > orderSpec2.size =>
    --- End diff --
    
    I am not sure if this is a good idea, because it does not consider the partitioning expressions or the actual ordering. You might actively make things worse if you do this. For example, when you have three subsequent window operators like this:
    ```
    // Before rule
    window[partition by A order by B, C]
    :- window[partition by B order by C]
       :- window[partition by B order by D]
    
    // After rule (notice how we add another exchange here)
    window[partition by B order by C]
    :- window[partition by A order by B, C]
       :- window[partition by B order by D]
    ```
    I think this only has merit if the partitioning expression set matches, and the smaller ordering clause is the prefix of the larger ordering clause.
    
    Moreover, you are messing with the output order, so you need to make sure there is a project (or something else) on top that retains the original order (this might not be a problem due to the way we currently plan Window operators).
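
The condition described in this comment (same partitioning expression set, and the smaller ordering clause is a prefix of the larger one) can be sketched in plain Scala. This is only an illustration under stated assumptions, not the code from the PR: `WindowSpec`, `SortKey`, and `shouldSwap` are hypothetical stand-ins for the fields of Catalyst's `Window` operator, and columns are modeled as plain strings rather than expressions.

```scala
// Hypothetical, simplified model of two adjacent window operators.
// None of these types exist in Spark; they only illustrate the guard.
object WindowSwapSketch {
  // One entry of an ORDER BY clause.
  final case class SortKey(column: String, ascending: Boolean = true)

  // The partitioning set and ordering clause of one window operator.
  final case class WindowSpec(partitionBy: Set[String], orderBy: Seq[SortKey])

  // `upper` is the outer window, `lower` is its child.
  // Swapping is only considered when:
  //   1. both windows partition by the same set of columns, and
  //   2. the child's ordering is a strict prefix of the outer ordering,
  // so that the sort for the wider ordering can serve both windows.
  def shouldSwap(upper: WindowSpec, lower: WindowSpec): Boolean =
    upper.partitionBy == lower.partitionBy &&
      upper.orderBy.size > lower.orderBy.size &&
      upper.orderBy.startsWith(lower.orderBy)

  def main(args: Array[String]): Unit = {
    val byAOrderBC = WindowSpec(Set("a"), Seq(SortKey("b"), SortKey("c")))
    val byAOrderB  = WindowSpec(Set("a"), Seq(SortKey("b")))
    val byBOrderC  = WindowSpec(Set("b"), Seq(SortKey("c")))
    val byAOrderCB = WindowSpec(Set("a"), Seq(SortKey("c"), SortKey("b")))

    // Case from the PR description: same partitioning, prefix ordering.
    println(shouldSwap(byAOrderBC, byAOrderB)) // true
    // Counterexample from this comment: the partitioning differs.
    println(shouldSwap(byAOrderBC, byBOrderC)) // false
    // Same partitioning, but [b] is not a prefix of [c, b].
    println(shouldSwap(byAOrderCB, byAOrderB)) // false
  }
}
```

A guard that only compares `orderSpec1.size > orderSpec2.size`, as in the diff above, would accept all three cases, including the one that adds an extra exchange.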


---





[GitHub] spark issue #21139: [SPARK-24066][SQL]Add a window exchange rule to eliminat...

Posted by heary-cao <gi...@git.apache.org>.
Github user heary-cao commented on the issue:

    https://github.com/apache/spark/pull/21139
  
    @hvanhovell, could you review it again when you have some time? Thanks.


---





[GitHub] spark pull request #21139: [SPARK-24066][SQL]Add a window exchange rule to e...

Posted by heary-cao <gi...@git.apache.org>.
Github user heary-cao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21139#discussion_r183929216
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -618,6 +619,18 @@ object CollapseRepartition extends Rule[LogicalPlan] {
       }
     }
     
    +/**
    + * Exchanged the adjacent logical window operator according to the order field of window.
    + */
    +object ExchangeWindowWithOrderField extends Rule[LogicalPlan] {
    +  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    +    case w1 @ Window(_, _, orderSpec1, w2 @ Window(_, _, orderSpec2, child2))
    +      if orderSpec1.size > orderSpec2.size =>
    --- End diff --
    
    @hvanhovell, sorry about that. I have fixed it, thanks.


---





[GitHub] spark pull request #21139: [SPARK-24066][SQL]Add a window exchange rule to e...

Posted by heary-cao <gi...@git.apache.org>.
Github user heary-cao closed the pull request at:

    https://github.com/apache/spark/pull/21139


---


