You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues-all@impala.apache.org by "ASF subversion and git services (Jira)" <ji...@apache.org> on 2023/12/20 17:52:00 UTC

[jira] [Commented] (IMPALA-4400) Aggregate runtime filters locally

    [ https://issues.apache.org/jira/browse/IMPALA-4400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17799117#comment-17799117 ] 

ASF subversion and git services commented on IMPALA-4400:
---------------------------------------------------------

Commit 172925bcb7044deec562dcb8867798df57f8a6ea in impala's branch refs/heads/master from Riza Suminto
[ https://gitbox.apache.org/repos/asf?p=impala.git;h=172925bcb ]

IMPALA-3825: Delegate runtime filter aggregation to some executors

IMPALA-4400 improve the runtime filter by aggregating runtime filters
locally before sending filter update to the coordinator and sharing a
single RuntimeFilterBank for all fragment instances in a query. However,
local filter aggregation is still insufficient if the number of nodes in
an impala cluster is large. For example, in a cluster of around 700
impalad backends, aggregation of 1 MB bloom filter updates in the
coordinator can exceed more than 1 second.

This patch aims to reduce coordinator load and speed up runtime filter
aggregation by doing intermediate aggregation in a few designated impala
backends before doing final aggregation and publishing in the
coordinator. Query option MAX_NUM_FILTERS_AGGREGATED_PER_HOST is added
to control this feature. Given N as the number of backend executors
excluding the coordinator, the selected number of intermediate
aggregators M = ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST). Setting
MAX_NUM_FILTERS_AGGREGATED_PER_HOST <= 1 will disable the intermediate
aggregator feature. In the backend scheduler, M impalad will be selected
randomly as the intermediate aggregator for that runtime filter.
Information of this M selected impalad then passed from the scheduler to
coordinator as a RuntimeFilterAggregatorInfoPB. The coordinator then
converts the RuntimeFilterAggregatorInfoPB into a filter routing
information TRuntimeFilterAggDesc that is piggy-backed in
TRuntimeFilterSource.

A new RPC endpoint named UpdateFilterFromRemote is added in
data_stream_service.proto to handle filter updates from fellow impalad
executor to the designated aggregator impalad. This RPC will merge
filter updates into 'pending_remote_filter'. The intermediate aggregator
will then combine 'pending_remote_filter' with
'pending_merge_filter' (from local aggregation) into 'result_filter'
which is then sent to the coordinator. RuntimeFilterBank of the
intermediate aggregator will wait for all remote filter updates for at
least RUNTIME_FILTER_WAIT_TIME_MS. If RuntimeFilterBank is closing and
RUNTIME_FILTER_WAIT_TIME_MS has passed, any incomplete filter will be
marked as ALWAYS_TRUE and sent to the coordinator.

This patch currently targets the bloom filter produced by partitioned
join build only. Another kind of runtime filter is still efficient to
aggregate in coordinator only, while the bloom filter from broadcast
join only requires 1 valid filter update for publishing.

test_runtime_filters.py is modified to clarify the exec_options
dimension, test matrix constraints, and reduce pytest.skip() calls on
each test. runtime_filters.test is also changed to use counter
aggregation and assert on ExecSummary table so that they stay valid
irrespective of the number of fragment instances.

We benchmark the aggregation speed of 1 MB runtime filter aggregation on
20 executor nodes cluster with MT_DOP=36 that is instrumented to disable
local aggregation, simulating 720 runtime filter updates. The speed is
approximated as the duration between the earliest time a filter update
is made and the time that the coordinator publishes the complete filter.
The result is following:

+---------------------+------------------------+
| num aggregator node | Aggregation speed (ms) |
+---------------------+------------------------+
|                   0 |                   1296 |
|                   1 |                   1229 |
|                   2 |                    608 |
|                   4 |                    329 |
|                   8 |                    205 |
+---------------------+------------------------+

Testing:
- Exercise MAX_NUM_FILTERS_AGGREGATED_PER_HOST in
  test_runtime_filters.py and query-options-test.cc
- Add TestRuntimeFiltersLateRemoteUpdate.
- Add custom_cluster/test_runtime_filter_aggregation.py.
- Pass exhaustive tests.

Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
Reviewed-on: http://gerrit.cloudera.org:8080/20612
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


> Aggregate runtime filters locally
> ---------------------------------
>
>                 Key: IMPALA-4400
>                 URL: https://issues.apache.org/jira/browse/IMPALA-4400
>             Project: IMPALA
>          Issue Type: Improvement
>          Components: Backend
>    Affects Versions: Impala 2.8.0
>            Reporter: Marcel Kinard
>            Assignee: Tim Armstrong
>            Priority: Major
>              Labels: multithreading
>             Fix For: Impala 3.4.0
>
>
> At the moment, runtime filters are sent from each fragment instance directly to the coordinator for aggregation (ORing) at the coordinator.
> With multi-threaded execution, we will have an order of magnitude more fragment instances per node, at which point the coordinator would become a bottleneck during the aggregation process. To avoid that, we need to aggregate the local instances' runtime filters at each node before sending the filter off to the coordinator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-all-unsubscribe@impala.apache.org
For additional commands, e-mail: issues-all-help@impala.apache.org