You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by cs...@apache.org on 2019/08/14 09:59:38 UTC

[impala] 02/03: IMPALA-8791: Handle the case where there is no fragment scheduled on the coordinator

This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit e0c0fe988ac7c0c4b274c472ec544965d19c6184
Author: Bikramjeet Vig <bi...@cloudera.com>
AuthorDate: Fri Aug 2 09:45:51 2019 -0700

    IMPALA-8791: Handle the case where there is no fragment scheduled on
    the coordinator
    
    This patch fixes a bug where, if an insert or CTAS query has no
    fragments scheduled on the coordinator and a mem limit is to be
    enforced on the query (either through a query option or automatically
    through estimates), the same limit is also applied to the
    coordinator backend even though it does not execute anything.
    
    Highlights:
    - coord_backend_mem_to_admit_/mem_limit will always refer to the memory
    to admit/limit for the coordinator regardless of which fragments are
    scheduled on it.
    
    - There will always be a BackendExecParams added for the coordinator
    because the coordinator always spawns a QueryState object with a
    mem_tracker for tracking runtime filter mem and the result set cache.
    When this BackendExecParams is empty (no instances scheduled), it
    ensures that some minimal amount of memory is accounted for by the
    admission controller and that the right mem limit is applied to the
    QueryState spawned by the coordinator.
    
    - Added changes to the Coordinator and Coordinator::BackendState
    classes to handle an empty BackendExecParams object.
    
    Testing:
    The following cases need to be tested, distinguished by the kind of
    fragments scheduled on the coordinator backend:
    1. Coordinator fragment + other exec fragments
    2. Coordinator fragment only
    3. Other exec fragments only (e.g. insert into values OR insert
       into select 1)
    4. No fragments, but coordinator still creates a QueryState
    
    Case 1 is covered by tests working with non-dedicated coordinators.
    The rest are covered by test_dedicated_coordinator_planner_estimates
    and test_sanity_checks_dedicated_coordinator in
    test_admission_controller.py.
    
    Change-Id: Ic4fba02bb7b20553a20634f8c5989d43ba2c0721
    Reviewed-on: http://gerrit.cloudera.org:8080/14058
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/coordinator-backend-state.cc        | 14 ++++-
 be/src/runtime/coordinator-backend-state.h         |  8 ++-
 be/src/runtime/coordinator.cc                      | 12 ++---
 be/src/runtime/coordinator.h                       |  4 +-
 be/src/scheduling/admission-controller-test.cc     | 33 ++++++++----
 be/src/scheduling/admission-controller.cc          | 61 ++++++++++------------
 be/src/scheduling/query-schedule.cc                | 57 +++++++++++---------
 be/src/scheduling/query-schedule.h                 | 43 +++++++--------
 be/src/scheduling/scheduler.cc                     | 15 +++---
 be/src/scheduling/scheduler.h                      |  5 +-
 .../java/org/apache/impala/planner/Planner.java    | 16 +++---
 .../QueryTest/dedicated-coord-mem-estimates.test   |  8 +--
 tests/custom_cluster/test_admission_controller.py  | 16 +++++-
 13 files changed, 167 insertions(+), 125 deletions(-)

diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 028dd70..f3d81a9 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -69,8 +69,8 @@ void Coordinator::BackendState::Init(
     const BackendExecParams& exec_params, const vector<FragmentStats*>& fragment_stats,
     RuntimeProfile* host_profile_parent, ObjectPool* obj_pool) {
   backend_exec_params_ = &exec_params;
-  host_ = backend_exec_params_->instance_params[0]->host;
-  krpc_host_ = backend_exec_params_->instance_params[0]->krpc_host;
+  host_ = backend_exec_params_->be_desc.address;
+  krpc_host_ = backend_exec_params_->be_desc.krpc_address;
   num_remaining_instances_ = backend_exec_params_->instance_params.size();
 
   host_profile_ = RuntimeProfile::Create(obj_pool, TNetworkAddressToString(host_));
@@ -187,6 +187,14 @@ void Coordinator::BackendState::Exec(
     last_report_time_ms_ = GenerateReportTimestamp();
     exec_complete_barrier->Notify();
   });
+
+  // Do not issue an ExecQueryFInstances RPC if there are no fragment instances
+  // scheduled to run on this backend.
+  if (IsEmptyBackend()) {
+    DCHECK(backend_exec_params_->is_coord_backend);
+    return;
+  }
+
   std::unique_ptr<ControlServiceProxy> proxy;
   Status get_proxy_status = ControlService::GetProxy(krpc_host_, host_.hostname, &proxy);
   if (!get_proxy_status.ok()) {
@@ -349,6 +357,7 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
     const ReportExecStatusRequestPB& backend_exec_status,
     const TRuntimeProfileForest& thrift_profiles, ExecSummary* exec_summary,
     ProgressUpdater* scan_range_progress, DmlExecState* dml_exec_state) {
+  DCHECK(!IsEmptyBackend());
   // Hold the exec_summary's lock to avoid exposing it half-way through
   // the update loop below.
   lock_guard<SpinLock> l1(exec_summary->lock);
@@ -453,6 +462,7 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
 
 void Coordinator::BackendState::UpdateHostProfile(
     const TRuntimeProfileTree& thrift_profile) {
+  DCHECK(!IsEmptyBackend());
   host_profile_->Update(thrift_profile);
 }
 
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index 4c41c8c..4b0cfad 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -52,7 +52,9 @@ class ExecSummary;
 struct FInstanceExecParams;
 
 /// This class manages all aspects of the execution of all fragment instances of a
-/// single query on a particular backend.
+/// single query on a particular backend. For the coordinator backend its possible to have
+/// no fragment instances scheduled on it. In that case, no RPCs are issued and the
+/// Backend state transitions to 'done' state right after Exec() is called on it.
 /// Thread-safe unless pointed out otherwise.
 class Coordinator::BackendState {
  public:
@@ -72,6 +74,7 @@ class Coordinator::BackendState {
   /// notifies on rpc_complete_barrier when the rpc completes. Success/failure is
   /// communicated through GetStatus(). Uses filter_routing_table to remove filters
   /// that weren't selected during its construction.
+  /// No RPC is issued if there are no fragment instances scheduled on this backend.
   /// The debug_options are applied to the appropriate TPlanFragmentInstanceCtxs, based
   /// on their node_id/instance_idx.
   void Exec(const DebugOptions& debug_options,
@@ -159,6 +162,9 @@ class Coordinator::BackendState {
   /// Returns a timestamp using monotonic time for tracking arrival of status reports.
   static int64_t GenerateReportTimestamp() { return MonotonicMillis(); }
 
+  /// Returns True if there are no fragment instances scheduled on this backend.
+  bool IsEmptyBackend() { return backend_exec_params_->instance_params.empty(); }
+
  private:
   /// Execution stats for a single fragment instance.
   /// Not thread-safe.
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index bd4fb75..4df15d9 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -111,14 +111,8 @@ Status Coordinator::Exec() {
   const string& str = Substitute("Query $0", PrintId(query_id()));
   progress_.Init(str, schedule_.num_scan_ranges());
 
-  // If there is no coord fragment then pick the mem limit computed for an executor.
-  // TODO: IMPALA-8791: make sure minimal or no limit is imposed for cases where no
-  // fragments are scheduled to run on the coordinator backend.
-  int64_t coord_mem_limit = schedule_.requiresCoordinatorFragment() ?
-      schedule_.coord_backend_mem_limit() :
-      schedule_.per_backend_mem_limit();
   query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(
-      query_ctx(), coord_mem_limit);
+      query_ctx(), schedule_.coord_backend_mem_limit());
   filter_mem_tracker_ = query_state_->obj_pool()->Add(new MemTracker(
       -1, "Runtime Filter (Coordinator)", query_state_->query_mem_tracker(), false));
 
@@ -411,6 +405,10 @@ Status Coordinator::FinishBackendStartup() {
       max_latency_host = TNetworkAddressToString(backend_state->impalad_address());
     }
     latencies.Update(backend_state->rpc_latency());
+    // Mark backend complete if no fragment instances were assigned to it.
+    if (backend_state->IsEmptyBackend()) {
+      backend_exec_complete_barrier_->Notify();
+    }
   }
   query_profile_->AddInfoString(
       "Backend startup latencies", latencies.ToHumanReadable());
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 9098dd0..72ab3af 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -482,7 +482,9 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
 
   /// Helper for Exec(). Checks for errors encountered when starting backend execution,
   /// using any non-OK status, if any, as the overall status. Returns the overall
-  /// status. Also updates query_profile_ with the startup latency histogram.
+  /// status. Also updates query_profile_ with the startup latency histogram and the
+  /// backend_exec_complete_barrier_ if there is any backend which is already done (only
+  /// possible at this point if no fragment instances were assigned to it).
   Status FinishBackendStartup() WARN_UNUSED_RESULT;
 
   /// Build the filter routing table by iterating over all plan nodes and collecting the
diff --git a/be/src/scheduling/admission-controller-test.cc b/be/src/scheduling/admission-controller-test.cc
index a241525..17c8457 100644
--- a/be/src/scheduling/admission-controller-test.cc
+++ b/be/src/scheduling/admission-controller-test.cc
@@ -97,11 +97,10 @@ class AdmissionControllerTest : public testing::Test {
     TQueryOptions* query_options = pool_.Add(new TQueryOptions());
     query_options->__set_mem_limit(mem_limit);
     QuerySchedule* query_schedule = pool_.Add(new QuerySchedule(
-        *query_id, *request, *query_options, is_dedicated_coord, profile));
+        *query_id, *request, *query_options, profile));
     query_schedule->set_executor_group(ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME);
+    SetHostsInQuerySchedule(*query_schedule, num_hosts, is_dedicated_coord);
     query_schedule->UpdateMemoryRequirements(config);
-
-    SetHostsInQuerySchedule(*query_schedule, num_hosts);
     return query_schedule;
   }
 
@@ -113,14 +112,24 @@ class AdmissionControllerTest : public testing::Test {
   }
 
   /// Replace the per-backend hosts in the schedule with one having 'count' hosts.
+  /// Note: no FInstanceExecParams are added so
+  /// QuerySchedule::UseDedicatedCoordEstimates() would consider this schedule as not
+  /// having anything scheduled on the backend which would result in always returning true
+  /// if a dedicated coordinator backend exists.
   void SetHostsInQuerySchedule(QuerySchedule& query_schedule, const int count,
-      int64_t min_mem_reservation_bytes = 0, int64_t admit_mem_limit = 200L * MEGABYTE) {
+      bool is_dedicated_coord, int64_t min_mem_reservation_bytes = 0,
+      int64_t admit_mem_limit = 200L * MEGABYTE) {
     PerBackendExecParams* per_backend_exec_params = pool_.Add(new PerBackendExecParams());
     for (int i = 0; i < count; ++i) {
       BackendExecParams* backend_exec_params = pool_.Add(new BackendExecParams());
-      backend_exec_params->admit_mem_limit = admit_mem_limit;
       backend_exec_params->min_mem_reservation_bytes = min_mem_reservation_bytes;
-      if (i == 0) backend_exec_params->contains_coord_fragment = true;
+      backend_exec_params->be_desc.__set_admit_mem_limit(admit_mem_limit);
+      backend_exec_params->be_desc.__set_is_executor(true);
+      if (i == 0) {
+        // Add first element as the coordinator.
+        backend_exec_params->is_coord_backend = true;
+        backend_exec_params->be_desc.__set_is_executor(!is_dedicated_coord);
+      }
       const string host_name = Substitute("host$0", i);
       per_backend_exec_params->emplace(
           MakeNetworkAddress(host_name, 25000), *backend_exec_params);
@@ -479,7 +488,7 @@ TEST_F(AdmissionControllerTest, QueryRejection) {
 
   // Adjust the QuerySchedule to have minimum memory reservation of 45MB.
   // This will be rejected immediately as minimum memory reservation is too high.
-  SetHostsInQuerySchedule(*query_schedule, host_count, 45L * MEGABYTE);
+  SetHostsInQuerySchedule(*query_schedule, host_count, false, 45L * MEGABYTE);
   string rejected_reserved_reason;
   ASSERT_TRUE(admission_controller->RejectForSchedule(
       *query_schedule, config_d, host_count, host_count, &rejected_reserved_reason));
@@ -799,17 +808,19 @@ TEST_F(AdmissionControllerTest, DedicatedCoordAdmissionChecks) {
   PerBackendExecParams* per_backend_exec_params = pool_.Add(new PerBackendExecParams());
   // Add coordinator backend.
   BackendExecParams* coord_exec_params = pool_.Add(new BackendExecParams());
-  coord_exec_params->admit_mem_limit = 512 * MEGABYTE;
-  coord_exec_params->contains_coord_fragment = true;
+  coord_exec_params->is_coord_backend = true;
   coord_exec_params->thread_reservation = 1;
+  coord_exec_params->be_desc.__set_admit_mem_limit(512 * MEGABYTE);
+  coord_exec_params->be_desc.__set_is_executor(false);
   const string coord_host_name = Substitute("host$0", 1);
   TNetworkAddress coord_addr = MakeNetworkAddress(coord_host_name, 25000);
   const string coord_host = TNetworkAddressToString(coord_addr);
   per_backend_exec_params->emplace(coord_addr, *coord_exec_params);
   // Add executor backend.
   BackendExecParams* backend_exec_params = pool_.Add(new BackendExecParams());
-  backend_exec_params->admit_mem_limit = GIGABYTE;
   backend_exec_params->thread_reservation = 1;
+  backend_exec_params->be_desc.__set_admit_mem_limit(GIGABYTE);
+  backend_exec_params->be_desc.__set_is_executor(true);
   const string exec_host_name = Substitute("host$0", 2);
   TNetworkAddress exec_addr = MakeNetworkAddress(exec_host_name, 25000);
   const string exec_host = TNetworkAddressToString(exec_addr);
@@ -853,7 +864,7 @@ TEST_F(AdmissionControllerTest, DedicatedCoordAdmissionChecks) {
   // Test 2: coord's admit_mem_limit < executor's admit_mem_limit. Query rejected because
   // coord's admit_mem_limit is less than mem_to_admit on the coord.
   // Re-using previous QuerySchedule object.
-  coord_exec_params->admit_mem_limit = 100 * MEGABYTE;
+  coord_exec_params->be_desc.__set_admit_mem_limit(100 * MEGABYTE);
   (*per_backend_exec_params)[coord_addr] = *coord_exec_params;
   query_schedule->set_per_backend_exec_params(*per_backend_exec_params);
   ASSERT_TRUE(admission_controller->RejectForSchedule(
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index d31c50b..666659b 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -56,8 +56,6 @@ DEFINE_int64_hidden(admission_control_stale_topic_threshold_ms, 5 * 1000,
     "capture most cases where the Impala daemon is disconnected from the statestore "
     "or topic updates are seriously delayed.");
 
-DECLARE_bool(is_executor);
-
 namespace impala {
 
 const int64_t AdmissionController::PoolStats::HISTOGRAM_NUM_OF_BINS = 128;
@@ -398,7 +396,7 @@ void AdmissionController::UpdateHostStats(
   int64_t per_backend_mem_to_admit = schedule.per_backend_mem_to_admit();
   for (const auto& entry : schedule.per_backend_exec_params()) {
     const TNetworkAddress& host_addr = entry.first;
-    int64_t mem_to_admit = entry.second.contains_coord_fragment ?
+    int64_t mem_to_admit = entry.second.is_coord_backend ?
         schedule.coord_backend_mem_to_admit() : per_backend_mem_to_admit;
     if (!is_admitting) mem_to_admit *= -1;
     const string host = TNetworkAddressToString(host_addr);
@@ -441,9 +439,8 @@ bool AdmissionController::CanAccommodateMaxInitialReservation(
   const int64_t coord_min_reservation = schedule.coord_min_reservation();
   return CanMemLimitAccommodateReservation(
              executor_mem_limit, executor_min_reservation, mem_unavailable_reason)
-      && (!schedule.requiresCoordinatorFragment()
-             || CanMemLimitAccommodateReservation(
-                    coord_mem_limit, coord_min_reservation, mem_unavailable_reason));
+      && CanMemLimitAccommodateReservation(
+             coord_mem_limit, coord_min_reservation, mem_unavailable_reason);
 }
 
 bool AdmissionController::HasAvailableMemResources(const QuerySchedule& schedule,
@@ -483,11 +480,11 @@ bool AdmissionController::HasAvailableMemResources(const QuerySchedule& schedule
   for (const auto& entry : schedule.per_backend_exec_params()) {
     const TNetworkAddress& host = entry.first;
     const string host_id = TNetworkAddressToString(host);
-    int64_t admit_mem_limit = entry.second.admit_mem_limit;
+    int64_t admit_mem_limit = entry.second.be_desc.admit_mem_limit;
     const HostStats& host_stats = host_stats_[host_id];
     int64_t mem_reserved = host_stats.mem_reserved;
     int64_t mem_admitted = host_stats.mem_admitted;
-    int64_t mem_to_admit = entry.second.contains_coord_fragment ?
+    int64_t mem_to_admit = entry.second.is_coord_backend ?
         coord_mem_to_admit : executor_mem_to_admit;
     VLOG_ROW << "Checking memory on host=" << host_id
              << " mem_reserved=" << PrintBytes(mem_reserved)
@@ -519,7 +516,7 @@ bool AdmissionController::HasAvailableSlot(const QuerySchedule& schedule,
   for (const auto& entry : schedule.per_backend_exec_params()) {
     const TNetworkAddress& host = entry.first;
     const string host_id = TNetworkAddressToString(host);
-    int64_t admit_num_queries_limit = entry.second.admit_num_queries_limit;
+    int64_t admit_num_queries_limit = entry.second.be_desc.admit_num_queries_limit;
     int64_t num_admitted = host_stats_[host_id].num_admitted;
     VLOG_ROW << "Checking available slot on host=" << host_id
              << " num_admitted=" << num_admitted << " needs=" << num_admitted + 1
@@ -640,21 +637,21 @@ bool AdmissionController::RejectForSchedule(const QuerySchedule& schedule,
       nullptr, std::numeric_limits<int64_t>::max());
   int64_t cluster_thread_reservation = 0;
   for (const auto& e : schedule.per_backend_exec_params()) {
-    cluster_min_mem_reservation_bytes += e.second.min_mem_reservation_bytes;
-    if (e.second.min_mem_reservation_bytes > largest_min_mem_reservation.second) {
-      largest_min_mem_reservation =
-          make_pair(&e.first, e.second.min_mem_reservation_bytes);
+    const BackendExecParams& bp = e.second;
+    cluster_min_mem_reservation_bytes += bp.min_mem_reservation_bytes;
+    if (bp.min_mem_reservation_bytes > largest_min_mem_reservation.second) {
+      largest_min_mem_reservation = make_pair(&e.first, bp.min_mem_reservation_bytes);
     }
-    cluster_thread_reservation += e.second.thread_reservation;
-    if (e.second.thread_reservation > max_thread_reservation.second) {
-      max_thread_reservation = make_pair(&e.first, e.second.thread_reservation);
+    cluster_thread_reservation += bp.thread_reservation;
+    if (bp.thread_reservation > max_thread_reservation.second) {
+      max_thread_reservation = make_pair(&e.first, bp.thread_reservation);
     }
-    if (e.second.contains_coord_fragment) {
+    if (bp.is_coord_backend) {
       coord_admit_mem_limit.first = &e.first;
-      coord_admit_mem_limit.second = e.second.admit_mem_limit;
-    } else if (e.second.admit_mem_limit < min_executor_admit_mem_limit.second) {
+      coord_admit_mem_limit.second = bp.be_desc.admit_mem_limit;
+    } else if (bp.be_desc.admit_mem_limit < min_executor_admit_mem_limit.second) {
       min_executor_admit_mem_limit.first = &e.first;
-      min_executor_admit_mem_limit.second = e.second.admit_mem_limit;
+      min_executor_admit_mem_limit.second = bp.be_desc.admit_mem_limit;
     }
   }
 
@@ -724,17 +721,15 @@ bool AdmissionController::RejectForSchedule(const QuerySchedule& schedule,
               TNetworkAddressToString(*min_executor_admit_mem_limit.first));
       return true;
     }
-    if (schedule.requiresCoordinatorFragment()) {
-      int64_t coord_mem_to_admit = schedule.coord_backend_mem_to_admit();
-      VLOG_ROW << "Checking coordinator mem with coord_mem_to_admit = "
-               << coord_mem_to_admit
-               << " and coord_admit_mem_limit.second = " << coord_admit_mem_limit.second;
-      if (coord_mem_to_admit > coord_admit_mem_limit.second) {
-        *rejection_reason = Substitute(REASON_REQ_OVER_NODE_MEM,
-            PrintBytes(coord_mem_to_admit), PrintBytes(coord_admit_mem_limit.second),
-            TNetworkAddressToString(*coord_admit_mem_limit.first));
-        return true;
-      }
+    int64_t coord_mem_to_admit = schedule.coord_backend_mem_to_admit();
+    VLOG_ROW << "Checking coordinator mem with coord_mem_to_admit = "
+             << coord_mem_to_admit
+             << " and coord_admit_mem_limit.second = " << coord_admit_mem_limit.second;
+    if (coord_mem_to_admit > coord_admit_mem_limit.second) {
+      *rejection_reason = Substitute(REASON_REQ_OVER_NODE_MEM,
+          PrintBytes(coord_mem_to_admit), PrintBytes(coord_admit_mem_limit.second),
+          TNetworkAddressToString(*coord_admit_mem_limit.first));
+      return true;
     }
   }
   return false;
@@ -1148,11 +1143,9 @@ Status AdmissionController::ComputeGroupSchedules(
   for (const ExecutorGroup* executor_group : executor_groups) {
     DCHECK(executor_group->IsHealthy());
     DCHECK_GT(executor_group->NumExecutors(), 0);
-    bool is_dedicated_coord = !FLAGS_is_executor;
     unique_ptr<QuerySchedule> group_schedule =
         make_unique<QuerySchedule>(request.query_id, request.request,
-            request.query_options, is_dedicated_coord, request.summary_profile,
-            request.query_events);
+            request.query_options, request.summary_profile, request.query_events);
     const string& group_name = executor_group->name();
     VLOG(3) << "Scheduling for executor group: " << group_name << " with "
             << executor_group->NumExecutors() << " executors";
diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc
index fceee52..86fe8fd 100644
--- a/be/src/scheduling/query-schedule.cc
+++ b/be/src/scheduling/query-schedule.cc
@@ -43,29 +43,26 @@ DEFINE_bool_hidden(use_dedicated_coordinator_estimates, true,
 namespace impala {
 
 QuerySchedule::QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
-    const TQueryOptions& query_options, bool is_dedicated_coord,
-    RuntimeProfile* summary_profile, RuntimeProfile::EventSequence* query_events)
+    const TQueryOptions& query_options, RuntimeProfile* summary_profile,
+    RuntimeProfile::EventSequence* query_events)
   : query_id_(query_id),
     request_(request),
     query_options_(query_options),
     summary_profile_(summary_profile),
     query_events_(query_events),
     num_scan_ranges_(0),
-    next_instance_id_(query_id),
-    is_dedicated_coord_(is_dedicated_coord) {
+    next_instance_id_(query_id) {
   Init();
 }
 
 QuerySchedule::QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
-    const TQueryOptions& query_options, bool is_dedicated_coord,
-    RuntimeProfile* summary_profile)
+    const TQueryOptions& query_options, RuntimeProfile* summary_profile)
   : query_id_(query_id),
     request_(request),
     query_options_(query_options),
     summary_profile_(summary_profile),
     num_scan_ranges_(0),
-    next_instance_id_(query_id),
-    is_dedicated_coord_(is_dedicated_coord) {
+    next_instance_id_(query_id) {
   // Init() is not called, this constructor is for white box testing only.
   DCHECK(TestInfo::is_test());
 }
@@ -89,7 +86,7 @@ void QuerySchedule::Init() {
 
   // mark coordinator fragment
   const TPlanFragment& root_fragment = request_.plan_exec_info[0].fragments[0];
-  if (requiresCoordinatorFragment()) {
+  if (RequiresCoordinatorFragment()) {
     fragment_exec_params_[root_fragment.idx].is_coord_fragment = true;
     // the coordinator instance gets index 0, generated instance ids start at 1
     next_instance_id_ = CreateInstanceId(next_instance_id_, 1);
@@ -182,7 +179,11 @@ void QuerySchedule::Validate() const {
       }
     }
   }
-  // TODO: add validation for BackendExecParams
+
+  for (const auto& elem: per_backend_exec_params_) {
+    const BackendExecParams& bp = elem.second;
+    DCHECK(!bp.instance_params.empty() || bp.is_coord_backend);
+  }
 }
 
 int64_t QuerySchedule::GetPerExecutorMemoryEstimate() const {
@@ -223,7 +224,7 @@ void QuerySchedule::GetTPlanFragments(vector<const TPlanFragment*>* fragments) c
 }
 
 const FInstanceExecParams& QuerySchedule::GetCoordInstanceExecParams() const {
-  DCHECK(requiresCoordinatorFragment());
+  DCHECK(RequiresCoordinatorFragment());
   const TPlanFragment& coord_fragment =  request_.plan_exec_info[0].fragments[0];
   const FragmentExecParams& fragment_params = fragment_exec_params_[coord_fragment.idx];
   DCHECK_EQ(fragment_params.instance_exec_params.size(), 1);
@@ -247,20 +248,25 @@ int QuerySchedule::GetNumFragmentInstances() const {
 }
 
 int64_t QuerySchedule::GetClusterMemoryToAdmit() const {
-  if (!requiresCoordinatorFragment()) {
-    // For this case, there will be no coordinator fragment so only use the per
-    // executor mem to admit while accounting for admitted memory. This will also ensure
-    // the per backend mem admitted accounting is consistent with the cluster-wide mem
-    // admitted.
-    return per_backend_mem_to_admit() * per_backend_exec_params_.size();
-  } else {
-    return per_backend_mem_to_admit() * (per_backend_exec_params_.size() - 1)
-        + coord_backend_mem_to_admit();
-  }
+  // There will always be an entry for the coordinator in per_backend_exec_params_.
+  return per_backend_mem_to_admit() * (per_backend_exec_params_.size() - 1)
+      + coord_backend_mem_to_admit();
 }
 
 bool QuerySchedule::UseDedicatedCoordEstimates() const {
-  return is_dedicated_coord_ && FLAGS_use_dedicated_coordinator_estimates;
+  for (auto& itr : per_backend_exec_params_) {
+    if (!itr.second.is_coord_backend) continue;
+    auto& coord = itr.second;
+    bool is_dedicated_coord = !coord.be_desc.is_executor;
+    bool only_coord_fragment_scheduled =
+        RequiresCoordinatorFragment() && coord.instance_params.size() == 1;
+    bool no_fragment_scheduled = coord.instance_params.size() == 0;
+    return FLAGS_use_dedicated_coordinator_estimates && is_dedicated_coord
+        && (only_coord_fragment_scheduled || no_fragment_scheduled);
+  }
+  DCHECK(false)
+      << "Coordinator backend should always have a entry in per_backend_exec_params_";
+  return false;
 }
 
 void QuerySchedule::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
@@ -269,6 +275,7 @@ void QuerySchedule::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
   // mem_limit if it is set in the query options, else sets it to -1 (no limit).
   bool mimic_old_behaviour =
       pool_cfg.min_query_mem_limit == 0 && pool_cfg.max_query_mem_limit == 0;
+  bool use_dedicated_coord_estimates = UseDedicatedCoordEstimates();
 
   per_backend_mem_to_admit_ = 0;
   coord_backend_mem_to_admit_ = 0;
@@ -281,7 +288,7 @@ void QuerySchedule::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
 
   if (!has_query_option) {
     per_backend_mem_to_admit_ = GetPerExecutorMemoryEstimate();
-    coord_backend_mem_to_admit_ = UseDedicatedCoordEstimates() ?
+    coord_backend_mem_to_admit_ = use_dedicated_coord_estimates ?
         GetDedicatedCoordMemoryEstimate() :
         GetPerExecutorMemoryEstimate();
     if (!mimic_old_behaviour) {
@@ -299,7 +306,7 @@ void QuerySchedule::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
     if (pool_cfg.min_query_mem_limit > 0) {
       per_backend_mem_to_admit_ =
           max(per_backend_mem_to_admit_, pool_cfg.min_query_mem_limit);
-      if (!UseDedicatedCoordEstimates() || has_query_option) {
+      if (!use_dedicated_coord_estimates || has_query_option) {
         // The minimum mem limit option does not apply to dedicated coordinators -
         // this would result in over-reserving of memory. Treat coordinator and
         // executor mem limits the same if the query option was explicitly set.
@@ -321,7 +328,7 @@ void QuerySchedule::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
   coord_backend_mem_to_admit_ = min(coord_backend_mem_to_admit_, MemInfo::physical_mem());
 
   // If the query is only scheduled to run on the coordinator.
-  if (per_backend_exec_params_.size() == 1 && requiresCoordinatorFragment()) {
+  if (per_backend_exec_params_.size() == 1 && RequiresCoordinatorFragment()) {
     per_backend_mem_to_admit_ = 0;
   }
 
diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h
index ad169c7..6725774 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -45,7 +45,9 @@ typedef std::map<TPlanNodeId, std::vector<TScanRangeParams>> PerNodeScanRanges;
 typedef std::unordered_map<TNetworkAddress, PerNodeScanRanges>
     FragmentScanRangeAssignment;
 
-/// Execution parameters for a single backend. Computed by Scheduler::Schedule(), set
+/// Execution parameters for a single backend. This gets created for every backend that
+/// participates in query execution, which includes, every backend that has fragments
+/// scheduled on it and the coordinator backend. Computed by Scheduler::Schedule(), set
 /// via QuerySchedule::set_per_backend_exec_params(). Used as an input to
 /// AdmissionController and a BackendState.
 struct BackendExecParams {
@@ -54,6 +56,8 @@ struct BackendExecParams {
   /// The fragment instance params assigned to this backend. All instances of a
   /// particular fragment are contiguous in this vector. Query lifetime;
   /// FInstanceExecParams are owned by QuerySchedule::fragment_exec_params_.
+  /// This can be empty only for the coordinator backend, that is, if 'is_coord_backend'
+  /// is true.
   std::vector<const FInstanceExecParams*> instance_params;
 
   // The minimum query-wide buffer reservation size (in bytes) required for this backend.
@@ -74,16 +78,8 @@ struct BackendExecParams {
   // concurrently-executing fragment instances at any point in query execution.
   int64_t thread_reservation = 0;
 
-  // The maximum bytes of memory that can be admitted to this backend by the
-  // admission controller. Obtained from the scheduler's executors configuration
-  // which is updated by membership updates from the statestore.
-  int64_t admit_mem_limit = 0;
-
-  // The maximum number of queries that this backend can execute concurrently.
-  int64_t admit_num_queries_limit = 0;
-
-  // Indicates whether this backend will run the coordinator fragment.
-  bool contains_coord_fragment = false;
+  // Indicates whether this backend is the coordinator.
+  bool is_coord_backend = false;
 };
 
 /// Map from an impalad host address to the list of assigned fragment instance params.
@@ -162,18 +158,17 @@ struct FragmentExecParams {
 class QuerySchedule {
  public:
   QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
-      const TQueryOptions& query_options, bool is_dedicated_coord,
-      RuntimeProfile* summary_profile,
+      const TQueryOptions& query_options, RuntimeProfile* summary_profile,
       RuntimeProfile::EventSequence* query_events);
 
   /// For testing only: build a QuerySchedule object but do not run Init().
   QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
-      const TQueryOptions& query_options, bool is_dedicated_coord,
-      RuntimeProfile* summary_profile);
+      const TQueryOptions& query_options, RuntimeProfile* summary_profile);
 
   /// Verifies that the schedule is well-formed (and DCHECKs if it isn't):
   /// - all fragments have a FragmentExecParams
   /// - all scan ranges are assigned
+  /// - all BackendExecParams have instances assigned except for coordinator.
   void Validate() const;
 
   const TUniqueId& query_id() const { return query_id_; }
@@ -287,6 +282,11 @@ class QuerySchedule {
 
   /// Returns true if coordinator estimates calculated by the planner and specialized for
   /// dedicated a coordinator are to be used for estimating memory requirements.
+  /// This happens when the following conditions are true:
+  /// 1. Coordinator fragment is scheduled on a dedicated coordinator
+  /// 2. Either only the coordinator fragment or no fragments are scheduled on the
+  /// coordinator backend. This essentially means that no executor fragments are scheduled
+  /// on the coordinator backend.
   bool UseDedicatedCoordEstimates() const;
 
   /// Populates or updates the per host query memory limit and the amount of memory to be
@@ -299,11 +299,6 @@ class QuerySchedule {
 
   void set_executor_group(string executor_group);
 
-  /// Returns true if a coordinator fragment is required based on the query stmt type.
-  bool requiresCoordinatorFragment() const {
-    return request_.stmt_type == TStmtType::QUERY;
-  }
-
  private:
   /// These references are valid for the lifetime of this query schedule because they
   /// are all owned by the enclosing QueryExecState.
@@ -366,9 +361,6 @@ class QuerySchedule {
   /// The coordinator's backend memory reservation. Set in Scheduler::Schedule().
   int64_t coord_min_reservation_ = 0;
 
-  /// Indicates whether coordinator fragment will be running on a dedicated coordinator.
-  bool is_dedicated_coord_ = false;
-
   /// The name of the executor group that this schedule was computed for. Set by the
   /// Scheduler and only valid after scheduling completes successfully.
   string executor_group_;
@@ -377,6 +369,11 @@ class QuerySchedule {
   /// Sets is_coord_fragment and input_fragments.
   /// Also populates plan_node_to_fragment_idx_ and plan_node_to_plan_node_idx_.
   void Init();
+
+  /// Returns true if a coordinator fragment is required based on the query stmt type.
+  bool RequiresCoordinatorFragment() const {
+    return request_.stmt_type == TStmtType::QUERY;
+  }
 };
 }
 
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index b5e6807..467694b 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -673,21 +673,22 @@ void Scheduler::ComputeBackendExecParams(
       be_params.initial_mem_reservation_total_claims +=
           f.fragment.initial_mem_reservation_total_claims;
       be_params.thread_reservation += f.fragment.thread_reservation;
-      if (f.is_coord_fragment) be_params.contains_coord_fragment = true;
     }
   }
 
-  int64_t largest_min_reservation = 0;
+  // This also ensures an entry always exists for the coordinator backend.
   int64_t coord_min_reservation = 0;
+  const TNetworkAddress& coord_addr = executor_config.local_be_desc.address;
+  BackendExecParams& coord_be_params = per_backend_params[coord_addr];
+  coord_be_params.is_coord_backend = true;
+  coord_min_reservation = coord_be_params.min_mem_reservation_bytes;
+
+  int64_t largest_min_reservation = 0;
   for (auto& backend : per_backend_params) {
     const TNetworkAddress& host = backend.first;
     const TBackendDescriptor be_desc = LookUpBackendDesc(executor_config, host);
-    backend.second.admit_mem_limit = be_desc.admit_mem_limit;
-    backend.second.admit_num_queries_limit = be_desc.admit_num_queries_limit;
     backend.second.be_desc = be_desc;
-    if (backend.second.contains_coord_fragment) {
-      coord_min_reservation = backend.second.min_mem_reservation_bytes;
-    } else {
+    if (!backend.second.is_coord_backend) {
       largest_min_reservation =
           max(largest_min_reservation, backend.second.min_mem_reservation_bytes);
     }
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index 01ebb3f..34ac10d 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -343,8 +343,9 @@ class Scheduler {
       const TQueryOptions& query_options, RuntimeProfile::Counter* timer,
       FragmentScanRangeAssignment* assignment);
 
-  /// Computes BackendExecParams for all backends assigned in the query. Must be called
-  /// after ComputeFragmentExecParams().
+  /// Computes BackendExecParams for all backends assigned in the query and always one for
+  /// the coordinator backend since it participates in execution regardless. Must be
+  /// called after ComputeFragmentExecParams().
   void ComputeBackendExecParams(
       const ExecutorConfig& executor_config, QuerySchedule* schedule);
 
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index b9d1f30..dbcff87 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -441,12 +441,16 @@ public class Planner {
         maxPerHostPeakResources.getMinMemReservationBytes());
     request.setMax_per_host_thread_reservation(
         maxPerHostPeakResources.getThreadReservation());
-    // Assuming the root fragment will always run on the coordinator backend, which
-    // might not be true for queries that don't have a coordinator fragment
-    // (request.getStmt_type() != TStmtType.QUERY). TODO: Fix in IMPALA-8791.
-    request.setDedicated_coord_mem_estimate(MathUtil.saturatingAdd(rootFragment
-        .getResourceProfile().getMemEstimateBytes(), totalRuntimeFilterMemBytes +
-        DEDICATED_COORD_SAFETY_BUFFER_BYTES));
+    if (getAnalysisResult().isQueryStmt()) {
+      request.setDedicated_coord_mem_estimate(MathUtil.saturatingAdd(rootFragment
+          .getResourceProfile().getMemEstimateBytes(), totalRuntimeFilterMemBytes +
+          DEDICATED_COORD_SAFETY_BUFFER_BYTES));
+    } else {
+      // For queries that don't have a coordinator fragment, estimate a small
+      // amount of memory that the query state spawned on the coordinator can use.
+      request.setDedicated_coord_mem_estimate(totalRuntimeFilterMemBytes +
+          DEDICATED_COORD_SAFETY_BUFFER_BYTES);
+    }
     if (LOG.isTraceEnabled()) {
       LOG.trace("Max per-host min reservation: " +
           maxPerHostPeakResources.getMinMemReservationBytes());
diff --git a/testdata/workloads/functional-query/queries/QueryTest/dedicated-coord-mem-estimates.test b/testdata/workloads/functional-query/queries/QueryTest/dedicated-coord-mem-estimates.test
index cd53dac..39b505f 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/dedicated-coord-mem-estimates.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/dedicated-coord-mem-estimates.test
@@ -4,8 +4,8 @@
 create table test as select id from functional.alltypes where id > 1
 ---- RUNTIME_PROFILE
 row_regex: .*Per-Host Resource Estimates: Memory=16MB.*
-row_regex: .*Dedicated Coordinator Resource Estimate: Memory=116MB.*
-row_regex: .*Cluster Memory Admitted: 32.00 MB.*
+row_regex: .*Dedicated Coordinator Resource Estimate: Memory=100MB.*
+row_regex: .*Cluster Memory Admitted: 132.00 MB.*
 ====
 ---- QUERY
 # Truncate table to run the following inserts.
@@ -24,8 +24,8 @@ row_regex: .*Cluster Memory Admitted: 10.00 MB.*
 insert into test select id from functional.alltypes where id > 3
 ---- RUNTIME_PROFILE
 row_regex: .*Per-Host Resource Estimates: Memory=16MB.*
-row_regex: .*Dedicated Coordinator Resource Estimate: Memory=116MB.*
-row_regex: .*Cluster Memory Admitted: 32.00 MB.*
+row_regex: .*Dedicated Coordinator Resource Estimate: Memory=100MB.*
+row_regex: .*Cluster Memory Admitted: 132.00 MB.*
 ====
 ---- QUERY
 # SELECT with merging exchange (i.e. order by).
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index a180503..f0fd4ac 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -515,8 +515,9 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       fs_allocation_file="mem-limit-test-fair-scheduler.xml",
       llama_site_file="mem-limit-test-llama-site.xml"), num_exclusive_coordinators=1,
     cluster_size=2)
-  def test_sanity_checks_dedicated_coordinator(self, vector):
-    """Test for verifying targeted dedicated coordinator memory estimation behavior."""
+  def test_sanity_checks_dedicated_coordinator(self, vector, unique_database):
+    """Sanity tests for verifying targeted dedicated coordinator memory estimations and
+    behavior."""
     self.client.set_configuration_option('request_pool', "root.regularPool")
     ImpalaTestSuite.change_database(self.client, vector.get_value('table_format'))
     exec_options = vector.get_value('exec_option')
@@ -547,6 +548,17 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       "mem_to_admit:" + str(mem_to_admit)
     self.client.close_query(handle)
 
+    # Make sure query execution works perfectly for a query that does not have any
+    # fragments scheduled on the coordinator, but has runtime-filters that need to be
+    # aggregated at the coordinator.
+    exec_options = vector.get_value('exec_option')
+    exec_options['RUNTIME_FILTER_WAIT_TIME_MS'] = 30000
+    query = """CREATE TABLE {0}.temp_tbl AS SELECT STRAIGHT_JOIN o_orderkey
+    FROM tpch_parquet.lineitem INNER JOIN [SHUFFLE] tpch_parquet.orders
+    ON o_orderkey = l_orderkey GROUP BY 1""".format(unique_database)
+    result = self.execute_query_expect_success(self.client, query, exec_options)
+    assert "Runtime filters: All filters arrived" in result.runtime_profile
+
   def __verify_mem_accounting(self, vector, using_dedicated_coord_estimates):
     """Helper method used by test_dedicated_coordinator_*_mem_accounting that verifies
     the actual vs expected values for mem admitted and mem limit for both coord and