You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2019/08/01 20:16:17 UTC

[impala] 02/05: IMPALA-7486: Add specialized estimation scheme for dedicated coordinators

This is an automated email from the ASF dual-hosted git repository.

stakiar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit a0b0fd4fce7676255dda52445cb972424f0b7c26
Author: Bikramjeet Vig <bi...@cloudera.com>
AuthorDate: Wed Jun 19 13:53:18 2019 -0700

    IMPALA-7486: Add specialized estimation scheme for dedicated coordinators
    
    This patch computes two memory estimates in the frontend:
    an estimate for any host that is an executor (including
    a combined coordinator and executor) and an estimate
    for a dedicated coordinator. Both estimates are computed regardless
    of whether the coordinator is dedicated or not.
    
    Admission control then, in the case when the coordinator
    is dedicated, uses the coordinator memory estimate for
    the coordinator node and the executor memory estimate
    for all other nodes.
    
    Other highlights:
     - if MEM_LIMIT query option is set, it is applied to all backends,
       both executors and coordinators.
     - the min_query_mem_limit pool config is not enforced on the
       dedicated coordinator estimates unless MEM_LIMIT query option is set.
     - the lower cap on estimates and the admission checks based on the
       min mem limit required for reservation are applied separately on
       coordinator's and executors' mem requirements.
     - Added a hidden startup option 'use_dedicated_coordinator_estimates'
       which, if set to false, reverts to the previous estimation behavior.
    
    Testing:
    - Added unit test for admission/rejection in dedicated
      coordinator clusters.
    - Added end to end tests.
    
    Change-Id: I2b94e7293b91dec8a18491079c34923eadd94b21
    Reviewed-on: http://gerrit.cloudera.org:8080/13740
    Reviewed-by: Bikramjeet Vig <bi...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/coordinator.cc                      |   8 +-
 be/src/scheduling/admission-controller-test.cc     | 226 ++++++++++++++++++++-
 be/src/scheduling/admission-controller.cc          | 146 ++++++++-----
 be/src/scheduling/admission-controller.h           |  22 +-
 be/src/scheduling/query-schedule.cc                |  84 ++++++--
 be/src/scheduling/query-schedule.h                 |  70 ++++++-
 be/src/scheduling/scheduler.cc                     |  11 +-
 be/src/service/impala-http-handler.cc              |  17 +-
 be/src/util/backend-gflag-util.cc                  |   8 +-
 common/thrift/BackendGflags.thrift                 |   4 +
 common/thrift/Frontend.thrift                      |   5 +
 .../org/apache/impala/planner/PlanFragment.java    |   4 +
 .../java/org/apache/impala/planner/Planner.java    |  37 +++-
 .../org/apache/impala/service/BackendConfig.java   |   4 +
 .../java/org/apache/impala/service/Frontend.java   |   9 +-
 .../QueryTest/dedicated-coord-mem-estimates.test   |  96 +++++++++
 tests/custom_cluster/test_admission_controller.py  | 168 ++++++++++++++-
 www/admission_controller.tmpl                      |  16 +-
 18 files changed, 820 insertions(+), 115 deletions(-)

diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index d4ce820..bd4fb75 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -111,8 +111,14 @@ Status Coordinator::Exec() {
   const string& str = Substitute("Query $0", PrintId(query_id()));
   progress_.Init(str, schedule_.num_scan_ranges());
 
+  // If there is no coord fragment then pick the mem limit computed for an executor.
+  // TODO: IMPALA-8791: make sure minimal or no limit is imposed for cases where no
+  // fragments are scheduled to run on the coordinator backend.
+  int64_t coord_mem_limit = schedule_.requiresCoordinatorFragment() ?
+      schedule_.coord_backend_mem_limit() :
+      schedule_.per_backend_mem_limit();
   query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(
-      query_ctx(), schedule_.per_backend_mem_limit());
+      query_ctx(), coord_mem_limit);
   filter_mem_tracker_ = query_state_->obj_pool()->Add(new MemTracker(
       -1, "Runtime Filter (Coordinator)", query_state_->query_mem_tracker(), false));
 
diff --git a/be/src/scheduling/admission-controller-test.cc b/be/src/scheduling/admission-controller-test.cc
index 8e8498a..a241525 100644
--- a/be/src/scheduling/admission-controller-test.cc
+++ b/be/src/scheduling/admission-controller-test.cc
@@ -20,6 +20,7 @@
 #include "common/names.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/logging_test_util.h"
+#include "runtime/bufferpool/reservation-util.h"
 #include "runtime/exec-env.h"
 #include "runtime/runtime-state.h"
 #include "runtime/test-env.h"
@@ -50,6 +51,7 @@ static const string HOST_1 = "host1:25000";
 static const string HOST_2 = "host2:25000";
 
 static const int64_t MEGABYTE = 1024L * 1024L;
+static const int64_t GIGABYTE = 1024L * MEGABYTE;
 
 /// Parent class for Admission Controller tests.
 /// Common code and constants should go here.
@@ -80,18 +82,22 @@ class AdmissionControllerTest : public testing::Test {
 
   /// Make a QuerySchedule with dummy parameters that can be used to test admission and
   /// rejection in AdmissionController.
-  QuerySchedule* MakeQuerySchedule(string request_pool_name, TPoolConfig& config,
-      const int num_hosts, const int per_host_mem_estimate) {
+  QuerySchedule* MakeQuerySchedule(string request_pool_name, int64_t mem_limit,
+      TPoolConfig& config, const int num_hosts, const int per_host_mem_estimate,
+      const int coord_mem_estimate, bool is_dedicated_coord) {
     DCHECK_GT(num_hosts, 0);
     TQueryExecRequest* request = pool_.Add(new TQueryExecRequest());
     request->query_ctx.request_pool = request_pool_name;
     request->__set_per_host_mem_estimate(per_host_mem_estimate);
+    request->__set_dedicated_coord_mem_estimate(coord_mem_estimate);
+    request->__set_stmt_type(TStmtType::QUERY);
 
     RuntimeProfile* profile = RuntimeProfile::Create(&pool_, "pool1");
     TUniqueId* query_id = pool_.Add(new TUniqueId()); // always 0,0
     TQueryOptions* query_options = pool_.Add(new TQueryOptions());
-    QuerySchedule* query_schedule =
-        pool_.Add(new QuerySchedule(*query_id, *request, *query_options, profile));
+    query_options->__set_mem_limit(mem_limit);
+    QuerySchedule* query_schedule = pool_.Add(new QuerySchedule(
+        *query_id, *request, *query_options, is_dedicated_coord, profile));
     query_schedule->set_executor_group(ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME);
     query_schedule->UpdateMemoryRequirements(config);
 
@@ -99,6 +105,13 @@ class AdmissionControllerTest : public testing::Test {
     return query_schedule;
   }
 
+  /// Same as previous MakeQuerySchedule with fewer inputs (more default params).
+  QuerySchedule* MakeQuerySchedule(string request_pool_name, TPoolConfig& config,
+      const int num_hosts, const int per_host_mem_estimate) {
+    return MakeQuerySchedule(request_pool_name, 0, config, num_hosts,
+        per_host_mem_estimate, per_host_mem_estimate, false);
+  }
+
   /// Replace the per-backend hosts in the schedule with one having 'count' hosts.
   void SetHostsInQuerySchedule(QuerySchedule& query_schedule, const int count,
       int64_t min_mem_reservation_bytes = 0, int64_t admit_mem_limit = 200L * MEGABYTE) {
@@ -107,6 +120,7 @@ class AdmissionControllerTest : public testing::Test {
       BackendExecParams* backend_exec_params = pool_.Add(new BackendExecParams());
       backend_exec_params->admit_mem_limit = admit_mem_limit;
       backend_exec_params->min_mem_reservation_bytes = min_mem_reservation_bytes;
+      if (i == 0) backend_exec_params->contains_coord_fragment = true;
       const string host_name = Substitute("host$0", i);
       per_backend_exec_params->emplace(
           MakeNetworkAddress(host_name, 25000), *backend_exec_params);
@@ -684,4 +698,208 @@ TEST_F(AdmissionControllerTest, PoolDisabled) {
       /* max_mem_resources */ 0, /* max_memory_multiple */ 0);
 }
 
+// Basic tests of the QuerySchedule object to confirm that a query with different
+// coordinator and executor memory estimates calculates memory to admit correctly
+// for various combinations of memory limit configurations.
+TEST_F(AdmissionControllerTest, DedicatedCoordQuerySchedule) {
+  AdmissionController* admission_controller = MakeAdmissionController();
+  RequestPoolService* request_pool_service = admission_controller->request_pool_service_;
+
+  const int64_t PER_EXEC_MEM_ESTIMATE = 512 * MEGABYTE;
+  const int64_t COORD_MEM_ESTIMATE = 150 * MEGABYTE;
+  TPoolConfig pool_config;
+  ASSERT_OK(request_pool_service->GetPoolConfig("default", &pool_config));
+
+  // For a query running only on the coordinator, per_backend_mem_to_admit should be 0.
+  QuerySchedule* query_schedule = MakeQuerySchedule(
+      "default", 0, pool_config, 1, PER_EXEC_MEM_ESTIMATE, COORD_MEM_ESTIMATE, true);
+  query_schedule->UpdateMemoryRequirements(pool_config);
+  ASSERT_EQ(0, query_schedule->per_backend_mem_to_admit());
+  ASSERT_EQ(COORD_MEM_ESTIMATE, query_schedule->coord_backend_mem_to_admit());
+
+  // Make sure executors and coordinators are assigned memory to admit appropriately and
+  // that the cluster memory to admit is calculated correctly.
+  query_schedule = MakeQuerySchedule(
+      "default", 0, pool_config, 2, PER_EXEC_MEM_ESTIMATE, COORD_MEM_ESTIMATE, true);
+  ASSERT_EQ(COORD_MEM_ESTIMATE, query_schedule->GetDedicatedCoordMemoryEstimate());
+  ASSERT_EQ(PER_EXEC_MEM_ESTIMATE, query_schedule->GetPerExecutorMemoryEstimate());
+  query_schedule->UpdateMemoryRequirements(pool_config);
+  ASSERT_EQ(PER_EXEC_MEM_ESTIMATE, query_schedule->per_backend_mem_to_admit());
+  ASSERT_EQ(COORD_MEM_ESTIMATE, query_schedule->coord_backend_mem_to_admit());
+  ASSERT_EQ(-1, query_schedule->per_backend_mem_limit());
+  ASSERT_EQ(-1, query_schedule->coord_backend_mem_limit());
+  ASSERT_EQ(COORD_MEM_ESTIMATE + PER_EXEC_MEM_ESTIMATE,
+      query_schedule->GetClusterMemoryToAdmit());
+
+  // Set the min_query_mem_limit in pool_config. min_query_mem_limit should
+  // not be enforced on the coordinator. Also ensure mem limits are set for both.
+  query_schedule = MakeQuerySchedule(
+      "default", 0, pool_config, 2, PER_EXEC_MEM_ESTIMATE, COORD_MEM_ESTIMATE, true);
+  ASSERT_OK(request_pool_service->GetPoolConfig("default", &pool_config));
+  pool_config.__set_min_query_mem_limit(700 * MEGABYTE);
+  query_schedule->UpdateMemoryRequirements(pool_config);
+  ASSERT_EQ(700 * MEGABYTE, query_schedule->per_backend_mem_to_admit());
+  ASSERT_EQ(COORD_MEM_ESTIMATE, query_schedule->coord_backend_mem_to_admit());
+  ASSERT_EQ(700 * MEGABYTE, query_schedule->per_backend_mem_limit());
+  ASSERT_EQ(COORD_MEM_ESTIMATE, query_schedule->coord_backend_mem_limit());
+
+  // Make sure coordinator's mem to admit is adjusted based on its own minimum mem
+  // reservation.
+  query_schedule = MakeQuerySchedule(
+      "default", 0, pool_config, 2, PER_EXEC_MEM_ESTIMATE, COORD_MEM_ESTIMATE, true);
+  int64_t coord_min_reservation = 200 * MEGABYTE;
+  int64_t min_coord_mem_limit_required =
+      ReservationUtil::GetMinMemLimitFromReservation(coord_min_reservation);
+  pool_config.__set_min_query_mem_limit(700 * MEGABYTE);
+  query_schedule->set_coord_min_reservation(200 * MEGABYTE);
+  query_schedule->UpdateMemoryRequirements(pool_config);
+  ASSERT_EQ(700 * MEGABYTE, query_schedule->per_backend_mem_to_admit());
+  ASSERT_EQ(min_coord_mem_limit_required, query_schedule->coord_backend_mem_to_admit());
+  ASSERT_EQ(700 * MEGABYTE, query_schedule->per_backend_mem_limit());
+  ASSERT_EQ(min_coord_mem_limit_required, query_schedule->coord_backend_mem_limit());
+
+  // Set mem_limit query option.
+  ASSERT_OK(request_pool_service->GetPoolConfig("default", &pool_config));
+  query_schedule = MakeQuerySchedule("default", GIGABYTE, pool_config, 2,
+      PER_EXEC_MEM_ESTIMATE, COORD_MEM_ESTIMATE, true);
+  query_schedule->UpdateMemoryRequirements(pool_config);
+  ASSERT_EQ(GIGABYTE, query_schedule->per_backend_mem_to_admit());
+  ASSERT_EQ(GIGABYTE, query_schedule->coord_backend_mem_to_admit());
+  ASSERT_EQ(GIGABYTE, query_schedule->per_backend_mem_limit());
+  ASSERT_EQ(GIGABYTE, query_schedule->coord_backend_mem_limit());
+
+  // Set mem_limit query option and max_query_mem_limit. In this case, max_query_mem_limit
+  // will be enforced on both coordinator and executor.
+  query_schedule = MakeQuerySchedule("default", GIGABYTE, pool_config, 2,
+      PER_EXEC_MEM_ESTIMATE, COORD_MEM_ESTIMATE, true);
+  ASSERT_OK(request_pool_service->GetPoolConfig("default", &pool_config));
+  pool_config.__set_max_query_mem_limit(700 * MEGABYTE);
+  query_schedule->UpdateMemoryRequirements(pool_config);
+  ASSERT_EQ(700 * MEGABYTE, query_schedule->per_backend_mem_to_admit());
+  ASSERT_EQ(700 * MEGABYTE, query_schedule->coord_backend_mem_to_admit());
+  ASSERT_EQ(700 * MEGABYTE, query_schedule->per_backend_mem_limit());
+  ASSERT_EQ(700 * MEGABYTE, query_schedule->coord_backend_mem_limit());
+}
+
+// Test admission decisions for clusters with dedicated coordinators, where different
+// amounts of memory should be admitted on coordinators and executors.
+TEST_F(AdmissionControllerTest, DedicatedCoordAdmissionChecks) {
+  AdmissionController* admission_controller = MakeAdmissionController();
+  RequestPoolService* request_pool_service = admission_controller->request_pool_service_;
+
+  TPoolConfig pool_config;
+  ASSERT_OK(request_pool_service->GetPoolConfig("default", &pool_config));
+  pool_config.__set_max_mem_resources(2*GIGABYTE); // to enable memory based admission.
+
+  // Set up a query schedule to test.
+  const int64_t PER_EXEC_MEM_ESTIMATE = GIGABYTE;
+  const int64_t COORD_MEM_ESTIMATE = 150 * MEGABYTE;
+  QuerySchedule* query_schedule = MakeQuerySchedule(
+      "default", 0, pool_config, 2, PER_EXEC_MEM_ESTIMATE, COORD_MEM_ESTIMATE, true);
+  PerBackendExecParams* per_backend_exec_params = pool_.Add(new PerBackendExecParams());
+  // Add coordinator backend.
+  BackendExecParams* coord_exec_params = pool_.Add(new BackendExecParams());
+  coord_exec_params->admit_mem_limit = 512 * MEGABYTE;
+  coord_exec_params->contains_coord_fragment = true;
+  coord_exec_params->thread_reservation = 1;
+  const string coord_host_name = Substitute("host$0", 1);
+  TNetworkAddress coord_addr = MakeNetworkAddress(coord_host_name, 25000);
+  const string coord_host = TNetworkAddressToString(coord_addr);
+  per_backend_exec_params->emplace(coord_addr, *coord_exec_params);
+  // Add executor backend.
+  BackendExecParams* backend_exec_params = pool_.Add(new BackendExecParams());
+  backend_exec_params->admit_mem_limit = GIGABYTE;
+  backend_exec_params->thread_reservation = 1;
+  const string exec_host_name = Substitute("host$0", 2);
+  TNetworkAddress exec_addr = MakeNetworkAddress(exec_host_name, 25000);
+  const string exec_host = TNetworkAddressToString(exec_addr);
+  per_backend_exec_params->emplace(exec_addr, *backend_exec_params);
+  query_schedule->set_per_backend_exec_params(*per_backend_exec_params);
+  string not_admitted_reason;
+  // Test 1: coord's admit_mem_limit < executor's admit_mem_limit. Query should not
+  // be rejected because query fits on both executor and coordinator. It should be
+  // queued if there is not enough capacity.
+  query_schedule->UpdateMemoryRequirements(pool_config);
+  ASSERT_FALSE(admission_controller->RejectForSchedule(
+      *query_schedule, pool_config, 2, 2, &not_admitted_reason));
+  ASSERT_TRUE(admission_controller->HasAvailableMemResources(
+      *query_schedule, pool_config, 2, &not_admitted_reason));
+  // Coord does not have enough available memory.
+  admission_controller->host_stats_[coord_host].mem_reserved = 500 * MEGABYTE;
+  ASSERT_FALSE(admission_controller->HasAvailableMemResources(
+      *query_schedule, pool_config, 2, &not_admitted_reason));
+  EXPECT_STR_CONTAINS(not_admitted_reason,
+      "Not enough memory available on host host1:25000. Needed 150.00 MB but only "
+      "12.00 MB out of 512.00 MB was available.");
+  not_admitted_reason.clear();
+  // Neither coordinator nor executor has enough available memory.
+  admission_controller->host_stats_[exec_host].mem_reserved = 500 * MEGABYTE;
+  ASSERT_FALSE(admission_controller->HasAvailableMemResources(
+      *query_schedule, pool_config, 2, &not_admitted_reason));
+  EXPECT_STR_CONTAINS(not_admitted_reason,
+      "Not enough memory available on host host2:25000. Needed 1.00 GB but only "
+      "524.00 MB out of 1.00 GB was available.");
+  not_admitted_reason.clear();
+  // Executor does not have enough available memory.
+  admission_controller->host_stats_[coord_host].mem_reserved = 0;
+  ASSERT_FALSE(admission_controller->HasAvailableMemResources(
+      *query_schedule, pool_config, 2, &not_admitted_reason));
+  EXPECT_STR_CONTAINS(not_admitted_reason,
+      "Not enough memory available on host host2:25000. Needed 1.00 GB but only "
+      "524.00 MB out of 1.00 GB was available.");
+  not_admitted_reason.clear();
+  admission_controller->host_stats_[exec_host].mem_reserved = 0;
+
+  // Test 2: coord's admit_mem_limit < executor's admit_mem_limit. Query rejected because
+  // coord's admit_mem_limit is less than mem_to_admit on the coord.
+  // Re-using previous QuerySchedule object.
+  coord_exec_params->admit_mem_limit = 100 * MEGABYTE;
+  (*per_backend_exec_params)[coord_addr] = *coord_exec_params;
+  query_schedule->set_per_backend_exec_params(*per_backend_exec_params);
+  ASSERT_TRUE(admission_controller->RejectForSchedule(
+      *query_schedule, pool_config, 2, 2, &not_admitted_reason));
+  EXPECT_STR_CONTAINS(not_admitted_reason,
+      "request memory needed 150.00 MB is greater than memory available for "
+      "admission 100.00 MB of host1:25000");
+  ASSERT_FALSE(admission_controller->HasAvailableMemResources(
+      *query_schedule, pool_config, 2, &not_admitted_reason));
+  EXPECT_STR_CONTAINS(not_admitted_reason,
+      "Not enough memory available on host host1:25000. Needed 150.00 MB but only "
+      "100.00 MB out of 100.00 MB was available.");
+  not_admitted_reason.clear();
+
+  // Test 3: Make sure that coord and executors have separate checks for whether their
+  // mem limits can accommodate their respective initial reservations.
+    query_schedule = MakeQuerySchedule(
+      "default", 0, pool_config, 2, PER_EXEC_MEM_ESTIMATE, COORD_MEM_ESTIMATE, true);
+    pool_config.__set_min_query_mem_limit(MEGABYTE); // to auto set mem_limit(s).
+    query_schedule->UpdateMemoryRequirements(pool_config);
+    query_schedule->set_largest_min_reservation(600 * MEGABYTE);
+    query_schedule->set_coord_min_reservation(50 * MEGABYTE);
+    ASSERT_TRUE(AdmissionController::CanAccommodateMaxInitialReservation(
+        *query_schedule, pool_config, &not_admitted_reason));
+    // Coordinator reservation doesn't fit.
+    query_schedule->set_coord_min_reservation(200 * MEGABYTE);
+    ASSERT_FALSE(AdmissionController::CanAccommodateMaxInitialReservation(
+        *query_schedule, pool_config, &not_admitted_reason));
+    EXPECT_STR_CONTAINS(not_admitted_reason, "minimum memory reservation is greater "
+        "than memory available to the query for buffer reservations. Memory reservation "
+        "needed given the current plan: 200.00 MB");
+    // Neither coordinator nor executor reservation fits.
+    query_schedule->set_largest_min_reservation(GIGABYTE);
+    ASSERT_FALSE(AdmissionController::CanAccommodateMaxInitialReservation(
+        *query_schedule, pool_config, &not_admitted_reason));
+    EXPECT_STR_CONTAINS(not_admitted_reason, "minimum memory reservation is greater "
+        "than memory available to the query for buffer reservations. Memory reservation "
+        "needed given the current plan: 1.00 GB");
+    // Executor reservation doesn't fit.
+    query_schedule->set_coord_min_reservation(50 * MEGABYTE);
+    query_schedule->set_largest_min_reservation(GIGABYTE);
+    ASSERT_FALSE(AdmissionController::CanAccommodateMaxInitialReservation(
+        *query_schedule, pool_config, &not_admitted_reason));
+    EXPECT_STR_CONTAINS(not_admitted_reason, "minimum memory reservation is greater "
+        "than memory available to the query for buffer reservations. Memory reservation "
+        "needed given the current plan: 1.00 GB");
+}
+
 } // end namespace impala
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index edaf5e5..d31c50b 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -56,6 +56,8 @@ DEFINE_int64_hidden(admission_control_stale_topic_threshold_ms, 5 * 1000,
     "capture most cases where the Impala daemon is disconnected from the statestore "
     "or topic updates are seriously delayed.");
 
+DECLARE_bool(is_executor);
+
 namespace impala {
 
 const int64_t AdmissionController::PoolStats::HISTOGRAM_NUM_OF_BINS = 128;
@@ -202,7 +204,7 @@ const string REASON_REQ_OVER_POOL_MEM =
     "The total memory needed is the per-node MEM_LIMIT times the number of nodes "
     "executing the query. See the Admission Control documentation for more information.";
 const string REASON_REQ_OVER_NODE_MEM =
-    "request memory needed $0 per node is greater than memory available for admission $1 "
+    "request memory needed $0 is greater than memory available for admission $1 "
     "of $2.\n\nUse the MEM_LIMIT query option to indicate how much memory is required "
     "per node.";
 const string REASON_THREAD_RESERVATION_LIMIT_EXCEEDED =
@@ -230,7 +232,7 @@ const string POOL_MEM_NOT_AVAILABLE =
     "Not enough aggregate memory available in pool $0 "
     "with max mem resources $1 ($2). Needed $3 but only $4 was available.$5";
 // $0 = host name, $1 = host mem needed, $3 = host mem available, $4 = staleness detail
-const string HOST_MEM_NOT_AVAILABLE = "Not enough memory available on host $0."
+const string HOST_MEM_NOT_AVAILABLE = "Not enough memory available on host $0. "
     "Needed $1 but only $2 out of $3 was available.$4";
 
 // $0 = host name, $1 = num admitted, $2 = max requests
@@ -392,18 +394,20 @@ void AdmissionController::PoolStats::Dequeue(bool timed_out) {
 }
 
 void AdmissionController::UpdateHostStats(
-    const QuerySchedule& schedule, int64_t per_node_mem, int64_t num_queries) {
-  DCHECK_NE(per_node_mem, 0);
-  DCHECK(num_queries == 1 || num_queries == -1)
-      << "Invalid number of queries: " << num_queries;
+    const QuerySchedule& schedule, bool is_admitting) {
+  int64_t per_backend_mem_to_admit = schedule.per_backend_mem_to_admit();
   for (const auto& entry : schedule.per_backend_exec_params()) {
     const TNetworkAddress& host_addr = entry.first;
+    int64_t mem_to_admit = entry.second.contains_coord_fragment ?
+        schedule.coord_backend_mem_to_admit() : per_backend_mem_to_admit;
+    if (!is_admitting) mem_to_admit *= -1;
     const string host = TNetworkAddressToString(host_addr);
     VLOG_ROW << "Update admitted mem reserved for host=" << host
              << " prev=" << PrintBytes(host_stats_[host].mem_admitted)
-             << " new=" << PrintBytes(host_stats_[host].mem_admitted + per_node_mem);
-    host_stats_[host].mem_admitted += per_node_mem;
+             << " new=" << PrintBytes(host_stats_[host].mem_admitted + mem_to_admit);
+    host_stats_[host].mem_admitted += mem_to_admit;
     DCHECK_GE(host_stats_[host].mem_admitted, 0);
+    int num_queries = is_admitting ? 1 : -1;
     VLOG_ROW << "Update admitted queries for host=" << host
              << " prev=" << host_stats_[host].num_admitted
              << " new=" << host_stats_[host].num_admitted + num_queries;
@@ -412,23 +416,34 @@ void AdmissionController::UpdateHostStats(
   }
 }
 
+// Helper method used by CanAccommodateMaxInitialReservation(). Returns true if the given
+// 'mem_limit' can accommodate 'buffer_reservation'. If not, returns false and the
+// details about the memory shortage in 'mem_unavailable_reason'.
+static bool CanMemLimitAccommodateReservation(
+    int64_t mem_limit, int64_t buffer_reservation, string* mem_unavailable_reason) {
+  if (mem_limit <= 0) return true; // No mem limit.
+  const int64_t max_reservation =
+      ReservationUtil::GetReservationLimitFromMemLimit(mem_limit);
+  if (buffer_reservation <= max_reservation) return true;
+  const int64_t required_mem_limit =
+      ReservationUtil::GetMinMemLimitFromReservation(buffer_reservation);
+  *mem_unavailable_reason = Substitute(REASON_MEM_LIMIT_TOO_LOW_FOR_RESERVATION,
+      PrintBytes(buffer_reservation), PrintBytes(required_mem_limit));
+  return false;
+}
+
 bool AdmissionController::CanAccommodateMaxInitialReservation(
     const QuerySchedule& schedule, const TPoolConfig& pool_cfg,
     string* mem_unavailable_reason) {
-  const int64_t per_backend_mem_limit = schedule.per_backend_mem_limit();
-  if (per_backend_mem_limit > 0) {
-    const int64_t max_reservation =
-        ReservationUtil::GetReservationLimitFromMemLimit(per_backend_mem_limit);
-    const int64_t largest_min_mem_reservation = schedule.largest_min_reservation();
-    if (largest_min_mem_reservation > max_reservation) {
-      const int64_t required_mem_limit =
-          ReservationUtil::GetMinMemLimitFromReservation(largest_min_mem_reservation);
-      *mem_unavailable_reason = Substitute(REASON_MEM_LIMIT_TOO_LOW_FOR_RESERVATION,
-          PrintBytes(largest_min_mem_reservation), PrintBytes(required_mem_limit));
-      return false;
-    }
-  }
-  return true;
+  const int64_t executor_mem_limit = schedule.per_backend_mem_limit();
+  const int64_t executor_min_reservation = schedule.largest_min_reservation();
+  const int64_t coord_mem_limit = schedule.coord_backend_mem_limit();
+  const int64_t coord_min_reservation = schedule.coord_min_reservation();
+  return CanMemLimitAccommodateReservation(
+             executor_mem_limit, executor_min_reservation, mem_unavailable_reason)
+      && (!schedule.requiresCoordinatorFragment()
+             || CanMemLimitAccommodateReservation(
+                    coord_mem_limit, coord_min_reservation, mem_unavailable_reason));
 }
 
 bool AdmissionController::HasAvailableMemResources(const QuerySchedule& schedule,
@@ -444,11 +459,10 @@ bool AdmissionController::HasAvailableMemResources(const QuerySchedule& schedule
   //    specified.
   // 2) Each individual backend must have enough mem available within its process limit
   //    to execute the query.
-  int64_t per_host_mem_to_admit = schedule.per_backend_mem_to_admit();
-  int64_t cluster_mem_to_admit = schedule.GetClusterMemoryToAdmit();
 
   // Case 1:
   PoolStats* stats = GetPoolStats(schedule);
+  int64_t cluster_mem_to_admit = schedule.GetClusterMemoryToAdmit();
   VLOG_RPC << "Checking agg mem in pool=" << pool_name << " : " << stats->DebugString()
            << " executor_group=" << schedule.executor_group()
            << " cluster_mem_needed=" << PrintBytes(cluster_mem_to_admit)
@@ -464,6 +478,8 @@ bool AdmissionController::HasAvailableMemResources(const QuerySchedule& schedule
   }
 
   // Case 2:
+  int64_t executor_mem_to_admit = schedule.per_backend_mem_to_admit();
+  int64_t coord_mem_to_admit = schedule.coord_backend_mem_to_admit();
   for (const auto& entry : schedule.per_backend_exec_params()) {
     const TNetworkAddress& host = entry.first;
     const string host_id = TNetworkAddressToString(host);
@@ -471,15 +487,17 @@ bool AdmissionController::HasAvailableMemResources(const QuerySchedule& schedule
     const HostStats& host_stats = host_stats_[host_id];
     int64_t mem_reserved = host_stats.mem_reserved;
     int64_t mem_admitted = host_stats.mem_admitted;
+    int64_t mem_to_admit = entry.second.contains_coord_fragment ?
+        coord_mem_to_admit : executor_mem_to_admit;
     VLOG_ROW << "Checking memory on host=" << host_id
              << " mem_reserved=" << PrintBytes(mem_reserved)
              << " mem_admitted=" << PrintBytes(mem_admitted)
-             << " needs=" << PrintBytes(per_host_mem_to_admit)
+             << " needs=" << PrintBytes(mem_to_admit)
              << " admit_mem_limit=" << PrintBytes(admit_mem_limit);
     int64_t effective_host_mem_reserved = std::max(mem_reserved, mem_admitted);
-    if (effective_host_mem_reserved + per_host_mem_to_admit > admit_mem_limit) {
+    if (effective_host_mem_reserved + mem_to_admit > admit_mem_limit) {
       *mem_unavailable_reason =
-          Substitute(HOST_MEM_NOT_AVAILABLE, host_id, PrintBytes(per_host_mem_to_admit),
+          Substitute(HOST_MEM_NOT_AVAILABLE, host_id, PrintBytes(mem_to_admit),
               PrintBytes(max(admit_mem_limit - effective_host_mem_reserved, 0L)),
               PrintBytes(admit_mem_limit), GetStalenessDetailLocked(" "));
       return false;
@@ -610,13 +628,15 @@ bool AdmissionController::RejectForSchedule(const QuerySchedule& schedule,
     string* rejection_reason) {
   DCHECK(rejection_reason != nullptr && rejection_reason->empty());
 
-  // Compute the max (over all backends) and cluster total (across all backends) for
-  // min_mem_reservation_bytes and thread_reservation and the min (over all backends)
-  // min_admit_mem_limit.
+  // Compute the max (over all backends) and the cluster totals (across all backends) of
+  // min_mem_reservation_bytes and thread_reservation, the min admit_mem_limit (over
+  // all executors) and the admit_mem_limit of the coordinator.
   pair<const TNetworkAddress*, int64_t> largest_min_mem_reservation(nullptr, -1);
   int64_t cluster_min_mem_reservation_bytes = 0;
   pair<const TNetworkAddress*, int64_t> max_thread_reservation(nullptr, 0);
-  pair<const TNetworkAddress*, int64_t> min_admit_mem_limit(
+  pair<const TNetworkAddress*, int64_t> min_executor_admit_mem_limit(
+      nullptr, std::numeric_limits<int64_t>::max());
+  pair<const TNetworkAddress*, int64_t> coord_admit_mem_limit(
       nullptr, std::numeric_limits<int64_t>::max());
   int64_t cluster_thread_reservation = 0;
   for (const auto& e : schedule.per_backend_exec_params()) {
@@ -629,9 +649,12 @@ bool AdmissionController::RejectForSchedule(const QuerySchedule& schedule,
     if (e.second.thread_reservation > max_thread_reservation.second) {
       max_thread_reservation = make_pair(&e.first, e.second.thread_reservation);
     }
-    if (e.second.admit_mem_limit < min_admit_mem_limit.second) {
-      min_admit_mem_limit.first = &e.first;
-      min_admit_mem_limit.second = e.second.admit_mem_limit;
+    if (e.second.contains_coord_fragment) {
+      coord_admit_mem_limit.first = &e.first;
+      coord_admit_mem_limit.second = e.second.admit_mem_limit;
+    } else if (e.second.admit_mem_limit < min_executor_admit_mem_limit.second) {
+      min_executor_admit_mem_limit.first = &e.first;
+      min_executor_admit_mem_limit.second = e.second.admit_mem_limit;
     }
   }
 
@@ -689,16 +712,30 @@ bool AdmissionController::RejectForSchedule(const QuerySchedule& schedule,
               PrintBytes(max_mem), GetMaxMemForPoolDescription(pool_cfg, group_size));
       return true;
     }
-    int64_t per_backend_mem_to_admit = schedule.per_backend_mem_to_admit();
-    VLOG_ROW << "Checking backend mem with per_backend_mem_to_admit = "
-             << per_backend_mem_to_admit
-             << " and min_admit_mem_limit.second = " << min_admit_mem_limit.second;
-    if (per_backend_mem_to_admit > min_admit_mem_limit.second) {
-      *rejection_reason = Substitute(REASON_REQ_OVER_NODE_MEM,
-          PrintBytes(per_backend_mem_to_admit), PrintBytes(min_admit_mem_limit.second),
-          TNetworkAddressToString(*min_admit_mem_limit.first));
+    int64_t executor_mem_to_admit = schedule.per_backend_mem_to_admit();
+    VLOG_ROW << "Checking executor mem with executor_mem_to_admit = "
+             << executor_mem_to_admit
+             << " and min_admit_mem_limit.second = "
+             << min_executor_admit_mem_limit.second;
+    if (executor_mem_to_admit > min_executor_admit_mem_limit.second) {
+      *rejection_reason =
+          Substitute(REASON_REQ_OVER_NODE_MEM, PrintBytes(executor_mem_to_admit),
+              PrintBytes(min_executor_admit_mem_limit.second),
+              TNetworkAddressToString(*min_executor_admit_mem_limit.first));
       return true;
     }
+    if (schedule.requiresCoordinatorFragment()) {
+      int64_t coord_mem_to_admit = schedule.coord_backend_mem_to_admit();
+      VLOG_ROW << "Checking coordinator mem with coord_mem_to_admit = "
+               << coord_mem_to_admit
+               << " and coord_admit_mem_limit.second = " << coord_admit_mem_limit.second;
+      if (coord_mem_to_admit > coord_admit_mem_limit.second) {
+        *rejection_reason = Substitute(REASON_REQ_OVER_NODE_MEM,
+            PrintBytes(coord_mem_to_admit), PrintBytes(coord_admit_mem_limit.second),
+            TNetworkAddressToString(*coord_admit_mem_limit.first));
+        return true;
+      }
+    }
   }
   return false;
 }
@@ -710,8 +747,7 @@ void AdmissionController::PoolStats::UpdateConfigMetrics(
   metrics_.pool_max_queued->SetValue(pool_cfg.max_queued);
   metrics_.max_query_mem_limit->SetValue(pool_cfg.max_query_mem_limit);
   metrics_.min_query_mem_limit->SetValue(pool_cfg.min_query_mem_limit);
-  metrics_.clamp_mem_limit_query_option->SetValue(
-      pool_cfg.clamp_mem_limit_query_option);
+  metrics_.clamp_mem_limit_query_option->SetValue(pool_cfg.clamp_mem_limit_query_option);
   metrics_.max_running_queries_multiple->SetValue(pool_cfg.max_running_queries_multiple);
   metrics_.max_queued_queries_multiple->SetValue(pool_cfg.max_queued_queries_multiple);
   metrics_.max_memory_multiple->SetValue(pool_cfg.max_memory_multiple);
@@ -896,7 +932,7 @@ void AdmissionController::ReleaseQuery(
     lock_guard<mutex> lock(admission_ctrl_lock_);
     PoolStats* stats = GetPoolStats(schedule);
     stats->Release(schedule, peak_mem_consumption);
-    UpdateHostStats(schedule, -schedule.per_backend_mem_to_admit(), -1);
+    UpdateHostStats(schedule, /*is_admitting=*/false);
     pools_for_updates_.insert(pool_name);
     VLOG_RPC << "Released query id=" << PrintId(schedule.query_id()) << " "
              << stats->DebugString();
@@ -1112,9 +1148,11 @@ Status AdmissionController::ComputeGroupSchedules(
   for (const ExecutorGroup* executor_group : executor_groups) {
     DCHECK(executor_group->IsHealthy());
     DCHECK_GT(executor_group->NumExecutors(), 0);
+    bool is_dedicated_coord = !FLAGS_is_executor;
     unique_ptr<QuerySchedule> group_schedule =
         make_unique<QuerySchedule>(request.query_id, request.request,
-            request.query_options, request.summary_profile, request.query_events);
+            request.query_options, is_dedicated_coord, request.summary_profile,
+            request.query_events);
     const string& group_name = executor_group->name();
     VLOG(3) << "Scheduling for executor group: " << group_name << " with "
             << executor_group->NumExecutors() << " executors";
@@ -1170,7 +1208,9 @@ bool AdmissionController::FindGroupToAdmitOrReject(int64_t cluster_size,
     VLOG_QUERY << "Trying to admit id=" << PrintId(schedule->query_id())
                << " in pool_name=" << pool_name << " executor_group_name=" << group_name
                << " per_host_mem_estimate="
-               << PrintBytes(schedule->GetPerHostMemoryEstimate())
+               << PrintBytes(schedule->GetPerExecutorMemoryEstimate())
+               << " dedicated_coord_mem_estimate="
+               << PrintBytes(schedule->GetDedicatedCoordMemoryEstimate())
                << " max_requests=" << max_requests << " ("
                << GetMaxRequestsForPoolDescription(pool_config, cluster_size) << ")"
                << " max_queued=" << max_queued << " ("
@@ -1408,10 +1448,14 @@ void AdmissionController::AdmitQuery(QuerySchedule* schedule, bool was_queued) {
            << " per_backend_mem_limit set to: "
            << PrintBytes(schedule->per_backend_mem_limit())
            << " per_backend_mem_to_admit set to: "
-           << PrintBytes(schedule->per_backend_mem_to_admit());
+           << PrintBytes(schedule->per_backend_mem_to_admit())
+           << " coord_backend_mem_limit set to: "
+           << PrintBytes(schedule->coord_backend_mem_limit())
+           << " coord_backend_mem_to_admit set to: "
+           << PrintBytes(schedule->coord_backend_mem_to_admit());;
   // Update memory and number of queries.
   pool_stats->Admit(*schedule);
-  UpdateHostStats(*schedule, schedule->per_backend_mem_to_admit(), 1);
+  UpdateHostStats(*schedule, /* is_admitting=*/true);
   // Update summary profile.
   const string& admission_result =
       was_queued ? PROFILE_INFO_VAL_ADMIT_QUEUED : PROFILE_INFO_VAL_ADMIT_IMMEDIATELY;
@@ -1487,6 +1531,10 @@ void AdmissionController::PoolToJsonLocked(const string& pool_name,
         "mem_limit", schedule->per_backend_mem_limit(), document->GetAllocator());
     query_info.AddMember("mem_limit_to_admit", schedule->per_backend_mem_to_admit(),
         document->GetAllocator());
+    query_info.AddMember("coord_mem_limit", schedule->coord_backend_mem_limit(),
+        document->GetAllocator());
+    query_info.AddMember("coord_mem_to_admit",
+        schedule->coord_backend_mem_to_admit(), document->GetAllocator());
     query_info.AddMember("num_backends", schedule->per_backend_exec_params().size(),
         document->GetAllocator());
     queued_queries.PushBack(query_info, document->GetAllocator());
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index d71327c..7cf2343 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -770,16 +770,19 @@ class AdmissionController {
   bool CanAdmitRequest(const QuerySchedule& schedule, const TPoolConfig& pool_cfg,
       int64_t cluster_size, bool admit_from_queue, std::string* not_admitted_reason);
 
-  /// Returns true if the per host mem limit for the query represented by 'schedule' is
-  /// large enough to accommodate the largest initial reservation required. Otherwise,
-  /// returns false with the details about the memory shortage in
-  /// 'mem_unavailable_reason'. Possible cases where it can return false are:
+  /// Returns true if all executors can accommodate the largest initial reservation of
+  /// any executor and the backend running the coordinator fragment can accommodate its
+  /// own initial reservation. Otherwise, returns false with the details about the memory
+  /// shortage in 'mem_unavailable_reason'. Possible cases where it can return false are:
   /// 1. The pool.max_query_mem_limit is set too low
   /// 2. mem_limit in query options is set low and no max/min_query_mem_limit is set in
   ///    the pool configuration.
   /// 3. mem_limit in query options is set low and min_query_mem_limit is also set low.
   /// 4. mem_limit in query options is set low and the pool.min_query_mem_limit is set
   ///    to a higher value but pool.clamp_mem_limit_query_option is false.
+  /// 5. If a dedicated coordinator is used and the mem_limit in query options is set
+  ///    lower than what is required to support the sum of initial memory reservations of
+  ///    the fragments scheduled on the coordinator.
   static bool CanAccommodateMaxInitialReservation(const QuerySchedule& schedule,
       const TPoolConfig& pool_cfg, std::string* mem_unavailable_reason);
 
@@ -800,11 +803,10 @@ class AdmissionController {
   bool HasAvailableSlot(const QuerySchedule& schedule, const TPoolConfig& pool_cfg,
       string* unavailable_reason);
 
-  /// Adds 'per_node_mem' and 'num_queries' to the per-host stats in host_stats_ for each
-  /// host in 'schedule'. Must hold admission_ctrl_lock_. Note that 'per_node_mem' and
-  /// 'num_queries' may be negative when a query completes.
-  void UpdateHostStats(
-      const QuerySchedule& schedule, int64_t per_node_mem, int64_t num_queries);
+  /// Updates the memory admitted and the number of queries running for each host in
+  /// 'schedule'. If 'is_admitting' is true, both are increased; otherwise both are
+  /// decreased.
+  void UpdateHostStats(const QuerySchedule& schedule, bool is_admitting);
 
   /// Rejection happens in several stages
   /// 1) Based on static pool configuration
@@ -938,6 +940,8 @@ class AdmissionController {
   FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestCount);
   FRIEND_TEST(AdmissionControllerTest, GetMaxToDequeue);
   FRIEND_TEST(AdmissionControllerTest, QueryRejection);
+  FRIEND_TEST(AdmissionControllerTest, DedicatedCoordQuerySchedule);
+  FRIEND_TEST(AdmissionControllerTest, DedicatedCoordAdmissionChecks);
   friend class AdmissionControllerTest;
 };
 
diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc
index 84d939a..fceee52 100644
--- a/be/src/scheduling/query-schedule.cc
+++ b/be/src/scheduling/query-schedule.cc
@@ -35,10 +35,15 @@ using boost::uuids::random_generator;
 using boost::uuids::uuid;
 using namespace impala;
 
+DEFINE_bool_hidden(use_dedicated_coordinator_estimates, true,
+    "Hidden option to fall back to legacy memory estimation logic for dedicated"
+    " coordinators wherein the same per backend estimate is used for both coordinators "
+    "and executors.");
+
 namespace impala {
 
-QuerySchedule::QuerySchedule(const TUniqueId& query_id,
-    const TQueryExecRequest& request, const TQueryOptions& query_options,
+QuerySchedule::QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
+    const TQueryOptions& query_options, bool is_dedicated_coord,
     RuntimeProfile* summary_profile, RuntimeProfile::EventSequence* query_events)
   : query_id_(query_id),
     request_(request),
@@ -46,18 +51,21 @@ QuerySchedule::QuerySchedule(const TUniqueId& query_id,
     summary_profile_(summary_profile),
     query_events_(query_events),
     num_scan_ranges_(0),
-    next_instance_id_(query_id) {
+    next_instance_id_(query_id),
+    is_dedicated_coord_(is_dedicated_coord) {
   Init();
 }
 
 QuerySchedule::QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
-    const TQueryOptions& query_options, RuntimeProfile* summary_profile)
+    const TQueryOptions& query_options, bool is_dedicated_coord,
+    RuntimeProfile* summary_profile)
   : query_id_(query_id),
     request_(request),
     query_options_(query_options),
     summary_profile_(summary_profile),
     num_scan_ranges_(0),
-    next_instance_id_(query_id) {
+    next_instance_id_(query_id),
+    is_dedicated_coord_(is_dedicated_coord) {
   // Init() is not called, this constructor is for white box testing only.
   DCHECK(TestInfo::is_test());
 }
@@ -81,7 +89,7 @@ void QuerySchedule::Init() {
 
   // mark coordinator fragment
   const TPlanFragment& root_fragment = request_.plan_exec_info[0].fragments[0];
-  if (request_.stmt_type == TStmtType::QUERY) {
+  if (requiresCoordinatorFragment()) {
     fragment_exec_params_[root_fragment.idx].is_coord_fragment = true;
     // the coordinator instance gets index 0, generated instance ids start at 1
     next_instance_id_ = CreateInstanceId(next_instance_id_, 1);
@@ -177,11 +185,16 @@ void QuerySchedule::Validate() const {
   // TODO: add validation for BackendExecParams
 }
 
-int64_t QuerySchedule::GetPerHostMemoryEstimate() const {
+int64_t QuerySchedule::GetPerExecutorMemoryEstimate() const {
   DCHECK(request_.__isset.per_host_mem_estimate);
   return request_.per_host_mem_estimate;
 }
 
+int64_t QuerySchedule::GetDedicatedCoordMemoryEstimate() const {
+  DCHECK(request_.__isset.dedicated_coord_mem_estimate);
+  return request_.dedicated_coord_mem_estimate;
+}
+
 TUniqueId QuerySchedule::GetNextInstanceId() {
   TUniqueId result = next_instance_id_;
   ++next_instance_id_.lo;
@@ -200,7 +213,6 @@ const TPlanFragment* QuerySchedule::GetCoordFragment() const {
   return fragment;
 }
 
-
 void QuerySchedule::GetTPlanFragments(vector<const TPlanFragment*>* fragments) const {
   fragments->clear();
   for (const TPlanExecInfo& plan_info: request_.plan_exec_info) {
@@ -211,7 +223,7 @@ void QuerySchedule::GetTPlanFragments(vector<const TPlanFragment*>* fragments) c
 }
 
 const FInstanceExecParams& QuerySchedule::GetCoordInstanceExecParams() const {
-  DCHECK_EQ(request_.stmt_type, TStmtType::QUERY);
+  DCHECK(requiresCoordinatorFragment());
   const TPlanFragment& coord_fragment =  request_.plan_exec_info[0].fragments[0];
   const FragmentExecParams& fragment_params = fragment_exec_params_[coord_fragment.idx];
   DCHECK_EQ(fragment_params.instance_exec_params.size(), 1);
@@ -235,32 +247,51 @@ int QuerySchedule::GetNumFragmentInstances() const {
 }
 
 int64_t QuerySchedule::GetClusterMemoryToAdmit() const {
-  return per_backend_mem_to_admit() *  per_backend_exec_params_.size();
+  if (!requiresCoordinatorFragment()) {
+    // For this case, there will be no coordinator fragment so only use the per
+    // executor mem to admit while accounting for admitted memory. This will also ensure
+    // the per backend mem admitted accounting is consistent with the cluster-wide mem
+    // admitted.
+    return per_backend_mem_to_admit() * per_backend_exec_params_.size();
+  } else {
+    return per_backend_mem_to_admit() * (per_backend_exec_params_.size() - 1)
+        + coord_backend_mem_to_admit();
+  }
+}
+
+bool QuerySchedule::UseDedicatedCoordEstimates() const {
+  return is_dedicated_coord_ && FLAGS_use_dedicated_coordinator_estimates;
 }
 
 void QuerySchedule::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
   // If the min_query_mem_limit and max_query_mem_limit are not set in the pool config
-  // then it falls back to traditional(old) behavior, which means that, if for_admission
-  // is false, it returns the mem_limit if it is set in the query options, else returns -1
-  // which means no limit; if for_admission is true, it returns the mem_limit if it is set
-  // in the query options, else returns the per host mem estimate calculated during
-  // planning.
+  // then it falls back to the traditional (old) behavior: the mem_limit is taken from
+  // the query options if it is set there, otherwise it is set to -1 (no limit).
   bool mimic_old_behaviour =
       pool_cfg.min_query_mem_limit == 0 && pool_cfg.max_query_mem_limit == 0;
 
   per_backend_mem_to_admit_ = 0;
+  coord_backend_mem_to_admit_ = 0;
   bool has_query_option = false;
   if (query_options().__isset.mem_limit && query_options().mem_limit > 0) {
     per_backend_mem_to_admit_ = query_options().mem_limit;
+    coord_backend_mem_to_admit_ = query_options().mem_limit;
     has_query_option = true;
   }
 
   if (!has_query_option) {
-    per_backend_mem_to_admit_ = GetPerHostMemoryEstimate();
+    per_backend_mem_to_admit_ = GetPerExecutorMemoryEstimate();
+    coord_backend_mem_to_admit_ = UseDedicatedCoordEstimates() ?
+        GetDedicatedCoordMemoryEstimate() :
+        GetPerExecutorMemoryEstimate();
     if (!mimic_old_behaviour) {
-      int64_t min_mem_limit_required = ReservationUtil::GetMinMemLimitFromReservation(
-          largest_min_reservation());
+      int64_t min_mem_limit_required =
+          ReservationUtil::GetMinMemLimitFromReservation(largest_min_reservation());
       per_backend_mem_to_admit_ = max(per_backend_mem_to_admit_, min_mem_limit_required);
+      int64_t min_coord_mem_limit_required =
+          ReservationUtil::GetMinMemLimitFromReservation(coord_min_reservation());
+      coord_backend_mem_to_admit_ =
+          max(coord_backend_mem_to_admit_, min_coord_mem_limit_required);
     }
   }
 
@@ -268,21 +299,38 @@ void QuerySchedule::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
     if (pool_cfg.min_query_mem_limit > 0) {
       per_backend_mem_to_admit_ =
           max(per_backend_mem_to_admit_, pool_cfg.min_query_mem_limit);
+      if (!UseDedicatedCoordEstimates() || has_query_option) {
+        // The minimum mem limit option does not apply to dedicated coordinators -
+        // this would result in over-reserving of memory. Treat coordinator and
+        // executor mem limits the same if the query option was explicitly set.
+        coord_backend_mem_to_admit_ =
+            max(coord_backend_mem_to_admit_, pool_cfg.min_query_mem_limit);
+      }
     }
     if (pool_cfg.max_query_mem_limit > 0) {
       per_backend_mem_to_admit_ =
           min(per_backend_mem_to_admit_, pool_cfg.max_query_mem_limit);
+      coord_backend_mem_to_admit_ =
+          min(coord_backend_mem_to_admit_, pool_cfg.max_query_mem_limit);
     }
   }
 
   // Cap the memory estimate at the amount of physical memory available. The user's
   // provided value or the estimate from planning can each be unreasonable.
   per_backend_mem_to_admit_ = min(per_backend_mem_to_admit_, MemInfo::physical_mem());
+  coord_backend_mem_to_admit_ = min(coord_backend_mem_to_admit_, MemInfo::physical_mem());
+
+  // The query is scheduled to run only on the coordinator: admit no executor memory.
+  if (per_backend_exec_params_.size() == 1 && requiresCoordinatorFragment()) {
+    per_backend_mem_to_admit_ = 0;
+  }
 
   if (mimic_old_behaviour && !has_query_option) {
     per_backend_mem_limit_ = -1;
+    coord_backend_mem_limit_ = -1;
   } else {
     per_backend_mem_limit_ = per_backend_mem_to_admit_;
+    coord_backend_mem_limit_ = coord_backend_mem_to_admit_;
   }
 }
 
diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h
index 6143fb2..ad169c7 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -81,6 +81,9 @@ struct BackendExecParams {
 
   // The maximum number of queries that this backend can execute concurrently.
   int64_t admit_num_queries_limit = 0;
+
+  // Indicates whether this backend will run the coordinator fragment.
+  bool contains_coord_fragment = false;
 };
 
 /// Map from an impalad host address to the list of assigned fragment instance params.
@@ -159,12 +162,14 @@ struct FragmentExecParams {
 class QuerySchedule {
  public:
   QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
-      const TQueryOptions& query_options, RuntimeProfile* summary_profile,
+      const TQueryOptions& query_options, bool is_dedicated_coord,
+      RuntimeProfile* summary_profile,
       RuntimeProfile::EventSequence* query_events);
 
   /// For testing only: build a QuerySchedule object but do not run Init().
   QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
-      const TQueryOptions& query_options, RuntimeProfile* summary_profile);
+      const TQueryOptions& query_options, bool is_dedicated_coord,
+      RuntimeProfile* summary_profile);
 
   /// Verifies that the schedule is well-formed (and DCHECKs if it isn't):
   /// - all fragments have a FragmentExecParams
@@ -179,7 +184,12 @@ class QuerySchedule {
   const std::string& request_pool() const { return request().query_ctx.request_pool; }
 
   /// Returns the estimated memory (bytes) per-node from planning.
-  int64_t GetPerHostMemoryEstimate() const;
+  int64_t GetPerExecutorMemoryEstimate() const;
+
+  /// Returns the estimated memory (bytes) for the coordinator backend returned by the
+  /// planner. This estimate is only meaningful if this schedule was generated on a
+  /// dedicated coordinator.
+  int64_t GetDedicatedCoordMemoryEstimate() const;
 
   /// Helper methods used by scheduler to populate this QuerySchedule.
   void IncNumScanRanges(int64_t delta) { num_scan_ranges_ += delta; }
@@ -239,15 +249,26 @@ class QuerySchedule {
 
   int64_t largest_min_reservation() const { return largest_min_reservation_; }
 
+  int64_t coord_min_reservation() const { return coord_min_reservation_; }
+
   /// Must call UpdateMemoryRequirements() at least once before calling this.
   int64_t per_backend_mem_limit() const { return per_backend_mem_limit_; }
 
   /// Must call UpdateMemoryRequirements() at least once before calling this.
   int64_t per_backend_mem_to_admit() const {
-    DCHECK_GT(per_backend_mem_to_admit_, 0);
+    DCHECK_GE(per_backend_mem_to_admit_, 0);
     return per_backend_mem_to_admit_;
   }
 
+  /// Must call UpdateMemoryRequirements() at least once before calling this.
+  int64_t coord_backend_mem_limit() const { return coord_backend_mem_limit_; }
+
+  /// Must call UpdateMemoryRequirements() at least once before calling this.
+  int64_t coord_backend_mem_to_admit() const {
+    DCHECK_GT(coord_backend_mem_to_admit_, 0);
+    return coord_backend_mem_to_admit_;
+  }
+
   void set_per_backend_exec_params(const PerBackendExecParams& params) {
     per_backend_exec_params_ = params;
   }
@@ -256,10 +277,18 @@ class QuerySchedule {
     largest_min_reservation_ = largest_min_reservation;
   }
 
+  void set_coord_min_reservation(const int64_t coord_min_reservation) {
+    coord_min_reservation_ = coord_min_reservation;
+  }
+
   /// Returns the Cluster wide memory admitted by the admission controller.
   /// Must call UpdateMemoryRequirements() at least once before calling this.
   int64_t GetClusterMemoryToAdmit() const;
 
+  /// Returns true if coordinator estimates calculated by the planner and specialized for
+  /// a dedicated coordinator are to be used for estimating memory requirements.
+  bool UseDedicatedCoordEstimates() const;
+
   /// Populates or updates the per host query memory limit and the amount of memory to be
   /// admitted based on the pool configuration passed to it. Must be called at least once
   /// before making any calls to per_backend_mem_to_admit(), per_backend_mem_limit() and
@@ -270,6 +299,11 @@ class QuerySchedule {
 
   void set_executor_group(string executor_group);
 
+  /// Returns true if a coordinator fragment is required based on the query stmt type.
+  bool requiresCoordinatorFragment() const {
+    return request_.stmt_type == TStmtType::QUERY;
+  }
+
  private:
   /// These references are valid for the lifetime of this query schedule because they
   /// are all owned by the enclosing QueryExecState.
@@ -304,19 +338,36 @@ class QuerySchedule {
   /// Used to generate consecutive fragment instance ids.
   TUniqueId next_instance_id_;
 
-  /// The largest min memory reservation across all backends. Set in
+  /// The largest min memory reservation across all executors. Set in
   /// Scheduler::Schedule().
   int64_t largest_min_reservation_ = 0;
 
-  /// The memory limit per backend that will be imposed on the query.
+  /// The memory limit per executor that will be imposed on the query.
+  /// Set by the admission controller with a value that is only valid if it was admitted
+  /// successfully. -1 means no limit.
+  int64_t per_backend_mem_limit_ = -1;
+
+  /// The per executor memory used for admission accounting.
+  /// Set by the admission controller with a value that is only valid if it was admitted
+  /// successfully. Can be zero if the query is only scheduled to run on the coordinator.
+  int64_t per_backend_mem_to_admit_ = -1;
+
+  /// The memory limit for the coordinator that will be imposed on the query. Used only if
+  /// the query has a coordinator fragment.
   /// Set by the admission controller with a value that is only valid if it was admitted
   /// successfully. -1 means no limit.
-  int64_t per_backend_mem_limit_ = 0;
+  int64_t coord_backend_mem_limit_ = -1;
 
-  /// The per backend memory used for admission accounting.
+  /// The coordinator memory used for admission accounting.
   /// Set by the admission controller with a value that is only valid if it was admitted
   /// successfully.
-  int64_t per_backend_mem_to_admit_ = 0;
+  int64_t coord_backend_mem_to_admit_ = -1;
+
+  /// The coordinator's backend memory reservation. Set in Scheduler::Schedule().
+  int64_t coord_min_reservation_ = 0;
+
+  /// Indicates whether coordinator fragment will be running on a dedicated coordinator.
+  bool is_dedicated_coord_ = false;
 
   /// The name of the executor group that this schedule was computed for. Set by the
   /// Scheduler and only valid after scheduling completes successfully.
@@ -327,7 +378,6 @@ class QuerySchedule {
   /// Also populates plan_node_to_fragment_idx_ and plan_node_to_plan_node_idx_.
   void Init();
 };
-
 }
 
 #endif
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index de34738..b5e6807 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -673,21 +673,28 @@ void Scheduler::ComputeBackendExecParams(
       be_params.initial_mem_reservation_total_claims +=
           f.fragment.initial_mem_reservation_total_claims;
       be_params.thread_reservation += f.fragment.thread_reservation;
+      if (f.is_coord_fragment) be_params.contains_coord_fragment = true;
     }
   }
 
   int64_t largest_min_reservation = 0;
+  int64_t coord_min_reservation = 0;
   for (auto& backend : per_backend_params) {
     const TNetworkAddress& host = backend.first;
     const TBackendDescriptor be_desc = LookUpBackendDesc(executor_config, host);
     backend.second.admit_mem_limit = be_desc.admit_mem_limit;
     backend.second.admit_num_queries_limit = be_desc.admit_num_queries_limit;
     backend.second.be_desc = be_desc;
-    largest_min_reservation =
-        max(largest_min_reservation, backend.second.min_mem_reservation_bytes);
+    if (backend.second.contains_coord_fragment) {
+      coord_min_reservation = backend.second.min_mem_reservation_bytes;
+    } else {
+      largest_min_reservation =
+          max(largest_min_reservation, backend.second.min_mem_reservation_bytes);
+    }
   }
   schedule->set_per_backend_exec_params(per_backend_params);
   schedule->set_largest_min_reservation(largest_min_reservation);
+  schedule->set_coord_min_reservation(coord_min_reservation);
 
   stringstream min_mem_reservation_ss;
   stringstream num_fragment_instances_ss;
diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index c5b77a4..c24a8fb 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -935,8 +935,10 @@ void ImpalaHttpHandler::AdmissionStateHandler(
   // Now get running queries from CRS map.
   struct QueryInfo {
     TUniqueId query_id;
-    int64_t mem_limit;
-    int64_t mem_limit_for_admission;
+    int64_t executor_mem_limit;
+    int64_t executor_mem_to_admit;
+    int64_t coord_mem_limit;
+    int64_t coord_mem_to_admit;
     unsigned long num_backends;
   };
   unordered_map<string, vector<QueryInfo>> running_queries;
@@ -950,6 +952,8 @@ void ImpalaHttpHandler::AdmissionStateHandler(
       running_queries[request_state->request_pool()].push_back(
           {request_state->query_id(), request_state->schedule()->per_backend_mem_limit(),
               request_state->schedule()->per_backend_mem_to_admit(),
+              request_state->schedule()->coord_backend_mem_limit(),
+              request_state->schedule()->coord_backend_mem_to_admit(),
               static_cast<unsigned long>(
                   request_state->schedule()->per_backend_exec_params().size())});
   });
@@ -970,9 +974,14 @@ void ImpalaHttpHandler::AdmissionStateHandler(
       Value query_info(rapidjson::kObjectType);
       Value query_id(PrintId(info.query_id).c_str(), document->GetAllocator());
       query_info.AddMember("query_id", query_id, document->GetAllocator());
-      query_info.AddMember("mem_limit", info.mem_limit, document->GetAllocator());
       query_info.AddMember(
-          "mem_limit_to_admit", info.mem_limit_for_admission, document->GetAllocator());
+          "mem_limit", info.executor_mem_limit, document->GetAllocator());
+      query_info.AddMember(
+          "mem_limit_to_admit", info.executor_mem_to_admit, document->GetAllocator());
+      query_info.AddMember(
+          "coord_mem_limit", info.coord_mem_limit, document->GetAllocator());
+      query_info.AddMember(
+          "coord_mem_to_admit", info.coord_mem_to_admit, document->GetAllocator());
       query_info.AddMember("num_backends", info.num_backends, document->GetAllocator());
       queries_in_pool.PushBack(query_info, document->GetAllocator());
     }
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index c393e6b..5bb7d20 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -46,10 +46,7 @@ DECLARE_string(principal);
 DECLARE_string(local_library_dir);
 DECLARE_string(server_name);
 DECLARE_string(authorization_policy_provider_class);
-DECLARE_string(authorized_proxy_user_config);
-DECLARE_string(authorized_proxy_user_config_delimiter);
 DECLARE_string(authorized_proxy_group_config);
-DECLARE_string(authorized_proxy_group_config_delimiter);
 DECLARE_string(catalog_topic_mode);
 DECLARE_string(kudu_master_hosts);
 DECLARE_string(reserved_words_version);
@@ -78,6 +75,8 @@ DECLARE_string(authorization_provider);
 DECLARE_bool(recursively_list_partitions);
 DECLARE_string(query_event_hook_classes);
 DECLARE_int32(query_event_hook_nthreads);
+DECLARE_bool(is_executor);
+DECLARE_bool(use_dedicated_coordinator_estimates);
 
 namespace impala {
 
@@ -155,6 +154,9 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) {
   cfg.__set_recursively_list_partitions(FLAGS_recursively_list_partitions);
   cfg.__set_query_event_hook_classes(FLAGS_query_event_hook_classes);
   cfg.__set_query_event_hook_nthreads(FLAGS_query_event_hook_nthreads);
+  cfg.__set_is_executor(FLAGS_is_executor);
+  cfg.__set_use_dedicated_coordinator_estimates(
+      FLAGS_use_dedicated_coordinator_estimates);
   RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &cfg, cfg_bytes));
   return Status::OK();
 }
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 063329f..9c5a075 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -131,4 +131,8 @@ struct TBackendGflags {
   53: required string query_event_hook_classes
 
   54: required i32 query_event_hook_nthreads
+
+  55: required bool is_executor
+
+  56: required bool use_dedicated_coordinator_estimates
 }
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index d6e5f70..e055bd9 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -444,6 +444,11 @@ struct TQueryExecRequest {
   // max DOP) required threads per host, i.e. the number of threads that this query
   // needs to execute successfully. Does not include "optional" threads.
   11: optional i64 max_per_host_thread_reservation;
+
+  // Estimated coordinator's memory consumption in bytes assuming that the coordinator
+  // fragment will run on a dedicated coordinator. Set by the planner and used by
+  // admission control.
+  12: optional i64 dedicated_coord_mem_estimate;
 }
 
 enum TCatalogOpType {
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
index c60778e..b55a546 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanFragment.java
@@ -113,6 +113,10 @@ public class PlanFragment extends TreeNode<PlanFragment> {
   // managed by this fragment.
   private long runtimeFiltersMemReservationBytes_ = 0;
 
+  public long getRuntimeFiltersMemReservationBytes() {
+    return runtimeFiltersMemReservationBytes_;
+  }
+
   /**
    * C'tor for fragment with specific partition; the output is by default broadcast.
    */
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index 087954b..b9d1f30 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -49,6 +49,7 @@ import org.apache.impala.thrift.TRuntimeFilterMode;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.util.EventSequence;
 import org.apache.impala.util.KuduUtil;
+import org.apache.impala.util.MathUtil;
 import org.apache.impala.util.MaxRowsProcessedVisitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,9 +70,20 @@ public class Planner {
   // estimates of zero, even if the contained PlanNodes have estimates of zero.
   public static final long MIN_PER_HOST_MEM_ESTIMATE_BYTES = 10 * 1024 * 1024;
 
+  // The amount of memory added to a dedicated coordinator's memory estimate to
+  // compensate for underestimation. In the general case estimates for exec
+  // nodes tend to overestimate and should work fine but for estimates in the
+  // 100-500 MB space, underestimates can be a problem. We pick a value of 100MB
+  // because it is trivial for large estimates and small enough to not make a
+  // huge impact on the coordinator's process memory (which ideally would be
+  // large).
+  public static final long DEDICATED_COORD_SAFETY_BUFFER_BYTES = 100 * 1024 * 1024;
+
   public static final ResourceProfile MIN_PER_HOST_RESOURCES =
-      new ResourceProfileBuilder().setMemEstimateBytes(MIN_PER_HOST_MEM_ESTIMATE_BYTES)
-      .setMinMemReservationBytes(0).build();
+      new ResourceProfileBuilder()
+          .setMemEstimateBytes(MIN_PER_HOST_MEM_ESTIMATE_BYTES)
+          .setMinMemReservationBytes(0)
+          .build();
 
   private final PlannerContext ctx_;
 
@@ -287,6 +299,10 @@ public class Planner {
           request.getMax_per_host_thread_reservation()));
       str.append(String.format("Per-Host Resource Estimates: Memory=%s\n",
           PrintUtils.printBytesRoundedToMb(request.getPer_host_mem_estimate())));
+      if (BackendConfig.INSTANCE.useDedicatedCoordinatorEstimates()) {
+        str.append(String.format("Dedicated Coordinator Resource Estimate: Memory=%s\n",
+            PrintUtils.printBytesRoundedToMb(request.getDedicated_coord_mem_estimate())));
+      }
       hasHeader = true;
     }
     // Warn if the planner is running in DEBUG mode.
@@ -389,10 +405,12 @@ public class Planner {
     // are scheduled on all nodes. The actual per-host resource requirements are computed
     // after scheduling.
     ResourceProfile maxPerHostPeakResources = ResourceProfile.invalid();
+    long totalRuntimeFilterMemBytes = 0;
 
     // Do a pass over all the fragments to compute resource profiles. Compute the
     // profiles bottom-up since a fragment's profile may depend on its descendants.
-    List<PlanFragment> allFragments = planRoots.get(0).getNodesPostOrder();
+    PlanFragment rootFragment = planRoots.get(0);
+    List<PlanFragment> allFragments = rootFragment.getNodesPostOrder();
     for (PlanFragment fragment: allFragments) {
       // Compute the per-node, per-sink and aggregate profiles for the fragment.
       fragment.computeResourceProfile(ctx_.getRootAnalyzer());
@@ -405,8 +423,11 @@ public class Planner {
       // per-fragment-instance peak resources.
       maxPerHostPeakResources = maxPerHostPeakResources.sum(
           fragment.getResourceProfile().multiply(fragment.getNumInstancesPerHost(mtDop)));
+      // Coordinator has to have a copy of each of the runtime filters to perform filter
+      // aggregation.
+      totalRuntimeFilterMemBytes += fragment.getRuntimeFiltersMemReservationBytes();
     }
-    planRoots.get(0).computePipelineMembership();
+    rootFragment.computePipelineMembership();
 
     Preconditions.checkState(maxPerHostPeakResources.getMemEstimateBytes() >= 0,
         maxPerHostPeakResources.getMemEstimateBytes());
@@ -415,13 +436,17 @@ public class Planner {
 
     maxPerHostPeakResources = MIN_PER_HOST_RESOURCES.max(maxPerHostPeakResources);
 
-    // TODO: Remove per_host_mem_estimate from the TQueryExecRequest when AC no longer
-    // needs it.
     request.setPer_host_mem_estimate(maxPerHostPeakResources.getMemEstimateBytes());
     request.setMax_per_host_min_mem_reservation(
         maxPerHostPeakResources.getMinMemReservationBytes());
     request.setMax_per_host_thread_reservation(
         maxPerHostPeakResources.getThreadReservation());
+    // Assuming the root fragment will always run on the coordinator backend, which
+    // might not be true for queries that don't have a coordinator fragment
+    // (request.getStmt_type() != TStmtType.QUERY). TODO: Fix in IMPALA-8791.
+    request.setDedicated_coord_mem_estimate(MathUtil.saturatingAdd(rootFragment
+        .getResourceProfile().getMemEstimateBytes(), totalRuntimeFilterMemBytes +
+        DEDICATED_COORD_SAFETY_BUFFER_BYTES));
     if (LOG.isTraceEnabled()) {
       LOG.trace("Max per-host min reservation: " +
           maxPerHostPeakResources.getMinMemReservationBytes());
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index aef3f41..e299c7f 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -165,6 +165,10 @@ public class BackendConfig {
     return backendCfg_.getQuery_event_hook_nthreads();
   }
 
+  public boolean useDedicatedCoordinatorEstimates() {
+    return !backendCfg_.is_executor && backendCfg_.use_dedicated_coordinator_estimates;
+  }
+
   // Inits the auth_to_local configuration in the static KerberosName class.
   private static void initAuthToLocal() {
     // If auth_to_local is enabled, we read the configuration hadoop.security.auth_to_local
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 492f5b0..f282246 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -1496,9 +1496,14 @@ public class Frontend {
       TQueryOptions queryOptions) {
     if (queryOptions.isSetMax_mem_estimate_for_admission()
         && queryOptions.getMax_mem_estimate_for_admission() > 0) {
-      long effectiveMemEstimate = Math.min(queryExecRequest.getPer_host_mem_estimate(),
+      long effectivePerHostMemEstimate = Math.min(
+          queryExecRequest.getPer_host_mem_estimate(),
               queryOptions.getMax_mem_estimate_for_admission());
-      queryExecRequest.setPer_host_mem_estimate(effectiveMemEstimate);
+      queryExecRequest.setPer_host_mem_estimate(effectivePerHostMemEstimate);
+      long effectiveCoordinatorMemEstimate = Math.min(
+          queryExecRequest.getDedicated_coord_mem_estimate(),
+              queryOptions.getMax_mem_estimate_for_admission());
+      queryExecRequest.setDedicated_coord_mem_estimate(effectiveCoordinatorMemEstimate);
     }
   }
 
diff --git a/testdata/workloads/functional-query/queries/QueryTest/dedicated-coord-mem-estimates.test b/testdata/workloads/functional-query/queries/QueryTest/dedicated-coord-mem-estimates.test
new file mode 100644
index 0000000..cd53dac
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/dedicated-coord-mem-estimates.test
@@ -0,0 +1,96 @@
+====
+---- QUERY
+# CTAS
+create table test as select id from functional.alltypes where id > 1
+---- RUNTIME_PROFILE
+row_regex: .*Per-Host Resource Estimates: Memory=16MB.*
+row_regex: .*Dedicated Coordinator Resource Estimate: Memory=116MB.*
+row_regex: .*Cluster Memory Admitted: 32.00 MB.*
+====
+---- QUERY
+# Truncate table to run the following inserts.
+truncate table test
+====
+---- QUERY
+# Small insert (i.e. values list, runs on coordinator only).
+insert into test values (1), (2), (3)
+---- RUNTIME_PROFILE
+row_regex: .*Per-Host Resource Estimates: Memory=10MB.*
+row_regex: .*Dedicated Coordinator Resource Estimate: Memory=100MB.*
+row_regex: .*Cluster Memory Admitted: 10.00 MB.*
+====
+---- QUERY
+# Large insert where it doesn't run on the coordinator.
+insert into test select id from functional.alltypes where id > 3
+---- RUNTIME_PROFILE
+row_regex: .*Per-Host Resource Estimates: Memory=16MB.*
+row_regex: .*Dedicated Coordinator Resource Estimate: Memory=116MB.*
+row_regex: .*Cluster Memory Admitted: 32.00 MB.*
+====
+---- QUERY
+# SELECT with merging exchange (i.e. order by).
+select * from functional.alltypes order by int_col;
+---- RUNTIME_PROFILE
+row_regex: .*Per-Host Resource Estimates: Memory=28MB.*
+row_regex: .*Dedicated Coordinator Resource Estimate: Memory=100MB.*
+row_regex: .*Cluster Memory Admitted: 157.47 MB.*
+====
+---- QUERY
+# SELECT with non-merging exchange.
+select * from functional.alltypes;
+---- RUNTIME_PROFILE
+row_regex: .*Per-Host Resource Estimates: Memory=16MB.*
+row_regex: .*Dedicated Coordinator Resource Estimate: Memory=100MB.*
+row_regex: .*Cluster Memory Admitted: 133.47 MB.*
+====
+---- QUERY
+# SELECT with a non-grouping aggregate in the coordinator fragment.
+select avg(int_col) from functional.alltypes;
+---- RUNTIME_PROFILE
+row_regex: .*Per-Host Resource Estimates: Memory=36MB.*
+row_regex: .*Dedicated Coordinator Resource Estimate: Memory=110MB.*
+row_regex: .*Cluster Memory Admitted: 182.05 MB.*
+====
+---- QUERY
+# SELECT with num_nodes=1 and a complex plan in the coordinator.
+set num_nodes=1;
+select
+  l_returnflag,
+  l_linestatus,
+  sum(l_quantity) as sum_qty,
+  sum(l_extendedprice) as sum_base_price,
+  sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
+  sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
+  avg(l_quantity) as avg_qty,
+  avg(l_extendedprice) as avg_price,
+  avg(l_discount) as avg_disc,
+  count(*) as count_order
+from
+  tpch.lineitem
+where
+  l_shipdate <= '1998-09-02'
+group by
+  l_returnflag,
+  l_linestatus
+order by
+  l_returnflag,
+  l_linestatus
+---- RUNTIME_PROFILE
+row_regex: .*Per-Host Resource Estimates: Memory=98MB.*
+row_regex: .*Dedicated Coordinator Resource Estimate: Memory=198MB.*
+row_regex: .*Cluster Memory Admitted: 198.00 MB.*
+====
+---- QUERY
+# SELECT with multiple unpartitioned analytic functions to force the sort and analytics
+# into the coordinator fragment.
+select id,
+min(int_col) over (order by year),
+min(int_col) over (order by bigint_col),
+avg(smallint_col) over (order by int_col),
+max(int_col) over (order by smallint_col rows between unbounded preceding and 1 following)
+from functional.alltypes;
+---- RUNTIME_PROFILE
+row_regex: .*Per-Host Resource Estimates: Memory=46MB.*
+row_regex: .*Dedicated Coordinator Resource Estimate: Memory=124MB.*
+row_regex: .*Cluster Memory Admitted: 216.00 MB.*
+====
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index b416270..c85220c 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -47,6 +47,7 @@ from tests.common.test_dimensions import (
     create_uncompressed_text_dimension)
 from tests.common.test_vector import ImpalaTestDimension
 from tests.hs2.hs2_test_suite import HS2TestSuite, needs_session
+from tests.verifiers.mem_usage_verifier import MemUsageVerifier
 from ImpalaService import ImpalaHiveServer2Service
 from TCLIService import TCLIService
 
@@ -473,9 +474,170 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       exec_options['mem_limit'] = long(self.PROC_MEM_TEST_LIMIT * 1.1)
       ex = self.execute_query_expect_failure(self.client, query, exec_options)
       assert ("Rejected query from pool default-pool: request memory needed "
-              "1.10 GB per node is greater than memory available for admission 1.00 GB" in
+              "1.10 GB is greater than memory available for admission 1.00 GB" in
               str(ex)), str(ex)
 
+  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args=impalad_admission_ctrl_config_args(
+      fs_allocation_file="mem-limit-test-fair-scheduler.xml",
+      llama_site_file="mem-limit-test-llama-site.xml"), num_exclusive_coordinators=1,
+    cluster_size=2)
+  def test_dedicated_coordinator_mem_accounting(self, vector):
+    """Verify that when using dedicated coordinators, the memory admitted for and the
+    mem limit applied to the query fragments running on the coordinator differ from
+    those on the executors."""
+    self.__verify_mem_accounting(vector, using_dedicated_coord_estimates=True)
+
+  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args=impalad_admission_ctrl_config_args(
+      fs_allocation_file="mem-limit-test-fair-scheduler.xml",
+      llama_site_file="mem-limit-test-llama-site.xml")
+    + " -use_dedicated_coordinator_estimates false",
+    num_exclusive_coordinators=1,
+    cluster_size=2)
+  def test_dedicated_coordinator_legacy_mem_accounting(self, vector):
+    """Verify mem accounting on a dedicated coordinator cluster when the specialized
+    dedicated coord estimates are turned off via the hidden startup param (legacy
+    behavior). In this mode the mem limits for the coordinator and the executor are
+    not required to differ; admitted and enforced values must still match them."""
+    self.__verify_mem_accounting(vector, using_dedicated_coord_estimates=False)
+
+  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args=impalad_admission_ctrl_config_args(
+      fs_allocation_file="mem-limit-test-fair-scheduler.xml",
+      llama_site_file="mem-limit-test-llama-site.xml"), num_exclusive_coordinators=1,
+    cluster_size=2)
+  def test_sanity_checks_dedicated_coordinator(self, vector):
+    """Test for verifying targeted dedicated coordinator memory estimation behavior."""
+    self.client.set_configuration_option('request_pool', "root.regularPool")
+    ImpalaTestSuite.change_database(self.client, vector.get_value('table_format'))
+    exec_options = vector.get_value('exec_option')
+    # Make sure query option MAX_MEM_ESTIMATE_FOR_ADMISSION is enforced on the dedicated
+    # coord estimates. Without this query option the estimate would be > 100MB.
+    expected_mem = 60 * (1 << 20)  # 60MB
+    exec_options['MAX_MEM_ESTIMATE_FOR_ADMISSION'] = expected_mem
+    self.client.set_configuration(exec_options)
+    handle = self.client.execute_async(QUERY.format(1))
+    self.client.wait_for_finished_timeout(handle, 1000)
+    mem_to_admit = self.__get_mem_limits_admission_debug_page()
+    assert abs(mem_to_admit['coordinator'] - expected_mem) < 0.0001,\
+      "mem_to_admit:" + str(mem_to_admit)
+    assert abs(mem_to_admit['executor'] - expected_mem) < 0.0001, \
+      "mem_to_admit:" + str(mem_to_admit)
+    self.client.close_query(handle)
+
+    # If the query is only scheduled on the coordinator then the mem to admit on executor
+    # should be zero.
+    exec_options['NUM_NODES'] = 1
+    self.client.set_configuration(exec_options)
+    handle = self.client.execute_async(QUERY.format(1))
+    self.client.wait_for_finished_timeout(handle, 1000)
+    mem_to_admit = self.__get_mem_limits_admission_debug_page()
+    assert abs(mem_to_admit['coordinator'] - expected_mem) < 0.0001, \
+      "mem_to_admit:" + str(mem_to_admit)
+    assert abs(mem_to_admit['executor'] - 0) < 0.0001, \
+      "mem_to_admit:" + str(mem_to_admit)
+    self.client.close_query(handle)
+
+  def __verify_mem_accounting(self, vector, using_dedicated_coord_estimates):
+    """Helper method used by test_dedicated_coordinator_*_mem_accounting that verifies
+    the actual vs expected values for mem admitted and mem limit for both coord and
+    executor. Also verifies that those memory values are different if
+    'using_dedicated_coord_estimates' is true."""
+    self.client.set_configuration_option('request_pool', "root.regularPool")
+    ImpalaTestSuite.change_database(self.client, vector.get_value('table_format'))
+    handle = self.client.execute_async(QUERY.format(1))
+    self.client.wait_for_finished_timeout(handle, 1000)
+    expected_mem_limits = self.__get_mem_limits_admission_debug_page()
+    actual_mem_limits = self.__get_mem_limits_memz_debug_page(handle.get_handle().id)
+    mem_admitted = self.__get_mem_admitted_backends_debug_page()
+    debug_string = " expected_mem_limits:" + str(
+      expected_mem_limits) + " actual_mem_limits:" + str(
+      actual_mem_limits) + " mem_admitted:" + str(mem_admitted)
+    MB = 1 << 20
+    # Easiest way to check float in-equality.
+    assert abs(expected_mem_limits['coordinator'] - expected_mem_limits[
+      'executor']) > 0.0001 or not using_dedicated_coord_estimates, debug_string
+    # There may be some rounding errors so keep a margin of 5MB when verifying
+    assert abs(actual_mem_limits['coordinator'] - expected_mem_limits[
+      'coordinator']) < 5 * MB, debug_string
+    assert abs(actual_mem_limits['executor'] - expected_mem_limits[
+      'executor']) < 5 * MB, debug_string
+    assert abs(mem_admitted['coordinator'] - expected_mem_limits[
+      'coordinator']) < 5 * MB, debug_string
+    assert abs(
+      mem_admitted['executor'] - expected_mem_limits['executor']) < 5 * MB, debug_string
+
+  def __get_mem_limits_admission_debug_page(self):
+    """Helper method assumes a 2 node cluster using a dedicated coordinator. Returns the
+    mem_limit calculated by the admission controller from the impala admission debug page
+    of the coordinator impala daemon. Returns a dictionary with the keys 'coordinator'
+    and 'executor' and their respective mem values in bytes."""
+    # Based on how the cluster is setup, the first impalad in the cluster is the
+    # coordinator.
+    response_json = self.cluster.impalads[0].service.get_debug_webpage_json("admission")
+    assert 'resource_pools' in response_json
+    assert len(response_json['resource_pools']) == 1
+    assert response_json['resource_pools'][0]['running_queries']
+    assert len(response_json['resource_pools'][0]['running_queries']) == 1
+    query_info = response_json['resource_pools'][0]['running_queries'][0]
+    return {'coordinator': float(query_info["coord_mem_to_admit"]),
+            'executor': float(query_info["mem_limit"])}
+
+  def __get_mem_limits_memz_debug_page(self, query_id):
+    """Helper method assumes a 2 node cluster using a dedicated coordinator. Returns the
+    mem limits enforced on the query (identified by the 'query_id') extracted from
+    mem-tracker's output on the memz debug page of the dedicated coordinator and the
+    executor impala daemons. Returns a dictionary with the keys 'coordinator' and
+    'executor' and their respective mem values in bytes."""
+    metric_name = "Query({0})".format(query_id)
+    # Based on how the cluster is setup, the first impalad in the cluster is the
+    # coordinator.
+    mem_trackers = [MemUsageVerifier(i.service).get_mem_usage_values(metric_name) for i in
+                    self.cluster.impalads]
+    return {'coordinator': float(mem_trackers[0]['limit']),
+            'executor': float(mem_trackers[1]['limit'])}
+
+  def __get_mem_admitted_backends_debug_page(self):
+    """Helper method assumes a 2 node cluster using a dedicated coordinator. Returns the
+    mem admitted to the backends extracted from the backends debug page of the coordinator
+    impala daemon. Returns a dictionary with the keys 'coordinator' and 'executor' and
+    their respective mem values in bytes."""
+    # Based on how the cluster is setup, the first impalad in the cluster is the
+    # coordinator.
+    response_json = self.cluster.impalads[0].service.get_debug_webpage_json("backends")
+    assert 'backends' in response_json
+    assert len(response_json['backends']) == 2
+    ret = dict()
+    from tests.verifiers.mem_usage_verifier import parse_mem_value
+    for backend in response_json['backends']:
+      if backend['is_coordinator']:
+        ret['coordinator'] = parse_mem_value(backend['mem_admitted'])
+      else:
+        ret['executor'] = parse_mem_value(backend['mem_admitted'])
+    return ret
+
+  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(num_exclusive_coordinators=1)
+  def test_dedicated_coordinator_planner_estimates(self, vector, unique_database):
+    """Planner tests to add coverage for coordinator estimates when using dedicated
+    coordinators. Also includes coverage for verifying cluster memory admitted."""
+    vector_copy = copy(vector)
+    exec_options = vector.get_value('exec_option')
+    # Remove num_nodes from the options to allow test case runner to set it in one of
+    # the test cases.
+    del exec_options['num_nodes']
+    exec_options['num_scanner_threads'] = 1  # To make estimates consistently reproducible
+    self.run_test_case('QueryTest/dedicated-coord-mem-estimates', vector_copy,
+                       unique_database)
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args=impalad_admission_ctrl_flags(max_requests=2, max_queued=1,
@@ -510,8 +672,8 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       exec_options['mem_limit'] = "3G"
       self.execute_query(query, exec_options)
     except ImpalaBeeswaxException as e:
-      assert re.search("Rejected query from pool \S+: request memory needed 3.00 GB per "
-          "node is greater than memory available for admission 2.00 GB of \S+", str(e)), \
+      assert re.search("Rejected query from pool \S+: request memory needed 3.00 GB"
+          " is greater than memory available for admission 2.00 GB of \S+", str(e)), \
           str(e)
     # Exercise queuing checks in admission controller.
     try:
diff --git a/www/admission_controller.tmpl b/www/admission_controller.tmpl
index 3af73ef..069f57c 100644
--- a/www/admission_controller.tmpl
+++ b/www/admission_controller.tmpl
@@ -270,8 +270,10 @@ Time since last statestore update containing admission control topic state (ms):
   <table class='table table-hover table-border'>
     <tr>
       <th>Query ID</th>
-      <th>Memory limit</th>
-      <th>Memory limit used for admission</th>
+      <th>Memory limit for the executors</th>
+      <th>Memory admitted on the executors</th>
+      <th>Memory limit for the coordinator</th>
+      <th>Memory admitted on the coordinator</th>
       <th>Num of backends it will run on</th>
       <th>Details</th>
     </tr>
@@ -280,6 +282,8 @@ Time since last statestore update containing admission control topic state (ms):
       <td>{{query_id}}</td>
       <td class='memory'>{{mem_limit}}</td>
       <td class='memory'>{{mem_limit_to_admit}}</td>
+      <td class='memory'>{{coord_mem_limit}}</td>
+      <td class='memory'>{{coord_mem_to_admit}}</td>
       <td>{{num_backends}}</td>
       <td><a href='/query_plan?query_id={{query_id}}'>Details</a></td>
     </tr>
@@ -290,8 +294,10 @@ Time since last statestore update containing admission control topic state (ms):
   <table class='table table-hover table-border'>
     <tr>
       <th>Query ID</th>
-      <th>Memory limit</th>
-      <th>Memory limit used for admission</th>
+      <th>Memory limit for the executors</th>
+      <th>Memory admitted on the executors</th>
+      <th>Memory limit for the coordinator</th>
+      <th>Memory admitted on the coordinator</th>
       <th>Num of backends it will run on</th>
       <th>Details</th>
     </tr>
@@ -300,6 +306,8 @@ Time since last statestore update containing admission control topic state (ms):
       <td>{{query_id}}</td>
       <td class='memory'>{{mem_limit}}</td>
       <td class='memory'>{{mem_limit_to_admit}}</td>
+      <td class='memory'>{{coord_mem_limit}}</td>
+      <td class='memory'>{{coord_mem_to_admit}}</td>
       <td>{{num_backends}}</td>
       <td><a href='/query_plan?query_id={{query_id}}'>Details</a></td>
     </tr>