Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/10/11 09:42:12 UTC

[GitHub] [spark] cloud-fan edited a comment on pull request #34034: [SPARK-36794][SQL] Ignore duplicated join keys when building relation for SEMI/ANTI hash join

cloud-fan edited a comment on pull request #34034:
URL: https://github.com/apache/spark/pull/34034#issuecomment-939862907


   Unfortunately, this breaks broadcast reuse, which causes a perf regression. To reproduce:
   ```
   scala> val df1 = spark.range(1000)
   df1: org.apache.spark.sql.Dataset[Long] = [id: bigint]
   
   scala> val df2 = spark.range(100)
   df2: org.apache.spark.sql.Dataset[Long] = [id: bigint]
   
   scala> val j1 = df1.join(df2, Seq("id"), "inner")
   j1: org.apache.spark.sql.DataFrame = [id: bigint]
   
   scala> val j2 = df1.join(df2, Seq("id"), "left_semi")
   j2: org.apache.spark.sql.DataFrame = [id: bigint]
   
   scala> val res = j1.union(j2)
   res: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint]
   
   scala> res.collect()
   res0: Array[org.apache.spark.sql.Row] = Array([0], ...
   
   scala> res.explain
   ```
   
   Before this PR, the query plan was
   ```
   AdaptiveSparkPlan isFinalPlan=true
   +- == Final Plan ==
      Union
      :- *(3) Project [id#0L]
      :  +- *(3) BroadcastHashJoin [id#0L], [id#2L], Inner, BuildRight, false
      :     :- *(3) Range (0, 1000, step=1, splits=1)
      :     +- BroadcastQueryStage 0
      :        +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false,false), [id=#79]
      :           +- *(1) Range (0, 100, step=1, splits=1)
      +- *(4) BroadcastHashJoin [id#12L], [id#13L], LeftSemi, BuildRight, false
         :- *(4) Range (0, 1000, step=1, splits=1)
         +- BroadcastQueryStage 2
            +- ReusedExchange [id#13L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false,false), [id=#79]
   ```
   
   With this PR, the plan is
   ```
   AdaptiveSparkPlan isFinalPlan=true
   +- == Final Plan ==
      Union
      :- *(3) Project [id#0L]
      :  +- *(3) BroadcastHashJoin [id#0L], [id#2L], Inner, BuildRight, false
      :     :- *(3) Range (0, 1000, step=1, splits=1)
      :     +- BroadcastQueryStage 0
      :        +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false,false), [id=#41]
      :           +- *(1) Range (0, 100, step=1, splits=1)
      +- *(4) BroadcastHashJoin [id#6L], [id#7L], LeftSemi, BuildRight, false
         :- *(4) Range (0, 1000, step=1, splits=1)
         +- BroadcastQueryStage 1
            +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false,true), [id=#50]
               +- *(2) Range (0, 100, step=1, splits=1)
   ```
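   Note that the second branch now builds its own `BroadcastExchange` ([id=#50]) with `ignoresDuplicatedKey` set to `true` in `HashedRelationBroadcastMode`, instead of a `ReusedExchange` pointing at the inner join's broadcast. The sketch below is a simplified model (not Spark's actual internal classes) of why that happens, assuming reuse matches exchanges by canonicalized equality and the broadcast mode is part of that comparison: once the mode carries a flag that differs between the inner join (`false`) and the semi join (`true`), the two exchanges are no longer equal and reuse cannot fire.

   ```scala
   // Hypothetical simplified model of broadcast-exchange reuse. In this sketch,
   // reuse only triggers when two exchanges are structurally equal, and the
   // broadcast mode (including its flags) participates in that equality.
   case class BroadcastMode(keys: Seq[String],
                            isNullAware: Boolean,
                            ignoresDuplicatedKey: Boolean)

   case class BroadcastExchange(mode: BroadcastMode, childPlan: String)

   // Reuse fires only on exact equality of the canonicalized exchange.
   def canReuse(existing: Seq[BroadcastExchange],
                candidate: BroadcastExchange): Boolean =
     existing.contains(candidate)

   val innerMode = BroadcastMode(Seq("id"), isNullAware = false, ignoresDuplicatedKey = false)
   val semiMode  = BroadcastMode(Seq("id"), isNullAware = false, ignoresDuplicatedKey = true)

   val innerExchange = BroadcastExchange(innerMode, "Range (0, 100)")
   val semiExchange  = BroadcastExchange(semiMode,  "Range (0, 100)")

   // Before the change both joins produced the same mode, so the semi join
   // reused the inner join's broadcast. With the new flag the modes differ,
   // so the same child plan is broadcast twice.
   println(canReuse(Seq(innerExchange), semiExchange)) // prints false
   ```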
   
   Ignoring duplicated keys is a small improvement, while broadcast reuse is definitely more important for query performance. I'm reverting this first. Please re-propose this optimization in a way that doesn't break broadcast reuse.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


