Posted to issues@spark.apache.org by "Terry Kim (Jira)" <ji...@apache.org> on 2020/05/30 04:21:00 UTC

[jira] [Updated] (SPARK-31869) BroadcastHashJoinExec's outputPartitioning can utilize the build side

     [ https://issues.apache.org/jira/browse/SPARK-31869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Terry Kim updated SPARK-31869:
------------------------------
    Description: 
Currently, BroadcastHashJoinExec's outputPartitioning uses only the streamed side's outputPartitioning. Thus, when a subsequent join uses a key from the build side of a BroadcastHashJoinExec, an extra exchange is introduced, as in this example:
{code:java}
import spark.implicits._ // needed for toDF; already imported in spark-shell
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "500")
val t1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i1", "j1")
val t2 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i2", "j2")
val t3 = (0 until 20).map(i => (i % 7, i % 11)).toDF("i3", "j3")
val t4 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i4", "j4")

// join1 is a sort merge join.
val join1 = t1.join(t2, t1("i1") === t2("i2"))

// join2 is a broadcast join where t3 is broadcasted.
val join2 = join1.join(t3, join1("i1") === t3("i3"))

// Join on the column from the broadcasted side (i3).
val join3 = join2.join(t4, join2("i3") === t4("i4"))

join3.explain
{code}
it produces an Exchange hashpartitioning(i3#29, 200) on the build-side key i3:
{code:java}
== Physical Plan ==
*(6) SortMergeJoin [i3#29], [i4#40], Inner
:- *(4) Sort [i3#29 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(i3#29, 200), true, [id=#55]
:     +- *(3) BroadcastHashJoin [i1#7], [i3#29], Inner, BuildRight
:        :- *(3) SortMergeJoin [i1#7], [i2#18], Inner
:        :  :- *(1) Sort [i1#7 ASC NULLS FIRST], false, 0
:        :  :  +- Exchange hashpartitioning(i1#7, 200), true, [id=#28]
:        :  :     +- LocalTableScan [i1#7, j1#8]
:        :  +- *(2) Sort [i2#18 ASC NULLS FIRST], false, 0
:        :     +- Exchange hashpartitioning(i2#18, 200), true, [id=#29]
:        :        +- LocalTableScan [i2#18, j2#19]
:        +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#34]
:           +- LocalTableScan [i3#29, j3#30]
+- *(5) Sort [i4#40 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(i4#40, 200), true, [id=#39]
      +- LocalTableScan [i4#40, j4#41]
{code}
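However, since BroadcastHashJoinExec handles only equi-joins, the streamed-side and build-side join keys are equal on every output row of an inner join (here, i1 = i3). So if the streamed side is hash-partitioned on its join keys, the output is also hash-partitioned on the corresponding build-side keys, and BroadcastHashJoinExec can report this in its outputPartitioning to eliminate the exchange.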
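The reasoning above can be illustrated with a minimal toy model in plain Scala, assuming expressions are represented as column-name strings. Everything in it is hypothetical: `BuildSideExpansion`, `satisfies`, `currentOutput`, `expandedOutput`, and the `limit` cap are illustrative names, not Spark's catalyst classes or API, and the streamed side is simplified to a single `HashPartitioning` on i1 (in the plan above it actually comes from a sort merge join). It shows that reporting only the streamed partitioning cannot satisfy a requirement on i3, while a collection that also substitutes the equal build key i3 for i1 can.

```scala
// Toy model of the proposal, runnable in the Scala REPL or spark-shell.
// All names are hypothetical stand-ins, not Spark's catalyst classes;
// expressions are modeled as plain column-name strings.
object BuildSideExpansion {
  sealed trait Partitioning { def numPartitions: Int }
  final case class HashPartitioning(exprs: Seq[String], numPartitions: Int)
      extends Partitioning
  final case class PartitioningCollection(parts: Seq[HashPartitioning])
      extends Partitioning {
    def numPartitions: Int = parts.head.numPartitions
  }

  // A downstream join requiring rows clustered by `keys` is satisfied when
  // some reported hash partitioning uses exactly those keys.
  def satisfies(p: Partitioning, keys: Seq[String]): Boolean = p match {
    case HashPartitioning(exprs, _)  => exprs == keys
    case PartitioningCollection(ps) => ps.exists(_.exprs == keys)
  }

  // Current behavior: report only the streamed side's partitioning.
  def currentOutput(streamed: HashPartitioning): Partitioning = streamed

  // Proposed behavior, valid for inner equi-joins only: each streamed key equals
  // its paired build key on every output row, so any streamed partitioning
  // expression may be swapped for an equal build key. `limit` is a hypothetical
  // cap on how many combinations are generated.
  def expandedOutput(
      streamed: HashPartitioning,
      streamedKeys: Seq[String],
      buildKeys: Seq[String],
      limit: Int = 8): Partitioning = {
    // One streamed key can pair with several build keys (e.g. a = x AND a = y).
    val mapping: Map[String, Seq[String]] =
      streamedKeys.zip(buildKeys).groupBy(_._1).map { case (k, pairs) =>
        k -> pairs.map(_._2)
      }
    // For each position, keep the streamed expression or use an equal build key.
    val choices: Seq[Seq[String]] =
      streamed.exprs.map(e => e +: mapping.getOrElse(e, Seq.empty[String]))
    // Cartesian product of the per-position choices.
    val combos: Seq[Seq[String]] =
      choices.foldLeft(Seq(Seq.empty[String])) { (acc, options) =>
        for (prefix <- acc; o <- options) yield prefix :+ o
      }
    PartitioningCollection(
      combos.take(limit).map(HashPartitioning(_, streamed.numPartitions)))
  }
}

import BuildSideExpansion._

val streamed = HashPartitioning(Seq("i1"), 200) // simplified streamed side
satisfies(currentOutput(streamed), Seq("i3"))                        // false: Exchange needed
satisfies(expandedOutput(streamed, Seq("i1"), Seq("i3")), Seq("i3")) // true: Exchange avoidable
```

The number of substitutions grows multiplicatively with the number of keys (two keys, each paired with one build key, already yield four partitionings), which is why the sketch caps the list. Outer joins are excluded because they can null-extend the build side, breaking the equality the substitution relies on.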

  was:
Currently, BroadcastHashJoinExec's outputPartitioning uses only the streamed side's outputPartitioning. Thus, when a subsequent join uses a key from the build side of a BroadcastHashJoinExec, an extra exchange is introduced, as in this example:
{code:java}
import spark.implicits._ // needed for toDF; already imported in spark-shell
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "500")
val df1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i1", "j1")
val df2 = (0 until 20).map(i => (i % 7, i % 11)).toDF("i2", "j2")
val df3 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i3", "j3")
df1.write.format("parquet").bucketBy(8, "i1").saveAsTable("t1")
df3.write.format("parquet").bucketBy(8, "i3").saveAsTable("t3")
val t1 = spark.table("t1")
val t3 = spark.table("t3")
val join1 = t1.join(df2, t1("i1") === df2("i2"))
val join2 = join1.join(t3, join1("i2") === t3("i3"))
join2.explain
{code}
it produces an Exchange hashpartitioning(i2#103, 200) on the build-side key i2:
{code:java}
== Physical Plan ==
*(5) SortMergeJoin [i2#103], [i3#124], Inner
:- *(2) Sort [i2#103 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(i2#103, 200)
:     +- *(1) BroadcastHashJoin [i1#120], [i2#103], Inner, BuildRight
:        :- *(1) Project [i1#120, j1#121]
:        :  +- *(1) Filter isnotnull(i1#120)
:        :     +- *(1) FileScan parquet default.t1[i1#120,j1#121] Batched: true, Format: Parquet, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [IsNotNull(i1)], ReadSchema: struct<i1:int,j1:int>, SelectedBucketsCount: 8 out of 8
:        +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
:           +- LocalTableScan [i2#103, j2#104]
+- *(4) Sort [i3#124 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(i3#124, 200)
      +- *(3) Project [i3#124, j3#125]
         +- *(3) Filter isnotnull(i3#124)
            +- *(3) FileScan parquet default.t3[i3#124,j3#125] Batched: true, Format: Parquet, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [IsNotNull(i3)], ReadSchema: struct<i3:int,j3:int>, SelectedBucketsCount: 8 out of 8
{code}
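However, since BroadcastHashJoinExec handles only equi-joins, the streamed-side and build-side join keys are equal on every output row of an inner join (here, i1 = i2). So if the streamed side is hash-partitioned on its join keys, the output is also hash-partitioned on the corresponding build-side keys, and BroadcastHashJoinExec can report this in its outputPartitioning to eliminate the exchange.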
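The invariant behind this can be checked on data shaped like the bucketed example, without Spark. This sketch assumes a hypothetical `bucketOf` function (a simple `floorMod` of the key's hash with 8 buckets, not Spark's Murmur3-based bucketing), treats the streamed rows as already bucketed by i1 the way t1 is, and verifies that every partition of the inner join output is also correctly bucketed by the build-side key i2.

```scala
// Toy data check, runnable in the Scala REPL or spark-shell (no Spark APIs used).
// bucketOf is a hypothetical stand-in for Spark's Murmur3-based bucketing.
object EquiJoinBuckets {
  val numBuckets = 8

  def bucketOf(key: Int): Int = Math.floorMod(key.hashCode, numBuckets)

  // Same row shapes as df1 (streamed, bucketed by i1 as t1) and df2 (broadcast).
  val df1: Seq[(Int, Int)] = (0 until 100).map(i => (i % 5, i % 13))
  val df2: Seq[(Int, Int)] = (0 until 20).map(i => (i % 7, i % 11))

  // Inner equi-join on i1 = i2, written as a nested loop for clarity.
  val joined: Seq[(Int, Int, Int, Int)] =
    for ((i1, j1) <- df1; (i2, j2) <- df2; if i1 == i2) yield (i1, j1, i2, j2)

  // Partition the output the way a bucketed streamed side would: by bucketOf(i1).
  val partitions: Map[Int, Seq[(Int, Int, Int, Int)]] =
    joined.groupBy { case (i1, _, _, _) => bucketOf(i1) }

  // Since i1 == i2 on every joined row, each row also sits in bucketOf(i2).
  val partitionedByI2: Boolean = partitions.forall { case (p, rows) =>
    rows.forall { case (_, _, i2, _) => bucketOf(i2) == p }
  }
}

EquiJoinBuckets.partitionedByI2 // true
```

The check holds only because the join is an inner equi-join; it says nothing about how a real planner decides whether partition counts on both sides of the next join are compatible.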


> BroadcastHashJoinExec's outputPartitioning can utilize the build side
> ---------------------------------------------------------------------
>
>                 Key: SPARK-31869
>                 URL: https://issues.apache.org/jira/browse/SPARK-31869
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.1.0
>            Reporter: Terry Kim
>            Priority: Minor



--
This message was sent by Atlassian Jira
(v8.3.4#803005)