You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Bruce Robbins (Jira)" <ji...@apache.org> on 2021/08/23 23:20:00 UTC

[jira] [Created] (SPARK-36568) Missed broadcast join in V2 plan

Bruce Robbins created SPARK-36568:
-------------------------------------

             Summary: Missed broadcast join in V2 plan
                 Key: SPARK-36568
                 URL: https://issues.apache.org/jira/browse/SPARK-36568
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.2.0
            Reporter: Bruce Robbins


There are some joins that use broadcast hash join with DataSourceV1 but sort merge join with DataSourceV2, even though the two joins are loading the same files [1].

Example:

Create data:
{noformat}
import scala.util.Random

val rand = new Random(245665L)

val df = spark.range(1, 20000).map { x =>
  (x,
   rand.alphanumeric.take(20).mkString,
   rand.alphanumeric.take(20).mkString,
   rand.alphanumeric.take(20).mkString
  )
}.toDF("key", "col1", "col2", "col3")

df.write.mode("overwrite").format("parquet").save("/tmp/tbl")
df.write.mode("overwrite").format("parquet").save("/tmp/lookup")
{noformat}
Run this code:
{noformat}
bin/spark-shell --conf spark.sql.autoBroadcastJoinThreshold=400000

spark.read.format("parquet").load("/tmp/tbl").createOrReplaceTempView("tbl")
spark.read.format("parquet").load("/tmp/lookup").createOrReplaceTempView("lookup")

sql("""select t.key, t.col1, t.col2, t.col3
from tbl t
join lookup l
on t.key = l.key""").explain
{noformat}
For V2, do the same, except set {{spark.sql.sources.useV1SourceList=""}}.

For V1, the result is:
{noformat}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [key#0L, col1#1, col2#2, col3#3]
   +- BroadcastHashJoin [key#0L], [key#8L], Inner, BuildRight, false
      :- Filter isnotnull(key#0L)
      :  +- FileScan parquet [key#0L,col1#1,col2#2,col3#3] Batched: true, DataFilters: [isnotnull(key#0L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/tbl], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,col1:string,col2:string,col3:string>
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#32]
         +- Filter isnotnull(key#8L)
            +- FileScan parquet [key#8L] Batched: true, DataFilters: [isnotnull(key#8L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/lookup], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint>
{noformat}
For V2, the result is:
{noformat}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [key#0L, col1#1, col2#2, col3#3]
   +- SortMergeJoin [key#0L], [key#8L], Inner
      :- Sort [key#0L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(key#0L, 200), ENSURE_REQUIREMENTS, [id=#33]
      :     +- Filter isnotnull(key#0L)
      :        +- BatchScan[key#0L, col1#1, col2#2, col3#3] ParquetScan DataFilters: [isnotnull(key#0L)], Format: parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/tbl], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,col1:string,col2:string,col3:string>, PushedFilters: [IsNotNull(key)] RuntimeFilters: []
      +- Sort [key#8L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(key#8L, 200), ENSURE_REQUIREMENTS, [id=#34]
            +- Filter isnotnull(key#8L)
               +- BatchScan[key#8L] ParquetScan DataFilters: [isnotnull(key#8L)], Format: parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/lookup], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint>, PushedFilters: [IsNotNull(key)] RuntimeFilters: []
{noformat}
The initial plan with V1 uses broadcast hash join, but the initial plan with V2 uses sort merge join.

The V1 logical plan contains a projection over the relation for {{lookup}}, which restricts the output columns to just {{key}}. As a result, {{SizeInBytesOnlyStatsPlanVisitor#visitUnaryNode}}, when visiting the project node, reduces sizeInBytes based on the pruning:
{noformat}
Project [key#0L, col1#1, col2#2, col3#3]
+- Join Inner, (key#0L = key#8L)
   :- Filter isnotnull(key#0L)
   :  +- Relation [key#0L,col1#1,col2#2,col3#3] parquet
   +- Project [key#8L]
      +- Filter isnotnull(key#8L)
         +- Relation [key#8L,col1#9,col2#10,col3#11] parquet
{noformat}
The V2 logical plan does not contain this projection:
{noformat}
+- Join Inner, (key#0L = key#8L)
   :- Filter isnotnull(key#0L)
   :  +- RelationV2[key#0L, col1#1, col2#2, col3#3] parquet file:/tmp/tbl
   +- Filter isnotnull(key#8L)
      +- RelationV2[key#8L] parquet file:/tmp/lookup
{noformat}
[1] With my example, AQE converts the join to a broadcast hash join at run time for the V2 case. However, if AQE was disabled, it would obviously remain a sort merge join.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org