Posted to issues@spark.apache.org by "Jeff Min (Jira)" <ji...@apache.org> on 2023/03/27 13:58:00 UTC

[jira] [Updated] (SPARK-42935) Optimze shuffle for union spark plan

     [ https://issues.apache.org/jira/browse/SPARK-42935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jeff Min updated SPARK-42935:
-----------------------------
    Description: 
The Union plan does not take full advantage of its children's output partitioning when its own output partitioning cannot satisfy the parent plan's required distribution. For example, table1 and table2 are both bucketed tables with bucket column id and 100 buckets. We apply a row_number window function after unioning the two tables.
{code:sql}
create table table1 (id int, name string) using csv CLUSTERED BY (id) INTO 100 BUCKETS;
insert into table1 values(1, "s1");
insert into table1 values(2, "s2");

create table table2 (id int, name string) using csv CLUSTERED BY (id) INTO 100 BUCKETS;
insert into table2 values(1, "s3");

set spark.sql.shuffle.partitions=100;
explain select *, row_number() over(partition by id order by name desc) id_row_number from (select * from table1 union all select * from table2);{code}
The physical plan is:
{code:shell}
AdaptiveSparkPlan isFinalPlan=false
+- Window [row_number() windowspecdefinition(id#35, name#36 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS id_row_number#28], [id#35], [name#36 DESC NULLS LAST]
   +- Sort [id#35 ASC NULLS FIRST, name#36 DESC NULLS LAST], false, 0
      +- Exchange hashpartitioning(id#35, 100), ENSURE_REQUIREMENTS, [plan_id=88]
         +- Union
            :- FileScan csv spark_catalog.default.table1[id#35,name#36]
            +- FileScan csv spark_catalog.default.table2[id#37,name#38] {code}
 

Although both tables are bucketed by the id column, an Exchange is still planned above the Union. The reason is that the Union plan reports no output partitioning.
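
To see why the Exchange is needed, recall that a plain union concatenates the partitions of its children instead of aligning them. The sketch below models this in plain Python, without Spark. The helpers bucket_of, bucketize and concat_union are hypothetical names introduced only for illustration, the bucket count is reduced to 4, and the modulo hash is an assumed stand-in for Spark's Murmur3-based bucketing.

```python
# Toy model: a "table" is a list of partitions; each partition is a list of rows.
NUM_BUCKETS = 4  # small stand-in for the 100 buckets in the example above


def bucket_of(key, num_buckets=NUM_BUCKETS):
    # Assumed stand-in hash; Spark's bucketing uses Murmur3, not a plain modulo.
    return key % num_buckets


def bucketize(rows, num_buckets=NUM_BUCKETS):
    # Place every (id, name) row into the bucket chosen by its id.
    parts = [[] for _ in range(num_buckets)]
    for row in rows:
        parts[bucket_of(row[0], num_buckets)].append(row)
    return parts


def concat_union(*tables):
    # A plain union appends the partitions of each child one after another.
    return [part for table in tables for part in table]


table1 = bucketize([(1, "s1"), (2, "s2")])
table2 = bucketize([(1, "s3")])
unioned = concat_union(table1, table2)

# Indices of the union's partitions that contain id = 1.
partitions_with_id_1 = [i for i, part in enumerate(unioned)
                        if any(row[0] == 1 for row in part)]
print(len(unioned), partitions_with_id_1)  # prints: 8 [1, 5]
```

In this model id = 1 lands in two different partitions of the union, so a window partitioned by id cannot assume that all rows of one id are co-located.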

We can introduce a new approach to eliminate this exchange:
 # First, introduce a new RDD composed of parent RDDs that all have the same number of partitions. Its i-th partition corresponds to the i-th partition of each parent RDD.

 # Then push the required distribution down to the Union plan's children. If a child's output partitioning already satisfies the required distribution, the shuffle for that child can be skipped.
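
The two steps above can be sketched in plain Python as follows. This is only an illustration of the idea under stated assumptions, not the proposed Spark implementation: Child, hash_partition and zip_union are hypothetical names, the modulo hash stands in for the real partitioning hash, the partition count is reduced to 4, and table3 is an invented unbucketed child added to show the case where one child still needs a shuffle.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple

Row = Tuple[int, str]  # (id, name)


def bucket_of(key: int, num_partitions: int) -> int:
    # Assumed stand-in for the real partitioning hash.
    return key % num_partitions


def hash_partition(rows: List[Row], num_partitions: int) -> List[List[Row]]:
    # Models a shuffle: redistribute rows by the hash of their id.
    parts: List[List[Row]] = [[] for _ in range(num_partitions)]
    for row in rows:
        parts[bucket_of(row[0], num_partitions)].append(row)
    return parts


@dataclass
class Child:
    partitions: List[List[Row]]
    hash_key: Optional[str]  # column the child is hash-partitioned by, or None


def zip_union(children: List[Child], required_key: str, num_partitions: int):
    aligned = []
    shuffled = 0
    for child in children:
        # Step 2: a child that already satisfies the requirement is used as-is.
        if child.hash_key == required_key and len(child.partitions) == num_partitions:
            aligned.append(child.partitions)
        else:
            rows = [row for part in child.partitions for row in part]
            aligned.append(hash_partition(rows, num_partitions))
            shuffled += 1
    # Step 1: the i-th output partition is built from the i-th partition of each child.
    zipped = [
        [row for parts in aligned for row in parts[i]]
        for i in range(num_partitions)
    ]
    return zipped, shuffled


n = 4
table1 = Child(hash_partition([(1, "s1"), (2, "s2")], n), hash_key="id")
table2 = Child(hash_partition([(1, "s3")], n), hash_key="id")
table3 = Child([[(1, "s4"), (6, "s5")]], hash_key=None)  # hypothetical unbucketed child

zipped, shuffled = zip_union([table1, table2, table3], "id", n)
print(shuffled)   # prints: 1
print(zipped[1])  # prints: [(1, 's1'), (1, 's3'), (1, 's4')]
```

Only the unbucketed child is redistributed. The two bucketed children are combined partition by partition, so the output keeps hash partitioning on id and needs no further exchange.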

With these changes, the physical plan becomes:
{code:shell}
AdaptiveSparkPlan isFinalPlan=false
+- Window [row_number() windowspecdefinition(id#7, name#8 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS id_row_number#0], [id#7], [name#8 DESC NULLS LAST]
   +- Sort [id#7 ASC NULLS FIRST, name#8 DESC NULLS LAST], false, 0
      +- UnionZip [ClusteredDistribution(ArrayBuffer(id#7),false,None), ClusteredDistribution(ArrayBuffer(id#9),false,None)], hashpartitioning(id#7, 200)
         :- FileScan csv spark_catalog.default.table1[id#7,name#8]
         +- FileScan csv spark_catalog.default.table2[id#9,name#10] {code}


 

 

  was:
The Union plan does not take full advantage of its children's output partitioning when its own output partitioning cannot satisfy the parent plan's required distribution. For example, table1 and table2 are both bucketed tables with bucket column id and 100 buckets. We apply a row_number window function after unioning the two tables.
create table table1 (id int, name string) using csv CLUSTERED BY (id) INTO 100 BUCKETS;
insert into table1 values(1, "s1");
insert into table1 values(2, "s2");

create table table2 (id int, name string) using csv CLUSTERED BY (id) INTO 100 BUCKETS;
insert into table2 values(1, "s3");

set spark.sql.shuffle.partitions=100;
explain select *, row_number() over(partition by id order by name desc) id_row_number from (select * from table1 union all select * from table2);
The physical plan is:
AdaptiveSparkPlan isFinalPlan=false
+- Window [row_number() windowspecdefinition(id#35, name#36 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS id_row_number#28], [id#35], [name#36 DESC NULLS LAST]
   +- Sort [id#35 ASC NULLS FIRST, name#36 DESC NULLS LAST], false, 0
      +- Exchange hashpartitioning(id#35, 100), ENSURE_REQUIREMENTS, [plan_id=88]
         +- Union
            :- FileScan csv spark_catalog.default.table1[id#35,name#36]
            +- FileScan csv spark_catalog.default.table2[id#37,name#38]
Although both tables are bucketed by the id column, an Exchange is still planned above the Union. The reason is that the Union plan reports no output partitioning.

We can introduce a new approach to eliminate this exchange:
 # First, introduce a new RDD composed of parent RDDs that all have the same number of partitions. Its i-th partition corresponds to the i-th partition of each parent RDD.

 # Then push the required distribution down to the Union plan's children. If a child's output partitioning already satisfies the required distribution, the shuffle for that child can be skipped.

With these changes, the physical plan becomes:
AdaptiveSparkPlan isFinalPlan=false
+- Window [row_number() windowspecdefinition(id#7, name#8 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS id_row_number#0], [id#7], [name#8 DESC NULLS LAST]
   +- Sort [id#7 ASC NULLS FIRST, name#8 DESC NULLS LAST], false, 0
      +- UnionZip [ClusteredDistribution(ArrayBuffer(id#7),false,None), ClusteredDistribution(ArrayBuffer(id#9),false,None)], hashpartitioning(id#7, 200)
         :- FileScan csv spark_catalog.default.table1[id#7,name#8]
         +- FileScan csv spark_catalog.default.table2[id#9,name#10]
 

 


> Optimze shuffle for union spark plan
> ------------------------------------
>
>                 Key: SPARK-42935
>                 URL: https://issues.apache.org/jira/browse/SPARK-42935
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.5.0
>            Reporter: Jeff Min
>            Priority: Major
>             Fix For: 3.5.0


