Posted to issues@spark.apache.org by "Apache Spark (Jira)" <ji...@apache.org> on 2022/10/12 10:46:00 UTC

[jira] [Assigned] (SPARK-40764) Extract partitioning through all children output expressions

     [ https://issues.apache.org/jira/browse/SPARK-40764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-40764:
------------------------------------

    Assignee:     (was: Apache Spark)

> Extract partitioning through all children output expressions
> ------------------------------------------------------------
>
>                 Key: SPARK-40764
>                 URL: https://issues.apache.org/jira/browse/SPARK-40764
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.4.0
>            Reporter: Yuming Wang
>            Priority: Major
>
> {code:sql}
> WITH web_tv as (
>     select
>       ws_item_sk item_sk,
>       d_date,
>       sum(ws_sales_price) sumws,
>       row_number() over (partition by ws_item_sk order by d_date) rk
>     from
>       web_sales, date_dim
>     where
>       ws_sold_date_sk=d_date_sk
>         and d_month_seq between 1212 and 1212 + 11
>         and ws_item_sk is not NULL
>     group by
>       ws_item_sk, d_date),
> web_v1 as (
>     select
>       v1.item_sk,
>       v1.d_date,
>       v1.sumws,
>       sum(v2.sumws) cume_sales
>     from
>       web_tv v1, web_tv v2
>     where
>       v1.item_sk = v2.item_sk
>         and v1.rk >= v2.rk
>     group by
>       v1.item_sk,
>       v1.d_date,
>       v1.sumws)
> select *
> from web_v1
> {code}
> Before:
> {noformat}
> == Physical Plan ==
> *(13) HashAggregate(keys=[item_sk#1, d_date#2, sumws#3], functions=[sum(sumws#4)], output=[item_sk#1, d_date#2, sumws#3, cume_sales#5])
> +- *(13) HashAggregate(keys=[item_sk#1, d_date#2, sumws#3], functions=[partial_sum(sumws#4)], output=[item_sk#1, d_date#2, sumws#3, sum#132, isEmpty#133])
>    +- *(13) Project [item_sk#1, d_date#2, sumws#3, sumws#4]
>       +- *(13) SortMergeJoin [item_sk#1], [item_sk#6], Inner, (rk#7 >= rk#8)
>          :- *(6) Sort [item_sk#1 ASC NULLS FIRST], false, 0
>          :  +- Exchange hashpartitioning(item_sk#1, 5), ENSURE_REQUIREMENTS, [plan_id=1]
>          :     +- *(5) Project [item_sk#1, d_date#2, sumws#3, rk#7]
>          :        +- Window [row_number() windowspecdefinition(ws_item_sk#9, d_date#2 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rk#7], [ws_item_sk#9], [d_date#2 ASC NULLS FIRST]
>          :           +- *(4) Sort [ws_item_sk#9 ASC NULLS FIRST, d_date#2 ASC NULLS FIRST], false, 0
>          :              +- Exchange hashpartitioning(ws_item_sk#9, 5), ENSURE_REQUIREMENTS, [plan_id=2]
>          :                 +- *(3) HashAggregate(keys=[ws_item_sk#9, d_date#2], functions=[sum(UnscaledValue(ws_sales_price#10))], output=[item_sk#1, d_date#2, sumws#3, ws_item_sk#9])
>          :                    +- Exchange hashpartitioning(ws_item_sk#9, d_date#2, 5), ENSURE_REQUIREMENTS, [plan_id=3]
>          :                       +- *(2) HashAggregate(keys=[ws_item_sk#9, d_date#2], functions=[partial_sum(UnscaledValue(ws_sales_price#10))], output=[ws_item_sk#9, d_date#2, sum#134])
>          :                          +- *(2) Project [ws_item_sk#9, ws_sales_price#10, d_date#2]
>          :                             +- *(2) BroadcastHashJoin [ws_sold_date_sk#11], [d_date_sk#12], Inner, BuildRight, false
>          :                                :- *(2) Filter isnotnull(ws_item_sk#9)
>          :                                :  +- *(2) ColumnarToRow
>          :                                :     +- FileScan parquet spark_catalog.default.web_sales[ws_item_sk#9,ws_sales_price#10,ws_sold_date_sk#11] Batched: true, DataFilters: [isnotnull(ws_item_sk#9)], Format: Parquet, Location: InMemoryFileIndex(0 paths)[], PartitionFilters: [isnotnull(ws_sold_date_sk#11)], PushedFilters: [IsNotNull(ws_item_sk)], ReadSchema: struct<ws_item_sk:int,ws_sales_price:decimal(7,2)>
>          :                                +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=4]
>          :                                   +- *(1) Project [d_date_sk#12, d_date#2]
>          :                                      +- *(1) Filter (((isnotnull(d_month_seq#44) AND (d_month_seq#44 >= 1212)) AND (d_month_seq#44 <= 1223)) AND isnotnull(d_date_sk#12))
>          :                                         +- *(1) ColumnarToRow
>          :                                            +- FileScan parquet spark_catalog.default.date_dim[d_date_sk#12,d_date#2,d_month_seq#44] Batched: true, DataFilters: [isnotnull(d_month_seq#44), (d_month_seq#44 >= 1212), (d_month_seq#44 <= 1223), isnotnull(d_date_..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/spark/parquet-1.13/launcher/spark-warehouse/org.ap..., PartitionFilters: [], PushedFilters: [IsNotNull(d_month_seq), GreaterThanOrEqual(d_month_seq,1212), LessThanOrEqual(d_month_seq,1223),..., ReadSchema: struct<d_date_sk:int,d_date:date,d_month_seq:int>
>          +- *(12) Sort [item_sk#6 ASC NULLS FIRST], false, 0
>             +- Exchange hashpartitioning(item_sk#6, 5), ENSURE_REQUIREMENTS, [plan_id=5]
>                +- *(11) Project [item_sk#1 AS item_sk#6, sumws#3 AS sumws#4, rk#8]
>                   +- Window [row_number() windowspecdefinition(ws_item_sk#70, d_date#71 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rk#8], [ws_item_sk#70], [d_date#71 ASC NULLS FIRST]
>                      +- *(10) Sort [ws_item_sk#70 ASC NULLS FIRST, d_date#71 ASC NULLS FIRST], false, 0
>                         +- ReusedExchange [item_sk#1, d_date#71, sumws#3, ws_item_sk#70], Exchange hashpartitioning(ws_item_sk#9, 5), ENSURE_REQUIREMENTS, [plan_id=2]
> {noformat}
> After (the Sort and Exchange on item_sk#6 below the SortMergeJoin's right side are no longer needed, because hashpartitioning(item_sk#1) can be propagated through the alias item_sk#1 AS item_sk#6 in the Project):
> {noformat}
> == Physical Plan ==
> *(12) HashAggregate(keys=[item_sk#1, d_date#2, sumws#3], functions=[sum(sumws#4)], output=[item_sk#1, d_date#2, sumws#3, cume_sales#5])
> +- *(12) HashAggregate(keys=[item_sk#1, d_date#2, sumws#3], functions=[partial_sum(sumws#4)], output=[item_sk#1, d_date#2, sumws#3, sum#132, isEmpty#133])
>    +- *(12) Project [item_sk#1, d_date#2, sumws#3, sumws#4]
>       +- *(12) SortMergeJoin [item_sk#1], [item_sk#6], Inner, (rk#7 >= rk#8)
>          :- *(6) Sort [item_sk#1 ASC NULLS FIRST], false, 0
>          :  +- Exchange hashpartitioning(item_sk#1, 5), ENSURE_REQUIREMENTS, [plan_id=1]
>          :     +- *(5) Project [item_sk#1, d_date#2, sumws#3, rk#7]
>          :        +- Window [row_number() windowspecdefinition(ws_item_sk#9, d_date#2 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rk#7], [ws_item_sk#9], [d_date#2 ASC NULLS FIRST]
>          :           +- *(4) Sort [ws_item_sk#9 ASC NULLS FIRST, d_date#2 ASC NULLS FIRST], false, 0
>          :              +- Exchange hashpartitioning(ws_item_sk#9, 5), ENSURE_REQUIREMENTS, [plan_id=2]
>          :                 +- *(3) HashAggregate(keys=[ws_item_sk#9, d_date#2], functions=[sum(UnscaledValue(ws_sales_price#10))], output=[item_sk#1, d_date#2, sumws#3, ws_item_sk#9])
>          :                    +- Exchange hashpartitioning(ws_item_sk#9, d_date#2, 5), ENSURE_REQUIREMENTS, [plan_id=3]
>          :                       +- *(2) HashAggregate(keys=[ws_item_sk#9, d_date#2], functions=[partial_sum(UnscaledValue(ws_sales_price#10))], output=[ws_item_sk#9, d_date#2, sum#134])
>          :                          +- *(2) Project [ws_item_sk#9, ws_sales_price#10, d_date#2]
>          :                             +- *(2) BroadcastHashJoin [ws_sold_date_sk#11], [d_date_sk#12], Inner, BuildRight, false
>          :                                :- *(2) Filter isnotnull(ws_item_sk#9)
>          :                                :  +- *(2) ColumnarToRow
>          :                                :     +- FileScan parquet spark_catalog.default.web_sales[ws_item_sk#9,ws_sales_price#10,ws_sold_date_sk#11] Batched: true, DataFilters: [isnotnull(ws_item_sk#9)], Format: Parquet, Location: InMemoryFileIndex(0 paths)[], PartitionFilters: [isnotnull(ws_sold_date_sk#11)], PushedFilters: [IsNotNull(ws_item_sk)], ReadSchema: struct<ws_item_sk:int,ws_sales_price:decimal(7,2)>
>          :                                +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=4]
>          :                                   +- *(1) Project [d_date_sk#12, d_date#2]
>          :                                      +- *(1) Filter (((isnotnull(d_month_seq#44) AND (d_month_seq#44 >= 1212)) AND (d_month_seq#44 <= 1223)) AND isnotnull(d_date_sk#12))
>          :                                         +- *(1) ColumnarToRow
>          :                                            +- FileScan parquet spark_catalog.default.date_dim[d_date_sk#12,d_date#2,d_month_seq#44] Batched: true, DataFilters: [isnotnull(d_month_seq#44), (d_month_seq#44 >= 1212), (d_month_seq#44 <= 1223), isnotnull(d_date_..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/spark/parquet-1.13/launcher/spark-warehouse/org.ap..., PartitionFilters: [], PushedFilters: [IsNotNull(d_month_seq), GreaterThanOrEqual(d_month_seq,1212), LessThanOrEqual(d_month_seq,1223),..., ReadSchema: struct<d_date_sk:int,d_date:date,d_month_seq:int>
>          +- *(11) Project [item_sk#1 AS item_sk#6, sumws#3 AS sumws#4, rk#8]
>             +- Window [row_number() windowspecdefinition(ws_item_sk#70, d_date#71 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rk#8], [ws_item_sk#70], [d_date#71 ASC NULLS FIRST]
>                +- *(10) Sort [ws_item_sk#70 ASC NULLS FIRST, d_date#71 ASC NULLS FIRST], false, 0
>                   +- ReusedExchange [item_sk#1, d_date#71, sumws#3, ws_item_sk#70], Exchange hashpartitioning(ws_item_sk#9, 5), ENSURE_REQUIREMENTS, [plan_id=2]
> {noformat}
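> For illustration only, here is a minimal, self-contained Scala sketch of the idea in the title; it is not the actual Catalyst implementation, and the names below (Attr, Aliased, HashPartitioning, outputPartitionings) are hypothetical stand-ins. The point: when a Project aliases the child's partitioning key, the node should report its output partitioning in terms of every alias as well, so a parent requiring hashpartitioning(item_sk#6) is already satisfied and the extra Exchange/Sort pair can be dropped.
> {code:scala}
> // Conceptual sketch: model expressions with simple case classes and show how a
> // Project's output partitioning can be expressed through every alias of the
> // child's partitioning key. These are NOT the real Catalyst classes.
> object PartitioningThroughAliases {
>
>   sealed trait Expr
>   case class Attr(name: String) extends Expr                 // column reference, e.g. item_sk#1
>   case class Aliased(child: Attr, alias: String) extends Expr // e.g. item_sk#1 AS item_sk#6
>
>   /** Hash partitioning on a single key, standing in for HashPartitioning. */
>   case class HashPartitioning(key: String)
>
>   /**
>    * Given the child's partitioning and the Project's output expressions, return
>    * every partitioning the Project's output satisfies: the original key plus
>    * every alias of it. Keeping only the original key forces an extra Exchange
>    * whenever the parent asks for the aliased key.
>    */
>   def outputPartitionings(child: HashPartitioning, projectList: Seq[Expr]): Seq[HashPartitioning] = {
>     val aliasesOfKey = projectList.collect {
>       case Aliased(Attr(name), alias) if name == child.key => alias
>     }
>     (child.key +: aliasesOfKey).distinct.map(HashPartitioning)
>   }
>
>   def main(args: Array[String]): Unit = {
>     // Mirrors the plan above: the child is hash-partitioned on item_sk#1 and the
>     // Project emits item_sk#1 AS item_sk#6; the join requires hashpartitioning(item_sk#6).
>     val childPartitioning = HashPartitioning("item_sk#1")
>     val projectList = Seq(
>       Aliased(Attr("item_sk#1"), "item_sk#6"),
>       Aliased(Attr("sumws#3"), "sumws#4"),
>       Attr("rk#8"))
>
>     val satisfied = outputPartitionings(childPartitioning, projectList)
>     println(satisfied) // List(HashPartitioning(item_sk#1), HashPartitioning(item_sk#6))
>
>     // Because the required partitioning is among the satisfied ones, no extra
>     // Exchange/Sort pair is needed below the SortMergeJoin in the "After" plan.
>     assert(satisfied.contains(HashPartitioning("item_sk#6")))
>   }
> }
> {code}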


