Posted to issues@spark.apache.org by "Dongjoon Hyun (JIRA)" <ji...@apache.org> on 2019/07/16 16:42:13 UTC
[jira] [Updated] (SPARK-28148) repartition after join is not optimized away
[ https://issues.apache.org/jira/browse/SPARK-28148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dongjoon Hyun updated SPARK-28148:
----------------------------------
Affects Version/s: 3.0.0 (was: 2.4.3)
> repartition after join is not optimized away
> --------------------------------------------
>
> Key: SPARK-28148
> URL: https://issues.apache.org/jira/browse/SPARK-28148
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.0.0
> Reporter: colin fang
> Priority: Minor
>
> Partitioning and sort order are usually retained after a join.
> {code}
> spark.conf.set('spark.sql.shuffle.partitions', '42')
> df1 = spark.range(5000000, numPartitions=5)
> df2 = spark.range(10000000, numPartitions=5)
> df3 = spark.range(20000000, numPartitions=5)
> # Reuse previous partitions & sort.
> df1.join(df2, on='id').join(df3, on='id').explain()
> # == Physical Plan ==
> # *(8) Project [id#367L]
> # +- *(8) SortMergeJoin [id#367L], [id#374L], Inner
> #    :- *(5) Project [id#367L]
> #    :  +- *(5) SortMergeJoin [id#367L], [id#369L], Inner
> #    :     :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
> #    :     :  +- Exchange hashpartitioning(id#367L, 42)
> #    :     :     +- *(1) Range (0, 5000000, step=1, splits=5)
> #    :     +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
> #    :        +- Exchange hashpartitioning(id#369L, 42)
> #    :           +- *(3) Range (0, 10000000, step=1, splits=5)
> #    +- *(7) Sort [id#374L ASC NULLS FIRST], false, 0
> #       +- Exchange hashpartitioning(id#374L, 42)
> #          +- *(6) Range (0, 20000000, step=1, splits=5)
> {code}
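> However, in the following case the partitioning persists through the left join (the repartition adds no Exchange), but the sort order does not (an extra Sort is added):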
> {code}
> df1.join(df2, on='id', how='left').repartition('id').sortWithinPartitions('id').explain()
> # == Physical Plan ==
> # *(5) Sort [id#367L ASC NULLS FIRST], false, 0
> # +- *(5) Project [id#367L]
> #    +- SortMergeJoin [id#367L], [id#369L], LeftOuter
> #       :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
> #       :  +- Exchange hashpartitioning(id#367L, 42)
> #       :     +- *(1) Range (0, 5000000, step=1, splits=5)
> #       +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
> #          +- Exchange hashpartitioning(id#369L, 42)
> #             +- *(3) Range (0, 10000000, step=1, splits=5)
> {code}
> And in this case the partitioning does not persist through the inner join (the repartition adds an extra Exchange):
> {code}
> df1.join(df2, on='id').repartition('id').sortWithinPartitions('id').explain()
> # == Physical Plan ==
> # *(6) Sort [id#367L ASC NULLS FIRST], false, 0
> # +- Exchange hashpartitioning(id#367L, 42)
> #    +- *(5) Project [id#367L]
> #       +- *(5) SortMergeJoin [id#367L], [id#369L], Inner
> #          :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
> #          :  +- Exchange hashpartitioning(id#367L, 42)
> #          :     +- *(1) Range (0, 5000000, step=1, splits=5)
> #          +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
> #             +- Exchange hashpartitioning(id#369L, 42)
> #                +- *(3) Range (0, 10000000, step=1, splits=5)
> {code}
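The difference between the last two plans is easier to see by counting operators than by reading the trees. The following is only a rough sketch in plain Python, not part of Spark: the helper name count_operators and the regular expression are assumptions made for illustration, and the two plan strings are copied from the explain() output quoted above. It counts how many times each physical operator appears in a plan's text, so an extra Exchange (shuffle) or Sort shows up as a higher count.

```python
import re
from collections import Counter

# Hypothetical helper, not a Spark API. The pattern skips tree-drawing
# characters (":", "+-", ":-", spaces), an optional comment or quote prefix,
# and an optional whole-stage-codegen marker such as "*(5)", then captures
# the operator name that follows.
_NODE = re.compile(r"^[\s:+\-#>]*(?:\*\(\d+\)\s+)?([A-Za-z]+)")


def count_operators(plan_text):
    """Count physical operators by name in the text printed by explain()."""
    counts = Counter()
    for line in plan_text.splitlines():
        match = _NODE.match(line)
        if match:
            counts[match.group(1)] += 1
    return counts


# Plan for: df1.join(df2, on='id', how='left').repartition('id').sortWithinPartitions('id')
LEFT_JOIN_PLAN = """
*(5) Sort [id#367L ASC NULLS FIRST], false, 0
+- *(5) Project [id#367L]
   +- SortMergeJoin [id#367L], [id#369L], LeftOuter
      :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(id#367L, 42)
      :     +- *(1) Range (0, 5000000, step=1, splits=5)
      +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(id#369L, 42)
            +- *(3) Range (0, 10000000, step=1, splits=5)
"""

# Plan for: df1.join(df2, on='id').repartition('id').sortWithinPartitions('id')
INNER_JOIN_PLAN = """
*(6) Sort [id#367L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#367L, 42)
   +- *(5) Project [id#367L]
      +- *(5) SortMergeJoin [id#367L], [id#369L], Inner
         :- *(2) Sort [id#367L ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(id#367L, 42)
         :     +- *(1) Range (0, 5000000, step=1, splits=5)
         +- *(4) Sort [id#369L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(id#369L, 42)
               +- *(3) Range (0, 10000000, step=1, splits=5)
"""

left_counts = count_operators(LEFT_JOIN_PLAN)
inner_counts = count_operators(INNER_JOIN_PLAN)
print("left join :", left_counts["Exchange"], "Exchange,", left_counts["Sort"], "Sort")
print("inner join:", inner_counts["Exchange"], "Exchange,", inner_counts["Sort"], "Sort")
```

For the two plans quoted in this issue, the inner join plan contains 3 Exchange nodes and the left join plan contains 2. Both contain 3 Sort nodes, one more than the two that feed the SortMergeJoin.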