Posted to issues@hive.apache.org by "László Bodor (Jira)" <ji...@apache.org> on 2022/04/27 08:57:00 UTC
[jira] [Commented] (HIVE-26008) Dynamic partition pruning not sending right partitions with subqueries
[ https://issues.apache.org/jira/browse/HIVE-26008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17528667#comment-17528667 ]
László Bodor commented on HIVE-26008:
-------------------------------------
summary of recent investigation:
the minified query to reproduce the issue:
[^HIVE_26008_1_DPP_path.svg]
{code}
select
sum(wr_return_quantity) wr_item_qty
from web_returns,
date_dim
where d_date in
(select d_date
from date_dim
where d_week_seq in
(select d_week_seq
from date_dim
where d_date in ('1998-01-02','1998-10-15','1998-11-10')))
and wr_returned_date_sk = d_date_sk;
{code}
manually rewritten query that lets DPP find the optimal path:
[^HIVE_26008_2_DPP_paths.svg]
{code}
select
sum(wr_return_quantity) wr_item_qty
from web_returns,
date_dim
where d_date_sk in
(select d_date_sk
from date_dim
where d_week_seq in
(select d_week_seq
from date_dim
where d_date in ('1998-01-02','1998-10-15','1998-11-10')))
and wr_returned_date_sk = d_date_sk;
{code}
so, due to the mismatch in join keys (the subquery is semijoined on d_date, while date_dim joins web_returns on d_date_sk), the FIL op generated by SyntheticJoinPredicate is not pushed down to the big table scan.
the challenging part will be finding a general way to change the join key.
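a toy model of the effect above, in Python rather than Hive code. The data, the names {{DATE_DIM}}, {{JOIN_EQUIVALENCES}}, {{subquery_values}} and {{scanned_partitions}} are all hypothetical, and it assumes web_returns is partitioned by wr_returned_date_sk. The point it sketches: a semijoin predicate can only prune fact partitions when its column is the one the outer join equates with the partition column.
{code:python}
# Toy model only (hypothetical data, not Hive's optimizer code).
# Hypothetical 28-day date_dim, 7 days per week.
DATE_DIM = [
    {"d_date_sk": sk, "d_date": f"1998-02-{sk:02d}", "d_week_seq": (sk - 1) // 7}
    for sk in range(1, 29)
]
# date_dim columns equated with a web_returns column by the outer join
# (wr_returned_date_sk = d_date_sk); web_returns is assumed partitioned by it.
JOIN_EQUIVALENCES = {"d_date_sk": "wr_returned_date_sk"}


def subquery_values(column, seed_dates):
    """Evaluate: select <column> from date_dim where d_week_seq in
    (select d_week_seq from date_dim where d_date in <seed_dates>)."""
    weeks = {r["d_week_seq"] for r in DATE_DIM if r["d_date"] in seed_dates}
    return {r[column] for r in DATE_DIM if r["d_week_seq"] in weeks}


def scanned_partitions(semijoin_column, seed_dates):
    """Return the web_returns partitions (date_sk values) the scan must read."""
    partitions = {r["d_date_sk"] for r in DATE_DIM}  # one partition per day
    if semijoin_column not in JOIN_EQUIVALENCES:
        # The synthetic "<column> IN (...)" predicate only applies to date_dim:
        # web_returns has no such column, so nothing is pruned.
        return partitions
    # The semijoin key is the join key, so the predicate can be restated on
    # wr_returned_date_sk and pushed down to the fact table scan.
    return partitions & subquery_values(semijoin_column, seed_dates)


seeds = {"1998-02-02", "1998-02-15"}
print(len(scanned_partitions("d_date", seeds)))     # 28 (no pruning)
print(len(scanned_partitions("d_date_sk", seeds)))  # 14 (weeks 0 and 2 only)
{code}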
in this particular case, since there is already a date_dim -> web_returns join on d_date_sk:
{code}
wr_returned_date_sk = d_date_sk;
{code}
the subquery select can be rewritten from:
{code}
where d_date in
(select d_date
{code}
to
{code}
where d_date_sk in
(select d_date_sk
{code}
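a quick sanity check that the two forms return the same result. It uses SQLite with made-up data, not Hive, and assumes d_date is unique per date_dim row (one row per calendar day). That uniqueness is what makes "d_date in (select d_date ...)" and "d_date_sk in (select d_date_sk ...)" select the same date_dim rows.
{code:python}
import sqlite3

# Hypothetical miniature schema; d_date is declared UNIQUE (the key assumption).
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE date_dim (d_date_sk INTEGER PRIMARY KEY, d_date TEXT UNIQUE, d_week_seq INTEGER);
CREATE TABLE web_returns (wr_returned_date_sk INTEGER, wr_return_quantity INTEGER);
""")
conn.executemany("INSERT INTO date_dim VALUES (?, ?, ?)",
                 [(sk, f"1998-02-{sk:02d}", (sk - 1) // 7) for sk in range(1, 29)])
conn.executemany("INSERT INTO web_returns VALUES (?, ?)",
                 [(sk, sk * 10) for sk in range(1, 29)])

# Same shape as the minified repro; {col} is the semijoin key being compared.
TEMPLATE = """
select sum(wr_return_quantity) from web_returns, date_dim
where {col} in (select {col} from date_dim where d_week_seq in
        (select d_week_seq from date_dim where d_date in ('1998-02-02', '1998-02-15')))
  and wr_returned_date_sk = d_date_sk
"""
original = conn.execute(TEMPLATE.format(col="d_date")).fetchone()[0]
rewritten = conn.execute(TEMPLATE.format(col="d_date_sk")).fetchone()[0]
print(original, rewritten)  # 1540 1540
{code}
if d_date were not unique, the d_date form could also match other rows sharing a date, so a general rewrite would presumably need to prove that functional dependency first.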
> Dynamic partition pruning not sending right partitions with subqueries
> ----------------------------------------------------------------------
>
> Key: HIVE-26008
> URL: https://issues.apache.org/jira/browse/HIVE-26008
> Project: Hive
> Issue Type: Improvement
> Components: HiveServer2
> Reporter: Rajesh Balamohan
> Assignee: László Bodor
> Priority: Major
> Labels: performance
> Attachments: HIVE_26008_1_DPP_path.svg, HIVE_26008_2_DPP_paths.svg, Screenshot 2022-03-08 at 5.04.02 AM.png
>
>
> DPP doesn't work well when subqueries are involved. Here is an example query (TPC-DS q83).
> Note that "date_dim" has another subquery involved. Because of this, the DPP operator ends up sending the entire "date_dim" to the fact tables.
> As a result, far more data is scanned from the fact tables and the query runtime increases.
> For context, on a very small cluster this query ran for 265 seconds, while the rewritten query finished in 11 seconds! The fact table scan was 10 MB vs 10 GB.
> {noformat}
> HiveJoin(condition=[=($2, $5)], joinType=[inner])
> HiveJoin(condition=[=($0, $3)], joinType=[inner])
> HiveProject(cr_item_sk=[$1], cr_return_quantity=[$16], cr_returned_date_sk=[$26])
> HiveFilter(condition=[AND(IS NOT NULL($26), IS NOT NULL($1))])
> HiveTableScan(table=[[tpcds_bin_partitioned_orc_10000, catalog_returns]], table:alias=[catalog_returns])
> HiveProject(i_item_sk=[$0], i_item_id=[$1])
> HiveFilter(condition=[AND(IS NOT NULL($1), IS NOT NULL($0))])
> HiveTableScan(table=[[tpcds_bin_partitioned_orc_10000, item]], table:alias=[item])
> HiveProject(d_date_sk=[$0], d_date=[$2])
> HiveFilter(condition=[AND(IS NOT NULL($2), IS NOT NULL($0))])
> HiveTableScan(table=[[tpcds_bin_partitioned_orc_10000, date_dim]], table:alias=[date_dim])
> HiveProject(d_date=[$0])
> HiveSemiJoin(condition=[=($1, $2)], joinType=[semi])
> HiveProject(d_date=[$2], d_week_seq=[$4])
> HiveFilter(condition=[AND(IS NOT NULL($4), IS NOT NULL($2))])
> HiveTableScan(table=[[tpcds_bin_partitioned_orc_10000, date_dim]], table:alias=[date_dim])
> HiveProject(d_week_seq=[$4])
> HiveFilter(condition=[AND(IN($2, 1998-01-02:DATE, 1998-10-15:DATE, 1998-11-10:DATE), IS NOT NULL($4))])
> HiveTableScan(table=[[tpcds_bin_partitioned_orc_10000, date_dim]], table:alias=[date_dim])
> {noformat}
> *Original Query & Plan:*
> {noformat}
> explain cbo with sr_items as
> (select i_item_id item_id,
> sum(sr_return_quantity) sr_item_qty
> from store_returns,
> item,
> date_dim
> where sr_item_sk = i_item_sk
> and d_date in
> (select d_date
> from date_dim
> where d_week_seq in
> (select d_week_seq
> from date_dim
> where d_date in ('1998-01-02','1998-10-15','1998-11-10')))
> and sr_returned_date_sk = d_date_sk
> group by i_item_id),
> cr_items as
> (select i_item_id item_id,
> sum(cr_return_quantity) cr_item_qty
> from catalog_returns,
> item,
> date_dim
> where cr_item_sk = i_item_sk
> and d_date in
> (select d_date
> from date_dim
> where d_week_seq in
> (select d_week_seq
> from date_dim
> where d_date in ('1998-01-02','1998-10-15','1998-11-10')))
> and cr_returned_date_sk = d_date_sk
> group by i_item_id),
> wr_items as
> (select i_item_id item_id,
> sum(wr_return_quantity) wr_item_qty
> from web_returns,
> item,
> date_dim
> where wr_item_sk = i_item_sk
> and d_date in
> (select d_date
> from date_dim
> where d_week_seq in
> (select d_week_seq
> from date_dim
> where d_date in ('1998-01-02','1998-10-15','1998-11-10')))
> and wr_returned_date_sk = d_date_sk
> group by i_item_id)
> select sr_items.item_id
> ,sr_item_qty
> ,sr_item_qty/(sr_item_qty+cr_item_qty+wr_item_qty)/3.0 * 100 sr_dev
> ,cr_item_qty
> ,cr_item_qty/(sr_item_qty+cr_item_qty+wr_item_qty)/3.0 * 100 cr_dev
> ,wr_item_qty
> ,wr_item_qty/(sr_item_qty+cr_item_qty+wr_item_qty)/3.0 * 100 wr_dev
> ,(sr_item_qty+cr_item_qty+wr_item_qty)/3.0 average
> from sr_items
> ,cr_items
> ,wr_items
> where sr_items.item_id=cr_items.item_id
> and sr_items.item_id=wr_items.item_id
> order by sr_items.item_id
> ,sr_item_qty
> limit 100
> INFO : Starting task [Stage-3:EXPLAIN] in serial mode
> INFO : Completed executing command(queryId=hive_20220307055109_88ad0cbd-bd40-45bc-92ae-ab15fa6b1da4); Time taken: 0.973 seconds
> INFO : OK
> Explain
> CBO PLAN:
> HiveSortLimit(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC], fetch=[100])
> HiveProject(item_id=[$0], sr_item_qty=[$4], sr_dev=[*(/(/($5, CAST(+(+($4, $1), $7)):DOUBLE), 3), 100)], cr_item_qty=[$1], cr_dev=[*(/(/($2, CAST(+(+($4, $1), $7)):DOUBLE), 3), 100)], wr_item_qty=[$7], wr_dev=[*(/(/($8, CAST(+(+($4, $1), $7)):DOUBLE), 3), 100)], average=[/(CAST(+(+($4, $1), $7)):DECIMAL(19, 0), 3:DECIMAL(1, 0))])
> HiveJoin(condition=[=($0, $6)], joinType=[inner])
> HiveJoin(condition=[=($3, $0)], joinType=[inner])
> HiveProject($f0=[$0], $f1=[$1], EXPR$0=[CAST($1):DOUBLE])
> HiveAggregate(group=[{4}], agg#0=[sum($1)])
> HiveSemiJoin(condition=[=($6, $7)], joinType=[semi])
> HiveJoin(condition=[=($2, $5)], joinType=[inner])
> HiveJoin(condition=[=($0, $3)], joinType=[inner])
> HiveProject(cr_item_sk=[$1], cr_return_quantity=[$16], cr_returned_date_sk=[$26])
> HiveFilter(condition=[AND(IS NOT NULL($26), IS NOT NULL($1))])
> HiveTableScan(table=[[tpcds_bin_partitioned_orc_10000, catalog_returns]], table:alias=[catalog_returns])
> HiveProject(i_item_sk=[$0], i_item_id=[$1])
> HiveFilter(condition=[AND(IS NOT NULL($1), IS NOT NULL($0))])
> HiveTableScan(table=[[tpcds_bin_partitioned_orc_10000, item]], table:alias=[item])
> HiveProject(d_date_sk=[$0], d_date=[$2])
> HiveFilter(condition=[AND(IS NOT NULL($2), IS NOT NULL($0))])
> HiveTableScan(table=[[tpcds_bin_partitioned_orc_10000, date_dim]], table:alias=[date_dim])
> HiveProject(d_date=[$0])
> HiveSemiJoin(condition=[=($1, $2)], joinType=[semi])
> HiveProject(d_date=[$2], d_week_seq=[$4])
> HiveFilter(condition=[AND(IS NOT NULL($4), IS NOT NULL($2))])
> HiveTableScan(table=[[tpcds_bin_partitioned_orc_10000, date_dim]], table:alias=[date_dim])
> HiveProject(d_week_seq=[$4])
> HiveFilter(condition=[AND(IN($2, 1998-01-02:DATE, 1998-10-15:DATE, 1998-11-10:DATE), IS NOT NULL($4))])
> HiveTableScan(table=[[tpcds_bin_partitioned_orc_10000, date_dim]], table:alias=[date_dim])
> HiveProject($f0=[$0], $f1=[$1], EXPR$0=[CAST($1):DOUBLE])
> HiveAggregate(group=[{4}], agg#0=[sum($1)])
> HiveSemiJoin(condition=[=($6, $7)], joinType=[semi])
> HiveJoin(condition=[=($2, $5)], joinType=[inner])
> HiveJoin(condition=[=($0, $3)], joinType=[inner])
> HiveProject(sr_item_sk=[$1], sr_return_quantity=[$9], sr_returned_date_sk=[$19])
> HiveFilter(condition=[AND(IS NOT NULL($19), IS NOT NULL($1))])
> HiveTableScan(table=[[tpcds_bin_partitioned_orc_10000, store_returns]], table:alias=[store_returns])
> HiveProject(i_item_sk=[$0], i_item_id=[$1])
> HiveFilter(condition=[AND(IS NOT NULL($1), IS NOT NULL($0))])
> HiveTableScan(table=[[tpcds_bin_partitioned_orc_10000, item]], table:alias=[item])
> HiveProject(d_date_sk=[$0], d_date=[$2])
> HiveFilter(condition=[AND(IS NOT NULL($2), IS NOT NULL($0))])
> HiveTableScan(table=[[tpcds_bin_partitioned_orc_10000, date_dim]], table:alias=[date_dim])
> HiveProject(d_date=[$0])
> HiveSemiJoin(condition=[=($1, $2)], joinType=[semi])
> HiveProject(d_date=[$2], d_week_seq=[$4])
> HiveFilter(condition=[AND(IS NOT NULL($2), IS NOT NULL($4))])
> HiveTableScan(table=[[tpcds_bin_partitioned_orc_10000, date_dim]], table:alias=[date_dim])
> HiveProject(d_week_seq=[$4])
> HiveFilter(condition=[AND(IN($2, 1998-01-02:DATE, 1998-10-15:DATE, 1998-11-10:DATE), IS NOT NULL($4))])
> HiveTableScan(table=[[tpcds_bin_partitioned_orc_10000, date_dim]], table:alias=[date_dim])
> HiveProject($f0=[$0], $f1=[$1], EXPR$0=[CAST($1):DOUBLE])
> HiveAggregate(group=[{4}], agg#0=[sum($1)])
> HiveSemiJoin(condition=[=($6, $7)], joinType=[semi])
> HiveJoin(condition=[=($2, $5)], joinType=[inner])
> HiveJoin(condition=[=($0, $3)], joinType=[inner])
> HiveProject(wr_item_sk=[$1], wr_return_quantity=[$13], wr_returned_date_sk=[$23])
> HiveFilter(condition=[AND(IS NOT NULL($23), IS NOT NULL($1))])
> HiveTableScan(table=[[tpcds_bin_partitioned_orc_10000, web_returns]], table:alias=[web_returns])
> HiveProject(i_item_sk=[$0], i_item_id=[$1])
> HiveFilter(condition=[AND(IS NOT NULL($1), IS NOT NULL($0))])
> HiveTableScan(table=[[tpcds_bin_partitioned_orc_10000, item]], table:alias=[item])
> HiveProject(d_date_sk=[$0], d_date=[$2])
> HiveFilter(condition=[AND(IS NOT NULL($2), IS NOT NULL($0))])
> HiveTableScan(table=[[tpcds_bin_partitioned_orc_10000, date_dim]], table:alias=[date_dim])
> HiveProject(d_date=[$0])
> HiveSemiJoin(condition=[=($1, $2)], joinType=[semi])
> HiveProject(d_date=[$2], d_week_seq=[$4])
> HiveFilter(condition=[AND(IS NOT NULL($4), IS NOT NULL($2))])
> HiveTableScan(table=[[tpcds_bin_partitioned_orc_10000, date_dim]], table:alias=[date_dim])
> HiveProject(d_week_seq=[$4])
> HiveFilter(condition=[AND(IN($2, 1998-01-02:DATE, 1998-10-15:DATE, 1998-11-10:DATE), IS NOT NULL($4))])
> HiveTableScan(table=[[tpcds_bin_partitioned_orc_10000, date_dim]], table:alias=[date_dim])
> {noformat}
>
> *Modified Query & Plan:*
> {noformat}
> explain cbo
> with sr_items as
> (select i_item_id item_id,
> sum(sr_return_quantity) sr_item_qty
> from store_returns,
> item,
> date_dim
> where sr_item_sk = i_item_sk
> and d_date in
> (select d_date
> from date_dim
> where d_date in ('1998-01-02','1998-10-15','1998-11-10'))
> and sr_returned_date_sk = d_date_sk
> group by i_item_id),
> cr_items as
> (select i_item_id item_id,
> sum(cr_return_quantity) cr_item_qty
> from catalog_returns,
> item,
> date_dim
> where cr_item_sk = i_item_sk
> and d_date in
> (select d_date
> from date_dim
> where d_date in ('1998-01-02','1998-10-15','1998-11-10'))
> and cr_returned_date_sk = d_date_sk
> group by i_item_id),
> wr_items as
> (select i_item_id item_id,
> sum(wr_return_quantity) wr_item_qty
> from web_returns,
> item,
> date_dim
> where wr_item_sk = i_item_sk
> and d_date in
> (select d_date
> from date_dim
> where d_date in ('1998-01-02','1998-10-15','1998-11-10'))
> and wr_returned_date_sk = d_date_sk
> group by i_item_id)
> select sr_items.item_id
> ,sr_item_qty
> ,sr_item_qty/(sr_item_qty+cr_item_qty+wr_item_qty)/3.0 * 100 sr_dev
> ,cr_item_qty
> ,cr_item_qty/(sr_item_qty+cr_item_qty+wr_item_qty)/3.0 * 100 cr_dev
> ,wr_item_qty
> ,wr_item_qty/(sr_item_qty+cr_item_qty+wr_item_qty)/3.0 * 100 wr_dev
> ,(sr_item_qty+cr_item_qty+wr_item_qty)/3.0 average
> from sr_items
> ,cr_items
> ,wr_items
> where sr_items.item_id=cr_items.item_id
> and sr_items.item_id=wr_items.item_id
> order by sr_items.item_id
> ,sr_item_qty
> limit 100
> INFO : Starting task [Stage-3:EXPLAIN] in serial mode
> INFO : Completed executing command(queryId=hive_20220307062043_2847c12d-9c22-452e-aa84-3200a3b9018b); Time taken: 0.827 seconds
> INFO : OK
> Explain
> CBO PLAN:
> HiveSortLimit(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC], fetch=[100])
> HiveProject(item_id=[$0], sr_item_qty=[$4], sr_dev=[*(/(/($5, CAST(+(+($4, $1), $7)):DOUBLE), 3), 100)], cr_item_qty=[$1], cr_dev=[*(/(/($2, CAST(+(+($4, $1), $7)):DOUBLE), 3), 100)], wr_item_qty=[$7], wr_dev=[*(/(/($8, CAST(+(+($4, $1), $7)):DOUBLE), 3), 100)], average=[/(CAST(+(+($4, $1), $7)):DECIMAL(19, 0), 3:DECIMAL(1, 0))])
> HiveJoin(condition=[=($0, $6)], joinType=[inner])
> HiveJoin(condition=[=($3, $0)], joinType=[inner])
> HiveProject($f0=[$0], $f1=[$1], EXPR$0=[CAST($1):DOUBLE])
> HiveAggregate(group=[{4}], agg#0=[sum($1)])
> HiveSemiJoin(condition=[=($6, $7)], joinType=[semi])
> HiveJoin(condition=[=($2, $5)], joinType=[inner])
> HiveJoin(condition=[=($0, $3)], joinType=[inner])
> HiveProject(cr_item_sk=[$1], cr_return_quantity=[$16], cr_returned_date_sk=[$26])
> HiveFilter(condition=[AND(IS NOT NULL($26), IS NOT NULL($1))])
> HiveTableScan(table=[[tpcds_bin_partitioned_orc_10000, catalog_returns]], table:alias=[catalog_returns])
> HiveProject(i_item_sk=[$0], i_item_id=[$1])
> HiveFilter(condition=[AND(IS NOT NULL($1), IS NOT NULL($0))])
> HiveTableScan(table=[[tpcds_bin_partitioned_orc_10000, item]], table:alias=[item])
> HiveProject(d_date_sk=[$0], d_date=[$2])
> HiveFilter(condition=[AND(IN($2, 1998-01-02:DATE, 1998-10-15:DATE, 1998-11-10:DATE), IS NOT NULL($0))])
> HiveTableScan(table=[[tpcds_bin_partitioned_orc_10000, date_dim]], table:alias=[date_dim])
> HiveProject(d_date=[$2])
> HiveFilter(condition=[IN($2, 1998-01-02:DATE, 1998-10-15:DATE, 1998-11-10:DATE)])
> HiveTableScan(table=[[tpcds_bin_partitioned_orc_10000, date_dim]], table:alias=[date_dim])
> HiveProject($f0=[$0], $f1=[$1], EXPR$0=[CAST($1):DOUBLE])
> HiveAggregate(group=[{4}], agg#0=[sum($1)])
> HiveSemiJoin(condition=[=($6, $7)], joinType=[semi])
> HiveJoin(condition=[=($2, $5)], joinType=[inner])
> HiveJoin(condition=[=($0, $3)], joinType=[inner])
> HiveProject(sr_item_sk=[$1], sr_return_quantity=[$9], sr_returned_date_sk=[$19])
> HiveFilter(condition=[AND(IS NOT NULL($19), IS NOT NULL($1))])
> HiveTableScan(table=[[tpcds_bin_partitioned_orc_10000, store_returns]], table:alias=[store_returns])
> HiveProject(i_item_sk=[$0], i_item_id=[$1])
> HiveFilter(condition=[AND(IS NOT NULL($1), IS NOT NULL($0))])
> HiveTableScan(table=[[tpcds_bin_partitioned_orc_10000, item]], table:alias=[item])
> HiveProject(d_date_sk=[$0], d_date=[$2])
> HiveFilter(condition=[AND(IN($2, 1998-01-02:DATE, 1998-10-15:DATE, 1998-11-10:DATE), IS NOT NULL($0))])
> HiveTableScan(table=[[tpcds_bin_partitioned_orc_10000, date_dim]], table:alias=[date_dim])
> HiveProject(d_date=[$2])
> HiveFilter(condition=[IN($2, 1998-01-02:DATE, 1998-10-15:DATE, 1998-11-10:DATE)])
> HiveTableScan(table=[[tpcds_bin_partitioned_orc_10000, date_dim]], table:alias=[date_dim])
> HiveProject($f0=[$0], $f1=[$1], EXPR$0=[CAST($1):DOUBLE])
> HiveAggregate(group=[{4}], agg#0=[sum($1)])
> HiveSemiJoin(condition=[=($6, $7)], joinType=[semi])
> HiveJoin(condition=[=($2, $5)], joinType=[inner])
> HiveJoin(condition=[=($0, $3)], joinType=[inner])
> HiveProject(wr_item_sk=[$1], wr_return_quantity=[$13], wr_returned_date_sk=[$23])
> HiveFilter(condition=[AND(IS NOT NULL($23), IS NOT NULL($1))])
> HiveTableScan(table=[[tpcds_bin_partitioned_orc_10000, web_returns]], table:alias=[web_returns])
> HiveProject(i_item_sk=[$0], i_item_id=[$1])
> HiveFilter(condition=[AND(IS NOT NULL($1), IS NOT NULL($0))])
> HiveTableScan(table=[[tpcds_bin_partitioned_orc_10000, item]], table:alias=[item])
> HiveProject(d_date_sk=[$0], d_date=[$2])
> HiveFilter(condition=[AND(IN($2, 1998-01-02:DATE, 1998-10-15:DATE, 1998-11-10:DATE), IS NOT NULL($0))])
> HiveTableScan(table=[[tpcds_bin_partitioned_orc_10000, date_dim]], table:alias=[date_dim])
> HiveProject(d_date=[$2])
> HiveFilter(condition=[IN($2, 1998-01-02:DATE, 1998-10-15:DATE, 1998-11-10:DATE)])
> HiveTableScan(table=[[tpcds_bin_partitioned_orc_10000, date_dim]], table:alias=[date_dim])
> {noformat}
--
This message was sent by Atlassian Jira
(v8.20.7#820007)