Posted to issues@spark.apache.org by "Wenchen Fan (Jira)" <ji...@apache.org> on 2021/08/26 03:28:00 UTC

[jira] [Resolved] (SPARK-36568) Missed broadcast join in V2 plan

     [ https://issues.apache.org/jira/browse/SPARK-36568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenchen Fan resolved SPARK-36568.
---------------------------------
    Fix Version/s: 3.3.0
       Resolution: Fixed

Issue resolved by pull request 33825
[https://github.com/apache/spark/pull/33825]

> Missed broadcast join in V2 plan
> --------------------------------
>
>                 Key: SPARK-36568
>                 URL: https://issues.apache.org/jira/browse/SPARK-36568
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.2.0
>            Reporter: Bruce Robbins
>            Priority: Major
>             Fix For: 3.3.0
>
>
> Some joins use a broadcast hash join with DataSourceV1 but a sort merge join with DataSourceV2, even though both joins load the same files [1].
> Example:
> Create data:
> {noformat}
> import scala.util.Random
> val rand = new Random(245665L)
> val df = spark.range(1, 20000).map { x =>
>   (x,
>    rand.alphanumeric.take(20).mkString,
>    rand.alphanumeric.take(20).mkString,
>    rand.alphanumeric.take(20).mkString
>   )
> }.toDF("key", "col1", "col2", "col3")
> df.write.mode("overwrite").format("parquet").save("/tmp/tbl")
> df.write.mode("overwrite").format("parquet").save("/tmp/lookup")
> {noformat}
> Run this code:
> {noformat}
> bin/spark-shell --conf spark.sql.autoBroadcastJoinThreshold=400000
> spark.read.format("parquet").load("/tmp/tbl").createOrReplaceTempView("tbl")
> spark.read.format("parquet").load("/tmp/lookup").createOrReplaceTempView("lookup")
> sql("""select t.key, t.col1, t.col2, t.col3
> from tbl t
> join lookup l
> on t.key = l.key""").explain
> {noformat}
> For V2, do the same, except set {{spark.sql.sources.useV1SourceList=""}}.
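> For example (same data and threshold as above, just forcing the V2 read path), the shell could be started as:
> {noformat}
> bin/spark-shell \
>   --conf spark.sql.autoBroadcastJoinThreshold=400000 \
>   --conf spark.sql.sources.useV1SourceList=""
> {noformat}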
> For V1, the result is:
> {noformat}
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- Project [key#0L, col1#1, col2#2, col3#3]
>    +- BroadcastHashJoin [key#0L], [key#8L], Inner, BuildRight, false
>       :- Filter isnotnull(key#0L)
>       :  +- FileScan parquet [key#0L,col1#1,col2#2,col3#3] Batched: true, DataFilters: [isnotnull(key#0L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/tbl], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,col1:string,col2:string,col3:string>
>       +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#32]
>          +- Filter isnotnull(key#8L)
>             +- FileScan parquet [key#8L] Batched: true, DataFilters: [isnotnull(key#8L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/lookup], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint>
> {noformat}
> For V2, the result is:
> {noformat}
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- Project [key#0L, col1#1, col2#2, col3#3]
>    +- SortMergeJoin [key#0L], [key#8L], Inner
>       :- Sort [key#0L ASC NULLS FIRST], false, 0
>       :  +- Exchange hashpartitioning(key#0L, 200), ENSURE_REQUIREMENTS, [id=#33]
>       :     +- Filter isnotnull(key#0L)
>       :        +- BatchScan[key#0L, col1#1, col2#2, col3#3] ParquetScan DataFilters: [isnotnull(key#0L)], Format: parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/tbl], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,col1:string,col2:string,col3:string>, PushedFilters: [IsNotNull(key)] RuntimeFilters: []
>       +- Sort [key#8L ASC NULLS FIRST], false, 0
>          +- Exchange hashpartitioning(key#8L, 200), ENSURE_REQUIREMENTS, [id=#34]
>             +- Filter isnotnull(key#8L)
>                +- BatchScan[key#8L] ParquetScan DataFilters: [isnotnull(key#8L)], Format: parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/lookup], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint>, PushedFilters: [IsNotNull(key)] RuntimeFilters: []
> {noformat}
> The initial V1 plan uses a broadcast hash join, but the initial V2 plan uses a sort merge join.
> The V1 logical plan contains a projection over the relation for {{lookup}}, which restricts the output columns to just {{key}}. As a result, {{SizeInBytesOnlyStatsPlanVisitor#visitUnaryNode}}, when visiting the project node, reduces sizeInBytes in proportion to the pruning (a rough sketch of this size estimate follows the two plans below):
> {noformat}
> Project [key#0L, col1#1, col2#2, col3#3]
> +- Join Inner, (key#0L = key#8L)
>    :- Filter isnotnull(key#0L)
>    :  +- Relation [key#0L,col1#1,col2#2,col3#3] parquet
>    +- Project [key#8L]
>       +- Filter isnotnull(key#8L)
>          +- Relation [key#8L,col1#9,col2#10,col3#11] parquet
> {noformat}
> The V2 logical plan does not contain this projection:
> {noformat}
> +- Join Inner, (key#0L = key#8L)
>    :- Filter isnotnull(key#0L)
>    :  +- RelationV2[key#0L, col1#1, col2#2, col3#3] parquet file:/tmp/tbl
>    +- Filter isnotnull(key#8L)
>       +- RelationV2[key#8L] parquet file:/tmp/lookup
> {noformat}
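> As a rough, standalone sketch (not the actual Spark code) of the size-only estimate referenced above: for a unary node like Project, sizeInBytes is scaled by the ratio of the projected row width to the child's row width, and that is what lets the pruned {{lookup}} side drop under the 400000-byte broadcast threshold in the V1 case. The helper and the column widths below are hypothetical.
> {noformat}
> // Illustrative only: approximates how a size-only stats estimate shrinks
> // when a Project prunes columns. Not the real SizeInBytesOnlyStatsPlanVisitor.
> case class ColumnWidth(name: String, avgSizeInBytes: Long)
>
> def estimateAfterProject(
>     childSizeInBytes: BigInt,
>     childOutput: Seq[ColumnWidth],
>     projectOutput: Seq[ColumnWidth]): BigInt = {
>   // Assume the same number of rows as the child; only the row width changes.
>   val childRowSize = childOutput.map(_.avgSizeInBytes).sum.max(1L)
>   val outputRowSize = projectOutput.map(_.avgSizeInBytes).sum.max(1L)
>   (childSizeInBytes * outputRowSize / childRowSize).max(1)
> }
>
> // Hypothetical widths: one bigint key plus three ~20-char strings.
> val allCols = Seq(ColumnWidth("key", 8), ColumnWidth("col1", 28),
>   ColumnWidth("col2", 28), ColumnWidth("col3", 28))
> val keyOnly = Seq(ColumnWidth("key", 8))
>
> // With Project [key], the estimate falls to roughly 8/92 of the child's size,
> // which can bring the lookup side under autoBroadcastJoinThreshold.
> println(estimateAfterProject(BigInt(1000000), allCols, keyOnly))
> {noformat}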
> [1] With my example, AQE converts the join to a broadcast hash join at run time for the V2 case. However, if AQE were disabled, it would remain a sort merge join.
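> For completeness, AQE can be turned off for this experiment with its standard flag (assuming it is enabled by default, as in 3.2.0):
> {noformat}
> bin/spark-shell \
>   --conf spark.sql.adaptive.enabled=false \
>   --conf spark.sql.autoBroadcastJoinThreshold=400000 \
>   --conf spark.sql.sources.useV1SourceList=""
> {noformat}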



