You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@impala.apache.org by "Riza Suminto (Code Review)" <ge...@cloudera.org> on 2023/04/26 01:43:41 UTC

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Riza Suminto has uploaded this change for review. ( http://gerrit.cloudera.org:8080/19807


Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................

IMPALA-12091: Control scan parallelism by its processing cost

Before this patch, Impala still relies on MT_DOP option to decide the
degree of parallelism of the scan fragment when a query runs with
COMPUTE_PROCESSING_COST=1. This patch adds the scan node's processing
cost as another consideration to raise scan parallelism beyond MT_DOP.

Scan node cost is now adjusted to also consider the number of effective
scan ranges. Each scan range is given a weight of (0.5% *
min_processing_per_thread), which roughly means that one scan node
instance can handle at most 200 scan ranges. Query option
NUM_SCANNER_THREADS is repurposed as an upper bound on scan parallelism
if COMPUTE_PROCESSING_COST=true. If the number of scan ranges is fewer
than the maximum parallelism allowed by the scan node's processing cost,
that processing cost will be clamped down to (min_processing_per_thread
/ number of scan ranges). Lowering NUM_SCANNER_THREADS can also clamp
down the scan processing cost in a similar way.

Two new criteria are added during effective parallelism calculation in
PlanFragment.adjustToMaxParallelism():

- If a fragment has UnionNode, its parallelism is the maximum between
  its input fragments and its collocated ScanNode's expected
  parallelism.
- If a fragment only has a single ScanNode (and no UnionNode), its
  parallelism is calculated in the same fashion as the interior fragment
  but will not be lowered anymore since it will not have any child
  fragment to compare with.

Admission control slots remain unchanged. This may cause a query to fail
admission if Planner selects scan parallelism that is higher than the
configured admission control slots value. Setting NUM_SCANNER_THREADS
equal to or lower than configured admission control slots value can help
lower scan parallelism and pass the admission controller.

The previous workaround to control scan parallelism by IMPALA-12029 is
now removed. This patch also disables IMPALA-10287 optimization if
COMPUTE_PROCESSING_COST=true. This is because IMPALA-10287 relies on a
fixed number of fragment instances in DistributedPlanner.java. However,
effective parallelism calculation is done much later and may change the
final number of instances of hash join fragment, rendering
DistributionMode selected by IMPALA-10287 inaccurate.

This patch is benchmarked using single_node_perf_run.py with folowing
parameters:

args="-gen_experimental_profile=true -default_query_options="
args+="mt_dop=4,compute_processing_cost=1,processing_cost_min_threads=1 "
./bin/single_node_perf_run.py --num_impalads=3 --scale=10 \
    --workloads=tpcds --iterations=5 --table_formats=parquet/none/none \
    --impalad_args="$args" \
    --query_names=TPCDS-Q3,TPCDS-Q14-1,TPCDS-Q14-2,TPCDS-Q23-1,TPCDS-Q23-2,TPCDS-Q49,TPCDS-Q76,TPCDS-Q78,TPCDS-Q80A \
    "IMPALA-12091~1" IMPALA-12091

The benchmark result is as follow:
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
| Workload  | Query       | File Format           | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%)  | Base StdDev(%) | Iters | Median Diff(%) | MW Zval | Tval  |
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
| TPCDS(10) | TPCDS-Q23-1 | parquet / none / none | 4.62   | 4.54        |   +1.92%   |   0.23%    |   1.59%        | 5     |   +2.32%       | 1.15    | 2.67  |
| TPCDS(10) | TPCDS-Q14-1 | parquet / none / none | 5.82   | 5.76        |   +1.08%   |   5.27%    |   3.89%        | 5     |   +2.04%       | 0.00    | 0.37  |
| TPCDS(10) | TPCDS-Q23-2 | parquet / none / none | 4.65   | 4.58        |   +1.38%   |   1.97%    |   0.48%        | 5     |   +0.81%       | 0.87    | 1.51  |
| TPCDS(10) | TPCDS-Q49   | parquet / none / none | 1.49   | 1.48        |   +0.46%   | * 36.02% * | * 34.95% *     | 5     |   +1.26%       | 0.58    | 0.02  |
| TPCDS(10) | TPCDS-Q14-2 | parquet / none / none | 3.76   | 3.75        |   +0.39%   |   1.67%    |   0.58%        | 5     |   -0.03%       | -0.58   | 0.49  |
| TPCDS(10) | TPCDS-Q78   | parquet / none / none | 2.80   | 2.80        |   -0.04%   |   1.32%    |   1.33%        | 5     |   -0.42%       | -0.29   | -0.05 |
| TPCDS(10) | TPCDS-Q80A  | parquet / none / none | 2.87   | 2.89        |   -0.51%   |   1.33%    |   0.40%        | 5     |   -0.01%       | -0.29   | -0.82 |
| TPCDS(10) | TPCDS-Q3    | parquet / none / none | 0.18   | 0.19        |   -1.29%   | * 15.26% * | * 15.87% *     | 5     |   -0.54%       | -0.87   | -0.13 |
| TPCDS(10) | TPCDS-Q76   | parquet / none / none | 1.08   | 1.11        |   -2.98%   |   0.92%    |   1.70%        | 5     |   -3.99%       | -2.02   | -3.47 |
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+

Testing:
- Pass PlannerTest.testProcessingCost
- Pass test_executor_groups.py

Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
---
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M fe/src/main/java/org/apache/impala/analysis/Analyzer.java
M fe/src/main/java/org/apache/impala/planner/AggregationNode.java
M fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
M fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
M fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
M fe/src/main/java/org/apache/impala/planner/PlanFragment.java
M fe/src/main/java/org/apache/impala/planner/PlanNode.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
M fe/src/main/java/org/apache/impala/planner/ProcessingCost.java
M fe/src/main/java/org/apache/impala/planner/ScanNode.java
M fe/src/main/java/org/apache/impala/planner/UnionNode.java
M fe/src/main/java/org/apache/impala/service/Frontend.java
M fe/src/test/resources/llama-site-3-groups.xml
M testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test
M tests/custom_cluster/test_executor_groups.py
17 files changed, 808 insertions(+), 774 deletions(-)



  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/07/19807/1
-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 1
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 7:

Build Successful 

https://jenkins.impala.io/job/gerrit-code-review-checks/12978/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests.


-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 7
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Tue, 09 May 2023 19:25:45 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 9: Verified-1

Build failed: https://jenkins.impala.io/job/gerrit-verify-dryrun/9302/


-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 9
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Wed, 10 May 2023 23:35:18 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Riza Suminto (Code Review)" <ge...@cloudera.org>.
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 1:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/19807/1/be/src/scheduling/scheduler.cc
File be/src/scheduling/scheduler.cc:

http://gerrit.cloudera.org:8080/#/c/19807/1/be/src/scheduling/scheduler.cc@441
PS1, Line 441: if (scan_hosts.empty()) {
I think this is a bug here.
This should also check if instances_per_host.empty() (there is no union node in the fragment).



-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 1
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Fri, 28 Apr 2023 06:59:13 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Wenzhe Zhou (Code Review)" <ge...@cloudera.org>.
Wenzhe Zhou has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 8: Code-Review+1


-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 8
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Tue, 09 May 2023 21:17:45 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Wenzhe Zhou (Code Review)" <ge...@cloudera.org>.
Wenzhe Zhou has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 10: Code-Review+2


-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 10
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Thu, 11 May 2023 17:04:58 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Riza Suminto (Code Review)" <ge...@cloudera.org>.
Hello Quanlong Huang, Kurt Deschler, Csaba Ringhofer, Wenzhe Zhou, Impala Public Jenkins, 

I'd like you to reexamine a change. Please visit

    http://gerrit.cloudera.org:8080/19807

to look at the new patch set (#4).

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................

IMPALA-12091: Control scan parallelism by its processing cost

Before this patch, Impala still relies on MT_DOP option to decide the
degree of parallelism of the scan fragment when a query runs with
COMPUTE_PROCESSING_COST=1. This patch adds the scan node's processing
cost as another consideration to raise scan parallelism beyond MT_DOP.

Scan node cost is now adjusted to also consider the number of effective
scan ranges. Each scan range is given a weight of (0.5% *
min_processing_per_thread), which roughly means that one scan node
instance can handle at most 200 scan ranges.

Query option MAX_FRAGMENT_INSTANCES_PER_NODE is repurposed as an upper
bound on scan parallelism if COMPUTE_PROCESSING_COST=true. If the number
of scan ranges is fewer than the maximum parallelism allowed by the scan
node's processing cost, that processing cost will be clamped down
to (min_processing_per_thread / number of scan ranges). Lowering
MAX_FRAGMENT_INSTANCES_PER_NODE can also clamp down the scan processing
cost in a similar way. For interior fragments, a combination of
MAX_FRAGMENT_INSTANCES_PER_NODE, PROCESSING_COST_MIN_THREADS, and the
number of available cores per node is accounted to determine maximum
fragment parallelism per node. For scan fragment, only the first two is
considered to encourage Frontend to choose a larger executor group as
needed.

Two new static state is added into exec-node.h: is_mt_fragment_ and
num_instances_per_node_. The backend code that refers to the MT_DOP
option is replaced with either is_mt_fragment_ or
num_instances_per_node_.

Two new criteria are added during effective parallelism calculation in
PlanFragment.adjustToMaxParallelism():

- If a fragment has UnionNode, its parallelism is the maximum between
  its input fragments and its collocated ScanNode's expected
  parallelism.
- If a fragment only has a single ScanNode (and no UnionNode), its
  parallelism is calculated in the same fashion as the interior fragment
  but will not be lowered anymore since it will not have any child
  fragment to compare with.

Admission control slots remain unchanged. This may cause a query to fail
admission if Planner selects scan parallelism that is higher than the
configured admission control slots value. Setting
MAX_FRAGMENT_INSTANCES_PER_NODE equal to or lower than configured
admission control slots value can help lower scan parallelism and pass
the admission controller.

The previous workaround to control scan parallelism by IMPALA-12029 is
now removed. This patch also disables IMPALA-10287 optimization if
COMPUTE_PROCESSING_COST=true. This is because IMPALA-10287 relies on a
fixed number of fragment instances in DistributedPlanner.java. However,
effective parallelism calculation is done much later and may change the
final number of instances of hash join fragment, rendering
DistributionMode selected by IMPALA-10287 inaccurate.

This patch is benchmarked using single_node_perf_run.py with the
following parameters:

args="-gen_experimental_profile=true -default_query_options="
args+="mt_dop=4,compute_processing_cost=1,processing_cost_min_threads=1 "
./bin/single_node_perf_run.py --num_impalads=3 --scale=10 \
    --workloads=tpcds --iterations=5 --table_formats=parquet/none/none \
    --impalad_args="$args" \
    --query_names=TPCDS-Q3,TPCDS-Q14-1,TPCDS-Q14-2,TPCDS-Q23-1,TPCDS-Q23-2,TPCDS-Q49,TPCDS-Q76,TPCDS-Q78,TPCDS-Q80A \
    "IMPALA-12091~1" IMPALA-12091

The benchmark result is as follows:
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
| Workload  | Query       | File Format           | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%)  | Base StdDev(%) | Iters | Median Diff(%) | MW Zval | Tval  |
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
| TPCDS(10) | TPCDS-Q23-1 | parquet / none / none | 4.62   | 4.54        |   +1.92%   |   0.23%    |   1.59%        | 5     |   +2.32%       | 1.15    | 2.67  |
| TPCDS(10) | TPCDS-Q14-1 | parquet / none / none | 5.82   | 5.76        |   +1.08%   |   5.27%    |   3.89%        | 5     |   +2.04%       | 0.00    | 0.37  |
| TPCDS(10) | TPCDS-Q23-2 | parquet / none / none | 4.65   | 4.58        |   +1.38%   |   1.97%    |   0.48%        | 5     |   +0.81%       | 0.87    | 1.51  |
| TPCDS(10) | TPCDS-Q49   | parquet / none / none | 1.49   | 1.48        |   +0.46%   | * 36.02% * | * 34.95% *     | 5     |   +1.26%       | 0.58    | 0.02  |
| TPCDS(10) | TPCDS-Q14-2 | parquet / none / none | 3.76   | 3.75        |   +0.39%   |   1.67%    |   0.58%        | 5     |   -0.03%       | -0.58   | 0.49  |
| TPCDS(10) | TPCDS-Q78   | parquet / none / none | 2.80   | 2.80        |   -0.04%   |   1.32%    |   1.33%        | 5     |   -0.42%       | -0.29   | -0.05 |
| TPCDS(10) | TPCDS-Q80A  | parquet / none / none | 2.87   | 2.89        |   -0.51%   |   1.33%    |   0.40%        | 5     |   -0.01%       | -0.29   | -0.82 |
| TPCDS(10) | TPCDS-Q3    | parquet / none / none | 0.18   | 0.19        |   -1.29%   | * 15.26% * | * 15.87% *     | 5     |   -0.54%       | -0.87   | -0.13 |
| TPCDS(10) | TPCDS-Q76   | parquet / none / none | 1.08   | 1.11        |   -2.98%   |   0.92%    |   1.70%        | 5     |   -3.99%       | -2.02   | -3.47 |
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+

Testing:
- Pass PlannerTest.testProcessingCost
- Pass test_executor_groups.py
- Reenable test_tpcds_q51a in TestTpcdsQueryWithProcessingCost with
  MAX_FRAGMENT_INSTANCES_PER_NODE set to 5
- Pass test_tpcds_queries.py::TestTpcdsQueryWithProcessingCost
- Pass core tests

Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
---
M be/src/exec/blocking-join-node.h
M be/src/exec/data-sink.h
M be/src/exec/exec-node.cc
M be/src/exec/exec-node.h
M be/src/exec/hdfs-scanner.h
M be/src/exec/scan-node.cc
M be/src/exec/scan-node.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M common/thrift/ImpalaService.thrift
M common/thrift/Planner.thrift
M common/thrift/Query.thrift
M fe/src/main/java/org/apache/impala/analysis/Analyzer.java
M fe/src/main/java/org/apache/impala/planner/AggregationNode.java
M fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
M fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
M fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
M fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
M fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
M fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
M fe/src/main/java/org/apache/impala/planner/PlanFragment.java
M fe/src/main/java/org/apache/impala/planner/PlanNode.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
M fe/src/main/java/org/apache/impala/planner/ProcessingCost.java
M fe/src/main/java/org/apache/impala/planner/ScanNode.java
M fe/src/main/java/org/apache/impala/planner/UnionNode.java
M fe/src/main/java/org/apache/impala/service/Frontend.java
M fe/src/test/java/org/apache/impala/planner/PlannerTest.java
M fe/src/test/resources/llama-site-3-groups.xml
M testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test
M tests/custom_cluster/test_executor_groups.py
M tests/query_test/test_insert.py
M tests/query_test/test_tpcds_queries.py
36 files changed, 1,338 insertions(+), 1,101 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/07/19807/4
-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 4
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Riza Suminto (Code Review)" <ge...@cloudera.org>.
Hello Quanlong Huang, Kurt Deschler, Csaba Ringhofer, Wenzhe Zhou, Impala Public Jenkins, 

I'd like you to reexamine a change. Please visit

    http://gerrit.cloudera.org:8080/19807

to look at the new patch set (#2).

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................

IMPALA-12091: Control scan parallelism by its processing cost

Before this patch, Impala still relies on MT_DOP option to decide the
degree of parallelism of the scan fragment when a query runs with
COMPUTE_PROCESSING_COST=1. This patch adds the scan node's processing
cost as another consideration to raise scan parallelism beyond MT_DOP.

Scan node cost is now adjusted to also consider the number of effective
scan ranges. Each scan range is given a weight of (0.5% *
min_processing_per_thread), which roughly means that one scan node
instance can handle at most 200 scan ranges.

Query option PROCESSING_COST_MAX_THREADS is repurposed as an upper bound
on scan parallelism if COMPUTE_PROCESSING_COST=true. If the number of
scan ranges is fewer than the maximum parallelism allowed by the scan
node's processing cost, that processing cost will be clamped down
to (min_processing_per_thread / number of scan ranges). Lowering
PROCESSING_COST_MAX_THREADS can also clamp down the scan processing cost
in a similar way. For interior fragments, a combination of
PROCESSING_COST_MAX_THREADS, PROCESSING_COST_MIN_THREADS, and the number
of available cores per node is accounted to determine maximum fragment
parallelism per node. For scan fragment, only the first two is
considered to encourage Frontend to choose a larger executor group as
needed.

Two new criteria are added during effective parallelism calculation in
PlanFragment.adjustToMaxParallelism():

- If a fragment has UnionNode, its parallelism is the maximum between
  its input fragments and its collocated ScanNode's expected
  parallelism.
- If a fragment only has a single ScanNode (and no UnionNode), its
  parallelism is calculated in the same fashion as the interior fragment
  but will not be lowered anymore since it will not have any child
  fragment to compare with.

Admission control slots remain unchanged. This may cause a query to fail
admission if Planner selects scan parallelism that is higher than the
configured admission control slots value. Setting
PROCESSING_COST_MAX_THREADS equal to or lower than configured admission
control slots value can help lower scan parallelism and pass the
admission controller.

The previous workaround to control scan parallelism by IMPALA-12029 is
now removed. This patch also disables IMPALA-10287 optimization if
COMPUTE_PROCESSING_COST=true. This is because IMPALA-10287 relies on a
fixed number of fragment instances in DistributedPlanner.java. However,
effective parallelism calculation is done much later and may change the
final number of instances of hash join fragment, rendering
DistributionMode selected by IMPALA-10287 inaccurate.

This patch is benchmarked using single_node_perf_run.py with the
following parameters:

args="-gen_experimental_profile=true -default_query_options="
args+="mt_dop=4,compute_processing_cost=1,processing_cost_min_threads=1 "
./bin/single_node_perf_run.py --num_impalads=3 --scale=10 \
    --workloads=tpcds --iterations=5 --table_formats=parquet/none/none \
    --impalad_args="$args" \
    --query_names=TPCDS-Q3,TPCDS-Q14-1,TPCDS-Q14-2,TPCDS-Q23-1,TPCDS-Q23-2,TPCDS-Q49,TPCDS-Q76,TPCDS-Q78,TPCDS-Q80A \
    "IMPALA-12091~1" IMPALA-12091

The benchmark result is as follows:
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
| Workload  | Query       | File Format           | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%)  | Base StdDev(%) | Iters | Median Diff(%) | MW Zval | Tval  |
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
| TPCDS(10) | TPCDS-Q23-1 | parquet / none / none | 4.62   | 4.54        |   +1.92%   |   0.23%    |   1.59%        | 5     |   +2.32%       | 1.15    | 2.67  |
| TPCDS(10) | TPCDS-Q14-1 | parquet / none / none | 5.82   | 5.76        |   +1.08%   |   5.27%    |   3.89%        | 5     |   +2.04%       | 0.00    | 0.37  |
| TPCDS(10) | TPCDS-Q23-2 | parquet / none / none | 4.65   | 4.58        |   +1.38%   |   1.97%    |   0.48%        | 5     |   +0.81%       | 0.87    | 1.51  |
| TPCDS(10) | TPCDS-Q49   | parquet / none / none | 1.49   | 1.48        |   +0.46%   | * 36.02% * | * 34.95% *     | 5     |   +1.26%       | 0.58    | 0.02  |
| TPCDS(10) | TPCDS-Q14-2 | parquet / none / none | 3.76   | 3.75        |   +0.39%   |   1.67%    |   0.58%        | 5     |   -0.03%       | -0.58   | 0.49  |
| TPCDS(10) | TPCDS-Q78   | parquet / none / none | 2.80   | 2.80        |   -0.04%   |   1.32%    |   1.33%        | 5     |   -0.42%       | -0.29   | -0.05 |
| TPCDS(10) | TPCDS-Q80A  | parquet / none / none | 2.87   | 2.89        |   -0.51%   |   1.33%    |   0.40%        | 5     |   -0.01%       | -0.29   | -0.82 |
| TPCDS(10) | TPCDS-Q3    | parquet / none / none | 0.18   | 0.19        |   -1.29%   | * 15.26% * | * 15.87% *     | 5     |   -0.54%       | -0.87   | -0.13 |
| TPCDS(10) | TPCDS-Q76   | parquet / none / none | 1.08   | 1.11        |   -2.98%   |   0.92%    |   1.70%        | 5     |   -3.99%       | -2.02   | -3.47 |
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+

Testing:
- Pass PlannerTest.testProcessingCost
- Pass test_executor_groups.py
- Reenable test_tpcds_q51a in TestTpcdsQueryWithProcessingCost with
  PROCESSING_COST_MAX_THREADS set to 5
- Pass test_tpcds_queries.py::TestTpcdsQueryWithProcessingCost
- Pass core tests

Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
---
M be/src/exec/blocking-join-node.h
M be/src/exec/data-sink.h
M be/src/exec/exec-node.cc
M be/src/exec/exec-node.h
M be/src/exec/hdfs-scanner.h
M be/src/exec/scan-node.cc
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M common/thrift/ImpalaService.thrift
M common/thrift/Planner.thrift
M common/thrift/Query.thrift
M fe/src/main/java/org/apache/impala/analysis/Analyzer.java
M fe/src/main/java/org/apache/impala/planner/AggregationNode.java
M fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
M fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
M fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
M fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
M fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
M fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
M fe/src/main/java/org/apache/impala/planner/PlanFragment.java
M fe/src/main/java/org/apache/impala/planner/PlanNode.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
M fe/src/main/java/org/apache/impala/planner/ProcessingCost.java
M fe/src/main/java/org/apache/impala/planner/ScanNode.java
M fe/src/main/java/org/apache/impala/planner/UnionNode.java
M fe/src/main/java/org/apache/impala/service/Frontend.java
M fe/src/test/java/org/apache/impala/planner/PlannerTest.java
M fe/src/test/resources/llama-site-3-groups.xml
M testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test
M tests/custom_cluster/test_executor_groups.py
M tests/query_test/test_tpcds_queries.py
34 files changed, 1,281 insertions(+), 1,086 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/07/19807/2
-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 2
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Wenzhe Zhou (Code Review)" <ge...@cloudera.org>.
Wenzhe Zhou has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 6:

(7 comments)

http://gerrit.cloudera.org:8080/#/c/19807/6//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/19807/6//COMMIT_MSG@28
PS6, Line 28: is
nit: are


http://gerrit.cloudera.org:8080/#/c/19807/6/be/src/scheduling/scheduler.cc
File be/src/scheduling/scheduler.cc:

http://gerrit.cloudera.org:8080/#/c/19807/6/be/src/scheduling/scheduler.cc@285
PS6, Line 285: fragment_state->instance_states.empty()
Is it possible fragment_state->instance_states is empty()? num_host as 0 is not handled in line 317.


http://gerrit.cloudera.org:8080/#/c/19807/6/be/src/scheduling/scheduler.cc@286
PS6, Line 286: 1
0. Otherwise if fragment_state->instance_states.size() equals 1, index is out of range.


http://gerrit.cloudera.org:8080/#/c/19807/6/be/src/scheduling/scheduler.cc@305
PS6, Line 305: fragment_state->instance_states.size()
fragment_state->instance_states.size() - 1


http://gerrit.cloudera.org:8080/#/c/19807/6/common/thrift/ImpalaService.thrift
File common/thrift/ImpalaService.thrift:

http://gerrit.cloudera.org:8080/#/c/19807/6/common/thrift/ImpalaService.thrift@787
PS6, Line 787: host
nit: node


http://gerrit.cloudera.org:8080/#/c/19807/6/common/thrift/Query.thrift
File common/thrift/Query.thrift:

http://gerrit.cloudera.org:8080/#/c/19807/6/common/thrift/Query.thrift@114
PS6, Line 114: const i32 MAX_FRAGMENT_INSTANCES_PER_NODE = 128
nit: add a comment for this new constant since it's not for TQueryOption.num_nodes


http://gerrit.cloudera.org:8080/#/c/19807/6/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
File fe/src/main/java/org/apache/impala/planner/PlanFragment.java:

http://gerrit.cloudera.org:8080/#/c/19807/6/fe/src/main/java/org/apache/impala/planner/PlanFragment.java@1110
PS6, Line 1110: canTryLower = false;
move out of loop?



-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 6
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Tue, 09 May 2023 05:54:30 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Riza Suminto (Code Review)" <ge...@cloudera.org>.
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 6:

(2 comments)

Patch set 6 adds override of perThreadIoBuffers to 2 at HdfsScanNode.java, following the constant at scan-node.cc.

The impact can be observed in the modified tpcds-processing-cost.test, where several Per-Instance Resources mem-estimate is now fixed at 16MB (2 * 8MB buffer). Consequently, Per-Host Resource Estimate is also lowered in some queries. This might impact scan speed, but I'm not sure if there is any benefit of having more IO buffer per scan instance.

http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/exec/scan-node.cc
File be/src/exec/scan-node.cc:

http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/exec/scan-node.cc@298
PS3, Line 298:                   * FLAGS_max_queued_row_batches_per_scanner_thread));
> We can and should preserve legacy computations for MT_DOP. Just keep the co
Patch set 6 reorganize the calculation into separate blocks.


http://gerrit.cloudera.org:8080/#/c/19807/3/common/thrift/ImpalaService.thrift
File common/thrift/ImpalaService.thrift:

http://gerrit.cloudera.org:8080/#/c/19807/3/common/thrift/ImpalaService.thrift@793
PS3, Line 793:   MAX_FRAGMENT_INSTANCES_PER_NODE = 156
> Still 156
Per PM, 156 is the number corresponding to this in Query.thrift, not related with the upper limit 128.



-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 6
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Sat, 06 May 2023 00:06:52 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Riza Suminto (Code Review)" <ge...@cloudera.org>.
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 5:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/scheduling/scheduler.cc
File be/src/scheduling/scheduler.cc:

http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/scheduling/scheduler.cc@313
PS3, Line 313:         fragment_state->fragment.display_name, largest_inst_per_host,
> I missed this, will rename it in next patchset.
Fixed in patch set 5.



-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 5
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Fri, 05 May 2023 21:03:28 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Wenzhe Zhou (Code Review)" <ge...@cloudera.org>.
Wenzhe Zhou has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 8: Code-Review+2

carry +1 from Kurt


-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 8
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Wed, 10 May 2023 18:10:32 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Riza Suminto (Code Review)" <ge...@cloudera.org>.
Hello Quanlong Huang, Kurt Deschler, Csaba Ringhofer, Wenzhe Zhou, Impala Public Jenkins, 

I'd like you to reexamine a change. Please visit

    http://gerrit.cloudera.org:8080/19807

to look at the new patch set (#7).

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................

IMPALA-12091: Control scan parallelism by its processing cost

Before this patch, Impala still relies on MT_DOP option to decide the
degree of parallelism of the scan fragment when a query runs with
COMPUTE_PROCESSING_COST=1. This patch adds the scan node's processing
cost as another consideration to raise scan parallelism beyond MT_DOP.

Scan node cost is now adjusted to also consider the number of effective
scan ranges. Each scan range is given a weight of (0.5% *
min_processing_per_thread), which roughly means that one scan node
instance can handle at most 200 scan ranges.

Query option MAX_FRAGMENT_INSTANCES_PER_NODE is added as an upper
bound on scan parallelism if COMPUTE_PROCESSING_COST=true. If the number
of scan ranges is fewer than the maximum parallelism allowed by the scan
node's processing cost, that processing cost will be clamped down
to (min_processing_per_thread / number of scan ranges). Lowering
MAX_FRAGMENT_INSTANCES_PER_NODE can also clamp down the scan processing
cost in a similar way. For interior fragments, a combination of
MAX_FRAGMENT_INSTANCES_PER_NODE, PROCESSING_COST_MIN_THREADS, and the
number of available cores per node is accounted to determine maximum
fragment parallelism per node. For scan fragment, only the first two are
considered to encourage Frontend to choose a larger executor group as
needed.

Two new static state is added into exec-node.h: is_mt_fragment_ and
num_instances_per_node_. The backend code that refers to the MT_DOP
option is replaced with either is_mt_fragment_ or
num_instances_per_node_.

Two new criteria are added during effective parallelism calculation in
PlanFragment.adjustToMaxParallelism():

- If a fragment has UnionNode, its parallelism is the maximum between
  its input fragments and its collocated ScanNode's expected
  parallelism.
- If a fragment only has a single ScanNode (and no UnionNode), its
  parallelism is calculated in the same fashion as the interior fragment
  but will not be lowered anymore since it will not have any child
  fragment to compare with.

Admission control slots remain unchanged. This may cause a query to fail
admission if Planner selects scan parallelism that is higher than the
configured admission control slots value. Setting
MAX_FRAGMENT_INSTANCES_PER_NODE equal to or lower than configured
admission control slots value can help lower scan parallelism and pass
the admission controller.

The previous workaround to control scan parallelism by IMPALA-12029 is
now removed. This patch also disables IMPALA-10287 optimization if
COMPUTE_PROCESSING_COST=true. This is because IMPALA-10287 relies on a
fixed number of fragment instances in DistributedPlanner.java. However,
effective parallelism calculation is done much later and may change the
final number of instances of hash join fragment, rendering
DistributionMode selected by IMPALA-10287 inaccurate.

This patch is benchmarked using single_node_perf_run.py with the
following parameters:

args="-gen_experimental_profile=true -default_query_options="
args+="mt_dop=4,compute_processing_cost=1,processing_cost_min_threads=1 "
./bin/single_node_perf_run.py --num_impalads=3 --scale=10 \
    --workloads=tpcds --iterations=5 --table_formats=parquet/none/none \
    --impalad_args="$args" \
    --query_names=TPCDS-Q3,TPCDS-Q14-1,TPCDS-Q14-2,TPCDS-Q23-1,TPCDS-Q23-2,TPCDS-Q49,TPCDS-Q76,TPCDS-Q78,TPCDS-Q80A \
    "IMPALA-12091~1" IMPALA-12091

The benchmark result is as follows:
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
| Workload  | Query       | File Format           | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%)  | Base StdDev(%) | Iters | Median Diff(%) | MW Zval | Tval  |
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
| TPCDS(10) | TPCDS-Q23-1 | parquet / none / none | 4.62   | 4.54        |   +1.92%   |   0.23%    |   1.59%        | 5     |   +2.32%       | 1.15    | 2.67  |
| TPCDS(10) | TPCDS-Q14-1 | parquet / none / none | 5.82   | 5.76        |   +1.08%   |   5.27%    |   3.89%        | 5     |   +2.04%       | 0.00    | 0.37  |
| TPCDS(10) | TPCDS-Q23-2 | parquet / none / none | 4.65   | 4.58        |   +1.38%   |   1.97%    |   0.48%        | 5     |   +0.81%       | 0.87    | 1.51  |
| TPCDS(10) | TPCDS-Q49   | parquet / none / none | 1.49   | 1.48        |   +0.46%   | * 36.02% * | * 34.95% *     | 5     |   +1.26%       | 0.58    | 0.02  |
| TPCDS(10) | TPCDS-Q14-2 | parquet / none / none | 3.76   | 3.75        |   +0.39%   |   1.67%    |   0.58%        | 5     |   -0.03%       | -0.58   | 0.49  |
| TPCDS(10) | TPCDS-Q78   | parquet / none / none | 2.80   | 2.80        |   -0.04%   |   1.32%    |   1.33%        | 5     |   -0.42%       | -0.29   | -0.05 |
| TPCDS(10) | TPCDS-Q80A  | parquet / none / none | 2.87   | 2.89        |   -0.51%   |   1.33%    |   0.40%        | 5     |   -0.01%       | -0.29   | -0.82 |
| TPCDS(10) | TPCDS-Q3    | parquet / none / none | 0.18   | 0.19        |   -1.29%   | * 15.26% * | * 15.87% *     | 5     |   -0.54%       | -0.87   | -0.13 |
| TPCDS(10) | TPCDS-Q76   | parquet / none / none | 1.08   | 1.11        |   -2.98%   |   0.92%    |   1.70%        | 5     |   -3.99%       | -2.02   | -3.47 |
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+

Testing:
- Pass PlannerTest.testProcessingCost
- Pass test_executor_groups.py
- Reenable test_tpcds_q51a in TestTpcdsQueryWithProcessingCost with
  MAX_FRAGMENT_INSTANCES_PER_NODE set to 5
- Pass test_tpcds_queries.py::TestTpcdsQueryWithProcessingCost
- Pass core tests

Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
---
M be/src/exec/blocking-join-node.h
M be/src/exec/data-sink.h
M be/src/exec/exec-node.cc
M be/src/exec/exec-node.h
M be/src/exec/hdfs-scanner.h
M be/src/exec/scan-node.cc
M be/src/exec/scan-node.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M common/thrift/ImpalaService.thrift
M common/thrift/Planner.thrift
M common/thrift/Query.thrift
M fe/src/main/java/org/apache/impala/analysis/Analyzer.java
M fe/src/main/java/org/apache/impala/planner/AggregationNode.java
M fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
M fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
M fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
M fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
M fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
M fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
M fe/src/main/java/org/apache/impala/planner/PlanFragment.java
M fe/src/main/java/org/apache/impala/planner/PlanNode.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
M fe/src/main/java/org/apache/impala/planner/ProcessingCost.java
M fe/src/main/java/org/apache/impala/planner/ScanNode.java
M fe/src/main/java/org/apache/impala/planner/UnionNode.java
M fe/src/main/java/org/apache/impala/service/Frontend.java
M fe/src/test/java/org/apache/impala/planner/PlannerTest.java
M fe/src/test/resources/llama-site-3-groups.xml
M testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test
M tests/custom_cluster/test_executor_groups.py
M tests/query_test/test_insert.py
M tests/query_test/test_tpcds_queries.py
36 files changed, 1,443 insertions(+), 1,196 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/07/19807/7
-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 7
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 9: Code-Review+2


-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 9
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Wed, 10 May 2023 18:11:21 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 8:

Build Successful 

https://jenkins.impala.io/job/gerrit-code-review-checks/12979/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests.


-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 8
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Tue, 09 May 2023 21:29:07 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 10:

Build Successful 

https://jenkins.impala.io/job/gerrit-code-review-checks/13000/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests.


-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 10
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Thu, 11 May 2023 15:54:57 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Riza Suminto (Code Review)" <ge...@cloudera.org>.
Hello Quanlong Huang, Kurt Deschler, Csaba Ringhofer, Wenzhe Zhou, Impala Public Jenkins, 

I'd like you to reexamine a change. Please visit

    http://gerrit.cloudera.org:8080/19807

to look at the new patch set (#5).

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................

IMPALA-12091: Control scan parallelism by its processing cost

Before this patch, Impala still relies on MT_DOP option to decide the
degree of parallelism of the scan fragment when a query runs with
COMPUTE_PROCESSING_COST=1. This patch adds the scan node's processing
cost as another consideration to raise scan parallelism beyond MT_DOP.

Scan node cost is now adjusted to also consider the number of effective
scan ranges. Each scan range is given a weight of (0.5% *
min_processing_per_thread), which roughly means that one scan node
instance can handle at most 200 scan ranges.

Query option MAX_FRAGMENT_INSTANCES_PER_NODE is repurposed as an upper
bound on scan parallelism if COMPUTE_PROCESSING_COST=true. If the number
of scan ranges is fewer than the maximum parallelism allowed by the scan
node's processing cost, that processing cost will be clamped down
to (min_processing_per_thread / number of scan ranges). Lowering
MAX_FRAGMENT_INSTANCES_PER_NODE can also clamp down the scan processing
cost in a similar way. For interior fragments, a combination of
MAX_FRAGMENT_INSTANCES_PER_NODE, PROCESSING_COST_MIN_THREADS, and the
number of available cores per node is accounted to determine maximum
fragment parallelism per node. For scan fragment, only the first two is
considered to encourage Frontend to choose a larger executor group as
needed.

Two new static state is added into exec-node.h: is_mt_fragment_ and
num_instances_per_node_. The backend code that refers to the MT_DOP
option is replaced with either is_mt_fragment_ or
num_instances_per_node_.

Two new criteria are added during effective parallelism calculation in
PlanFragment.adjustToMaxParallelism():

- If a fragment has UnionNode, its parallelism is the maximum between
  its input fragments and its collocated ScanNode's expected
  parallelism.
- If a fragment only has a single ScanNode (and no UnionNode), its
  parallelism is calculated in the same fashion as the interior fragment
  but will not be lowered anymore since it will not have any child
  fragment to compare with.

Admission control slots remain unchanged. This may cause a query to fail
admission if Planner selects scan parallelism that is higher than the
configured admission control slots value. Setting
MAX_FRAGMENT_INSTANCES_PER_NODE equal to or lower than configured
admission control slots value can help lower scan parallelism and pass
the admission controller.

The previous workaround to control scan parallelism by IMPALA-12029 is
now removed. This patch also disables IMPALA-10287 optimization if
COMPUTE_PROCESSING_COST=true. This is because IMPALA-10287 relies on a
fixed number of fragment instances in DistributedPlanner.java. However,
effective parallelism calculation is done much later and may change the
final number of instances of hash join fragment, rendering
DistributionMode selected by IMPALA-10287 inaccurate.

This patch is benchmarked using single_node_perf_run.py with the
following parameters:

args="-gen_experimental_profile=true -default_query_options="
args+="mt_dop=4,compute_processing_cost=1,processing_cost_min_threads=1 "
./bin/single_node_perf_run.py --num_impalads=3 --scale=10 \
    --workloads=tpcds --iterations=5 --table_formats=parquet/none/none \
    --impalad_args="$args" \
    --query_names=TPCDS-Q3,TPCDS-Q14-1,TPCDS-Q14-2,TPCDS-Q23-1,TPCDS-Q23-2,TPCDS-Q49,TPCDS-Q76,TPCDS-Q78,TPCDS-Q80A \
    "IMPALA-12091~1" IMPALA-12091

The benchmark result is as follows:
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
| Workload  | Query       | File Format           | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%)  | Base StdDev(%) | Iters | Median Diff(%) | MW Zval | Tval  |
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
| TPCDS(10) | TPCDS-Q23-1 | parquet / none / none | 4.62   | 4.54        |   +1.92%   |   0.23%    |   1.59%        | 5     |   +2.32%       | 1.15    | 2.67  |
| TPCDS(10) | TPCDS-Q14-1 | parquet / none / none | 5.82   | 5.76        |   +1.08%   |   5.27%    |   3.89%        | 5     |   +2.04%       | 0.00    | 0.37  |
| TPCDS(10) | TPCDS-Q23-2 | parquet / none / none | 4.65   | 4.58        |   +1.38%   |   1.97%    |   0.48%        | 5     |   +0.81%       | 0.87    | 1.51  |
| TPCDS(10) | TPCDS-Q49   | parquet / none / none | 1.49   | 1.48        |   +0.46%   | * 36.02% * | * 34.95% *     | 5     |   +1.26%       | 0.58    | 0.02  |
| TPCDS(10) | TPCDS-Q14-2 | parquet / none / none | 3.76   | 3.75        |   +0.39%   |   1.67%    |   0.58%        | 5     |   -0.03%       | -0.58   | 0.49  |
| TPCDS(10) | TPCDS-Q78   | parquet / none / none | 2.80   | 2.80        |   -0.04%   |   1.32%    |   1.33%        | 5     |   -0.42%       | -0.29   | -0.05 |
| TPCDS(10) | TPCDS-Q80A  | parquet / none / none | 2.87   | 2.89        |   -0.51%   |   1.33%    |   0.40%        | 5     |   -0.01%       | -0.29   | -0.82 |
| TPCDS(10) | TPCDS-Q3    | parquet / none / none | 0.18   | 0.19        |   -1.29%   | * 15.26% * | * 15.87% *     | 5     |   -0.54%       | -0.87   | -0.13 |
| TPCDS(10) | TPCDS-Q76   | parquet / none / none | 1.08   | 1.11        |   -2.98%   |   0.92%    |   1.70%        | 5     |   -3.99%       | -2.02   | -3.47 |
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+

Testing:
- Pass PlannerTest.testProcessingCost
- Pass test_executor_groups.py
- Reenable test_tpcds_q51a in TestTpcdsQueryWithProcessingCost with
  MAX_FRAGMENT_INSTANCES_PER_NODE set to 5
- Pass test_tpcds_queries.py::TestTpcdsQueryWithProcessingCost
- Pass core tests

Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
---
M be/src/exec/blocking-join-node.h
M be/src/exec/data-sink.h
M be/src/exec/exec-node.cc
M be/src/exec/exec-node.h
M be/src/exec/hdfs-scanner.h
M be/src/exec/scan-node.cc
M be/src/exec/scan-node.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M common/thrift/ImpalaService.thrift
M common/thrift/Planner.thrift
M common/thrift/Query.thrift
M fe/src/main/java/org/apache/impala/analysis/Analyzer.java
M fe/src/main/java/org/apache/impala/planner/AggregationNode.java
M fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
M fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
M fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
M fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
M fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
M fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
M fe/src/main/java/org/apache/impala/planner/PlanFragment.java
M fe/src/main/java/org/apache/impala/planner/PlanNode.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
M fe/src/main/java/org/apache/impala/planner/ProcessingCost.java
M fe/src/main/java/org/apache/impala/planner/ScanNode.java
M fe/src/main/java/org/apache/impala/planner/UnionNode.java
M fe/src/main/java/org/apache/impala/service/Frontend.java
M fe/src/test/java/org/apache/impala/planner/PlannerTest.java
M fe/src/test/resources/llama-site-3-groups.xml
M testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test
M tests/custom_cluster/test_executor_groups.py
M tests/query_test/test_insert.py
M tests/query_test/test_tpcds_queries.py
36 files changed, 1,338 insertions(+), 1,101 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/07/19807/5
-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 5
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 6:

Build Successful 

https://jenkins.impala.io/job/gerrit-code-review-checks/12955/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests.


-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 6
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Sat, 06 May 2023 00:16:11 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Riza Suminto (Code Review)" <ge...@cloudera.org>.
Hello Quanlong Huang, Kurt Deschler, Csaba Ringhofer, Wenzhe Zhou, Impala Public Jenkins, 

I'd like you to reexamine a change. Please visit

    http://gerrit.cloudera.org:8080/19807

to look at the new patch set (#3).

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................

IMPALA-12091: Control scan parallelism by its processing cost

Before this patch, Impala still relies on MT_DOP option to decide the
degree of parallelism of the scan fragment when a query runs with
COMPUTE_PROCESSING_COST=1. This patch adds the scan node's processing
cost as another consideration to raise scan parallelism beyond MT_DOP.

Scan node cost is now adjusted to also consider the number of effective
scan ranges. Each scan range is given a weight of (0.5% *
min_processing_per_thread), which roughly means that one scan node
instance can handle at most 200 scan ranges.

Query option PROCESSING_COST_MAX_THREADS is repurposed as an upper bound
on scan parallelism if COMPUTE_PROCESSING_COST=true. If the number of
scan ranges is fewer than the maximum parallelism allowed by the scan
node's processing cost, that processing cost will be clamped down
to (min_processing_per_thread / number of scan ranges). Lowering
PROCESSING_COST_MAX_THREADS can also clamp down the scan processing cost
in a similar way. For interior fragments, a combination of
PROCESSING_COST_MAX_THREADS, PROCESSING_COST_MIN_THREADS, and the number
of available cores per node is accounted to determine maximum fragment
parallelism per node. For scan fragment, only the first two is
considered to encourage Frontend to choose a larger executor group as
needed.

Two new static state is added into exec-node.h: is_mt_fragment_ and
effective_dop_. The backend code that refers to the MT_DOP option is
replaced with either is_mt_fragment_ or effective_dop_.

Two new criteria are added during effective parallelism calculation in
PlanFragment.adjustToMaxParallelism():

- If a fragment has UnionNode, its parallelism is the maximum between
  its input fragments and its collocated ScanNode's expected
  parallelism.
- If a fragment only has a single ScanNode (and no UnionNode), its
  parallelism is calculated in the same fashion as the interior fragment
  but will not be lowered anymore since it will not have any child
  fragment to compare with.

Admission control slots remain unchanged. This may cause a query to fail
admission if Planner selects scan parallelism that is higher than the
configured admission control slots value. Setting
PROCESSING_COST_MAX_THREADS equal to or lower than configured admission
control slots value can help lower scan parallelism and pass the
admission controller.

The previous workaround to control scan parallelism by IMPALA-12029 is
now removed. This patch also disables IMPALA-10287 optimization if
COMPUTE_PROCESSING_COST=true. This is because IMPALA-10287 relies on a
fixed number of fragment instances in DistributedPlanner.java. However,
effective parallelism calculation is done much later and may change the
final number of instances of hash join fragment, rendering
DistributionMode selected by IMPALA-10287 inaccurate.

This patch is benchmarked using single_node_perf_run.py with the
following parameters:

args="-gen_experimental_profile=true -default_query_options="
args+="mt_dop=4,compute_processing_cost=1,processing_cost_min_threads=1 "
./bin/single_node_perf_run.py --num_impalads=3 --scale=10 \
    --workloads=tpcds --iterations=5 --table_formats=parquet/none/none \
    --impalad_args="$args" \
    --query_names=TPCDS-Q3,TPCDS-Q14-1,TPCDS-Q14-2,TPCDS-Q23-1,TPCDS-Q23-2,TPCDS-Q49,TPCDS-Q76,TPCDS-Q78,TPCDS-Q80A \
    "IMPALA-12091~1" IMPALA-12091

The benchmark result is as follows:
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
| Workload  | Query       | File Format           | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%)  | Base StdDev(%) | Iters | Median Diff(%) | MW Zval | Tval  |
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
| TPCDS(10) | TPCDS-Q23-1 | parquet / none / none | 4.62   | 4.54        |   +1.92%   |   0.23%    |   1.59%        | 5     |   +2.32%       | 1.15    | 2.67  |
| TPCDS(10) | TPCDS-Q14-1 | parquet / none / none | 5.82   | 5.76        |   +1.08%   |   5.27%    |   3.89%        | 5     |   +2.04%       | 0.00    | 0.37  |
| TPCDS(10) | TPCDS-Q23-2 | parquet / none / none | 4.65   | 4.58        |   +1.38%   |   1.97%    |   0.48%        | 5     |   +0.81%       | 0.87    | 1.51  |
| TPCDS(10) | TPCDS-Q49   | parquet / none / none | 1.49   | 1.48        |   +0.46%   | * 36.02% * | * 34.95% *     | 5     |   +1.26%       | 0.58    | 0.02  |
| TPCDS(10) | TPCDS-Q14-2 | parquet / none / none | 3.76   | 3.75        |   +0.39%   |   1.67%    |   0.58%        | 5     |   -0.03%       | -0.58   | 0.49  |
| TPCDS(10) | TPCDS-Q78   | parquet / none / none | 2.80   | 2.80        |   -0.04%   |   1.32%    |   1.33%        | 5     |   -0.42%       | -0.29   | -0.05 |
| TPCDS(10) | TPCDS-Q80A  | parquet / none / none | 2.87   | 2.89        |   -0.51%   |   1.33%    |   0.40%        | 5     |   -0.01%       | -0.29   | -0.82 |
| TPCDS(10) | TPCDS-Q3    | parquet / none / none | 0.18   | 0.19        |   -1.29%   | * 15.26% * | * 15.87% *     | 5     |   -0.54%       | -0.87   | -0.13 |
| TPCDS(10) | TPCDS-Q76   | parquet / none / none | 1.08   | 1.11        |   -2.98%   |   0.92%    |   1.70%        | 5     |   -3.99%       | -2.02   | -3.47 |
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+

Testing:
- Pass PlannerTest.testProcessingCost
- Pass test_executor_groups.py
- Reenable test_tpcds_q51a in TestTpcdsQueryWithProcessingCost with
  PROCESSING_COST_MAX_THREADS set to 5
- Pass test_tpcds_queries.py::TestTpcdsQueryWithProcessingCost
- Pass core tests

Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
---
M be/src/exec/blocking-join-node.h
M be/src/exec/data-sink.h
M be/src/exec/exec-node.cc
M be/src/exec/exec-node.h
M be/src/exec/hdfs-scanner.h
M be/src/exec/scan-node.cc
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M common/thrift/ImpalaService.thrift
M common/thrift/Planner.thrift
M common/thrift/Query.thrift
M fe/src/main/java/org/apache/impala/analysis/Analyzer.java
M fe/src/main/java/org/apache/impala/planner/AggregationNode.java
M fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
M fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
M fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
M fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
M fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
M fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
M fe/src/main/java/org/apache/impala/planner/PlanFragment.java
M fe/src/main/java/org/apache/impala/planner/PlanNode.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
M fe/src/main/java/org/apache/impala/planner/ProcessingCost.java
M fe/src/main/java/org/apache/impala/planner/ScanNode.java
M fe/src/main/java/org/apache/impala/planner/UnionNode.java
M fe/src/main/java/org/apache/impala/service/Frontend.java
M fe/src/test/java/org/apache/impala/planner/PlannerTest.java
M fe/src/test/resources/llama-site-3-groups.xml
M testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test
M tests/custom_cluster/test_executor_groups.py
M tests/query_test/test_tpcds_queries.py
34 files changed, 1,281 insertions(+), 1,086 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/07/19807/3
-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 3
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Riza Suminto (Code Review)" <ge...@cloudera.org>.
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 7:

(8 comments)

Thank you for catching those bugs!

http://gerrit.cloudera.org:8080/#/c/19807/6//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/19807/6//COMMIT_MSG@19
PS6, Line 19: is added as a
> is added
Done


http://gerrit.cloudera.org:8080/#/c/19807/6//COMMIT_MSG@28
PS6, Line 28: ar
> nit: are
Done


http://gerrit.cloudera.org:8080/#/c/19807/6/be/src/scheduling/scheduler.cc
File be/src/scheduling/scheduler.cc:

http://gerrit.cloudera.org:8080/#/c/19807/6/be/src/scheduling/scheduler.cc@285
PS6, Line 285: o the first fragment instance.
> Is it possible fragment_state->instance_states is empty()? num_host as 0 is
It is not possible. Added DCHECK here.


http://gerrit.cloudera.org:8080/#/c/19807/6/be/src/scheduling/scheduler.cc@286
PS6, Line 286: 0
> 0. Otherwise if fragment_state->instance_states.size() equals 1, index is o
Done


http://gerrit.cloudera.org:8080/#/c/19807/6/be/src/scheduling/scheduler.cc@305
PS6, Line 305: fragment_state->instance_states.size()
> fragment_state->instance_states.size() - 1
Done


http://gerrit.cloudera.org:8080/#/c/19807/6/common/thrift/ImpalaService.thrift
File common/thrift/ImpalaService.thrift:

http://gerrit.cloudera.org:8080/#/c/19807/6/common/thrift/ImpalaService.thrift@787
PS6, Line 787: node
> nit: node
Done


http://gerrit.cloudera.org:8080/#/c/19807/6/common/thrift/Query.thrift
File common/thrift/Query.thrift:

http://gerrit.cloudera.org:8080/#/c/19807/6/common/thrift/Query.thrift@114
PS6, Line 114: // constant used as upperbound for TQueryOptions.processing_cost_min_threads and
> nit: add a comment for this new constant since it's not for TQueryOption.nu
Done


http://gerrit.cloudera.org:8080/#/c/19807/6/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
File fe/src/main/java/org/apache/impala/planner/PlanFragment.java:

http://gerrit.cloudera.org:8080/#/c/19807/6/fe/src/main/java/org/apache/impala/planner/PlanFragment.java@1110
PS6, Line 1110:  Prevent caller from
> move out of loop?
Done



-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 7
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Tue, 09 May 2023 19:08:36 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 9:

Build started: https://jenkins.impala.io/job/gerrit-verify-dryrun/9302/ DRY_RUN=false


-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 9
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Wed, 10 May 2023 18:11:22 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 5:

Build Successful 

https://jenkins.impala.io/job/gerrit-code-review-checks/12954/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests.


-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 5
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Fri, 05 May 2023 21:23:12 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Kurt Deschler (Code Review)" <ge...@cloudera.org>.
Kurt Deschler has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 5:

(2 comments)

http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/exec/scan-node.cc
File be/src/exec/scan-node.cc:

http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/exec/scan-node.cc@298
PS3, Line 298:     max_row_batches =
> The more I look at this code, the more concern I am to change the existing 
We can and should preserve legacy computations for MT_DOP. Just keep the code well separated into different blocks.


http://gerrit.cloudera.org:8080/#/c/19807/3/common/thrift/ImpalaService.thrift
File common/thrift/ImpalaService.thrift:

http://gerrit.cloudera.org:8080/#/c/19807/3/common/thrift/ImpalaService.thrift@793
PS3, Line 793:   MAX_FRAGMENT_INSTANCES_PER_NODE = 156
> Done
Still 156



-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 5
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Fri, 05 May 2023 21:33:04 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Riza Suminto (Code Review)" <ge...@cloudera.org>.
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 3:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/19807/2/be/src/exec/exec-node.cc
File be/src/exec/exec-node.cc:

http://gerrit.cloudera.org:8080/#/c/19807/2/be/src/exec/exec-node.cc@93
PS2, Line 93: effective_dop_ = state->query_options().mt_dop;
> This should just be
Done



-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 3
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Mon, 01 May 2023 15:27:56 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 3:

Build Successful 

https://jenkins.impala.io/job/gerrit-code-review-checks/12902/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests.


-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 3
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Mon, 01 May 2023 15:49:15 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Riza Suminto (Code Review)" <ge...@cloudera.org>.
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 9:

> Patch Set 9: Verified-1
> 
> Build failed: https://jenkins.impala.io/job/gerrit-verify-dryrun/9302/

This fail TestTpcdsQueryWithProcessingCost.test_tpcds_q51a in dockerised setup. I'll check if lowering max_fragment_instances_per_node from 5 to 4 can help lower the mem requirement.


-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 9
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Thu, 11 May 2023 00:32:05 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 11: Verified+1


-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 11
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Thu, 11 May 2023 22:46:30 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 11:

Build started: https://jenkins.impala.io/job/gerrit-verify-dryrun/9307/ DRY_RUN=false


-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 11
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Thu, 11 May 2023 17:09:17 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Wenzhe Zhou (Code Review)" <ge...@cloudera.org>.
Wenzhe Zhou has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 3:

(4 comments)

http://gerrit.cloudera.org:8080/#/c/19807/3//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/19807/3//COMMIT_MSG@17
PS3, Line 17: 200 scan range
just curious any reason to choose this number? Do we need to tune this number in future based on storage io throughput?


http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/exec/hdfs-scanner.h
File be/src/exec/hdfs-scanner.h:

http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/exec/hdfs-scanner.h@388
PS3, Line 388: effective_dop
nit: effective_dop()


http://gerrit.cloudera.org:8080/#/c/19807/3/common/thrift/Query.thrift
File common/thrift/Query.thrift:

http://gerrit.cloudera.org:8080/#/c/19807/3/common/thrift/Query.thrift@632
PS3, Line 632:   157: optional i32 processing_cost_max_threads = 128;
> Can we consolidate there 128 constants somewhere?
Agree. The ranges of PROCESSING_COST_MAX_THREADS and PROCESSING_COST_MIN_THREADS are also bound to 128. We should define a constant 128 so later we can increase it easily.


http://gerrit.cloudera.org:8080/#/c/19807/3/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
File fe/src/main/java/org/apache/impala/analysis/Analyzer.java:

http://gerrit.cloudera.org:8080/#/c/19807/3/fe/src/main/java/org/apache/impala/analysis/Analyzer.java@658
PS3, Line 658: =
Is 0 valid value?



-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 3
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Wed, 03 May 2023 19:54:35 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Riza Suminto (Code Review)" <ge...@cloudera.org>.
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 1:

(2 comments)

http://gerrit.cloudera.org:8080/#/c/19807/1//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/19807/1//COMMIT_MSG@18
PS1, Line 18: NUM_SCANNER_THREADS
Should create new query option for this specific purpose rather than reuse  NUM_SCANNER_THREADS.


http://gerrit.cloudera.org:8080/#/c/19807/1/fe/src/main/java/org/apache/impala/planner/PlanNode.java
File fe/src/main/java/org/apache/impala/planner/PlanNode.java:

http://gerrit.cloudera.org:8080/#/c/19807/1/fe/src/main/java/org/apache/impala/planner/PlanNode.java@a1036
PS1, Line 1036: 
I need to look around if there is another reference to MT_DOP like this in frontend code. They should be replaced with analyzer.getMaxParallelismPerNode(). Backend code should get checked as well.



-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 1
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Wed, 26 Apr 2023 21:06:11 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 4:

Build Successful 

https://jenkins.impala.io/job/gerrit-code-review-checks/12953/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests.


-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 4
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Fri, 05 May 2023 20:42:48 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Kurt Deschler (Code Review)" <ge...@cloudera.org>.
Kurt Deschler has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 3:

(17 comments)

http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/exec/data-sink.h
File be/src/exec/data-sink.h:

http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/exec/data-sink.h@48
PS3, Line 48: /// DataSink contains the runtime state and there can be up to effective_dop instances of
ExceNode::effective_dop()


http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/exec/exec-node.h
File be/src/exec/exec-node.h:

http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/exec/exec-node.h@132
PS3, Line 132:   int effective_dop() const { return effective_dop_; }
Would rather not see the term "effective" applied during execution. At this point, the query is planned and performance is consequence of that plan.


http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/exec/scan-node.cc
File be/src/exec/scan-node.cc:

http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/exec/scan-node.cc@298
PS3, Line 298:     max_row_batches = max(2, max_row_batches / parent->plan_node().effective_dop());
This is too disconnected from the max_row_batches computation above, which does not consider effective_dop. I don't see how max_queued_row_batches_per_scanner_thread would be enforced.


http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/scheduling/scheduler.cc
File be/src/scheduling/scheduler.cc:

http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/scheduling/scheduler.cc@58
PS3, Line 58: static const int MAX_INSTANCES_PER_NODE = 128;
Please add comment and consider making this configurable.


http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/scheduling/scheduler.cc@275
PS3, Line 275:       // TODO: Fragment with IsExceedMaxFsWriters equals true is not checked for now
Let's address this case in this PR


http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/scheduling/scheduler.cc@283
PS3, Line 283:           fragment_state->fragment.display_name));
Print values with message


http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/scheduling/scheduler.cc@289
PS3, Line 289:     int idx = 1;
largest_inst_idx


http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/scheduling/scheduler.cc@290
PS3, Line 290:     for (int i = 1; i < fragment_state->instance_states.size(); i++) {
Add a comment to explain what the following blocks are checking. It's not clear how the (128) limit is being avoided here.


http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/scheduling/scheduler.cc@313
PS3, Line 313:     int effective_inst_per_host = ceil((float)effective_instance_count / num_host);
Rename planned_inst_per_host


http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/scheduling/scheduler.cc@322
PS3, Line 322:               << " num_host=" << num_host;
Can we suggest anything to correct if this occurs?


http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/service/query-options.h
File be/src/service/query-options.h:

http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/service/query-options.h@293
PS3, Line 293:   QUERY_OPT_FN(processing_cost_max_threads, PROCESSING_COST_MAX_THREADS,                 \
Rename max_fragment_threads_per_executor or something more tangible to the user.


http://gerrit.cloudera.org:8080/#/c/19807/3/common/thrift/ImpalaService.thrift
File common/thrift/ImpalaService.thrift:

http://gerrit.cloudera.org:8080/#/c/19807/3/common/thrift/ImpalaService.thrift@793
PS3, Line 793:   PROCESSING_COST_MAX_THREADS = 156;
Typo 156


http://gerrit.cloudera.org:8080/#/c/19807/3/common/thrift/Query.thrift
File common/thrift/Query.thrift:

http://gerrit.cloudera.org:8080/#/c/19807/3/common/thrift/Query.thrift@632
PS3, Line 632:   157: optional i32 processing_cost_max_threads = 128;
Can we consolidate there 128 constants somewhere?


http://gerrit.cloudera.org:8080/#/c/19807/3/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
File fe/src/main/java/org/apache/impala/analysis/Analyzer.java:

http://gerrit.cloudera.org:8080/#/c/19807/3/fe/src/main/java/org/apache/impala/analysis/Analyzer.java@165
PS3, Line 165:   public static final int PROCESSING_COST_THREADS_UPPERBOUND = 128;
max_fragment_threads_per_instance


http://gerrit.cloudera.org:8080/#/c/19807/3/fe/src/main/java/org/apache/impala/analysis/Analyzer.java@660
PS3, Line 660:         0;
Is 0 expected here and handled later?


http://gerrit.cloudera.org:8080/#/c/19807/3/fe/src/main/java/org/apache/impala/planner/ScanNode.java
File fe/src/main/java/org/apache/impala/planner/ScanNode.java:

http://gerrit.cloudera.org:8080/#/c/19807/3/fe/src/main/java/org/apache/impala/planner/ScanNode.java@56
PS3, Line 56:   private final static Logger LOG = LoggerFactory.getLogger(ScanNode.class);
Unused?


http://gerrit.cloudera.org:8080/#/c/19807/3/fe/src/test/resources/llama-site-3-groups.xml
File fe/src/test/resources/llama-site-3-groups.xml:

http://gerrit.cloudera.org:8080/#/c/19807/3/fe/src/test/resources/llama-site-3-groups.xml@41
PS3, Line 41:     <value>94371840</value>
Why is this change necessary?



-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 3
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Wed, 03 May 2023 18:44:40 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Wenzhe Zhou (Code Review)" <ge...@cloudera.org>.
Wenzhe Zhou has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 7:

(3 comments)

http://gerrit.cloudera.org:8080/#/c/19807/7/be/src/service/query-options-test.cc
File be/src/service/query-options-test.cc:

http://gerrit.cloudera.org:8080/#/c/19807/7/be/src/service/query-options-test.cc@262
PS7, Line 262: {MAKE_OPTIONDEF(processing_cost_min_threads),    {1, 128}},
             :       {MAKE_OPTIONDEF(max_fragment_instances_per_node), {1, 128}},
replace 128 with new constant MAX_FRAGMENT_INSTANCES_PER_NODE


http://gerrit.cloudera.org:8080/#/c/19807/7/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
File fe/src/main/java/org/apache/impala/analysis/Analyzer.java:

http://gerrit.cloudera.org:8080/#/c/19807/7/fe/src/main/java/org/apache/impala/analysis/Analyzer.java@654
PS7, Line 654: globalState_.availableCoresPerNode_ > 0
Is it possible the get function is called before set function is called? If not, add a Preconditions.checkState().


http://gerrit.cloudera.org:8080/#/c/19807/7/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
File fe/src/main/java/org/apache/impala/planner/PlanFragment.java:

http://gerrit.cloudera.org:8080/#/c/19807/7/fe/src/main/java/org/apache/impala/planner/PlanFragment.java@1093
PS7, Line 1093: Preconditions.checkState(scanNodes.size() <= 1);
              :         for (ScanNode scanNode : scanNodes)
why need a loop if the expected size is not bigger than 1.



-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 7
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Tue, 09 May 2023 20:00:34 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Riza Suminto (Code Review)" <ge...@cloudera.org>.
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 8:

(3 comments)

http://gerrit.cloudera.org:8080/#/c/19807/7/be/src/service/query-options-test.cc
File be/src/service/query-options-test.cc:

http://gerrit.cloudera.org:8080/#/c/19807/7/be/src/service/query-options-test.cc@262
PS7, Line 262: {MAKE_OPTIONDEF(max_fs_writers),                 {0, I32_MAX}},
             :       {MAKE_OPTIONDEF(default_ndv_scale),              {1, 10}},
> replace 128 with new constant MAX_FRAGMENT_INSTANCES_PER_NODE
Done


http://gerrit.cloudera.org:8080/#/c/19807/7/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
File fe/src/main/java/org/apache/impala/analysis/Analyzer.java:

http://gerrit.cloudera.org:8080/#/c/19807/7/fe/src/main/java/org/apache/impala/analysis/Analyzer.java@654
PS7, Line 654: tions.checkState(globalState_.available
> Is it possible the get function is called before set function is called? If
Done. Put Preconditions in both setter and getter.
Caller must make sure to always set positive value and setter is always called before getter.


http://gerrit.cloudera.org:8080/#/c/19807/7/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
File fe/src/main/java/org/apache/impala/planner/PlanFragment.java:

http://gerrit.cloudera.org:8080/#/c/19807/7/fe/src/main/java/org/apache/impala/planner/PlanFragment.java@1093
PS7, Line 1093: if (!scanNodes.isEmpty()) {
              :           Preconditions.checkState(scanNode
> why need a loop if the expected size is not bigger than 1.
Good point! Replaced it with IF.



-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 8
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Tue, 09 May 2023 21:12:20 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Riza Suminto (Code Review)" <ge...@cloudera.org>.
Hello Quanlong Huang, Kurt Deschler, Csaba Ringhofer, Wenzhe Zhou, Impala Public Jenkins, 

I'd like you to reexamine a change. Please visit

    http://gerrit.cloudera.org:8080/19807

to look at the new patch set (#10).

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................

IMPALA-12091: Control scan parallelism by its processing cost

Before this patch, Impala still relies on MT_DOP option to decide the
degree of parallelism of the scan fragment when a query runs with
COMPUTE_PROCESSING_COST=1. This patch adds the scan node's processing
cost as another consideration to raise scan parallelism beyond MT_DOP.

Scan node cost is now adjusted to also consider the number of effective
scan ranges. Each scan range is given a weight of (0.5% *
min_processing_per_thread), which roughly means that one scan node
instance can handle at most 200 scan ranges.

Query option MAX_FRAGMENT_INSTANCES_PER_NODE is added as an upper
bound on scan parallelism if COMPUTE_PROCESSING_COST=true. If the number
of scan ranges is fewer than the maximum parallelism allowed by the scan
node's processing cost, that processing cost will be clamped down
to (min_processing_per_thread / number of scan ranges). Lowering
MAX_FRAGMENT_INSTANCES_PER_NODE can also clamp down the scan processing
cost in a similar way. For interior fragments, a combination of
MAX_FRAGMENT_INSTANCES_PER_NODE, PROCESSING_COST_MIN_THREADS, and the
number of available cores per node is accounted to determine maximum
fragment parallelism per node. For scan fragment, only the first two are
considered to encourage Frontend to choose a larger executor group as
needed.

Two new static state is added into exec-node.h: is_mt_fragment_ and
num_instances_per_node_. The backend code that refers to the MT_DOP
option is replaced with either is_mt_fragment_ or
num_instances_per_node_.

Two new criteria are added during effective parallelism calculation in
PlanFragment.adjustToMaxParallelism():

- If a fragment has UnionNode, its parallelism is the maximum between
  its input fragments and its collocated ScanNode's expected
  parallelism.
- If a fragment only has a single ScanNode (and no UnionNode), its
  parallelism is calculated in the same fashion as the interior fragment
  but will not be lowered anymore since it will not have any child
  fragment to compare with.

Admission control slots remain unchanged. This may cause a query to fail
admission if Planner selects scan parallelism that is higher than the
configured admission control slots value. Setting
MAX_FRAGMENT_INSTANCES_PER_NODE equal to or lower than configured
admission control slots value can help lower scan parallelism and pass
the admission controller.

The previous workaround to control scan parallelism by IMPALA-12029 is
now removed. This patch also disables IMPALA-10287 optimization if
COMPUTE_PROCESSING_COST=true. This is because IMPALA-10287 relies on a
fixed number of fragment instances in DistributedPlanner.java. However,
effective parallelism calculation is done much later and may change the
final number of instances of hash join fragment, rendering
DistributionMode selected by IMPALA-10287 inaccurate.

This patch is benchmarked using single_node_perf_run.py with the
following parameters:

args="-gen_experimental_profile=true -default_query_options="
args+="mt_dop=4,compute_processing_cost=1,processing_cost_min_threads=1 "
./bin/single_node_perf_run.py --num_impalads=3 --scale=10 \
    --workloads=tpcds --iterations=5 --table_formats=parquet/none/none \
    --impalad_args="$args" \
    --query_names=TPCDS-Q3,TPCDS-Q14-1,TPCDS-Q14-2,TPCDS-Q23-1,TPCDS-Q23-2,TPCDS-Q49,TPCDS-Q76,TPCDS-Q78,TPCDS-Q80A \
    "IMPALA-12091~1" IMPALA-12091

The benchmark result is as follows:
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
| Workload  | Query       | File Format           | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%)  | Base StdDev(%) | Iters | Median Diff(%) | MW Zval | Tval  |
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
| TPCDS(10) | TPCDS-Q23-1 | parquet / none / none | 4.62   | 4.54        |   +1.92%   |   0.23%    |   1.59%        | 5     |   +2.32%       | 1.15    | 2.67  |
| TPCDS(10) | TPCDS-Q14-1 | parquet / none / none | 5.82   | 5.76        |   +1.08%   |   5.27%    |   3.89%        | 5     |   +2.04%       | 0.00    | 0.37  |
| TPCDS(10) | TPCDS-Q23-2 | parquet / none / none | 4.65   | 4.58        |   +1.38%   |   1.97%    |   0.48%        | 5     |   +0.81%       | 0.87    | 1.51  |
| TPCDS(10) | TPCDS-Q49   | parquet / none / none | 1.49   | 1.48        |   +0.46%   | * 36.02% * | * 34.95% *     | 5     |   +1.26%       | 0.58    | 0.02  |
| TPCDS(10) | TPCDS-Q14-2 | parquet / none / none | 3.76   | 3.75        |   +0.39%   |   1.67%    |   0.58%        | 5     |   -0.03%       | -0.58   | 0.49  |
| TPCDS(10) | TPCDS-Q78   | parquet / none / none | 2.80   | 2.80        |   -0.04%   |   1.32%    |   1.33%        | 5     |   -0.42%       | -0.29   | -0.05 |
| TPCDS(10) | TPCDS-Q80A  | parquet / none / none | 2.87   | 2.89        |   -0.51%   |   1.33%    |   0.40%        | 5     |   -0.01%       | -0.29   | -0.82 |
| TPCDS(10) | TPCDS-Q3    | parquet / none / none | 0.18   | 0.19        |   -1.29%   | * 15.26% * | * 15.87% *     | 5     |   -0.54%       | -0.87   | -0.13 |
| TPCDS(10) | TPCDS-Q76   | parquet / none / none | 1.08   | 1.11        |   -2.98%   |   0.92%    |   1.70%        | 5     |   -3.99%       | -2.02   | -3.47 |
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+

Testing:
- Pass PlannerTest.testProcessingCost
- Pass test_executor_groups.py
- Reenable test_tpcds_q51a in TestTpcdsQueryWithProcessingCost with
  MAX_FRAGMENT_INSTANCES_PER_NODE set to 5
- Pass test_tpcds_queries.py::TestTpcdsQueryWithProcessingCost
- Pass core tests

Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
---
M be/src/exec/blocking-join-node.h
M be/src/exec/data-sink.h
M be/src/exec/exec-node.cc
M be/src/exec/exec-node.h
M be/src/exec/hdfs-scanner.h
M be/src/exec/scan-node.cc
M be/src/exec/scan-node.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M common/thrift/ImpalaService.thrift
M common/thrift/Planner.thrift
M common/thrift/Query.thrift
M fe/src/main/java/org/apache/impala/analysis/Analyzer.java
M fe/src/main/java/org/apache/impala/planner/AggregationNode.java
M fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
M fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
M fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
M fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
M fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
M fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
M fe/src/main/java/org/apache/impala/planner/PlanFragment.java
M fe/src/main/java/org/apache/impala/planner/PlanNode.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
M fe/src/main/java/org/apache/impala/planner/ProcessingCost.java
M fe/src/main/java/org/apache/impala/planner/ScanNode.java
M fe/src/main/java/org/apache/impala/planner/UnionNode.java
M fe/src/main/java/org/apache/impala/service/Frontend.java
M fe/src/test/java/org/apache/impala/planner/PlannerTest.java
M fe/src/test/resources/llama-site-3-groups.xml
M testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test
M tests/custom_cluster/test_executor_groups.py
M tests/query_test/test_insert.py
M tests/query_test/test_tpcds_queries.py
36 files changed, 1,448 insertions(+), 1,196 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/07/19807/10
-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 10
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 11: Code-Review+2


-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 11
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Thu, 11 May 2023 17:09:16 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Kurt Deschler (Code Review)" <ge...@cloudera.org>.
Kurt Deschler has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 10: Code-Review+1


-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 10
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Thu, 11 May 2023 16:56:04 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Riza Suminto (Code Review)" <ge...@cloudera.org>.
Hello Quanlong Huang, Kurt Deschler, Csaba Ringhofer, Wenzhe Zhou, Impala Public Jenkins, 

I'd like you to reexamine a change. Please visit

    http://gerrit.cloudera.org:8080/19807

to look at the new patch set (#8).

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................

IMPALA-12091: Control scan parallelism by its processing cost

Before this patch, Impala still relies on MT_DOP option to decide the
degree of parallelism of the scan fragment when a query runs with
COMPUTE_PROCESSING_COST=1. This patch adds the scan node's processing
cost as another consideration to raise scan parallelism beyond MT_DOP.

Scan node cost is now adjusted to also consider the number of effective
scan ranges. Each scan range is given a weight of (0.5% *
min_processing_per_thread), which roughly means that one scan node
instance can handle at most 200 scan ranges.

Query option MAX_FRAGMENT_INSTANCES_PER_NODE is added as an upper
bound on scan parallelism if COMPUTE_PROCESSING_COST=true. If the number
of scan ranges is fewer than the maximum parallelism allowed by the scan
node's processing cost, that processing cost will be clamped down
to (min_processing_per_thread / number of scan ranges). Lowering
MAX_FRAGMENT_INSTANCES_PER_NODE can also clamp down the scan processing
cost in a similar way. For interior fragments, a combination of
MAX_FRAGMENT_INSTANCES_PER_NODE, PROCESSING_COST_MIN_THREADS, and the
number of available cores per node is accounted to determine maximum
fragment parallelism per node. For scan fragment, only the first two are
considered to encourage Frontend to choose a larger executor group as
needed.

Two new static state is added into exec-node.h: is_mt_fragment_ and
num_instances_per_node_. The backend code that refers to the MT_DOP
option is replaced with either is_mt_fragment_ or
num_instances_per_node_.

Two new criteria are added during effective parallelism calculation in
PlanFragment.adjustToMaxParallelism():

- If a fragment has UnionNode, its parallelism is the maximum between
  its input fragments and its collocated ScanNode's expected
  parallelism.
- If a fragment only has a single ScanNode (and no UnionNode), its
  parallelism is calculated in the same fashion as the interior fragment
  but will not be lowered anymore since it will not have any child
  fragment to compare with.

Admission control slots remain unchanged. This may cause a query to fail
admission if Planner selects scan parallelism that is higher than the
configured admission control slots value. Setting
MAX_FRAGMENT_INSTANCES_PER_NODE equal to or lower than configured
admission control slots value can help lower scan parallelism and pass
the admission controller.

The previous workaround to control scan parallelism by IMPALA-12029 is
now removed. This patch also disables IMPALA-10287 optimization if
COMPUTE_PROCESSING_COST=true. This is because IMPALA-10287 relies on a
fixed number of fragment instances in DistributedPlanner.java. However,
effective parallelism calculation is done much later and may change the
final number of instances of hash join fragment, rendering
DistributionMode selected by IMPALA-10287 inaccurate.

This patch is benchmarked using single_node_perf_run.py with the
following parameters:

args="-gen_experimental_profile=true -default_query_options="
args+="mt_dop=4,compute_processing_cost=1,processing_cost_min_threads=1 "
./bin/single_node_perf_run.py --num_impalads=3 --scale=10 \
    --workloads=tpcds --iterations=5 --table_formats=parquet/none/none \
    --impalad_args="$args" \
    --query_names=TPCDS-Q3,TPCDS-Q14-1,TPCDS-Q14-2,TPCDS-Q23-1,TPCDS-Q23-2,TPCDS-Q49,TPCDS-Q76,TPCDS-Q78,TPCDS-Q80A \
    "IMPALA-12091~1" IMPALA-12091

The benchmark result is as follows:
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
| Workload  | Query       | File Format           | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%)  | Base StdDev(%) | Iters | Median Diff(%) | MW Zval | Tval  |
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
| TPCDS(10) | TPCDS-Q23-1 | parquet / none / none | 4.62   | 4.54        |   +1.92%   |   0.23%    |   1.59%        | 5     |   +2.32%       | 1.15    | 2.67  |
| TPCDS(10) | TPCDS-Q14-1 | parquet / none / none | 5.82   | 5.76        |   +1.08%   |   5.27%    |   3.89%        | 5     |   +2.04%       | 0.00    | 0.37  |
| TPCDS(10) | TPCDS-Q23-2 | parquet / none / none | 4.65   | 4.58        |   +1.38%   |   1.97%    |   0.48%        | 5     |   +0.81%       | 0.87    | 1.51  |
| TPCDS(10) | TPCDS-Q49   | parquet / none / none | 1.49   | 1.48        |   +0.46%   | * 36.02% * | * 34.95% *     | 5     |   +1.26%       | 0.58    | 0.02  |
| TPCDS(10) | TPCDS-Q14-2 | parquet / none / none | 3.76   | 3.75        |   +0.39%   |   1.67%    |   0.58%        | 5     |   -0.03%       | -0.58   | 0.49  |
| TPCDS(10) | TPCDS-Q78   | parquet / none / none | 2.80   | 2.80        |   -0.04%   |   1.32%    |   1.33%        | 5     |   -0.42%       | -0.29   | -0.05 |
| TPCDS(10) | TPCDS-Q80A  | parquet / none / none | 2.87   | 2.89        |   -0.51%   |   1.33%    |   0.40%        | 5     |   -0.01%       | -0.29   | -0.82 |
| TPCDS(10) | TPCDS-Q3    | parquet / none / none | 0.18   | 0.19        |   -1.29%   | * 15.26% * | * 15.87% *     | 5     |   -0.54%       | -0.87   | -0.13 |
| TPCDS(10) | TPCDS-Q76   | parquet / none / none | 1.08   | 1.11        |   -2.98%   |   0.92%    |   1.70%        | 5     |   -3.99%       | -2.02   | -3.47 |
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+

Testing:
- Pass PlannerTest.testProcessingCost
- Pass test_executor_groups.py
- Reenable test_tpcds_q51a in TestTpcdsQueryWithProcessingCost with
  MAX_FRAGMENT_INSTANCES_PER_NODE set to 5
- Pass test_tpcds_queries.py::TestTpcdsQueryWithProcessingCost
- Pass core tests

Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
---
M be/src/exec/blocking-join-node.h
M be/src/exec/data-sink.h
M be/src/exec/exec-node.cc
M be/src/exec/exec-node.h
M be/src/exec/hdfs-scanner.h
M be/src/exec/scan-node.cc
M be/src/exec/scan-node.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M common/thrift/ImpalaService.thrift
M common/thrift/Planner.thrift
M common/thrift/Query.thrift
M fe/src/main/java/org/apache/impala/analysis/Analyzer.java
M fe/src/main/java/org/apache/impala/planner/AggregationNode.java
M fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
M fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
M fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
M fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
M fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
M fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
M fe/src/main/java/org/apache/impala/planner/PlanFragment.java
M fe/src/main/java/org/apache/impala/planner/PlanNode.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
M fe/src/main/java/org/apache/impala/planner/ProcessingCost.java
M fe/src/main/java/org/apache/impala/planner/ScanNode.java
M fe/src/main/java/org/apache/impala/planner/UnionNode.java
M fe/src/main/java/org/apache/impala/service/Frontend.java
M fe/src/test/java/org/apache/impala/planner/PlannerTest.java
M fe/src/test/resources/llama-site-3-groups.xml
M testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test
M tests/custom_cluster/test_executor_groups.py
M tests/query_test/test_insert.py
M tests/query_test/test_tpcds_queries.py
36 files changed, 1,448 insertions(+), 1,196 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/07/19807/8
-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 8
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Kurt Deschler (Code Review)" <ge...@cloudera.org>.
Kurt Deschler has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 6: Code-Review+1


-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 6
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Mon, 08 May 2023 14:21:46 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Riza Suminto (Code Review)" <ge...@cloudera.org>.
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 3:

(5 comments)

Thank you for the feedback!
Replying some below:

http://gerrit.cloudera.org:8080/#/c/19807/3//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/19807/3//COMMIT_MSG@17
PS3, Line 17: 200 scan range
> just curious any reason to choose this number? Do we need to tune this numb
I just ballpark this according to tpcds_parquet.store_sales (1824 files) that we use in PlannerTest.testProcessingCost.
We definitely want to refine this further next or make it tunable with backend flag.


http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/exec/scan-node.cc
File be/src/exec/scan-node.cc:

http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/exec/scan-node.cc@298
PS3, Line 298:     max_row_batches = max(2, max_row_batches / parent->plan_node().effective_dop());
> This is too disconnected from the max_row_batches computation above, which 
The max_row_batch value that has FLAGS_max_queued_row_batches_per_scanner_thread as component is the heuristic used in not MT_DOP mode (MT_DOP=0).

The change is for when scan node is in MT_DOP mode, which roughly translate to dividing max_row_batches from previous formula equally among instances in the same host node.


http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/service/query-options.h
File be/src/service/query-options.h:

http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/service/query-options.h@293
PS3, Line 293:   QUERY_OPT_FN(processing_cost_max_threads, PROCESSING_COST_MAX_THREADS,                 \
> Rename max_fragment_threads_per_executor or something more tangible to the 
This is intended to match PROCESSING_COST_MIN_THREADS that already exist, and the context is per-fragment_id instead of per-query.

max_fragment_threads_per_executor sounds like per-query rather than per-fragment_id.


http://gerrit.cloudera.org:8080/#/c/19807/3/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
File fe/src/main/java/org/apache/impala/analysis/Analyzer.java:

http://gerrit.cloudera.org:8080/#/c/19807/3/fe/src/main/java/org/apache/impala/analysis/Analyzer.java@658
PS3, Line 658: =
> Is 0 valid value?
Returned value will be Math.max against getMinParallelismPerNode(). But I'll change this method to return 1 just to be safe.


http://gerrit.cloudera.org:8080/#/c/19807/3/fe/src/test/resources/llama-site-3-groups.xml
File fe/src/test/resources/llama-site-3-groups.xml:

http://gerrit.cloudera.org:8080/#/c/19807/3/fe/src/test/resources/llama-site-3-groups.xml@41
PS3, Line 41:     <value>94371840</value>
> Why is this change necessary?
Some new test added in tests/custom_cluster/test_executor_groups.py hit this memory limit. Raising this a bit so they can fit to small pool both in term of cpu and memory.



-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 3
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Wed, 03 May 2023 20:55:39 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Riza Suminto (Code Review)" <ge...@cloudera.org>.
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 6:

(1 comment)

http://gerrit.cloudera.org:8080/#/c/19807/6//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/19807/6//COMMIT_MSG@19
PS6, Line 19: is repurposed
is added



-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 6
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Sat, 06 May 2023 00:09:56 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Kurt Deschler (Code Review)" <ge...@cloudera.org>.
Kurt Deschler has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 3:

(2 comments)

http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/exec/scan-node.cc
File be/src/exec/scan-node.cc:

http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/exec/scan-node.cc@298
PS3, Line 298:     max_row_batches = max(2, max_row_batches / parent->plan_node().effective_dop());
> The max_row_batch value that has FLAGS_max_queued_row_batches_per_scanner_t
I think the scanner threads should be computed last, based on the planned parallelism. Even better if they can be  determined in the planner and factored in during EG assignment.


http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/service/query-options.h
File be/src/service/query-options.h:

http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/service/query-options.h@293
PS3, Line 293:   QUERY_OPT_FN(processing_cost_max_threads, PROCESSING_COST_MAX_THREADS,                 \
> This is intended to match PROCESSING_COST_MIN_THREADS that already exist, a
We should find a name that implies per-fragment per-instance. This is a limit and may not be tied to processing cost alone so would rather not mention that term.



-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 3
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Thu, 04 May 2023 15:39:10 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 1:

Build Successful 

https://jenkins.impala.io/job/gerrit-code-review-checks/12868/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests.


-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 1
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Comment-Date: Wed, 26 Apr 2023 02:05:11 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has submitted this change and it was merged. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................

IMPALA-12091: Control scan parallelism by its processing cost

Before this patch, Impala still relies on MT_DOP option to decide the
degree of parallelism of the scan fragment when a query runs with
COMPUTE_PROCESSING_COST=1. This patch adds the scan node's processing
cost as another consideration to raise scan parallelism beyond MT_DOP.

Scan node cost is now adjusted to also consider the number of effective
scan ranges. Each scan range is given a weight of (0.5% *
min_processing_per_thread), which roughly means that one scan node
instance can handle at most 200 scan ranges.

Query option MAX_FRAGMENT_INSTANCES_PER_NODE is added as an upper
bound on scan parallelism if COMPUTE_PROCESSING_COST=true. If the number
of scan ranges is fewer than the maximum parallelism allowed by the scan
node's processing cost, that processing cost will be clamped down
to (min_processing_per_thread / number of scan ranges). Lowering
MAX_FRAGMENT_INSTANCES_PER_NODE can also clamp down the scan processing
cost in a similar way. For interior fragments, a combination of
MAX_FRAGMENT_INSTANCES_PER_NODE, PROCESSING_COST_MIN_THREADS, and the
number of available cores per node is accounted to determine maximum
fragment parallelism per node. For scan fragment, only the first two are
considered to encourage Frontend to choose a larger executor group as
needed.

Two new static state is added into exec-node.h: is_mt_fragment_ and
num_instances_per_node_. The backend code that refers to the MT_DOP
option is replaced with either is_mt_fragment_ or
num_instances_per_node_.

Two new criteria are added during effective parallelism calculation in
PlanFragment.adjustToMaxParallelism():

- If a fragment has UnionNode, its parallelism is the maximum between
  its input fragments and its collocated ScanNode's expected
  parallelism.
- If a fragment only has a single ScanNode (and no UnionNode), its
  parallelism is calculated in the same fashion as the interior fragment
  but will not be lowered anymore since it will not have any child
  fragment to compare with.

Admission control slots remain unchanged. This may cause a query to fail
admission if Planner selects scan parallelism that is higher than the
configured admission control slots value. Setting
MAX_FRAGMENT_INSTANCES_PER_NODE equal to or lower than configured
admission control slots value can help lower scan parallelism and pass
the admission controller.

The previous workaround to control scan parallelism by IMPALA-12029 is
now removed. This patch also disables IMPALA-10287 optimization if
COMPUTE_PROCESSING_COST=true. This is because IMPALA-10287 relies on a
fixed number of fragment instances in DistributedPlanner.java. However,
effective parallelism calculation is done much later and may change the
final number of instances of hash join fragment, rendering
DistributionMode selected by IMPALA-10287 inaccurate.

This patch is benchmarked using single_node_perf_run.py with the
following parameters:

args="-gen_experimental_profile=true -default_query_options="
args+="mt_dop=4,compute_processing_cost=1,processing_cost_min_threads=1 "
./bin/single_node_perf_run.py --num_impalads=3 --scale=10 \
    --workloads=tpcds --iterations=5 --table_formats=parquet/none/none \
    --impalad_args="$args" \
    --query_names=TPCDS-Q3,TPCDS-Q14-1,TPCDS-Q14-2,TPCDS-Q23-1,TPCDS-Q23-2,TPCDS-Q49,TPCDS-Q76,TPCDS-Q78,TPCDS-Q80A \
    "IMPALA-12091~1" IMPALA-12091

The benchmark result is as follows:
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
| Workload  | Query       | File Format           | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%)  | Base StdDev(%) | Iters | Median Diff(%) | MW Zval | Tval  |
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
| TPCDS(10) | TPCDS-Q23-1 | parquet / none / none | 4.62   | 4.54        |   +1.92%   |   0.23%    |   1.59%        | 5     |   +2.32%       | 1.15    | 2.67  |
| TPCDS(10) | TPCDS-Q14-1 | parquet / none / none | 5.82   | 5.76        |   +1.08%   |   5.27%    |   3.89%        | 5     |   +2.04%       | 0.00    | 0.37  |
| TPCDS(10) | TPCDS-Q23-2 | parquet / none / none | 4.65   | 4.58        |   +1.38%   |   1.97%    |   0.48%        | 5     |   +0.81%       | 0.87    | 1.51  |
| TPCDS(10) | TPCDS-Q49   | parquet / none / none | 1.49   | 1.48        |   +0.46%   | * 36.02% * | * 34.95% *     | 5     |   +1.26%       | 0.58    | 0.02  |
| TPCDS(10) | TPCDS-Q14-2 | parquet / none / none | 3.76   | 3.75        |   +0.39%   |   1.67%    |   0.58%        | 5     |   -0.03%       | -0.58   | 0.49  |
| TPCDS(10) | TPCDS-Q78   | parquet / none / none | 2.80   | 2.80        |   -0.04%   |   1.32%    |   1.33%        | 5     |   -0.42%       | -0.29   | -0.05 |
| TPCDS(10) | TPCDS-Q80A  | parquet / none / none | 2.87   | 2.89        |   -0.51%   |   1.33%    |   0.40%        | 5     |   -0.01%       | -0.29   | -0.82 |
| TPCDS(10) | TPCDS-Q3    | parquet / none / none | 0.18   | 0.19        |   -1.29%   | * 15.26% * | * 15.87% *     | 5     |   -0.54%       | -0.87   | -0.13 |
| TPCDS(10) | TPCDS-Q76   | parquet / none / none | 1.08   | 1.11        |   -2.98%   |   0.92%    |   1.70%        | 5     |   -3.99%       | -2.02   | -3.47 |
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+

Testing:
- Pass PlannerTest.testProcessingCost
- Pass test_executor_groups.py
- Reenable test_tpcds_q51a in TestTpcdsQueryWithProcessingCost with
  MAX_FRAGMENT_INSTANCES_PER_NODE set to 5
- Pass test_tpcds_queries.py::TestTpcdsQueryWithProcessingCost
- Pass core tests

Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Reviewed-on: http://gerrit.cloudera.org:8080/19807
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
M be/src/exec/blocking-join-node.h
M be/src/exec/data-sink.h
M be/src/exec/exec-node.cc
M be/src/exec/exec-node.h
M be/src/exec/hdfs-scanner.h
M be/src/exec/scan-node.cc
M be/src/exec/scan-node.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M common/thrift/ImpalaService.thrift
M common/thrift/Planner.thrift
M common/thrift/Query.thrift
M fe/src/main/java/org/apache/impala/analysis/Analyzer.java
M fe/src/main/java/org/apache/impala/planner/AggregationNode.java
M fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
M fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
M fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
M fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
M fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
M fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
M fe/src/main/java/org/apache/impala/planner/PlanFragment.java
M fe/src/main/java/org/apache/impala/planner/PlanNode.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
M fe/src/main/java/org/apache/impala/planner/ProcessingCost.java
M fe/src/main/java/org/apache/impala/planner/ScanNode.java
M fe/src/main/java/org/apache/impala/planner/UnionNode.java
M fe/src/main/java/org/apache/impala/service/Frontend.java
M fe/src/test/java/org/apache/impala/planner/PlannerTest.java
M fe/src/test/resources/llama-site-3-groups.xml
M testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test
M tests/custom_cluster/test_executor_groups.py
M tests/query_test/test_insert.py
M tests/query_test/test_tpcds_queries.py
36 files changed, 1,448 insertions(+), 1,196 deletions(-)

Approvals:
  Impala Public Jenkins: Looks good to me, approved; Verified

-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: merged
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 12
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Riza Suminto (Code Review)" <ge...@cloudera.org>.
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 10:

Patch set 10 lower max_fragment_instances_per_node from 5 to 4 in TestTpcdsQueryWithProcessingCost


-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 10
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Thu, 11 May 2023 15:36:18 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Riza Suminto (Code Review)" <ge...@cloudera.org>.
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 2:

(4 comments)

http://gerrit.cloudera.org:8080/#/c/19807/1//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/19807/1//COMMIT_MSG@18
PS1, Line 18: 
> Should create new query option for this specific purpose rather than reuse 
Added PROCESSING_COST_MAX_THREADS.


http://gerrit.cloudera.org:8080/#/c/19807/2/be/src/exec/exec-node.cc
File be/src/exec/exec-node.cc:

http://gerrit.cloudera.org:8080/#/c/19807/2/be/src/exec/exec-node.cc@93
PS2, Line 93: effective_dop_ = state->query_options().mt_dop > 0;
This should just be
effective_dop_ = state->query_options().mt_dop;


http://gerrit.cloudera.org:8080/#/c/19807/1/be/src/scheduling/scheduler.cc
File be/src/scheduling/scheduler.cc:

http://gerrit.cloudera.org:8080/#/c/19807/1/be/src/scheduling/scheduler.cc@441
PS1, Line 441:     a) Map scan ranges to
> I think this is a bug here.
Submitted IMPALA-12106 to fix this separately. Patch set 2 has overlap with IMPALA-12106.


http://gerrit.cloudera.org:8080/#/c/19807/1/fe/src/main/java/org/apache/impala/planner/PlanNode.java
File fe/src/main/java/org/apache/impala/planner/PlanNode.java:

http://gerrit.cloudera.org:8080/#/c/19807/1/fe/src/main/java/org/apache/impala/planner/PlanNode.java@a1036
PS1, Line 1036: 
> I need to look around if there is another reference to MT_DOP like this in 
Checked all references to MT_DOP in both frontend and backend code. Replaced them as necessary.



-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 2
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Mon, 01 May 2023 04:59:33 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Impala Public Jenkins (Code Review)" <ge...@cloudera.org>.
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 2:

Build Successful 

https://jenkins.impala.io/job/gerrit-code-review-checks/12901/ : Initial code review checks passed. Use gerrit-verify-dryrun-external or gerrit-verify-dryrun to run full precommit tests.


-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 2
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Mon, 01 May 2023 05:11:21 +0000
Gerrit-HasComments: No

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Riza Suminto (Code Review)" <ge...@cloudera.org>.
Riza Suminto has posted comments on this change. ( http://gerrit.cloudera.org:8080/19807 )

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................


Patch Set 4:

(17 comments)

http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/exec/exec-node.h
File be/src/exec/exec-node.h:

http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/exec/exec-node.h@132
PS3, Line 132:   int num_instances_per_node() const { return num_instances_per_node_; }
> Would rather not see the term "effective" applied during execution. At this
Renamed to 'num_instances_per_node'


http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/exec/hdfs-scanner.h
File be/src/exec/hdfs-scanner.h:

http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/exec/hdfs-scanner.h@388
PS3, Line 388: PlanNode::num
> nit: effective_dop()
Replaced with PlanNode::num_instances_per_node()


http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/exec/scan-node.cc
File be/src/exec/scan-node.cc:

http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/exec/scan-node.cc@298
PS3, Line 298:     max_row_batches =
> I think the scanner threads should be computed last, based on the planned p
The more I look at this code, the more concern I am to change the existing formula.
NUM_SCANNER_THREADS option have an impact on legacy parallelism mode, both MD_DOP=0 and MT_DOP>0. Changing them can lead to regression for user that is still using legacy parallelism mode.

I prefer to not change them now, but rather override them into fixed count if COMPUTE_PROCESSING_COST=true, regardless of CpuInfo::num_cores() or NUM_SCANNER_THREADS value. I think this makes frontend planner decision more deterministic.


http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/scheduling/scheduler.cc
File be/src/scheduling/scheduler.cc:

http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/scheduling/scheduler.cc@58
PS3, Line 58: 
> Please add comment and consider making this configurable.
Added MAX_FRAGMENT_INSTANCES_PER_NODE constant in Query.thrift


http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/scheduling/scheduler.cc@275
PS3, Line 275:   if (effective_instance_count < fragment_state->instance_states.size()) {
> Let's address this case in this PR
Done


http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/scheduling/scheduler.cc@283
PS3, Line 283:   int largest_inst_per_host = 0;
> Print values with message
Done


http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/scheduling/scheduler.cc@289
PS3, Line 289:   for (int i = 1; i < fragment_state->instance_states.size(); i++) {
> largest_inst_idx
Done


http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/scheduling/scheduler.cc@290
PS3, Line 290:     if (fragment_state->instance_states[i].host
> Add a comment to explain what the following blocks are checking. It's not c
Done


http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/scheduling/scheduler.cc@313
PS3, Line 313:         fragment_state->fragment.display_name, largest_inst_per_host,
> Rename planned_inst_per_host
I missed this, will rename it in next patchset.


http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/scheduling/scheduler.cc@322
PS3, Line 322:             << " Consider running the query with COMPUTE_PROCESSING_COST=false."
> Can we suggest anything to correct if this occurs?
Add " Consider running the query with COMPUTE_PROCESSING_COST=false." to error message.

All returned error status from this method were previously a DCHECK.
I don't expect them to get raised, but if it does, I prefer this as returned status to client instead of DCHECK so I can debug it faster.


http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/service/query-options.h
File be/src/service/query-options.h:

http://gerrit.cloudera.org:8080/#/c/19807/3/be/src/service/query-options.h@293
PS3, Line 293:   QUERY_OPT_FN(max_fragment_instances_per_node, MAX_FRAGMENT_INSTANCES_PER_NODE,         \
> We should find a name that implies per-fragment per-instance. This is a lim
Renamed to MAX_FRAGMENT_INSTANCES_PER_NODE.
I'm leaving PROCESSING_COST_MIN_THREADS unchanged for now.


http://gerrit.cloudera.org:8080/#/c/19807/3/common/thrift/ImpalaService.thrift
File common/thrift/ImpalaService.thrift:

http://gerrit.cloudera.org:8080/#/c/19807/3/common/thrift/ImpalaService.thrift@793
PS3, Line 793:   MAX_FRAGMENT_INSTANCES_PER_NODE = 156
> Typo 156
Done


http://gerrit.cloudera.org:8080/#/c/19807/3/common/thrift/Query.thrift
File common/thrift/Query.thrift:

http://gerrit.cloudera.org:8080/#/c/19807/3/common/thrift/Query.thrift@632
PS3, Line 632:   // See comment in ImpalaService.thrift
> Agree. The ranges of PROCESSING_COST_MAX_THREADS and PROCESSING_COST_MIN_TH
Added MAX_FRAGMENT_INSTANCES_PER_NODE constant in Query.thrift


http://gerrit.cloudera.org:8080/#/c/19807/3/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
File fe/src/main/java/org/apache/impala/analysis/Analyzer.java:

http://gerrit.cloudera.org:8080/#/c/19807/3/fe/src/main/java/org/apache/impala/analysis/Analyzer.java@165
PS3, Line 165:   public final static String DB_ALREADY_EXISTS_ERROR_MSG = "Database already exists: ";
> max_fragment_threads_per_instance
Replaced with MAX_FRAGMENT_INSTANCES_PER_NODE constant in Query.thrift


http://gerrit.cloudera.org:8080/#/c/19807/3/fe/src/main/java/org/apache/impala/analysis/Analyzer.java@658
PS3, Line 658: 
> Returned value will be Math.max against getMinParallelismPerNode(). But I'l
Done


http://gerrit.cloudera.org:8080/#/c/19807/3/fe/src/main/java/org/apache/impala/analysis/Analyzer.java@660
PS3, Line 660:         Math.min(QueryConstants.MAX_FRAGMENT_INSTANCES_PER_NODE, x);
> Is 0 expected here and handled later?
Replaced with 1.


http://gerrit.cloudera.org:8080/#/c/19807/3/fe/src/main/java/org/apache/impala/planner/ScanNode.java
File fe/src/main/java/org/apache/impala/planner/ScanNode.java:

http://gerrit.cloudera.org:8080/#/c/19807/3/fe/src/main/java/org/apache/impala/planner/ScanNode.java@56
PS3, Line 56:   // scan ranges than would have been estimated assuming a uniform distribution.
> Unused?
Done



-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 4
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>
Gerrit-Comment-Date: Fri, 05 May 2023 20:36:59 +0000
Gerrit-HasComments: Yes

[Impala-ASF-CR] IMPALA-12091: Control scan parallelism by its processing cost

Posted by "Riza Suminto (Code Review)" <ge...@cloudera.org>.
Hello Quanlong Huang, Kurt Deschler, Csaba Ringhofer, Wenzhe Zhou, Impala Public Jenkins, 

I'd like you to reexamine a change. Please visit

    http://gerrit.cloudera.org:8080/19807

to look at the new patch set (#6).

Change subject: IMPALA-12091: Control scan parallelism by its processing cost
......................................................................

IMPALA-12091: Control scan parallelism by its processing cost

Before this patch, Impala still relies on MT_DOP option to decide the
degree of parallelism of the scan fragment when a query runs with
COMPUTE_PROCESSING_COST=1. This patch adds the scan node's processing
cost as another consideration to raise scan parallelism beyond MT_DOP.

Scan node cost is now adjusted to also consider the number of effective
scan ranges. Each scan range is given a weight of (0.5% *
min_processing_per_thread), which roughly means that one scan node
instance can handle at most 200 scan ranges.

Query option MAX_FRAGMENT_INSTANCES_PER_NODE is repurposed as an upper
bound on scan parallelism if COMPUTE_PROCESSING_COST=true. If the number
of scan ranges is fewer than the maximum parallelism allowed by the scan
node's processing cost, that processing cost will be clamped down
to (min_processing_per_thread / number of scan ranges). Lowering
MAX_FRAGMENT_INSTANCES_PER_NODE can also clamp down the scan processing
cost in a similar way. For interior fragments, a combination of
MAX_FRAGMENT_INSTANCES_PER_NODE, PROCESSING_COST_MIN_THREADS, and the
number of available cores per node is accounted to determine maximum
fragment parallelism per node. For scan fragment, only the first two is
considered to encourage Frontend to choose a larger executor group as
needed.

Two new static state is added into exec-node.h: is_mt_fragment_ and
num_instances_per_node_. The backend code that refers to the MT_DOP
option is replaced with either is_mt_fragment_ or
num_instances_per_node_.

Two new criteria are added during effective parallelism calculation in
PlanFragment.adjustToMaxParallelism():

- If a fragment has UnionNode, its parallelism is the maximum between
  its input fragments and its collocated ScanNode's expected
  parallelism.
- If a fragment only has a single ScanNode (and no UnionNode), its
  parallelism is calculated in the same fashion as the interior fragment
  but will not be lowered anymore since it will not have any child
  fragment to compare with.

Admission control slots remain unchanged. This may cause a query to fail
admission if Planner selects scan parallelism that is higher than the
configured admission control slots value. Setting
MAX_FRAGMENT_INSTANCES_PER_NODE equal to or lower than configured
admission control slots value can help lower scan parallelism and pass
the admission controller.

The previous workaround to control scan parallelism by IMPALA-12029 is
now removed. This patch also disables IMPALA-10287 optimization if
COMPUTE_PROCESSING_COST=true. This is because IMPALA-10287 relies on a
fixed number of fragment instances in DistributedPlanner.java. However,
effective parallelism calculation is done much later and may change the
final number of instances of hash join fragment, rendering
DistributionMode selected by IMPALA-10287 inaccurate.

This patch is benchmarked using single_node_perf_run.py with the
following parameters:

args="-gen_experimental_profile=true -default_query_options="
args+="mt_dop=4,compute_processing_cost=1,processing_cost_min_threads=1 "
./bin/single_node_perf_run.py --num_impalads=3 --scale=10 \
    --workloads=tpcds --iterations=5 --table_formats=parquet/none/none \
    --impalad_args="$args" \
    --query_names=TPCDS-Q3,TPCDS-Q14-1,TPCDS-Q14-2,TPCDS-Q23-1,TPCDS-Q23-2,TPCDS-Q49,TPCDS-Q76,TPCDS-Q78,TPCDS-Q80A \
    "IMPALA-12091~1" IMPALA-12091

The benchmark result is as follows:
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
| Workload  | Query       | File Format           | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%)  | Base StdDev(%) | Iters | Median Diff(%) | MW Zval | Tval  |
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+
| TPCDS(10) | TPCDS-Q23-1 | parquet / none / none | 4.62   | 4.54        |   +1.92%   |   0.23%    |   1.59%        | 5     |   +2.32%       | 1.15    | 2.67  |
| TPCDS(10) | TPCDS-Q14-1 | parquet / none / none | 5.82   | 5.76        |   +1.08%   |   5.27%    |   3.89%        | 5     |   +2.04%       | 0.00    | 0.37  |
| TPCDS(10) | TPCDS-Q23-2 | parquet / none / none | 4.65   | 4.58        |   +1.38%   |   1.97%    |   0.48%        | 5     |   +0.81%       | 0.87    | 1.51  |
| TPCDS(10) | TPCDS-Q49   | parquet / none / none | 1.49   | 1.48        |   +0.46%   | * 36.02% * | * 34.95% *     | 5     |   +1.26%       | 0.58    | 0.02  |
| TPCDS(10) | TPCDS-Q14-2 | parquet / none / none | 3.76   | 3.75        |   +0.39%   |   1.67%    |   0.58%        | 5     |   -0.03%       | -0.58   | 0.49  |
| TPCDS(10) | TPCDS-Q78   | parquet / none / none | 2.80   | 2.80        |   -0.04%   |   1.32%    |   1.33%        | 5     |   -0.42%       | -0.29   | -0.05 |
| TPCDS(10) | TPCDS-Q80A  | parquet / none / none | 2.87   | 2.89        |   -0.51%   |   1.33%    |   0.40%        | 5     |   -0.01%       | -0.29   | -0.82 |
| TPCDS(10) | TPCDS-Q3    | parquet / none / none | 0.18   | 0.19        |   -1.29%   | * 15.26% * | * 15.87% *     | 5     |   -0.54%       | -0.87   | -0.13 |
| TPCDS(10) | TPCDS-Q76   | parquet / none / none | 1.08   | 1.11        |   -2.98%   |   0.92%    |   1.70%        | 5     |   -3.99%       | -2.02   | -3.47 |
+-----------+-------------+-----------------------+--------+-------------+------------+------------+----------------+-------+----------------+---------+-------+

Testing:
- Pass PlannerTest.testProcessingCost
- Pass test_executor_groups.py
- Reenable test_tpcds_q51a in TestTpcdsQueryWithProcessingCost with
  MAX_FRAGMENT_INSTANCES_PER_NODE set to 5
- Pass test_tpcds_queries.py::TestTpcdsQueryWithProcessingCost
- Pass core tests

Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
---
M be/src/exec/blocking-join-node.h
M be/src/exec/data-sink.h
M be/src/exec/exec-node.cc
M be/src/exec/exec-node.h
M be/src/exec/hdfs-scanner.h
M be/src/exec/scan-node.cc
M be/src/exec/scan-node.h
M be/src/scheduling/scheduler.cc
M be/src/scheduling/scheduler.h
M be/src/service/query-options-test.cc
M be/src/service/query-options.cc
M be/src/service/query-options.h
M common/thrift/ImpalaService.thrift
M common/thrift/Planner.thrift
M common/thrift/Query.thrift
M fe/src/main/java/org/apache/impala/analysis/Analyzer.java
M fe/src/main/java/org/apache/impala/planner/AggregationNode.java
M fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java
M fe/src/main/java/org/apache/impala/planner/EmptySetNode.java
M fe/src/main/java/org/apache/impala/planner/HashJoinNode.java
M fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
M fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
M fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
M fe/src/main/java/org/apache/impala/planner/PlanFragment.java
M fe/src/main/java/org/apache/impala/planner/PlanNode.java
M fe/src/main/java/org/apache/impala/planner/Planner.java
M fe/src/main/java/org/apache/impala/planner/ProcessingCost.java
M fe/src/main/java/org/apache/impala/planner/ScanNode.java
M fe/src/main/java/org/apache/impala/planner/UnionNode.java
M fe/src/main/java/org/apache/impala/service/Frontend.java
M fe/src/test/java/org/apache/impala/planner/PlannerTest.java
M fe/src/test/resources/llama-site-3-groups.xml
M testdata/workloads/functional-planner/queries/PlannerTest/tpcds-processing-cost.test
M tests/custom_cluster/test_executor_groups.py
M tests/query_test/test_insert.py
M tests/query_test/test_tpcds_queries.py
36 files changed, 1,439 insertions(+), 1,194 deletions(-)


  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/07/19807/6
-- 
To view, visit http://gerrit.cloudera.org:8080/19807
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: If948e45455275d9a61a6cd5d6a30a8b98a7c729a
Gerrit-Change-Number: 19807
Gerrit-PatchSet: 6
Gerrit-Owner: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Csaba Ringhofer <cs...@cloudera.com>
Gerrit-Reviewer: Impala Public Jenkins <im...@cloudera.com>
Gerrit-Reviewer: Kurt Deschler <kd...@cloudera.com>
Gerrit-Reviewer: Quanlong Huang <hu...@gmail.com>
Gerrit-Reviewer: Riza Suminto <ri...@cloudera.com>
Gerrit-Reviewer: Wenzhe Zhou <wz...@cloudera.com>