Posted to reviews@impala.apache.org by "Riza Suminto (Code Review)" <ge...@cloudera.org> on 2023/05/01 04:50:34 UTC

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Hello Quanlong Huang, Kurt Deschler, Csaba Ringhofer, Wenzhe Zhou, Impala Public Jenkins, 

I'd like you to reexamine a change. Please visit

    http://gerrit.cloudera.org:8080/19807

to look at the new patch set (#2).

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................

IMPALA-12091: Control scan parallelism by its processing cost

Before this patch, Impala still relied on the MT_DOP option to decide
the degree of parallelism of the scan fragment when a query ran with
COMPUTE_PROCESSING_COST=1. This patch adds the scan node's processing
cost as another consideration to raise scan parallelism beyond MT_DOP.

Scan node cost is now adjusted to also consider the number of effective
scan ranges. Each scan range is given a weight of (0.5% *
min_processing_per_thread), which roughly means that one scan node
instance can handle at most 200 scan ranges.
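
As a rough illustration of the arithmetic above, the sketch below
computes the per-range weight and the number of scan ranges it implies
per instance. The function names and the sample value for
min_processing_per_thread are placeholders chosen for this example, not
values taken from the patch.

```python
from fractions import Fraction

# Weight of one scan range as a fraction of min_processing_per_thread (0.5%).
SCAN_RANGE_WEIGHT_FRACTION = Fraction(5, 1000)


def scan_range_weight(min_processing_per_thread):
    """Cost contributed by a single effective scan range."""
    return SCAN_RANGE_WEIGHT_FRACTION * min_processing_per_thread


def max_ranges_per_instance(min_processing_per_thread):
    """Scan ranges whose combined weight equals one thread's minimum cost."""
    return min_processing_per_thread / scan_range_weight(min_processing_per_thread)


# Hypothetical value, used only for illustration.
min_cost = 10_000_000
print(scan_range_weight(min_cost))        # 50000
print(max_ranges_per_instance(min_cost))  # 200
```

The ratio is 200 regardless of the value chosen for
min_processing_per_thread, since 1 / 0.005 = 200.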

Query option PROCESSING_COST_MAX_THREADS is repurposed as an upper bound
on scan parallelism if COMPUTE_PROCESSING_COST=true. If the number of
scan ranges is fewer than the maximum parallelism allowed by the scan
node's processing cost, that processing cost will be clamped down
to (min_processing_per_thread / number of scan ranges). Lowering
PROCESSING_COST_MAX_THREADS can also clamp down the scan processing cost
in a similar way. For interior fragments, a combination of
PROCESSING_COST_MAX_THREADS, PROCESSING_COST_MIN_THREADS, and the number
of available cores per node is taken into account to determine the
maximum fragment parallelism per node. For scan fragments, only the
first two are considered, to encourage the Frontend to choose a larger
executor group as needed.
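
The clamping described above can be pictured with the following sketch.
It is not the planner's code: it assumes that cost-based parallelism is
the scan cost divided by min_processing_per_thread (rounded up), and it
clamps the resulting parallelism directly rather than the cost. All
names, including max_threads_bound (standing in for whatever limit
PROCESSING_COST_MAX_THREADS imposes), are hypothetical.

```python
def ceil_div(a, b):
    """Integer division rounded up, for positive integers."""
    return -(-a // b)


def effective_scan_parallelism(scan_cost, min_processing_per_thread,
                               num_scan_ranges, max_threads_bound):
    """Clamp cost-based scan parallelism by scan range count and a thread bound."""
    # Assumption: one instance per min_processing_per_thread units of cost.
    cost_based = max(1, ceil_div(scan_cost, min_processing_per_thread))
    # Never use more instances than scan ranges or than the configured bound.
    return max(1, min(cost_based, num_scan_ranges, max_threads_bound))


# Hypothetical numbers, for illustration only.
print(effective_scan_parallelism(80_000_000, 10_000_000, 100, 16))  # 8
print(effective_scan_parallelism(80_000_000, 10_000_000, 3, 16))    # 3
print(effective_scan_parallelism(80_000_000, 10_000_000, 100, 5))   # 5
```

The second call shows the scan range count as the limiting factor, and
the third shows a lowered thread bound taking over.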

Two new criteria are added during effective parallelism calculation in
PlanFragment.adjustToMaxParallelism():

- If a fragment has a UnionNode, its parallelism is the maximum of its
  input fragments' parallelism and its collocated ScanNode's expected
  parallelism.
- If a fragment has only a single ScanNode (and no UnionNode), its
  parallelism is calculated in the same fashion as for an interior
  fragment, but it is not lowered any further since there is no child
  fragment to compare against.
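
A simplified model of the two criteria above is sketched below. It does
not reproduce PlanFragment.adjustToMaxParallelism(); the FragmentInfo
class, its fields, and the function name are hypothetical and exist only
to make the two rules concrete.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class FragmentInfo:
    """Hypothetical summary of a plan fragment, for illustration only."""
    has_union_node: bool
    # Expected parallelism of each ScanNode collocated in this fragment.
    scan_node_parallelism: List[int] = field(default_factory=list)
    # Parallelism of each input (child) fragment.
    input_parallelism: List[int] = field(default_factory=list)
    # Parallelism computed the same way as for an interior fragment.
    cost_based_parallelism: int = 1


def new_criteria_parallelism(frag: FragmentInfo) -> Optional[int]:
    """Apply the two new rules; return None if neither rule applies."""
    if frag.has_union_node:
        # Rule 1: maximum over input fragments and collocated scan nodes.
        candidates = frag.input_parallelism + frag.scan_node_parallelism
        return max(candidates, default=frag.cost_based_parallelism)
    if len(frag.scan_node_parallelism) == 1 and not frag.input_parallelism:
        # Rule 2: a lone scan keeps its cost-based value; there is no
        # child fragment that could pull it lower.
        return frag.cost_based_parallelism
    return None


union_frag = FragmentInfo(True, scan_node_parallelism=[6], input_parallelism=[4, 2])
scan_frag = FragmentInfo(False, scan_node_parallelism=[8], cost_based_parallelism=8)
print(new_criteria_parallelism(union_frag))  # 6
print(new_criteria_parallelism(scan_frag))   # 8
```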

Admission control slots remain unchanged. This may cause a query to fail
admission if the Planner selects a scan parallelism that is higher than
the configured admission control slots value. Setting
PROCESSING_COST_MAX_THREADS equal to or lower than the configured
admission control slots value can help lower scan parallelism and pass
the admission controller.
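
The relationship between the chosen scan parallelism and the slot limit
can be expressed as a simple check. This is only a sketch under the
assumption that one fragment instance consumes one slot on its node; the
function names are hypothetical and do not correspond to Impala's
admission controller code.

```python
def exceeds_admission_slots(scan_parallelism_per_node, admission_slots):
    """True if the planned per-node parallelism would not fit in the slots."""
    return scan_parallelism_per_node > admission_slots


def capped_max_threads(current_max_threads, admission_slots):
    """A PROCESSING_COST_MAX_THREADS value that stays within the slot limit."""
    return min(current_max_threads, admission_slots)


# Hypothetical numbers, for illustration only.
print(exceeds_admission_slots(12, 8))  # True
print(capped_max_threads(12, 8))       # 8
```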

The previous workaround from IMPALA-12029 for controlling scan
parallelism is now removed. This patch also disables the IMPALA-10287
optimization if COMPUTE_PROCESSING_COST=true. This is because
IMPALA-10287 relies on a fixed number of fragment instances in
DistributedPlanner.java. However, the effective parallelism calculation
is done much later and may change the final number of instances of the
hash join fragment, rendering the DistributionMode selected by
IMPALA-10287 inaccurate.

This patch was benchmarked using single_node_perf_run.py with the
following parameters:

args="-gen_experimental_profile=true -default_query_options="
args+="mt_dop=4,compute_processing_cost=1,processing_cost_min_threads=1 "
./bin/single_node_perf_run.py --num_impalads=3 --scale=10 \
    --workloads=tpcds --iterations=5 --table_formats=parquet/none/none \
    --impalad_args="$args" \
    --query_names=TPCDS-Q3,TPCDS-Q14-1,TPCDS-Q14-2,TPCDS-Q23-1,TPCDS-Q23-2,TPCDS-Q49,TPCDS-Q76,TPCDS-Q78,TPCDS-Q80A \
    "IMPALA-12091~1" IMPALA-12091

The benchmark result is as follows:
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
| Workload  | Query       | File Format           | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%)  | Base StdDev(%) | Iters | Median Diff(%) | MW Zval | Tval  |
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
| TPCDS(10) | TPCDS-Q23-1 | parquet / none / none | 4.62   | 4.54        |   +1.92%   |   0.23%    |   1.59%        | 5     |   +2.32%       | 1.15    | 2.67  |
| TPCDS(10) | TPCDS-Q14-1 | parquet / none / none | 5.82   | 5.76        |   +1.08%   |   5.27%    |   3.89%        | 5     |   +2.04%       | 0.00    | 0.37  |
| TPCDS(10) | TPCDS-Q23-2 | parquet / none / none | 4.65   | 4.58        |   +1.38%   |   1.97%    |   0.48%        | 5     |   +0.81%       | 0.87    | 1.51  |
| TPCDS(10) | TPCDS-Q49   | parquet / none / none | 1.49   | 1.48        |   +0.46%   | * 36.02% * | * 34.95% *     | 5     |   +1.26%       | 0.58    | 0.02  |
| TPCDS(10) | TPCDS-Q14-2 | parquet / none / none | 3.76   | 3.75        |   +0.39%   |   1.67%    |   0.58%        | 5     |   -0.03%       | -0.58   | 0.49  |
| TPCDS(10) | TPCDS-Q78   | parquet / none / none | 2.80   | 2.80        |   -0.04%   |   1.32%    |   1.33%        | 5     |   -0.42%       | -0.29   | -0.05 |
| TPCDS(10) | TPCDS-Q80A  | parquet / none / none | 2.87   | 2.89        |   -0.51%   |   1.33%    |   0.40%        | 5     |   -0.01%       | -0.29   | -0.82 |
| TPCDS(10) | TPCDS-Q3    | parquet / none / none | 0.18   | 0.19        |   -1.29%   | * 15.26% * | * 15.87% *     | 5     |   -0.54%       | -0.87   | -0.13 |
| TPCDS(10) | TPCDS-Q76   | parquet / none / none | 1.08   | 1.11        |   -2.98%   |   0.92%    |   1.70%        | 5     |   -3.99%       | -2.02   | -3.47 |
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+

Testing:
- Pass PlannerTest.testProcessingCost
- Pass test_executor_groups.py
- Reenable test_tpcds_q51a in TestTpcdsQueryWithProcessingCost with
  PROCESSING_COST_MAX_THREADS set to 5
- Pass test_tpcds_queries.py::TestTpcdsQueryWithProcessingCost
- Pass core tests

Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
---
M be/src/exec/blocking-join-node.h
M be/src/exec/data-sink.h
M be/src/exec/exec-node.cc
M be/src/exec/exec-node.h
M be/src/exec/hdfs-scanner.h
M be/src/exec/scan-node.cc
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M common/thrift/ImpalaService.thrift
M common/thrift/Planner.thrift
M common/thrift/Query.thrift
M fe/src/main/java/org/apache/impala/analysis/Analyzer.java
M fe/src/main/java/org/apache/impala/planner/AggregationNode.java
M fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
M fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
M fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
M fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
M fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
M fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
M fe/src/main/java/org/apache/impala/planner/PlanFragment.java
M fe/src/main/java/org/apache/impala/planner/PlanNode.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
M fe/src/main/java/org/apache/impala/planner/ProcessingCost.java
M fe/src/main/java/org/apache/impala/planner/ScanNode.java
M fe/src/main/java/org/apache/impala/planner/UnionNode.java
M fe/src/main/java/org/apache/impala/service/Frontend.java
M fe/src/test/java/org/apache/impala/planner/PlannerTest.java
M fe/src/test/resources/llama-site-3-groups.xml
M testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test
M tests/custom_cluster/test_executor_groups.py
M tests/query_test/test_tpcds_queries.py
34 files changed, 1,281 insertions(+), 1,086 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/07/19807/2
-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 2
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>