Posted to issues@spark.apache.org by "Apache Spark (Jira)" <ji...@apache.org> on 2020/07/22 00:43:00 UTC

[jira] [Commented] (SPARK-32383) Preserve hash join (BHJ and SHJ) stream side ordering

    [ https://issues.apache.org/jira/browse/SPARK-32383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17162413#comment-17162413 ] 

Apache Spark commented on SPARK-32383:
--------------------------------------

User 'c21' has created a pull request for this issue:
https://github.com/apache/spark/pull/29181

> Preserve hash join (BHJ and SHJ) stream side ordering
> -----------------------------------------------------
>
>                 Key: SPARK-32383
>                 URL: https://issues.apache.org/jira/browse/SPARK-32383
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.1.0
>            Reporter: Cheng Su
>            Priority: Trivial
>
> Currently `BroadcastHashJoinExec` and `ShuffledHashJoinExec` do not preserve their children's output ordering: they inherit `SparkPlan.outputOrdering`, which is `Nil`. This can add unnecessary sorts to complex queries involving multiple joins, for example when a sort merge join sits above a hash join whose stream side is already sorted on the join key.
> Example:
>  
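> {code:java}
> // Test snippet: assumes a Spark SQL test suite (e.g. one mixing in SharedSparkSession)
> // that provides withSQLConf, with `import testImplicits._` in scope for $"...".
> // A 50-byte broadcast threshold makes only the 3-row df3 small enough to broadcast.
> withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50") {
>   val df1 = spark.range(100).select($"id".as("k1"))
>   val df2 = spark.range(100).select($"id".as("k2"))
>   val df3 = spark.range(3).select($"id".as("k3"))
>   val df4 = spark.range(100).select($"id".as("k4"))
>   val plan = df1.join(df2, $"k1" === $"k2")
>     .join(df3, $"k1" === $"k3")
>     .join(df4, $"k1" === $"k4")
>     .queryExecution
>     .executedPlan
> }
> {code}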
>  
> Current physical plan (extra sort on `k1` before the top sort merge join):
> {code:java}
> *(9) SortMergeJoin [k1#220L], [k4#232L], Inner
> :- *(6) Sort [k1#220L ASC NULLS FIRST], false, 0
> :  +- *(6) BroadcastHashJoin [k1#220L], [k3#228L], Inner, BuildRight
> :     :- *(6) SortMergeJoin [k1#220L], [k2#224L], Inner
> :     :  :- *(2) Sort [k1#220L ASC NULLS FIRST], false, 0
> :     :  :  +- Exchange hashpartitioning(k1#220L, 5), true, [id=#128]
> :     :  :     +- *(1) Project [id#218L AS k1#220L]
> :     :  :        +- *(1) Range (0, 100, step=1, splits=2)
> :     :  +- *(4) Sort [k2#224L ASC NULLS FIRST], false, 0
> :     :     +- Exchange hashpartitioning(k2#224L, 5), true, [id=#134]
> :     :        +- *(3) Project [id#222L AS k2#224L]
> :     :           +- *(3) Range (0, 100, step=1, splits=2)
> :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])), [id=#141]
> :        +- *(5) Project [id#226L AS k3#228L]
> :           +- *(5) Range (0, 3, step=1, splits=2)
> +- *(8) Sort [k4#232L ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(k4#232L, 5), true, [id=#148]
>       +- *(7) Project [id#230L AS k4#232L]
>          +- *(7) Range (0, 100, step=1, splits=2)
> {code}
> Ideal physical plan (no extra sort on `k1` before the top sort merge join):
> {code:java}
> *(9) SortMergeJoin [k1#220L], [k4#232L], Inner
> :- *(6) BroadcastHashJoin [k1#220L], [k3#228L], Inner, BuildRight
> :  :- *(6) SortMergeJoin [k1#220L], [k2#224L], Inner
> :  :  :- *(2) Sort [k1#220L ASC NULLS FIRST], false, 0
> :  :  :  +- Exchange hashpartitioning(k1#220L, 5), true, [id=#127]
> :  :  :     +- *(1) Project [id#218L AS k1#220L]
> :  :  :        +- *(1) Range (0, 100, step=1, splits=2)
> :  :  +- *(4) Sort [k2#224L ASC NULLS FIRST], false, 0
> :  :     +- Exchange hashpartitioning(k2#224L, 5), true, [id=#133]
> :  :        +- *(3) Project [id#222L AS k2#224L]
> :  :           +- *(3) Range (0, 100, step=1, splits=2)
> :  +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])), [id=#140]
> :     +- *(5) Project [id#226L AS k3#228L]
> :        +- *(5) Range (0, 3, step=1, splits=2)
> +- *(8) Sort [k4#232L ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(k4#232L, 5), true, [id=#146]
>       +- *(7) Project [id#230L AS k4#232L]
>          +- *(7) Range (0, 100, step=1, splits=2)
> {code}
>  
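
A self-contained toy model of the problem, written in Scala to match the snippet above. It is not Spark code: `HashJoinOrderingSketch`, `Plan`, `Scan`, `SortedInput`, `Sort`, `HashJoin`, and `ensureOrdering` are hypothetical names, and the sketch assumes an inner join with a single sort key. It shows that a hash join which indexes the build side and then walks the stream side emits rows in stream-side order, and that a simplified ordering check adds an extra `Sort` when the join reports `Nil` as its ordering but not when it passes through the streamed child's ordering.

```scala
// Toy model only: every name here is hypothetical, not a Spark class.
// Assumes an inner join and a single sort key.
object HashJoinOrderingSketch {

  type Row = (Long, String) // (join key, payload)

  // Hash join: index the build side by key, then walk the stream side in
  // its existing order, so output rows follow stream-side order.
  def hashJoin(stream: Seq[Row], build: Seq[Row]): Seq[(Long, String, String)] = {
    val table: Map[Long, Seq[Row]] = build.groupBy(_._1)
    stream.flatMap { case (k, s) =>
      table.getOrElse(k, Nil).map { case (_, b) => (k, s, b) }
    }
  }

  // Minimal plan tree; like SparkPlan, the default ordering is Nil.
  sealed trait Plan {
    def outputOrdering: Seq[String] = Nil
  }
  final case class Scan(name: String) extends Plan
  final case class SortedInput(key: String) extends Plan {
    override def outputOrdering: Seq[String] = Seq(key)
  }
  final case class Sort(key: String, child: Plan) extends Plan {
    override def outputOrdering: Seq[String] = Seq(key)
  }
  // preserveOrdering = false mimics inheriting Nil;
  // preserveOrdering = true reports the streamed child's ordering.
  final case class HashJoin(streamed: Plan, build: Plan, preserveOrdering: Boolean) extends Plan {
    override def outputOrdering: Seq[String] =
      if (preserveOrdering) streamed.outputOrdering else Nil
  }

  // Simplified stand-in for an ordering requirement: wrap the child in a
  // Sort unless its ordering already starts with the required key.
  def ensureOrdering(child: Plan, requiredKey: String): Plan =
    if (child.outputOrdering.headOption.contains(requiredKey)) child
    else Sort(requiredKey, child)

  def main(args: Array[String]): Unit = {
    val stream = Seq(1L -> "a", 2L -> "b", 2L -> "c", 5L -> "d")
    val build = Seq(2L -> "x", 1L -> "y", 5L -> "z")
    println(hashJoin(stream, build).map(_._1)) // List(1, 2, 2, 5)

    // SortedInput("k1") stands in for the lower SortMergeJoin on k1,
    // Scan("df3") for the broadcast side.
    val before = ensureOrdering(HashJoin(SortedInput("k1"), Scan("df3"), preserveOrdering = false), "k1")
    val after = ensureOrdering(HashJoin(SortedInput("k1"), Scan("df3"), preserveOrdering = true), "k1")
    println(before.isInstanceOf[Sort]) // true: extra Sort added
    println(after.isInstanceOf[Sort])  // false: existing ordering reused
  }
}
```

The two `ensureOrdering` calls mirror the current and ideal plans in the issue. Spark's real ordering requirements are enforced by its `EnsureRequirements` rule, and the change in the linked pull request may differ from this model in how it handles join types and ordering details.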



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
