Posted to issues@spark.apache.org by "Yang Jie (Jira)" <ji...@apache.org> on 2022/08/30 13:33:00 UTC
[jira] [Updated] (SPARK-40278) Used databricks spark-sql-pref with Spark 3.3 to run 3TB tpcds test failed
[ https://issues.apache.org/jira/browse/SPARK-40278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yang Jie updated SPARK-40278:
-----------------------------
Description:
I used Databricks spark-sql-perf with Spark 3.3 to run TPC-DS q24a or q24b at the 3TB scale. The test code is as follows:
{code:scala}
import com.databricks.spark.sql.perf.tpcds.TPCDSTables

// clusterName (the HDFS nameservice) is not defined in this report and must be set
// before running; the s-interpolator substitutes it into the path.
val rootDir = s"hdfs://$clusterName/tpcds-data/POCGenData3T"
val databaseName = "tpcds_database"
val scaleFactor = "3072"
val format = "parquet"

val tables = new TPCDSTables(
  spark.sqlContext,
  dsdgenDir = "./tpcds-kit/tools",
  scaleFactor = scaleFactor,
  useDoubleForDecimal = false,
  useStringForDate = false)

spark.sql(s"create database $databaseName")
// Register each table's Parquet data under rootDir as a temporary view
tables.createTemporaryTables(rootDir, format)
spark.sql(s"use $databaseName")

// TPC-DS q24a / q24b
val result = spark.sql("""
  with ssales as
  (select c_last_name, c_first_name, s_store_name, ca_state, s_state, i_color,
          i_current_price, i_manager_id, i_units, i_size, sum(ss_net_paid) netpaid
   from store_sales, store_returns, store, item, customer, customer_address
   where ss_ticket_number = sr_ticket_number
     and ss_item_sk = sr_item_sk
     and ss_customer_sk = c_customer_sk
     and ss_item_sk = i_item_sk
     and ss_store_sk = s_store_sk
     and c_birth_country = upper(ca_country)
     and s_zip = ca_zip
     and s_market_id = 8
   group by c_last_name, c_first_name, s_store_name, ca_state, s_state, i_color,
            i_current_price, i_manager_id, i_units, i_size)
  select c_last_name, c_first_name, s_store_name, sum(netpaid) paid
  from ssales
  where i_color = 'pale'
  group by c_last_name, c_first_name, s_store_name
  having sum(netpaid) > (select 0.05*avg(netpaid) from ssales)""").collect()

sc.stop()
{code}
With AQE enabled, the test above may fail: stages 31 and 36 report `Stage cancelled because SparkContext was shut down`, as shown below:
!image-2022-08-30-21-09-48-763.png!
!image-2022-08-30-21-10-24-862.png!
!image-2022-08-30-21-10-57-128.png!
The DAG of the SQL query is as follows:
!image-2022-08-30-21-11-50-895.png!
The physical plan details are as follows:
{code:java}
== Physical Plan ==
AdaptiveSparkPlan (42)
+- == Final Plan ==
   LocalTableScan (1)
+- == Initial Plan ==
   Filter (41)
   +- HashAggregate (40)
      +- Exchange (39)
         +- HashAggregate (38)
            +- HashAggregate (37)
               +- Exchange (36)
                  +- HashAggregate (35)
                     +- Project (34)
                        +- BroadcastHashJoin Inner BuildRight (33)
                           :- Project (29)
                           :  +- BroadcastHashJoin Inner BuildRight (28)
                           :     :- Project (24)
                           :     :  +- BroadcastHashJoin Inner BuildRight (23)
                           :     :     :- Project (19)
                           :     :     :  +- BroadcastHashJoin Inner BuildRight (18)
                           :     :     :     :- Project (13)
                           :     :     :     :  +- SortMergeJoin Inner (12)
                           :     :     :     :     :- Sort (6)
                           :     :     :     :     :  +- Exchange (5)
                           :     :     :     :     :     +- Project (4)
                           :     :     :     :     :        +- Filter (3)
                           :     :     :     :     :           +- Scan parquet (2)
                           :     :     :     :     +- Sort (11)
                           :     :     :     :        +- Exchange (10)
                           :     :     :     :           +- Project (9)
                           :     :     :     :              +- Filter (8)
                           :     :     :     :                 +- Scan parquet (7)
                           :     :     :     +- BroadcastExchange (17)
                           :     :     :        +- Project (16)
                           :     :     :           +- Filter (15)
                           :     :     :              +- Scan parquet (14)
                           :     :     +- BroadcastExchange (22)
                           :     :        +- Filter (21)
                           :     :           +- Scan parquet (20)
                           :     +- BroadcastExchange (27)
                           :        +- Filter (26)
                           :           +- Scan parquet (25)
                           +- BroadcastExchange (32)
                              +- Filter (31)
                                 +- Scan parquet (30)
(1) LocalTableScan
Output [4]: [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]
Arguments: <empty>, [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]
(2) Scan parquet
Output [6]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149, ss_sold_date_sk#152]
Batched: true
Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/store_sales]
PushedFilters: [IsNotNull(ss_ticket_number), IsNotNull(ss_item_sk), IsNotNull(ss_store_sk), IsNotNull(ss_customer_sk)]
ReadSchema: struct<ss_item_sk:int,ss_customer_sk:int,ss_store_sk:int,ss_ticket_number:bigint,ss_net_paid:decimal(7,2)>
(3) Filter
Input [6]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149, ss_sold_date_sk#152]
Condition : (((isnotnull(ss_ticket_number#138L) AND isnotnull(ss_item_sk#131)) AND isnotnull(ss_store_sk#136)) AND isnotnull(ss_customer_sk#132))
(4) Project
Output [5]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149]
Input [6]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149, ss_sold_date_sk#152]
(5) Exchange
Input [5]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149]
Arguments: hashpartitioning(ss_ticket_number#138L, ss_item_sk#131, 300), ENSURE_REQUIREMENTS, [id=#309]
(6) Sort
Input [5]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149]
Arguments: [ss_ticket_number#138L ASC NULLS FIRST, ss_item_sk#131 ASC NULLS FIRST], false, 0
(7) Scan parquet
Output [3]: [sr_item_sk#177, sr_ticket_number#184L, sr_returned_date_sk#195]
Batched: true
Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/store_returns]
PushedFilters: [IsNotNull(sr_ticket_number), IsNotNull(sr_item_sk)]
ReadSchema: struct<sr_item_sk:int,sr_ticket_number:bigint>
(8) Filter
Input [3]: [sr_item_sk#177, sr_ticket_number#184L, sr_returned_date_sk#195]
Condition : (isnotnull(sr_ticket_number#184L) AND isnotnull(sr_item_sk#177))
(9) Project
Output [2]: [sr_item_sk#177, sr_ticket_number#184L]
Input [3]: [sr_item_sk#177, sr_ticket_number#184L, sr_returned_date_sk#195]
(10) Exchange
Input [2]: [sr_item_sk#177, sr_ticket_number#184L]
Arguments: hashpartitioning(sr_ticket_number#184L, sr_item_sk#177, 300), ENSURE_REQUIREMENTS, [id=#310]
(11) Sort
Input [2]: [sr_item_sk#177, sr_ticket_number#184L]
Arguments: [sr_ticket_number#184L ASC NULLS FIRST, sr_item_sk#177 ASC NULLS FIRST], false, 0
(12) SortMergeJoin
Left keys [2]: [ss_ticket_number#138L, ss_item_sk#131]
Right keys [2]: [sr_ticket_number#184L, sr_item_sk#177]
Join condition: None
(13) Project
Output [4]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_net_paid#149]
Input [7]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149, sr_item_sk#177, sr_ticket_number#184L]
(14) Scan parquet
Output [5]: [s_store_sk#664, s_store_name#669, s_market_id#674, s_state#688, s_zip#689]
Batched: true
Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/store]
PushedFilters: [IsNotNull(s_market_id), EqualTo(s_market_id,8), IsNotNull(s_store_sk), IsNotNull(s_zip)]
ReadSchema: struct<s_store_sk:int,s_store_name:string,s_market_id:int,s_state:string,s_zip:string>
(15) Filter
Input [5]: [s_store_sk#664, s_store_name#669, s_market_id#674, s_state#688, s_zip#689]
Condition : (((isnotnull(s_market_id#674) AND (s_market_id#674 = 8)) AND isnotnull(s_store_sk#664)) AND isnotnull(s_zip#689))
(16) Project
Output [4]: [s_store_sk#664, s_store_name#669, s_state#688, s_zip#689]
Input [5]: [s_store_sk#664, s_store_name#669, s_market_id#674, s_state#688, s_zip#689]
(17) BroadcastExchange
Input [4]: [s_store_sk#664, s_store_name#669, s_state#688, s_zip#689]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#316]
(18) BroadcastHashJoin
Left keys [1]: [ss_store_sk#136]
Right keys [1]: [s_store_sk#664]
Join condition: None
(19) Project
Output [6]: [ss_item_sk#131, ss_customer_sk#132, ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689]
Input [8]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_net_paid#149, s_store_sk#664, s_store_name#669, s_state#688, s_zip#689]
(20) Scan parquet
Output [6]: [i_item_sk#564, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584]
Batched: true
Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/item]
PushedFilters: [IsNotNull(i_color), EqualTo(i_color,pale), IsNotNull(i_item_sk)]
ReadSchema: struct<i_item_sk:int,i_current_price:decimal(7,2),i_size:string,i_color:string,i_units:string,i_manager_id:int>
(21) Filter
Input [6]: [i_item_sk#564, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584]
Condition : ((isnotnull(i_color#581) AND (i_color#581 = pale)) AND isnotnull(i_item_sk#564))
(22) BroadcastExchange
Input [6]: [i_item_sk#564, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#320]
(23) BroadcastHashJoin
Left keys [1]: [ss_item_sk#131]
Right keys [1]: [i_item_sk#564]
Join condition: None
(24) Project
Output [10]: [ss_customer_sk#132, ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584]
Input [12]: [ss_item_sk#131, ss_customer_sk#132, ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_item_sk#564, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584]
(25) Scan parquet
Output [4]: [c_customer_sk#412, c_first_name#420, c_last_name#421, c_birth_country#426]
Batched: true
Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/customer]
PushedFilters: [IsNotNull(c_customer_sk), IsNotNull(c_birth_country)]
ReadSchema: struct<c_customer_sk:int,c_first_name:string,c_last_name:string,c_birth_country:string>
(26) Filter
Input [4]: [c_customer_sk#412, c_first_name#420, c_last_name#421, c_birth_country#426]
Condition : (isnotnull(c_customer_sk#412) AND isnotnull(c_birth_country#426))
(27) BroadcastExchange
Input [4]: [c_customer_sk#412, c_first_name#420, c_last_name#421, c_birth_country#426]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#324]
(28) BroadcastHashJoin
Left keys [1]: [ss_customer_sk#132]
Right keys [1]: [c_customer_sk#412]
Join condition: None
(29) Project
Output [12]: [ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_first_name#420, c_last_name#421, c_birth_country#426]
Input [14]: [ss_customer_sk#132, ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_customer_sk#412, c_first_name#420, c_last_name#421, c_birth_country#426]
(30) Scan parquet
Output [3]: [ca_state#456, ca_zip#457, ca_country#458]
Batched: true
Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/customer_address]
PushedFilters: [IsNotNull(ca_country), IsNotNull(ca_zip)]
ReadSchema: struct<ca_state:string,ca_zip:string,ca_country:string>
(31) Filter
Input [3]: [ca_state#456, ca_zip#457, ca_country#458]
Condition : (isnotnull(ca_country#458) AND isnotnull(ca_zip#457))
(32) BroadcastExchange
Input [3]: [ca_state#456, ca_zip#457, ca_country#458]
Arguments: HashedRelationBroadcastMode(List(upper(input[2, string, false]), input[1, string, false]),false), [id=#328]
(33) BroadcastHashJoin
Left keys [2]: [c_birth_country#426, s_zip#689]
Right keys [2]: [upper(ca_country#458), ca_zip#457]
Join condition: None
(34) Project
Output [11]: [ss_net_paid#149, s_store_name#669, s_state#688, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_first_name#420, c_last_name#421, ca_state#456]
Input [15]: [ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_first_name#420, c_last_name#421, c_birth_country#426, ca_state#456, ca_zip#457, ca_country#458]
(35) HashAggregate
Input [11]: [ss_net_paid#149, s_store_name#669, s_state#688, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_first_name#420, c_last_name#421, ca_state#456]
Keys [10]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579]
Functions [1]: [partial_sum(UnscaledValue(ss_net_paid#149))]
Aggregate Attributes [1]: [sum#870L]
Results [11]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579, sum#871L]
(36) Exchange
Input [11]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579, sum#871L]
Arguments: hashpartitioning(c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579, 300), ENSURE_REQUIREMENTS, [id=#333]
(37) HashAggregate
Input [11]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579, sum#871L]
Keys [10]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579]
Functions [1]: [sum(UnscaledValue(ss_net_paid#149))]
Aggregate Attributes [1]: [sum(UnscaledValue(ss_net_paid#149))#853L]
Results [4]: [c_last_name#421, c_first_name#420, s_store_name#669, MakeDecimal(sum(UnscaledValue(ss_net_paid#149))#853L,17,2) AS netpaid#852]
(38) HashAggregate
Input [4]: [c_last_name#421, c_first_name#420, s_store_name#669, netpaid#852]
Keys [3]: [c_last_name#421, c_first_name#420, s_store_name#669]
Functions [1]: [partial_sum(netpaid#852)]
Aggregate Attributes [2]: [sum#866, isEmpty#867]
Results [5]: [c_last_name#421, c_first_name#420, s_store_name#669, sum#868, isEmpty#869]
(39) Exchange
Input [5]: [c_last_name#421, c_first_name#420, s_store_name#669, sum#868, isEmpty#869]
Arguments: hashpartitioning(c_last_name#421, c_first_name#420, s_store_name#669, 300), ENSURE_REQUIREMENTS, [id=#337]
(40) HashAggregate
Input [5]: [c_last_name#421, c_first_name#420, s_store_name#669, sum#868, isEmpty#869]
Keys [3]: [c_last_name#421, c_first_name#420, s_store_name#669]
Functions [1]: [sum(netpaid#852)]
Aggregate Attributes [1]: [sum(netpaid#852)#854]
Results [4]: [c_last_name#421, c_first_name#420, s_store_name#669, sum(netpaid#852)#854 AS paid#850]
(41) Filter
Input [4]: [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]
Condition : (isnotnull(paid#850) AND (cast(paid#850 as decimal(33,8)) > cast(Subquery subquery#851, [id=#294] as decimal(33,8))))
(42) AdaptiveSparkPlan
Output [4]: [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]
Arguments: isFinalPlan=true {code}
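Two decimal types in the plan above can be reproduced by hand: node (37) builds `netpaid` as `MakeDecimal(sum(UnscaledValue(ss_net_paid)), 17, 2)`, and node (41) casts both sides of the `having` comparison to `decimal(33,8)`. The Scala sketch below derives both from a simplified model of Spark SQL's decimal result-type rules (sum: p+10 with the same scale; avg: p+4, s+4; multiply: p1+p2+1, s1+s2; comparison: widest integer part plus widest scale), assuming the literal `0.05` is typed as `decimal(2,2)`. `Dec`, `DecimalRules`, `Q24Types` and `UnscaledSum` are illustrative names, not Spark classes, and Spark's precision-loss adjustment for results wider than 38 digits is left out because no type here gets that wide.

```scala
// Simplified model of Spark SQL decimal result types (illustrative names, not Spark classes).
final case class Dec(precision: Int, scale: Int) {
  override def toString: String = s"decimal($precision,$scale)"
}

object DecimalRules {
  val MaxPrecision = 38
  // Caps precision and scale at 38; Spark's precision-loss adjustment is not modeled.
  private def bounded(p: Int, s: Int): Dec =
    Dec(math.min(p, MaxPrecision), math.min(s, MaxPrecision))

  def sum(d: Dec): Dec = bounded(d.precision + 10, d.scale)
  def avg(d: Dec): Dec = bounded(d.precision + 4, d.scale + 4)
  def multiply(a: Dec, b: Dec): Dec =
    bounded(a.precision + b.precision + 1, a.scale + b.scale)
  // Common type both sides are cast to when decimals of different types are compared.
  def wider(a: Dec, b: Dec): Dec = {
    val scale = math.max(a.scale, b.scale)
    val integerDigits = math.max(a.precision - a.scale, b.precision - b.scale)
    bounded(integerDigits + scale, scale)
  }
}

object Q24Types {
  import DecimalRules._
  val ssNetPaid = Dec(7, 2)                           // ReadSchema of node (2)
  val netpaid = sum(ssNetPaid)                        // decimal(17,2), node (37)
  val paid = sum(netpaid)                             // decimal(27,2), node (40)
  val fivePercent = Dec(2, 2)                         // literal 0.05 (assumed type)
  val threshold = multiply(fivePercent, avg(netpaid)) // decimal(24,8)
  val comparison = wider(paid, threshold)             // decimal(33,8), node (41)
}

object UnscaledSum {
  // Sums decimal(7,2) values as unscaled Longs and restores scale 2 at the end,
  // the idea behind sum(UnscaledValue(...)) followed by MakeDecimal(..., 17, 2).
  def sumAtScale2(values: Seq[BigDecimal]): BigDecimal = {
    val total = values.map(_.setScale(2).bigDecimal.unscaledValue.longValueExact).sum
    BigDecimal(total, 2)
  }
}
```

Summing unscaled `Long`s works here because a `decimal(17,2)` result has at most 17 digits, which still fits in a 64-bit integer.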
After I manually reverted SPARK-35442, the problem no longer occurs.
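The final plan above collapsed to `LocalTableScan (1)` with `<empty>` arguments, meaning AQE replaced the entire query with an empty relation at runtime. The Scala sketch below is a hypothetical toy model of the reasoning that makes such a rewrite sound; it is not Spark's actual rule code, and it does not claim to describe what SPARK-35442 changed. An inner join with an empty side and a filter over empty input yield no rows; a grouped aggregate over no rows yields no groups, whereas an aggregate without grouping keys still returns one row and has to be kept. `Plan`, `ScanNode`, `PropagateEmpty` and the other names, as well as treating a row count of 0 as "known empty at runtime", are assumptions of the sketch.

```scala
// Hypothetical toy plan model; illustrates empty-relation propagation only.
sealed trait Plan
// rows: row count assumed known at runtime (e.g. after a query stage finishes)
final case class ScanNode(table: String, rows: Long) extends Plan
final case class FilterNode(child: Plan) extends Plan
final case class JoinNode(left: Plan, right: Plan) extends Plan // inner join
final case class AggNode(groupingKeys: Seq[String], child: Plan) extends Plan
case object EmptyNode extends Plan

object PropagateEmpty {
  // Rewrites the tree bottom-up, replacing provably empty subtrees with EmptyNode.
  def apply(plan: Plan): Plan = plan match {
    case ScanNode(_, 0L) => EmptyNode
    case s: ScanNode     => s
    case FilterNode(child) =>
      apply(child) match {
        case EmptyNode => EmptyNode
        case c         => FilterNode(c)
      }
    case JoinNode(left, right) =>
      (apply(left), apply(right)) match {
        // An inner join with an empty side produces no rows.
        case (EmptyNode, _) | (_, EmptyNode) => EmptyNode
        case (l, r)                          => JoinNode(l, r)
      }
    case AggNode(keys, child) =>
      apply(child) match {
        // A grouped aggregate over no rows produces no groups.
        case EmptyNode if keys.nonEmpty => EmptyNode
        // A global aggregate (no keys) still returns one row, so it is kept.
        case c => AggNode(keys, c)
      }
    case EmptyNode => EmptyNode
  }
}

object Example {
  // Made-up input: pretend the filtered `item` side turned out to be empty.
  val query: Plan =
    FilterNode(
      AggNode(Seq("c_last_name", "c_first_name", "s_store_name"),
        AggNode(Seq("c_last_name", "c_first_name", "s_store_name", "ca_state"),
          JoinNode(ScanNode("store_sales", 1000L), FilterNode(ScanNode("item", 0L))))))
}
```

In `Example.query`, a single hypothetical empty side is enough to turn the whole tree into `EmptyNode`, which mirrors the shape of the final plan; the input sizes are made up and are not taken from this report.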
The DAG of the SQL query is as follows:
The details are as follows:
> Used databricks spark-sql-pref with Spark 3.3 to run 3TB tpcds test failed
> --------------------------------------------------------------------------
>
> Key: SPARK-40278
> URL: https://issues.apache.org/jira/browse/SPARK-40278
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.3.0
> Reporter: Yang Jie
> Priority: Major
>
> I used databricks spark-sql-pref + Spark 3.3 to run 3TB TPCDS q24a or q24b, the test code as follows:
> {code:java}
> val rootDir = "hdfs://${clusterName}/tpcds-data/POCGenData3T"
> val databaseName = "tpcds_database"
> val scaleFactor = "3072"
> val format = "parquet"
> import com.databricks.spark.sql.perf.tpcds.TPCDSTables
> val tables = new TPCDSTables(
> spark.sqlContext,dsdgenDir = "./tpcds-kit/tools",
> scaleFactor = scaleFactor,
> useDoubleForDecimal = false,useStringForDate = false)
> spark.sql(s"create database $databaseName")
> tables.createTemporaryTables(rootDir, format)
> spark.sql(s"use $databaseName")// TPCDS 24a or 24b
> val result = spark.sql(""" with ssales as
> (select c_last_name, c_first_name, s_store_name, ca_state, s_state, i_color,
> i_current_price, i_manager_id, i_units, i_size, sum(ss_net_paid) netpaid
> from store_sales, store_returns, store, item, customer, customer_address
> where ss_ticket_number = sr_ticket_number
> and ss_item_sk = sr_item_sk
> and ss_customer_sk = c_customer_sk
> and ss_item_sk = i_item_sk
> and ss_store_sk = s_store_sk
> and c_birth_country = upper(ca_country)
> and s_zip = ca_zip
> and s_market_id = 8
> group by c_last_name, c_first_name, s_store_name, ca_state, s_state, i_color,
> i_current_price, i_manager_id, i_units, i_size)
> select c_last_name, c_first_name, s_store_name, sum(netpaid) paid
> from ssales
> where i_color = 'pale'
> group by c_last_name, c_first_name, s_store_name
> having sum(netpaid) > (select 0.05*avg(netpaid) from ssales)""").collect()
> sc.stop() {code}
> The above test may failed due to `Stage cancelled because SparkContext was shut down` of stage 31 and stage 36 when AQE enabled as follows:
>
> !image-2022-08-30-21-09-48-763.png!
> !image-2022-08-30-21-10-24-862.png!
> !image-2022-08-30-21-10-57-128.png!
>
> The DAG corresponding to sql is as follows:
> !image-2022-08-30-21-11-50-895.png!
> The details as follows:
>
>
> {code:java}
> == Physical Plan ==
> AdaptiveSparkPlan (42)
> +- == Final Plan ==
>    LocalTableScan (1)
> +- == Initial Plan ==
>    Filter (41)
>    +- HashAggregate (40)
>       +- Exchange (39)
>          +- HashAggregate (38)
>             +- HashAggregate (37)
>                +- Exchange (36)
>                   +- HashAggregate (35)
>                      +- Project (34)
>                         +- BroadcastHashJoin Inner BuildRight (33)
>                            :- Project (29)
>                            :  +- BroadcastHashJoin Inner BuildRight (28)
>                            :     :- Project (24)
>                            :     :  +- BroadcastHashJoin Inner BuildRight (23)
>                            :     :     :- Project (19)
>                            :     :     :  +- BroadcastHashJoin Inner BuildRight (18)
>                            :     :     :     :- Project (13)
>                            :     :     :     :  +- SortMergeJoin Inner (12)
>                            :     :     :     :     :- Sort (6)
>                            :     :     :     :     :  +- Exchange (5)
>                            :     :     :     :     :     +- Project (4)
>                            :     :     :     :     :        +- Filter (3)
>                            :     :     :     :     :           +- Scan parquet (2)
>                            :     :     :     :     +- Sort (11)
>                            :     :     :     :        +- Exchange (10)
>                            :     :     :     :           +- Project (9)
>                            :     :     :     :              +- Filter (8)
>                            :     :     :     :                 +- Scan parquet (7)
>                            :     :     :     +- BroadcastExchange (17)
>                            :     :     :        +- Project (16)
>                            :     :     :           +- Filter (15)
>                            :     :     :              +- Scan parquet (14)
>                            :     :     +- BroadcastExchange (22)
>                            :     :        +- Filter (21)
>                            :     :           +- Scan parquet (20)
>                            :     +- BroadcastExchange (27)
>                            :        +- Filter (26)
>                            :           +- Scan parquet (25)
>                            +- BroadcastExchange (32)
>                               +- Filter (31)
>                                  +- Scan parquet (30)
> (1) LocalTableScan
> Output [4]: [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]
> Arguments: <empty>, [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]
> (2) Scan parquet
> Output [6]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149, ss_sold_date_sk#152]
> Batched: true
> Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/store_sales]
> PushedFilters: [IsNotNull(ss_ticket_number), IsNotNull(ss_item_sk), IsNotNull(ss_store_sk), IsNotNull(ss_customer_sk)]
> ReadSchema: struct<ss_item_sk:int,ss_customer_sk:int,ss_store_sk:int,ss_ticket_number:bigint,ss_net_paid:decimal(7,2)>
> (3) Filter
> Input [6]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149, ss_sold_date_sk#152]
> Condition : (((isnotnull(ss_ticket_number#138L) AND isnotnull(ss_item_sk#131)) AND isnotnull(ss_store_sk#136)) AND isnotnull(ss_customer_sk#132))
> (4) Project
> Output [5]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149]
> Input [6]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149, ss_sold_date_sk#152]
> (5) Exchange
> Input [5]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149]
> Arguments: hashpartitioning(ss_ticket_number#138L, ss_item_sk#131, 300), ENSURE_REQUIREMENTS, [id=#309]
> (6) Sort
> Input [5]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149]
> Arguments: [ss_ticket_number#138L ASC NULLS FIRST, ss_item_sk#131 ASC NULLS FIRST], false, 0
> (7) Scan parquet
> Output [3]: [sr_item_sk#177, sr_ticket_number#184L, sr_returned_date_sk#195]
> Batched: true
> Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/store_returns]
> PushedFilters: [IsNotNull(sr_ticket_number), IsNotNull(sr_item_sk)]
> ReadSchema: struct<sr_item_sk:int,sr_ticket_number:bigint>
> (8) Filter
> Input [3]: [sr_item_sk#177, sr_ticket_number#184L, sr_returned_date_sk#195]
> Condition : (isnotnull(sr_ticket_number#184L) AND isnotnull(sr_item_sk#177))
> (9) Project
> Output [2]: [sr_item_sk#177, sr_ticket_number#184L]
> Input [3]: [sr_item_sk#177, sr_ticket_number#184L, sr_returned_date_sk#195]
> (10) Exchange
> Input [2]: [sr_item_sk#177, sr_ticket_number#184L]
> Arguments: hashpartitioning(sr_ticket_number#184L, sr_item_sk#177, 300), ENSURE_REQUIREMENTS, [id=#310]
> (11) Sort
> Input [2]: [sr_item_sk#177, sr_ticket_number#184L]
> Arguments: [sr_ticket_number#184L ASC NULLS FIRST, sr_item_sk#177 ASC NULLS FIRST], false, 0
> (12) SortMergeJoin
> Left keys [2]: [ss_ticket_number#138L, ss_item_sk#131]
> Right keys [2]: [sr_ticket_number#184L, sr_item_sk#177]
> Join condition: None
> (13) Project
> Output [4]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_net_paid#149]
> Input [7]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149, sr_item_sk#177, sr_ticket_number#184L]
> (14) Scan parquet
> Output [5]: [s_store_sk#664, s_store_name#669, s_market_id#674, s_state#688, s_zip#689]
> Batched: true
> Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/store]
> PushedFilters: [IsNotNull(s_market_id), EqualTo(s_market_id,8), IsNotNull(s_store_sk), IsNotNull(s_zip)]
> ReadSchema: struct<s_store_sk:int,s_store_name:string,s_market_id:int,s_state:string,s_zip:string>
> (15) Filter
> Input [5]: [s_store_sk#664, s_store_name#669, s_market_id#674, s_state#688, s_zip#689]
> Condition : (((isnotnull(s_market_id#674) AND (s_market_id#674 = 8)) AND isnotnull(s_store_sk#664)) AND isnotnull(s_zip#689))
> (16) Project
> Output [4]: [s_store_sk#664, s_store_name#669, s_state#688, s_zip#689]
> Input [5]: [s_store_sk#664, s_store_name#669, s_market_id#674, s_state#688, s_zip#689]
> (17) BroadcastExchange
> Input [4]: [s_store_sk#664, s_store_name#669, s_state#688, s_zip#689]
> Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#316]
> (18) BroadcastHashJoin
> Left keys [1]: [ss_store_sk#136]
> Right keys [1]: [s_store_sk#664]
> Join condition: None
> (19) Project
> Output [6]: [ss_item_sk#131, ss_customer_sk#132, ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689]
> Input [8]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_net_paid#149, s_store_sk#664, s_store_name#669, s_state#688, s_zip#689]
> (20) Scan parquet
> Output [6]: [i_item_sk#564, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584]
> Batched: true
> Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/item]
> PushedFilters: [IsNotNull(i_color), EqualTo(i_color,pale), IsNotNull(i_item_sk)]
> ReadSchema: struct<i_item_sk:int,i_current_price:decimal(7,2),i_size:string,i_color:string,i_units:string,i_manager_id:int>
> (21) Filter
> Input [6]: [i_item_sk#564, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584]
> Condition : ((isnotnull(i_color#581) AND (i_color#581 = pale)) AND isnotnull(i_item_sk#564))
> (22) BroadcastExchange
> Input [6]: [i_item_sk#564, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584]
> Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#320]
> (23) BroadcastHashJoin
> Left keys [1]: [ss_item_sk#131]
> Right keys [1]: [i_item_sk#564]
> Join condition: None
> (24) Project
> Output [10]: [ss_customer_sk#132, ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584]
> Input [12]: [ss_item_sk#131, ss_customer_sk#132, ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_item_sk#564, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584]
> (25) Scan parquet
> Output [4]: [c_customer_sk#412, c_first_name#420, c_last_name#421, c_birth_country#426]
> Batched: true
> Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/customer]
> PushedFilters: [IsNotNull(c_customer_sk), IsNotNull(c_birth_country)]
> ReadSchema: struct<c_customer_sk:int,c_first_name:string,c_last_name:string,c_birth_country:string>
> (26) Filter
> Input [4]: [c_customer_sk#412, c_first_name#420, c_last_name#421, c_birth_country#426]
> Condition : (isnotnull(c_customer_sk#412) AND isnotnull(c_birth_country#426))
> (27) BroadcastExchange
> Input [4]: [c_customer_sk#412, c_first_name#420, c_last_name#421, c_birth_country#426]
> Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#324]
> (28) BroadcastHashJoin
> Left keys [1]: [ss_customer_sk#132]
> Right keys [1]: [c_customer_sk#412]
> Join condition: None
> (29) Project
> Output [12]: [ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_first_name#420, c_last_name#421, c_birth_country#426]
> Input [14]: [ss_customer_sk#132, ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_customer_sk#412, c_first_name#420, c_last_name#421, c_birth_country#426]
> (30) Scan parquet
> Output [3]: [ca_state#456, ca_zip#457, ca_country#458]
> Batched: true
> Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/customer_address]
> PushedFilters: [IsNotNull(ca_country), IsNotNull(ca_zip)]
> ReadSchema: struct<ca_state:string,ca_zip:string,ca_country:string>
> (31) Filter
> Input [3]: [ca_state#456, ca_zip#457, ca_country#458]
> Condition : (isnotnull(ca_country#458) AND isnotnull(ca_zip#457))
> (32) BroadcastExchange
> Input [3]: [ca_state#456, ca_zip#457, ca_country#458]
> Arguments: HashedRelationBroadcastMode(List(upper(input[2, string, false]), input[1, string, false]),false), [id=#328]
> (33) BroadcastHashJoin
> Left keys [2]: [c_birth_country#426, s_zip#689]
> Right keys [2]: [upper(ca_country#458), ca_zip#457]
> Join condition: None
> (34) Project
> Output [11]: [ss_net_paid#149, s_store_name#669, s_state#688, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_first_name#420, c_last_name#421, ca_state#456]
> Input [15]: [ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_first_name#420, c_last_name#421, c_birth_country#426, ca_state#456, ca_zip#457, ca_country#458]
> (35) HashAggregate
> Input [11]: [ss_net_paid#149, s_store_name#669, s_state#688, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_first_name#420, c_last_name#421, ca_state#456]
> Keys [10]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579]
> Functions [1]: [partial_sum(UnscaledValue(ss_net_paid#149))]
> Aggregate Attributes [1]: [sum#870L]
> Results [11]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579, sum#871L]
> (36) Exchange
> Input [11]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579, sum#871L]
> Arguments: hashpartitioning(c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579, 300), ENSURE_REQUIREMENTS, [id=#333]
> (37) HashAggregate
> Input [11]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579, sum#871L]
> Keys [10]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579]
> Functions [1]: [sum(UnscaledValue(ss_net_paid#149))]
> Aggregate Attributes [1]: [sum(UnscaledValue(ss_net_paid#149))#853L]
> Results [4]: [c_last_name#421, c_first_name#420, s_store_name#669, MakeDecimal(sum(UnscaledValue(ss_net_paid#149))#853L,17,2) AS netpaid#852]
> (38) HashAggregate
> Input [4]: [c_last_name#421, c_first_name#420, s_store_name#669, netpaid#852]
> Keys [3]: [c_last_name#421, c_first_name#420, s_store_name#669]
> Functions [1]: [partial_sum(netpaid#852)]
> Aggregate Attributes [2]: [sum#866, isEmpty#867]
> Results [5]: [c_last_name#421, c_first_name#420, s_store_name#669, sum#868, isEmpty#869]
> (39) Exchange
> Input [5]: [c_last_name#421, c_first_name#420, s_store_name#669, sum#868, isEmpty#869]
> Arguments: hashpartitioning(c_last_name#421, c_first_name#420, s_store_name#669, 300), ENSURE_REQUIREMENTS, [id=#337]
> (40) HashAggregate
> Input [5]: [c_last_name#421, c_first_name#420, s_store_name#669, sum#868, isEmpty#869]
> Keys [3]: [c_last_name#421, c_first_name#420, s_store_name#669]
> Functions [1]: [sum(netpaid#852)]
> Aggregate Attributes [1]: [sum(netpaid#852)#854]
> Results [4]: [c_last_name#421, c_first_name#420, s_store_name#669, sum(netpaid#852)#854 AS paid#850]
> (41) Filter
> Input [4]: [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]
> Condition : (isnotnull(paid#850) AND (cast(paid#850 as decimal(33,8)) > cast(Subquery subquery#851, [id=#294] as decimal(33,8))))
> (42) AdaptiveSparkPlan
> Output [4]: [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]
> Arguments: isFinalPlan=true {code}
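The decimal types in the plan above can be traced back to the query: ss_net_paid is decimal(7,2), node (37) produces netpaid via MakeDecimal(..., 17, 2), and Filter (41) casts both sides of the HAVING comparison to decimal(33,8). The sketch below is a simplified, self-contained model (no Spark dependency) of the result-type rules Spark applies for SUM, AVG, multiplication and comparison of decimals. `Dec`, `DecimalTypeRules` and `Q24DecimalTypes` are hypothetical names, the precision-loss adjustment Spark performs when a result would exceed 38 digits is not modelled, and typing the literal 0.05 as decimal(2,2) is an assumption.

```scala
// Hypothetical stand-in for a Spark decimal type: decimal(precision, scale).
final case class Dec(precision: Int, scale: Int) {
  // Results wider than 38 digits would need Spark's precision-loss
  // adjustment, which this sketch does not model, so reject them.
  require(precision <= 38 && scale <= precision, s"unsupported decimal($precision,$scale)")
  override def toString: String = s"decimal($precision,$scale)"
}

// Simplified result-type rules (assumption: all results stay within 38 digits).
object DecimalTypeRules {
  // SUM(decimal(p, s)) -> decimal(p + 10, s)
  def sum(d: Dec): Dec = Dec(d.precision + 10, d.scale)

  // AVG(decimal(p, s)) -> decimal(p + 4, s + 4)
  def avg(d: Dec): Dec = Dec(d.precision + 4, d.scale + 4)

  // decimal(p1, s1) * decimal(p2, s2) -> decimal(p1 + p2 + 1, s1 + s2)
  def multiply(a: Dec, b: Dec): Dec =
    Dec(a.precision + b.precision + 1, a.scale + b.scale)

  // Common type for a comparison: the larger scale plus enough
  // integral digits to hold either side.
  def wider(a: Dec, b: Dec): Dec = {
    val scale = math.max(a.scale, b.scale)
    val integral = math.max(a.precision - a.scale, b.precision - b.scale)
    Dec(integral + scale, scale)
  }
}

object Q24DecimalTypes {
  import DecimalTypeRules._

  val ssNetPaid = Dec(7, 2)                       // ss_net_paid in the ReadSchema of node (2)
  val netpaid = sum(ssNetPaid)                    // decimal(17,2): MakeDecimal(..., 17, 2) in node (37)
  val paid = sum(netpaid)                         // decimal(27,2): sum(netpaid) in node (40)
  val literal = Dec(2, 2)                         // assumed type of the literal 0.05
  val threshold = multiply(literal, avg(netpaid)) // avg is decimal(21,6), product is decimal(24,8)
  val comparison = wider(paid, threshold)         // decimal(33,8): the casts in Filter (41)

  def main(args: Array[String]): Unit =
    println(s"netpaid=$netpaid paid=$paid threshold=$threshold comparison=$comparison")
}
```

Comparing decimal(27,2) with decimal(24,8) needs scale 8 and max(25, 16) = 25 integral digits, which gives the decimal(33,8) seen in the Filter condition.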
>
>
> After I manually reverted SPARK-35442, the problem no longer occurred.
>
> The DAG corresponding to the SQL is as follows:
> The details are as follows:
--
This message was sent by Atlassian Jira
(v8.20.10#820010)