Posted to issues@spark.apache.org by "Yuming Wang (Jira)" <ji...@apache.org> on 2021/04/06 12:51:00 UTC
[jira] [Created] (SPARK-34966) Avoid shuffle if join type do not match
Yuming Wang created SPARK-34966:
-----------------------------------
Summary: Avoid shuffle if join type do not match
Key: SPARK-34966
URL: https://issues.apache.org/jira/browse/SPARK-34966
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 3.2.0
Reporter: Yuming Wang
How to reproduce this issue:
{code:scala}
spark.sql("set spark.sql.autoBroadcastJoinThreshold=-1")
spark.sql("CREATE TABLE t1 using parquet clustered by (id) into 200 buckets AS SELECT cast(id as bigint) FROM range(1000)")
spark.sql("CREATE TABLE t2 using parquet clustered by (id) into 200 buckets AS SELECT cast(id as int) FROM range(500)")
spark.sql("select * from t1 join t2 on (t1.id = t2.id)").explain
{code}
Current:
{noformat}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [id#14L], [cast(id#15 as bigint)], Inner
   :- Sort [id#14L ASC NULLS FIRST], false, 0
   :  +- Filter isnotnull(id#14L)
   :     +- FileScan parquet default.t1[id#14L] Batched: true, DataFilters: [isnotnull(id#14L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/opensource/spark/spark-warehouse/org.apache.spark...., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 200 out of 200
   +- Sort [cast(id#15 as bigint) ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(cast(id#15 as bigint), 200), ENSURE_REQUIREMENTS, [id=#58]
         +- Filter isnotnull(id#15)
            +- FileScan parquet default.t2[id#15] Batched: true, DataFilters: [isnotnull(id#15)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/opensource/spark/spark-warehouse/org.apache.spark...., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:int>
{noformat}
Expected:
{noformat}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [id#14L], [cast(id#15 as bigint)], Inner
   :- Sort [id#14L ASC NULLS FIRST], false, 0
   :  +- Filter isnotnull(id#14L)
   :     +- FileScan parquet default.t1[id#14L] Batched: true, DataFilters: [isnotnull(id#14L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/opensource/spark/spark-warehouse/org.apache.spark...., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 200 out of 200
   +- Sort [cast(id#15 as bigint) ASC NULLS FIRST], false, 0
      +- Filter isnotnull(id#15)
         +- FileScan parquet default.t2[id#15] Batched: true, DataFilters: [isnotnull(id#15)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/opensource/spark/spark-warehouse/org.apache.spark...., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:int>, SelectedBucketsCount: 200 out of 200
{noformat}
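The only difference between the two plans is the Exchange node under the t2-side Sort (plus the bucket information on the t2 scan). A minimal sketch of how that could be checked mechanically from explain-style text follows. PlanCheck and shuffleLines are hypothetical names, not Spark APIs, and the sample string is abbreviated from the "Current" plan above with the FileScan lines omitted:

```scala
object PlanCheck {
  // Characters Spark uses to draw the plan tree in front of each operator.
  private val treePrefix: Set[Char] = Set(' ', ':', '+', '-')

  // Hypothetical helper (not a Spark API): returns the operator lines of an
  // explain-style string that are shuffle Exchange nodes. Operators such as
  // BroadcastExchange or ReusedExchange do not match, because the operator
  // text has to start with "Exchange ".
  def shuffleLines(plan: String): Seq[String] =
    plan.split("\n").toSeq
      .map(_.dropWhile(treePrefix)) // strip the tree-drawing prefix
      .filter(_.startsWith("Exchange "))

  def main(args: Array[String]): Unit = {
    // Abbreviated from the "Current" plan in this issue.
    val current =
      """+- SortMergeJoin [id#14L], [cast(id#15 as bigint)], Inner
        |   :- Sort [id#14L ASC NULLS FIRST], false, 0
        |   :  +- Filter isnotnull(id#14L)
        |   +- Sort [cast(id#15 as bigint) ASC NULLS FIRST], false, 0
        |      +- Exchange hashpartitioning(cast(id#15 as bigint), 200), ENSURE_REQUIREMENTS, [id=#58]
        |         +- Filter isnotnull(id#15)""".stripMargin

    // Print every shuffle found in the plan text.
    shuffleLines(current).foreach(println)
  }
}
```

In spark-shell, the plan text could presumably be taken from df.queryExecution.executedPlan.toString for the query in the reproduction; that usage is an assumption and has not been verified against 3.2.0.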