Posted to issues@spark.apache.org by "Wenchen Fan (Jira)" <ji...@apache.org> on 2021/08/26 03:28:00 UTC
[jira] [Resolved] (SPARK-36568) Missed broadcast join in V2 plan
[ https://issues.apache.org/jira/browse/SPARK-36568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Wenchen Fan resolved SPARK-36568.
---------------------------------
Fix Version/s: 3.3.0
Resolution: Fixed
Issue resolved by pull request 33825
[https://github.com/apache/spark/pull/33825]
> Missed broadcast join in V2 plan
> --------------------------------
>
> Key: SPARK-36568
> URL: https://issues.apache.org/jira/browse/SPARK-36568
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.2.0
> Reporter: Bruce Robbins
> Priority: Major
> Fix For: 3.3.0
>
>
> There are some joins that use broadcast hash join with DataSourceV1 but sort merge join with DataSourceV2, even though the two joins are loading the same files [1].
> Example:
> Create data:
> {noformat}
> import scala.util.Random
> val rand = new Random(245665L)
> val df = spark.range(1, 20000).map { x =>
>   (x,
>    rand.alphanumeric.take(20).mkString,
>    rand.alphanumeric.take(20).mkString,
>    rand.alphanumeric.take(20).mkString
>   )
> }.toDF("key", "col1", "col2", "col3")
> df.write.mode("overwrite").format("parquet").save("/tmp/tbl")
> df.write.mode("overwrite").format("parquet").save("/tmp/lookup")
> {noformat}
> Run this code:
> {noformat}
> bin/spark-shell --conf spark.sql.autoBroadcastJoinThreshold=400000
> spark.read.format("parquet").load("/tmp/tbl").createOrReplaceTempView("tbl")
> spark.read.format("parquet").load("/tmp/lookup").createOrReplaceTempView("lookup")
> sql("""select t.key, t.col1, t.col2, t.col3
> from tbl t
> join lookup l
> on t.key = l.key""").explain
> {noformat}
> For V2, do the same, except set {{spark.sql.sources.useV1SourceList=""}}.
> For V1, the result is:
> {noformat}
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- Project [key#0L, col1#1, col2#2, col3#3]
>    +- BroadcastHashJoin [key#0L], [key#8L], Inner, BuildRight, false
>       :- Filter isnotnull(key#0L)
>       :  +- FileScan parquet [key#0L,col1#1,col2#2,col3#3] Batched: true, DataFilters: [isnotnull(key#0L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/tbl], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,col1:string,col2:string,col3:string>
>       +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#32]
>          +- Filter isnotnull(key#8L)
>             +- FileScan parquet [key#8L] Batched: true, DataFilters: [isnotnull(key#8L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/lookup], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint>
> {noformat}
> For V2, the result is:
> {noformat}
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- Project [key#0L, col1#1, col2#2, col3#3]
>    +- SortMergeJoin [key#0L], [key#8L], Inner
>       :- Sort [key#0L ASC NULLS FIRST], false, 0
>       :  +- Exchange hashpartitioning(key#0L, 200), ENSURE_REQUIREMENTS, [id=#33]
>       :     +- Filter isnotnull(key#0L)
>       :        +- BatchScan[key#0L, col1#1, col2#2, col3#3] ParquetScan DataFilters: [isnotnull(key#0L)], Format: parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/tbl], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,col1:string,col2:string,col3:string>, PushedFilters: [IsNotNull(key)] RuntimeFilters: []
>       +- Sort [key#8L ASC NULLS FIRST], false, 0
>          +- Exchange hashpartitioning(key#8L, 200), ENSURE_REQUIREMENTS, [id=#34]
>             +- Filter isnotnull(key#8L)
>                +- BatchScan[key#8L] ParquetScan DataFilters: [isnotnull(key#8L)], Format: parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/lookup], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint>, PushedFilters: [IsNotNull(key)] RuntimeFilters: []
> {noformat}
> The initial plan with V1 uses broadcast hash join, but the initial plan with V2 uses sort merge join.
> The V1 logical plan contains a projection over the relation for {{lookup}}, which restricts the output columns to just {{key}}. As a result, {{SizeInBytesOnlyStatsPlanVisitor#visitUnaryNode}}, when visiting the project node, reduces {{sizeInBytes}} to account for the pruned columns:
> {noformat}
> Project [key#0L, col1#1, col2#2, col3#3]
> +- Join Inner, (key#0L = key#8L)
>    :- Filter isnotnull(key#0L)
>    :  +- Relation [key#0L,col1#1,col2#2,col3#3] parquet
>    +- Project [key#8L]
>       +- Filter isnotnull(key#8L)
>          +- Relation [key#8L,col1#9,col2#10,col3#11] parquet
> {noformat}
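The size reduction described above can be sketched in plain Scala without a Spark session. This is only an illustration under stated assumptions, not Spark's actual code: it assumes the estimate scales the child's size by the ratio of output row size to child row size, that a row costs 8 bytes of overhead plus a default size per column (8 for bigint, 20 for string), and it uses a hypothetical relation size of 1500000 bytes, since the real file sizes are not given in the report. The names ProjectSizeSketch, rowSize and estimate are made up for this example.

```scala
// Sketch of a size-only estimate for a unary node such as Project.
// Assumption: row size = 8 bytes of overhead + a default size per column.
object ProjectSizeSketch {
  val RowOverhead = 8L

  def rowSize(columnSizes: Seq[Long]): Long = RowOverhead + columnSizes.sum

  // Scale the child's size by outputRowSize / childRowSize, never returning 0.
  def estimate(childSizeInBytes: BigInt, childCols: Seq[Long], outputCols: Seq[Long]): BigInt = {
    val scaled = childSizeInBytes * rowSize(outputCols) / rowSize(childCols)
    if (scaled == BigInt(0)) BigInt(1) else scaled
  }

  def main(args: Array[String]): Unit = {
    val relationCols = Seq(8L, 20L, 20L, 20L) // key, col1, col2, col3 (assumed default sizes)
    val prunedCols   = Seq(8L)                 // key only, as in Project [key#8L]
    val relationSize = BigInt(1500000)         // hypothetical size of the lookup relation
    val threshold    = BigInt(400000)          // spark.sql.autoBroadcastJoinThreshold from the report

    val pruned = estimate(relationSize, relationCols, prunedCols)
    println(s"with projection:    $pruned bytes, under threshold = ${pruned <= threshold}")
    println(s"without projection: $relationSize bytes, under threshold = ${relationSize <= threshold}")
  }
}
```

With these assumed numbers the pruned estimate falls under the threshold while the unpruned one does not, which is the kind of gap that would produce a broadcast hash join in one plan and a sort merge join in the other.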
> The V2 logical plan does not contain this projection, so no such reduction is applied to the {{lookup}} side:
> {noformat}
> +- Join Inner, (key#0L = key#8L)
>    :- Filter isnotnull(key#0L)
>    :  +- RelationV2[key#0L, col1#1, col2#2, col3#3] parquet file:/tmp/tbl
>    +- Filter isnotnull(key#8L)
>       +- RelationV2[key#8L] parquet file:/tmp/lookup
> {noformat}
> [1] With my example, AQE converts the join to a broadcast hash join at run time for the V2 case. However, if AQE were disabled, the join would remain a sort merge join.
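To check the footnote's claim, one possible approach is to turn AQE off in the same spark-shell session and compare the size estimates of the two join inputs against the threshold. This is a sketch that assumes it is pasted into spark-shell (where {{spark}} is predefined) after the tbl and lookup temp views from the description exist; the value name joined is made up here.

```scala
// Paste into spark-shell after creating the temp views "tbl" and "lookup".
import org.apache.spark.sql.catalyst.plans.logical.Join

// Disable adaptive query execution so the initial plan is also the final plan.
spark.conf.set("spark.sql.adaptive.enabled", "false")

val joined = spark.sql(
  """select t.key, t.col1, t.col2, t.col3
    |from tbl t
    |join lookup l
    |on t.key = l.key""".stripMargin)

// Print the estimated size of each join input from the optimized logical plan.
joined.queryExecution.optimizedPlan.collect {
  case j: Join => (j.left.stats.sizeInBytes, j.right.stats.sizeInBytes)
}.foreach(println)

// Show the threshold those estimates are compared against, then the physical plan.
println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
joined.explain()
```

Running this once with the default V1 sources and once with {{spark.sql.sources.useV1SourceList=""}} should make any difference in the right-side estimate visible next to the chosen join strategy.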