You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mi...@apache.org on 2023/12/20 17:51:32 UTC

(impala) branch master updated (012996a06 -> 172925bcb)

This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


    from 012996a06 IMPALA-12597: Basic Equality delete read support for Iceberg tables
     new 4417fbccc IMPALA-12205: Add support to STRUCT type Iceberg Metadata table columns
     new 172925bcb IMPALA-3825: Delegate runtime filter aggregation to some executors

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/common/logging.h                            |   2 +
 .../iceberg-metadata/iceberg-metadata-scan-node.cc |  73 +++-
 .../iceberg-metadata/iceberg-metadata-scan-node.h  |  26 +-
 be/src/exec/iceberg-metadata/iceberg-row-reader.cc |  37 +-
 be/src/exec/iceberg-metadata/iceberg-row-reader.h  |  19 +-
 be/src/runtime/coordinator.cc                      |  59 ++-
 be/src/runtime/data-stream-test.cc                 |   3 +
 be/src/runtime/query-state.cc                      |  32 +-
 be/src/runtime/query-state.h                       |   5 +
 be/src/runtime/runtime-filter-bank.cc              | 465 ++++++++++++++++++---
 be/src/runtime/runtime-filter-bank.h               | 105 ++++-
 be/src/runtime/runtime-filter.cc                   |  11 +-
 be/src/runtime/runtime-filter.h                    |  35 +-
 be/src/scheduling/scheduler.cc                     | 103 ++++-
 be/src/scheduling/scheduler.h                      |   6 +
 be/src/service/data-stream-service.cc              |  22 +
 be/src/service/data-stream-service.h               |   5 +
 be/src/service/query-options-test.cc               |   2 +
 be/src/service/query-options.cc                    |   9 +-
 be/src/service/query-options.h                     |   4 +-
 be/src/util/bloom-filter.cc                        |  24 +-
 be/src/util/bloom-filter.h                         |  19 +-
 be/src/util/network-util.h                         |   6 +
 be/src/util/runtime-profile-counters.h             |   3 +-
 common/protobuf/admission_control_service.proto    |  26 ++
 common/protobuf/data_stream_service.proto          |   4 +
 common/thrift/ImpalaInternalService.thrift         |  14 +
 common/thrift/ImpalaService.thrift                 |   7 +
 common/thrift/Query.thrift                         |   3 +
 .../org/apache/impala/analysis/FromClause.java     |  13 +
 .../java/org/apache/impala/analysis/SlotRef.java   |  43 +-
 .../catalog/iceberg/IcebergMetadataTable.java      |  20 +-
 .../org/apache/impala/planner/PlanFragment.java    |   7 +
 .../impala/planner/RuntimeFilterGenerator.java     |   1 +
 .../apache/impala/util/IcebergMetadataScanner.java |   6 +-
 .../PlannerTest/iceberg-metadata-table-scan.test   |  36 --
 .../queries/QueryTest/iceberg-metadata-tables.test |  97 +++++
 .../queries/QueryTest/runtime_filters.test         |  76 ++--
 .../QueryTest/runtime_row_filter_reservations.test |  38 ++
 .../queries/QueryTest/runtime_row_filters.test     |   6 +-
 tests/common/impala_test_suite.py                  |   4 +
 tests/common/test_vector.py                        |  12 +
 .../test_runtime_filter_aggregation.py             |  62 +++
 tests/query_test/test_iceberg.py                   |   4 +-
 tests/query_test/test_runtime_filters.py           | 216 +++++++---
 45 files changed, 1484 insertions(+), 286 deletions(-)
 create mode 100644 tests/custom_cluster/test_runtime_filter_aggregation.py


(impala) 02/02: IMPALA-3825: Delegate runtime filter aggregation to some executors

Posted by mi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 172925bcb7044deec562dcb8867798df57f8a6ea
Author: Riza Suminto <ri...@cloudera.com>
AuthorDate: Mon Oct 2 15:32:41 2023 -0700

    IMPALA-3825: Delegate runtime filter aggregation to some executors
    
    IMPALA-4400 improve the runtime filter by aggregating runtime filters
    locally before sending filter update to the coordinator and sharing a
    single RuntimeFilterBank for all fragment instances in a query. However,
    local filter aggregation is still insufficient if the number of nodes in
    an impala cluster is large. For example, in a cluster of around 700
    impalad backends, aggregation of 1 MB bloom filter updates in the
    coordinator can exceed more than 1 second.
    
    This patch aims to reduce coordinator load and speed up runtime filter
    aggregation by doing intermediate aggregation in a few designated impala
    backends before doing final aggregation and publishing in the
    coordinator. Query option MAX_NUM_FILTERS_AGGREGATED_PER_HOST is added
    to control this feature. Given N as the number of backend executors
    excluding the coordinator, the selected number of intermediate
    aggregators M = ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST). Setting
    MAX_NUM_FILTERS_AGGREGATED_PER_HOST <= 1 will disable the intermediate
    aggregator feature. In the backend scheduler, M impalad will be selected
    randomly as the intermediate aggregator for that runtime filter.
    Information of this M selected impalad then passed from the scheduler to
    coordinator as a RuntimeFilterAggregatorInfoPB. The coordinator then
    converts the RuntimeFilterAggregatorInfoPB into a filter routing
    information TRuntimeFilterAggDesc that is piggy-backed in
    TRuntimeFilterSource.
    
    A new RPC endpoint named UpdateFilterFromRemote is added in
    data_stream_service.proto to handle filter updates from fellow impalad
    executor to the designated aggregator impalad. This RPC will merge
    filter updates into 'pending_remote_filter'. The intermediate aggregator
    will then combine 'pending_remote_filter' with
    'pending_merge_filter' (from local aggregation) into 'result_filter'
    which is then sent to the coordinator. RuntimeFilterBank of the
    intermediate aggregator will wait for all remote filter updates for at
    least RUNTIME_FILTER_WAIT_TIME_MS. If RuntimeFilterBank is closing and
    RUNTIME_FILTER_WAIT_TIME_MS has passed, any incomplete filter will be
    marked as ALWAYS_TRUE and sent to the coordinator.
    
    This patch currently targets the bloom filter produced by partitioned
    join build only. Another kind of runtime filter is still efficient to
    aggregate in coordinator only, while the bloom filter from broadcast
    join only requires 1 valid filter update for publishing.
    
    test_runtime_filters.py is modified to clarify the exec_options
    dimension, test matrix constraints, and reduce pytest.skip() calls on
    each test. runtime_filters.test is also changed to use counter
    aggregation and assert on ExecSummary table so that they stay valid
    irrespective of the number of fragment instances.
    
    We benchmark the aggregation speed of 1 MB runtime filter aggregation on
    20 executor nodes cluster with MT_DOP=36 that is instrumented to disable
    local aggregation, simulating 720 runtime filter updates. The speed is
    approximated as the duration between the earliest time a filter update
    is made and the time that the coordinator publishes the complete filter.
    The result is following:
    
    +---------------------+------------------------+
    | num aggregator node | Aggregation speed (ms) |
    +---------------------+------------------------+
    |                   0 |                   1296 |
    |                   1 |                   1229 |
    |                   2 |                    608 |
    |                   4 |                    329 |
    |                   8 |                    205 |
    +---------------------+------------------------+
    
    Testing:
    - Exercise MAX_NUM_FILTERS_AGGREGATED_PER_HOST in
      test_runtime_filters.py and query-options-test.cc
    - Add TestRuntimeFiltersLateRemoteUpdate.
    - Add custom_cluster/test_runtime_filter_aggregation.py.
    - Pass exhaustive tests.
    
    Change-Id: I11d38ed0f223d6e5b32a19ebe725af7738ee4ab0
    Reviewed-on: http://gerrit.cloudera.org:8080/20612
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/common/logging.h                            |   2 +
 be/src/runtime/coordinator.cc                      |  59 ++-
 be/src/runtime/data-stream-test.cc                 |   3 +
 be/src/runtime/query-state.cc                      |  32 +-
 be/src/runtime/query-state.h                       |   5 +
 be/src/runtime/runtime-filter-bank.cc              | 465 ++++++++++++++++++---
 be/src/runtime/runtime-filter-bank.h               | 105 ++++-
 be/src/runtime/runtime-filter.cc                   |  11 +-
 be/src/runtime/runtime-filter.h                    |  35 +-
 be/src/scheduling/scheduler.cc                     | 103 ++++-
 be/src/scheduling/scheduler.h                      |   6 +
 be/src/service/data-stream-service.cc              |  22 +
 be/src/service/data-stream-service.h               |   5 +
 be/src/service/query-options-test.cc               |   2 +
 be/src/service/query-options.cc                    |   9 +-
 be/src/service/query-options.h                     |   4 +-
 be/src/util/bloom-filter.cc                        |  24 +-
 be/src/util/bloom-filter.h                         |  19 +-
 be/src/util/network-util.h                         |   6 +
 be/src/util/runtime-profile-counters.h             |   3 +-
 common/protobuf/admission_control_service.proto    |  26 ++
 common/protobuf/data_stream_service.proto          |   4 +
 common/thrift/ImpalaInternalService.thrift         |  14 +
 common/thrift/ImpalaService.thrift                 |   7 +
 common/thrift/Query.thrift                         |   3 +
 .../org/apache/impala/planner/PlanFragment.java    |   7 +
 .../impala/planner/RuntimeFilterGenerator.java     |   1 +
 .../queries/QueryTest/runtime_filters.test         |  76 ++--
 .../QueryTest/runtime_row_filter_reservations.test |  38 ++
 .../queries/QueryTest/runtime_row_filters.test     |   6 +-
 tests/common/impala_test_suite.py                  |   4 +
 tests/common/test_vector.py                        |  12 +
 .../test_runtime_filter_aggregation.py             |  62 +++
 tests/query_test/test_runtime_filters.py           | 216 +++++++---
 34 files changed, 1204 insertions(+), 192 deletions(-)

diff --git a/be/src/common/logging.h b/be/src/common/logging.h
index 31fd42f7c..c39cfd8dc 100644
--- a/be/src/common/logging.h
+++ b/be/src/common/logging.h
@@ -54,6 +54,7 @@
 #define VLOG_FILE       VLOG(2)
 #define VLOG_ROW        VLOG(3)
 #define VLOG_PROGRESS   VLOG(2)
+#define VLOG_FILTER     VLOG(3)
 
 #define VLOG_CONNECTION_IS_ON VLOG_IS_ON(1)
 #define VLOG_RPC_IS_ON VLOG_IS_ON(2)
@@ -61,6 +62,7 @@
 #define VLOG_FILE_IS_ON VLOG_IS_ON(2)
 #define VLOG_ROW_IS_ON VLOG_IS_ON(3)
 #define VLOG_PROGRESS_IS_ON VLOG_IS_ON(2)
+#define VLOG_FILTER_IS_ON VLOG_IS_ON(3)
 
 // Define a range check macro to test x in the inclusive range from low to high.
 #define DCHECK_IN_RANGE(x, low, high) \
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 20a97498a..f8fae8582 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -423,13 +423,6 @@ void Coordinator::AddFilterSource(const FragmentExecParamsPB& src_fragment_param
     int num_instances, int num_backends, const TRuntimeFilterDesc& filter,
     int join_node_id) {
   FilterState* f = filter_routing_table_->GetOrCreateFilterState(filter);
-  // Set the 'pending_count_' to zero to indicate that for a filter with
-  // local-only targets the coordinator does not expect to receive any filter
-  // updates. We expect to receive a single aggregated filter from each backend
-  // for partitioned joins.
-  int pending_count = filter.is_broadcast_join
-      ? (filter.has_remote_targets ? 1 : 0) : num_backends;
-  f->set_pending_count(pending_count);
 
   // Determine which instances will produce the filters.
   // TODO: IMPALA-9333: having a shared RuntimeFilterBank between all fragments on
@@ -457,10 +450,60 @@ void Coordinator::AddFilterSource(const FragmentExecParamsPB& src_fragment_param
     random_shuffle(src_idxs.begin(), src_idxs.end());
     src_idxs.resize(MAX_BROADCAST_FILTER_PRODUCERS);
   }
-  for (int src_idx : src_idxs) {
+
+  bool has_intermediate_aggregator = src_fragment_params.has_filter_agg_info()
+      && !filter.has_local_targets && !filter.is_broadcast_join
+      && filter.type == TRuntimeFilterType::BLOOM;
+
+  if (has_intermediate_aggregator) {
+    const RuntimeFilterAggregatorInfoPB& agg_info = src_fragment_params.filter_agg_info();
+    // Set the 'pending_count_' to num_aggregators from RuntimeFilterAggregatorInfoPB.
+    int num_agg = agg_info.num_aggregators();
+    DCHECK_EQ(
+        src_fragment_params.instances_size(), agg_info.aggregator_idx_to_report_size());
+    DCHECK_EQ(num_agg, agg_info.aggregator_krpc_addresses_size());
+    DCHECK_EQ(num_agg, agg_info.aggregator_krpc_backends_size());
+    DCHECK_EQ(num_agg, agg_info.num_reporter_per_aggregator_size());
+    for (int i = 0; i < num_agg; i++) {
+      VLOG(2) << "Filter " << filter.filter_id << " backend aggregator " << (i + 1)
+              << " krpc_address=" << agg_info.aggregator_krpc_addresses(i)
+              << " krpc_backend=" << agg_info.aggregator_krpc_backends(i);
+    }
+    f->set_pending_count(num_agg);
+  } else {
+    // Set the 'pending_count_' to zero to indicate that for a filter with
+    // local-only targets the coordinator does not expect to receive any filter
+    // updates. We expect to receive a single aggregated filter from each backend
+    // for partitioned joins.
+    int pending_count =
+        filter.is_broadcast_join ? (filter.has_remote_targets ? 1 : 0) : num_backends;
+    f->set_pending_count(pending_count);
+  }
+
+  for (int i = 0; i < src_idxs.size(); i++) {
+    int src_idx = src_idxs[i];
     TRuntimeFilterSource filter_src;
     filter_src.src_node_id = join_node_id;
     filter_src.filter_id = filter.filter_id;
+    if (has_intermediate_aggregator) {
+      // Find target aggregator for fragment instance i.
+      const RuntimeFilterAggregatorInfoPB& agg_info =
+          src_fragment_params.filter_agg_info();
+      int agg_idx = agg_info.aggregator_idx_to_report(i);
+      string agg_hostname = agg_info.aggregator_krpc_addresses(agg_idx).hostname();
+      int num_reporter = agg_info.num_reporter_per_aggregator(agg_idx);
+      TNetworkAddress agg_address =
+          FromNetworkAddressPB(agg_info.aggregator_krpc_backends(agg_idx));
+
+      // Populate TRuntimeFilterAggDesc for fragment instance i.
+      TRuntimeFilterAggDesc agg_desc;
+      agg_desc.__set_krpc_hostname(agg_hostname);
+      agg_desc.__set_krpc_address(agg_address);
+      agg_desc.__set_num_reporting_hosts(num_reporter);
+      filter_src.__set_aggregator_desc(agg_desc);
+      VLOG(3) << "Instance " << src_fragment_params.instances(i)
+              << " report to backend aggregator " << (agg_idx + 1);
+    }
     filter_routing_table_->finstance_filters_produced[src_idx].emplace_back(
         filter_src);
   }
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index a41d3436b..e78c088b3 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -130,6 +130,9 @@ class ImpalaKRPCTestBackend : public DataStreamServiceIf {
   virtual void UpdateFilter(
       const UpdateFilterParamsPB* req, UpdateFilterResultPB* resp, RpcContext* context) {}
 
+  virtual void UpdateFilterFromRemote(
+      const UpdateFilterParamsPB* req, UpdateFilterResultPB* resp, RpcContext* context) {}
+
   virtual void PublishFilter(const PublishFilterParamsPB* req,
       PublishFilterResultPB* resp, RpcContext* context) {}
 
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 4417aa589..bf61b26e3 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -379,6 +379,7 @@ bool VerifyFiltersProduced(const vector<TPlanFragmentInstanceCtx>& instance_ctxs
 }
 
 Status QueryState::InitFilterBank() {
+  const NetworkAddressPB& this_krpc_address = ExecEnv::GetInstance()->krpc_address();
   int64_t runtime_filters_reservation_bytes = 0;
   int fragment_ctx_idx = -1;
   const vector<TPlanFragment>& fragments = fragment_info_.fragments;
@@ -428,9 +429,32 @@ Status QueryState::InitFilterBank() {
     for (const TRuntimeFilterSource& produced_filter : instance_ctx.filters_produced) {
       auto it = filters.find(produced_filter.filter_id);
       DCHECK(it != filters.end());
-      ++it->second.num_producers;
+      FilterRegistration& reg = it->second;
+      ++reg.num_producers;
+
+      if (produced_filter.__isset.aggregator_desc) {
+        TRuntimeFilterAggDesc agg_desc = produced_filter.aggregator_desc;
+        if (reg.need_subaggregation) {
+          // This filter registration is already set before.
+          // Do sanity check to make sure it match with the rest of TRuntimeFilterSource.
+          DCHECK_EQ(reg.num_reporting_hosts, agg_desc.num_reporting_hosts);
+          DCHECK_EQ(reg.krpc_hostname_to_report, agg_desc.krpc_hostname);
+          DCHECK_EQ(reg.krpc_backend_to_report, agg_desc.krpc_address);
+        } else {
+          // This filter bank is a backend aggregator for 'filter'.
+          reg.need_subaggregation = true;
+          reg.num_reporting_hosts = agg_desc.num_reporting_hosts;
+          reg.krpc_hostname_to_report = agg_desc.krpc_hostname;
+          reg.krpc_backend_to_report = agg_desc.krpc_address;
+          if (KrpcAddressEqual(
+                  this_krpc_address, FromTNetworkAddress(agg_desc.krpc_address))) {
+            reg.is_intermediate_aggregator = true;
+          }
+        }
+      }
     }
   }
+
   filter_bank_.reset(
       new RuntimeFilterBank(this, filters, runtime_filters_reservation_bytes));
   return filter_bank_->ClaimBufferReservation();
@@ -1004,6 +1028,12 @@ void QueryState::PublishFilter(const PublishFilterParamsPB& params, RpcContext*
   filter_bank_->PublishGlobalFilter(params, context);
 }
 
+void QueryState::UpdateFilterFromRemote(
+    const UpdateFilterParamsPB& params, RpcContext* context) {
+  if (!WaitForPrepare().ok()) return;
+  filter_bank_->UpdateFilterFromRemote(params, context);
+}
+
 Status QueryState::StartSpilling(RuntimeState* runtime_state, MemTracker* mem_tracker) {
   // Return an error message with the root cause of why spilling is disabled.
   if (query_options().scratch_limit == 0) {
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 838a78836..15195d64d 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -64,6 +64,7 @@ class ScalarExpr;
 class ScannerMemLimiter;
 class TmpFileGroup;
 class TRuntimeProfileForest;
+class UpdateFilterParamsPB;
 
 /// Central class for all backend execution state (example: the FragmentInstanceStates
 /// of the individual fragment instances) created for a particular query.
@@ -235,6 +236,10 @@ class QueryState {
   /// Blocks until all fragment instances have finished their Prepare phase.
   void PublishFilter(const PublishFilterParamsPB& params, kudu::rpc::RpcContext* context);
 
+  /// Handle filter update that comes from remote backend executor.
+  void UpdateFilterFromRemote(
+      const UpdateFilterParamsPB& params, kudu::rpc::RpcContext* context);
+
   /// Cancels all actively executing fragment instances. Blocks until all fragment
   /// instances have finished their Prepare phase. Idempotent.
   /// For uninitialized QueryState, just set is_cancelled_ and don't need to cancel
diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc
index 0752eaecd..e7b149050 100644
--- a/be/src/runtime/runtime-filter-bank.cc
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -56,6 +56,8 @@ DEFINE_double(max_filter_error_rate, 0.75, "(Advanced) The target false positive
     "probability used to determine the ideal size for each bloom filter size. This value "
     "can be overriden by the RUNTIME_FILTER_ERROR_RATE query option.");
 
+DECLARE_int32(runtime_filter_wait_time_ms);
+
 const int64_t RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE;
 const int64_t RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE;
 
@@ -80,19 +82,39 @@ RuntimeFilterBank::BuildFilterMap(
   unordered_map<int32_t, unique_ptr<PerFilterState>> result;
   for (auto& entry : filters) {
     const FilterRegistration reg = entry.second;
+    int num_remote_updates = 0;
     RuntimeFilter* result_filter = nullptr;
     RuntimeFilter* consumed_filter = nullptr;
     if (reg.has_consumer) {
-      VLOG(3) << "registered consumer filter " << reg.desc.filter_id;
+      VLOG_FILTER << "registered consumer filter " << reg.desc.filter_id;
       consumed_filter =
           obj_pool->Add(new RuntimeFilter(reg.desc, reg.desc.filter_size_bytes));
     }
     if (reg.num_producers > 0) {
       result_filter =
           obj_pool->Add(new RuntimeFilter(reg.desc, reg.desc.filter_size_bytes));
+      if (reg.need_subaggregation) {
+        DCHECK_EQ(TRuntimeFilterType::BLOOM, reg.desc.type);
+        DCHECK(!reg.desc.is_broadcast_join);
+        VLOG(2) << "Detected intermediate aggregation for filter_id="
+                << reg.desc.filter_id
+                << " need_subaggregation=" << reg.need_subaggregation
+                << " is_intermediate_aggregator=" << reg.is_intermediate_aggregator
+                << " num_reporting_hosts=" << reg.num_reporting_hosts
+                << " krpc_hostname_to_report=" << reg.krpc_hostname_to_report
+                << " krpc_backend_to_report=" << reg.krpc_backend_to_report;
+        if (reg.is_intermediate_aggregator) {
+          DCHECK_GE(reg.num_reporting_hosts, 1);
+          num_remote_updates = reg.num_reporting_hosts - 1;
+        }
+        // Store backend aggregator's address in result_filter.
+        result_filter->SetIntermediateAggregation(reg.is_intermediate_aggregator,
+            reg.krpc_hostname_to_report, FromTNetworkAddress(reg.krpc_backend_to_report));
+      }
     }
     result.emplace(entry.first,
-        make_unique<PerFilterState>(reg.num_producers, result_filter, consumed_filter));
+        make_unique<PerFilterState>(
+            reg.num_producers, num_remote_updates, result_filter, consumed_filter));
   }
   return result;
 }
@@ -132,13 +154,13 @@ RuntimeFilter* RuntimeFilterBank::RegisterConsumer(
   PerFilterState* fs = it->second.get();
   DCHECK(fs->consumed_filter != nullptr)
       << "Consumed filters must be created in constructor";
-  VLOG(3) << "Consumer registered for filter " << filter_desc.filter_id;
+  VLOG_FILTER << "Consumer registered for filter " << filter_desc.filter_id;
   DCHECK_EQ(filter_desc.filter_size_bytes, fs->consumed_filter->filter_size());
   return fs->consumed_filter;
 }
 
-void RuntimeFilterBank::UpdateFilterCompleteCb(
-    const RpcController* rpc_controller, const UpdateFilterResultPB* res) {
+void RuntimeFilterBank::UpdateFilterCompleteCb(const RpcController* rpc_controller,
+    const UpdateFilterResultPB* res, bool is_remote_update) {
   const kudu::Status controller_status = rpc_controller->status();
 
   // In the case of an unsuccessful KRPC call, e.g., request dropped due to
@@ -146,10 +168,16 @@ void RuntimeFilterBank::UpdateFilterCompleteCb(
   // filter is not a query-wide error - the remote fragment will continue
   // regardless.
   if (!controller_status.ok()) {
-    LOG(ERROR) << "UpdateFilter() failed: " << controller_status.message().ToString();
+    LOG(ERROR) << (is_remote_update ? "UpdateFilterFromRemote" : "UpdateFilter")
+               << "() failed: " << controller_status.message().ToString();
+  }
+  if (res->status().status_code() != TErrorCode::OK) {
+    DCHECK(is_remote_update) << "DataStreamService::UpdateFilter() should never set an"
+                             << " error status";
+    Status failed_status(res->status());
+    LOG(ERROR) << "UpdateFilterFromRemote failed with error code "
+               << res->status().status_code() << ". Detail:" << failed_status.GetDetail();
   }
-  // DataStreamService::UpdateFilter() should never set an error status
-  DCHECK_EQ(res->status().status_code(), TErrorCode::OK);
 
   {
     std::unique_lock<SpinLock> l(num_inflight_rpcs_lock_);
@@ -159,17 +187,151 @@ void RuntimeFilterBank::UpdateFilterCompleteCb(
   krpcs_done_cv_.notify_one();
 }
 
-void RuntimeFilterBank::UpdateFilterFromLocal(
-    int32_t filter_id, BloomFilter* bloom_filter, MinMaxFilter* min_max_filter,
+void RuntimeFilterBank::UpdateFilterFromRemote(
+    const UpdateFilterParamsPB& params, RpcContext* context) {
+  VLOG_RPC << "UpdateFilterFromRemote(filter_id=" << params.filter_id() << ")";
+  DCHECK_NE(query_state_->query_options().runtime_filter_mode, TRuntimeFilterMode::OFF)
+      << "Should not be calling UpdateFilterFromRemote() if filtering is disabled";
+  DCHECK(params.has_bloom_filter())
+      << "UpdateFilterFromRemote rpc only valid for bloom filter!";
+
+  DebugActionNoFail(query_state_->query_options(), "REMOTE_FILTER_UPDATE_DELAY");
+
+  if (closed_) {
+    // Late RPC might come while filter bank is closing.
+    // Just log and ignore the update in this case.
+    VLOG_RPC << "filter_id=" << params.filter_id() << " ignored late filter update from "
+             << context->requestor_string() << ". RuntimeFilterBank has closed.";
+    return;
+  }
+
+  auto it = filters_.find(params.filter_id());
+  DCHECK(it != filters_.end()) << "Tried to update unregistered filter: "
+                               << params.filter_id();
+  PerFilterState* fs = it->second.get();
+  RuntimeFilter* complete_filter = nullptr; // Set if the filter should be sent out.
+
+  {
+    unique_lock<SpinLock> l(fs->lock);
+    ProducedFilter& produced_filter = fs->produced_filter;
+    RuntimeFilter* result_filter = produced_filter.result_filter;
+    DCHECK_NE(nullptr, result_filter);
+    DCHECK(result_filter->RequireSubAggregation())
+        << "UpdateFilterFromRemote() called for ineligible filter " << params.filter_id();
+    DCHECK(result_filter->is_intermediate_aggregator());
+    if (produced_filter.pending_remotes <= 0) {
+      // Filter bank cleanup might have happened and filter has been sent to coordinator.
+      // Ignore this remote update.
+      VLOG_FILTER << "filter_id=" << params.filter_id()
+                  << " ignored late or redundant UpdateFilterFromRemote RPC."
+                  << " rpc_time_since_registration="
+                  << result_filter->TimeSinceRegistrationMs()
+                  << " always_true=" << result_filter->AlwaysTrue()
+                  << " always_false=" << result_filter->AlwaysFalse();
+      return;
+    }
+
+    if (produced_filter.pending_remote_filter.get() == nullptr) {
+      // This is the first remote update for this runtime filter.
+      // Allocate scratch bloom filter to 'pending_remote_filter' to contain all remote
+      // updates.
+      produced_filter.pending_remote_filter.reset(
+          new RuntimeFilter(result_filter->filter_desc(), result_filter->filter_size()));
+      BloomFilter* bloom_filter = AllocateScratchBloomFilterLocked(&l, fs);
+      if (bloom_filter == nullptr) {
+        // Treat unsuccessful initial allocation as a DCHECK error in DEBUG build
+        // but recover from it in RELEASE build by disabling filter
+        // (setting to ALWAYS_TRUE_FILTER).
+        DCHECK(false) << "Initial buffer for pending_remote_filter should be available!";
+        LOG(ERROR) << "filter_id=" << params.filter_id()
+                   << " cannot allocate scratch bloom filter for pending_remote_filter";
+        bloom_filter = BloomFilter::ALWAYS_TRUE_FILTER;
+      }
+      produced_filter.pending_remote_filter->SetFilter(bloom_filter, nullptr, nullptr);
+    } else {
+      DCHECK_NE(nullptr, produced_filter.pending_remote_filter->get_bloom_filter());
+    }
+
+    RuntimeFilter* pending_remote_filter = produced_filter.pending_remote_filter.get();
+    if (pending_remote_filter->AlwaysTrue()) {
+      // Do nothing since this filter is already disabled.
+    } else if (params.bloom_filter().always_true()) {
+      // An always_true filter is received. We don't need to wait for other pending
+      // backends. Disable filter by setting to an always true filter.
+      VLOG_FILTER << "filter_id=" << params.filter_id()
+                  << " received an always true filter from remote reporter."
+                  << " This filter is now disabled.";
+      pending_remote_filter = DisableBloomFilter(produced_filter.pending_remote_filter);
+    } else if (params.bloom_filter().always_false()) {
+      VLOG_FILTER << "filter_id=" << params.filter_id()
+                  << " received an always false filter update from remote reporter."
+                  << " No merge needed.";
+    } else {
+      VLOG_FILTER << "filter_id=" << params.filter_id() << " received filter update from "
+                  << context->requestor_string();
+      // If the incoming Bloom filter is neither an always true filter nor an
+      // always false filter, then it must be the case that a non-empty sidecar slice
+      // has been received. Refer to BloomFilter::ToProtobuf() for further details.
+      DCHECK(params.bloom_filter().has_directory_sidecar_idx());
+      kudu::Slice sidecar_slice;
+      kudu::Status status = context->GetInboundSidecar(
+          params.bloom_filter().directory_sidecar_idx(), &sidecar_slice);
+      if (!status.ok()) {
+        LOG(ERROR) << "filter_id=" << params.filter_id()
+                   << " cannot get inbound sidecar from remote filter update: "
+                   << status.message().ToString();
+        // Disable filter by setting to an always true filter
+        pending_remote_filter = DisableBloomFilter(produced_filter.pending_remote_filter);
+      } else {
+        BloomFilter* target = pending_remote_filter->get_bloom_filter();
+        VLOG_FILTER << "filter_id=" << params.filter_id() << " received filter to merge."
+                    << " pending_remote_filter.AlwaysFalse="
+                    << pending_remote_filter->AlwaysFalse();
+        target->Or(params.bloom_filter(), sidecar_slice);
+        target->MarkNotAlwaysFalse();
+        DCHECK(!pending_remote_filter->AlwaysFalse());
+      }
+    }
+
+    --produced_filter.pending_remotes;
+    if (pending_remote_filter->AlwaysTrue()) {
+      // There is no point in waiting the remaining remote updates if
+      // pending_remote_filter has turned into ALWAYS_TRUE_FILTER.
+      // Set pending_remotes = 0 to ignore remaining remote updates.
+      // pending_producers remain unchanged to ensure all local updates are collected.
+      // Therefore, both 'pending_merge_filter' and 'pending_remote_filter' will merge
+      // in consistent way.
+      produced_filter.pending_remotes = 0;
+      VLOG(2) << "filter_id=" << params.filter_id()
+              << " turned into ALWAYS_TRUE_FILTER. Set pending_remotes=0.";
+    }
+
+    VLOG(2) << "Updated filter_id=" << params.filter_id()
+            << ". pending_producers=" << produced_filter.pending_producers
+            << " pending_remotes=" << produced_filter.pending_remotes;
+
+    if (produced_filter.IsComplete()) {
+      CombinePeerAndLocalUpdates(&l, produced_filter);
+      complete_filter = result_filter;
+      VLOG_FILTER << "Aggregation of partitioned join filter " << params.filter_id()
+                  << " is complete.";
+    }
+    DCHECK(!produced_filter.IsComplete() || complete_filter != nullptr);
+
+    if (complete_filter != nullptr) {
+      DistributeCompleteFilter(&l, fs, complete_filter);
+    }
+  }
+}
+
+void RuntimeFilterBank::UpdateFilterFromLocal(int32_t filter_id,
+    BloomFilter* bloom_filter, MinMaxFilter* min_max_filter,
     InListFilter* in_list_filter) {
   DCHECK_NE(query_state_->query_options().runtime_filter_mode, TRuntimeFilterMode::OFF)
       << "Should not be calling UpdateFilterFromLocal() if filtering is disabled";
   // This function is only called from ExecNode::Open() or more specifically
   // PartitionedHashJoinNode::Open().
   DCHECK(!closed_);
-  // A runtime filter may have both local and remote targets.
-  bool has_local_target = false;
-  bool has_remote_target = false;
   RuntimeFilter* complete_filter = nullptr; // Set if the filter should be sent out.
   auto it = filters_.find(filter_id);
   DCHECK(it != filters_.end()) << "Tried to update unregistered filter: " << filter_id;
@@ -186,10 +348,10 @@ void RuntimeFilterBank::UpdateFilterFromLocal(
       // ignored (because they should have identical contents).
       if (result_filter->HasFilter()) {
         // Don't need to merge, the previous broadcast filter contained all values.
-        VLOG(3) << "Dropping redundant broadcast filter " << filter_id;
+        VLOG_FILTER << "Dropping redundant broadcast filter " << filter_id;
         return;
       }
-      VLOG(3) << "Setting broadcast filter " << filter_id;
+      VLOG_FILTER << "Setting broadcast filter " << filter_id;
       result_filter->SetFilter(bloom_filter, min_max_filter, in_list_filter);
       complete_filter = result_filter;
     } else {
@@ -207,36 +369,110 @@ void RuntimeFilterBank::UpdateFilterFromLocal(
         // Drop the lock while doing the merge so that other merges can proceed in
         // parallel.
         l.unlock();
-        VLOG(3) << "Merging partitioned join filter " << filter_id;
+        VLOG_FILTER << "Merging partitioned join filter " << filter_id;
         tmp_filter->Or(pending_merge.get());
         l.lock();
       }
       // At this point, either we've merged all the filters or we're waiting for more
       // filters.
-      if (produced_filter.pending_producers > 1) {
-        // A subsequent caller of UpdateFilterFromLocal() is responsible for merging this
-        // filter into the final one.
+      if (produced_filter.AllRemainingProducers() > 1
+          || result_filter->is_intermediate_aggregator()) {
+        // A subsequent caller of UpdateFilterFromLocal() / UpdateFilterFromRemote() is
+        // responsible for merging this filter into the final one.
+        // Intermediate aggregator always combine its final filter update through
+        // CombinePeerAndLocalUpdates at later step.
         produced_filter.pending_merge_filter = std::move(tmp_filter);
       } else {
         // Everything was merged into 'tmp_filter'. It is therefore the result filter.
         result_filter->SetFilter(tmp_filter.get());
-        complete_filter = result_filter;
-        VLOG(3) << "Partitioned join filter " << filter_id << " is locally complete.";
       }
     }
-    int remaining_producers = --produced_filter.pending_producers;
-    VLOG(3) << "Filter " << filter_id << " updated. " << remaining_producers
-            << " producers left on the backend.";
-    DCHECK(remaining_producers > 0 || result_filter != nullptr);
-    has_local_target = result_filter->filter_desc().has_local_targets;
-    has_remote_target = result_filter->filter_desc().has_remote_targets;
+
+    --produced_filter.pending_producers;
+    if (produced_filter.IsComplete()) {
+      if (result_filter->is_intermediate_aggregator()) {
+        CombinePeerAndLocalUpdates(&l, produced_filter);
+      }
+      complete_filter = result_filter;
+      VLOG_FILTER << "Aggregation of partitioned join filter " << filter_id
+                  << " is complete.";
+    }
+
+    VLOG(2) << "Updated filter_id=" << filter_id
+            << ". pending_producers=" << produced_filter.pending_producers
+            << " pending_remotes=" << produced_filter.pending_remotes;
+    DCHECK(!produced_filter.IsComplete() || complete_filter != nullptr);
+
+    if (complete_filter != nullptr) {
+      DistributeCompleteFilter(&l, fs, complete_filter);
+    }
   }
+}
 
-  if (complete_filter != nullptr && has_local_target) {
+void RuntimeFilterBank::CombinePeerAndLocalUpdates(
+    unique_lock<SpinLock>* lock, ProducedFilter& produced_filter) {
+  DCHECK(lock != nullptr);
+  DCHECK(lock->owns_lock());
+  DCHECK_EQ(0, produced_filter.pending_producers)
+      << "Local aggregation has not complete!";
+  DCHECK_NE(nullptr, produced_filter.result_filter);
+  DCHECK_NE(nullptr, produced_filter.pending_merge_filter.get());
+
+  RuntimeFilter* result_filter = produced_filter.result_filter;
+  DCHECK(!result_filter->HasFilter());
+  DCHECK(result_filter->is_intermediate_aggregator());
+  DCHECK_EQ(TRuntimeFilterType::BLOOM, result_filter->filter_desc().type);
+
+  unique_ptr<RuntimeFilter> local_filter =
+      std::move(produced_filter.pending_merge_filter);
+  unique_ptr<RuntimeFilter> remote_filter =
+      std::move(produced_filter.pending_remote_filter);
+
+  if (!produced_filter.IsComplete()) {
+    // Some of remote filter update is missing. Disable this filter.
+    result_filter->SetFilter(BloomFilter::ALWAYS_TRUE_FILTER, nullptr, nullptr);
+  } else if (remote_filter.get() == nullptr) {
+    // This is an intermediate aggregator that is scheduled without any peer hosts
+    // reporting to it. Just local_filter value is enough.
+    result_filter->SetFilter(local_filter.get());
+  } else if (remote_filter->AlwaysTrue()) {
+    // Remote filter is an always true filter.
+    // The combination should be an always true filter as well.
+    result_filter->SetFilter(BloomFilter::ALWAYS_TRUE_FILTER, nullptr, nullptr);
+  } else {
+    // Initialize 'result_filter' with aggregated local filter update.
+    result_filter->SetFilter(local_filter.get());
+    if (!remote_filter->AlwaysFalse()) {
+      // Merge 'pending_remote_filter' into 'result_filter'.
+      BloomFilter* result_bloom = result_filter->get_bloom_filter();
+      BloomFilter* remote_bloom = remote_filter->get_bloom_filter();
+      result_bloom->RawOr(*remote_bloom);
+      result_bloom->MarkNotAlwaysFalse();
+      DCHECK(!result_filter->AlwaysFalse());
+    }
+  }
+}
+
+void RuntimeFilterBank::DistributeCompleteFilter(
+    unique_lock<SpinLock>* lock, PerFilterState* fs, RuntimeFilter* complete_filter) {
+  DCHECK(lock != nullptr);
+  DCHECK(lock->owns_lock());
+  DCHECK_NE(nullptr, fs);
+  DCHECK_NE(nullptr, complete_filter);
+
+  // A runtime filter may have both local and remote targets.
+  int filter_id = complete_filter->id();
+  bool has_local_target = complete_filter->filter_desc().has_local_targets;
+  bool has_remote_target = complete_filter->filter_desc().has_remote_targets;
+  BloomFilter* bloom_filter = complete_filter->get_bloom_filter();
+  MinMaxFilter* min_max_filter = complete_filter->get_min_max();
+  InListFilter* in_list_filter = complete_filter->get_in_list_filter();
+
+  if (has_local_target) {
+    DCHECK(!complete_filter->RequireSubAggregation());
     // Do a short circuit publication by pushing the same filter to the consumer side.
     RuntimeFilter* consumed_filter;
     {
-      lock_guard<SpinLock> l(fs->lock);
       if (fs->consumed_filter == nullptr) return;
       consumed_filter = fs->consumed_filter;
       // Update the filter while still holding the lock to avoid racing with the
@@ -263,8 +499,37 @@ void RuntimeFilterBank::UpdateFilterFromLocal(
     }
   }
 
-  if (complete_filter != nullptr && has_remote_target &&
-      query_state_->query_options().runtime_filter_mode == TRuntimeFilterMode::GLOBAL) {
+  if (has_remote_target
+      && query_state_->query_options().runtime_filter_mode
+          == TRuntimeFilterMode::GLOBAL) {
+    DCHECK_EQ(fs->produced_filter.result_filter, complete_filter);
+    // Find whether to send to coordinator or intermediate aggregator.
+    bool to_coordinator = !complete_filter->IsReportToSubAggregator();
+    const NetworkAddressPB& krpc_address = to_coordinator ?
+        FromTNetworkAddress(query_state_->query_ctx().coord_ip_address) :
+        complete_filter->krpc_backend_to_report();
+    const std::string& hostname = to_coordinator ?
+        query_state_->query_ctx().coord_hostname :
+        complete_filter->krpc_hostname_to_report();
+    VLOG_RPC << "Sending update for filter_id=" << filter_id << " to "
+             << (to_coordinator ? "coordinator" : "aggregator")
+             << " krpc_address=" << krpc_address << " hostname=" << hostname;
+
+    // Use 'proxy' to send the filter to destination.
+    unique_ptr<DataStreamServiceProxy> proxy;
+    Status get_proxy_status = DataStreamService::GetProxy(krpc_address, hostname, &proxy);
+    if (!get_proxy_status.ok()) {
+      // Failing to send a filter is not a query-wide error - the remote fragment will
+      // continue regardless.
+      LOG(INFO) << Substitute("Failed to get proxy to $0 $1: $2",
+          (to_coordinator ? "coordinator" : "aggregator"), hostname,
+          get_proxy_status.msg().msg());
+      return;
+    }
+
+    // fs->lock can be unlocked after 'num_inflight_rpcs_' increased.
+    lock->unlock();
+
     UpdateFilterParamsPB params;
     // The memory associated with the following 2 objects needs to live until
     // the asynchronous KRPC call proxy->UpdateFilterAsync() is completed.
@@ -283,20 +548,7 @@ void RuntimeFilterBank::UpdateFilterFromLocal(
       DCHECK_EQ(type, TRuntimeFilterType::IN_LIST);
       InListFilter::ToProtobuf(in_list_filter, params.mutable_in_list_filter());
     }
-    const NetworkAddressPB& krpc_address =
-        FromTNetworkAddress(query_state_->query_ctx().coord_ip_address);
-    const std::string& hostname = query_state_->query_ctx().coord_hostname;
 
-    // Use 'proxy' to send the filter to the coordinator.
-    unique_ptr<DataStreamServiceProxy> proxy;
-    Status get_proxy_status = DataStreamService::GetProxy(krpc_address, hostname, &proxy);
-    if (!get_proxy_status.ok()) {
-      // Failing to send a filter is not a query-wide error - the remote fragment will
-      // continue regardless.
-      LOG(INFO) << Substitute("Failed to get proxy to coordinator $0: $1", hostname,
-          get_proxy_status.msg().msg());
-      return;
-    }
     // Increment 'num_inflight_rpcs_' to make sure that the filter will not be deallocated
     // in Close() until all in-flight RPCs complete.
     {
@@ -305,14 +557,32 @@ void RuntimeFilterBank::UpdateFilterFromLocal(
       ++num_inflight_rpcs_;
     }
 
-    proxy->UpdateFilterAsync(params, res, controller,
-        boost::bind(&RuntimeFilterBank::UpdateFilterCompleteCb, this, controller, res));
+    if (to_coordinator) {
+      proxy->UpdateFilterAsync(params, res, controller,
+          boost::bind(
+              &RuntimeFilterBank::UpdateFilterCompleteCb, this, controller, res, false));
+
+      if (complete_filter->RequireSubAggregation()) {
+        // Show information about final aggregation in runtime profile.
+        query_state_->host_profile()->AddInfoString(
+            Substitute("Filter $0 inflight for final aggregation", filter_id),
+            PrettyPrinter::Print(
+                complete_filter->TimeSinceRegistrationMs(), TUnit::TIME_MS));
+      }
+    } else {
+      proxy->UpdateFilterFromRemoteAsync(params, res, controller,
+          boost::bind(
+              &RuntimeFilterBank::UpdateFilterCompleteCb, this, controller, res, true));
+    }
+
+    // Lock fs->lock again before returning to caller.
+    lock->lock();
   }
 }
 
 void RuntimeFilterBank::PublishGlobalFilter(
     const PublishFilterParamsPB& params, RpcContext* context) {
-  VLOG(3) << "PublishGlobalFilter(filter_id=" << params.filter_id() << ")";
+  VLOG_RPC << "PublishGlobalFilter(filter_id=" << params.filter_id() << ")";
   auto it = filters_.find(params.filter_id());
   DCHECK(it != filters_.end()) << "Filter ID " << params.filter_id() << " not registered";
   PerFilterState* fs = it->second.get();
@@ -396,9 +666,17 @@ BloomFilter* RuntimeFilterBank::AllocateScratchBloomFilter(int32_t filter_id) {
   auto it = filters_.find(filter_id);
   DCHECK(it != filters_.end()) << "Filter ID " << filter_id << " not registered";
   PerFilterState* fs = it->second.get();
-  lock_guard<SpinLock> l(fs->lock);
+  unique_lock<SpinLock> l(fs->lock);
   if (closed_) return nullptr;
 
+  return AllocateScratchBloomFilterLocked(&l, fs);
+}
+
+BloomFilter* RuntimeFilterBank::AllocateScratchBloomFilterLocked(
+    unique_lock<SpinLock>* lock, PerFilterState* fs) {
+  DCHECK(lock != nullptr);
+  DCHECK(lock->owns_lock());
+
   // Track required space
   int64_t log_filter_size =
       BitUtil::Log2Ceiling64(fs->produced_filter.result_filter->filter_size());
@@ -448,12 +726,71 @@ InListFilter* RuntimeFilterBank::AllocateScratchInListFilter(
   return in_list_filter;
 }
 
+RuntimeFilter* RuntimeFilterBank::DisableBloomFilter(
+    unique_ptr<RuntimeFilter>& bloom_filter) {
+  DCHECK(bloom_filter->is_bloom_filter());
+  const TRuntimeFilterDesc& filter_desc = bloom_filter->filter_desc();
+  int64_t filter_size = bloom_filter->filter_size();
+  bloom_filter.reset(new RuntimeFilter(filter_desc, filter_size));
+  bloom_filter->SetFilter(BloomFilter::ALWAYS_TRUE_FILTER, nullptr, nullptr);
+  return bloom_filter.get();
+}
+
 vector<unique_lock<SpinLock>> RuntimeFilterBank::LockAllFilters() {
   vector<unique_lock<SpinLock>> locks;
   for (auto& entry : filters_) locks.emplace_back(entry.second->lock);
   return locks;
 }
 
+void RuntimeFilterBank::SendIncompleteFilters() {
+  int32_t wait_time_ms = FLAGS_runtime_filter_wait_time_ms;
+  if (query_state_->query_options().runtime_filter_wait_time_ms > 0) {
+    wait_time_ms = query_state_->query_options().runtime_filter_wait_time_ms;
+  }
+
+  bool try_wait_aggregation = !cancelled_;
+  for (auto& entry : filters_) {
+    unique_lock<SpinLock> l(entry.second->lock);
+    if (cancelled_) return;
+    ProducedFilter& produced_filter = entry.second->produced_filter;
+    RuntimeFilter* result_filter = entry.second->produced_filter.result_filter;
+    if (!produced_filter.IsComplete() && result_filter != nullptr
+        && result_filter->is_intermediate_aggregator()) {
+      DCHECK_EQ(0, produced_filter.pending_producers)
+          << "Local filter producers must complete before RuntimeFilterBank's "
+          << "non-cancellation cleanup!";
+      if (try_wait_aggregation) {
+        // Unlock and try wait for aggregation.
+        l.unlock();
+        // For result_filter, WaitForArrival is equivalent to wait until
+        // result_filter->SetFilter() is called. Skip waiting if wait_time_ms has passed.
+        try_wait_aggregation &= result_filter->WaitForArrival(wait_time_ms);
+        VLOG_QUERY << "RuntimeFilterBank::SendIncompleteFilters waited aggregation of"
+                   << " filter_id=" << result_filter->id() << " for "
+                   << result_filter->arrival_delay_ms()
+                   << " ms. try_wait_aggregation=" << try_wait_aggregation;
+        l.lock();
+      }
+
+      if (!cancelled_ && !produced_filter.IsComplete()) {
+        // Mark this incomplete filter as always true and send to coordinator without
+        // waiting for the remaining remote updates. Disable this filter from receiving
+        // further UpdateFilterFromRemote RPC.
+        int receivedUpdate = produced_filter.ReceivedUpdate();
+        if (produced_filter.pending_remote_filter.get() != nullptr) {
+          DisableBloomFilter(produced_filter.pending_remote_filter);
+        }
+        CombinePeerAndLocalUpdates(&l, produced_filter);
+        produced_filter.pending_remotes = 0;
+        VLOG_QUERY << "Sending incomplete filter_id=" << result_filter->id() << " with "
+                   << receivedUpdate << "/" << produced_filter.total_producers
+                   << " updates to coordinator";
+        DistributeCompleteFilter(&l, entry.second.get(), result_filter);
+      }
+    }
+  }
+}
+
 void RuntimeFilterBank::Cancel() {
   auto all_locks = LockAllFilters();
   CancelLocked();
@@ -470,14 +807,18 @@ void RuntimeFilterBank::CancelLocked() {
   cancelled_ = true;
 }
 
+void RuntimeFilterBank::WaitForInFlightRpc() {
+  unique_lock<SpinLock> l1(num_inflight_rpcs_lock_);
+  while (num_inflight_rpcs_ > 0) {
+    krpcs_done_cv_.wait(l1);
+  }
+}
+
 void RuntimeFilterBank::Close() {
+  // Send any incomplete filters if this filter bank is not cancelled.
+  SendIncompleteFilters();
   // Wait for all in-flight RPCs to complete before closing the filters.
-  {
-    unique_lock<SpinLock> l1(num_inflight_rpcs_lock_);
-    while (num_inflight_rpcs_ > 0) {
-      krpcs_done_cv_.wait(l1);
-    }
-  }
+  WaitForInFlightRpc();
   auto all_locks = LockAllFilters();
   CancelLocked();
   // We do not have to set 'closed_' to true before waiting for all in-flight RPCs to
@@ -505,9 +846,13 @@ void RuntimeFilterBank::Close() {
 }
 
 RuntimeFilterBank::ProducedFilter::ProducedFilter(
-    int pending_producers, RuntimeFilter* result_filter)
-  : result_filter(result_filter), pending_producers(pending_producers) {}
-
-RuntimeFilterBank::PerFilterState::PerFilterState(
-    int pending_producers, RuntimeFilter* result_filter, RuntimeFilter* consumed_filter)
-  : produced_filter(pending_producers, result_filter), consumed_filter(consumed_filter) {}
+    int pending_producers, int pending_remotes, RuntimeFilter* result_filter)
+  : result_filter(result_filter),
+    pending_producers(pending_producers),
+    pending_remotes(pending_remotes),
+    total_producers(pending_producers + pending_remotes) {}
+
+RuntimeFilterBank::PerFilterState::PerFilterState(int pending_producers,
+    int pending_remotes, RuntimeFilter* result_filter, RuntimeFilter* consumed_filter)
+  : produced_filter(pending_producers, pending_remotes, result_filter),
+    consumed_filter(consumed_filter) {}
diff --git a/be/src/runtime/runtime-filter-bank.h b/be/src/runtime/runtime-filter-bank.h
index f97d43dd1..b656deedd 100644
--- a/be/src/runtime/runtime-filter-bank.h
+++ b/be/src/runtime/runtime-filter-bank.h
@@ -50,6 +50,7 @@ class InListFilter;
 class RuntimeFilter;
 class QueryState;
 class TBloomFilter;
+class TNetworkAddress;
 class TRuntimeFilterDesc;
 class TQueryCtx;
 
@@ -65,6 +66,22 @@ struct FilterRegistration {
 
   // The number of producers of this filter executing on the backend.
   int num_producers = 0;
+
+  // True means this filter is planned for intermediate aggregation.
+  bool need_subaggregation = false;
+
+  // True if this Impala backend is the intermediate aggregator for this filter.
+  bool is_intermediate_aggregator = false;
+
+  // Num hosts that will send filter update to the intermediate aggregator backend
+  // (including the aggregator backend itself) if 'need_subaggregation' is True.
+  int num_reporting_hosts = 0;
+
+  // ip and port of the aggregator backend.
+  TNetworkAddress krpc_backend_to_report;
+
+  // Hostname of the aggregator backend.
+  std::string krpc_hostname_to_report;
 };
 
 /// RuntimeFilters are produced and consumed by plan nodes at run time to propagate
@@ -85,6 +102,14 @@ struct FilterRegistration {
 /// called. The expected number of filters to be produced locally must be specified ahead
 /// of time so that RuntimeFilterBank knows when the filter is complete.
 ///
+/// If distributed runtime filter aggregation is enabled
+/// (MAX_NUM_FILTERS_AGGREGATED_PER_HOST>1), few number of backend executors will be
+/// selected as intermediate filter aggregator to help coordinator. Besides doing
+/// local aggregation, each intermediate aggregator will also listen and aggregate
+/// filter updates from at most MAX_NUM_FILTERS_AGGREGATED_PER_HOST-1 other executors.
+/// Intermediate aggregator then sends the aggregated filter update to coordinator for
+/// final aggregation and publishing.
+///
 /// After PublishGlobalFilter() has been called (at most once per filter_id), the
 /// RuntimeFilter object associated with filter_id will have a valid bloom_filter,
 /// min_max_filter or in_list_filter, and may be used for filter evaluation. This
@@ -129,6 +154,10 @@ class RuntimeFilterBank {
   void UpdateFilterFromLocal(int32_t filter_id, BloomFilter* bloom_filter,
       MinMaxFilter* min_max_filter, InListFilter* in_list_filter);
 
+  /// Update filter received from remote hosts.
+  void UpdateFilterFromRemote(
+      const UpdateFilterParamsPB& params, kudu::rpc::RpcContext* context);
+
   /// Makes a filter (aggregated globally from all producer fragments) available for
   /// consumption by operators that wish to use it for filtering.
   void PublishGlobalFilter(
@@ -172,25 +201,61 @@ class RuntimeFilterBank {
   /// Acquire locks for all filters, returning them to the caller.
   std::vector<std::unique_lock<SpinLock>> LockAllFilters();
 
+  /// For each filter, acquire its lock and check whether aggregation has completed or
+  /// not. If there is a runtime filter has not completed aggregation, unlock and wait
+  /// for RUNTIME_FILTER_WAIT_TIME_MS for aggregation to happen for that incomplete
+  /// filter. If aggregation still does not happen, send the incomplete filter to
+  /// Coordinator.
+  void SendIncompleteFilters();
+
   /// Implementation of Cancel(). All filter locks must be held by caller.
   void CancelLocked();
 
+  /// Wait for in-flight RPC to complete.
+  void WaitForInFlightRpc();
+
   /// Data tracked for each produced filter in the filter bank.
   struct ProducedFilter {
-    ProducedFilter(int pending_producers, RuntimeFilter* result_filter);
+    ProducedFilter(
+        int pending_producers, int pending_remotes, RuntimeFilter* result_filter);
 
     /// The initial filter returned from RegisterProducer() metadata about the filter.
     /// Not modified by producers. Owned by 'obj_pool_'.
     RuntimeFilter* const result_filter;
 
-    // The expected number of instances of the filter yet to arrive, i.e. additional
-    // UpdateFilterFromLocal() calls expected.
+    /// The expected number of instances of the filter yet to arrive, i.e. additional
+    /// UpdateFilterFromLocal() calls expected.
     int pending_producers;
 
-    // A temporary filter that needs to be merged into the final filter. See
-    // UpdateFilterFromLocal() for details on the algorithm for merging.
-    // Only used for partitioned join filters.
+    /// The expected number of remote host to send their filter update to this backend.
+    int pending_remotes;
+
+    /// Total number of all producers for this ProducedFilter.
+    int total_producers;
+
+    /// Pointer to runtime filter that holds the merge result of all remote updates.
+    std::unique_ptr<RuntimeFilter> pending_remote_filter;
+
+    /// A temporary filter that needs to be merged into the final filter. See
+    /// UpdateFilterFromLocal() for details on the algorithm for merging.
+    /// Only used for partitioned join filters.
     std::unique_ptr<RuntimeFilter> pending_merge_filter;
+
+    /// Return number of remaining filter producers, both remote and local.
+    inline int AllRemainingProducers() { return pending_remotes + pending_producers; }
+
+    /// Return true if no filter update has been received.
+    inline bool IsEmpty() {
+      return pending_remotes + pending_producers == total_producers;
+    }
+
+    /// Return true if all filter updates have been received.
+    inline bool IsComplete() { return pending_remotes + pending_producers <= 0; }
+
+    /// Return number of filter updates that have been received.
+    inline int ReceivedUpdate() {
+      return total_producers - (pending_remotes + pending_producers);
+    }
   };
 
   /// All state tracked for a particular filter in this filter bank. PerFilterStates are
@@ -203,8 +268,8 @@ class RuntimeFilterBank {
     ///   there are any producers. Must be owned by 'obj_pool_'.
     /// consumed_filter: the filter that will be returned to consumers. Non-NULL if there
     ///   are any consumers. Must be owned by 'obj_pool_'.
-    PerFilterState(int pending_producers, RuntimeFilter* result_filter,
-        RuntimeFilter* consumed_filter);
+    PerFilterState(int pending_producers, int pending_remotes,
+        RuntimeFilter* result_filter, RuntimeFilter* consumed_filter);
 
     /// Lock protecting the structures in this PerFilterState. If multiple locks are
     /// acquired, they must be acquired in the 'filters_' map iteration order.
@@ -284,10 +349,30 @@ class RuntimeFilterBank {
   /// methods.
   BufferPool::ClientHandle buffer_pool_client_;
 
+  /// Combine both 'pending_merge_filter' and 'pending_remote_filter' into
+  /// 'result_filter'. 'pending_merge_filter' and 'pending_remote_filter' will be
+  /// discarded after this function call. Only valid to call if
+  /// produced_filter.result_filter->is_intermediate_aggregator() == true.
+  void CombinePeerAndLocalUpdates(
+      std::unique_lock<SpinLock>* lock, ProducedFilter& produced_filter);
+
+  /// Distribute 'complete_filter' to local and/or remote target.
+  /// Caller must hold 'lock' (which is over 'PerFilterState.lock').
+  void DistributeCompleteFilter(std::unique_lock<SpinLock>* lock, PerFilterState* fs,
+      RuntimeFilter* complete_filter);
+
   /// This is the callback for the asynchronous rpc UpdateFilterAsync() in
   /// UpdateFilterFromLocal().
-  void UpdateFilterCompleteCb(
-      const kudu::rpc::RpcController* rpc_controller, const UpdateFilterResultPB* res);
+  void UpdateFilterCompleteCb(const kudu::rpc::RpcController* rpc_controller,
+      const UpdateFilterResultPB* res, bool is_remote_update);
+
+  /// A locked implementation of AllocateScratchBloomFilter().
+  BloomFilter* AllocateScratchBloomFilterLocked(
+      std::unique_lock<SpinLock>* lock, PerFilterState* fs);
+
+  /// Disable a bloom filter by replacing it with an ALWAYS_TRUE_FILTER.
+  /// Return a pointer to the new runtime filter.
+  RuntimeFilter* DisableBloomFilter(std::unique_ptr<RuntimeFilter>& bloom_filter);
 };
 
 }
diff --git a/be/src/runtime/runtime-filter.cc b/be/src/runtime/runtime-filter.cc
index 8dc0d6d4e..025735351 100644
--- a/be/src/runtime/runtime-filter.cc
+++ b/be/src/runtime/runtime-filter.cc
@@ -16,7 +16,7 @@
 // under the License.
 
 #include "runtime/runtime-filter.inline.h"
-
+#include "util/network-util.h"
 #include "util/time.h"
 
 #include "common/names.h"
@@ -90,3 +90,12 @@ bool RuntimeFilter::WaitForArrival(int32_t timeout_ms) const {
   }
   return arrival_time_.Load() != 0;
 }
+
+void RuntimeFilter::SetIntermediateAggregation(bool is_intermediate_aggregator,
+    std::string intermediate_krpc_hostname, NetworkAddressPB intermediate_krpc_backend) {
+  DCHECK(!intermediate_krpc_hostname.empty());
+  DCHECK(IsResolvedAddress(intermediate_krpc_backend));
+  is_intermediate_aggregator_ = is_intermediate_aggregator;
+  intermediate_krpc_hostname_ = intermediate_krpc_hostname;
+  intermediate_krpc_backend_ = intermediate_krpc_backend;
+}
\ No newline at end of file
diff --git a/be/src/runtime/runtime-filter.h b/be/src/runtime/runtime-filter.h
index ce5aa4561..6c1e8a210 100644
--- a/be/src/runtime/runtime-filter.h
+++ b/be/src/runtime/runtime-filter.h
@@ -56,6 +56,13 @@ class RuntimeFilter {
   bool HasFilter() const { return has_filter_.Load(); }
 
   const TRuntimeFilterDesc& filter_desc() const { return filter_desc_; }
+  const std::string& krpc_hostname_to_report() const {
+    return intermediate_krpc_hostname_;
+  }
+  const NetworkAddressPB& krpc_backend_to_report() const {
+    return intermediate_krpc_backend_;
+  }
+  bool is_intermediate_aggregator() const { return is_intermediate_aggregator_; }
   int32_t id() const { return filter_desc().filter_id; }
   int64_t filter_size() const { return filter_size_; }
   ColumnType type() const {
@@ -109,10 +116,15 @@ class RuntimeFilter {
   /// filter and its arrival. If the filter has not yet arrived, it returns the time
   /// elapsed since registration.
   int32_t arrival_delay_ms() const {
-    if (arrival_time_.Load() == 0L) return MonotonicMillis() - registration_time_;
+    if (arrival_time_.Load() == 0L) return TimeSinceRegistrationMs();
     return arrival_time_.Load() - registration_time_;
   }
 
+  /// Return the amount of time since 'registration_time_'.
+  int32_t TimeSinceRegistrationMs() const {
+    return MonotonicMillis() - registration_time_;
+  }
+
   /// Periodically (every 20ms) checks to see if the global filter has arrived. Waits for
   /// a maximum of timeout_ms before returning. Returns true if the filter has arrived,
   /// false otherwise.
@@ -133,6 +145,21 @@ class RuntimeFilter {
     return filter_desc().targets[target_ndx].is_column_in_data_file;
   }
 
+  /// Set intermediate aggregation info for this runtime filter.
+  void SetIntermediateAggregation(bool is_intermediate_aggregator,
+      std::string intermediate_krpc_hostname, NetworkAddressPB intermediate_krpc_backend);
+
+  /// Return true if runtime filter update from this fragment instance should report
+  /// filter update to intermediate aggregator rather than coordinator.
+  /// Otherwise, return false, which means filter update report should go to coordinator.
+  bool IsReportToSubAggregator() const {
+    return !intermediate_krpc_hostname_.empty() && !is_intermediate_aggregator_;
+  }
+
+  /// Return true if this runtime filter is scheduled with subaggregation strategy.
+  /// Otherwise, return false (all filter updates aggregation happen in coordinator).
+  bool RequireSubAggregation() const { return !intermediate_krpc_hostname_.empty(); }
+
   /// Frequency with which to check for filter arrival in WaitForArrival()
   static const int SLEEP_PERIOD_MS;
 
@@ -180,5 +207,11 @@ class RuntimeFilter {
   /// Injection delay for WaitForArrival. Used in testing only.
   /// See IMPALA-9612.
   int64_t injection_delay_ = 0;
+
+  bool is_intermediate_aggregator_ = false;
+
+  std::string intermediate_krpc_hostname_;
+
+  NetworkAddressPB intermediate_krpc_backend_;
 };
 }
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index d4ee971a3..7f16a4058 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -226,12 +226,23 @@ Status Scheduler::ComputeFragmentExecParams(
     // Set destinations, per_exch_num_senders, sender_id.
     for (const TPlanFragment& src_fragment : plan_exec_info.fragments) {
       VLOG(3) << "Computing exec params for fragment " << src_fragment.display_name;
+      if (!src_fragment.output_sink.__isset.stream_sink
+          && !src_fragment.output_sink.__isset.join_build_sink) {
+        continue;
+      }
+
+      FragmentScheduleState* src_state =
+          state->GetFragmentScheduleState(src_fragment.idx);
+      if (state->query_options().runtime_filter_mode == TRuntimeFilterMode::GLOBAL
+          && src_fragment.produced_runtime_filters_reservation_bytes > 0) {
+        ComputeRandomKrpcForAggregation(executor_config, state, src_state,
+            state->query_options().max_num_filters_aggregated_per_host);
+      }
+
       if (!src_fragment.output_sink.__isset.stream_sink) continue;
       FragmentIdx dest_idx =
           state->GetFragmentIdx(src_fragment.output_sink.stream_sink.dest_node_id);
       FragmentScheduleState* dest_state = state->GetFragmentScheduleState(dest_idx);
-      FragmentScheduleState* src_state =
-          state->GetFragmentScheduleState(src_fragment.idx);
 
       // populate src_state->destinations
       for (int i = 0; i < dest_state->instance_states.size(); ++i) {
@@ -268,6 +279,94 @@ Status Scheduler::ComputeFragmentExecParams(
   return Status::OK();
 }
 
+// Helper type to map instance indexes to aggregator indexes.
+typedef vector<pair<int, int>> InstanceToAggPairs;
+
+void Scheduler::ComputeRandomKrpcForAggregation(const ExecutorConfig& executor_config,
+    ScheduleState* state, FragmentScheduleState* src_state, int num_filters_per_host) {
+  if (num_filters_per_host <= 1) return;
+  // src_state->instance_states organize fragment instances as one dimension vector
+  // where instances scheduled in same host is placed in adjacent indices.
+  // Group instance indices from common host to 'instance_groups' by comparing
+  // krpc address between adjacent element of instance_states.
+  const NetworkAddressPB& coord_address = executor_config.coord_desc.krpc_address();
+  vector<InstanceToAggPairs> instance_groups;
+  InstanceToAggPairs coordinator_instances;
+  for (int i = 0; i < src_state->instance_states.size(); ++i) {
+    const NetworkAddressPB& krpc_address = src_state->instance_states[i].krpc_host;
+    if (KrpcAddressEqual(krpc_address, coord_address)) {
+      coordinator_instances.emplace_back(i, 0);
+    } else {
+      if (i == 0
+          || !KrpcAddressEqual(
+              krpc_address, src_state->instance_states[i - 1].krpc_host)) {
+        // This is the first fragment instance scheduled in a host.
+        // Append empty InstanceToAggPairs to 'instance_groups'.
+        instance_groups.emplace_back();
+      }
+      instance_groups.back().emplace_back(i, 0);
+    }
+  }
+
+  int num_non_coordinator_host = instance_groups.size();
+  if (num_non_coordinator_host == 0) return;
+
+  // Select number of intermediate aggregator so that each aggregator will receive
+  // runtime filter update from at most 'num_filters_per_host' executors.
+  int num_agg = (int)ceil((double)num_non_coordinator_host / num_filters_per_host);
+  DCHECK_GT(num_agg, 0);
+
+  std::shuffle(instance_groups.begin(), instance_groups.end(), *state->rng());
+  if (coordinator_instances.size() > 0) {
+    // Put coordinator group behind so that coordinator won't be selected as intermediate
+    // aggregator.
+    instance_groups.push_back(coordinator_instances);
+  }
+
+  RuntimeFilterAggregatorInfoPB* agg_info =
+      src_state->exec_params->mutable_filter_agg_info();
+  agg_info->set_num_aggregators(num_agg);
+
+  int group_idx = 0;
+  int agg_idx = -1;
+  InstanceToAggPairs instance_to_agg;
+  vector<int> num_reporting_hosts(num_agg, 0);
+  for (auto group : instance_groups) {
+    const NetworkAddressPB& host = src_state->instance_states[group[0].first].host;
+    if (group_idx < num_agg) {
+      // First 'num_agg' of 'instance_groups' host selected as intermediate aggregator.
+      const BackendDescriptorPB& desc = LookUpBackendDesc(executor_config, host);
+      NetworkAddressPB* address = agg_info->add_aggregator_krpc_addresses();
+      *address = desc.address();
+      DCHECK(desc.has_krpc_address());
+      DCHECK(IsResolvedAddress(desc.krpc_address()));
+      NetworkAddressPB* krpc_address = agg_info->add_aggregator_krpc_backends();
+      *krpc_address = desc.krpc_address();
+    }
+    agg_idx = (agg_idx + 1) % num_agg;
+    num_reporting_hosts[agg_idx] += 1;
+    for (auto entry : group) {
+      entry.second = agg_idx;
+      instance_to_agg.push_back(entry);
+    }
+    ++group_idx;
+  }
+
+  // Sort 'instance_to_agg' based on the original instance order to populate
+  // 'aggregator_idx_to_report'.
+  sort(instance_to_agg.begin(), instance_to_agg.end());
+  DCHECK_EQ(src_state->instance_states.size(), instance_to_agg.size());
+  for (auto entry : instance_to_agg) {
+    agg_info->add_aggregator_idx_to_report(entry.second);
+  }
+
+  // Write how many host is expected to report to each intermediate aggregator,
+  // including the aggregator host itself.
+  for (auto num_reporters : num_reporting_hosts) {
+    agg_info->add_num_reporter_per_aggregator(num_reporters);
+  }
+}
+
 Status Scheduler::CheckEffectiveInstanceCount(
     const FragmentScheduleState* fragment_state, ScheduleState* state) {
   // These checks are only intended if COMPUTE_PROCESSING_COST=true.
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index b8a5aeae0..7e116e503 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -380,6 +380,12 @@ class Scheduler {
       const TPlanExecInfo& plan_exec_info, FragmentScheduleState* fragment_state,
       ScheduleState* state);
 
+  /// For a given 'src_state' and 'num_filters_per_host', select few backend as
+  /// intermediate filter aggregator before final aggregation in coordinator.
+  /// Do nothing if 'num_filters_per_host' <= 1.
+  void ComputeRandomKrpcForAggregation(const ExecutorConfig& executor_config,
+      ScheduleState* state, FragmentScheduleState* src_state, int num_filters_per_host);
+
   /// Create instances of the fragment corresponding to fragment_state, which contains
   /// either a Union node, one or more scan nodes, or both.
   ///
diff --git a/be/src/service/data-stream-service.cc b/be/src/service/data-stream-service.cc
index bbc8b4196..f26c67cbf 100644
--- a/be/src/service/data-stream-service.cc
+++ b/be/src/service/data-stream-service.cc
@@ -122,6 +122,28 @@ void DataStreamService::UpdateFilter(
   RespondAndReleaseRpc(Status::OK(), resp, context, mem_tracker_.get());
 }
 
+void DataStreamService::UpdateFilterFromRemote(
+    const UpdateFilterParamsPB* req, UpdateFilterResultPB* resp, RpcContext* context) {
+  DCHECK(req->has_filter_id());
+  DCHECK(req->has_query_id());
+  DCHECK(
+      req->has_bloom_filter() || req->has_min_max_filter() || req->has_in_list_filter());
+  QueryState::ScopedRef qs(ProtoToQueryId(req->query_id()));
+
+  if (qs.get() != nullptr) {
+    qs->UpdateFilterFromRemote(*req, context);
+    RespondAndReleaseRpc(Status::OK(), resp, context, mem_tracker_.get());
+  } else {
+    // Query state for requested query_id might have been cancelled or closed.
+    // i.e., RUNTIME_FILTER_WAIT_TIME_MS has passed and all fragment instances of
+    // query_id has complete their execution.
+    string err_msg = Substitute("Query State not found for query_id=$0",
+        PrintId(ProtoToQueryId(req->query_id())));
+    LOG(INFO) << err_msg;
+    RespondAndReleaseRpc(Status(err_msg), resp, context, mem_tracker_.get());
+  }
+}
+
 void DataStreamService::PublishFilter(
     const PublishFilterParamsPB* req, PublishFilterResultPB* resp, RpcContext* context) {
   // This failpoint is to allow jitter to be injected.
diff --git a/be/src/service/data-stream-service.h b/be/src/service/data-stream-service.h
index 9b63e8541..a934bc686 100644
--- a/be/src/service/data-stream-service.h
+++ b/be/src/service/data-stream-service.h
@@ -69,6 +69,11 @@ class DataStreamService : public DataStreamServiceIf {
   virtual void UpdateFilter(const UpdateFilterParamsPB* req, UpdateFilterResultPB* resp,
       kudu::rpc::RpcContext* context);
 
+  /// Called by fragment instances that produce local runtime filters to deliver them to
+  /// the aggregator backend for intermediate aggregation.
+  virtual void UpdateFilterFromRemote(const UpdateFilterParamsPB* req,
+      UpdateFilterResultPB* resp, kudu::rpc::RpcContext* context);
+
   /// Called by the coordinator to deliver global runtime filters to fragments for
   /// application at plan nodes.
   virtual void PublishFilter(const PublishFilterParamsPB* req,
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index 909c2f10f..66cea4991 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -268,6 +268,8 @@ TEST(QueryOptions, SetIntOptions) {
           {1, qc.MAX_FRAGMENT_INSTANCES_PER_NODE}},
       {MAKE_OPTIONDEF(max_fragment_instances_per_node),
           {1, qc.MAX_FRAGMENT_INSTANCES_PER_NODE}},
+      {MAKE_OPTIONDEF(max_num_filters_aggregated_per_host),
+          {-1, I32_MAX}},
   };
   for (const auto& test_case : case_set) {
     const OptionDef<int32_t>& option_def = test_case.first;
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index ee52b08a5..ec6dfce5e 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1166,7 +1166,7 @@ Status impala::SetQueryOption(const string& key, const string& value,
         RETURN_IF_ERROR(QueryOptionParser::Parse<MemSpec>(option, value, &mem_spec_val));
         query_options->__set_hdfs_scanner_non_reserved_bytes(mem_spec_val.value);
         break;
-      };
+      }
       case TImpalaQueryOptions::CODEGEN_OPT_LEVEL: {
         TCodeGenOptLevel::type enum_type;
         RETURN_IF_ERROR(GetThriftEnum(
@@ -1188,6 +1188,13 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_runtime_filter_cardinality_reduction_scale(double_val);
         break;
       }
+      case TImpalaQueryOptions::MAX_NUM_FILTERS_AGGREGATED_PER_HOST: {
+        int32_t int32_t_val = 0;
+        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckInclusiveLowerBound<int32_t>(
+            option, value, -1, &int32_t_val));
+        query_options->__set_max_num_filters_aggregated_per_host(int32_t_val);
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 8c2bff23f..d31116d3f 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -50,7 +50,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE                                                                 \
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),                                 \
-      TImpalaQueryOptions::RUNTIME_FILTER_CARDINALITY_REDUCTION_SCALE + 1);              \
+      TImpalaQueryOptions::MAX_NUM_FILTERS_AGGREGATED_PER_HOST + 1);                     \
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)               \
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)             \
@@ -319,6 +319,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
       DISABLE_KUDU_LOCAL_TIMESTAMP_BLOOM_FILTER, TQueryOptionLevel::ADVANCED)            \
   QUERY_OPT_FN(runtime_filter_cardinality_reduction_scale,                               \
       RUNTIME_FILTER_CARDINALITY_REDUCTION_SCALE, TQueryOptionLevel::DEVELOPMENT)        \
+  QUERY_OPT_FN(max_num_filters_aggregated_per_host, MAX_NUM_FILTERS_AGGREGATED_PER_HOST, \
+      TQueryOptionLevel::DEVELOPMENT)                                                    \
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/be/src/util/bloom-filter.cc b/be/src/util/bloom-filter.cc
index a2898b877..d308aa696 100644
--- a/be/src/util/bloom-filter.cc
+++ b/be/src/util/bloom-filter.cc
@@ -108,7 +108,7 @@ void BloomFilter::AddDirectorySidecar(BloomFilterPB* rpc_params,
 void BloomFilter::ToProtobuf(
     BloomFilterPB* protobuf, kudu::rpc::RpcController* controller) const {
   protobuf->set_log_bufferpool_space(block_bloom_filter_.log_space_bytes());
-  if (block_bloom_filter_.always_false()) {
+  if (AlwaysFalse()) {
     protobuf->set_always_false(true);
     protobuf->set_always_true(false);
     return;
@@ -144,6 +144,28 @@ void BloomFilter::Or(const BloomFilter& other) {
   block_bloom_filter_.Or(other.block_bloom_filter_);
 }
 
+void BloomFilter::RawOr(const BloomFilter& other) {
+  DCHECK_NE(this, &other);
+  DCHECK_NE(&other, ALWAYS_TRUE_FILTER);
+  DCHECK_EQ(
+      block_bloom_filter_.log_space_bytes(), other.block_bloom_filter_.log_space_bytes());
+  kudu::Slice target_slice = block_bloom_filter_.directory();
+  kudu::Slice input_slice = other.block_bloom_filter_.directory();
+  kudu::BlockBloomFilter::OrEqualArray(
+      target_slice.size(), input_slice.data(), const_cast<uint8_t*>(target_slice.data()));
+}
+
+void BloomFilter::Or(const BloomFilterPB& in, const kudu::Slice& input_slice) {
+  DCHECK_NE(this, BloomFilter::ALWAYS_TRUE_FILTER);
+  DCHECK(!in.always_true());
+  if (in.always_false()) return;
+  DCHECK_EQ(in.log_bufferpool_space(), block_bloom_filter_.log_space_bytes());
+  kudu::Slice target_slice = block_bloom_filter_.directory();
+  DCHECK_EQ(input_slice.size(), target_slice.size());
+  kudu::BlockBloomFilter::OrEqualArray(
+      target_slice.size(), input_slice.data(), const_cast<uint8_t*>(target_slice.data()));
+}
+
 void BloomFilter::Or(const BloomFilterPB& in, const uint8_t* directory_in,
     BloomFilterPB* out, uint8_t* directory_out, size_t directory_size) {
   DCHECK(out != nullptr);
diff --git a/be/src/util/bloom-filter.h b/be/src/util/bloom-filter.h
index 5244e8bfe..61f0100fb 100644
--- a/be/src/util/bloom-filter.h
+++ b/be/src/util/bloom-filter.h
@@ -99,7 +99,11 @@ class BloomFilter {
   static void ToProtobuf(const BloomFilter* filter, kudu::rpc::RpcController* controller,
       BloomFilterPB* protobuf);
 
-  bool AlwaysFalse() const { return block_bloom_filter_.always_false(); }
+  bool AlwaysFalse() const {
+    return block_bloom_filter_.always_false() && !not_always_false_;
+  }
+
+  void MarkNotAlwaysFalse() { not_always_false_ = true; }
 
   /// Adds an element to the BloomFilter. The function used to generate 'hash' need not
   /// have good uniformity, but it should have low collision probability. For instance, if
@@ -118,6 +122,16 @@ class BloomFilter {
   /// filter.
   void Or(const BloomFilter& other);
 
+  /// Computes the logical OR of this filter with 'other' and stores the result in this
+  /// filter. Different from Or(), the operation happen straight in the raw bytes rather
+  /// than goes through kudu::BlockBloomFilter::Or() method. The logical OR operation
+  /// still happen even if other.AlwaysFalse() is true.
+  void RawOr(const BloomFilter& other);
+
+  /// Computes the logical OR of this filter with 'in' and its corresponding
+  /// 'input_slice' and stores the result in this filter.
+  void Or(const BloomFilterPB& in, const kudu::Slice& input_slice);
+
   /// This function computes the logical OR of 'directory_in' with 'directory_out'
   /// and stores the result in 'directory_out'. 'in' must be a valid filter object
   /// (i.e. not ALWAYS_TRUE_FILTER).
@@ -179,6 +193,9 @@ class BloomFilter {
   /// Embedded Kudu BlockBloomFilter object
   kudu::BlockBloomFilter block_bloom_filter_;
 
+  /// Flag to override block_bloom_filter_.always_false() in AlwaysFalse() method.
+  bool not_always_false_ = false;
+
   /// Serializes this filter as Protobuf.
   void ToProtobuf(BloomFilterPB* protobuf, kudu::rpc::RpcController* controller) const;
 
diff --git a/be/src/util/network-util.h b/be/src/util/network-util.h
index bffcde812..600bb7396 100644
--- a/be/src/util/network-util.h
+++ b/be/src/util/network-util.h
@@ -103,6 +103,12 @@ Status NetworkAddressPBToSockaddr(
 /// a free ephemeral port can't be found after 100 tries.
 int FindUnusedEphemeralPort();
 
+/// Return true if two NetworkAddressPB are match.
+inline bool KrpcAddressEqual(const NetworkAddressPB& lhs, const NetworkAddressPB& rhs) {
+  return lhs.hostname() == rhs.hostname() && lhs.port() == rhs.port()
+      && lhs.uds_address() == rhs.uds_address();
+}
+
 extern const std::string LOCALHOST_IP_STR;
 
 } // namespace impala
diff --git a/be/src/util/runtime-profile-counters.h b/be/src/util/runtime-profile-counters.h
index 8b12662e1..ad4261026 100644
--- a/be/src/util/runtime-profile-counters.h
+++ b/be/src/util/runtime-profile-counters.h
@@ -659,10 +659,11 @@ class RuntimeProfile::EventSequence {
 
   /// Stores an event in sequence with the given label and the current time
   /// (relative to the first time Start() was called) as the timestamp.
-  void MarkEvent(std::string label) {
+  int64_t MarkEvent(std::string label) {
     Event event = make_pair(move(label), sw_.ElapsedTime() + offset_);
     std::lock_guard<SpinLock> event_lock(lock_);
     events_.emplace_back(move(event));
+    return event.second;
   }
 
   int64_t ElapsedTime() { return sw_.ElapsedTime(); }
diff --git a/common/protobuf/admission_control_service.proto b/common/protobuf/admission_control_service.proto
index a9dfe9981..e8eb2d16a 100644
--- a/common/protobuf/admission_control_service.proto
+++ b/common/protobuf/admission_control_service.proto
@@ -97,6 +97,30 @@ message BackendExecParamsPB {
   optional bool is_coord_backend = 7;
 }
 
+// Information about selected backend that designated as runtime filter preaggregator
+// (before final aggregation in the coordinator) and fragment instances that must send
+// filter update to them. This is populated by Scheduler::ComputeRandomKrpcForAggregation
+// only for fragment having partitioned join that produce bloom filter and only if num
+// backend executor (excluding coordinator) is at least 2x num aggregator (which default
+// to 2).
+message RuntimeFilterAggregatorInfoPB {
+  // Number of aggregator.
+  required int32 num_aggregators = 1;
+
+  // hostname:port of designated aggregators.
+  repeated NetworkAddressPB aggregator_krpc_addresses = 2;
+
+  // ip:port of designated aggregators.
+  repeated NetworkAddressPB aggregator_krpc_backends = 3;
+
+  // Number of backend executor that report to each designated aggregator + 1
+  // (including itself).
+  repeated int32 num_reporter_per_aggregator = 4;
+
+  // Size must be equal to the size of FragmentExecParamsPB.instances().
+  repeated int32 aggregator_idx_to_report = 5;
+}
+
 // Execution parameters shared between fragment instances
 message FragmentExecParamsPB {
   // Ordinal number of the corresponding fragment in the query, i.e. TPlanFragment.idx.
@@ -115,6 +139,8 @@ message FragmentExecParamsPB {
   // Total number of backends this fragment is scheduled on. Note that this represents
   // the number of individual impalads, not the number of physical hosts.
   optional int32 num_hosts = 5;
+
+  optional RuntimeFilterAggregatorInfoPB filter_agg_info = 6;
 }
 
 // Contains the output from scheduling and admission control that is used by the
diff --git a/common/protobuf/data_stream_service.proto b/common/protobuf/data_stream_service.proto
index 6b9993fd2..6bca8e4d7 100644
--- a/common/protobuf/data_stream_service.proto
+++ b/common/protobuf/data_stream_service.proto
@@ -174,6 +174,10 @@ service DataStreamService {
   // the coordinator for aggregation and broadcast.
   rpc UpdateFilter(UpdateFilterParamsPB) returns (UpdateFilterResultPB);
 
+  // Called by fragment instances that produce local runtime filters to deliver them to
+  // the aggregator backend for intermediate aggregation.
+  rpc UpdateFilterFromRemote(UpdateFilterParamsPB) returns (UpdateFilterResultPB);
+
   // Called by the coordinator to deliver global runtime filters to fragments for
   // application at plan nodes.
   rpc PublishFilter(PublishFilterParamsPB) returns (PublishFilterResultPB);
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 577a6b0a6..7e2dbe6a1 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -51,11 +51,25 @@ struct TDebugOptions {
   4: optional string action_param
 }
 
+// Descriptor about impalad address designated as runtime filter aggregator.
+struct TRuntimeFilterAggDesc {
+  // Hostname of aggregator backend.
+  1: required string krpc_hostname
+  // Ip:port of aggregator backend.
+  2: required Types.TNetworkAddress krpc_address
+  // Number of impalad that report filter update to this aggregator backend,
+  // including the aggregator backend itself.
+  3: required i32 num_reporting_hosts
+}
 
 // Descriptor that indicates that a runtime filter is produced by a plan node.
 struct TRuntimeFilterSource {
   1: required Types.TPlanNodeId src_node_id
   2: required i32 filter_id
+
+  // The following field is only set if a filter source need to send filter update
+  // to a designated backend aggregator intead of the coordinator.
+  3: optional TRuntimeFilterAggDesc aggregator_desc
 }
 
 // The Thrift portion of the execution parameters of a single fragment instance. Every
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index ed8e07bf9..3276c2785 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -900,6 +900,13 @@ enum TImpalaQueryOptions {
   // estimated to reduce cardinality to 500K, setting value 0.25 will result in an 875K
   // cardinality estimate. Default to 1.0.
   RUNTIME_FILTER_CARDINALITY_REDUCTION_SCALE = 171
+
+  // Maximum number of backend executor that can send bloom runtime filter updates to
+  // one intermediate aggregator. Given N as number of backend executor excluding
+  // coordinator, the selected number of designated intermediate aggregator is
+  // ceil(N / MAX_NUM_FILTERS_AGGREGATED_PER_HOST). Setting 1, 0, or negative value
+  // will disable the intermediate aggregator feature. Default to -1 (disabled).
+  MAX_NUM_FILTERS_AGGREGATED_PER_HOST = 172
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index bbfbbe7bd..794301257 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -690,6 +690,9 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift
   172: optional double runtime_filter_cardinality_reduction_scale = 1.0
+
+  // See comment in ImpalaService.thrift
+  173: optional i32 max_num_filters_aggregated_per_host = -1
 }
 
 // Impala currently has three types of sessions: Beeswax, HiveServer2 and external
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
index b8e5838f4..33c5f24ba 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
@@ -465,6 +465,13 @@ public class PlanFragment extends TreeNode<PlanFragment> {
     }
     for (RuntimeFilter f : producedFilters.values()) {
       producedRuntimeFiltersMemReservationBytes_ += f.getFilterSize();
+      if (analyzer.getQueryOptions().max_num_filters_aggregated_per_host > 1
+          && !f.isBroadcast()) {
+        // If distributed aggregation is enabled, any backend can be selected as
+        // an intermediate aggregator. Add the same amount of memory per backend to
+        // anticipate for it.
+        consumedGlobalRuntimeFiltersMemReservationBytes_ += f.getFilterSize();
+      }
     }
     for (RuntimeFilter f : consumedFilters.values()) {
       if (!producedFilters.containsKey(f.getFilterId())) {
diff --git a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
index d42d9d3f5..faca8f7f3 100644
--- a/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
+++ b/fe/src/main/java/org/apache/impala/planner/RuntimeFilterGenerator.java
@@ -662,6 +662,7 @@ public final class RuntimeFilterGenerator {
     public Operator getExprCompOp() { return exprCmpOp_; }
     public long getFilterSize() { return filterSizeBytes_; }
     public boolean isTimestampTruncation() { return isTimestampTruncation_; }
+    public boolean isBroadcast() { return isBroadcastJoin_; }
     public PlanNode getSrc() { return src_; }
 
     private long getBuildKeyNumRowStats() {
diff --git a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test
index 13057c0d2..b06b39b3c 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test
@@ -2,8 +2,7 @@
 ---- QUERY
 ####################################################
 # Test case 1: broadcast join.
-# Without filtering, expect 7300 / 3 = 2433 rows per scan fragment.
-# With filtering, expect 618 / 3 = 206 rows to be read from 3 files (one per scan).
+# Expect 7300 from scan node id=0 without filtering and 620 with filtering.
 ####################################################
 
 # Basic filtering use case: filter p's partition columns thanks to an implicit
@@ -14,7 +13,7 @@ on p.month = b.int_col and b.month = 1 and b.string_col = "1"
 ---- RESULTS
 620
 ---- RUNTIME_PROFILE
-row_regex: .*RowsRead: 2.43K .*
+row_regex: 00:SCAN.*s[ ]+7.30K[ ]+[0-9\.]+K.*
 ====
 ---- QUERY
 # Now turn on local filtering: we expect to see a reduction in scan volume.
@@ -26,17 +25,16 @@ on p.month = b.int_col and b.month = 1 and b.string_col = "1"
 ---- RESULTS
 620
 ---- RUNTIME_PROFILE
-row_regex: .*Files rejected: 7 \(7\).*
+aggregation(SUM, Files rejected): 22
 ---- RUNTIME_PROFILE: table_format=kudu
-row_regex: .*RowsRead: 206 .*
+row_regex: 00:SCAN KUDU.*s[ ]+620[ ]+7.30K.*
 ====
 
 
 ---- QUERY
 ####################################################
 # Test case 2: shuffle join - test for filter propagation (or lack thereof in LOCAL mode).
-# Without filtering, expect 7300 / 3 = 2433 rows per scan fragment.
-# With filtering, expect 618 / 3 = 206 rows to be read from 3 files (one per scan).
+# Expect 7300 from scan node id=0 without filtering and 620 with filtering.
 ####################################################
 
 # Local mode. Filters won't be propagated to scan, so scans will read all rows.
@@ -48,7 +46,7 @@ on p.month = b.int_col and b.month = 1 and b.string_col = "1"
 ---- RESULTS
 620
 ---- RUNTIME_PROFILE
-row_regex: .*RowsRead: 2.43K .*
+row_regex: 00:SCAN.*s[ ]+7.30K[ ]+[0-9\.]+K.*
 ====
 ---- QUERY
 # Shuffle join, global mode. Expect filters to be propagated.
@@ -59,9 +57,9 @@ on p.month = b.int_col and b.month = 1 and b.string_col = "1"
 ---- RESULTS
 620
 ---- RUNTIME_PROFILE
-row_regex: .*Files rejected: 7 \(7\).*
+aggregation(SUM, Files rejected): 22
 ---- RUNTIME_PROFILE: table_format=kudu
-row_regex: .*RowsRead: 206 .*
+row_regex: 00:SCAN KUDU.*s[ ]+620[ ]+7.30K.*
 ====
 
 
@@ -69,7 +67,7 @@ row_regex: .*RowsRead: 206 .*
 ####################################################
 # Test case 3: two-hop filter chain with BROADCAST
 # joins.
-# Without filtering in left-most scan, expect 7300 / 3 = 2433 rows.
+# Without filtering in left-most scan, expect 7300 rows.
 # With filtering, expect 0 rows as all files are rejected by the partition
 # column filter.
 ####################################################
@@ -85,9 +83,9 @@ select STRAIGHT_JOIN count(*) from alltypes a
 ---- RESULTS
 0
 ---- RUNTIME_PROFILE
-row_regex: .*Files rejected: 0 .*
+aggregation(SUM, Files rejected): 0
 ---- RUNTIME_PROFILE: table_format=kudu
-row_regex: .*RowsRead: 2.43K .*
+row_regex: 00:SCAN KUDU.*s[ ]+7.30K[ ]+7.30K.*
 ====
 ---- QUERY
 # Global mode. Scan of 'b' will receive highly effective filter, and will propagate that
@@ -101,10 +99,10 @@ select STRAIGHT_JOIN count(*) from alltypes a
 ---- RESULTS
 0
 ---- RUNTIME_PROFILE
-row_regex: .*Files rejected: 8 .*
+aggregation(SUM, Files rejected): 48
 ---- RUNTIME_PROFILE: table_format=kudu
-row_regex: .*RowsRead: 0 .*
-row_regex: .*ScanRangesComplete: 0 .*
+aggregation(SUM, RowsRead): 0
+row_regex: 00:SCAN KUDU.*s[ ]+0[ ]+7.30K.*
 ====
 
 
@@ -124,10 +122,10 @@ select STRAIGHT_JOIN count(*) from alltypes a
 ---- RESULTS
 0
 ---- RUNTIME_PROFILE
-row_regex: .*RowsRead: 0 .*
-row_regex: .*Files rejected: 8 .*
+aggregation(SUM, RowsRead): 8
+aggregation(SUM, Files rejected): 24
 ---- RUNTIME_PROFILE: table_format=kudu
-row_regex: .*RowsRead: 2.43K .*
+row_regex: 00:SCAN KUDU.*s[ ]+7.30K[ ]+7.30K.*
 ====
 
 
@@ -145,7 +143,7 @@ select STRAIGHT_JOIN count(*) from alltypes a
 ---- RESULTS
 0
 ---- RUNTIME_PROFILE
-row_regex: .*FiltersReceived: 0 .*
+aggregation(SUM, FiltersReceived): 0
 ====
 ---- QUERY
 # Global mode. Coordinator should report 0 filter updates received.
@@ -157,11 +155,11 @@ select STRAIGHT_JOIN count(*) from alltypes a
 ---- RESULTS
 0
 ---- RUNTIME_PROFILE
-row_regex: .*FiltersReceived: 0 .*
-row_regex: .*Files rejected: 8 .*
+aggregation(SUM, FiltersReceived): 0
+aggregation(SUM, Files rejected): 24
 ---- RUNTIME_PROFILE: table_format=kudu
-row_regex: .*FiltersReceived: 0 .*
-row_regex: .*RowsRead: 0 .*
+aggregation(SUM, FiltersReceived): 0
+row_regex: 00:SCAN KUDU.*s[ ]+0[ ]+7.30K.*
 ====
 
 
@@ -179,10 +177,12 @@ select STRAIGHT_JOIN count(*) from alltypes a
 ---- RESULTS
 0
 ---- RUNTIME_PROFILE
-row_regex: .*FiltersReceived: 0 .*
+aggregation(SUM, FiltersReceived): 0
 ====
 ---- QUERY
-# Global mode. Coordinator should report 1 filter update per backend.
+# Global mode. Coordinator should report 1 filter update per backend
+# or exactly 1 if MAX_NUM_FILTERS_AGGREGATED_PER_HOST equals to num executors
+# (excluding coordinator).
 # For Kudu, join expr has implicit casting so no bloom filter will
 # be created, only min-max filter will be created.
 SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
@@ -194,10 +194,10 @@ select STRAIGHT_JOIN count(*) from alltypes a
 ---- RESULTS
 0
 ---- RUNTIME_PROFILE
-row_regex: .*FiltersReceived: 3 .*
+aggregation(SUM, FiltersReceived): $NUM_FILTER_UPDATES
 row_regex: .*REMOTE.*ms.*ms.*true
 ---- RUNTIME_PROFILE: table_format=kudu
-row_regex: .*FiltersReceived: 3 .*
+aggregation(SUM, FiltersReceived): 3
 row_regex: .*REMOTE.*ms.*ms.*true
 ====
 
@@ -217,9 +217,9 @@ select STRAIGHT_JOIN count(*) from alltypes a
 ---- RESULTS
 2480
 ---- RUNTIME_PROFILE
-row_regex: .*Files rejected: 7 .*
+aggregation(SUM, Files rejected): 23
 ---- RUNTIME_PROFILE: table_format=kudu
-row_regex: .*RowsRead: 2.43K .*
+row_regex: 00:SCAN KUDU.*s[ ]+7.30K[ ]+7.30K.*
 ====
 
 
@@ -236,7 +236,7 @@ select STRAIGHT_JOIN count(*) from alltypes a
 ---- RESULTS
 7300
 ---- RUNTIME_PROFILE
-row_regex: .*RowsReturned: 2.43K .*
+row_regex: 00:SCAN.*s[ ]+7.30K[ ]+[0-9\.]+K.*
 ====
 
 
@@ -254,9 +254,9 @@ select STRAIGHT_JOIN count(*) from alltypes a
 ---- RESULTS
 8
 ---- RUNTIME_PROFILE
-row_regex: .*Files rejected: 8 .*
+aggregation(SUM, Files rejected): 24
 ---- RUNTIME_PROFILE: table_format=kudu
-row_regex: .*RowsRead: 0 .*
+row_regex: 00:SCAN KUDU.*s[ ]+0[ ]+7.30K.*
 ====
 
 ---- QUERY
@@ -272,7 +272,7 @@ select STRAIGHT_JOIN count(*) from alltypes a
 ---- RESULTS
 7308
 ---- RUNTIME_PROFILE
-row_regex: .*RowsReturned: 2.43K .*
+row_regex: 00:SCAN.*s[ ]+7.30K[ ]+[0-9\.]+K.*
 ====
 
 
@@ -297,9 +297,9 @@ select STRAIGHT_JOIN count(*) from alltypesagg a
 ---- RESULTS
 1000
 ---- RUNTIME_PROFILE
-row_regex: .*Files rejected: 3 .*
+aggregation(SUM, Files rejected): 10
 ---- RUNTIME_PROFILE: table_format=kudu
-row_regex: .*RowsRead: 3.67K .*
+row_regex: 00:SCAN KUDU.*s[ ]+11.00K[ ]+11.00K.*
 ====
 
 
@@ -319,9 +319,9 @@ with t1 as (select month x, bigint_col y from alltypes limit 7301),
 ---- RESULTS
 620
 ---- RUNTIME_PROFILE
-row_regex: .*Files rejected: 7 .*
+aggregation(SUM, Files rejected): 22
 ---- RUNTIME_PROFILE: table_format=kudu
-row_regex: .*RowsRead: 206 .*
+row_regex: 00:SCAN KUDU.*s[ ]+620[ ]+7.30K.*
 ====
 
 
diff --git a/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filter_reservations.test b/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filter_reservations.test
index 6ddc0d694..a3eddef75 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filter_reservations.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filter_reservations.test
@@ -9,6 +9,7 @@ SET RUNTIME_FILTER_MODE=GLOBAL;
 SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
 SET RUNTIME_FILTER_MIN_SIZE=128MB;
 SET RUNTIME_FILTER_MAX_SIZE=500MB;
+SET MAX_NUM_FILTERS_AGGREGATED_PER_HOST=0;
 # Query would have been admitted if memory for runtime filters was not accounted for.
 SET BUFFER_POOL_LIMIT=290MB;
 select STRAIGHT_JOIN * from alltypes a join [SHUFFLE] alltypes b
@@ -20,6 +21,7 @@ SET RUNTIME_FILTER_MODE=GLOBAL;
 SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
 SET RUNTIME_FILTER_MIN_SIZE=128MB;
 SET RUNTIME_FILTER_MAX_SIZE=500MB;
+SET MAX_NUM_FILTERS_AGGREGATED_PER_HOST=0;
 # Disable the estimation of cardinality for an hdfs table withot stats.
 SET DISABLE_HDFS_NUM_ROWS_ESTIMATE=1;
 # Query would have been admitted if memory for runtime filters was not accounted for.
@@ -38,6 +40,7 @@ SET RUNTIME_FILTER_MODE=GLOBAL;
 SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
 SET RUNTIME_FILTER_MIN_SIZE=128MB;
 SET RUNTIME_FILTER_MAX_SIZE=500MB;
+SET MAX_NUM_FILTERS_AGGREGATED_PER_HOST=0;
 # This would run perfectly with just enough memory provided by the buffer pool.
 SET BUFFER_POOL_LIMIT=295MB;
 select STRAIGHT_JOIN * from alltypes a join [SHUFFLE] alltypes b
@@ -48,3 +51,38 @@ row_regex: .*Filter 0 \(128.00 MB\).*
 aggregation(SUM, Files processed): 24
 aggregation(SUM, Files rejected): 24
 ====
+---- QUERY
+SET RUNTIME_FILTER_MODE=GLOBAL;
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+SET RUNTIME_FILTER_MIN_SIZE=128MB;
+SET RUNTIME_FILTER_MAX_SIZE=500MB;
+# Enable distributed aggregation.
+SET MAX_NUM_FILTERS_AGGREGATED_PER_HOST=2;
+# Query would have been admitted if memory for runtime filters was not accounted for.
+SET BUFFER_POOL_LIMIT=390MB;
+select STRAIGHT_JOIN * from alltypes a join [SHUFFLE] alltypes b
+    on a.month = b.id and b.int_col = -3
+---- RESULTS
+---- CATCH
+row_regex:.*minimum memory reservation on backend '.*' is greater than memory available to
+ the query for buffer reservations\. Increase the buffer_pool_limit to 390.11 MB\. See
+ the query profile for more information about the per-node memory requirements\.
+====
+---- QUERY
+# Confirm that with broadcast join, memory limit is not hit.
+SET RUNTIME_FILTER_MODE=GLOBAL;
+SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
+SET RUNTIME_FILTER_MIN_SIZE=128MB;
+SET RUNTIME_FILTER_MAX_SIZE=500MB;
+# Enable distributed aggregation.
+SET MAX_NUM_FILTERS_AGGREGATED_PER_HOST=2;
+# This would run perfectly with just enough memory provided by the buffer pool.
+SET BUFFER_POOL_LIMIT=391MB;
+select STRAIGHT_JOIN * from alltypes a join [SHUFFLE] alltypes b
+    on a.month = b.id and b.int_col = -3
+---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*Filter 0 \(128.00 MB\).*
+aggregation(SUM, Files processed): 24
+aggregation(SUM, Files rejected): 24
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test b/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test
index 33270bb90..317f1ff81 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/runtime_row_filters.test
@@ -171,7 +171,9 @@ select STRAIGHT_JOIN count(*) from alltypes a
 aggregation(SUM, FiltersReceived): 0
 ====
 ---- QUERY
-# Global mode. Coordinator should report 1 filter updates per backend.
+# Global mode. Coordinator should report 1 filter updates per backend
+# or exactly 1 if MAX_NUM_FILTERS_AGGREGATED_PER_HOST equals to num executors
+# (excluding coordinator).
 SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS;
 SET RUNTIME_FILTER_MODE=GLOBAL;
 select STRAIGHT_JOIN count(*) from alltypes a
@@ -180,7 +182,7 @@ select STRAIGHT_JOIN count(*) from alltypes a
 ---- RESULTS
 0
 ---- RUNTIME_PROFILE
-aggregation(SUM, FiltersReceived): 3
+aggregation(SUM, FiltersReceived): $NUM_FILTER_UPDATES
 ====
 
 ---- QUERY
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index 3fa18055f..bbf430206 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -774,6 +774,10 @@ class ImpalaTestSuite(BaseTestSuite):
         rt_profile_info = 'RUNTIME_PROFILE'
 
       if rt_profile_info is not None:
+        if test_file_vars:
+          # only do test_file_vars replacement if it exist.
+          test_section[rt_profile_info] = self.__do_replacements(
+              test_section[rt_profile_info], extra=test_file_vars)
         rt_profile = verify_runtime_profile(test_section[rt_profile_info],
                                result.runtime_profile,
                                update_section=pytest.config.option.update_results)
diff --git a/tests/common/test_vector.py b/tests/common/test_vector.py
index 8e0bebfd5..7919fd796 100644
--- a/tests/common/test_vector.py
+++ b/tests/common/test_vector.py
@@ -88,6 +88,18 @@ class ImpalaTestVector(object):
         return vector_value.value
     raise ValueError("Test vector does not contain value '%s'" % name)
 
+  def get_exec_option_dict(self):
+    return self.get_value(EXEC_OPTION_KEY)
+
+  def get_exec_option(self, option_name):
+    return self.get_value(EXEC_OPTION_KEY)[option_name]
+
+  def set_exec_option(self, option_name, option_value):
+    self.get_value(EXEC_OPTION_KEY)[option_name] = option_value
+
+  def unset_exec_option(self, option_name):
+    del self.get_value(EXEC_OPTION_KEY)[option_name]
+
   def __str__(self):
       return ' | '.join(['%s' % vector_value for vector_value in self.vector_values])
 
diff --git a/tests/custom_cluster/test_runtime_filter_aggregation.py b/tests/custom_cluster/test_runtime_filter_aggregation.py
new file mode 100644
index 000000000..da00e5367
--- /dev/null
+++ b/tests/custom_cluster/test_runtime_filter_aggregation.py
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import, division, print_function
+import math
+import pytest
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.environ import build_flavor_timeout, ImpalaTestClusterProperties
+from tests.common.test_dimensions import add_mandatory_exec_option
+
+# slow_build_timeout is set to 200000 to avoid failures like IMPALA-8064 where the
+# runtime filters don't arrive in time.
+WAIT_TIME_MS = build_flavor_timeout(60000, slow_build_timeout=200000)
+
+# Check whether the Impala under test in slow build. Query option ASYNC_CODEGEN will
+# be enabled when test runs for slow build like ASAN, TSAN, UBSAN, etc. This avoid
+# failures like IMPALA-9889 where the runtime filters don't arrive in time due to
+# the slowness of codegen.
+build_runs_slowly = ImpalaTestClusterProperties.get_instance().runs_slowly()
+
+
+class TestRuntimeFilterAggregation(CustomClusterTestSuite):
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestRuntimeFilterAggregation, cls).add_test_dimensions()
+    add_mandatory_exec_option(cls, 'max_num_filters_aggregated_per_host', 2)
+    # Enable query option ASYNC_CODEGEN for slow build
+    if build_runs_slowly:
+      add_mandatory_exec_option(cls, "async_codegen", 1)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=6, num_exclusive_coordinators=1)
+  def test_basic_filters(self, vector):
+    num_filters_per_host = vector.get_exec_option('max_num_filters_aggregated_per_host')
+    num_backend = 5  # exclude coordinator
+    num_updates = (num_backend if num_filters_per_host == 0
+        else int(math.ceil(num_backend / num_filters_per_host)))
+    vars = {
+      '$RUNTIME_FILTER_WAIT_TIME_MS': str(WAIT_TIME_MS),
+      '$NUM_FILTER_UPDATES': str(num_updates)
+    }
+    self.run_test_case('QueryTest/runtime_filters', vector, test_file_vars=vars)
+    self.run_test_case('QueryTest/bloom_filters', vector)
diff --git a/tests/query_test/test_runtime_filters.py b/tests/query_test/test_runtime_filters.py
index 87f299d0e..cd76eba50 100644
--- a/tests/query_test/test_runtime_filters.py
+++ b/tests/query_test/test_runtime_filters.py
@@ -17,7 +17,7 @@
 #
 
 from __future__ import absolute_import, division, print_function
-from copy import deepcopy
+import math
 import os
 import pytest
 import re
@@ -27,14 +27,20 @@ from tests.common.environ import build_flavor_timeout, ImpalaTestClusterProperti
 from tests.common.impala_cluster import ImpalaCluster
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import SkipIfEC, SkipIfLocal, SkipIfFS
-from tests.common.test_dimensions import add_mandatory_exec_option
-from tests.common.test_vector import ImpalaTestDimension
+from tests.common.test_dimensions import (
+  add_mandatory_exec_option, add_exec_option_dimension)
 from tests.verifiers.metric_verifier import MetricVerifier
 from tests.util.filesystem_utils import WAREHOUSE
 
 # slow_build_timeout is set to 200000 to avoid failures like IMPALA-8064 where the
 # runtime filters don't arrive in time.
 WAIT_TIME_MS = build_flavor_timeout(60000, slow_build_timeout=200000)
+DEFAULT_VARS = {'$RUNTIME_FILTER_WAIT_TIME_MS': str(WAIT_TIME_MS)}
+
+# Dimensions exercised in some of the tests.
+DIM_MT_DOP_SMALL = [0, 1]
+DIM_MT_DOP_MEDIUM = [0, 4]
+DIM_MAX_NUM_FILTERS_AGGREGATED_PER_HOST = [0, 2]
 
 # Check whether the Impala under test in slow build. Query option ASYNC_CODEGEN will
 # be enabled when test runs for slow build like ASAN, TSAN, UBSAN, etc. This avoid
@@ -42,6 +48,7 @@ WAIT_TIME_MS = build_flavor_timeout(60000, slow_build_timeout=200000)
 # the slowness of codegen.
 build_runs_slowly = ImpalaTestClusterProperties.get_instance().runs_slowly()
 
+
 # Some of the queries in runtime_filters consume a lot of memory, leading to
 # significant memory reservations in parallel tests.
 # Skipping Isilon due to IMPALA-6998. TODO: Remove when there's a holistic revamp of
@@ -62,27 +69,37 @@ class TestRuntimeFilters(ImpalaTestSuite):
         lambda v: v.get_value('table_format').file_format not in ['hbase'])
     # Exercise both mt and non-mt code paths. Some tests assume 3 finstances, so
     # tests are not expected to work unmodified with higher mt_dop values.
-    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('mt_dop', 0, 1))
+    add_exec_option_dimension(cls, 'mt_dop', DIM_MT_DOP_SMALL)
+    # Exercise max_num_filters_aggregated_per_host.
+    add_exec_option_dimension(cls, 'max_num_filters_aggregated_per_host',
+      DIM_MAX_NUM_FILTERS_AGGREGATED_PER_HOST)
     # Don't test all combinations of file format and mt_dop, only test a few
     # representative formats.
     cls.ImpalaTestMatrix.add_constraint(
         lambda v: v.get_value('table_format').file_format in ['parquet', 'text', 'kudu']
         or v.get_value('mt_dop') == 0)
+    # Limit 'max_num_filters_aggregated_per_host' exercise for parquet only.
+    cls.ImpalaTestMatrix.add_constraint(
+      lambda v: v.get_value('table_format').file_format == 'parquet'
+      or v.get_value('max_num_filters_aggregated_per_host') == 0)
     # Enable query option ASYNC_CODEGEN for slow build
     if build_runs_slowly:
       add_mandatory_exec_option(cls, "async_codegen", 1)
 
   def test_basic_filters(self, vector):
-    new_vector = deepcopy(vector)
-    new_vector.get_value('exec_option')['mt_dop'] = vector.get_value('mt_dop')
-    self.run_test_case('QueryTest/runtime_filters', new_vector,
-        test_file_vars={'$RUNTIME_FILTER_WAIT_TIME_MS' : str(WAIT_TIME_MS)})
+    num_filters_per_host = vector.get_exec_option('max_num_filters_aggregated_per_host')
+    num_backend = 2  # exclude coordinator
+    num_updates = (num_backend + 1 if num_filters_per_host == 0
+        else int(math.ceil(num_backend / num_filters_per_host)))
+    vars = {
+      '$RUNTIME_FILTER_WAIT_TIME_MS': str(WAIT_TIME_MS),
+      '$NUM_FILTER_UPDATES': str(num_updates)
+    }
+    self.run_test_case('QueryTest/runtime_filters', vector, test_file_vars=vars)
 
   def test_wait_time(self, vector):
     """Test that a query that has global filters does not wait for them if run in LOCAL
     mode"""
-    new_vector = deepcopy(vector)
-    new_vector.get_value('exec_option')['mt_dop'] = vector.get_value('mt_dop')
     now = time.time()
     self.run_test_case('QueryTest/runtime_filters_wait', vector)
     duration_s = time.time() - now
@@ -94,8 +111,6 @@ class TestRuntimeFilters(ImpalaTestSuite):
   def test_wait_time_cancellation(self, vector):
     """Regression test for IMPALA-9065 to ensure that threads waiting for filters
     get woken up and exit promptly when the query is cancelled."""
-    new_vector = deepcopy(vector)
-    new_vector.get_value('exec_option')['mt_dop'] = vector.get_value('mt_dop')
     # Make sure the cluster is quiesced before we start this test
     self._verify_no_fragments_running()
 
@@ -106,7 +121,7 @@ class TestRuntimeFilters(ImpalaTestSuite):
     QUERY = """select straight_join *
                from alltypes t1
                     join /*+shuffle*/ alltypestiny t2 on t1.id = t2.id"""
-    self.client.set_configuration(new_vector.get_value('exec_option'))
+    self.client.set_configuration(vector.get_exec_option_dict())
     self.client.set_configuration_option("DEBUG_ACTION", "1:OPEN:WAIT")
     self.client.set_configuration_option("RUNTIME_FILTER_WAIT_TIME_MS", "10000000")
     # Run same query with different delays to better exercise call paths.
@@ -133,25 +148,21 @@ class TestRuntimeFilters(ImpalaTestSuite):
   def test_file_filtering(self, vector):
     if 'kudu' in str(vector.get_value('table_format')):
       return
-    new_vector = deepcopy(vector)
-    new_vector.get_value('exec_option')['mt_dop'] = vector.get_value('mt_dop')
     self.change_database(self.client, vector.get_value('table_format'))
     self.execute_query("SET RUNTIME_FILTER_MODE=GLOBAL")
     self.execute_query("SET RUNTIME_FILTER_WAIT_TIME_MS=10000")
     result = self.execute_query("""select STRAIGHT_JOIN * from alltypes inner join
                                 (select * from alltypessmall where smallint_col=-1) v
                                 on v.year = alltypes.year""",
-                                new_vector.get_value('exec_option'))
-    assert re.search("Files rejected: 8 \(8\)", result.runtime_profile) is not None
-    assert re.search("Splits rejected: [^0] \([^0]\)", result.runtime_profile) is None
+                                vector.get_exec_option_dict())
+    assert re.search(r"Files rejected: 8 \(8\)", result.runtime_profile) is not None
+    assert re.search(r"Splits rejected: [^0] \([^0]\)", result.runtime_profile) is None
 
   def test_file_filtering_late_arriving_filter(self, vector):
     """Test that late-arriving filters are applied to files when the scanner starts processing
     each scan range."""
     if 'kudu' in str(vector.get_value('table_format')):
       return
-    new_vector = deepcopy(vector)
-    new_vector.get_value('exec_option')['mt_dop'] = vector.get_value('mt_dop')
     self.change_database(self.client, vector.get_value('table_format'))
     self.execute_query("SET RUNTIME_FILTER_MODE=GLOBAL")
     self.execute_query("SET RUNTIME_FILTER_WAIT_TIME_MS=1")
@@ -166,8 +177,9 @@ class TestRuntimeFilters(ImpalaTestSuite):
                                       where smallint_col > sleep(100)) v
                                      on v.id = alltypes.id
                                    where alltypes.id < sleep(10);""",
-                                   new_vector.get_value('exec_option'))
-    assert re.search("Splits rejected: [^0] \([^0]\)", result.runtime_profile) is not None
+                                   vector.get_exec_option_dict())
+    splits_re = r"Splits rejected: [^0] \([^0]\)"
+    assert re.search(splits_re, result.runtime_profile) is not None
 
 
 @SkipIfLocal.multiple_impalad
@@ -179,38 +191,58 @@ class TestBloomFilters(ImpalaTestSuite):
   @classmethod
   def add_test_dimensions(cls):
     super(TestBloomFilters, cls).add_test_dimensions()
+    # Exercise max_num_filters_aggregated_per_host.
+    add_exec_option_dimension(cls, 'max_num_filters_aggregated_per_host',
+      DIM_MAX_NUM_FILTERS_AGGREGATED_PER_HOST)
     # Bloom filters are disabled on HBase
     cls.ImpalaTestMatrix.add_constraint(
         lambda v: v.get_value('table_format').file_format not in ['hbase'])
+    # Limit 'max_num_filters_aggregated_per_host' exercise for parquet only.
+    cls.ImpalaTestMatrix.add_constraint(
+      lambda v: v.get_value('table_format').file_format == 'parquet'
+      or v.get_value('max_num_filters_aggregated_per_host') == 0)
     # Enable query option ASYNC_CODEGEN for slow build
     if build_runs_slowly:
       add_mandatory_exec_option(cls, "async_codegen", 1)
 
   def test_bloom_filters(self, vector):
-    vector.get_value('exec_option')['ENABLED_RUNTIME_FILTER_TYPES'] = 'BLOOM'
     self.run_test_case('QueryTest/bloom_filters', vector)
 
+
+@SkipIfLocal.multiple_impalad
+class TestBloomFiltersOnParquet(ImpalaTestSuite):
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestBloomFiltersOnParquet, cls).add_test_dimensions()
+    # Only enable bloom filter.
+    add_mandatory_exec_option(cls, 'enabled_runtime_filter_types', 'BLOOM')
+    # Exercise max_num_filters_aggregated_per_host.
+    add_exec_option_dimension(cls, 'max_num_filters_aggregated_per_host',
+      DIM_MAX_NUM_FILTERS_AGGREGATED_PER_HOST)
+    # Test exclusively on parquet
+    cls.ImpalaTestMatrix.add_constraint(
+        lambda v: v.get_value('table_format').file_format == 'parquet')
+    # Enable query option ASYNC_CODEGEN for slow build
+    if build_runs_slowly:
+      add_mandatory_exec_option(cls, "async_codegen", 1)
+
   def test_iceberg_dictionary_runtime_filter(self, vector, unique_database):
-    if (vector.get_value('table_format').file_format != 'parquet'):
-      pytest.skip()
-    vector.get_value('exec_option')['ENABLED_RUNTIME_FILTER_TYPES'] = 'BLOOM'
     self.run_test_case('QueryTest/iceberg-dictionary-runtime-filter', vector,
-      unique_database, test_file_vars={'$RUNTIME_FILTER_WAIT_TIME_MS': str(WAIT_TIME_MS)})
+      unique_database, test_file_vars=DEFAULT_VARS)
 
   def test_parquet_dictionary_runtime_filter(self, vector, unique_database):
-    if (vector.get_value('table_format').file_format != 'parquet'):
-      pytest.skip()
-    vector.get_value('exec_option')['ENABLED_RUNTIME_FILTER_TYPES'] = 'BLOOM'
-    vector.get_value('exec_option')['PARQUET_READ_STATISTICS'] = 'false'
+    vector.set_exec_option('PARQUET_READ_STATISTICS', 'false')
     self.run_test_case('QueryTest/parquet-dictionary-runtime-filter', vector,
-      unique_database, test_file_vars={'$RUNTIME_FILTER_WAIT_TIME_MS': str(WAIT_TIME_MS)})
+      unique_database, test_file_vars=DEFAULT_VARS)
 
   def test_iceberg_partition_runtime_filter(self, vector, unique_database):
-    if (vector.get_value('table_format').file_format != 'parquet'):
-      pytest.skip()
-    vector.get_value('exec_option')['ENABLED_RUNTIME_FILTER_TYPES'] = 'BLOOM'
     self.run_test_case('QueryTest/iceberg-partition-runtime-filter', vector,
-      unique_database, test_file_vars={'$RUNTIME_FILTER_WAIT_TIME_MS': str(WAIT_TIME_MS)})
+      unique_database, test_file_vars=DEFAULT_VARS)
+
 
 @SkipIfLocal.multiple_impalad
 class TestMinMaxFilters(ImpalaTestSuite):
@@ -234,14 +266,13 @@ class TestMinMaxFilters(ImpalaTestSuite):
 
   def test_min_max_filters(self, vector):
     self.execute_query("SET MINMAX_FILTER_THRESHOLD=0.5")
-    self.run_test_case('QueryTest/min_max_filters', vector,
-        test_file_vars={'$RUNTIME_FILTER_WAIT_TIME_MS': str(WAIT_TIME_MS)})
+    self.run_test_case('QueryTest/min_max_filters', vector, test_file_vars=DEFAULT_VARS)
 
   def test_decimal_min_max_filters(self, vector):
     if self.exploration_strategy() != 'exhaustive':
       pytest.skip("skip decimal min max filter test with various joins")
     self.run_test_case('QueryTest/decimal_min_max_filters', vector,
-        test_file_vars={'$RUNTIME_FILTER_WAIT_TIME_MS': str(WAIT_TIME_MS)})
+                       test_file_vars=DEFAULT_VARS)
 
   def test_large_strings(self, cursor, unique_database):
     """Tests that truncation of large strings by min-max filters still gives correct
@@ -252,7 +283,7 @@ class TestMinMaxFilters(ImpalaTestSuite):
     # Min-max bounds are truncated at 1024 characters, so construct some strings that are
     # longer than that, as well as some that are very close to the min/max bounds.
     matching_vals =\
-        ('b' * 1100, 'b' * 1099 + 'c', 'd' * 1100, 'f'* 1099 + 'e', 'f' * 1100)
+        ('b' * 1100, 'b' * 1099 + 'c', 'd' * 1100, 'f' * 1099 + 'e', 'f' * 1100)
     cursor.execute("insert into %s values ('%s'), ('%s'), ('%s'), ('%s'), ('%s')"
         % ((table1,) + matching_vals))
     non_matching_vals = ('b' * 1099 + 'a', 'c', 'f' * 1099 + 'g')
@@ -300,7 +331,7 @@ class TestOverlapMinMaxFilters(ImpalaTestSuite):
     super(TestOverlapMinMaxFilters, cls).add_test_dimensions()
     # Overlap min-max filters are only implemented for parquet.
     cls.ImpalaTestMatrix.add_constraint(
-        lambda v: v.get_value('table_format').file_format in ['parquet'])
+        lambda v: v.get_value('table_format').file_format == 'parquet')
     # Enable query option ASYNC_CODEGEN for slow build
     if build_runs_slowly:
       add_mandatory_exec_option(cls, "async_codegen", 1)
@@ -311,18 +342,16 @@ class TestOverlapMinMaxFilters(ImpalaTestSuite):
     # on sorted columns (by default).
     self.execute_query("SET MINMAX_FILTER_PARTITION_COLUMNS=false")
     self.run_test_case('QueryTest/overlap_min_max_filters', vector, unique_database,
-        test_file_vars={'$RUNTIME_FILTER_WAIT_TIME_MS': str(WAIT_TIME_MS)})
+                       test_file_vars=DEFAULT_VARS)
     self.execute_query("SET MINMAX_FILTER_PARTITION_COLUMNS=true")
 
   def test_overlap_min_max_filters_on_sorted_columns(self, vector, unique_database):
     self.run_test_case('QueryTest/overlap_min_max_filters_on_sorted_columns', vector,
-                       unique_database,
-        test_file_vars={'$RUNTIME_FILTER_WAIT_TIME_MS': str(WAIT_TIME_MS)})
+                       unique_database, test_file_vars=DEFAULT_VARS)
 
   def test_overlap_min_max_filters_on_partition_columns(self, vector, unique_database):
     self.run_test_case('QueryTest/overlap_min_max_filters_on_partition_columns', vector,
-                       unique_database,
-        test_file_vars={'$RUNTIME_FILTER_WAIT_TIME_MS': str(WAIT_TIME_MS)})
+                       unique_database, test_file_vars=DEFAULT_VARS)
 
   @SkipIfLocal.hdfs_client
   def test_partition_column_in_parquet_data_file(self, vector, unique_database):
@@ -332,9 +361,10 @@ class TestOverlapMinMaxFilters(ImpalaTestSuite):
     self.execute_query("CREATE TABLE {0}.{1} (i INT) PARTITIONED BY (d DATE) "
                        "STORED AS PARQUET".format(unique_database, tbl_name))
     tbl_loc = "%s/%s/%s/d=2022-02-22/" % (WAREHOUSE, unique_database, tbl_name)
+    src_loc = (os.environ['IMPALA_HOME']
+               + '/testdata/data/partition_col_in_parquet.parquet')
     self.filesystem_client.make_dir(tbl_loc)
-    self.filesystem_client.copy_from_local(os.environ['IMPALA_HOME'] +
-        '/testdata/data/partition_col_in_parquet.parquet', tbl_loc)
+    self.filesystem_client.copy_from_local(src_loc, tbl_loc)
     self.execute_query("ALTER TABLE {0}.{1} RECOVER PARTITIONS".format(
         unique_database, tbl_name))
     self.execute_query("SET PARQUET_FALLBACK_SCHEMA_RESOLUTION=NAME")
@@ -359,8 +389,8 @@ class TestInListFilters(ImpalaTestSuite):
       add_mandatory_exec_option(cls, "async_codegen", 1)
 
   def test_in_list_filters(self, vector):
-    vector.get_value('exec_option')['enabled_runtime_filter_types'] = 'in_list'
-    vector.get_value('exec_option')['runtime_filter_wait_time_ms'] = WAIT_TIME_MS
+    vector.set_exec_option('enabled_runtime_filter_types', 'in_list')
+    vector.set_exec_option('runtime_filter_wait_time_ms', WAIT_TIME_MS)
     self.run_test_case('QueryTest/in_list_filters', vector)
 
 
@@ -386,12 +416,12 @@ class TestAllRuntimeFilters(ImpalaTestSuite):
     self.execute_query("SET MINMAX_FILTER_SORTED_COLUMNS=false")
     self.execute_query("SET MINMAX_FILTER_PARTITION_COLUMNS=false")
     self.run_test_case('QueryTest/all_runtime_filters', vector,
-                       test_file_vars={'$RUNTIME_FILTER_WAIT_TIME_MS': str(WAIT_TIME_MS)})
+                       test_file_vars=DEFAULT_VARS)
 
   def test_diff_runtime_filter_types(self, vector):
     # compare number of probe rows when apply different types of runtime filter
     self.run_test_case('QueryTest/diff_runtime_filter_types', vector,
-                       test_file_vars={'$RUNTIME_FILTER_WAIT_TIME_MS': str(WAIT_TIME_MS)})
+                       test_file_vars=DEFAULT_VARS)
 
 
 @SkipIfLocal.multiple_impalad
@@ -407,27 +437,85 @@ class TestRuntimeRowFilters(ImpalaTestSuite):
         v.get_value('table_format').file_format in ['parquet'])
     # Exercise both mt and non-mt code paths. Some tests assume 3 finstances, so
     # tests are not expected to work unmodified with higher mt_dop values.
-    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('mt_dop', 0, 4))
+    add_exec_option_dimension(cls, 'mt_dop', DIM_MT_DOP_MEDIUM)
+    # Exercise max_num_filters_aggregated_per_host.
+    add_exec_option_dimension(cls, 'max_num_filters_aggregated_per_host',
+      DIM_MAX_NUM_FILTERS_AGGREGATED_PER_HOST)
     # Enable query option ASYNC_CODEGEN for slow build
     if build_runs_slowly:
       add_mandatory_exec_option(cls, "async_codegen", 1)
 
   def test_row_filters(self, vector):
-    new_vector = deepcopy(vector)
-    new_vector.get_value('exec_option')['mt_dop'] = vector.get_value('mt_dop')
     # Disable generating min/max filters for sorted columns
     self.execute_query("SET MINMAX_FILTER_SORTED_COLUMNS=false")
     self.execute_query("SET MINMAX_FILTER_PARTITION_COLUMNS=false")
     self.execute_query("SET PARQUET_DICTIONARY_RUNTIME_FILTER_ENTRY_LIMIT=0")
-    self.run_test_case('QueryTest/runtime_row_filters', new_vector,
-                       test_file_vars={'$RUNTIME_FILTER_WAIT_TIME_MS': str(WAIT_TIME_MS)})
+    num_filters_per_host = vector.get_exec_option('max_num_filters_aggregated_per_host')
+    num_backend = 2  # exclude coordinator
+    num_updates = (num_backend + 1 if num_filters_per_host == 0
+        else int(math.ceil(num_backend / num_filters_per_host)))
+    vars = {
+      '$RUNTIME_FILTER_WAIT_TIME_MS': str(WAIT_TIME_MS),
+      '$NUM_FILTER_UPDATES': str(num_updates)
+    }
+    self.run_test_case('QueryTest/runtime_row_filters', vector, test_file_vars=vars)
+
+
+@SkipIfLocal.multiple_impalad
+class TestRuntimeRowFilterReservation(ImpalaTestSuite):
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestRuntimeRowFilterReservation, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_constraint(
+      lambda v: v.get_value('table_format').file_format == 'parquet')
+    # Enable query option ASYNC_CODEGEN for slow build
+    if build_runs_slowly:
+      add_mandatory_exec_option(cls, "async_codegen", 1)
 
   def test_row_filter_reservation(self, vector):
     """Test handling of runtime filter memory reservations. Tuned for mt_dop=0."""
-    mt_dop = vector.get_value('mt_dop')
-    if mt_dop != 0:
-        pytest.skip("Memory reservations tuned for mt_dop=0")
-    new_vector = deepcopy(vector)
-    new_vector.get_value('exec_option')['mt_dop'] = mt_dop
-    self.run_test_case('QueryTest/runtime_row_filter_reservations', new_vector,
-                       test_file_vars={'$RUNTIME_FILTER_WAIT_TIME_MS' : str(WAIT_TIME_MS)})
+    self.run_test_case('QueryTest/runtime_row_filter_reservations', vector,
+                       test_file_vars=DEFAULT_VARS)
+
+
+class TestRuntimeFiltersLateRemoteUpdate(ImpalaTestSuite):
+  """Test that distributed runtime filter aggregation still works
+  when remote filter update is late to reach the intermediate aggregator."""
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestRuntimeFiltersLateRemoteUpdate, cls).add_test_dimensions()
+    # Only run this test in parquet
+    cls.ImpalaTestMatrix.add_constraint(
+        lambda v: v.get_value('table_format').file_format in ['parquet'])
+    # Set mandatory query options.
+    add_mandatory_exec_option(cls, 'max_num_filters_aggregated_per_host', 2)
+    add_mandatory_exec_option(cls, 'runtime_filter_wait_time_ms', WAIT_TIME_MS // 10)
+    add_mandatory_exec_option(cls, 'debug_action',
+        'REMOTE_FILTER_UPDATE_DELAY:SLEEP@%d' % WAIT_TIME_MS)
+    # Enable query option ASYNC_CODEGEN for slow build
+    if build_runs_slowly:
+      add_mandatory_exec_option(cls, "async_codegen", 1)
+
+  def test_late_remote_update(self, vector):
+    """Test that a query with global filters does not wait for late remote filter
+    update."""
+    query = ('select count(*) from functional.alltypes p '
+             'join [SHUFFLE] functional.alltypestiny b '
+             'on p.month = b.int_col and b.month = 1 and b.string_col = "1"')
+    now = time.time()
+    exec_options = vector.get_value('exec_option')
+    result = self.execute_query_expect_success(self.client, query, exec_options)
+    assert result.data[0] == '620'
+    duration_s = time.time() - now
+    assert duration_s < (WAIT_TIME_MS / 1000), \
+        "Query took too long (%ss, possibly waiting for late filters?)" \
+        % str(duration_s)


(impala) 01/02: IMPALA-12205: Add support to STRUCT type Iceberg Metadata table columns

Posted by mi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 4417fbccc219b0166674b650a3a98d8af6d4001d
Author: Tamas Mate <tm...@apache.org>
AuthorDate: Wed Dec 6 14:24:16 2023 +0100

    IMPALA-12205: Add support to STRUCT type Iceberg Metadata table columns
    
    As the slots have already been created on the frontend this change
    focuses on populating them on the backend side. There are two major
    parts of this commit. Obtaining the right Accessors for the slot and
    recursively filling the tuples with data.
    
    The field ids are present in the struct slot's ColumnType field as a
    list of integers. This list can be indexed with the correct element of
    the SchemaPath to obtain the field id for a struct member and with that
    the Accessor.
    
    Once the Accessors are available the IcebergRowReader's MaterializeTuple
    method can be called recursively to write the primitive slots of a
    struct slot.
    
    Testing:
     - Added E2E tests
    
    Change-Id: I953ad7253b270f2855bfcaee4ad023d1c4469273
    Reviewed-on: http://gerrit.cloudera.org:8080/20759
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Gabor Kaszab <ga...@cloudera.com>
---
 .../iceberg-metadata/iceberg-metadata-scan-node.cc | 73 +++++++++++++---
 .../iceberg-metadata/iceberg-metadata-scan-node.h  | 26 ++++--
 be/src/exec/iceberg-metadata/iceberg-row-reader.cc | 37 +++++++--
 be/src/exec/iceberg-metadata/iceberg-row-reader.h  | 19 +++--
 .../org/apache/impala/analysis/FromClause.java     | 13 +++
 .../java/org/apache/impala/analysis/SlotRef.java   | 43 ++++++----
 .../catalog/iceberg/IcebergMetadataTable.java      | 20 ++++-
 .../apache/impala/util/IcebergMetadataScanner.java |  6 +-
 .../PlannerTest/iceberg-metadata-table-scan.test   | 36 --------
 .../queries/QueryTest/iceberg-metadata-tables.test | 97 ++++++++++++++++++++++
 tests/query_test/test_iceberg.py                   |  4 +-
 11 files changed, 280 insertions(+), 94 deletions(-)

diff --git a/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.cc b/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.cc
index 1e1ba76a5..f7f5c9a6d 100644
--- a/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.cc
+++ b/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.cc
@@ -83,22 +83,73 @@ Status IcebergMetadataScanNode::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, jmetadata_scanner, &jmetadata_scanner_));
   RETURN_ERROR_IF_EXC(env);
   RETURN_IF_ERROR(ScanMetadataTable());
-  // Create field accessors
+  RETURN_IF_ERROR(CreateFieldAccessors());
+  return Status::OK();
+}
+
+Status IcebergMetadataScanNode::CreateFieldAccessors() {
+  JNIEnv* env = JniUtil::GetJNIEnv();
+  if (env == nullptr) return Status("Failed to get/create JVM");
   for (SlotDescriptor* slot_desc: tuple_desc_->slots()) {
-    jobject accessor_for_field = env->CallObjectMethod(jmetadata_scanner_,
-        get_accessor_, slot_desc->col_pos());
-    RETURN_ERROR_IF_EXC(env);
-    jobject accessor_for_field_global_ref;
-    RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, accessor_for_field,
-        &accessor_for_field_global_ref));
-    jaccessors_[slot_desc->col_pos()] = accessor_for_field_global_ref;
+    if (slot_desc->type().IsStructType()) {
+      // Get the top level struct's field id from the ColumnDescriptor then recursively
+      // get the field ids for struct fields
+      int field_id = tuple_desc_->table_desc()->GetColumnDesc(slot_desc).field_id();
+      RETURN_IF_ERROR(AddAccessorForFieldId(env, field_id, slot_desc->id()));
+      RETURN_IF_ERROR(CreateFieldAccessors(env, slot_desc));
+    } else if (slot_desc->col_path().size() > 1) {
+      DCHECK(!slot_desc->type().IsComplexType());
+      // Slot that is child of a struct without tuple, can occur when a struct member is
+      // in the select list. ColumnType has a tree structure, and this loop finds the
+      // STRUCT node that stores the primitive type. Because, that struct node has the
+      // field id list of its childs.
+      int root_type_index = slot_desc->col_path()[0];
+      ColumnType current_type =
+          tuple_desc_->table_desc()->col_descs()[root_type_index].type();
+      for (int i = 1; i < slot_desc->col_path().size() - 1; ++i) {
+        current_type = current_type.children[slot_desc->col_path()[i]];
+      }
+      int field_id = current_type.field_ids[slot_desc->col_path().back()];
+      RETURN_IF_ERROR(AddAccessorForFieldId(env, field_id, slot_desc->id()));
+    } else {
+      // For primitives in the top level tuple, use the ColumnDescriptor
+      int field_id = tuple_desc_->table_desc()->GetColumnDesc(slot_desc).field_id();
+      RETURN_IF_ERROR(AddAccessorForFieldId(env, field_id, slot_desc->id()));
+    }
   }
   return Status::OK();
 }
 
+Status IcebergMetadataScanNode::CreateFieldAccessors(JNIEnv* env,
+    const SlotDescriptor* struct_slot_desc) {
+  if (!struct_slot_desc->type().IsStructType()) return Status::OK();
+  const std::vector<int>& struct_field_ids = struct_slot_desc->type().field_ids;
+  for (SlotDescriptor* child_slot_desc:
+      struct_slot_desc->children_tuple_descriptor()->slots()) {
+    int field_id = struct_field_ids[child_slot_desc->col_path().back()];
+    RETURN_IF_ERROR(AddAccessorForFieldId(env, field_id, child_slot_desc->id()));
+    if (child_slot_desc->type().IsStructType()) {
+      RETURN_IF_ERROR(CreateFieldAccessors(env, child_slot_desc));
+    }
+  }
+  return Status::OK();
+}
+
+Status IcebergMetadataScanNode::AddAccessorForFieldId(JNIEnv* env, int field_id,
+    SlotId slot_id) {
+  jobject accessor_for_field = env->CallObjectMethod(jmetadata_scanner_,
+      get_accessor_, field_id);
+  RETURN_ERROR_IF_EXC(env);
+  jobject accessor_for_field_global_ref;
+  RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, accessor_for_field,
+      &accessor_for_field_global_ref));
+  jaccessors_[slot_id] = accessor_for_field_global_ref;
+  return Status::OK();
+}
+
 Status IcebergMetadataScanNode::Open(RuntimeState* state) {
   RETURN_IF_ERROR(ScanNode::Open(state));
-  iceberg_row_reader_.reset(new IcebergRowReader(tuple_desc_, jaccessors_));
+  iceberg_row_reader_.reset(new IcebergRowReader(jaccessors_));
   return Status::OK();
 }
 
@@ -128,8 +179,8 @@ Status IcebergMetadataScanNode::GetNext(RuntimeState* state, RowBatch* row_batch
       return Status::OK();
     }
     // Translate a StructLikeRow from Iceberg to Tuple
-    RETURN_IF_ERROR(iceberg_row_reader_->MaterializeRow(env, struct_like_row, tuple,
-        row_batch->tuple_data_pool()));
+    RETURN_IF_ERROR(iceberg_row_reader_->MaterializeTuple(env, struct_like_row,
+        tuple_desc_, tuple, row_batch->tuple_data_pool()));
     env->DeleteLocalRef(struct_like_row);
     RETURN_ERROR_IF_EXC(env);
     COUNTER_ADD(rows_read_counter(), 1);
diff --git a/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.h b/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.h
index c994c7682..64f1d3aa5 100644
--- a/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.h
+++ b/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.h
@@ -100,12 +100,10 @@ class IcebergMetadataScanNode : public ScanNode {
 
   /// Accessor map for the scan result, pairs the slot ids with the java Accessor
   /// objects.
-  std::unordered_map<int, jobject> jaccessors_;
+  std::unordered_map<SlotId, jobject> jaccessors_;
 
-  /// Tuple id resolved in Prepare() to set 'tuple_desc_'.
-  TupleId tuple_id_;
-
-  /// Descriptor of tuples read from Iceberg metadata table.
+  // The TupleId and TupleDescriptor of the tuple that this scan node will populate.
+  const TupleId tuple_id_;
   const TupleDescriptor* tuple_desc_ = nullptr;
 
   /// Table and metadata table names.
@@ -121,6 +119,24 @@ class IcebergMetadataScanNode : public ScanNode {
 
   /// Gets the FeIceberg table from the Frontend.
   Status GetCatalogTable(jobject* jtable);
+
+  /// Populates the jaccessors_ map by creating the accessors for the columns in the JVM.
+  /// To create a field accessor for a column the Iceberg field id is needed. For
+  /// primitive type columns that are not a field of a struct, this can be found in the
+  /// ColumnDescriptor. However, ColumnDescriptors are not available for struct fields,
+  /// in this case the SlotDescriptor can be used.
+  Status CreateFieldAccessors();
+
+  /// Recursive part of the Accessor collection, when there is a struct in the tuple.
+  /// Collects the field ids of the struct members. The type_ field inside the struct slot
+  /// stores an ordered list of Iceberg Struct member field ids. This list can be indexed
+  /// with the last element of SchemaPath col_path to obtain the correct field id of the
+  /// struct member.
+  Status CreateFieldAccessors(JNIEnv* env, const SlotDescriptor* struct_slot_desc);
+
+  /// Helper method to simplify adding new accessors to the jaccessors_ map. It obtains
+  /// the Accessor through JNI and persists it into the jaccessors_ map.
+  Status AddAccessorForFieldId(JNIEnv* env, int field_id, SlotId slot_id);
 };
 
 }
diff --git a/be/src/exec/iceberg-metadata/iceberg-row-reader.cc b/be/src/exec/iceberg-metadata/iceberg-row-reader.cc
index 219569ad5..d2f680222 100644
--- a/be/src/exec/iceberg-metadata/iceberg-row-reader.cc
+++ b/be/src/exec/iceberg-metadata/iceberg-row-reader.cc
@@ -24,10 +24,8 @@
 
 namespace impala {
 
-IcebergRowReader::IcebergRowReader(
-    const TupleDescriptor* tuple_desc, const std::unordered_map<int, jobject>& jaccessor)
-  : tuple_desc_(tuple_desc),
-    jaccessors_(jaccessor) {}
+IcebergRowReader::IcebergRowReader(const std::unordered_map<SlotId, jobject>& jaccessors)
+  : jaccessors_(jaccessors) {}
 
 Status IcebergRowReader::InitJNI() {
   DCHECK(iceberg_accessor_cl_ == nullptr) << "InitJNI() already called!";
@@ -65,15 +63,19 @@ Status IcebergRowReader::InitJNI() {
   return Status::OK();
 }
 
-Status IcebergRowReader::MaterializeRow(JNIEnv* env,
-    jobject struct_like_row, Tuple* tuple, MemPool* tuple_data_pool) {
+Status IcebergRowReader::MaterializeTuple(JNIEnv* env,
+    jobject struct_like_row, const TupleDescriptor* tuple_desc, Tuple* tuple,
+    MemPool* tuple_data_pool) {
   DCHECK(env != nullptr);
   DCHECK(struct_like_row != nullptr);
   DCHECK(tuple != nullptr);
   DCHECK(tuple_data_pool != nullptr);
-  for (SlotDescriptor* slot_desc: tuple_desc_->slots()) {
-    jobject accessed_value = env->CallObjectMethod(jaccessors_.at(slot_desc->col_pos()),
-        iceberg_accessor_get_, struct_like_row);
+  DCHECK(tuple_desc != nullptr);
+
+  for (SlotDescriptor* slot_desc: tuple_desc->slots()) {
+    jobject accessor = jaccessors_.at(slot_desc->id());
+    jobject accessed_value = env->CallObjectMethod(accessor, iceberg_accessor_get_,
+        struct_like_row);
     RETURN_ERROR_IF_EXC(env);
     if (accessed_value == nullptr) {
       tuple->SetNull(slot_desc->null_indicator_offset());
@@ -96,6 +98,10 @@ Status IcebergRowReader::MaterializeRow(JNIEnv* env,
       } case TYPE_STRING: { // java.lang.String
         RETURN_IF_ERROR(WriteStringSlot(env, accessed_value, slot, tuple_data_pool));
         break;
+      } case TYPE_STRUCT: {
+        RETURN_IF_ERROR(WriteStructSlot(env, struct_like_row, slot_desc, tuple,
+            tuple_data_pool));
+        break;
       }
       default:
         // Skip the unsupported type and set it to NULL
@@ -108,6 +114,7 @@ Status IcebergRowReader::MaterializeRow(JNIEnv* env,
 
 Status IcebergRowReader::WriteBooleanSlot(JNIEnv* env, jobject accessed_value,
     void* slot) {
+  DCHECK(accessed_value != nullptr);
   DCHECK(env->IsInstanceOf(accessed_value, java_boolean_cl_) == JNI_TRUE);
   jboolean result = env->CallBooleanMethod(accessed_value, boolean_value_);
   RETURN_ERROR_IF_EXC(env);
@@ -116,6 +123,7 @@ Status IcebergRowReader::WriteBooleanSlot(JNIEnv* env, jobject accessed_value,
 }
 
 Status IcebergRowReader::WriteIntSlot(JNIEnv* env, jobject accessed_value, void* slot) {
+  DCHECK(accessed_value != nullptr);
   DCHECK(env->IsInstanceOf(accessed_value, java_int_cl_) == JNI_TRUE);
   jint result = env->CallIntMethod(accessed_value, int_value_);
   RETURN_ERROR_IF_EXC(env);
@@ -124,6 +132,7 @@ Status IcebergRowReader::WriteIntSlot(JNIEnv* env, jobject accessed_value, void*
 }
 
 Status IcebergRowReader::WriteLongSlot(JNIEnv* env, jobject accessed_value, void* slot) {
+  DCHECK(accessed_value != nullptr);
   DCHECK(env->IsInstanceOf(accessed_value, java_long_cl_) == JNI_TRUE);
   jlong result = env->CallLongMethod(accessed_value, long_value_);
   RETURN_ERROR_IF_EXC(env);
@@ -133,6 +142,7 @@ Status IcebergRowReader::WriteLongSlot(JNIEnv* env, jobject accessed_value, void
 
 Status IcebergRowReader::WriteTimeStampSlot(JNIEnv* env, jobject accessed_value,
     void* slot) {
+  DCHECK(accessed_value != nullptr);
   DCHECK(env->IsInstanceOf(accessed_value, java_long_cl_) == JNI_TRUE);
   jlong result = env->CallLongMethod(accessed_value, long_value_);
   RETURN_ERROR_IF_EXC(env);
@@ -143,6 +153,7 @@ Status IcebergRowReader::WriteTimeStampSlot(JNIEnv* env, jobject accessed_value,
 
 Status IcebergRowReader::WriteStringSlot(JNIEnv* env, jobject accessed_value, void* slot,
       MemPool* tuple_data_pool) {
+  DCHECK(accessed_value != nullptr);
   DCHECK(env->IsInstanceOf(accessed_value, java_char_sequence_cl_) == JNI_TRUE);
   jstring result = static_cast<jstring>(env->CallObjectMethod(accessed_value,
       char_sequence_to_string_));
@@ -162,4 +173,12 @@ Status IcebergRowReader::WriteStringSlot(JNIEnv* env, jobject accessed_value, vo
   return Status::OK();
 }
 
+Status IcebergRowReader::WriteStructSlot(JNIEnv* env, jobject struct_like_row,
+    SlotDescriptor* slot_desc, Tuple* tuple, MemPool* tuple_data_pool) {
+  DCHECK(slot_desc != nullptr);
+  RETURN_IF_ERROR(MaterializeTuple(env, struct_like_row,
+      slot_desc->children_tuple_descriptor(), tuple, tuple_data_pool));
+  return Status::OK();
+}
+
 }
\ No newline at end of file
diff --git a/be/src/exec/iceberg-metadata/iceberg-row-reader.h b/be/src/exec/iceberg-metadata/iceberg-row-reader.h
index 15c71a038..51395df20 100644
--- a/be/src/exec/iceberg-metadata/iceberg-row-reader.h
+++ b/be/src/exec/iceberg-metadata/iceberg-row-reader.h
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include "common/global-types.h"
+
 #include <jni.h>
 #include <unordered_map>
 
@@ -33,16 +35,15 @@ class TupleDescriptor;
 class IcebergRowReader {
  public:
   /// Initialize the tuple descriptor and accessors
-  IcebergRowReader(const TupleDescriptor* tuple_desc,
-      const std::unordered_map<int, jobject>& jaccessors);
+  IcebergRowReader(const std::unordered_map<SlotId, jobject>& jaccessors);
 
   /// JNI setup. Create global references for Java classes and find method ids.
   /// Initializes static members, should be called once per process lifecycle.
   static Status InitJNI();
 
-  /// Materlilize the StructLike Java objects into Impala rows.
-  Status MaterializeRow(JNIEnv* env, jobject struct_like_row, Tuple* tuple,
-      MemPool* tuple_data_pool);
+  /// Materialize the StructLike Java objects into Impala rows.
+  Status MaterializeTuple(JNIEnv* env, jobject struct_like_row,
+      const TupleDescriptor* tuple_desc, Tuple* tuple,  MemPool* tuple_data_pool);
 
  private:
   /// Global class references created with JniUtil.
@@ -62,12 +63,9 @@ class IcebergRowReader {
   inline static jmethodID long_value_ = nullptr;
   inline static jmethodID char_sequence_to_string_ = nullptr;
 
-  /// TupleDescriptor received from the ScanNode.
-  const TupleDescriptor* tuple_desc_;
-
   /// Accessor map for the scan result, pairs the slot ids with the java Accessor
   /// objects.
-  const std::unordered_map<int, jobject> jaccessors_;
+  const std::unordered_map<SlotId, jobject> jaccessors_;
 
   /// Reads the value of a primitive from the StructLike, translates it to a matching
   /// Impala type and writes it into the target tuple. The related Accessor objects are
@@ -82,6 +80,9 @@ class IcebergRowReader {
   /// and reclaims the memory area.
   Status WriteStringSlot(JNIEnv* env, jobject accessed_value, void* slot,
       MemPool* tuple_data_pool);
+  /// Recursively calls MaterializeTuple method with the child tuple of the struct slot.
+  Status WriteStructSlot(JNIEnv* env, jobject struct_like_row, SlotDescriptor* slot_desc,
+      Tuple* tuple, MemPool* tuple_data_pool);
 };
 
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/FromClause.java b/fe/src/main/java/org/apache/impala/analysis/FromClause.java
index b61951592..656c609c4 100644
--- a/fe/src/main/java/org/apache/impala/analysis/FromClause.java
+++ b/fe/src/main/java/org/apache/impala/analysis/FromClause.java
@@ -93,6 +93,7 @@ public class FromClause extends StmtNode implements Iterable<TableRef> {
       tblRef.analyze(analyzer);
       leftTblRef = tblRef;
       if (tblRef instanceof CollectionTableRef) {
+        checkIcebergCollectionSupport((CollectionTableRef)tblRef);
         checkTopLevelComplexAcidScan(analyzer, (CollectionTableRef)tblRef);
         if (firstZippingUnnestRef != null && tblRef.isZippingUnnest() &&
             firstZippingUnnestRef.getResolvedPath().getRootTable() !=
@@ -161,6 +162,18 @@ public class FromClause extends StmtNode implements Iterable<TableRef> {
     analyzer.setHasTopLevelAcidCollectionTableRef();
   }
 
+  private void checkIcebergCollectionSupport(CollectionTableRef tblRef)
+      throws AnalysisException {
+    Preconditions.checkNotNull(tblRef);
+    Preconditions.checkNotNull(tblRef.getDesc());
+    Preconditions.checkNotNull(tblRef.getDesc().getPath());
+    Preconditions.checkNotNull(tblRef.getDesc().getPath().getRootTable());
+    if (tblRef.getDesc().getPath().getRootTable() instanceof IcebergMetadataTable) {
+      throw new AnalysisException("Querying collection types (ARRAY/MAP) is not " +
+          "supported for Iceberg Metadata tables. (IMPALA-12610, IMPALA-12611)");
+    }
+  }
+
   @Override
   public FromClause clone() {
     List<TableRef> clone = new ArrayList<>();
diff --git a/fe/src/main/java/org/apache/impala/analysis/SlotRef.java b/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
index 65e74d098..33aef0458 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
@@ -29,6 +29,7 @@ import org.apache.impala.catalog.StructType;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.TypeCompatibility;
+import org.apache.impala.catalog.iceberg.IcebergMetadataTable;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.UnsupportedFeatureException;
 import org.apache.impala.thrift.TExprNode;
@@ -226,29 +227,37 @@ public class SlotRef extends Expr {
     if (resolvedPath_ != null) {
       FeTable rootTable = resolvedPath_.getRootTable();
       if (rootTable != null) {
-        if (!(rootTable instanceof FeFsTable)) {
-          throw new AnalysisException(
-              String.format("%s is not supported when querying STRUCT type %s", rootTable,
-                  type_.toSql()));
-        }
-        FeFsTable feTable = (FeFsTable) rootTable;
-        for (HdfsFileFormat format : feTable.getFileFormats()) {
-          if (!formatSupportsQueryingStruct(format)) {
-            throw new AnalysisException("Querying STRUCT is only supported for ORC and "
-                + "Parquet file formats.");
-          }
+        checkTableTypeSupportsStruct(rootTable);
+        if (rootTable instanceof FeFsTable) {
+          checkFileFormatSupportsStruct((FeFsTable)rootTable);
         }
       }
     }
   }
 
-  // Returns true if the given HdfsFileFormat supports querying STRUCT types. Iceberg
-  // tables also have ICEBERG as HdfsFileFormat. We can return TRUE in case of Iceberg
-  // because the data file formats in the Iceberg table will be also tested separately.
-  private static boolean formatSupportsQueryingStruct(HdfsFileFormat format) {
-    return format == HdfsFileFormat.PARQUET ||
+  private void checkTableTypeSupportsStruct(FeTable feTable) throws AnalysisException {
+    if (!(feTable instanceof FeFsTable) &&
+        !(feTable instanceof IcebergMetadataTable)) {
+      throw new AnalysisException(
+          String.format("%s is not supported when querying STRUCT type %s",
+              feTable, type_.toSql()));
+    }
+  }
+
+  // Throws exception if the given HdfsFileFormat does not support querying STRUCT types.
+  // Iceberg tables also have ICEBERG as HdfsFileFormat. In case of Iceberg there is no
+  // need to throw exception because the data file formats in the Iceberg table will be
+  // also tested separately.
+  private void checkFileFormatSupportsStruct(FeFsTable feFsTable)
+      throws AnalysisException {
+    for (HdfsFileFormat format : feFsTable.getFileFormats()) {
+      if (! (format == HdfsFileFormat.PARQUET ||
            format == HdfsFileFormat.ORC ||
-           format == HdfsFileFormat.ICEBERG;
+           format == HdfsFileFormat.ICEBERG)) {
+        throw new AnalysisException("Querying STRUCT is only supported for ORC and "
+            + "Parquet file formats.");
+      }
+    }
   }
 
   // Assumes this 'SlotRef' is a struct and that desc_.itemTupleDesc_ has already been
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergMetadataTable.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergMetadataTable.java
index a92b07471..877b31a46 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergMetadataTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergMetadataTable.java
@@ -26,14 +26,20 @@ import org.apache.iceberg.MetadataTableUtils;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.impala.analysis.TableName;
+import org.apache.impala.catalog.CatalogObject.ThriftObjectType;
 import org.apache.impala.catalog.Column;
+import org.apache.impala.catalog.FeCatalogUtils;
 import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.VirtualTable;
 import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.thrift.TColumnDescriptor;
 import org.apache.impala.thrift.TTableDescriptor;
 import org.apache.impala.thrift.TTableStats;
+import org.apache.impala.thrift.TTableType;
 import org.apache.impala.util.IcebergSchemaConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
@@ -44,6 +50,7 @@ import com.google.common.base.Preconditions;
  * table object based on the Iceberg API.
  */
 public class IcebergMetadataTable extends VirtualTable {
+  private final static Logger LOG = LoggerFactory.getLogger(IcebergMetadataTable.class);
 
   // The Iceberg table that is the base of the metadata table.
   private FeIcebergTable baseTable_;
@@ -64,6 +71,8 @@ public class IcebergMetadataTable extends VirtualTable {
     Schema metadataTableSchema = metadataTable.schema();
     for (Column col : IcebergSchemaConverter.convertToImpalaSchema(
         metadataTableSchema)) {
+      LOG.trace("Adding column: \"{}\" with type: \"{}\" to metadata table.",
+          col.getName(), col.getType());
       addColumn(col);
     }
   }
@@ -111,16 +120,23 @@ public class IcebergMetadataTable extends VirtualTable {
   @Override
   public TTableDescriptor toThriftDescriptor(int tableId,
       Set<Long> referencedPartitions) {
-    TTableDescriptor desc = baseTable_.toThriftDescriptor(tableId, referencedPartitions);
+    TTableDescriptor desc = new TTableDescriptor(tableId, TTableType.ICEBERG_TABLE,
+        getTColumnDescriptors(), numClusteringCols_, name_, db_.getName());
+    desc.setIcebergTable(FeIcebergTable.Utils.getTIcebergTable(baseTable_,
+        ThriftObjectType.DESCRIPTOR_ONLY));
     return desc;
   }
 
+  private List<TColumnDescriptor> getTColumnDescriptors() {
+    return FeCatalogUtils.getTColumnDescriptors(this);
+  }
+
   /**
    * Returns true if the table ref is referring to a valid metadata table.
    */
   public static boolean isIcebergMetadataTable(List<String> tblRefPath) {
     if (tblRefPath == null) return false;
-    if (tblRefPath.size() != 3) return false;
+    if (tblRefPath.size() < 3) return false;
     String vTableName = tblRefPath.get(2).toUpperCase();
     return EnumUtils.isValidEnum(MetadataTableType.class, vTableName);
   }
diff --git a/fe/src/main/java/org/apache/impala/util/IcebergMetadataScanner.java b/fe/src/main/java/org/apache/impala/util/IcebergMetadataScanner.java
index a2f7c5b23..18f3d8981 100644
--- a/fe/src/main/java/org/apache/impala/util/IcebergMetadataScanner.java
+++ b/fe/src/main/java/org/apache/impala/util/IcebergMetadataScanner.java
@@ -89,11 +89,10 @@ public class IcebergMetadataScanner {
   }
 
   /**
-   * Returns the field {Accessor} for the specified column position. This {Accessor} is
+   * Returns the field {Accessor} for the specified field id. This {Accessor} then is
    * used to access a field in the {StructLike} object.
    */
-  public Accessor GetAccessor(int slotColPos) {
-    int fieldId = metadataTable_.schema().columns().get(slotColPos).fieldId();
+  public Accessor GetAccessor(int fieldId) {
     return metadataTable_.schema().accessorForField(fieldId);
   }
 
@@ -112,4 +111,5 @@ public class IcebergMetadataScanner {
     }
     return null;
   }
+
 }
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-metadata-table-scan.test b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-metadata-table-scan.test
index a605c94b8..2294b034b 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-metadata-table-scan.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-metadata-table-scan.test
@@ -106,40 +106,4 @@ PLAN-ROOT SINK
 |
 00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.HISTORY q]
    row-size=8B cardinality=unavailable
-====
-select * from functional_parquet.iceberg_alltypes_part_orc.manifests a, a.partition_summaries
----- PLAN
-PLAN-ROOT SINK
-|
-01:SUBPLAN
-|  row-size=98B cardinality=unavailable
-|
-|--04:NESTED LOOP JOIN [CROSS JOIN]
-|  |  row-size=98B cardinality=10
-|  |
-|  |--02:SINGULAR ROW SRC
-|  |     row-size=72B cardinality=1
-|  |
-|  03:UNNEST [a.partition_summaries]
-|     row-size=0B cardinality=10
-|
-00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.MANIFESTS a]
-   row-size=72B cardinality=unavailable
----- DISTRIBUTEDPLAN
-PLAN-ROOT SINK
-|
-01:SUBPLAN
-|  row-size=98B cardinality=unavailable
-|
-|--04:NESTED LOOP JOIN [CROSS JOIN]
-|  |  row-size=98B cardinality=10
-|  |
-|  |--02:SINGULAR ROW SRC
-|  |     row-size=72B cardinality=1
-|  |
-|  03:UNNEST [a.partition_summaries]
-|     row-size=0B cardinality=10
-|
-00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.MANIFESTS a]
-   row-size=72B cardinality=unavailable
 ====
\ No newline at end of file
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test
index 5a9928451..c34f3aacc 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test
@@ -453,4 +453,101 @@ ParseException: Syntax error in line 1
 alter table functional_parquet.iceberg_query_metadata.snapshots add columns (col int);
 ---- CATCH
 ParseException: Syntax error in line 1
+====
+
+####
+# Test 9 : Query STRUCT type columns
+####
+====
+---- QUERY
+select readable_metrics from functional_parquet.iceberg_query_metadata.entries;
+---- RESULTS
+'{"i":{"column_size":47,"value_count":1,"null_value_count":0,"nan_value_count":null,"lower_bound":3,"upper_bound":3}}'
+'{"i":{"column_size":47,"value_count":1,"null_value_count":0,"nan_value_count":null,"lower_bound":2,"upper_bound":2}}'
+'{"i":{"column_size":47,"value_count":1,"null_value_count":0,"nan_value_count":null,"lower_bound":1,"upper_bound":1}}'
+'{"i":{"column_size":null,"value_count":null,"null_value_count":null,"nan_value_count":null,"lower_bound":null,"upper_bound":null}}'
+---- TYPES
+STRING
+====
+---- QUERY
+select snapshot_id, readable_metrics from functional_parquet.iceberg_query_metadata.entries;
+---- RESULTS
+row_regex:[1-9]\d*|0,'{"i":{"column_size":47,"value_count":1,"null_value_count":0,"nan_value_count":null,"lower_bound":3,"upper_bound":3}}'
+row_regex:[1-9]\d*|0,'{"i":{"column_size":47,"value_count":1,"null_value_count":0,"nan_value_count":null,"lower_bound":2,"upper_bound":2}}'
+row_regex:[1-9]\d*|0,'{"i":{"column_size":47,"value_count":1,"null_value_count":0,"nan_value_count":null,"lower_bound":1,"upper_bound":1}}'
+row_regex:[1-9]\d*|0,'{"i":{"column_size":null,"value_count":null,"null_value_count":null,"nan_value_count":null,"lower_bound":null,"upper_bound":null}}'
+---- TYPES
+BIGINT,STRING
+====
+---- QUERY
+select snapshot_id, readable_metrics.i.lower_bound as lower_bound from functional_parquet.iceberg_query_metadata.entries;
+---- RESULTS
+row_regex:[1-9]\d*|0,3
+row_regex:[1-9]\d*|0,2
+row_regex:[1-9]\d*|0,1
+row_regex:[1-9]\d*|0,'NULL'
+---- TYPES
+BIGINT,INT
+====
+---- QUERY
+select snapshot_id, readable_metrics.i.lower_bound as lower_bound from functional_parquet.iceberg_query_metadata.entries
+order by lower_bound;
+---- RESULTS
+row_regex:[1-9]\d*|0,1
+row_regex:[1-9]\d*|0,2
+row_regex:[1-9]\d*|0,3
+row_regex:[1-9]\d*|0,'NULL'
+---- TYPES
+BIGINT,INT
+====
+---- QUERY
+select SUM(readable_metrics.i.lower_bound) from functional_parquet.iceberg_query_metadata.entries;
+---- RESULTS
+6
+---- TYPES
+BIGINT
+====
+---- QUERY
+select all_ent.data_file.file_path, ent.readable_metrics.i.lower_bound
+from functional_parquet.iceberg_query_metadata.entries ent
+join functional_parquet.iceberg_query_metadata.all_entries all_ent
+on ent.snapshot_id = all_ent.snapshot_id
+order by ent.readable_metrics.i.lower_bound;
+---- RESULTS
+row_regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq',1
+row_regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq',2
+row_regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq',3
+row_regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq',NULL
+---- TYPES
+STRING,INT
+====
+---- QUERY
+select i from functional_parquet.iceberg_query_metadata.entries.readable_metrics;
+---- CATCH
+AnalysisException: Illegal table reference to non-collection type: 'functional_parquet.iceberg_query_metadata.entries.readable_metrics'
+====
+---- QUERY
+select delete_ids.item
+from functional_parquet.iceberg_query_metadata.all_files, functional_parquet.iceberg_query_metadata.all_files.equality_ids delete_ids;
+---- CATCH
+AnalysisException: Querying collection types (ARRAY/MAP) is not supported for Iceberg Metadata tables. (IMPALA-12610, IMPALA-12611)
+====
+---- QUERY
+select null_value_counts.key, null_value_counts.value
+from functional_parquet.iceberg_query_metadata.all_files, functional_parquet.iceberg_query_metadata.all_files.null_value_counts null_value_counts;
+---- CATCH
+AnalysisException: Querying collection types (ARRAY/MAP) is not supported for Iceberg Metadata tables. (IMPALA-12610, IMPALA-12611)
+====
+---- QUERY
+select item
+from functional_parquet.iceberg_query_metadata.all_files a, a.equality_ids e, e.delete_ids;
+---- CATCH
+AnalysisException: Querying collection types (ARRAY/MAP) is not supported for Iceberg Metadata tables. (IMPALA-12610, IMPALA-12611)
+====
+---- QUERY
+create view iceberg_query_metadata_all_files
+as select equality_ids from functional_parquet.iceberg_query_metadata.all_files;
+select item from iceberg_query_metadata_all_files a, a.equality_ids e, e.delete_ids;
+---- CATCH
+AnalysisException: Querying collection types (ARRAY/MAP) is not supported for Iceberg Metadata tables. (IMPALA-12610, IMPALA-12611)
 ====
\ No newline at end of file
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 557334544..3524e68ea 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -1291,7 +1291,7 @@ class TestIcebergV2Table(IcebergTestSuite):
         use_db="functional_parquet")
 
   @SkipIf.hardcoded_uris
-  def test_metadata_tables(self, vector):
+  def test_metadata_tables(self, vector, unique_database):
     with self.create_impala_client() as impalad_client:
       overwrite_snapshot_id = impalad_client.execute("select snapshot_id from "
                              "functional_parquet.iceberg_query_metadata.snapshots "
@@ -1300,7 +1300,7 @@ class TestIcebergV2Table(IcebergTestSuite):
                              "functional_parquet.iceberg_query_metadata.snapshots "
                              "where operation = 'overwrite';")
       self.run_test_case('QueryTest/iceberg-metadata-tables', vector,
-          use_db="functional_parquet",
+          unique_database,
           test_file_vars={'$OVERWRITE_SNAPSHOT_ID': str(overwrite_snapshot_id.data[0]),
                           '$OVERWRITE_SNAPSHOT_TS': str(overwrite_snapshot_ts.data[0])})