Posted to issues@spark.apache.org by "Terry Kim (Jira)" <ji...@apache.org> on 2020/05/30 03:42:00 UTC

[jira] [Created] (SPARK-31869) BroadcastHashJoinExec's outputPartitioning can utilize the build side

Terry Kim created SPARK-31869:
---------------------------------

             Summary: BroadcastHashJoinExec's outputPartitioning can utilize the build side
                 Key: SPARK-31869
                 URL: https://issues.apache.org/jira/browse/SPARK-31869
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 3.1.0
            Reporter: Terry Kim


Currently, BroadcastHashJoinExec's outputPartitioning only uses the streamed side's outputPartitioning. As a result, when a subsequent join uses a key that comes from the build side of the BroadcastHashJoinExec, the existing partitioning is not recognized. For example:
{code:java}
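// Low threshold (in bytes) so that the small in-memory df2 is broadcast
// while the join with the bucketed table t3 is planned as a SortMergeJoin.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "500")
val df1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i1", "j1")
val df2 = (0 until 20).map(i => (i % 7, i % 11)).toDF("i2", "j2")
val df3 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i3", "j3")
// t1 and t3 are bucketed into 8 buckets on their join keys.
df1.write.format("parquet").bucketBy(8, "i1").saveAsTable("t1")
df3.write.format("parquet").bucketBy(8, "i3").saveAsTable("t3")
val t1 = spark.table("t1")
val t3 = spark.table("t3")
// join1: t1 is the streamed side, df2 is the build (broadcast) side.
val join1 = t1.join(df2, t1("i1") === df2("i2"))
// join2 uses i2, the build-side key of join1, as its join key.
val join2 = join1.join(t3, join1("i2") === t3("i3"))
join2.explain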
{code}
The resulting plan contains Exchange hashpartitioning(i2#103, 200) above the BroadcastHashJoin:
{code:java}
== Physical Plan ==
*(5) SortMergeJoin [i2#103], [i3#124], Inner
:- *(2) Sort [i2#103 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(i2#103, 200)
:     +- *(1) BroadcastHashJoin [i1#120], [i2#103], Inner, BuildRight
:        :- *(1) Project [i1#120, j1#121]
:        :  +- *(1) Filter isnotnull(i1#120)
:        :     +- *(1) FileScan parquet default.t1[i1#120,j1#121] Batched: true, Format: Parquet, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [IsNotNull(i1)], ReadSchema: struct<i1:int,j1:int>, SelectedBucketsCount: 8 out of 8
:        +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
:           +- LocalTableScan [i2#103, j2#104]
+- *(4) Sort [i3#124 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(i3#124, 200)
      +- *(3) Project [i3#124, j3#125]
         +- *(3) Filter isnotnull(i3#124)
            +- *(3) FileScan parquet default.t3[i3#124,j3#125] Batched: true, Format: Parquet, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [IsNotNull(i3)], ReadSchema: struct<i3:int,j3:int>, SelectedBucketsCount: 8 out of 8
{code}
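However, BroadcastHashJoinExec only handles equi-joins, so in the inner join above every output row satisfies i1 = i2. If the streamed side has HashPartitioning on its join keys, the same partitioning therefore also holds for the corresponding build-side keys, and BroadcastHashJoinExec can expose this in its outputPartitioning to eliminate the exchange.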
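
The idea can be sketched in plain Scala without any Spark dependency. This is only an illustration of the reasoning, not the actual Spark implementation: `Attr`, `HashPartitioning` and `expandPartitioning` below are hypothetical, simplified stand-ins for the Catalyst classes, and the sketch assumes an inner equi-join in which each streamed key pairs with exactly one build key.

```scala
// Hypothetical, simplified stand-ins for Catalyst's attribute and partitioning classes.
final case class Attr(name: String)
final case class HashPartitioning(keys: Seq[Attr], numPartitions: Int)

// For an inner equi-join, each streamed key equals its paired build key in every
// output row, so a streamed key inside a hash partitioning can be swapped for the
// build key without changing which partition a row belongs to.
def expandPartitioning(
    streamed: HashPartitioning,
    streamedKeys: Seq[Attr],
    buildKeys: Seq[Attr]): Seq[HashPartitioning] = {
  require(streamedKeys.length == buildKeys.length, "join keys must pair up")
  // Assumption: one build key per streamed key (later duplicates would overwrite earlier ones).
  val streamedToBuild: Map[Attr, Attr] = streamedKeys.zip(buildKeys).toMap
  // Each partitioning key has one or two candidates: itself and, if it is a join key, its build key.
  val candidates: Seq[Seq[Attr]] =
    streamed.keys.map(k => k +: streamedToBuild.get(k).toSeq)
  // Cartesian product of the candidates gives every equivalent key list.
  val combos = candidates.foldLeft(Seq(Seq.empty[Attr])) { (acc, options) =>
    for (prefix <- acc; option <- options) yield prefix :+ option
  }
  combos.map(keys => HashPartitioning(keys, streamed.numPartitions))
}

// t1 is bucketed into 8 buckets on i1, and join1 is t1.i1 = df2.i2.
val t1Partitioning = HashPartitioning(Seq(Attr("i1")), 8)
val equivalents = expandPartitioning(t1Partitioning, Seq(Attr("i1")), Seq(Attr("i2")))
```

In this model, `equivalents` holds a partitioning on i1 and one on i2, both with 8 partitions. If BroadcastHashJoinExec reported both, a parent join keyed on i2 could find its required distribution already satisfied.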


