Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2018/11/05 10:34:01 UTC

[jira] [Commented] (SPARK-24066) Add new optimization rule to eliminate unnecessary sort by exchanged adjacent Window expressions

    [ https://issues.apache.org/jira/browse/SPARK-24066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16674930#comment-16674930 ] 

Apache Spark commented on SPARK-24066:
--------------------------------------

User 'heary-cao' has created a pull request for this issue:
https://github.com/apache/spark/pull/22945

> Add new optimization rule to eliminate unnecessary sort by exchanged adjacent Window expressions
> ------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-24066
>                 URL: https://issues.apache.org/jira/browse/SPARK-24066
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: caoxuewen
>            Priority: Major
>
> Currently, when two adjacent window functions have the same partitioning and their orderings overlap (for example, one ordering is a prefix of the other),
> the plan contains two sorts after the shuffle, one of which is unnecessary. This PR adds a new optimization rule that eliminates the unnecessary sort by exchanging adjacent Window expressions.
> For example:
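> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql.functions.{avg, max, sum}
> import spark.implicits._  // assumes a SparkSession named `spark`, as in spark-shell
>
> // Three window functions over the same partition; each ordering is a prefix of the next.
> val plan = Seq(
>       ("a", "p1", 10.0, 20.0, 30.0),
>       ("a", "p2", 20.0, 10.0, 40.0)).toDF("key", "value", "value1", "value2", "value3").select(
>         $"key",
>         sum("value1").over(Window.partitionBy("key").orderBy("value")),
>         max("value2").over(Window.partitionBy("key").orderBy("value", "value1")),
>         avg("value3").over(Window.partitionBy("key").orderBy("value", "value1", "value2"))
>       ).queryExecution.executedPlan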
>  
> Before this PR:
> *(5) Project [key#16, sum(value1) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST unspecifiedframe$())#29, max(value2) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST unspecifiedframe$())#30, avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31]
> +- Window [max(value2#19) windowspecdefinition(key#16, value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS max(value2) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST unspecifiedframe$())#30], [key#16], [value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST]
>    +- *(4) Project [key#16, value1#18, value#17, value2#19, sum(value1) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST unspecifiedframe$())#29, avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31]
>       +- Window [avg(value3#20) windowspecdefinition(key#16, value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST, value2#19 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31], [key#16], [value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST, value2#19 ASC NULLS FIRST]
>          +- *(3) Sort [key#16 ASC NULLS FIRST, value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST, value2#19 ASC NULLS FIRST], false, 0
>             +- Window [sum(value1#18) windowspecdefinition(key#16, value#17 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS sum(value1) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST unspecifiedframe$())#29], [key#16], [value#17 ASC NULLS FIRST]
>                +- *(2) Sort [key#16 ASC NULLS FIRST, value#17 ASC NULLS FIRST], false, 0
>                   +- Exchange hashpartitioning(key#16, 5)
>                      +- *(1) Project [_1#5 AS key#16, _3#7 AS value1#18, _2#6 AS value#17, _4#8 AS value2#19, _5#9 AS value3#20]
>                         +- LocalTableScan [_1#5, _2#6, _3#7, _4#8, _5#9]
>  
> After this PR:
> *(5) Project [key#16, sum(value1) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST unspecifiedframe$())#29, max(value2) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST unspecifiedframe$())#30, avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31]
> +- Window [sum(value1#18) windowspecdefinition(key#16, value#17 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS sum(value1) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST unspecifiedframe$())#29], [key#16], [value#17 ASC NULLS FIRST]
>    +- *(4) Project [key#16, value1#18, value#17, avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31, max(value2) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST unspecifiedframe$())#30]
>       +- Window [max(value2#19) windowspecdefinition(key#16, value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS max(value2) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST unspecifiedframe$())#30], [key#16], [value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST]
>          +- *(3) Project [key#16, value1#18, value#17, value2#19, avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31]
>             +- Window [avg(value3#20) windowspecdefinition(key#16, value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST, value2#19 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS avg(value3) OVER (PARTITION BY key ORDER BY value ASC NULLS FIRST, value1 ASC NULLS FIRST, value2 ASC NULLS FIRST unspecifiedframe$())#31], [key#16], [value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST, value2#19 ASC NULLS FIRST]
>                +- *(2) Sort [key#16 ASC NULLS FIRST, value#17 ASC NULLS FIRST, value1#18 ASC NULLS FIRST, value2#19 ASC NULLS FIRST], false, 0
>                   +- Exchange hashpartitioning(key#16, 5)
>                      +- *(1) Project [_1#5 AS key#16, _3#7 AS value1#18, _2#6 AS value#17, _4#8 AS value2#19, _5#9 AS value3#20]
>                         +- LocalTableScan [_1#5, _2#6, _3#7, _4#8, _5#9]
>  
>  
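
The difference between the two plans above can be illustrated with a small model in plain Scala. This is only a sketch under stated assumptions, not the rule implemented in the pull request: WindowSpec, satisfies, countSorts and reorder are hypothetical names, every window is assumed to share the same partitioning, and each ordering is assumed to be a prefix of the longest one. The idea is that rows sorted by (key, value, value1, value2) are also sorted by any prefix of that list, so evaluating the window with the longest ordering first lets the later windows reuse its sort.

```scala
// Simplified stand-in for a window specification (hypothetical; not Spark's WindowSpecDefinition).
final case class WindowSpec(partitionBy: Seq[String], orderBy: Seq[String])

object WindowSortSketch {
  // Data sorted by `available` is also sorted by any prefix of `available`.
  def satisfies(available: Seq[String], required: Seq[String]): Boolean =
    available.startsWith(required)

  // Count the sorts needed when the windows are evaluated bottom-up in the given order.
  // A sort is added only when the current ordering does not already satisfy the window.
  def countSorts(bottomUp: Seq[WindowSpec]): Int = {
    val (_, sorts) = bottomUp.foldLeft((Seq.empty[String], 0)) {
      case ((current, n), w) =>
        val required = w.partitionBy ++ w.orderBy
        if (satisfies(current, required)) (current, n)
        else (required, n + 1)
    }
    sorts
  }

  // Evaluate the window with the longest ordering first. This is only valid under the
  // assumption that all windows share a partitioning and the orderings are prefixes
  // of one another.
  def reorder(windows: Seq[WindowSpec]): Seq[WindowSpec] =
    windows.sortBy(w => -w.orderBy.length)
}

object WindowSortSketchDemo {
  def main(args: Array[String]): Unit = {
    // Bottom-up evaluation order taken from the "Before this PR" plan: sum, avg, max.
    val before = Seq(
      WindowSpec(Seq("key"), Seq("value")),
      WindowSpec(Seq("key"), Seq("value", "value1", "value2")),
      WindowSpec(Seq("key"), Seq("value", "value1")))

    println(WindowSortSketch.countSorts(before))                           // prints 2
    println(WindowSortSketch.countSorts(WindowSortSketch.reorder(before))) // prints 1
  }
}
```

The counts match the number of Sort nodes in the two plans: two before the change and one after it. The model ignores frames, sort direction and null ordering, all of which a real rule would also have to compare.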


