Posted to issues@spark.apache.org by "Apache Spark (Jira)" <ji...@apache.org> on 2022/10/12 10:46:00 UTC
[jira] [Assigned] (SPARK-40764) Extract partitioning through all children output expressions
[ https://issues.apache.org/jira/browse/SPARK-40764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-40764:
------------------------------------
Assignee: (was: Apache Spark)
> Extract partitioning through all children output expressions
> ------------------------------------------------------------
>
> Key: SPARK-40764
> URL: https://issues.apache.org/jira/browse/SPARK-40764
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.4.0
> Reporter: Yuming Wang
> Priority: Major
>
> {code:sql}
> WITH web_tv as (
>   select
>     ws_item_sk item_sk,
>     d_date,
>     sum(ws_sales_price) sumws,
>     row_number() over (partition by ws_item_sk order by d_date) rk
>   from
>     web_sales, date_dim
>   where
>     ws_sold_date_sk=d_date_sk
>     and d_month_seq between 1212 and 1212 + 11
>     and ws_item_sk is not NULL
>   group by
>     ws_item_sk, d_date),
> web_v1 as (
>   select
>     v1.item_sk,
>     v1.d_date,
>     v1.sumws,
>     sum(v2.sumws) cume_sales
>   from
>     web_tv v1, web_tv v2
>   where
>     v1.item_sk = v2.item_sk
>     and v1.rk >= v2.rk
>   group by
>     v1.item_sk,
>     v1.d_date,
>     v1.sumws)
> select *
> from web_v1
> {code}
> Before:
> {noformat}
> == Physical Plan ==
> *(13) HashAggregate(keys=[item_sk#1, d_date#2, sumws#3], functions=[sum(sumws#4)], output=[item_sk#1, d_date#2, sumws#3, cume_sales#5])
> +- *(13) HashAggregate(keys=[item_sk#1, d_date#2, sumws#3], functions=[partial_sum(sumws#4)], output=[item_sk#1, d_date#2, sumws#3, sum#132, isEmpty#133])
>    +- *(13) Project [item_sk#1, d_date#2, sumws#3, sumws#4]
>       +- *(13) SortMergeJoin [item_sk#1], [item_sk#6], Inner, (rk#7 >= rk#8)
>          :- *(6) Sort [item_sk#1 ASC NULLS FIRST], false, 0
>          :  +- Exchange hashpartitioning(item_sk#1, 5), ENSURE_REQUIREMENTS, [plan_id=1]
>          :     +- *(5) Project [item_sk#1, d_date#2, sumws#3, rk#7]
>          :        +- Window [row_number() windowspecdefinition(ws_item_sk#9, d_date#2 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rk#7], [ws_item_sk#9], [d_date#2 ASC NULLS FIRST]
>          :           +- *(4) Sort [ws_item_sk#9 ASC NULLS FIRST, d_date#2 ASC NULLS FIRST], false, 0
>          :              +- Exchange hashpartitioning(ws_item_sk#9, 5), ENSURE_REQUIREMENTS, [plan_id=2]
>          :                 +- *(3) HashAggregate(keys=[ws_item_sk#9, d_date#2], functions=[sum(UnscaledValue(ws_sales_price#10))], output=[item_sk#1, d_date#2, sumws#3, ws_item_sk#9])
>          :                    +- Exchange hashpartitioning(ws_item_sk#9, d_date#2, 5), ENSURE_REQUIREMENTS, [plan_id=3]
>          :                       +- *(2) HashAggregate(keys=[ws_item_sk#9, d_date#2], functions=[partial_sum(UnscaledValue(ws_sales_price#10))], output=[ws_item_sk#9, d_date#2, sum#134])
>          :                          +- *(2) Project [ws_item_sk#9, ws_sales_price#10, d_date#2]
>          :                             +- *(2) BroadcastHashJoin [ws_sold_date_sk#11], [d_date_sk#12], Inner, BuildRight, false
>          :                                :- *(2) Filter isnotnull(ws_item_sk#9)
>          :                                :  +- *(2) ColumnarToRow
>          :                                :     +- FileScan parquet spark_catalog.default.web_sales[ws_item_sk#9,ws_sales_price#10,ws_sold_date_sk#11] Batched: true, DataFilters: [isnotnull(ws_item_sk#9)], Format: Parquet, Location: InMemoryFileIndex(0 paths)[], PartitionFilters: [isnotnull(ws_sold_date_sk#11)], PushedFilters: [IsNotNull(ws_item_sk)], ReadSchema: struct<ws_item_sk:int,ws_sales_price:decimal(7,2)>
>          :                                +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=4]
>          :                                   +- *(1) Project [d_date_sk#12, d_date#2]
>          :                                      +- *(1) Filter (((isnotnull(d_month_seq#44) AND (d_month_seq#44 >= 1212)) AND (d_month_seq#44 <= 1223)) AND isnotnull(d_date_sk#12))
>          :                                         +- *(1) ColumnarToRow
>          :                                            +- FileScan parquet spark_catalog.default.date_dim[d_date_sk#12,d_date#2,d_month_seq#44] Batched: true, DataFilters: [isnotnull(d_month_seq#44), (d_month_seq#44 >= 1212), (d_month_seq#44 <= 1223), isnotnull(d_date_..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/spark/parquet-1.13/launcher/spark-warehouse/org.ap..., PartitionFilters: [], PushedFilters: [IsNotNull(d_month_seq), GreaterThanOrEqual(d_month_seq,1212), LessThanOrEqual(d_month_seq,1223),..., ReadSchema: struct<d_date_sk:int,d_date:date,d_month_seq:int>
>          +- *(12) Sort [item_sk#6 ASC NULLS FIRST], false, 0
>             +- Exchange hashpartitioning(item_sk#6, 5), ENSURE_REQUIREMENTS, [plan_id=5]
>                +- *(11) Project [item_sk#1 AS item_sk#6, sumws#3 AS sumws#4, rk#8]
>                   +- Window [row_number() windowspecdefinition(ws_item_sk#70, d_date#71 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rk#8], [ws_item_sk#70], [d_date#71 ASC NULLS FIRST]
>                      +- *(10) Sort [ws_item_sk#70 ASC NULLS FIRST, d_date#71 ASC NULLS FIRST], false, 0
>                         +- ReusedExchange [item_sk#1, d_date#71, sumws#3, ws_item_sk#70], Exchange hashpartitioning(ws_item_sk#9, 5), ENSURE_REQUIREMENTS, [plan_id=2]
> {noformat}
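> After (the Exchange hashpartitioning(item_sk#6, 5) and Sort [item_sk#6 ASC NULLS FIRST] on the right side of the SortMergeJoin are no longer planned):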
> {noformat}
> == Physical Plan ==
> *(12) HashAggregate(keys=[item_sk#1, d_date#2, sumws#3], functions=[sum(sumws#4)], output=[item_sk#1, d_date#2, sumws#3, cume_sales#5])
> +- *(12) HashAggregate(keys=[item_sk#1, d_date#2, sumws#3], functions=[partial_sum(sumws#4)], output=[item_sk#1, d_date#2, sumws#3, sum#132, isEmpty#133])
>    +- *(12) Project [item_sk#1, d_date#2, sumws#3, sumws#4]
>       +- *(12) SortMergeJoin [item_sk#1], [item_sk#6], Inner, (rk#7 >= rk#8)
>          :- *(6) Sort [item_sk#1 ASC NULLS FIRST], false, 0
>          :  +- Exchange hashpartitioning(item_sk#1, 5), ENSURE_REQUIREMENTS, [plan_id=1]
>          :     +- *(5) Project [item_sk#1, d_date#2, sumws#3, rk#7]
>          :        +- Window [row_number() windowspecdefinition(ws_item_sk#9, d_date#2 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rk#7], [ws_item_sk#9], [d_date#2 ASC NULLS FIRST]
>          :           +- *(4) Sort [ws_item_sk#9 ASC NULLS FIRST, d_date#2 ASC NULLS FIRST], false, 0
>          :              +- Exchange hashpartitioning(ws_item_sk#9, 5), ENSURE_REQUIREMENTS, [plan_id=2]
>          :                 +- *(3) HashAggregate(keys=[ws_item_sk#9, d_date#2], functions=[sum(UnscaledValue(ws_sales_price#10))], output=[item_sk#1, d_date#2, sumws#3, ws_item_sk#9])
>          :                    +- Exchange hashpartitioning(ws_item_sk#9, d_date#2, 5), ENSURE_REQUIREMENTS, [plan_id=3]
>          :                       +- *(2) HashAggregate(keys=[ws_item_sk#9, d_date#2], functions=[partial_sum(UnscaledValue(ws_sales_price#10))], output=[ws_item_sk#9, d_date#2, sum#134])
>          :                          +- *(2) Project [ws_item_sk#9, ws_sales_price#10, d_date#2]
>          :                             +- *(2) BroadcastHashJoin [ws_sold_date_sk#11], [d_date_sk#12], Inner, BuildRight, false
>          :                                :- *(2) Filter isnotnull(ws_item_sk#9)
>          :                                :  +- *(2) ColumnarToRow
>          :                                :     +- FileScan parquet spark_catalog.default.web_sales[ws_item_sk#9,ws_sales_price#10,ws_sold_date_sk#11] Batched: true, DataFilters: [isnotnull(ws_item_sk#9)], Format: Parquet, Location: InMemoryFileIndex(0 paths)[], PartitionFilters: [isnotnull(ws_sold_date_sk#11)], PushedFilters: [IsNotNull(ws_item_sk)], ReadSchema: struct<ws_item_sk:int,ws_sales_price:decimal(7,2)>
>          :                                +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=4]
>          :                                   +- *(1) Project [d_date_sk#12, d_date#2]
>          :                                      +- *(1) Filter (((isnotnull(d_month_seq#44) AND (d_month_seq#44 >= 1212)) AND (d_month_seq#44 <= 1223)) AND isnotnull(d_date_sk#12))
>          :                                         +- *(1) ColumnarToRow
>          :                                            +- FileScan parquet spark_catalog.default.date_dim[d_date_sk#12,d_date#2,d_month_seq#44] Batched: true, DataFilters: [isnotnull(d_month_seq#44), (d_month_seq#44 >= 1212), (d_month_seq#44 <= 1223), isnotnull(d_date_..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/spark/parquet-1.13/launcher/spark-warehouse/org.ap..., PartitionFilters: [], PushedFilters: [IsNotNull(d_month_seq), GreaterThanOrEqual(d_month_seq,1212), LessThanOrEqual(d_month_seq,1223),..., ReadSchema: struct<d_date_sk:int,d_date:date,d_month_seq:int>
>          +- *(11) Project [item_sk#1 AS item_sk#6, sumws#3 AS sumws#4, rk#8]
>             +- Window [row_number() windowspecdefinition(ws_item_sk#70, d_date#71 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rk#8], [ws_item_sk#70], [d_date#71 ASC NULLS FIRST]
>                +- *(10) Sort [ws_item_sk#70 ASC NULLS FIRST, d_date#71 ASC NULLS FIRST], false, 0
>                   +- ReusedExchange [item_sk#1, d_date#71, sumws#3, ws_item_sk#70], Exchange hashpartitioning(ws_item_sk#9, 5), ENSURE_REQUIREMENTS, [plan_id=2]
> {noformat}
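A toy model can make the title concrete. The Python sketch below is not Spark's Catalyst code (which is Scala); the `Node` class and the `find`, `union` and `output_partitioning` functions are hypothetical names chosen for illustration. It models the right side of the SortMergeJoin in the Before plan, with attribute IDs copied from it: the reused subtree's HashAggregate emits `item_sk#1` as an alias of `ws_item_sk#70`, the Exchange hash-partitions on `ws_item_sk#70`, Sort and Window pass partitioning through, and the Project renames `item_sk#1 AS item_sk#6`. It assumes that hash partitioning on an attribute also clusters rows on any attribute known to be equal to it through an alias. Using only the Project's own output expressions, the partitioning on `ws_item_sk#70` cannot be expressed in the Project's output, so a new Exchange on `item_sk#6` would be needed; following aliases through all descendants' output expressions recovers `item_sk#6`, which is the join key.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Node:
    """Toy physical-plan node (hypothetical model, not Spark's API)."""
    name: str
    output: list[str]
    # Hash-partitioning keys this node introduces (e.g. an Exchange);
    # None means the node passes its child's partitioning through.
    partitioning: Optional[list[str]] = None
    # Output expressions of the form `source AS alias`, stored as {alias: source}.
    aliases: dict[str, str] = field(default_factory=dict)
    child: Optional[Node] = None


def find(parent: dict[str, str], x: str) -> str:
    """Union-find lookup with path halving."""
    parent.setdefault(x, x)
    while parent[x] != x:
        parent[x] = parent[parent[x]]
        x = parent[x]
    return x


def union(parent: dict[str, str], a: str, b: str) -> None:
    parent[find(parent, a)] = find(parent, b)


def output_partitioning(node: Node, through_all_children: bool) -> Optional[list[str]]:
    """Express the inherited hash-partitioning keys in terms of node.output.

    Returns None (unknown partitioning) if some key cannot be expressed.
    """
    # Find the nearest node (this one or a descendant) that fixes a partitioning.
    n: Optional[Node] = node
    while n is not None and n.partitioning is None:
        n = n.child
    keys = n.partitioning if n is not None else []

    # Collect alias equalities from this node only, or from all descendants too.
    layers = [node]
    if through_all_children:
        n = node.child
        while n is not None:
            layers.append(n)
            n = n.child
    parent: dict[str, str] = {}
    for layer in layers:
        for alias, source in layer.aliases.items():
            union(parent, alias, source)

    # Rewrite each key as an equivalent attribute that this node actually outputs.
    result = []
    for key in keys:
        match = next((o for o in node.output if find(parent, o) == find(parent, key)), None)
        if match is None:
            return None
        result.append(match)
    return result


# Right side of the SortMergeJoin in the Before plan (attribute IDs copied from it).
agg = Node("HashAggregate", ["item_sk#1", "d_date#71", "sumws#3", "ws_item_sk#70"],
           aliases={"item_sk#1": "ws_item_sk#70"})  # ws_item_sk AS item_sk
exchange = Node("Exchange", agg.output, partitioning=["ws_item_sk#70"], child=agg)
sort = Node("Sort", exchange.output, child=exchange)
window = Node("Window", sort.output + ["rk#8"], child=sort)
project = Node("Project", ["item_sk#6", "sumws#4", "rk#8"],
               aliases={"item_sk#6": "item_sk#1", "sumws#4": "sumws#3"}, child=window)

print(output_partitioning(project, through_all_children=False))  # None: a new Exchange is needed
print(output_partitioning(project, through_all_children=True))   # ['item_sk#6']: matches the join key
```

The union-find keeps the sketch short by treating every alias as an equality between attributes. A real optimizer would also have to handle non-attribute expressions and output ordering (the After plan drops the right-side Sort as well), which this model ignores.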
--
This message was sent by Atlassian Jira
(v8.20.10#820010)