You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this rendering.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/08/13 03:12:03 UTC

[impala] 03/03: Revert "IMPALA-8791: Handle the case where there is no fragment scheduled on"

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 2df3b8cf82af66199f5851c84f3aa065577f6d7d
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Mon Aug 12 17:57:32 2019 -0700

    Revert "IMPALA-8791: Handle the case where there is no fragment scheduled on"
    
    This reverts commit 760169edcbca438c5964380a604b6c271c6bd1a3.
    
    Change-Id: Id20cf3581995f450de6f491e7874cbcf23b52cda
    Reviewed-on: http://gerrit.cloudera.org:8080/14052
    Reviewed-by: Tim Armstrong <ta...@cloudera.com>
    Tested-by: Tim Armstrong <ta...@cloudera.com>
---
 be/src/runtime/coordinator-backend-state.cc        | 15 +-----
 be/src/runtime/coordinator-backend-state.h         |  8 +--
 be/src/runtime/coordinator.cc                      | 12 +++--
 be/src/runtime/coordinator.h                       |  4 +-
 be/src/scheduling/admission-controller-test.cc     | 33 ++++--------
 be/src/scheduling/admission-controller.cc          | 61 ++++++++++++----------
 be/src/scheduling/query-schedule.cc                | 57 +++++++++-----------
 be/src/scheduling/query-schedule.h                 | 43 ++++++++-------
 be/src/scheduling/scheduler.cc                     | 15 +++---
 be/src/scheduling/scheduler.h                      |  5 +-
 .../java/org/apache/impala/planner/Planner.java    | 16 +++---
 .../QueryTest/dedicated-coord-mem-estimates.test   |  8 +--
 12 files changed, 123 insertions(+), 154 deletions(-)

diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 60e67c1..028dd70 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -69,8 +69,8 @@ void Coordinator::BackendState::Init(
     const BackendExecParams& exec_params, const vector<FragmentStats*>& fragment_stats,
     RuntimeProfile* host_profile_parent, ObjectPool* obj_pool) {
   backend_exec_params_ = &exec_params;
-  host_ = backend_exec_params_->be_desc.address;
-  krpc_host_ = backend_exec_params_->be_desc.krpc_address;
+  host_ = backend_exec_params_->instance_params[0]->host;
+  krpc_host_ = backend_exec_params_->instance_params[0]->krpc_host;
   num_remaining_instances_ = backend_exec_params_->instance_params.size();
 
   host_profile_ = RuntimeProfile::Create(obj_pool, TNetworkAddressToString(host_));
@@ -187,14 +187,6 @@ void Coordinator::BackendState::Exec(
     last_report_time_ms_ = GenerateReportTimestamp();
     exec_complete_barrier->Notify();
   });
-
-  // Don not issue an ExecQueryFInstances RPC if there are no fragment instances
-  // scheduled to run on this backend.
-  if (IsEmptyBackend()) {
-    DCHECK(backend_exec_params_->is_coord_backend);
-    return;
-  }
-
   std::unique_ptr<ControlServiceProxy> proxy;
   Status get_proxy_status = ControlService::GetProxy(krpc_host_, host_.hostname, &proxy);
   if (!get_proxy_status.ok()) {
@@ -357,7 +349,6 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
     const ReportExecStatusRequestPB& backend_exec_status,
     const TRuntimeProfileForest& thrift_profiles, ExecSummary* exec_summary,
     ProgressUpdater* scan_range_progress, DmlExecState* dml_exec_state) {
-  DCHECK(!IsEmptyBackend());
   // Hold the exec_summary's lock to avoid exposing it half-way through
   // the update loop below.
   lock_guard<SpinLock> l1(exec_summary->lock);
@@ -462,7 +453,6 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
 
 void Coordinator::BackendState::UpdateHostProfile(
     const TRuntimeProfileTree& thrift_profile) {
-  DCHECK(!IsEmptyBackend());
   host_profile_->Update(thrift_profile);
 }
 
@@ -544,7 +534,6 @@ bool Coordinator::BackendState::Cancel() {
 }
 
 void Coordinator::BackendState::PublishFilter(const TPublishFilterParams& rpc_params) {
-  DCHECK(!IsEmptyBackend());
   DCHECK(rpc_params.dst_query_id == query_id());
   // If the backend is already done, it's not waiting for this filter, so we skip
   // sending it in this case.
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index 4b0cfad..4c41c8c 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -52,9 +52,7 @@ class ExecSummary;
 struct FInstanceExecParams;
 
 /// This class manages all aspects of the execution of all fragment instances of a
-/// single query on a particular backend. For the coordinator backend its possible to have
-/// no fragment instances scheduled on it. In that case, no RPCs are issued and the
-/// Backend state transitions to 'done' state right after Exec() is called on it.
+/// single query on a particular backend.
 /// Thread-safe unless pointed out otherwise.
 class Coordinator::BackendState {
  public:
@@ -74,7 +72,6 @@ class Coordinator::BackendState {
   /// notifies on rpc_complete_barrier when the rpc completes. Success/failure is
   /// communicated through GetStatus(). Uses filter_routing_table to remove filters
   /// that weren't selected during its construction.
-  /// No RPC is issued if there are no fragment instances scheduled on this backend.
   /// The debug_options are applied to the appropriate TPlanFragmentInstanceCtxs, based
   /// on their node_id/instance_idx.
   void Exec(const DebugOptions& debug_options,
@@ -162,9 +159,6 @@ class Coordinator::BackendState {
   /// Returns a timestamp using monotonic time for tracking arrival of status reports.
   static int64_t GenerateReportTimestamp() { return MonotonicMillis(); }
 
-  /// Returns True if there are no fragment instances scheduled on this backend.
-  bool IsEmptyBackend() { return backend_exec_params_->instance_params.empty(); }
-
  private:
   /// Execution stats for a single fragment instance.
   /// Not thread-safe.
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 4df15d9..bd4fb75 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -111,8 +111,14 @@ Status Coordinator::Exec() {
   const string& str = Substitute("Query $0", PrintId(query_id()));
   progress_.Init(str, schedule_.num_scan_ranges());
 
+  // If there is no coord fragment then pick the mem limit computed for an executor.
+  // TODO: IMPALA-8791: make sure minimal or no limit is imposed for cases where no
+  // fragments are scheduled to run on the coordinator backend.
+  int64_t coord_mem_limit = schedule_.requiresCoordinatorFragment() ?
+      schedule_.coord_backend_mem_limit() :
+      schedule_.per_backend_mem_limit();
   query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(
-      query_ctx(), schedule_.coord_backend_mem_limit());
+      query_ctx(), coord_mem_limit);
   filter_mem_tracker_ = query_state_->obj_pool()->Add(new MemTracker(
       -1, "Runtime Filter (Coordinator)", query_state_->query_mem_tracker(), false));
 
@@ -405,10 +411,6 @@ Status Coordinator::FinishBackendStartup() {
       max_latency_host = TNetworkAddressToString(backend_state->impalad_address());
     }
     latencies.Update(backend_state->rpc_latency());
-    // Mark backend complete if no fragment instances were assigned to it.
-    if (backend_state->IsEmptyBackend()) {
-      backend_exec_complete_barrier_->Notify();
-    }
   }
   query_profile_->AddInfoString(
       "Backend startup latencies", latencies.ToHumanReadable());
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 72ab3af..9098dd0 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -482,9 +482,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
 
   /// Helper for Exec(). Checks for errors encountered when starting backend execution,
   /// using any non-OK status, if any, as the overall status. Returns the overall
-  /// status. Also updates query_profile_ with the startup latency histogram and the
-  /// backend_exec_complete_barrier_ if there is any backend which is already done (only
-  /// possible at this point if no fragment instances were assigned to it).
+  /// status. Also updates query_profile_ with the startup latency histogram.
   Status FinishBackendStartup() WARN_UNUSED_RESULT;
 
   /// Build the filter routing table by iterating over all plan nodes and collecting the
diff --git a/be/src/scheduling/admission-controller-test.cc b/be/src/scheduling/admission-controller-test.cc
index 17c8457..a241525 100644
--- a/be/src/scheduling/admission-controller-test.cc
+++ b/be/src/scheduling/admission-controller-test.cc
@@ -97,10 +97,11 @@ class AdmissionControllerTest : public testing::Test {
     TQueryOptions* query_options = pool_.Add(new TQueryOptions());
     query_options->__set_mem_limit(mem_limit);
     QuerySchedule* query_schedule = pool_.Add(new QuerySchedule(
-        *query_id, *request, *query_options, profile));
+        *query_id, *request, *query_options, is_dedicated_coord, profile));
     query_schedule->set_executor_group(ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME);
-    SetHostsInQuerySchedule(*query_schedule, num_hosts, is_dedicated_coord);
     query_schedule->UpdateMemoryRequirements(config);
+
+    SetHostsInQuerySchedule(*query_schedule, num_hosts);
     return query_schedule;
   }
 
@@ -112,24 +113,14 @@ class AdmissionControllerTest : public testing::Test {
   }
 
   /// Replace the per-backend hosts in the schedule with one having 'count' hosts.
-  /// Note: no FInstanceExecParams are added so
-  /// QuerySchedule::UseDedicatedCoordEstimates() would consider this schedule as not
-  /// having anything scheduled on the backend which would result in always returning true
-  /// if a dedicated coordinator backend exists.
   void SetHostsInQuerySchedule(QuerySchedule& query_schedule, const int count,
-      bool is_dedicated_coord, int64_t min_mem_reservation_bytes = 0,
-      int64_t admit_mem_limit = 200L * MEGABYTE) {
+      int64_t min_mem_reservation_bytes = 0, int64_t admit_mem_limit = 200L * MEGABYTE) {
     PerBackendExecParams* per_backend_exec_params = pool_.Add(new PerBackendExecParams());
     for (int i = 0; i < count; ++i) {
       BackendExecParams* backend_exec_params = pool_.Add(new BackendExecParams());
+      backend_exec_params->admit_mem_limit = admit_mem_limit;
       backend_exec_params->min_mem_reservation_bytes = min_mem_reservation_bytes;
-      backend_exec_params->be_desc.__set_admit_mem_limit(admit_mem_limit);
-      backend_exec_params->be_desc.__set_is_executor(true);
-      if (i == 0) {
-        // Add first element as the coordinator.
-        backend_exec_params->is_coord_backend = true;
-        backend_exec_params->be_desc.__set_is_executor(!is_dedicated_coord);
-      }
+      if (i == 0) backend_exec_params->contains_coord_fragment = true;
       const string host_name = Substitute("host$0", i);
       per_backend_exec_params->emplace(
           MakeNetworkAddress(host_name, 25000), *backend_exec_params);
@@ -488,7 +479,7 @@ TEST_F(AdmissionControllerTest, QueryRejection) {
 
   // Adjust the QuerySchedule to have minimum memory reservation of 45MB.
   // This will be rejected immediately as minimum memory reservation is too high.
-  SetHostsInQuerySchedule(*query_schedule, host_count, false, 45L * MEGABYTE);
+  SetHostsInQuerySchedule(*query_schedule, host_count, 45L * MEGABYTE);
   string rejected_reserved_reason;
   ASSERT_TRUE(admission_controller->RejectForSchedule(
       *query_schedule, config_d, host_count, host_count, &rejected_reserved_reason));
@@ -808,19 +799,17 @@ TEST_F(AdmissionControllerTest, DedicatedCoordAdmissionChecks) {
   PerBackendExecParams* per_backend_exec_params = pool_.Add(new PerBackendExecParams());
   // Add coordinator backend.
   BackendExecParams* coord_exec_params = pool_.Add(new BackendExecParams());
-  coord_exec_params->is_coord_backend = true;
+  coord_exec_params->admit_mem_limit = 512 * MEGABYTE;
+  coord_exec_params->contains_coord_fragment = true;
   coord_exec_params->thread_reservation = 1;
-  coord_exec_params->be_desc.__set_admit_mem_limit(512 * MEGABYTE);
-  coord_exec_params->be_desc.__set_is_executor(false);
   const string coord_host_name = Substitute("host$0", 1);
   TNetworkAddress coord_addr = MakeNetworkAddress(coord_host_name, 25000);
   const string coord_host = TNetworkAddressToString(coord_addr);
   per_backend_exec_params->emplace(coord_addr, *coord_exec_params);
   // Add executor backend.
   BackendExecParams* backend_exec_params = pool_.Add(new BackendExecParams());
+  backend_exec_params->admit_mem_limit = GIGABYTE;
   backend_exec_params->thread_reservation = 1;
-  backend_exec_params->be_desc.__set_admit_mem_limit(GIGABYTE);
-  backend_exec_params->be_desc.__set_is_executor(true);
   const string exec_host_name = Substitute("host$0", 2);
   TNetworkAddress exec_addr = MakeNetworkAddress(exec_host_name, 25000);
   const string exec_host = TNetworkAddressToString(exec_addr);
@@ -864,7 +853,7 @@ TEST_F(AdmissionControllerTest, DedicatedCoordAdmissionChecks) {
   // Test 2: coord's admit_mem_limit < executor's admit_mem_limit. Query rejected because
   // coord's admit_mem_limit is less than mem_to_admit on the coord.
   // Re-using previous QuerySchedule object.
-  coord_exec_params->be_desc.__set_admit_mem_limit(100 * MEGABYTE);
+  coord_exec_params->admit_mem_limit = 100 * MEGABYTE;
   (*per_backend_exec_params)[coord_addr] = *coord_exec_params;
   query_schedule->set_per_backend_exec_params(*per_backend_exec_params);
   ASSERT_TRUE(admission_controller->RejectForSchedule(
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 666659b..d31c50b 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -56,6 +56,8 @@ DEFINE_int64_hidden(admission_control_stale_topic_threshold_ms, 5 * 1000,
     "capture most cases where the Impala daemon is disconnected from the statestore "
     "or topic updates are seriously delayed.");
 
+DECLARE_bool(is_executor);
+
 namespace impala {
 
 const int64_t AdmissionController::PoolStats::HISTOGRAM_NUM_OF_BINS = 128;
@@ -396,7 +398,7 @@ void AdmissionController::UpdateHostStats(
   int64_t per_backend_mem_to_admit = schedule.per_backend_mem_to_admit();
   for (const auto& entry : schedule.per_backend_exec_params()) {
     const TNetworkAddress& host_addr = entry.first;
-    int64_t mem_to_admit = entry.second.is_coord_backend ?
+    int64_t mem_to_admit = entry.second.contains_coord_fragment ?
         schedule.coord_backend_mem_to_admit() : per_backend_mem_to_admit;
     if (!is_admitting) mem_to_admit *= -1;
     const string host = TNetworkAddressToString(host_addr);
@@ -439,8 +441,9 @@ bool AdmissionController::CanAccommodateMaxInitialReservation(
   const int64_t coord_min_reservation = schedule.coord_min_reservation();
   return CanMemLimitAccommodateReservation(
              executor_mem_limit, executor_min_reservation, mem_unavailable_reason)
-      && CanMemLimitAccommodateReservation(
-             coord_mem_limit, coord_min_reservation, mem_unavailable_reason);
+      && (!schedule.requiresCoordinatorFragment()
+             || CanMemLimitAccommodateReservation(
+                    coord_mem_limit, coord_min_reservation, mem_unavailable_reason));
 }
 
 bool AdmissionController::HasAvailableMemResources(const QuerySchedule& schedule,
@@ -480,11 +483,11 @@ bool AdmissionController::HasAvailableMemResources(const QuerySchedule& schedule
   for (const auto& entry : schedule.per_backend_exec_params()) {
     const TNetworkAddress& host = entry.first;
     const string host_id = TNetworkAddressToString(host);
-    int64_t admit_mem_limit = entry.second.be_desc.admit_mem_limit;
+    int64_t admit_mem_limit = entry.second.admit_mem_limit;
     const HostStats& host_stats = host_stats_[host_id];
     int64_t mem_reserved = host_stats.mem_reserved;
     int64_t mem_admitted = host_stats.mem_admitted;
-    int64_t mem_to_admit = entry.second.is_coord_backend ?
+    int64_t mem_to_admit = entry.second.contains_coord_fragment ?
         coord_mem_to_admit : executor_mem_to_admit;
     VLOG_ROW << "Checking memory on host=" << host_id
              << " mem_reserved=" << PrintBytes(mem_reserved)
@@ -516,7 +519,7 @@ bool AdmissionController::HasAvailableSlot(const QuerySchedule& schedule,
   for (const auto& entry : schedule.per_backend_exec_params()) {
     const TNetworkAddress& host = entry.first;
     const string host_id = TNetworkAddressToString(host);
-    int64_t admit_num_queries_limit = entry.second.be_desc.admit_num_queries_limit;
+    int64_t admit_num_queries_limit = entry.second.admit_num_queries_limit;
     int64_t num_admitted = host_stats_[host_id].num_admitted;
     VLOG_ROW << "Checking available slot on host=" << host_id
              << " num_admitted=" << num_admitted << " needs=" << num_admitted + 1
@@ -637,21 +640,21 @@ bool AdmissionController::RejectForSchedule(const QuerySchedule& schedule,
       nullptr, std::numeric_limits<int64_t>::max());
   int64_t cluster_thread_reservation = 0;
   for (const auto& e : schedule.per_backend_exec_params()) {
-    const BackendExecParams& bp = e.second;
-    cluster_min_mem_reservation_bytes += bp.min_mem_reservation_bytes;
-    if (bp.min_mem_reservation_bytes > largest_min_mem_reservation.second) {
-      largest_min_mem_reservation = make_pair(&e.first, bp.min_mem_reservation_bytes);
+    cluster_min_mem_reservation_bytes += e.second.min_mem_reservation_bytes;
+    if (e.second.min_mem_reservation_bytes > largest_min_mem_reservation.second) {
+      largest_min_mem_reservation =
+          make_pair(&e.first, e.second.min_mem_reservation_bytes);
     }
-    cluster_thread_reservation += bp.thread_reservation;
-    if (bp.thread_reservation > max_thread_reservation.second) {
-      max_thread_reservation = make_pair(&e.first, bp.thread_reservation);
+    cluster_thread_reservation += e.second.thread_reservation;
+    if (e.second.thread_reservation > max_thread_reservation.second) {
+      max_thread_reservation = make_pair(&e.first, e.second.thread_reservation);
     }
-    if (bp.is_coord_backend) {
+    if (e.second.contains_coord_fragment) {
       coord_admit_mem_limit.first = &e.first;
-      coord_admit_mem_limit.second = bp.be_desc.admit_mem_limit;
-    } else if (bp.be_desc.admit_mem_limit < min_executor_admit_mem_limit.second) {
+      coord_admit_mem_limit.second = e.second.admit_mem_limit;
+    } else if (e.second.admit_mem_limit < min_executor_admit_mem_limit.second) {
       min_executor_admit_mem_limit.first = &e.first;
-      min_executor_admit_mem_limit.second = bp.be_desc.admit_mem_limit;
+      min_executor_admit_mem_limit.second = e.second.admit_mem_limit;
     }
   }
 
@@ -721,15 +724,17 @@ bool AdmissionController::RejectForSchedule(const QuerySchedule& schedule,
               TNetworkAddressToString(*min_executor_admit_mem_limit.first));
       return true;
     }
-    int64_t coord_mem_to_admit = schedule.coord_backend_mem_to_admit();
-    VLOG_ROW << "Checking coordinator mem with coord_mem_to_admit = "
-             << coord_mem_to_admit
-             << " and coord_admit_mem_limit.second = " << coord_admit_mem_limit.second;
-    if (coord_mem_to_admit > coord_admit_mem_limit.second) {
-      *rejection_reason = Substitute(REASON_REQ_OVER_NODE_MEM,
-          PrintBytes(coord_mem_to_admit), PrintBytes(coord_admit_mem_limit.second),
-          TNetworkAddressToString(*coord_admit_mem_limit.first));
-      return true;
+    if (schedule.requiresCoordinatorFragment()) {
+      int64_t coord_mem_to_admit = schedule.coord_backend_mem_to_admit();
+      VLOG_ROW << "Checking coordinator mem with coord_mem_to_admit = "
+               << coord_mem_to_admit
+               << " and coord_admit_mem_limit.second = " << coord_admit_mem_limit.second;
+      if (coord_mem_to_admit > coord_admit_mem_limit.second) {
+        *rejection_reason = Substitute(REASON_REQ_OVER_NODE_MEM,
+            PrintBytes(coord_mem_to_admit), PrintBytes(coord_admit_mem_limit.second),
+            TNetworkAddressToString(*coord_admit_mem_limit.first));
+        return true;
+      }
     }
   }
   return false;
@@ -1143,9 +1148,11 @@ Status AdmissionController::ComputeGroupSchedules(
   for (const ExecutorGroup* executor_group : executor_groups) {
     DCHECK(executor_group->IsHealthy());
     DCHECK_GT(executor_group->NumExecutors(), 0);
+    bool is_dedicated_coord = !FLAGS_is_executor;
     unique_ptr<QuerySchedule> group_schedule =
         make_unique<QuerySchedule>(request.query_id, request.request,
-            request.query_options, request.summary_profile, request.query_events);
+            request.query_options, is_dedicated_coord, request.summary_profile,
+            request.query_events);
     const string& group_name = executor_group->name();
     VLOG(3) << "Scheduling for executor group: " << group_name << " with "
             << executor_group->NumExecutors() << " executors";
diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc
index 86fe8fd..fceee52 100644
--- a/be/src/scheduling/query-schedule.cc
+++ b/be/src/scheduling/query-schedule.cc
@@ -43,26 +43,29 @@ DEFINE_bool_hidden(use_dedicated_coordinator_estimates, true,
 namespace impala {
 
 QuerySchedule::QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
-    const TQueryOptions& query_options, RuntimeProfile* summary_profile,
-    RuntimeProfile::EventSequence* query_events)
+    const TQueryOptions& query_options, bool is_dedicated_coord,
+    RuntimeProfile* summary_profile, RuntimeProfile::EventSequence* query_events)
   : query_id_(query_id),
     request_(request),
     query_options_(query_options),
     summary_profile_(summary_profile),
     query_events_(query_events),
     num_scan_ranges_(0),
-    next_instance_id_(query_id) {
+    next_instance_id_(query_id),
+    is_dedicated_coord_(is_dedicated_coord) {
   Init();
 }
 
 QuerySchedule::QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
-    const TQueryOptions& query_options, RuntimeProfile* summary_profile)
+    const TQueryOptions& query_options, bool is_dedicated_coord,
+    RuntimeProfile* summary_profile)
   : query_id_(query_id),
     request_(request),
     query_options_(query_options),
     summary_profile_(summary_profile),
     num_scan_ranges_(0),
-    next_instance_id_(query_id) {
+    next_instance_id_(query_id),
+    is_dedicated_coord_(is_dedicated_coord) {
   // Init() is not called, this constructor is for white box testing only.
   DCHECK(TestInfo::is_test());
 }
@@ -86,7 +89,7 @@ void QuerySchedule::Init() {
 
   // mark coordinator fragment
   const TPlanFragment& root_fragment = request_.plan_exec_info[0].fragments[0];
-  if (RequiresCoordinatorFragment()) {
+  if (requiresCoordinatorFragment()) {
     fragment_exec_params_[root_fragment.idx].is_coord_fragment = true;
     // the coordinator instance gets index 0, generated instance ids start at 1
     next_instance_id_ = CreateInstanceId(next_instance_id_, 1);
@@ -179,11 +182,7 @@ void QuerySchedule::Validate() const {
       }
     }
   }
-
-  for (const auto& elem: per_backend_exec_params_) {
-    const BackendExecParams& bp = elem.second;
-    DCHECK(!bp.instance_params.empty() || bp.is_coord_backend);
-  }
+  // TODO: add validation for BackendExecParams
 }
 
 int64_t QuerySchedule::GetPerExecutorMemoryEstimate() const {
@@ -224,7 +223,7 @@ void QuerySchedule::GetTPlanFragments(vector<const TPlanFragment*>* fragments) c
 }
 
 const FInstanceExecParams& QuerySchedule::GetCoordInstanceExecParams() const {
-  DCHECK(RequiresCoordinatorFragment());
+  DCHECK(requiresCoordinatorFragment());
   const TPlanFragment& coord_fragment =  request_.plan_exec_info[0].fragments[0];
   const FragmentExecParams& fragment_params = fragment_exec_params_[coord_fragment.idx];
   DCHECK_EQ(fragment_params.instance_exec_params.size(), 1);
@@ -248,25 +247,20 @@ int QuerySchedule::GetNumFragmentInstances() const {
 }
 
 int64_t QuerySchedule::GetClusterMemoryToAdmit() const {
-  // There will always be an entry for the coordinator in per_backend_exec_params_.
-  return per_backend_mem_to_admit() * (per_backend_exec_params_.size() - 1)
-      + coord_backend_mem_to_admit();
+  if (!requiresCoordinatorFragment()) {
+    // For this case, there will be no coordinator fragment so only use the per
+    // executor mem to admit while accounting for admitted memory. This will also ensure
+    // the per backend mem admitted accounting is consistent with the cluster-wide mem
+    // admitted.
+    return per_backend_mem_to_admit() * per_backend_exec_params_.size();
+  } else {
+    return per_backend_mem_to_admit() * (per_backend_exec_params_.size() - 1)
+        + coord_backend_mem_to_admit();
+  }
 }
 
 bool QuerySchedule::UseDedicatedCoordEstimates() const {
-  for (auto& itr : per_backend_exec_params_) {
-    if (!itr.second.is_coord_backend) continue;
-    auto& coord = itr.second;
-    bool is_dedicated_coord = !coord.be_desc.is_executor;
-    bool only_coord_fragment_scheduled =
-        RequiresCoordinatorFragment() && coord.instance_params.size() == 1;
-    bool no_fragment_scheduled = coord.instance_params.size() == 0;
-    return FLAGS_use_dedicated_coordinator_estimates && is_dedicated_coord
-        && (only_coord_fragment_scheduled || no_fragment_scheduled);
-  }
-  DCHECK(false)
-      << "Coordinator backend should always have a entry in per_backend_exec_params_";
-  return false;
+  return is_dedicated_coord_ && FLAGS_use_dedicated_coordinator_estimates;
 }
 
 void QuerySchedule::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
@@ -275,7 +269,6 @@ void QuerySchedule::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
   // mem_limit if it is set in the query options, else sets it to -1 (no limit).
   bool mimic_old_behaviour =
       pool_cfg.min_query_mem_limit == 0 && pool_cfg.max_query_mem_limit == 0;
-  bool use_dedicated_coord_estimates = UseDedicatedCoordEstimates();
 
   per_backend_mem_to_admit_ = 0;
   coord_backend_mem_to_admit_ = 0;
@@ -288,7 +281,7 @@ void QuerySchedule::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
 
   if (!has_query_option) {
     per_backend_mem_to_admit_ = GetPerExecutorMemoryEstimate();
-    coord_backend_mem_to_admit_ = use_dedicated_coord_estimates ?
+    coord_backend_mem_to_admit_ = UseDedicatedCoordEstimates() ?
         GetDedicatedCoordMemoryEstimate() :
         GetPerExecutorMemoryEstimate();
     if (!mimic_old_behaviour) {
@@ -306,7 +299,7 @@ void QuerySchedule::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
     if (pool_cfg.min_query_mem_limit > 0) {
       per_backend_mem_to_admit_ =
           max(per_backend_mem_to_admit_, pool_cfg.min_query_mem_limit);
-      if (!use_dedicated_coord_estimates || has_query_option) {
+      if (!UseDedicatedCoordEstimates() || has_query_option) {
         // The minimum mem limit option does not apply to dedicated coordinators -
         // this would result in over-reserving of memory. Treat coordinator and
         // executor mem limits the same if the query option was explicitly set.
@@ -328,7 +321,7 @@ void QuerySchedule::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
   coord_backend_mem_to_admit_ = min(coord_backend_mem_to_admit_, MemInfo::physical_mem());
 
   // If the query is only scheduled to run on the coordinator.
-  if (per_backend_exec_params_.size() == 1 && RequiresCoordinatorFragment()) {
+  if (per_backend_exec_params_.size() == 1 && requiresCoordinatorFragment()) {
     per_backend_mem_to_admit_ = 0;
   }
 
diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h
index 6725774..ad169c7 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -45,9 +45,7 @@ typedef std::map<TPlanNodeId, std::vector<TScanRangeParams>> PerNodeScanRanges;
 typedef std::unordered_map<TNetworkAddress, PerNodeScanRanges>
     FragmentScanRangeAssignment;
 
-/// Execution parameters for a single backend. This gets created for every backend that
-/// participates in query execution, which includes, every backend that has fragments
-/// scheduled on it and the coordinator backend. Computed by Scheduler::Schedule(), set
+/// Execution parameters for a single backend. Computed by Scheduler::Schedule(), set
 /// via QuerySchedule::set_per_backend_exec_params(). Used as an input to
 /// AdmissionController and a BackendState.
 struct BackendExecParams {
@@ -56,8 +54,6 @@ struct BackendExecParams {
   /// The fragment instance params assigned to this backend. All instances of a
   /// particular fragment are contiguous in this vector. Query lifetime;
   /// FInstanceExecParams are owned by QuerySchedule::fragment_exec_params_.
-  /// This can be empty only for the coordinator backend, that is, if 'is_coord_backend'
-  /// is true.
   std::vector<const FInstanceExecParams*> instance_params;
 
   // The minimum query-wide buffer reservation size (in bytes) required for this backend.
@@ -78,8 +74,16 @@ struct BackendExecParams {
   // concurrently-executing fragment instances at any point in query execution.
   int64_t thread_reservation = 0;
 
-  // Indicates whether this backend is the coordinator.
-  bool is_coord_backend = false;
+  // The maximum bytes of memory that can be admitted to this backend by the
+  // admission controller. Obtained from the scheduler's executors configuration
+  // which is updated by membership updates from the statestore.
+  int64_t admit_mem_limit = 0;
+
+  // The maximum number of queries that this backend can execute concurrently.
+  int64_t admit_num_queries_limit = 0;
+
+  // Indicates whether this backend will run the coordinator fragment.
+  bool contains_coord_fragment = false;
 };
 
 /// Map from an impalad host address to the list of assigned fragment instance params.
@@ -158,17 +162,18 @@ struct FragmentExecParams {
 class QuerySchedule {
  public:
   QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
-      const TQueryOptions& query_options, RuntimeProfile* summary_profile,
+      const TQueryOptions& query_options, bool is_dedicated_coord,
+      RuntimeProfile* summary_profile,
       RuntimeProfile::EventSequence* query_events);
 
   /// For testing only: build a QuerySchedule object but do not run Init().
   QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
-      const TQueryOptions& query_options, RuntimeProfile* summary_profile);
+      const TQueryOptions& query_options, bool is_dedicated_coord,
+      RuntimeProfile* summary_profile);
 
   /// Verifies that the schedule is well-formed (and DCHECKs if it isn't):
   /// - all fragments have a FragmentExecParams
   /// - all scan ranges are assigned
-  /// - all BackendExecParams have instances assigned except for coordinator.
   void Validate() const;
 
   const TUniqueId& query_id() const { return query_id_; }
@@ -282,11 +287,6 @@ class QuerySchedule {
 
   /// Returns true if coordinator estimates calculated by the planner and specialized for
   /// dedicated a coordinator are to be used for estimating memory requirements.
-  /// This happens when the following conditions are true:
-  /// 1. Coordinator fragment is scheduled on a dedicated coordinator
-  /// 2. Either only the coordinator fragment or no fragments are scheduled on the
-  /// coordinator backend. This essentially means that no executor fragments are scheduled
-  /// on the coordinator backend.
   bool UseDedicatedCoordEstimates() const;
 
   /// Populates or updates the per host query memory limit and the amount of memory to be
@@ -299,6 +299,11 @@ class QuerySchedule {
 
   void set_executor_group(string executor_group);
 
+  /// Returns true if a coordinator fragment is required based on the query stmt type.
+  bool requiresCoordinatorFragment() const {
+    return request_.stmt_type == TStmtType::QUERY;
+  }
+
  private:
   /// These references are valid for the lifetime of this query schedule because they
   /// are all owned by the enclosing QueryExecState.
@@ -361,6 +366,9 @@ class QuerySchedule {
   /// The coordinator's backend memory reservation. Set in Scheduler::Schedule().
   int64_t coord_min_reservation_ = 0;
 
+  /// Indicates whether coordinator fragment will be running on a dedicated coordinator.
+  bool is_dedicated_coord_ = false;
+
   /// The name of the executor group that this schedule was computed for. Set by the
   /// Scheduler and only valid after scheduling completes successfully.
   string executor_group_;
@@ -369,11 +377,6 @@ class QuerySchedule {
   /// Sets is_coord_fragment and input_fragments.
   /// Also populates plan_node_to_fragment_idx_ and plan_node_to_plan_node_idx_.
   void Init();
-
-  /// Returns true if a coordinator fragment is required based on the query stmt type.
-  bool RequiresCoordinatorFragment() const {
-    return request_.stmt_type == TStmtType::QUERY;
-  }
 };
 }
 
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 467694b..b5e6807 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -673,22 +673,21 @@ void Scheduler::ComputeBackendExecParams(
       be_params.initial_mem_reservation_total_claims +=
           f.fragment.initial_mem_reservation_total_claims;
       be_params.thread_reservation += f.fragment.thread_reservation;
+      if (f.is_coord_fragment) be_params.contains_coord_fragment = true;
     }
   }
 
-  // This also ensures an entry always exists for the coordinator backend.
-  int64_t coord_min_reservation = 0;
-  const TNetworkAddress& coord_addr = executor_config.local_be_desc.address;
-  BackendExecParams& coord_be_params = per_backend_params[coord_addr];
-  coord_be_params.is_coord_backend = true;
-  coord_min_reservation = coord_be_params.min_mem_reservation_bytes;
-
   int64_t largest_min_reservation = 0;
+  int64_t coord_min_reservation = 0;
   for (auto& backend : per_backend_params) {
     const TNetworkAddress& host = backend.first;
     const TBackendDescriptor be_desc = LookUpBackendDesc(executor_config, host);
+    backend.second.admit_mem_limit = be_desc.admit_mem_limit;
+    backend.second.admit_num_queries_limit = be_desc.admit_num_queries_limit;
     backend.second.be_desc = be_desc;
-    if (!backend.second.is_coord_backend) {
+    if (backend.second.contains_coord_fragment) {
+      coord_min_reservation = backend.second.min_mem_reservation_bytes;
+    } else {
       largest_min_reservation =
           max(largest_min_reservation, backend.second.min_mem_reservation_bytes);
     }
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index 34ac10d..01ebb3f 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -343,9 +343,8 @@ class Scheduler {
       const TQueryOptions& query_options, RuntimeProfile::Counter* timer,
       FragmentScanRangeAssignment* assignment);
 
-  /// Computes BackendExecParams for all backends assigned in the query and always one for
-  /// the coordinator backend since it participates in execution regardless. Must be
-  /// called after ComputeFragmentExecParams().
+  /// Computes BackendExecParams for all backends assigned in the query. Must be called
+  /// after ComputeFragmentExecParams().
   void ComputeBackendExecParams(
       const ExecutorConfig& executor_config, QuerySchedule* schedule);
 
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index dbcff87..b9d1f30 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -441,16 +441,12 @@ public class Planner {
         maxPerHostPeakResources.getMinMemReservationBytes());
     request.setMax_per_host_thread_reservation(
         maxPerHostPeakResources.getThreadReservation());
-    if (getAnalysisResult().isQueryStmt()) {
-      request.setDedicated_coord_mem_estimate(MathUtil.saturatingAdd(rootFragment
-          .getResourceProfile().getMemEstimateBytes(), totalRuntimeFilterMemBytes +
-          DEDICATED_COORD_SAFETY_BUFFER_BYTES));
-    } else {
-      // For queries that don't have a coordinator fragment, estimate a small
-      // amount of memory that the query state spwaned on the coordinator can use.
-      request.setDedicated_coord_mem_estimate(totalRuntimeFilterMemBytes +
-          DEDICATED_COORD_SAFETY_BUFFER_BYTES);
-    }
+    // Assuming the root fragment will always run on the coordinator backend, which
+    // might not be true for queries that don't have a coordinator fragment
+    // (request.getStmt_type() != TStmtType.QUERY). TODO: Fix in IMPALA-8791.
+    request.setDedicated_coord_mem_estimate(MathUtil.saturatingAdd(rootFragment
+        .getResourceProfile().getMemEstimateBytes(), totalRuntimeFilterMemBytes +
+        DEDICATED_COORD_SAFETY_BUFFER_BYTES));
     if (LOG.isTraceEnabled()) {
       LOG.trace("Max per-host min reservation: " +
           maxPerHostPeakResources.getMinMemReservationBytes());
diff --git a/testdata/workloads/functional-query/queries/QueryTest/dedicated-coord-mem-estimates.test b/testdata/workloads/functional-query/queries/QueryTest/dedicated-coord-mem-estimates.test
index 39b505f..cd53dac 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/dedicated-coord-mem-estimates.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/dedicated-coord-mem-estimates.test
@@ -4,8 +4,8 @@
 create table test as select id from functional.alltypes where id > 1
 ---- RUNTIME_PROFILE
 row_regex: .*Per-Host Resource Estimates: Memory=16MB.*
-row_regex: .*Dedicated Coordinator Resource Estimate: Memory=100MB.*
-row_regex: .*Cluster Memory Admitted: 132.00 MB.*
+row_regex: .*Dedicated Coordinator Resource Estimate: Memory=116MB.*
+row_regex: .*Cluster Memory Admitted: 32.00 MB.*
 ====
 ---- QUERY
 # Truncate table to run the following inserts.
@@ -24,8 +24,8 @@ row_regex: .*Cluster Memory Admitted: 10.00 MB.*
 insert into test select id from functional.alltypes where id > 3
 ---- RUNTIME_PROFILE
 row_regex: .*Per-Host Resource Estimates: Memory=16MB.*
-row_regex: .*Dedicated Coordinator Resource Estimate: Memory=100MB.*
-row_regex: .*Cluster Memory Admitted: 132.00 MB.*
+row_regex: .*Dedicated Coordinator Resource Estimate: Memory=116MB.*
+row_regex: .*Cluster Memory Admitted: 32.00 MB.*
 ====
 ---- QUERY
 # SELECT with merging exchange (i.e. order by).