Posted to issues@spark.apache.org by "zengrui (Jira)" <ji...@apache.org> on 2021/08/12 10:51:00 UTC
[jira] [Updated] (SPARK-36494) SortMergeJoin do unnecessary shuffle for tables whose provider is hive
[ https://issues.apache.org/jira/browse/SPARK-36494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
zengrui updated SPARK-36494:
----------------------------
Description:
Create two tables in Spark as follows:
{code:sql}
CREATE TABLE catalog_sales
(
cs_sold_date_sk BIGINT,
cs_sold_time_sk BIGINT,
cs_ship_date_sk BIGINT,
cs_bill_customer_sk BIGINT,
cs_bill_cdemo_sk BIGINT,
cs_bill_hdemo_sk BIGINT,
cs_bill_addr_sk BIGINT,
cs_ship_customer_sk BIGINT,
cs_ship_cdemo_sk BIGINT,
cs_ship_hdemo_sk BIGINT,
cs_ship_addr_sk BIGINT,
cs_call_center_sk BIGINT,
cs_catalog_page_sk BIGINT,
cs_ship_mode_sk BIGINT,
cs_warehouse_sk BIGINT,
cs_item_sk BIGINT,
cs_promo_sk BIGINT,
cs_order_number BIGINT,
cs_quantity BIGINT,
cs_wholesale_cost FLOAT,
cs_list_price FLOAT,
cs_sales_price FLOAT,
cs_ext_discount_amt FLOAT,
cs_ext_sales_price FLOAT,
cs_ext_wholesale_cost FLOAT,
cs_ext_list_price FLOAT,
cs_ext_tax FLOAT,
cs_coupon_amt FLOAT,
cs_ext_ship_cost FLOAT,
cs_net_paid FLOAT,
cs_net_paid_inc_tax FLOAT,
cs_net_paid_inc_ship FLOAT,
cs_net_paid_inc_ship_tax FLOAT,
cs_net_profit FLOAT
)
clustered by (cs_item_sk) into 10 buckets
stored as ORC;
CREATE TABLE store_sales
(
ss_sold_date_sk BIGINT,
ss_sold_time_sk BIGINT,
ss_item_sk BIGINT,
ss_customer_sk BIGINT,
ss_cdemo_sk BIGINT,
ss_hdemo_sk BIGINT,
ss_addr_sk BIGINT,
ss_store_sk BIGINT,
ss_promo_sk BIGINT,
ss_ticket_number BIGINT,
ss_quantity BIGINT,
ss_wholesale_cost FLOAT,
ss_list_price FLOAT,
ss_sales_price FLOAT,
ss_ext_discount_amt FLOAT,
ss_ext_sales_price FLOAT,
ss_ext_wholesale_cost FLOAT,
ss_ext_list_price FLOAT,
ss_ext_tax FLOAT,
ss_coupon_amt FLOAT,
ss_net_paid FLOAT,
ss_net_paid_inc_tax FLOAT,
ss_net_profit FLOAT
)
CLUSTERED BY (ss_item_sk) INTO 10 buckets
stored as ORC;{code}
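Before running the join, the metadata Spark records for each table can be checked. This is a minimal sketch, assuming the tables were created in the tpcds_orc_mintest database used by the query below. In the output, the Provider row is expected to read hive, with Num Buckets 10 and the item key listed under Bucket Columns.
{code:sql}
-- Inspect table metadata; look for the Provider, Num Buckets and Bucket Columns rows.
DESCRIBE TABLE EXTENDED tpcds_orc_mintest.store_sales;
DESCRIBE TABLE EXTENDED tpcds_orc_mintest.catalog_sales;
{code}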
Both tables then have provider hive. When the following SQL is executed:
{code:sql}
SELECT * FROM tpcds_orc_mintest.store_sales a join tpcds_orc_mintest.catalog_sales b on a.ss_item_sk = b.cs_item_sk limit 1000;
{code}
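Spark performs an unnecessary shuffle before the SortMergeJoin, even though both tables are bucketed on the join key (ss_item_sk and cs_item_sk) into the same number of buckets (10).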
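The extra shuffle can be observed in the physical plan. This is a minimal sketch under stated assumptions. Setting spark.sql.autoBroadcastJoinThreshold to -1 is not part of the original report; it is added here so that small test tables are joined with SortMergeJoin instead of a broadcast join. If the shuffle occurs, Exchange hashpartitioning(...) nodes are expected between the table scans and the SortMergeJoin node.
{code:sql}
-- Assumption (not in the original report): disable broadcast joins so the
-- planner picks SortMergeJoin even for small test data.
SET spark.sql.autoBroadcastJoinThreshold = -1;

-- Print the physical plan for the join from this report.
EXPLAIN
SELECT * FROM tpcds_orc_mintest.store_sales a
JOIN tpcds_orc_mintest.catalog_sales b ON a.ss_item_sk = b.cs_item_sk
LIMIT 1000;
{code}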
> SortMergeJoin do unnecessary shuffle for tables whose provider is hive
> ----------------------------------------------------------------------
>
> Key: SPARK-36494
> URL: https://issues.apache.org/jira/browse/SPARK-36494
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.1.1
> Reporter: zengrui
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)