You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Yuming Wang (Jira)" <ji...@apache.org> on 2021/04/06 12:51:00 UTC

[jira] [Created] (SPARK-34966) Avoid shuffle if join type do not match

Yuming Wang created SPARK-34966:
-----------------------------------

             Summary: Avoid shuffle if join type do not match
                 Key: SPARK-34966
                 URL: https://issues.apache.org/jira/browse/SPARK-34966
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 3.2.0
            Reporter: Yuming Wang


How to reproduce this issue:
{code:scala}
    spark.sql("set spark.sql.autoBroadcastJoinThreshold=-1")
    spark.sql("CREATE TABLE t1 using parquet clustered by (id) into 200 buckets AS SELECT cast(id as bigint) FROM range(1000)")
    spark.sql("CREATE TABLE t2 using parquet clustered by (id) into 200 buckets AS SELECT cast(id as int) FROM range(500)")
    spark.sql("select * from t1 join t2 on (t1.id = t2.id)").explain
{code}

Current:
{noformat}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [id#14L], [cast(id#15 as bigint)], Inner
   :- Sort [id#14L ASC NULLS FIRST], false, 0
   :  +- Filter isnotnull(id#14L)
   :     +- FileScan parquet default.t1[id#14L] Batched: true, DataFilters: [isnotnull(id#14L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/opensource/spark/spark-warehouse/org.apache.spark...., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 200 out of 200
   +- Sort [cast(id#15 as bigint) ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(cast(id#15 as bigint), 200), ENSURE_REQUIREMENTS, [id=#58]
         +- Filter isnotnull(id#15)
            +- FileScan parquet default.t2[id#15] Batched: true, DataFilters: [isnotnull(id#15)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/opensource/spark/spark-warehouse/org.apache.spark...., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:int>
{noformat}

Expected:
{noformat}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [id#14L], [cast(id#15 as bigint)], Inner
   :- Sort [id#14L ASC NULLS FIRST], false, 0
   :  +- Filter isnotnull(id#14L)
   :     +- FileScan parquet default.t1[id#14L] Batched: true, DataFilters: [isnotnull(id#14L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/opensource/spark/spark-warehouse/org.apache.spark...., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 200 out of 200
   +- Sort [cast(id#15 as bigint) ASC NULLS FIRST], false, 0
      +- Filter isnotnull(id#15)
         +- FileScan parquet default.t2[id#15] Batched: true, DataFilters: [isnotnull(id#15)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/opensource/spark/spark-warehouse/org.apache.spark...., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:int>, SelectedBucketsCount: 200 out of 200
{noformat}





--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org