Posted to issues-all@impala.apache.org by "ASF subversion and git services (Jira)" <ji...@apache.org> on 2023/03/09 14:14:00 UTC

[jira] [Commented] (IMPALA-11604) Planner changes for CPU usage

    [ https://issues.apache.org/jira/browse/IMPALA-11604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17698399#comment-17698399 ] 

ASF subversion and git services commented on IMPALA-11604:
----------------------------------------------------------

Commit dafc0fb7a86273e1d5f3550e5d6f8cb0f42b05fc in impala's branch refs/heads/master from Riza Suminto
[ https://gitbox.apache.org/repos/asf?p=impala.git;h=dafc0fb7a ]

IMPALA-11604 (part 2): Compute Effective Parallelism of Query

Part 1 of IMPALA-11604 implements the ProcessingCost model for each
PlanNode and DataSink. This second part builds on the ProcessingCost
model by adjusting the number of instances for each fragment after
considering their production-consumption ratio, and then finally returns
a number representing an ideal CPU core count required for a query to
run efficiently. A more detailed explanation of the CPU costing
algorithm can be found in the three steps below.

I. Compute the total ProcessingCost of a fragment.

The costing algorithm splits a query fragment into several segments,
separated at blocking PlanNode/DataSink boundaries. Each fragment
segment is a subtree of PlanNodes/DataSink in the fragment with a
DataSink or blocking PlanNode as root and non-blocking leaves. All other
nodes in the segment are non-blocking. PlanNodes or DataSinks that
belong to the same segment have their ProcessingCost summed. A new
CostingSegment class is added to represent this segment.

A fragment that has a blocking PlanNode or blocking DataSink is called a
blocking fragment. Currently, only JoinBuildSink is considered a
blocking DataSink. A fragment without any blocking nodes is called a
non-blocking fragment. Step III discusses blocking and non-blocking
fragments further.

Take as an example the following fragment plan, which is blocking since
it has three blocking PlanNodes: 12:AGGREGATE, 06:SORT, and 08:TOP-N.

  F03:PLAN FRAGMENT [HASH(i_class)] hosts=3 instances=6 (adjusted from 12)
  fragment-costs=[34974657, 2159270, 23752870, 22]
  08:TOP-N [LIMIT=100]
  |  cost=900
  |
  07:ANALYTIC
  |  cost=23751970
  |
  06:SORT
  |  cost=2159270
  |
  12:AGGREGATE [FINALIZE]
  |  cost=34548320
  |
  11:EXCHANGE [HASH(i_class)]
     cost=426337

In bottom-up direction, there exist four segments in F03:
  Blocking segment 1: (11:EXCHANGE, 12:AGGREGATE)
  Blocking segment 2: 06:SORT
  Blocking segment 3: (07:ANALYTIC, 08:TOP-N)
  Non-blocking segment 4: DataStreamSink of F03

Therefore we have:
  PC(segment 1) = 426337+34548320
  PC(segment 2) = 2159270
  PC(segment 3) = 23751970+900
  PC(segment 4) = 22

These per-segment costs are stored in a CostingSegment tree rooted at
PlanFragment.rootSegment_, and are [34974657, 2159270, 23752870, 22]
respectively after the post-order traversal.

This is implemented in PlanFragment.computeCostingSegment() and
PlanFragment.collectCostingSegmentHelper().
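
The segment sums for F03 can be reproduced with a small sketch. This is
not the Impala implementation: it models the fragment as a flat,
bottom-up list instead of a CostingSegment tree, and the names F03 and
segment_costs are hypothetical. It only shows how closing a segment at
each blocking node yields the four costs listed above.

```python
# Hypothetical flat model of fragment F03, listed bottom-up.
# Each entry is (label, cost, is_blocking). The layout is illustrative
# and is not taken from the Impala code base.
F03 = [
    ("11:EXCHANGE", 426337, False),
    ("12:AGGREGATE", 34548320, True),
    ("06:SORT", 2159270, True),
    ("07:ANALYTIC", 23751970, False),
    ("08:TOP-N", 900, True),
    ("DataStreamSink", 22, False),
]


def segment_costs(nodes):
    """Sum costs bottom-up, closing a segment at each blocking node."""
    costs = []
    running = 0
    open_segment = False
    for _label, cost, is_blocking in nodes:
        running += cost
        open_segment = True
        if is_blocking:
            # A blocking node is the root of its segment.
            costs.append(running)
            running = 0
            open_segment = False
    if open_segment:
        # Trailing non-blocking nodes (here, the sink) form the last segment.
        costs.append(running)
    return costs


print(segment_costs(F03))  # [34974657, 2159270, 23752870, 22]
```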

II. Compute the effective degree of parallelism (EDoP) of fragments.

The costing algorithm walks PlanFragments of the query plan tree in
post-order traversal. Upon visiting a PlanFragment, the costing
algorithm attempts to adjust the number of instances (effective
parallelism) of that fragment by comparing the last segment's
ProcessingCost of its child and production-consumption rate between its
adjacent segments from step I. To simplify this initial implementation,
the parallelism of a PlanFragment containing EmptySetNode, UnionNode, or
ScanNode remains unchanged (it follows MT_DOP).

This step is implemented at PlanFragment.traverseEffectiveParallelism().

III. Compute the EDoP of the query.

The effective parallelism of a query is the upper bound on the number of
CPU cores that can work on the query in parallel, taking into account
the overlap between fragment execution and blocking operators. We
compute this in a post-order traversal similar to step II, and split the
query tree into blocking fragment subtrees in a way similar to step I.
The following is an example of a query plan from TPCDS-Q12.

  F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
  PLAN-ROOT SINK
  |
  13:MERGING-EXCHANGE [UNPARTITIONED]
  |
  F03:PLAN FRAGMENT [HASH(i_class)] hosts=3 instances=3 (adjusted from 12)
  08:TOP-N [LIMIT=100]
  |
  07:ANALYTIC
  |
  06:SORT
  |
  12:AGGREGATE [FINALIZE]
  |
  11:EXCHANGE [HASH(i_class)]
  |
  F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=12
  05:AGGREGATE [STREAMING]
  |
  04:HASH JOIN [INNER JOIN, BROADCAST]
  |
  |--F05:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
  |  JOIN BUILD
  |  |
  |  10:EXCHANGE [BROADCAST]
  |  |
  |  F02:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
  |  02:SCAN HDFS [tpcds10_parquet.date_dim, RANDOM]
  |
  03:HASH JOIN [INNER JOIN, BROADCAST]
  |
  |--F06:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
  |  JOIN BUILD
  |  |
  |  09:EXCHANGE [BROADCAST]
  |  |
  |  F01:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
  |  01:SCAN HDFS [tpcds10_parquet.item, RANDOM]
  |
  00:SCAN HDFS [tpcds10_parquet.web_sales, RANDOM]

A blocking fragment is a fragment that has a blocking PlanNode or
blocking DataSink in it. The costing algorithm splits the query plan
tree into blocking subtrees divided by blocking fragment boundary. Each
blocking subtree has a blocking fragment as a root and non-blocking
fragments as the intermediate or leaf nodes. From the TPCDS-Q12 example
above, the query plan is divided into five blocking subtrees of
[(F05, F02), (F06, F01), F00, F03, F04].

A CoreCount is a container class that represents the CPU core
requirement of a subtree of a query or the query itself. Each blocking
subtree has the adjusted instance counts of its fragments summed into a
single CoreCount. This means that all fragments within a blocking
subtree can run in parallel and should be assigned one core per fragment
instance. The CoreCount for each blocking subtree in the TPCDS-Q12
example is [4, 4, 12, 3, 1].

Upon visiting a blocking fragment, the algorithm takes the maximum of
the current CoreCount (rooted at that blocking fragment) and the
CoreCount of the previous blocking subtrees, then continues up to the
next ancestor PlanFragment. The final CoreCount for the TPCDS-Q12
example is 12.

This step is implemented at Planner.computeBlockingAwareCores() and
PlanFragment.traverseBlockingAwareCores().
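
As a rough illustration of the TPCDS-Q12 numbers, the sketch below sums
the adjusted instance counts inside each blocking subtree and then takes
the maximum. It is a simplification under stated assumptions: the
subtree grouping is written out by hand rather than derived from the
plan, the tree traversal is flattened into a single max over a list, and
the names INSTANCES, BLOCKING_SUBTREES, subtree_core_counts, and
query_core_count are hypothetical. The real traversal may combine
CoreCounts differently for other plan shapes.

```python
# Adjusted instance counts taken from the TPCDS-Q12 plan above.
INSTANCES = {
    "F00": 12, "F01": 1, "F02": 1, "F03": 3,
    "F04": 1, "F05": 3, "F06": 3,
}

# Blocking subtrees as listed in the text, grouped by hand.
BLOCKING_SUBTREES = [
    ("F05", "F02"),
    ("F06", "F01"),
    ("F00",),
    ("F03",),
    ("F04",),
]


def subtree_core_counts(subtrees, instances):
    """One core per fragment instance, summed within each blocking subtree."""
    return [sum(instances[f] for f in subtree) for subtree in subtrees]


def query_core_count(subtrees, instances):
    """Simplified: the largest blocking subtree bounds the core count."""
    return max(subtree_core_counts(subtrees, instances))


print(subtree_core_counts(BLOCKING_SUBTREES, INSTANCES))  # [4, 4, 12, 3, 1]
print(query_core_count(BLOCKING_SUBTREES, INSTANCES))     # 12
```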

The resulting CoreCount at the root PlanFragment is then taken as the
ideal CPU core count / EDoP of the query. This number will be compared
against the total CPU count of an Impala executor group to determine
whether the query fits in that executor group. A backend flag
query_cpu_count_divisor is added to help scale down/up the EDoP of a
query if needed.
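
The comparison can be sketched as below. This is an assumption-laden
illustration, not Impala code: the function name fits_executor_group and
its parameters are hypothetical, and because the message does not say
how the quotient is rounded, the sketch compares the unrounded value.

```python
def fits_executor_group(query_cores, group_total_cores,
                        query_cpu_count_divisor=1.0):
    """Return True if the scaled CPU requirement fits the executor group.

    Hypothetical helper: rounding of the scaled value is not specified in
    the commit message, so the raw quotient is compared.
    """
    if query_cpu_count_divisor <= 0.0:
        raise ValueError("query_cpu_count_divisor must be > 0.0")
    return query_cores / query_cpu_count_divisor <= group_total_cores


# A query needing 24 cores does not fit a 12-core group by default...
print(fits_executor_group(24, 12))       # False
# ...but fits once the requirement is divided by 2.
print(fits_executor_group(24, 12, 2.0))  # True
# A divisor below 1 multiplies the requirement instead.
print(fits_executor_group(12, 12, 0.5))  # False
```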

Two query options are added to control the entire computation of EDoP.
1. COMPUTE_PROCESSING_COST
   Controls whether this CPU costing algorithm is enabled. MT_DOP must
   also be set > 0 for this query option to take effect.

2. PROCESSING_COST_MIN_THREADS
   Controls the minimum number of fragment instances (threads) that the
   costing algorithm is allowed to adjust to. The costing algorithm is
   in charge of increasing the fragment's instance count beyond this
   minimum number through producer-consumer rate comparison. The maximum
   number of fragment instances is the maximum of
   PROCESSING_COST_MIN_THREADS, MT_DOP, and the number of cores per
   executor.

This patch also adds three backend flags to tune the algorithm.
1. query_cpu_count_divisor
   Divide the CPU requirement of a query to fit the total available CPU
   in the executor group. For example, setting value 2 will fit the
   query with CPU requirement 2X to an executor group with total
   available CPU X. Note that setting with a fractional value less than
   1 effectively multiplies the query CPU requirement. A valid value is
   > 0.0. The default value is 1.

2. processing_cost_use_equal_expr_weight
   If true, all expression evaluations are weighted equally to 1 during
   the plan node's processing cost calculation. If false, expression
   cost from IMPALA-2805 will be used. Defaults to true.

3. min_processing_per_thread
   Minimum processing load (in processing cost units) that a fragment
   instance needs to work on before the planner considers increasing
   the instance count based on the processing cost rather than the
   MT_DOP setting. The decision is per fragment. Setting this to a high
   number will reduce the parallelism of a fragment (more workload per
   fragment instance), while setting it to a low number will increase
   parallelism (less workload per fragment instance). Actual parallelism
   might still be constrained by the total number of cores in the
   selected executor group, MT_DOP, or the PROCESSING_COST_MIN_THREADS
   query option. Must be a positive integer. Currently defaults to 10M.

As an example, the following is additional ProcessingCost information
printed to the coordinator log for Q3, Q12, and Q15 run on TPCDS at 10GB
scale with 3 executors, MT_DOP=4, PROCESSING_COST_MAX_THREADS=4, and
processing_cost_use_equal_expr_weight=false.

  Q3
  CoreCount={total=12 trace=F00:12}

  Q12
  CoreCount={total=12 trace=F00:12}

  Q15
  CoreCount={total=15 trace=N07:3+F00:12}
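
For reading such log lines, a small parser along the following lines may
help. It is a sketch based only on the three lines shown above: the
regular expression, the parse_core_count name, and the assumption that
the trace is a "+"-separated list of name:count terms whose counts add
up to the total are all inferred from these examples, not from the
Impala source.

```python
import re

# Pattern inferred from the sample log lines; not an official format.
CORE_COUNT_RE = re.compile(r"CoreCount=\{total=(\d+) trace=([^}]*)\}")


def parse_core_count(line):
    """Return (total, {name: count}) parsed from a CoreCount log line."""
    match = CORE_COUNT_RE.search(line)
    if match is None:
        raise ValueError(f"not a CoreCount line: {line!r}")
    total = int(match.group(1))
    trace = {}
    for term in match.group(2).split("+"):
        name, count = term.split(":")
        trace[name] = int(count)
    return total, trace


total, trace = parse_core_count("CoreCount={total=15 trace=N07:3+F00:12}")
print(total, trace)                  # 15 {'N07': 3, 'F00': 12}
print(sum(trace.values()) == total)  # True
```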

There are a few TODOs which will be done in follow-up tasks:
1. Factor in row width in ProcessingCost calculation (IMPALA-11972).
2. Tune the individual expression cost from IMPALA-2805.
3. Benchmark and tune min_processing_per_thread with an optimal value.
4. Revisit cases where cardinality is not available (getCardinality() or
   getInputCardinality() returns -1).
5. Bound SCAN and UNION fragments by ProcessingCost as well (need to
   address IMPALA-8081).

Testing:
- Add TestTpcdsQueryWithProcessingCost, which is a similar run of
  TestTpcdsQuery, but with COMPUTE_PROCESSING_COST=1 and MT_DOP=4.
  Setting log level TRACE for PlanFragment and manually running
  TestTpcdsQueryWithProcessingCost in a minicluster shows several
  fragment instance counts reduced from 12 to 9, 6, or 3 in the
  coordinator log.
- Add PlannerTest#testProcessingCost. The adjusted fragment count is
  indicated by "(adjusted from 12)" in the query profile.
- Add TestExecutorGroups::test_query_cpu_count_divisor.

Co-authored-by: Qifan Chen <qc...@cloudera.com>

Change-Id: Ibb2a796fdf78336e95991955d89c671ec82be62e
Reviewed-on: http://gerrit.cloudera.org:8080/19593
Reviewed-by: Wenzhe Zhou <wz...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Kurt Deschler <kd...@cloudera.com>
Reviewed-by: Riza Suminto <ri...@cloudera.com>


> Planner changes for CPU usage
> -----------------------------
>
>                 Key: IMPALA-11604
>                 URL: https://issues.apache.org/jira/browse/IMPALA-11604
>             Project: IMPALA
>          Issue Type: Improvement
>            Reporter: Qifan Chen
>            Assignee: Riza Suminto
>            Priority: Major
>             Fix For: Impala 4.3.0
>
>
> Plan scaling based on estimated peak memory has been enabled in
> IMPALA-10992. However, it is sometimes desirable to consider CPU usage (such as the amount of data processed) as a scaling factor.


