You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@impala.apache.org by "Tim Armstrong (Jira)" <ji...@apache.org> on 2021/02/11 01:06:00 UTC

[jira] [Resolved] (IMPALA-9853) Push rank() predicates into sort

     [ https://issues.apache.org/jira/browse/IMPALA-9853?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tim Armstrong resolved IMPALA-9853.
-----------------------------------
    Fix Version/s: Impala 4.0
       Resolution: Fixed



Commit b42c64993d46893488a667fb9c425548fdf964ab in impala's branch refs/heads/master from Tim Armstrong
[ https://gitbox.apache.org/repos/asf?p=impala.git;h=b42c649 ]

IMPALA-9979: part 2: partitioned top-n

Planner changes:
---------------
The planner now identifies predicates that can be converted into
limits in a partitioned or unpartitioned top-n with the following
method:

    Push down predicates that reference analytic tuple into inline view.
    These will be evaluated after the analytic plan for the inline
    SelectStmt is generated.
    Identify predicates that reference the analytic tuple and could
    be converted to limits.
    If they can be applied to the last sort group of the analytic
    plan, and the windows are all compatible, then the lowest
    limit gets converted into a limit in the top N.
    Otherwise generate a select node with the conjuncts. We add
    logic to merge SELECT nodes to avoid generating duplicates
    from inside and outside the inline view.
    The pushed predicate is still added to the SELECT node
    because it is necessary for correctness for predicates
    like '=' to filter additional rows and also the limit
    pushdown optimization looks for analytic predicates
    there, so retaining all predicates simplifies that.
    The selectivity of the predicate is adjusted so that
    cardinality estimates remain accurate.

The optimization can be disabled by setting
ANALYTIC_RANK_PUSHDOWN_THRESHOLD=0. By default it is
only enabled for limits of 1000 or less, because the
in-memory Top-N may perform significantly worse than
a full sort for large heaps (since updating the heap
for every input row ends up being more expensive than
doing a traditional sort). We could probably optimize
this more with better tuning so that it can gracefully
fall back to doing the full sort at runtime.

rank() and row_number() are handled. rank() needs support in
the TopN node to include ties for the last place, which is
also added in this patch.

If predicates are trivially false, we generate empty nodes.

This interacts with the limit pushdwon optimization. The limit
pushdown optimization is applied after the partitioned top-n
is generated, and can sometimes result in more optimal plans,
so it is generalized to handle pushing into partitioned top-n
nodes.

Backend changes:
---------------
The top-n node in the backend is augmented to handle
the partitioned case, for which we use a std::map and a
comparator based on the partition exprs. The partitioned
top-n node has a soft limit of 64MB on the size of the
in-memory heaps and can spill with use of an embedded Sorter.
The current implementation tries to evict heaps that are
less effective at filtering rows.

Limitations:
-----------
There are several possible extensions to this that we did not do:

    dense_rank() is not supported because it would require additional
    backend support - IMPALA-10014.
    ntile() is not supported because it would require additional
    backend support - IMPALA-10174.
    Only one predicate per analytic is pushed.
    Redundant rank()/row_number() predicates are not merged,
    only the lowest is chosen.
    Lower bounds are not converted into OFFSET.
    The analytic operator cannot be eliminated even if the analytic
    expression was only used in the predicate.
    This doesn't push predicates into UNION - IMPALA-10013
    Always false predicates don't result in empty plan - IMPALA-10015

Tests:

    Planner tests - added tests that exercise the interesting code
    paths added in planning.

    Predicate ordering in SELECT nodes changed in a couple of cases
    because some predicates were pushed into the inline views.

    Modified SORT targeted perf tests to avoid conversion to Top-N
    Added targeted perf test for partitioned top-n.
    End-to-end tests

    Unpartitioned Top-N end-to-end tests
    Basic partitioning and duplicate handling tests on functional
    Similar basic tests on larger inputs from TPC-DS and with
    larger partition counts.
    I inspected the results and also ran the same tests with
    analytic_rank_pushdown_threshold=0 to confirm that the
    results were the same as with the full sort.
    Fallback to spilling sort.

Perf:

Added a targeted benchmark that goes from ~2s to ~1s with
mt_dop=8 on TPC-H 30 on my desktop.

Change-Id: Ic638af9495981d889a4cb7455a71e8be0eb1a8e5
Reviewed-on: http://gerrit.cloudera.org:8080/16242
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


> Push rank() predicates into sort
> --------------------------------
>
>                 Key: IMPALA-9853
>                 URL: https://issues.apache.org/jira/browse/IMPALA-9853
>             Project: IMPALA
>          Issue Type: Improvement
>          Components: Frontend
>            Reporter: Tim Armstrong
>            Assignee: Tim Armstrong
>            Priority: Major
>              Labels: performance, tpcds
>             Fix For: Impala 4.0
>
>
> TPC-DS Q67 would benefit significantly if we could push the rank() predicate into the sort to do some reduction of unneeded data. The sorter could evaluate this predicate if it had the partition expressions available - as a post-processing step to the in-memory sort for the analytic sort group, it could do a pass over the sorted run, resetting a counter at the start of each partition boundary.
> It might be best to start with tackling IMPALA-3471 by applying the limit within sorted runs, since that doesn't require any planner work.
> {noformat}
> with results as
> (     select i_category ,i_class ,i_brand ,i_product_name ,d_year ,d_qoy ,d_moy ,s_store_id
>                   ,sum(coalesce(ss_sales_price*ss_quantity,0)) sumsales
>             from store_sales ,date_dim ,store ,item
>        where  ss_sold_date_sk=d_date_sk
>           and ss_item_sk=i_item_sk
>           and ss_store_sk = s_store_sk
>           and d_month_seq between 1212 and 1212 + 11
>        group by i_category, i_class, i_brand, i_product_name, d_year, d_qoy, d_moy,s_store_id)
>  ,
>  results_rollup as
>  (select i_category, i_class, i_brand, i_product_name, d_year, d_qoy, d_moy, s_store_id, sumsales
>   from results
>   union all
>   select i_category, i_class, i_brand, i_product_name, d_year, d_qoy, d_moy, null s_store_id, sum(sumsales) sumsales
>   from results
>   group by i_category, i_class, i_brand, i_product_name, d_year, d_qoy, d_moy
>   union all
>   select i_category, i_class, i_brand, i_product_name, d_year, d_qoy, null d_moy, null s_store_id, sum(sumsales) sumsales
>   from results
>   group by i_category, i_class, i_brand, i_product_name, d_year, d_qoy
>   union all
>   select i_category, i_class, i_brand, i_product_name, d_year, null d_qoy, null d_moy, null s_store_id, sum(sumsales) sumsales
>   from results
>   group by i_category, i_class, i_brand, i_product_name, d_year
>   union all
>   select i_category, i_class, i_brand, i_product_name, null d_year, null d_qoy, null d_moy, null s_store_id, sum(sumsales) sumsales
>   from results
>   group by i_category, i_class, i_brand, i_product_name
>   union all
>   select i_category, i_class, i_brand, null i_product_name, null d_year, null d_qoy, null d_moy, null s_store_id, sum(sumsales) sumsales
>   from results
>   group by i_category, i_class, i_brand
>   union all
>   select i_category, i_class, null i_brand, null i_product_name, null d_year, null d_qoy, null d_moy, null s_store_id, sum(sumsales) sumsales
>   from results
>   group by i_category, i_class
>   union all
>   select i_category, null i_class, null i_brand, null i_product_name, null d_year, null d_qoy, null d_moy, null s_store_id, sum(sumsales) sumsales
>   from results
>   group by i_category
>   union all
>   select null i_category, null i_class, null i_brand, null i_product_name, null d_year, null d_qoy, null d_moy, null s_store_id, sum(sumsales) sumsales
>   from results)
>  select  *
> from (select i_category
>             ,i_class
>             ,i_brand
>             ,i_product_name
>             ,d_year
>             ,d_qoy
>             ,d_moy
>             ,s_store_id
>             ,sumsales
>             ,rank() over (partition by i_category order by sumsales desc) rk
>       from results_rollup) dw2
> where rk <= 100
> order by i_category
>         ,i_class
>         ,i_brand
>         ,i_product_name
>         ,d_year
>         ,d_qoy
>         ,d_moy
>         ,s_store_id
>         ,sumsales
>         ,rk
> limit 100
> {noformat}
> Assigning to myself to fill in more details.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)