Posted to issues@spark.apache.org by "Yuming Wang (Jira)" <ji...@apache.org> on 2022/10/01 08:57:00 UTC
[jira] [Updated] (SPARK-40626) Reorder join keys impact performance
[ https://issues.apache.org/jira/browse/SPARK-40626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yuming Wang updated SPARK-40626:
--------------------------------
Summary: Reorder join keys impact performance (was: Do not reorder join keys in EnsureRequirements if they are not simple expressions)
> Reorder join keys impact performance
> ------------------------------------
>
> Key: SPARK-40626
> URL: https://issues.apache.org/jira/browse/SPARK-40626
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.4.0
> Reporter: Yuming Wang
> Priority: Major
> Attachments: Pull out complex join condition.png, change the aggregate key order.png, default(join order will change).png
>
>
> {code:scala}
> sql("CREATE TABLE t1 (itemid BIGINT, eventType STRING, dt STRING) USING parquet PARTITIONED BY (dt)")
> sql("CREATE TABLE t2 (cal_dt DATE, item_id BIGINT) using parquet")
> sql("set spark.sql.autoBroadcastJoinThreshold=-1")
> sql(
>   """
>     |SELECT itemid,
>     |       eventtype
>     |FROM   t1 a
>     |       INNER JOIN (SELECT DISTINCT cal_dt,
>     |                                   item_id
>     |                   FROM   t2) b
>     |               ON a.itemid = b.item_id
>     |                  AND To_date(a.dt, 'yyyyMMdd') = b.cal_dt
>   """.stripMargin).explain()
> {code}
> The plan:
> {noformat}
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- Project [itemid#10L, eventtype#11]
>    +- SortMergeJoin [cast(gettimestamp(dt#12, yyyyMMdd, TimestampType, Some(America/Los_Angeles), false) as date), itemid#10L], [cal_dt#13, item_id#14L], Inner
>       :- Sort [cast(gettimestamp(dt#12, yyyyMMdd, TimestampType, Some(America/Los_Angeles), false) as date) ASC NULLS FIRST, itemid#10L ASC NULLS FIRST], false, 0
>       :  +- Exchange hashpartitioning(cast(gettimestamp(dt#12, yyyyMMdd, TimestampType, Some(America/Los_Angeles), false) as date), itemid#10L, 5), ENSURE_REQUIREMENTS, [plan_id=48]
>       :     +- Filter isnotnull(itemid#10L)
>       :        +- FileScan parquet spark_catalog.default.t1[itemid#10L,eventType#11,dt#12]
>       +- Sort [cal_dt#13 ASC NULLS FIRST, item_id#14L ASC NULLS FIRST], false, 0
>          +- HashAggregate(keys=[cal_dt#13, item_id#14L], functions=[])
>             +- Exchange hashpartitioning(cal_dt#13, item_id#14L, 5), ENSURE_REQUIREMENTS, [plan_id=44]
>                +- HashAggregate(keys=[cal_dt#13, item_id#14L], functions=[])
>                   +- Filter (isnotnull(item_id#14L) AND isnotnull(cal_dt#13))
>                      +- FileScan parquet spark_catalog.default.t2[cal_dt#13,item_id#14L]
> {noformat}
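
In the plan above, the SortMergeJoin keys are listed as [to_date expression, itemid] and [cal_dt, item_id], which is not the order written in the ON clause. The right side is already hash-partitioned by (cal_dt, item_id) for the DISTINCT aggregate, so the join keys appear to have been reordered to match it. The sketch below is a simplified model of that kind of reordering, assuming keys can be treated as plain, distinct strings. The object name, the `reorder` helper and the string labels are hypothetical and are not Spark's EnsureRequirements code.

```scala
// Simplified model of join-key reordering; NOT Spark's actual implementation.
object ReorderJoinKeysSketch {
  // Assumption: keys are modeled as distinct strings; Spark uses Catalyst expressions.
  type Key = String

  /** Reorders the (currentKeys, otherKeys) pairs so that currentKeys follows
    * the order of `partitioningKeys`. Returns None when the partitioning
    * does not cover exactly the same keys as currentKeys. */
  def reorder(
      currentKeys: Seq[Key],
      otherKeys: Seq[Key],
      partitioningKeys: Seq[Key]): Option[(Seq[Key], Seq[Key])] = {
    if (partitioningKeys.size != currentKeys.size ||
        partitioningKeys.toSet != currentKeys.toSet) {
      None
    } else {
      // Remember which key on the other side each current key is paired with.
      val pairs = currentKeys.zip(otherKeys).toMap
      Some((partitioningKeys, partitioningKeys.map(pairs)))
    }
  }

  def main(args: Array[String]): Unit = {
    // Join keys in the order written in the ON clause.
    val leftKeys = Seq("itemid", "to_date(dt)")
    val rightKeys = Seq("item_id", "cal_dt")
    // Partitioning of the DISTINCT aggregate on t2, as shown in the plan.
    val rightPartitioning = Seq("cal_dt", "item_id")

    val (newRight, newLeft) = reorder(rightKeys, leftKeys, rightPartitioning).get
    println(s"left keys:  $newLeft")
    println(s"right keys: $newRight")
  }
}
```

In this model the right-hand keys keep the aggregate's order, and the left-hand keys are permuted to follow them, so the `to_date(dt)` expression moves in front of `itemid`, which mirrors the key order seen in the Exchange on t1.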