Posted to issues@spark.apache.org by "zengrui (Jira)" <ji...@apache.org> on 2021/08/12 10:51:00 UTC

[jira] [Updated] (SPARK-36494) SortMergeJoin do unnecessary shuffle for tables whose provider is hive

     [ https://issues.apache.org/jira/browse/SPARK-36494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zengrui updated SPARK-36494:
----------------------------
    Description: 
Create two tables in spark like this:
{code:sql}
CREATE TABLE catalog_sales
(
    cs_sold_date_sk           BIGINT,
    cs_sold_time_sk           BIGINT,
    cs_ship_date_sk           BIGINT,
    cs_bill_customer_sk       BIGINT,
    cs_bill_cdemo_sk          BIGINT,
    cs_bill_hdemo_sk          BIGINT,
    cs_bill_addr_sk           BIGINT,
    cs_ship_customer_sk       BIGINT,
    cs_ship_cdemo_sk          BIGINT,
    cs_ship_hdemo_sk          BIGINT,
    cs_ship_addr_sk           BIGINT,
    cs_call_center_sk         BIGINT,
    cs_catalog_page_sk        BIGINT,
    cs_ship_mode_sk           BIGINT,
    cs_warehouse_sk           BIGINT,
    cs_item_sk                BIGINT,
    cs_promo_sk               BIGINT,
    cs_order_number           BIGINT,
    cs_quantity               BIGINT,
    cs_wholesale_cost         FLOAT,
    cs_list_price             FLOAT,
    cs_sales_price            FLOAT,
    cs_ext_discount_amt       FLOAT,
    cs_ext_sales_price        FLOAT,
    cs_ext_wholesale_cost     FLOAT,
    cs_ext_list_price         FLOAT,
    cs_ext_tax                FLOAT,
    cs_coupon_amt             FLOAT,
    cs_ext_ship_cost          FLOAT,
    cs_net_paid               FLOAT,
    cs_net_paid_inc_tax       FLOAT,
    cs_net_paid_inc_ship      FLOAT,
    cs_net_paid_inc_ship_tax  FLOAT,
    cs_net_profit             FLOAT
)
CLUSTERED BY (cs_item_sk) INTO 10 BUCKETS
STORED AS ORC;

CREATE TABLE store_sales
(
    ss_sold_date_sk           BIGINT,
    ss_sold_time_sk           BIGINT,
    ss_item_sk                BIGINT,
    ss_customer_sk            BIGINT,
    ss_cdemo_sk               BIGINT,
    ss_hdemo_sk               BIGINT,
    ss_addr_sk                BIGINT,
    ss_store_sk               BIGINT,
    ss_promo_sk               BIGINT,
    ss_ticket_number          BIGINT,
    ss_quantity               BIGINT,
    ss_wholesale_cost         FLOAT,
    ss_list_price             FLOAT,
    ss_sales_price            FLOAT,
    ss_ext_discount_amt       FLOAT,
    ss_ext_sales_price        FLOAT,
    ss_ext_wholesale_cost     FLOAT,
    ss_ext_list_price         FLOAT,
    ss_ext_tax                FLOAT,
    ss_coupon_amt             FLOAT,
    ss_net_paid               FLOAT,
    ss_net_paid_inc_tax       FLOAT,
    ss_net_profit             FLOAT
)
CLUSTERED BY (ss_item_sk) INTO 10 BUCKETS
STORED AS ORC;{code}
The provider of both tables is then hive. When executing this SQL:
{code:sql}
SELECT * FROM tpcds_orc_mintest.store_sales a join tpcds_orc_mintest.catalog_sales b on a.ss_item_sk = b.cs_item_sk limit 1000;
{code}
Spark performs an unnecessary shuffle before the SortMergeJoin.
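For background on why no shuffle should be needed: both tables are bucketed by the join key into the same number of buckets, so equal keys already land in the same bucket on both sides. Below is a minimal Python sketch of Hive's bucket assignment, assuming the standard Hive scheme ((hashCode & Integer.MAX_VALUE) % numBuckets, with Java's Long.hashCode for BIGINT columns); it is illustrative only, not Spark source:

```python
# Illustrative sketch (not Spark code): how Hive assigns a row to a bucket.
# For a BIGINT column, Hive hashes with Java's Long.hashCode, i.e.
# (int)(value ^ (value >>> 32)), then takes (hash & Integer.MAX_VALUE) % numBuckets.

INT_MAX = 0x7FFFFFFF

def java_long_hashcode(value: int) -> int:
    """Java Long.hashCode: (int)(value ^ (value >>> 32)), as a signed 32-bit int."""
    v = value & 0xFFFFFFFFFFFFFFFF                     # treat as unsigned 64-bit
    h = (v ^ (v >> 32)) & 0xFFFFFFFF                   # xor-fold to 32 bits
    return h - 0x100000000 if h >= 0x80000000 else h   # back to signed 32-bit

def hive_bucket_id(key: int, num_buckets: int) -> int:
    """Bucket id Hive assigns to a BIGINT join/bucket key."""
    return (java_long_hashcode(key) & INT_MAX) % num_buckets

# Two tables bucketed the same way place equal keys in the same bucket,
# so a join on the bucket column needs no exchange in principle:
assert hive_bucket_id(42, 10) == hive_bucket_id(42, 10)
```

Note that Spark's own datasource bucketing (tables created with USING ORC ... CLUSTERED BY) uses a different hash function (Murmur3), which is presumably one reason Hive-style bucketing is not currently exposed to the planner as a known output partitioning, even though Spark does avoid this shuffle for co-bucketed datasource tables.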


> SortMergeJoin do unnecessary shuffle for tables whose provider is hive
> ----------------------------------------------------------------------
>
>                 Key: SPARK-36494
>                 URL: https://issues.apache.org/jira/browse/SPARK-36494
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.1
>            Reporter: zengrui
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
