Posted to issues@spark.apache.org by "Yang Jie (Jira)" <ji...@apache.org> on 2023/02/22 02:14:00 UTC

[jira] [Resolved] (SPARK-40278) 3TB TPC-DS test with Databricks spark-sql-perf and Spark 3.3 fails

     [ https://issues.apache.org/jira/browse/SPARK-40278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yang Jie resolved SPARK-40278.
------------------------------
    Resolution: Duplicate

> 3TB TPC-DS test with Databricks spark-sql-perf and Spark 3.3 fails
> ------------------------------------------------------------------
>
>                 Key: SPARK-40278
>                 URL: https://issues.apache.org/jira/browse/SPARK-40278
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.3.0
>            Reporter: Yang Jie
>            Priority: Major
>
> I used Databricks spark-sql-perf with Spark 3.3 to run the 3TB TPC-DS q24a or q24b query; the test code is as follows:
> {code:java}
> // "${clusterName}" is a placeholder for the real cluster name; the string is
> // not interpolated (no s prefix), so substitute the name before running.
> val rootDir = "hdfs://${clusterName}/tpcds-data/POCGenData3T"
> val databaseName = "tpcds_database"
> val scaleFactor = "3072"
> val format = "parquet"
> import com.databricks.spark.sql.perf.tpcds.TPCDSTables
> val tables = new TPCDSTables(
>   spark.sqlContext,
>   dsdgenDir = "./tpcds-kit/tools",
>   scaleFactor = scaleFactor,
>   useDoubleForDecimal = false,
>   useStringForDate = false)
> spark.sql(s"create database $databaseName")
> // Register the generated parquet data as temporary tables.
> tables.createTemporaryTables(rootDir, format)
> spark.sql(s"use $databaseName")
> // TPC-DS q24a (or q24b)
> val result = spark.sql(""" with ssales as
>  (select c_last_name, c_first_name, s_store_name, ca_state, s_state, i_color,
>         i_current_price, i_manager_id, i_units, i_size, sum(ss_net_paid) netpaid
>  from store_sales, store_returns, store, item, customer, customer_address
>  where ss_ticket_number = sr_ticket_number
>    and ss_item_sk = sr_item_sk
>    and ss_customer_sk = c_customer_sk
>    and ss_item_sk = i_item_sk
>    and ss_store_sk = s_store_sk
>    and c_birth_country = upper(ca_country)
>    and s_zip = ca_zip
>  and s_market_id = 8
>  group by c_last_name, c_first_name, s_store_name, ca_state, s_state, i_color,
>           i_current_price, i_manager_id, i_units, i_size)
>  select c_last_name, c_first_name, s_store_name, sum(netpaid) paid
>  from ssales
>  where i_color = 'pale'
>  group by c_last_name, c_first_name, s_store_name
>  having sum(netpaid) > (select 0.05*avg(netpaid) from ssales)""").collect()
>  sc.stop() {code}
> The above test may fail due to `Stage cancelled because SparkContext was shut down` in stages 31 and 36 when AQE is enabled, as shown below:
>  
> !image-2022-08-30-21-09-48-763.png!
> !image-2022-08-30-21-10-24-862.png!
> !image-2022-08-30-21-10-57-128.png!
>  
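> For context, AQE here is adaptive query execution, toggled by the standard `spark.sql.adaptive.enabled` conf (on by default in Spark 3.3). A minimal sketch for comparing runs with and without it, assuming a live `spark` session:
> {code:java}
> // AQE on (the Spark 3.3 default); this is the failing configuration.
> spark.conf.set("spark.sql.adaptive.enabled", "true")
> // AQE off, to check whether the failure is AQE-specific.
> spark.conf.set("spark.sql.adaptive.enabled", "false")
> {code}
>  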
> The DAG corresponding to the SQL is as follows:
> !image-2022-08-30-21-11-50-895.png!
> The details are as follows:
>  
>  
> {code:java}
> == Physical Plan ==
> AdaptiveSparkPlan (42)
> +- == Final Plan ==
>    LocalTableScan (1)
> +- == Initial Plan ==
>    Filter (41)
>    +- HashAggregate (40)
>       +- Exchange (39)
>          +- HashAggregate (38)
>             +- HashAggregate (37)
>                +- Exchange (36)
>                   +- HashAggregate (35)
>                      +- Project (34)
>                         +- BroadcastHashJoin Inner BuildRight (33)
>                            :- Project (29)
>                            :  +- BroadcastHashJoin Inner BuildRight (28)
>                            :     :- Project (24)
>                            :     :  +- BroadcastHashJoin Inner BuildRight (23)
>                            :     :     :- Project (19)
>                            :     :     :  +- BroadcastHashJoin Inner BuildRight (18)
>                            :     :     :     :- Project (13)
>                            :     :     :     :  +- SortMergeJoin Inner (12)
>                            :     :     :     :     :- Sort (6)
>                            :     :     :     :     :  +- Exchange (5)
>                            :     :     :     :     :     +- Project (4)
>                            :     :     :     :     :        +- Filter (3)
>                            :     :     :     :     :           +- Scan parquet  (2)
>                            :     :     :     :     +- Sort (11)
>                            :     :     :     :        +- Exchange (10)
>                            :     :     :     :           +- Project (9)
>                            :     :     :     :              +- Filter (8)
>                            :     :     :     :                 +- Scan parquet  (7)
>                            :     :     :     +- BroadcastExchange (17)
>                            :     :     :        +- Project (16)
>                            :     :     :           +- Filter (15)
>                            :     :     :              +- Scan parquet  (14)
>                            :     :     +- BroadcastExchange (22)
>                            :     :        +- Filter (21)
>                            :     :           +- Scan parquet  (20)
>                            :     +- BroadcastExchange (27)
>                            :        +- Filter (26)
>                            :           +- Scan parquet  (25)
>                            +- BroadcastExchange (32)
>                               +- Filter (31)
>                                  +- Scan parquet  (30)
> (1) LocalTableScan
> Output [4]: [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]
> Arguments: <empty>, [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]
> (2) Scan parquet 
> Output [6]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149, ss_sold_date_sk#152]
> Batched: true
> Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/store_sales]
> PushedFilters: [IsNotNull(ss_ticket_number), IsNotNull(ss_item_sk), IsNotNull(ss_store_sk), IsNotNull(ss_customer_sk)]
> ReadSchema: struct<ss_item_sk:int,ss_customer_sk:int,ss_store_sk:int,ss_ticket_number:bigint,ss_net_paid:decimal(7,2)>
> (3) Filter
> Input [6]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149, ss_sold_date_sk#152]
> Condition : (((isnotnull(ss_ticket_number#138L) AND isnotnull(ss_item_sk#131)) AND isnotnull(ss_store_sk#136)) AND isnotnull(ss_customer_sk#132))
> (4) Project
> Output [5]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149]
> Input [6]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149, ss_sold_date_sk#152]
> (5) Exchange
> Input [5]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149]
> Arguments: hashpartitioning(ss_ticket_number#138L, ss_item_sk#131, 300), ENSURE_REQUIREMENTS, [id=#309]
> (6) Sort
> Input [5]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149]
> Arguments: [ss_ticket_number#138L ASC NULLS FIRST, ss_item_sk#131 ASC NULLS FIRST], false, 0
> (7) Scan parquet 
> Output [3]: [sr_item_sk#177, sr_ticket_number#184L, sr_returned_date_sk#195]
> Batched: true
> Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/store_returns]
> PushedFilters: [IsNotNull(sr_ticket_number), IsNotNull(sr_item_sk)]
> ReadSchema: struct<sr_item_sk:int,sr_ticket_number:bigint>
> (8) Filter
> Input [3]: [sr_item_sk#177, sr_ticket_number#184L, sr_returned_date_sk#195]
> Condition : (isnotnull(sr_ticket_number#184L) AND isnotnull(sr_item_sk#177))
> (9) Project
> Output [2]: [sr_item_sk#177, sr_ticket_number#184L]
> Input [3]: [sr_item_sk#177, sr_ticket_number#184L, sr_returned_date_sk#195]
> (10) Exchange
> Input [2]: [sr_item_sk#177, sr_ticket_number#184L]
> Arguments: hashpartitioning(sr_ticket_number#184L, sr_item_sk#177, 300), ENSURE_REQUIREMENTS, [id=#310]
> (11) Sort
> Input [2]: [sr_item_sk#177, sr_ticket_number#184L]
> Arguments: [sr_ticket_number#184L ASC NULLS FIRST, sr_item_sk#177 ASC NULLS FIRST], false, 0
> (12) SortMergeJoin
> Left keys [2]: [ss_ticket_number#138L, ss_item_sk#131]
> Right keys [2]: [sr_ticket_number#184L, sr_item_sk#177]
> Join condition: None
> (13) Project
> Output [4]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_net_paid#149]
> Input [7]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149, sr_item_sk#177, sr_ticket_number#184L]
> (14) Scan parquet 
> Output [5]: [s_store_sk#664, s_store_name#669, s_market_id#674, s_state#688, s_zip#689]
> Batched: true
> Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/store]
> PushedFilters: [IsNotNull(s_market_id), EqualTo(s_market_id,8), IsNotNull(s_store_sk), IsNotNull(s_zip)]
> ReadSchema: struct<s_store_sk:int,s_store_name:string,s_market_id:int,s_state:string,s_zip:string>
> (15) Filter
> Input [5]: [s_store_sk#664, s_store_name#669, s_market_id#674, s_state#688, s_zip#689]
> Condition : (((isnotnull(s_market_id#674) AND (s_market_id#674 = 8)) AND isnotnull(s_store_sk#664)) AND isnotnull(s_zip#689))
> (16) Project
> Output [4]: [s_store_sk#664, s_store_name#669, s_state#688, s_zip#689]
> Input [5]: [s_store_sk#664, s_store_name#669, s_market_id#674, s_state#688, s_zip#689]
> (17) BroadcastExchange
> Input [4]: [s_store_sk#664, s_store_name#669, s_state#688, s_zip#689]
> Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#316]
> (18) BroadcastHashJoin
> Left keys [1]: [ss_store_sk#136]
> Right keys [1]: [s_store_sk#664]
> Join condition: None
> (19) Project
> Output [6]: [ss_item_sk#131, ss_customer_sk#132, ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689]
> Input [8]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_net_paid#149, s_store_sk#664, s_store_name#669, s_state#688, s_zip#689]
> (20) Scan parquet 
> Output [6]: [i_item_sk#564, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584]
> Batched: true
> Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/item]
> PushedFilters: [IsNotNull(i_color), EqualTo(i_color,pale), IsNotNull(i_item_sk)]
> ReadSchema: struct<i_item_sk:int,i_current_price:decimal(7,2),i_size:string,i_color:string,i_units:string,i_manager_id:int>
> (21) Filter
> Input [6]: [i_item_sk#564, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584]
> Condition : ((isnotnull(i_color#581) AND (i_color#581 = pale)) AND isnotnull(i_item_sk#564))
> (22) BroadcastExchange
> Input [6]: [i_item_sk#564, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584]
> Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#320]
> (23) BroadcastHashJoin
> Left keys [1]: [ss_item_sk#131]
> Right keys [1]: [i_item_sk#564]
> Join condition: None
> (24) Project
> Output [10]: [ss_customer_sk#132, ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584]
> Input [12]: [ss_item_sk#131, ss_customer_sk#132, ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_item_sk#564, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584]
> (25) Scan parquet 
> Output [4]: [c_customer_sk#412, c_first_name#420, c_last_name#421, c_birth_country#426]
> Batched: true
> Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/customer]
> PushedFilters: [IsNotNull(c_customer_sk), IsNotNull(c_birth_country)]
> ReadSchema: struct<c_customer_sk:int,c_first_name:string,c_last_name:string,c_birth_country:string>
> (26) Filter
> Input [4]: [c_customer_sk#412, c_first_name#420, c_last_name#421, c_birth_country#426]
> Condition : (isnotnull(c_customer_sk#412) AND isnotnull(c_birth_country#426))
> (27) BroadcastExchange
> Input [4]: [c_customer_sk#412, c_first_name#420, c_last_name#421, c_birth_country#426]
> Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#324]
> (28) BroadcastHashJoin
> Left keys [1]: [ss_customer_sk#132]
> Right keys [1]: [c_customer_sk#412]
> Join condition: None
> (29) Project
> Output [12]: [ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_first_name#420, c_last_name#421, c_birth_country#426]
> Input [14]: [ss_customer_sk#132, ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_customer_sk#412, c_first_name#420, c_last_name#421, c_birth_country#426]
> (30) Scan parquet 
> Output [3]: [ca_state#456, ca_zip#457, ca_country#458]
> Batched: true
> Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/customer_address]
> PushedFilters: [IsNotNull(ca_country), IsNotNull(ca_zip)]
> ReadSchema: struct<ca_state:string,ca_zip:string,ca_country:string>
> (31) Filter
> Input [3]: [ca_state#456, ca_zip#457, ca_country#458]
> Condition : (isnotnull(ca_country#458) AND isnotnull(ca_zip#457))
> (32) BroadcastExchange
> Input [3]: [ca_state#456, ca_zip#457, ca_country#458]
> Arguments: HashedRelationBroadcastMode(List(upper(input[2, string, false]), input[1, string, false]),false), [id=#328]
> (33) BroadcastHashJoin
> Left keys [2]: [c_birth_country#426, s_zip#689]
> Right keys [2]: [upper(ca_country#458), ca_zip#457]
> Join condition: None
> (34) Project
> Output [11]: [ss_net_paid#149, s_store_name#669, s_state#688, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_first_name#420, c_last_name#421, ca_state#456]
> Input [15]: [ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_first_name#420, c_last_name#421, c_birth_country#426, ca_state#456, ca_zip#457, ca_country#458]
> (35) HashAggregate
> Input [11]: [ss_net_paid#149, s_store_name#669, s_state#688, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_first_name#420, c_last_name#421, ca_state#456]
> Keys [10]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579]
> Functions [1]: [partial_sum(UnscaledValue(ss_net_paid#149))]
> Aggregate Attributes [1]: [sum#870L]
> Results [11]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579, sum#871L]
> (36) Exchange
> Input [11]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579, sum#871L]
> Arguments: hashpartitioning(c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579, 300), ENSURE_REQUIREMENTS, [id=#333]
> (37) HashAggregate
> Input [11]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579, sum#871L]
> Keys [10]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579]
> Functions [1]: [sum(UnscaledValue(ss_net_paid#149))]
> Aggregate Attributes [1]: [sum(UnscaledValue(ss_net_paid#149))#853L]
> Results [4]: [c_last_name#421, c_first_name#420, s_store_name#669, MakeDecimal(sum(UnscaledValue(ss_net_paid#149))#853L,17,2) AS netpaid#852]
> (38) HashAggregate
> Input [4]: [c_last_name#421, c_first_name#420, s_store_name#669, netpaid#852]
> Keys [3]: [c_last_name#421, c_first_name#420, s_store_name#669]
> Functions [1]: [partial_sum(netpaid#852)]
> Aggregate Attributes [2]: [sum#866, isEmpty#867]
> Results [5]: [c_last_name#421, c_first_name#420, s_store_name#669, sum#868, isEmpty#869]
> (39) Exchange
> Input [5]: [c_last_name#421, c_first_name#420, s_store_name#669, sum#868, isEmpty#869]
> Arguments: hashpartitioning(c_last_name#421, c_first_name#420, s_store_name#669, 300), ENSURE_REQUIREMENTS, [id=#337]
> (40) HashAggregate
> Input [5]: [c_last_name#421, c_first_name#420, s_store_name#669, sum#868, isEmpty#869]
> Keys [3]: [c_last_name#421, c_first_name#420, s_store_name#669]
> Functions [1]: [sum(netpaid#852)]
> Aggregate Attributes [1]: [sum(netpaid#852)#854]
> Results [4]: [c_last_name#421, c_first_name#420, s_store_name#669, sum(netpaid#852)#854 AS paid#850]
> (41) Filter
> Input [4]: [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]
> Condition : (isnotnull(paid#850) AND (cast(paid#850 as decimal(33,8)) > cast(Subquery subquery#851, [id=#294] as decimal(33,8))))
> (42) AdaptiveSparkPlan
> Output [4]: [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]
> Arguments: isFinalPlan=true {code}
>  
>  
> After I manually reverted SPARK-35442, the problem no longer occurred.
>  
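> If rebuilding Spark with SPARK-35442 reverted is impractical, a possible runtime experiment is excluding the suspect AQE optimizer rule through the documented `spark.sql.adaptive.optimizer.excludedRules` conf. This is a hedged sketch rather than a confirmed fix: it assumes the regression surfaces via AQE's empty-relation propagation, which would be consistent with the final plan above collapsing to an empty LocalTableScan (1):
> {code:java}
> // Assumption: the problematic rewrite is applied by AQEPropagateEmptyRelation.
> // Excluding that single rule keeps the rest of AQE enabled.
> spark.conf.set(
>   "spark.sql.adaptive.optimizer.excludedRules",
>   "org.apache.spark.sql.execution.adaptive.AQEPropagateEmptyRelation")
> {code}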



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org