Posted to issues@spark.apache.org by "Dongjoon Hyun (JIRA)" <ji...@apache.org> on 2019/07/16 16:42:13 UTC

[jira] [Updated] (SPARK-28148) repartition after join is not optimized away

     [ https://issues.apache.org/jira/browse/SPARK-28148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-28148:
----------------------------------
    Affects Version/s:     (was: 2.4.3)
                       3.0.0

> repartition after join is not optimized away
> --------------------------------------------
>
>                 Key: SPARK-28148
>                 URL: https://issues.apache.org/jira/browse/SPARK-28148
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: colin fang
>            Priority: Minor
>
> Partitioning and sort order are usually retained after a join:
> {code}
> spark.conf.set('spark.sql.shuffle.partitions', '42')
> df1 = spark.range(5000000, numPartitions=5)
> df2 = spark.range(10000000, numPartitions=5)
> df3 = spark.range(20000000, numPartitions=5)
> # Reuse previous partitions & sort.
> df1.join(df2, on='id').join(df3, on='id').explain()
> # == Physical Plan ==
> # *(8) Project [id#367L]
> # +- *(8) SortMergeJoin [id#367L], [id#374L], Inner
> #    :- *(5) Project [id#367L]
> #    :  +- *(5) SortMergeJoin [id#367L], [id#369L], Inner
> #    :     :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
> #    :     :  +- Exchange hashpartitioning(id#367L, 42)
> #    :     :     +- *(1) Range (0, 5000000, step=1, splits=5)
> #    :     +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
> #    :        +- Exchange hashpartitioning(id#369L, 42)
> #    :           +- *(3) Range (0, 10000000, step=1, splits=5)
> #    +- *(7) Sort [id#374L ASC NULLS FIRST], false, 0
> #       +- Exchange hashpartitioning(id#374L, 42)
> #          +- *(6) Range (0, 20000000, step=1, splits=5)
> {code}
> However, in the following case the partitioning persists through the left join (no extra Exchange is added), but the sort order does not (an extra Sort is added):
> {code}
> df1.join(df2, on='id', how='left').repartition('id').sortWithinPartitions('id').explain()
> # == Physical Plan ==
> # *(5) Sort [id#367L ASC NULLS FIRST], false, 0
> # +- *(5) Project [id#367L]
> #    +- SortMergeJoin [id#367L], [id#369L], LeftOuter
> #       :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
> #       :  +- Exchange hashpartitioning(id#367L, 42)
> #       :     +- *(1) Range (0, 5000000, step=1, splits=5)
> #       +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
> #          +- Exchange hashpartitioning(id#369L, 42)
> #             +- *(3) Range (0, 10000000, step=1, splits=5)
> {code}
> Also, in the following case the partitioning does not persist through the inner join: an extra Exchange and an extra Sort are added on top of the join.
> {code}
> df1.join(df2, on='id').repartition('id').sortWithinPartitions('id').explain()
> # == Physical Plan ==
> # *(6) Sort [id#367L ASC NULLS FIRST], false, 0
> # +- Exchange hashpartitioning(id#367L, 42)
> #    +- *(5) Project [id#367L]
> #       +- *(5) SortMergeJoin [id#367L], [id#369L], Inner
> #          :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
> #          :  +- Exchange hashpartitioning(id#367L, 42)
> #          :     +- *(1) Range (0, 5000000, step=1, splits=5)
> #          +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
> #             +- Exchange hashpartitioning(id#369L, 42)
> #                +- *(3) Range (0, 10000000, step=1, splits=5)
> {code}
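
The difference between the last two plans is easy to miss by eye. A minimal sketch for comparing them, assuming the plan is available as the text that explain() prints; count_operators is a hypothetical helper for illustration, not a Spark API, and the two plan strings are copied from the report above:

```python
import re
from collections import Counter

# Hypothetical helper (not a Spark API): tally operator names in the text
# that DataFrame.explain() prints. The pattern skips tree-drawing characters
# and an optional whole-stage-codegen marker such as "*(5) ".
_OPERATOR = re.compile(r"^[#\s:+\-]*(?:\*\(\d+\)\s+)?([A-Za-z]+)")


def count_operators(plan_text):
    """Return a Counter mapping operator name -> occurrences in the plan text."""
    counts = Counter()
    for line in plan_text.splitlines():
        match = _OPERATOR.match(line)
        if match:  # the "== Physical Plan ==" header does not match
            counts[match.group(1)] += 1
    return counts


# Plans copied from the report above.
LEFT_JOIN_PLAN = """\
== Physical Plan ==
*(5) Sort [id#367L ASC NULLS FIRST], false, 0
+- *(5) Project [id#367L]
   +- SortMergeJoin [id#367L], [id#369L], LeftOuter
      :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(id#367L, 42)
      :     +- *(1) Range (0, 5000000, step=1, splits=5)
      +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(id#369L, 42)
            +- *(3) Range (0, 10000000, step=1, splits=5)
"""

INNER_JOIN_PLAN = """\
== Physical Plan ==
*(6) Sort [id#367L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#367L, 42)
   +- *(5) Project [id#367L]
      +- *(5) SortMergeJoin [id#367L], [id#369L], Inner
         :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(id#367L, 42)
         :     +- *(1) Range (0, 5000000, step=1, splits=5)
         +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(id#369L, 42)
               +- *(3) Range (0, 10000000, step=1, splits=5)
"""

left = count_operators(LEFT_JOIN_PLAN)
inner = count_operators(INNER_JOIN_PLAN)
print("left join :", left["Exchange"], "Exchange,", left["Sort"], "Sort")
print("inner join:", inner["Exchange"], "Exchange,", inner["Sort"], "Sort")
```

Each join needs two Exchange and two Sort operators for its inputs, so any count above two means the repartition or sortWithinPartitions call was not optimized away. For the plans above this gives 2 Exchange and 3 Sort for the left join, and 3 Exchange and 3 Sort for the inner join.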



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
