Posted to issues@impala.apache.org by "Tim Armstrong (JIRA)" <ji...@apache.org> on 2017/07/12 04:21:00 UTC

[jira] [Resolved] (IMPALA-4862) Planner's peak resource estimates do not accurately reflect the behaviour of joins and unions in the backend

     [ https://issues.apache.org/jira/browse/IMPALA-4862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tim Armstrong resolved IMPALA-4862.
-----------------------------------
       Resolution: Fixed
    Fix Version/s: Impala 2.10.0


IMPALA-4862: make resource profile consistent with backend behaviour

This moves away from the PipelinedPlanNodeSet approach of enumerating
sets of concurrently-executing nodes because unions would force
creating many overlapping sets of nodes. The new approach computes
the peak resources during Open() and the peak resources between Open()
and Close() (i.e. while calling GetNext()) bottom-up for each plan node
in a fragment. The fragment resources are then combined to produce the
query resources.
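
To make the bottom-up computation concrete, here is a minimal sketch of the
idea (hypothetical class, field and method names, not the actual frontend code
in this change; the real implementation also has to handle join builds,
subplans, etc.):

{code}
// Illustrative sketch only: hypothetical names, not Impala's planner code.
import java.util.ArrayList;
import java.util.List;

public class ResourceProfileSketch {
  /** Peak memory during Open() vs. between Open() and Close() (GetNext()). */
  public static class Profile {
    public final long peakDuringOpen;
    public final long peakAfterOpen;
    public Profile(long peakDuringOpen, long peakAfterOpen) {
      this.peakDuringOpen = peakDuringOpen;
      this.peakAfterOpen = peakAfterOpen;
    }
  }

  /** A bare-bones plan node: its own memory once opened, plus children. */
  public static class Node {
    public final long ownResources;
    public final boolean blocking;  // e.g. hash join build, full aggregation
    public final List<Node> children = new ArrayList<>();
    public Node(long ownResources, boolean blocking) {
      this.ownResources = ownResources;
      this.blocking = blocking;
    }
  }

  /** Bottom-up computation of a node's profile from its children's profiles. */
  public static Profile compute(Node node) {
    long childrenDuringOpen = 0;
    long childrenAfterOpen = 0;
    for (Node child : node.children) {
      Profile p = compute(child);
      // Simplification: assume sibling subtrees may be live at the same time.
      childrenDuringOpen += p.peakDuringOpen;
      childrenAfterOpen += p.peakAfterOpen;
    }
    if (node.blocking) {
      // Per the assumptions listed below: children are opened before this node
      // acquires its own resources, consumed while it holds them, and closed
      // before this node's Open() returns.
      long duringOpen =
          Math.max(childrenDuringOpen, childrenAfterOpen + node.ownResources);
      return new Profile(duringOpen, node.ownResources);
    }
    // A streaming node keeps its children open while producing rows.
    return new Profile(childrenDuringOpen + node.ownResources,
        childrenAfterOpen + node.ownResources);
  }
}
{code}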

The basic assumptions for the new resource estimates are:
* resources are acquired during or after the first call to Open()
  and released in Close().
* Blocking nodes call Open() on their child before acquiring
  their own resources (this required some backend changes).
* Blocking nodes call Close() on their children before returning
  from Open().
* The peak resource consumption of the query is the sum of the
  independent fragments (except for the parallel join build plans,
  where we can assume there will be synchronisation). This is
  conservative, but we don't synchronise fragment Open() and Close()
  across exchanges, so we can't make stronger assumptions in general.
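
Continuing the sketch above (again with hypothetical names, and omitting the
special case for parallel join builds), the last assumption roughly corresponds
to summing the per-fragment peaks:

{code}
// Illustrative only: the query-level estimate conservatively sums the
// per-fragment peaks, because fragment Open()/Close() are not synchronised
// across exchanges and the peaks may therefore coincide.
public static long queryPeakEstimate(List<Profile> fragmentProfiles) {
  long total = 0;
  for (Profile f : fragmentProfiles) {
    total += Math.max(f.peakDuringOpen, f.peakAfterOpen);
  }
  return total;
}
{code}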

Also compute the sum of minimum reservations. This will be useful
in the backend to determine exactly when all of the initial
reservations have been claimed from a shared pool.
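
A sketch of how such a sum might be used on the backend side (hypothetical
class, not the actual backend code in this change):

{code}
// Illustrative only: once the bytes claimed from a shared pool of initial
// reservations reach the planner-computed sum of minimum reservations, the
// pool is known to be fully handed out.
class InitialReservationTracker {
  private final long sumOfMinReservations;  // the sum computed by the planner
  private long claimed = 0;

  InitialReservationTracker(long sumOfMinReservations) {
    this.sumOfMinReservations = sumOfMinReservations;
  }

  synchronized void claim(long bytes) { claimed += bytes; }

  synchronized boolean allClaimed() { return claimed >= sumOfMinReservations; }
}
{code}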

Testing:
* Updated planner tests to reflect behavioural changes.
* Added extra resource requirement planner tests for unions, subplans,
  pipelines of blocking operators, and bushy join plans.
* Added single-node plans to resource-requirements tests. These have
  more complex plan trees inside a single fragment, which is useful
  for testing the peak resource requirement logic.

Change-Id: I492cf5052bb27e4e335395e2a8f8a3b07248ec9d
Reviewed-on: http://gerrit.cloudera.org:8080/7223
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins

> Planner's peak resource estimates do not accurately reflect the behaviour of joins and unions in the backend
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: IMPALA-4862
>                 URL: https://issues.apache.org/jira/browse/IMPALA-4862
>             Project: IMPALA
>          Issue Type: Bug
>          Components: Frontend
>    Affects Versions: Impala 2.9.0
>            Reporter: Tim Armstrong
>            Assignee: Tim Armstrong
>              Labels: planner, resource-management
>             Fix For: Impala 2.10.0
>
>
> In the following example, the way the peak resource estimate is computed from the per-node estimates is wrong. It should be 476.41MB, because the scan node is Open()ed in the backend *while* the concurrent join builds are executing (see the breakdown after the plan below).
> {code}
> set explain_level=1;
> explain select * from tpch.lineitem inner join tpch.orders on l_orderkey = o_orderkey
> {code}
> {code}
> Estimated Per-Host Requirements: Memory=388.41MB
> PLAN-ROOT SINK
> |
> 04:EXCHANGE [UNPARTITIONED]
> |  hosts=3 per-host-mem=0B
> |  tuple-ids=0,1 row-size=454B cardinality=5757710
> |
> 02:HASH JOIN [INNER JOIN, BROADCAST]
> |  hash predicates: l_orderkey = o_orderkey
> |  runtime filters: RF000 <- o_orderkey
> |  hosts=3 per-host-mem=300.41MB
> |  tuple-ids=0,1 row-size=454B cardinality=5757710
> |
> |--03:EXCHANGE [BROADCAST]
> |  |  hosts=2 per-host-mem=0B
> |  |  tuple-ids=1 row-size=191B cardinality=1500000
> |  |
> |  01:SCAN HDFS [tpch.orders, RANDOM]
> |     partitions=1/1 files=1 size=162.56MB
> |     table stats: 1500000 rows total
> |     column stats: all
> |     hosts=2 per-host-mem=88.00MB
> |     tuple-ids=1 row-size=191B cardinality=1500000
> |
> 00:SCAN HDFS [tpch.lineitem, RANDOM]
>    partitions=1/1 files=1 size=718.94MB
>    runtime filters: RF000 -> l_orderkey
>    table stats: 6001215 rows total
>    column stats: all
>    hosts=3 per-host-mem=88.00MB
>    tuple-ids=0 row-size=263B cardinality=6001215
> {code}
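> Presumably the 388.41MB above corresponds to 300.41MB (02:HASH JOIN) + 88.00MB (one scan), while also counting the lineitem scan that is open at the same time as the join build gives the 476.41MB figure:
> {code}
> 300.41MB (02:HASH JOIN) + 88.00MB (01:SCAN tpch.orders) + 88.00MB (00:SCAN tpch.lineitem) = 476.41MB
> {code}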
> Another example is the following, where in the backend the aggregations can execute concurrently with the join builds:
> {code}
> [localhost:21000] > explain select straight_join * from (select id, count(*) from functional.alltypes group by id) t1 inner join functional.alltypes t2 on t1.id = t2.id;
> Query: explain select straight_join * from (select id, count(*) from functional.alltypes group by id) t1 inner join functional.alltypes t2 on t1.id = t2.id
> +-----------------------------------------------------+
> | Explain String                                      |
> +-----------------------------------------------------+
> | Estimated Per-Host Requirements: Memory=180.00MB    |
> |                                                     |
> | PLAN-ROOT SINK                                      |
> | |                                                   |
> | 07:EXCHANGE [UNPARTITIONED]                         |
> | |  hosts=3 per-host-mem=0B                          |
> | |  tuple-ids=1,3 row-size=109B cardinality=7300     |
> | |                                                   |
> | 03:HASH JOIN [INNER JOIN, PARTITIONED]              |
> | |  hash predicates: id = t2.id                      |
> | |  runtime filters: RF000 <- t2.id                  |
> | |  hosts=3 per-host-mem=253.55KB                    |
> | |  tuple-ids=1,3 row-size=109B cardinality=7300     |
> | |                                                   |
> | |--06:EXCHANGE [HASH(t2.id)]                        |
> | |  |  hosts=3 per-host-mem=0B                       |
> | |  |  tuple-ids=3 row-size=97B cardinality=7300     |
> | |  |                                                |
> | |  02:SCAN HDFS [functional.alltypes t2, RANDOM]    |
> | |     partitions=24/24 files=24 size=478.45KB       |
> | |     table stats: 7300 rows total                  |
> | |     column stats: all                             |
> | |     hosts=3 per-host-mem=160.00MB                 |
> | |     tuple-ids=3 row-size=97B cardinality=7300     |
> | |                                                   |
> | 05:AGGREGATE [FINALIZE]                             |
> | |  output: count:merge(*)                           |
> | |  group by: id                                     |
> | |  hosts=3 per-host-mem=10.00MB                     |
> | |  tuple-ids=1 row-size=12B cardinality=7300        |
> | |                                                   |
> | 04:EXCHANGE [HASH(id)]                              |
> | |  hosts=3 per-host-mem=0B                          |
> | |  tuple-ids=1 row-size=12B cardinality=7300        |
> | |                                                   |
> | 01:AGGREGATE [STREAMING]                            |
> | |  output: count(*)                                 |
> | |  group by: id                                     |
> | |  hosts=3 per-host-mem=10.00MB                     |
> | |  tuple-ids=1 row-size=12B cardinality=7300        |
> | |                                                   |
> | 00:SCAN HDFS [functional.alltypes, RANDOM]          |
> |    partitions=24/24 files=24 size=478.45KB          |
> |    runtime filters: RF000 -> functional.alltypes.id |
> |    table stats: 7300 rows total                     |
> |    column stats: all                                |
> |    hosts=3 per-host-mem=160.00MB                    |
> |    tuple-ids=0 row-size=4B cardinality=7300         |
> +-----------------------------------------------------+
> {code}
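> The 180.00MB above appears to be just 160.00MB (one scan) + 10.00MB + 10.00MB (the two aggregations), i.e. it does not account for the aggregations executing at the same time as the join build and its 160.00MB build-side scan:
> {code}
> 160.00MB (scan) + 10.00MB (01:AGGREGATE) + 10.00MB (05:AGGREGATE) = 180.00MB
> {code}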
> The behaviour for unions is also not accurate: branches of a union within the same fragment are executed serially, but anything below an exchange is executed concurrently.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)