You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/10/16 20:26:29 UTC

[impala] branch master updated: IMPALA-8998: admission control accounting for mt_dop

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new b0c6740  IMPALA-8998: admission control accounting for mt_dop
b0c6740 is described below

commit b0c6740faec6b0a00dcfee126ab39324026c0ca9
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Tue Oct 1 23:13:40 2019 -0700

    IMPALA-8998: admission control accounting for mt_dop
    
    This integrates mt_dop with the "slots" mechanism that's used
    for non-default executor groups.
    
    The idea is simple - the degree of parallelism on a backend
    determines the number of slots consumed. The effective
    degree of parallelism is used, not the raw mt_dop setting.
    E.g. if the query only has a single input split and executes
    only a single fragment instance on a host, we don't want
    to count the full mt_dop value for admission control.
    
    --admission_control_slots is added as a new flag that
    replaces --max_concurrent_queries, since the name better
    reflects the concept. --max_concurrent_queries is kept
    for backwards compatibility and has the same meaning
    as --admission_control_slots.
    
    The admission control logic is extended to take this into
    account. We also add an immediate rejection code path
    since it is now possible for queries to not be admittable
    based on the # of available slots.
    
    We only factor in the "width" of the plan - i.e. the number
    of instances of fragments. We don't account for the number
    of distinct fragments, since they may not actually execute
    in parallel with each other because of dependencies.
    
    This number is added to the per-host profile as the
    "AdmissionSlots" counter.
    
    Testing:
    Added unit tests for rejection and queue/admit checks.
    
    Also includes a fix for IMPALA-9054 where we increase
    the timeout.
    
    Added end-to-end tests:
    * test_admission_slots in test_mt_dop.py that checks the
      admission slot calculation via the profile.
    * End-to-end admission test that exercises the admit
      immediately and queueing code paths.
    
    Added checks to test_verify_metrics (which runs after
    end-to-end tests) to ensure that the per-backend
    slots in use goes to 0 when the cluster is quiesced.
    
    Change-Id: I7b6b6262ef238df26b491352656a26e4163e46e5
    Reviewed-on: http://gerrit.cloudera.org:8080/14357
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Tim Armstrong <ta...@cloudera.com>
---
 be/src/runtime/coordinator-backend-state.cc        |   4 +
 be/src/runtime/exec-env.cc                         |  29 +++--
 be/src/runtime/exec-env.h                          |  13 ++-
 be/src/scheduling/admission-controller-test.cc     | 127 +++++++++++++++++++--
 be/src/scheduling/admission-controller.cc          |  51 ++++++---
 be/src/scheduling/admission-controller.h           |  46 ++++----
 be/src/scheduling/query-schedule.h                 |   6 +
 be/src/scheduling/scheduler.cc                     |  21 ++++
 be/src/service/impala-http-handler.cc              |   4 +-
 be/src/service/impala-server.cc                    |   2 +-
 common/thrift/StatestoreService.thrift             |   5 +-
 .../QueryTest/mt-dop-parquet-admission-slots.test  |  30 +++++
 tests/custom_cluster/test_admission_controller.py  |  49 +++++++-
 tests/custom_cluster/test_executor_groups.py       |  46 +++++---
 tests/query_test/test_cancellation.py              |   8 +-
 tests/query_test/test_mt_dop.py                    |  24 +++-
 tests/util/auto_scaler.py                          |   2 +-
 tests/verifiers/metric_verifier.py                 |  24 ++++
 tests/verifiers/test_verify_metrics.py             |   9 ++
 www/backends.tmpl                                  |   4 +-
 20 files changed, 416 insertions(+), 88 deletions(-)

diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 224372c..ea1691e 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -40,6 +40,7 @@
 #include "util/error-util-internal.h"
 #include "util/network-util.h"
 #include "util/pretty-printer.h"
+#include "util/runtime-profile-counters.h"
 #include "util/scope-exit-trigger.h"
 #include "util/uid-util.h"
 
@@ -76,6 +77,9 @@ void Coordinator::BackendState::Init(const vector<FragmentStats*>& fragment_stat
 
   host_profile_ = RuntimeProfile::Create(obj_pool, TNetworkAddressToString(host_));
   host_profile_parent->AddChild(host_profile_);
+  RuntimeProfile::Counter* admission_slots =
+      ADD_COUNTER(host_profile_, "AdmissionSlots", TUnit::UNIT);
+  admission_slots->Set(backend_exec_params_->slots_to_use);
 
   // populate instance_stats_map_ and install instance
   // profiles as child profiles in fragment_stats' profile
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index c82ec72..172966e 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -88,9 +88,16 @@ DEFINE_int32(state_store_subscriber_port, 23000,
 DEFINE_int32(num_hdfs_worker_threads, 16,
     "(Advanced) The number of threads in the global HDFS operation pool");
 DEFINE_int32(max_concurrent_queries, 0,
-    "(Advanced) The maximum number of queries to run on this backend concurrently "
-    "(defaults to number of cores / -num_cores for executors, and 8x that value for "
-    "dedicated coordinators).");
+    "(Deprecated) This has been replaced with --admission_control_slots, which "
+    "better accounts for the higher parallelism of queries with mt_dop > 1. "
+    "If --admission_control_slots is not set, the value of --max_concurrent_queries "
+    "is used instead for backward compatibility.");
+DEFINE_int32(admission_control_slots, 0,
+    "(Advanced) The maximum degree of parallelism to run queries with on this backend. "
+    "This determines the number of slots available to queries in admission control for "
+    "this backend. The degree of parallelism of the query determines the number of slots "
+    "that it needs. Defaults to number of cores / -num_cores for executors, and 8x that "
+    "value for dedicated coordinators).");
 
 DEFINE_bool_hidden(use_local_catalog, false,
     "Use experimental implementation of a local catalog. If this is set, "
@@ -153,7 +160,7 @@ const static string DEFAULT_FS = "fs.defaultFS";
 
 // The multiplier for how many queries a dedicated coordinator can run compared to an
 // executor. This is only effective when using non-default settings for executor groups
-// and the absolute value can be overridden by the '--max_concurrent_queries' flag.
+// and the absolute value can be overridden by the '--admission_control_slots' flag.
 const static int COORDINATOR_CONCURRENCY_MULTIPLIER = 8;
 
 namespace {
@@ -313,13 +320,19 @@ Status ExecEnv::Init() {
   }
   InitBufferPool(FLAGS_min_buffer_size, buffer_pool_limit, clean_pages_limit);
 
-  admit_num_queries_limit_ = CpuInfo::num_cores();
-  if (FLAGS_max_concurrent_queries > 0) {
-    admit_num_queries_limit_ = FLAGS_max_concurrent_queries;
+  admission_slots_ = CpuInfo::num_cores();
+  if (FLAGS_admission_control_slots > 0) {
+    if (FLAGS_max_concurrent_queries > 0) {
+      LOG(WARNING) << "Ignored --max_concurrent_queries, --admission_control_slots was "
+                   << "set and takes precedence.";
+    }
+    admission_slots_ = FLAGS_admission_control_slots;
+  } else if (FLAGS_max_concurrent_queries > 0) {
+    admission_slots_ = FLAGS_max_concurrent_queries;
   } else if (FLAGS_is_coordinator && !FLAGS_is_executor) {
     // By default we assume that dedicated coordinators can handle more queries than
     // executors.
-    admit_num_queries_limit_ *= COORDINATOR_CONCURRENCY_MULTIPLIER;
+    admission_slots_ *= COORDINATOR_CONCURRENCY_MULTIPLIER;
   }
 
   InitSystemStateInfo();
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 4af0815..c622ff0 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -172,7 +172,7 @@ class ExecEnv {
       kudu::client::KuduClient** client) WARN_UNUSED_RESULT;
 
   int64_t admit_mem_limit() const { return admit_mem_limit_; }
-  int64_t admit_num_queries_limit() const { return admit_num_queries_limit_; }
+  int64_t admission_slots() const { return admission_slots_; }
 
  private:
   boost::scoped_ptr<ObjectPool> obj_pool_;
@@ -264,8 +264,15 @@ class ExecEnv {
   /// such as the JVM if --mem_limit_includes_jvm=true. Set in Init().
   int64_t admit_mem_limit_;
 
-  /// The maximum number of queries that this host can run concurrently.
-  int64_t admit_num_queries_limit_;
+  /// The maximum number of admission slots that should be used on this host. This
+  /// only takes effect if the admission slot functionality is enabled in admission
+  /// control. Until IMPALA-8757 is fixed, the slots are only checked for non-default
+  /// executor groups.
+  ///
+  /// By default, the number of slots is based on the number of cores in the system.
+  /// The number of slots limits the number of queries that can run concurrently on
+  /// this backend. Queries take up multiple slots only when mt_dop > 1.
+  int64_t admission_slots_;
 
   /// Choose a memory limit (returned in *bytes_limit) based on the --mem_limit flag and
   /// the memory available to the daemon process. Returns an error if the memory limit is
diff --git a/be/src/scheduling/admission-controller-test.cc b/be/src/scheduling/admission-controller-test.cc
index fb70b2e..cf36cb9 100644
--- a/be/src/scheduling/admission-controller-test.cc
+++ b/be/src/scheduling/admission-controller-test.cc
@@ -84,7 +84,8 @@ class AdmissionControllerTest : public testing::Test {
   /// rejection in AdmissionController.
   QuerySchedule* MakeQuerySchedule(string request_pool_name, int64_t mem_limit,
       TPoolConfig& config, const int num_hosts, const int per_host_mem_estimate,
-      const int coord_mem_estimate, bool is_dedicated_coord) {
+      const int coord_mem_estimate, bool is_dedicated_coord,
+      const string& executor_group = ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME) {
     DCHECK_GT(num_hosts, 0);
     TQueryExecRequest* request = pool_.Add(new TQueryExecRequest());
     request->query_ctx.request_pool = request_pool_name;
@@ -98,7 +99,7 @@ class AdmissionControllerTest : public testing::Test {
     query_options->__set_mem_limit(mem_limit);
     QuerySchedule* query_schedule = pool_.Add(new QuerySchedule(
         *query_id, *request, *query_options, profile));
-    query_schedule->set_executor_group(ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME);
+    query_schedule->set_executor_group(executor_group);
     SetHostsInQuerySchedule(*query_schedule, num_hosts, is_dedicated_coord);
     query_schedule->UpdateMemoryRequirements(config);
     return query_schedule;
@@ -106,9 +107,10 @@ class AdmissionControllerTest : public testing::Test {
 
   /// Same as previous MakeQuerySchedule with fewer input (more default params).
   QuerySchedule* MakeQuerySchedule(string request_pool_name, TPoolConfig& config,
-      const int num_hosts, const int per_host_mem_estimate) {
+      const int num_hosts, const int per_host_mem_estimate,
+      const string& executor_group = ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME) {
     return MakeQuerySchedule(request_pool_name, 0, config, num_hosts,
-        per_host_mem_estimate, per_host_mem_estimate, false);
+        per_host_mem_estimate, per_host_mem_estimate, false, executor_group);
   }
 
   /// Replace the per-backend hosts in the schedule with one having 'count' hosts.
@@ -118,25 +120,47 @@ class AdmissionControllerTest : public testing::Test {
   /// if a dedicated coordinator backend exists.
   void SetHostsInQuerySchedule(QuerySchedule& query_schedule, const int count,
       bool is_dedicated_coord, int64_t min_mem_reservation_bytes = 0,
-      int64_t admit_mem_limit = 200L * MEGABYTE) {
+      int64_t admit_mem_limit = 200L * MEGABYTE, int slots_to_use = 1,
+      int slots_available = 8) {
     PerBackendExecParams* per_backend_exec_params = pool_.Add(new PerBackendExecParams());
     for (int i = 0; i < count; ++i) {
+      const string host_name = Substitute("host$0", i);
+      TNetworkAddress host_addr = MakeNetworkAddress(host_name, 25000);
       BackendExecParams* backend_exec_params = pool_.Add(new BackendExecParams());
       backend_exec_params->min_mem_reservation_bytes = min_mem_reservation_bytes;
+      backend_exec_params->slots_to_use = slots_to_use;
       backend_exec_params->be_desc.__set_admit_mem_limit(admit_mem_limit);
+      backend_exec_params->be_desc.__set_admission_slots(slots_available);
       backend_exec_params->be_desc.__set_is_executor(true);
+      backend_exec_params->be_desc.__set_address(host_addr);
       if (i == 0) {
         // Add first element as the coordinator.
         backend_exec_params->is_coord_backend = true;
         backend_exec_params->be_desc.__set_is_executor(!is_dedicated_coord);
       }
-      const string host_name = Substitute("host$0", i);
-      per_backend_exec_params->emplace(
-          MakeNetworkAddress(host_name, 25000), *backend_exec_params);
+      per_backend_exec_params->emplace(host_addr, *backend_exec_params);
     }
     query_schedule.set_per_backend_exec_params(*per_backend_exec_params);
   }
 
+  /// Extract the host network addresses from 'schedule'.
+  vector<TNetworkAddress> GetHostAddrs(const QuerySchedule& schedule) {
+    vector<TNetworkAddress> host_addrs;
+    for (auto& backend_state : schedule.per_backend_exec_params()) {
+      host_addrs.push_back(backend_state.first);
+    }
+    return host_addrs;
+  }
+
+  /// Set the slots in use for all the hosts in 'host_addrs'.
+  void SetSlotsInUse(AdmissionController* admission_controller,
+      const vector<TNetworkAddress>& host_addrs, int slots_in_use) {
+    for (TNetworkAddress host_addr : host_addrs) {
+      string host = TNetworkAddressToString(host_addr);
+      admission_controller->host_stats_[host].slots_in_use = slots_in_use;
+    }
+  }
+
   /// Build a TTopicDelta object for IMPALA_REQUEST_QUEUE_TOPIC.
   static TTopicDelta MakeTopicDelta(const bool is_delta) {
     TTopicDelta delta;
@@ -455,6 +479,61 @@ TEST_F(AdmissionControllerTest, CanAdmitRequestCount) {
       "with 0.5 queries)");
 }
 
+/// Test CanAdmitRequest() using the slots mechanism that is enabled with non-default
+/// executor groups.
+TEST_F(AdmissionControllerTest, CanAdmitRequestSlots) {
+  // Pass the paths of the configuration files as command line flags.
+  FLAGS_fair_scheduler_allocation_path = GetResourceFile("fair-scheduler-test2.xml");
+  FLAGS_llama_site_path = GetResourceFile("llama-site-test2.xml");
+
+  AdmissionController* admission_controller = MakeAdmissionController();
+  RequestPoolService* request_pool_service = admission_controller->request_pool_service_;
+
+  // Get the PoolConfig for QUEUE_D ("root.queueD").
+  TPoolConfig config_d;
+  ASSERT_OK(request_pool_service->GetPoolConfig(QUEUE_D, &config_d));
+
+  // Create QuerySchedules to run on QUEUE_D on 12 hosts.
+  // Running in both default and non-default executor groups is simulated.
+  int64_t host_count = 12;
+  int64_t slots_per_host = 16;
+  int64_t slots_per_query = 4;
+  QuerySchedule* default_group_schedule =
+      MakeQuerySchedule(QUEUE_D, config_d, host_count, 30L * MEGABYTE);
+  QuerySchedule* other_group_schedule =
+      MakeQuerySchedule(QUEUE_D, config_d, host_count, 30L * MEGABYTE, "other_group");
+  for (QuerySchedule* schedule : {default_group_schedule, other_group_schedule}) {
+    SetHostsInQuerySchedule(*schedule, 2, false,
+        MEGABYTE, 200L * MEGABYTE, slots_per_query, slots_per_host);
+  }
+  vector<TNetworkAddress> host_addrs = GetHostAddrs(*default_group_schedule);
+  string not_admitted_reason;
+
+  // Simulate that there are just enough slots free for the query on all hosts.
+  SetSlotsInUse(admission_controller, host_addrs, slots_per_host - slots_per_query);
+
+  // Enough slots are available so it can be admitted in both cases.
+  ASSERT_TRUE(admission_controller->CanAdmitRequest(
+      *default_group_schedule, config_d, host_count, true, &not_admitted_reason))
+      << not_admitted_reason;
+  ASSERT_TRUE(admission_controller->CanAdmitRequest(
+      *other_group_schedule, config_d, host_count, true, &not_admitted_reason))
+      << not_admitted_reason;
+
+  // Simulate that almost all the slots are in use, which prevents admission in the
+  // non-default group.
+  SetSlotsInUse(admission_controller, host_addrs, slots_per_host - 1);
+
+  ASSERT_TRUE(admission_controller->CanAdmitRequest(
+      *default_group_schedule, config_d, host_count, true, &not_admitted_reason))
+      << not_admitted_reason;
+  ASSERT_FALSE(admission_controller->CanAdmitRequest(
+      *other_group_schedule, config_d, host_count, true, &not_admitted_reason));
+  EXPECT_STR_CONTAINS(not_admitted_reason,
+      "Not enough admission control slots available on host host1:25000. Needed 4 "
+      "slots but 15/16 are already in use.");
+}
+
 /// Tests that query rejection works as expected by calling RejectForSchedule() and
 /// RejectForCluster() directly.
 TEST_F(AdmissionControllerTest, QueryRejection) {
@@ -500,6 +579,34 @@ TEST_F(AdmissionControllerTest, QueryRejection) {
       "Cluster-wide memory reservation needed: 450.00 MB. Increase the pool max mem "
       "resources.");
 
+  // Adjust the QuerySchedule to require many slots per node.
+  // This will be rejected immediately in non-default executor groups
+  // as the nodes do not have that many slots. Because of IMPALA-8757, this check
+  // does not yet occur for the default executor group.
+  SetHostsInQuerySchedule(*query_schedule, 2, false, MEGABYTE, 200L * MEGABYTE, 16, 4);
+  string rejected_slots_reason;
+  // Don't reject for default executor group.
+  EXPECT_FALSE(admission_controller->RejectForSchedule(
+      *query_schedule, config_d, host_count, host_count, &rejected_slots_reason))
+      << rejected_slots_reason;
+  // Reject for non-default executor group.
+  QuerySchedule* other_group_schedule = MakeQuerySchedule(
+      QUEUE_D, config_d, host_count, 50L * MEGABYTE, "a_different_executor_group");
+  SetHostsInQuerySchedule(
+      *other_group_schedule, 2, false, MEGABYTE, 200L * MEGABYTE, 16, 4);
+  EXPECT_TRUE(admission_controller->RejectForSchedule(
+      *other_group_schedule, config_d, host_count, host_count, &rejected_slots_reason));
+  EXPECT_STR_CONTAINS(rejected_slots_reason, "number of admission control slots needed "
+      "(16) on backend 'host1:25000' is greater than total slots available 4. Reduce "
+      "mt_dop to less than 4 to ensure that the query can execute.");
+  rejected_slots_reason = "";
+  // Reduce mt_dop to ensure it can execute.
+  SetHostsInQuerySchedule(
+      *other_group_schedule, 2, false, MEGABYTE, 200L * MEGABYTE, 4, 4);
+  EXPECT_FALSE(admission_controller->RejectForSchedule(
+      *other_group_schedule, config_d, host_count, host_count, &rejected_slots_reason))
+      << rejected_slots_reason;
+
   // Overwrite min_query_mem_limit and max_query_mem_limit in config_d to test a message.
   // After this config_d is unusable.
   config_d.min_query_mem_limit = 600L * MEGABYTE;
@@ -692,9 +799,7 @@ TEST_F(AdmissionControllerTest, PoolStats) {
   ASSERT_EQ(1, pool_stats->agg_num_running());
   ASSERT_EQ(1, pool_stats->metrics()->agg_num_running->GetValue());
   int64_t mem_to_release = 0;
-  vector<TNetworkAddress> host_addrs;
-  for (auto backend_state : query_schedule->per_backend_exec_params()) {
-    host_addrs.push_back(backend_state.first);
+  for (auto& backend_state : query_schedule->per_backend_exec_params()) {
     mem_to_release +=
         admission_controller->GetMemToAdmit(*query_schedule, backend_state.second);
   }
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index ad78b5f..2f335bc 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -191,6 +191,10 @@ const string REASON_BUFFER_LIMIT_TOO_LOW_FOR_RESERVATION =
     "minimum memory reservation on backend '$0' is greater than memory available to the "
     "query for buffer reservations. Increase the buffer_pool_limit to $1. See the query "
     "profile for more information about the per-node memory requirements.";
+const string REASON_NOT_ENOUGH_SLOTS_ON_BACKEND =
+    "number of admission control slots needed ($0) on backend '$1' is greater than total "
+    "slots available $2. Reduce mt_dop to less than $2 to ensure that the query can "
+    "execute.";
 const string REASON_MIN_RESERVATION_OVER_POOL_MEM =
     "minimum memory reservation needed is greater than pool max mem resources. Pool "
     "max mem resources: $0 ($1). Cluster-wide memory reservation needed: $2. Increase "
@@ -241,8 +245,9 @@ const string HOST_MEM_NOT_AVAILABLE = "Not enough memory available on host $0. "
     "Needed $1 but only $2 out of $3 was available.$4";
 
 // $0 = host name, $1 = num admitted, $2 = max requests
-const string HOST_SLOT_NOT_AVAILABLE = "No query slot available on host $0. "
-                                       "$1/$2 are already admitted.";
+const string HOST_SLOT_NOT_AVAILABLE = "Not enough admission control slots available on "
+                                       "host $0. Needed $1 slots but $2/$3 are already "
+                                       "in use.";
 
 // Parses the pool name and backend_id from the topic key if it is valid.
 // Returns true if the topic key is valid and pool_name and backend_id are set.
@@ -422,7 +427,8 @@ void AdmissionController::UpdateStatsOnReleaseForBackends(
       continue;
     }
     int64_t mem_to_release = GetMemToAdmit(schedule, backend_exec_params->second);
-    UpdateHostStats(host_addr, -mem_to_release, -1);
+    UpdateHostStats(
+        host_addr, -mem_to_release, -1, -backend_exec_params->second.slots_to_use);
     total_mem_to_release += mem_to_release;
   }
   PoolStats* pool_stats = GetPoolStats(schedule);
@@ -434,7 +440,7 @@ void AdmissionController::UpdateStatsOnAdmission(const QuerySchedule& schedule)
   for (const auto& entry : schedule.per_backend_exec_params()) {
     const TNetworkAddress& host_addr = entry.first;
     int64_t mem_to_admit = GetMemToAdmit(schedule, entry.second);
-    UpdateHostStats(host_addr, mem_to_admit, 1);
+    UpdateHostStats(host_addr, mem_to_admit, 1, entry.second.slots_to_use);
   }
   PoolStats* pool_stats = GetPoolStats(schedule);
   pool_stats->AdmitQueryAndMemory(schedule);
@@ -442,7 +448,8 @@ void AdmissionController::UpdateStatsOnAdmission(const QuerySchedule& schedule)
 }
 
 void AdmissionController::UpdateHostStats(
-    const TNetworkAddress& host_addr, int64_t mem_to_admit, int num_queries_to_admit) {
+    const TNetworkAddress& host_addr, int64_t mem_to_admit, int num_queries_to_admit,
+    int num_slots_to_admit) {
   const string host = TNetworkAddressToString(host_addr);
   VLOG_ROW << "Update admitted mem reserved for host=" << host
            << " prev=" << PrintBytes(host_stats_[host].mem_admitted)
@@ -454,6 +461,11 @@ void AdmissionController::UpdateHostStats(
            << " new=" << host_stats_[host].num_admitted + num_queries_to_admit;
   host_stats_[host].num_admitted += num_queries_to_admit;
   DCHECK_GE(host_stats_[host].num_admitted, 0);
+  VLOG_ROW << "Update slots in use for host=" << host
+           << " prev=" << host_stats_[host].slots_in_use
+           << " new=" << host_stats_[host].slots_in_use + num_slots_to_admit;
+  host_stats_[host].slots_in_use += num_slots_to_admit;
+  DCHECK_GE(host_stats_[host].slots_in_use, 0);
 }
 
 // Helper method used by CanAccommodateMaxInitialReservation(). Returns true if the given
@@ -550,20 +562,20 @@ bool AdmissionController::HasAvailableMemResources(const QuerySchedule& schedule
   return true;
 }
 
-bool AdmissionController::HasAvailableSlot(const QuerySchedule& schedule,
+bool AdmissionController::HasAvailableSlots(const QuerySchedule& schedule,
     const TPoolConfig& pool_cfg, string* unavailable_reason) {
   for (const auto& entry : schedule.per_backend_exec_params()) {
     const TNetworkAddress& host = entry.first;
     const string host_id = TNetworkAddressToString(host);
-    int64_t admit_num_queries_limit = entry.second.be_desc.admit_num_queries_limit;
-    int64_t num_admitted = host_stats_[host_id].num_admitted;
+    int64_t admission_slots = entry.second.be_desc.admission_slots;
+    int64_t slots_in_use = host_stats_[host_id].slots_in_use;
     VLOG_ROW << "Checking available slot on host=" << host_id
-             << " num_admitted=" << num_admitted << " needs=" << num_admitted + 1
-             << " admit_num_queries_limit=" << admit_num_queries_limit;
-    if (num_admitted >= admit_num_queries_limit) {
-      *unavailable_reason =
-          Substitute(HOST_SLOT_NOT_AVAILABLE, host_id, num_admitted,
-              admit_num_queries_limit);
+             << " slots_in_use=" << slots_in_use
+             << " needs=" << slots_in_use + entry.second.slots_to_use
+             << " executor admission_slots=" << admission_slots;
+    if (slots_in_use + entry.second.slots_to_use > admission_slots) {
+      *unavailable_reason = Substitute(HOST_SLOT_NOT_AVAILABLE, host_id,
+          entry.second.slots_to_use, slots_in_use, admission_slots);
       return false;
     }
   }
@@ -597,7 +609,7 @@ bool AdmissionController::CanAdmitRequest(const QuerySchedule& schedule,
         GetStalenessDetailLocked(" "));
     return false;
   }
-  if (!default_group && !HasAvailableSlot(schedule, pool_cfg, not_admitted_reason)) {
+  if (!default_group && !HasAvailableSlots(schedule, pool_cfg, not_admitted_reason)) {
     // All non-default executor groups are also limited by the number of running queries
     // per executor.
     // TODO(IMPALA-8757): Extend slot based admission to default executor group
@@ -663,6 +675,8 @@ bool AdmissionController::RejectForSchedule(const QuerySchedule& schedule,
     const TPoolConfig& pool_cfg, int64_t cluster_size, int64_t group_size,
     string* rejection_reason) {
   DCHECK(rejection_reason != nullptr && rejection_reason->empty());
+  bool default_group =
+      schedule.executor_group() == ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME;
 
   // Compute the max (over all backends), the cluster totals (across all backends) for
   // min_mem_reservation_bytes, thread_reservation, the min admit_mem_limit
@@ -677,6 +691,13 @@ bool AdmissionController::RejectForSchedule(const QuerySchedule& schedule,
   int64_t cluster_thread_reservation = 0;
   for (const auto& e : schedule.per_backend_exec_params()) {
     const BackendExecParams& bp = e.second;
+    // TODO(IMPALA-8757): Extend slot based admission to default executor group
+    if (!default_group && bp.slots_to_use > bp.be_desc.admission_slots) {
+      *rejection_reason = Substitute(REASON_NOT_ENOUGH_SLOTS_ON_BACKEND, bp.slots_to_use,
+          TNetworkAddressToString(bp.be_desc.address), bp.be_desc.admission_slots);
+      return true;
+    }
+
     cluster_min_mem_reservation_bytes += bp.min_mem_reservation_bytes;
     if (bp.min_mem_reservation_bytes > largest_min_mem_reservation.second) {
       largest_min_mem_reservation = make_pair(&e.first, bp.min_mem_reservation_bytes);
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index 95c80e5..e106751 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -187,7 +187,7 @@ enum class AdmissionOutcome {
 ///
 /// In addition to the checks described before, admission to executor groups is bounded by
 /// the maximum number of queries that can run concurrently on an executor
-/// (-max_concurrent_queries). An additional check is performed to ensure that each
+/// (-admission_control_slots). An additional check is performed to ensure that each
 /// executor in the group has an available slot to run the query. Admission controllers
 /// include the number of queries that have been admitted to each executor in the
 /// statestore updates.
@@ -240,23 +240,23 @@ enum class AdmissionOutcome {
 /// Consider a cluster with a dedicated coordinator and 2 executor groups
 /// "default-pool-group-1" and "default-pool-group-2" (the number of executors per group
 /// does not matter for this example). Both executor groups will be able to serve requests
-/// from the default resource pool. Consider that each executor can only run one query at
-/// a time, i.e. --max_concurrent_queries=1 is specified for all executors. An incoming
-/// query is submitted through SubmitForAdmission(), which calls
+/// from the default resource pool. Consider that each executor has only one admission
+/// slot i.e. --admission_control_slots=1 is specified for all executors. An incoming
+/// query with mt_dop=1 is submitted through SubmitForAdmission(), which calls
 /// FindGroupToAdmitOrReject(). From there we call ComputeGroupSchedules() which calls
 /// compute schedules for both executor groups. Then we perform rejection tests and
 /// afterwards call CanAdmitRequest() for each of the schedules. Executor groups are
 /// processed in alphanumerically sorted order, so we attempt admission to group
-/// "default-pool-group-1" first. CanAdmitRequest() calls HasAvailableSlot() to check
-/// whether any of the hosts in the group have reached their maximum number of concurrent
-/// queries and since that is not the case, admission succeeds. The query is admitted and
-/// 'num_admitted' is incremented for each host in that group. When a second query arrives
-/// while the first one is still running, we perform the same steps. In particular we
-/// compute schedules for both groups and consider admission to default-pool-group-1
-/// first. However, the check in HasAvailableSlot() now fails and we will consider group
-/// default-pool-group-2 next. For this group, the check succeeds and the query is
-/// admitted, incrementing the num_admitted counter for each host in group
-/// default-pool-group-2.
+/// "default-pool-group-1" first. CanAdmitRequest() calls HasAvailableSlots() to check
+/// whether any of the hosts in the group can fit the new query in their available slots
+/// and since it does fit, admission succeeds. The query is admitted and 'slots_in_use'
+/// is incremented for each host in that group based on the effective parallelism of the
+/// query. When a second query arrives while the first one is still running, we perform
+/// the same steps. In particular we compute schedules for both groups and consider
+/// admission to default-pool-group-1 first. However, the check in HasAvailableSlots()
+/// now fails and we will consider group default-pool-group-2 next. For this group,
+/// the check succeeds and the query is admitted, incrementing the num_admitted counter
+/// for each host in group default-pool-group-2.
 ///
 /// Queuing Behavior:
 /// Once the resources in a pool are consumed, each coordinator receiving requests will
@@ -391,6 +391,8 @@ class AdmissionController {
     int64_t mem_admitted = 0;
     /// The per host number of queries admitted only for the queries admitted locally.
     int64_t num_admitted = 0;
+    /// The per host number of slots in use for the queries admitted locally.
+    int64_t slots_in_use = 0;
   };
 
   typedef std::unordered_map<std::string, HostStats> PerHostStats;
@@ -829,12 +831,12 @@ class AdmissionController {
       const TPoolConfig& pool_cfg, int64_t cluster_size,
       std::string* mem_unavailable_reason);
 
-  /// Returns true if there is an available slot on all executors in the schedule. The
-  /// number of slots per executors does not change with the group or cluster size and
-  /// instead always uses pool_cfg.max_requests. If a host does not have a free slot, this
-  /// returns false and sets 'unavailable_reason'.
+  /// Returns true if there are enough available slots on all executors in the schedule to
+  /// fit the query schedule. The number of slots per executors does not change with the
+  /// group or cluster size and instead always uses pool_cfg.max_requests. If a host does
+  /// not have a free slot, this returns false and sets 'unavailable_reason'.
   /// Must hold admission_ctrl_lock_.
-  bool HasAvailableSlot(const QuerySchedule& schedule, const TPoolConfig& pool_cfg,
+  bool HasAvailableSlots(const QuerySchedule& schedule, const TPoolConfig& pool_cfg,
       string* unavailable_reason);
 
   /// Updates the memory admitted and the num of queries running for each backend in
@@ -850,9 +852,10 @@ class AdmissionController {
       const QuerySchedule& schedule, const std::vector<TNetworkAddress>& host_addrs);
 
   /// Updates the memory admitted and the num of queries running on the specified host by
-  /// adding the specified mem and num_queries to the host stats.
+  /// adding the specified mem, num_queries and slots to the host stats.
   void UpdateHostStats(
-      const TNetworkAddress& host_addr, int64_t mem_to_admit, int num_queries_to_admit);
+      const TNetworkAddress& host_addr, int64_t mem_to_admit, int num_queries_to_admit,
+      int num_slots_to_admit);
 
   /// Rejection happens in several stages
   /// 1) Based on static pool configuration
@@ -1004,6 +1007,7 @@ class AdmissionController {
   FRIEND_TEST(AdmissionControllerTest, PoolStats);
   FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestMemory);
   FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestCount);
+  FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestSlots);
   FRIEND_TEST(AdmissionControllerTest, GetMaxToDequeue);
   FRIEND_TEST(AdmissionControllerTest, QueryRejection);
   FRIEND_TEST(AdmissionControllerTest, DedicatedCoordQuerySchedule);
diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h
index 6725774..d019eb2 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -78,6 +78,12 @@ struct BackendExecParams {
   // concurrently-executing fragment instances at any point in query execution.
   int64_t thread_reservation = 0;
 
+  // Number of slots that this query should count for in admission control.
+  // This is calculated as the maximum # of instances of any fragment on this backend.
+  // I.e. 1 if mt_dop is not used and at most the mt_dop value if mt_dop is specified
+  // (but less if the query is not actually running with mt_dop instances on this node).
+  int slots_to_use = 0;
+
   // Indicates whether this backend is the coordinator.
   bool is_coord_backend = false;
 };
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index a2bba7b..fac4364 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -708,6 +708,27 @@ void Scheduler::ComputeBackendExecParams(
     }
   }
 
+  // Compute 'slots_to_use' for each backend based on the max # of instances of
+  // any fragment on that backend.
+  for (auto& backend : per_backend_params) {
+    int be_max_instances = 0;
+    // Instances for a fragment are clustered together because of how the vector is
+    // constructed above. So we can compute the max # of instances of any fragment
+    // with a single pass over the vector.
+    const FragmentExecParams* curr_fragment = nullptr;
+    int curr_instance_count = 0; // Number of instances of the current fragment seen.
+    for (auto& finstance : backend.second.instance_params) {
+      if (curr_fragment == nullptr ||
+          curr_fragment != &finstance->fragment_exec_params) {
+        curr_fragment = &finstance->fragment_exec_params;
+        curr_instance_count = 0;
+      }
+      ++curr_instance_count;
+      be_max_instances = max(be_max_instances, curr_instance_count);
+    }
+    backend.second.slots_to_use = be_max_instances;
+  }
+
   // This also ensures an entry always exists for the coordinator backend.
   int64_t coord_min_reservation = 0;
   const TNetworkAddress& coord_addr = executor_config.local_be_desc.address;
diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index ccbc520..5ce4280 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -954,10 +954,12 @@ void ImpalaHttpHandler::BackendsHandler(const Webserver::WebRequest& req,
     Value mem_admitted(PrettyPrinter::PrintBytes(
         host_stats[address].mem_admitted).c_str(), document->GetAllocator());
     backend_obj.AddMember("mem_admitted", mem_admitted, document->GetAllocator());
-    backend_obj.AddMember("admit_num_queries_limit", backend.admit_num_queries_limit,
+    backend_obj.AddMember("admission_slots", backend.admission_slots,
         document->GetAllocator());
     backend_obj.AddMember("num_admitted", host_stats[address].num_admitted,
         document->GetAllocator());
+    backend_obj.AddMember("admission_slots_in_use", host_stats[address].slots_in_use,
+        document->GetAllocator());
     vector<string> group_names;
     for (const auto& group : backend.executor_groups) group_names.push_back(group.name);
     Value executor_groups(JoinStrings(group_names, ", ").c_str(),
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index d02eb71..fbd969f 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1812,7 +1812,7 @@ void ImpalaServer::BuildLocalBackendDescriptorInternal(TBackendDescriptor* be_de
   be_desc->__set_krpc_address(krpc_address);
 
   be_desc->__set_admit_mem_limit(exec_env_->admit_mem_limit());
-  be_desc->__set_admit_num_queries_limit(exec_env_->admit_num_queries_limit());
+  be_desc->__set_admission_slots(exec_env_->admission_slots());
   be_desc->__set_is_quiescing(is_quiescing);
   be_desc->executor_groups = GetExecutorGroups(FLAGS_executor_groups);
 }
diff --git a/common/thrift/StatestoreService.thrift b/common/thrift/StatestoreService.thrift
index 67a482f..b2c6e51 100644
--- a/common/thrift/StatestoreService.thrift
+++ b/common/thrift/StatestoreService.thrift
@@ -92,8 +92,9 @@ struct TBackendDescriptor {
   // is set, and currently must contain exactly one entry.
   10: required list<TExecutorGroupDesc> executor_groups;
 
-  // The number of queries that can be admitted to this backend.
-  11: required i64 admit_num_queries_limit;
+  // The number of admission slots for this backend that can be occupied by running
+  // queries.
+  11: required i64 admission_slots;
 }
 
 // Description of a single entry in a topic
diff --git a/testdata/workloads/functional-query/queries/QueryTest/mt-dop-parquet-admission-slots.test b/testdata/workloads/functional-query/queries/QueryTest/mt-dop-parquet-admission-slots.test
new file mode 100644
index 0000000..ca0251b
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/mt-dop-parquet-admission-slots.test
@@ -0,0 +1,30 @@
+====
+---- QUERY
+# 24 partitions across 3 backends means that we'll get 4 finstances per backend
+# (since mt_dop=4, which limits us to 4 finstances per backend).
+select min(string_col) from alltypes
+---- RESULTS
+'0'
+---- RUNTIME_PROFILE
+row_regex:.*AdmissionSlots: 4.*
+====
+---- QUERY
+# 2 partitions across 3 backends means that we'll get 1 finstance per backend
+# and one unused backend.
+select min(string_col) from alltypes where month = 1
+---- RESULTS
+'0'
+---- RUNTIME_PROFILE
+row_regex:.*AdmissionSlots: 1.*
+====
+---- QUERY
+# 7 partitions across 3 backends results in 3 finstances on one backend and 2
+# finstances on the others. This test illustrates that the slots can vary between
+# backends.
+select min(string_col) from alltypes where month <= 7 and year = 2009
+---- RESULTS
+'0'
+---- RUNTIME_PROFILE
+row_regex:.*AdmissionSlots: 2.*
+row_regex:.*AdmissionSlots: 3.*
+====
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index 1887a83..6f9b7c4 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -52,6 +52,7 @@ from tests.util.web_pages_util import (
     get_num_completed_backends,
     get_mem_admitted_backends_debug_page)
 from tests.verifiers.mem_usage_verifier import MemUsageVerifier
+from tests.verifiers.metric_verifier import MetricVerifier
 from ImpalaService import ImpalaHiveServer2Service
 from TCLIService import TCLIService
 
@@ -129,12 +130,18 @@ RESOURCES_DIR = os.path.join(os.environ['IMPALA_HOME'], "fe", "src", "test", "re
 
 
 def impalad_admission_ctrl_flags(max_requests, max_queued, pool_max_mem,
-                                 proc_mem_limit=None, queue_wait_timeout_ms=None):
+                                 proc_mem_limit=None, queue_wait_timeout_ms=None,
+                                 admission_control_slots=None, executor_groups=None):
   extra_flags = ""
   if proc_mem_limit is not None:
     extra_flags += " -mem_limit={0}".format(proc_mem_limit)
   if queue_wait_timeout_ms is not None:
     extra_flags += " -queue_wait_timeout_ms={0}".format(queue_wait_timeout_ms)
+  if admission_control_slots is not None:
+    extra_flags += " -admission_control_slots={0}".format(admission_control_slots)
+  if executor_groups is not None:
+    extra_flags += " -executor_groups={0}".format(executor_groups)
+
   return ("-vmodule admission-controller=3 -default_pool_max_requests {0} "
           "-default_pool_max_queued {1} -default_pool_mem_limit {2} {3}".format(
             max_requests, max_queued, pool_max_mem, extra_flags))
@@ -867,6 +874,46 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
+      impalad_args=impalad_admission_ctrl_flags(max_requests=100, max_queued=10,
+          pool_max_mem=-1, admission_control_slots=4,
+          executor_groups="default-pool-group1"),
+      statestored_args=_STATESTORED_ARGS)
+  def test_queue_reasons_slots(self):
+    """Test that queue details appear in the profile when queued based on number of
+    slots."""
+    # Run a bunch of queries - one should get admitted immediately, the rest should
+    # be dequeued one-by-one.
+    STMT = "select min(ss_wholesale_cost) from tpcds_parquet.store_sales"
+    TIMEOUT_S = 60
+    EXPECTED_REASON = "Latest admission queue reason: Not enough admission control " +\
+                      "slots available on host"
+    NUM_QUERIES = 5
+    profiles = self._execute_and_collect_profiles([STMT for i in xrange(NUM_QUERIES)],
+        TIMEOUT_S, config_options={"mt_dop": 4})
+
+    num_reasons = len([profile for profile in profiles if EXPECTED_REASON in profile])
+    assert num_reasons == NUM_QUERIES - 1, \
+        "All queries except first should have been queued: " + '\n===\n'.join(profiles)
+    init_queue_reasons = self.__extract_init_queue_reasons(profiles)
+    assert len(init_queue_reasons) == NUM_QUERIES - 1, \
+        "All queries except first should have been queued: " + '\n===\n'.join(profiles)
+    over_limit_details = [detail
+        for detail in init_queue_reasons
+        if "Not enough admission control slots available on host" in detail]
+    assert len(over_limit_details) == 1, \
+        "One query initially queued because of slots: " + '\n===\n'.join(profiles)
+    queue_not_empty_details = [detail
+        for detail in init_queue_reasons if 'queue is not empty' in detail]
+    assert len(queue_not_empty_details) == NUM_QUERIES - 2, \
+        "Others queued because of non-empty queue: " + '\n===\n'.join(profiles)
+
+    # Confirm that the cluster quiesces and all metrics return to zero.
+    for impalad in self.cluster.impalads:
+      verifier = MetricVerifier(impalad.service)
+      verifier.wait_for_backend_admission_control_state()
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
     impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=10,
         pool_max_mem=1024 * 1024 * 1024),
     statestored_args=_STATESTORED_ARGS)
diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py
index bc632a2..da3beee 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -50,13 +50,13 @@ class TestExecutorGroups(CustomClusterTestSuite):
     return "default-pool-%s" % name
 
   def _add_executor_group(self, name_suffix, min_size, num_executors=0,
-                          max_concurrent_queries=0):
+                          admission_control_slots=0):
     """Adds an executor group to the cluster. 'min_size' specifies the minimum size for
     the new group to be considered healthy. 'num_executors' specifies the number of
     executors to start and defaults to 'min_size' but can be different from 'min_size' to
-    start an unhealthy group. 'max_concurrent_queries' can be used to override the default
-    (num cores). If 'name_suffix' is empty, no executor group is specified for the new
-    backends and they will end up in the default group."""
+    start an unhealthy group. 'admission_control_slots' can be used to override the
+    default (num cores). If 'name_suffix' is empty, no executor group is specified for
+    the new backends and they will end up in the default group."""
     self.num_groups += 1
     if num_executors == 0:
       num_executors = min_size
@@ -64,7 +64,8 @@ class TestExecutorGroups(CustomClusterTestSuite):
     name = self._group_name(name_suffix)
     LOG.info("Adding %s executors to group %s with minimum size %s" %
              (num_executors, name, min_size))
-    cluster_args = ["--impalad_args=-max_concurrent_queries=%s" % max_concurrent_queries]
+    cluster_args = ["--impalad_args=-admission_control_slots=%s" %
+                    admission_control_slots]
     if len(name_suffix) > 0:
       cluster_args.append("--impalad_args=-executor_groups=%s:%s" % (name, min_size))
     self._start_impala_cluster(options=cluster_args,
@@ -198,9 +199,10 @@ class TestExecutorGroups(CustomClusterTestSuite):
     assert self._get_num_executor_groups(only_healthy=True) == 1
 
   @pytest.mark.execute_serially
-  def test_max_concurrent_queries(self):
-    """Tests that the max_concurrent_queries flag works as expected."""
-    self._add_executor_group("group1", 2, max_concurrent_queries=1)
+  def test_admission_slots(self):
+    """Tests that the admission_control_slots flag works as expected to
+    specify the number of admission slots on the executors."""
+    self._add_executor_group("group1", 2, admission_control_slots=1)
     # Query that runs on every executor
     QUERY = "select * from functional_parquet.alltypestiny \
              where month < 3 and id + random() < sleep(500);"
@@ -209,18 +211,26 @@ class TestExecutorGroups(CustomClusterTestSuite):
     client.wait_for_admission_control(q1)
     q2 = client.execute_async(QUERY)
     profile = client.get_runtime_profile(q2)
-    assert "Initial admission queue reason: No query slot available on host" in profile
+    assert ("Initial admission queue reason: Not enough admission control slots "
+            "available on host" in profile)
     client.cancel(q1)
     client.cancel(q2)
 
+    # Test that a query that would occupy too many slots gets rejected
+    result = self.execute_query_expect_failure(self.client,
+        "select min(ss_list_price) from tpcds_parquet.store_sales", {'mt_dop': 64})
+    assert "number of admission control slots needed" in str(result)
+    assert "is greater than total slots available" in str(result)
+
+
   @pytest.mark.execute_serially
   def test_multiple_executor_groups(self):
     """Tests that two queries can run on two separate executor groups simultaneously."""
     # Query that runs on every executor
     QUERY = "select * from functional_parquet.alltypestiny \
              where month < 3 and id + random() < sleep(500);"
-    self._add_executor_group("group1", 2, max_concurrent_queries=1)
-    self._add_executor_group("group2", 2, max_concurrent_queries=1)
+    self._add_executor_group("group1", 2, admission_control_slots=1)
+    self._add_executor_group("group2", 2, admission_control_slots=1)
     self._wait_for_num_executor_groups(2, only_healthy=True)
     client = self.client
     q1 = client.execute_async(QUERY)
@@ -233,13 +243,13 @@ class TestExecutorGroups(CustomClusterTestSuite):
     client.cancel(q2)
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(impalad_args="-max_concurrent_queries=1")
+  @CustomClusterTestSuite.with_args(impalad_args="-admission_control_slots=1")
   def test_coordinator_concurrency(self):
     """Tests that the command line flag to limit the coordinator concurrency works as
     expected."""
     QUERY = "select sleep(1000)"
     # Add group with more slots than coordinator
-    self._add_executor_group("group2", 2, max_concurrent_queries=3)
+    self._add_executor_group("group2", 2, admission_control_slots=3)
     # Try to run two queries and observe that one gets queued
     client = self.client
     q1 = client.execute_async(QUERY)
@@ -251,14 +261,14 @@ class TestExecutorGroups(CustomClusterTestSuite):
     client.cancel(q2)
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(impalad_args="-max_concurrent_queries=3")
+  @CustomClusterTestSuite.with_args(impalad_args="-admission_control_slots=3")
   def test_executor_concurrency(self):
     """Tests that the command line flag to limit query concurrency on executors works as
     expected."""
     # Query that runs on every executor
     QUERY = "select * from functional_parquet.alltypestiny \
              where month < 3 and id + random() < sleep(500);"
-    self._add_executor_group("group1", 2, max_concurrent_queries=3)
+    self._add_executor_group("group1", 2, admission_control_slots=3)
 
     workload = None
     try:
@@ -323,7 +333,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
     from scheduling."""
     # Start default executor group
     self._add_executor_group("", min_size=2, num_executors=2,
-                             max_concurrent_queries=3)
+                             admission_control_slots=3)
     # Run query to make sure things work
     QUERY = "select count(*) from functional.alltypestiny"
     self.execute_query_expect_success(self.client, QUERY)
@@ -350,11 +360,11 @@ class TestExecutorGroups(CustomClusterTestSuite):
              where month < 3 and id + random() < sleep(500);"
     group_names = ["group1", "group2"]
     self._add_executor_group(group_names[0], min_size=1, num_executors=1,
-                             max_concurrent_queries=1)
+                             admission_control_slots=1)
     # Create an exec group of min size 2 to exercise the case where a group becomes
     # unhealthy.
     self._add_executor_group(group_names[1], min_size=2, num_executors=2,
-                             max_concurrent_queries=1)
+                             admission_control_slots=1)
     self._wait_for_num_executor_groups(2, only_healthy=True)
     # Verify metrics for both groups appear.
     assert all(
diff --git a/tests/query_test/test_cancellation.py b/tests/query_test/test_cancellation.py
index 2fa5f03..c6570f1 100644
--- a/tests/query_test/test_cancellation.py
+++ b/tests/query_test/test_cancellation.py
@@ -167,9 +167,11 @@ class TestCancellation(ImpalaTestSuite):
     client = self.hs2_client
     # Start query
     handle = client.execute_async(query)
-    # Wait up to 5 seconds for the query to start
-    assert any(client.get_state(handle) == 'RUNNING_STATE' or sleep(1)
-               for _ in range(5)), 'Query failed to start'
+    # Wait for the query to start (with a long timeout to account for admission control
+    # queuing).
+    WAIT_SECONDS = 60 * 30
+    assert any(client.get_state(handle) == 'RUNNING_STATE' or sleep(0.1)
+               for _ in range(10 * WAIT_SECONDS)), 'Query failed to start'
 
     client.cancel(handle)
     # Wait up to 5 seconds for the query to get cancelled
diff --git a/tests/query_test/test_mt_dop.py b/tests/query_test/test_mt_dop.py
index 0673525..5c086a0 100644
--- a/tests/query_test/test_mt_dop.py
+++ b/tests/query_test/test_mt_dop.py
@@ -23,7 +23,7 @@ from copy import deepcopy
 from tests.common.environ import ImpalaTestClusterProperties
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.kudu_test_suite import KuduTestSuite
-from tests.common.skip import SkipIfEC
+from tests.common.skip import SkipIfEC, SkipIfNotHdfsMinicluster
 from tests.common.test_vector import ImpalaTestDimension
 
 # COMPUTE STATS on Parquet tables automatically sets MT_DOP=4, so include
@@ -120,3 +120,25 @@ class TestMtDopKudu(KuduTestSuite):
     vector.get_value('exec_option')['mt_dop'] = vector.get_value('mt_dop')
     self.run_test_case('QueryTest/mt-dop-kudu', vector, use_db=unique_database)
 
+
+@SkipIfNotHdfsMinicluster.tuned_for_minicluster
+class TestMtDopAdmissionSlots(ImpalaTestSuite):
+  """Test the number of admission slots calculated for different queries. This
+  is always at most mt_dop, but is less where there are fewer fragment instances per
+  host. The slot calculation logic is orthogonal to file format, so we only need
+  to test on one format."""
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestMtDopAdmissionSlots, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('mt_dop', 4))
+    cls.ImpalaTestMatrix.add_constraint(
+        lambda v: v.get_value('table_format').file_format == 'parquet')
+
+  def test_admission_slots(self, vector):
+    vector.get_value('exec_option')['mt_dop'] = vector.get_value('mt_dop')
+    self.run_test_case('QueryTest/mt-dop-parquet-admission-slots', vector)
+
diff --git a/tests/util/auto_scaler.py b/tests/util/auto_scaler.py
index 95c0f77..bb6925e 100755
--- a/tests/util/auto_scaler.py
+++ b/tests/util/auto_scaler.py
@@ -253,7 +253,7 @@ class AutoScaler(object):
 
     impalad_args = [
         "-vmodule=admission-controller=3,cluster-membership-mgr=3",
-        "-max_concurrent_queries=%s" % executor_slots,
+        "-admission_control_slots=%s" % executor_slots,
         "-shutdown_grace_period_s=2"]
 
     options += ["--impalad_args=%s" % a for a in impalad_args]
diff --git a/tests/verifiers/metric_verifier.py b/tests/verifiers/metric_verifier.py
index d44b629..aff35b3 100644
--- a/tests/verifiers/metric_verifier.py
+++ b/tests/verifiers/metric_verifier.py
@@ -17,6 +17,12 @@
 #
 # Verifier for common impalad metrics
 
+import logging
+from time import time, sleep
+
+LOG = logging.getLogger('test_verify_metrics')
+LOG.setLevel(level=logging.DEBUG)
+
 # List of metrics that should be equal to zero when there are no outstanding queries.
 METRIC_LIST = [
                "impala-server.num-queries-registered",
@@ -59,3 +65,21 @@ class MetricVerifier(object):
 
   def wait_for_metric(self, metric_name, expected_value, timeout=60):
     self.impalad_service.wait_for_metric_value(metric_name, expected_value, timeout)
+
+  def wait_for_backend_admission_control_state(self, timeout=60):
+    """Wait for the admission-control-related values on the /backends page to go to
+    zero (i.e. consistent with an idle cluster)."""
+    start_time = time()
+    while time() - start_time < timeout:
+      try:
+        self.__assert_backend_ac_value_are_zero()
+        break  # Success!
+      except AssertionError as e:
+        LOG.info("Not yet quiesced: %s", str(e))
+        sleep(0.1)
+
+  def __assert_backend_ac_value_are_zero(self):
+    response = self.impalad_service.get_debug_webpage_json("/backends")
+    for backend in response['backends']:
+      for key in ['num_admitted', 'admission_slots_in_use', 'mem_admitted']:
+        assert str(backend[key]) == '0'
diff --git a/tests/verifiers/test_verify_metrics.py b/tests/verifiers/test_verify_metrics.py
index d1564c5..8db6811 100644
--- a/tests/verifiers/test_verify_metrics.py
+++ b/tests/verifiers/test_verify_metrics.py
@@ -18,6 +18,7 @@
 # Verification of impalad metrics after a test run.
 
 from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.impala_cluster import ImpalaCluster
 from tests.verifiers.metric_verifier import MetricVerifier
 
 class TestValidateMetrics(ImpalaTestSuite):
@@ -43,3 +44,11 @@ class TestValidateMetrics(ImpalaTestSuite):
     """Test that all buffers are unused"""
     verifier = MetricVerifier(self.impalad_test_service)
     verifier.verify_num_unused_buffers()
+
+  def test_backends_are_idle(self):
+    """Test that the backends state is in a valid state when quiesced - i.e.
+    no queries are running and the admission control state reflects that no
+    resources are used."""
+    for impalad in ImpalaCluster.get_e2e_test_cluster().impalads:
+      verifier = MetricVerifier(impalad.service)
+      verifier.wait_for_backend_admission_control_state()
diff --git a/www/backends.tmpl b/www/backends.tmpl
index bbdbd6e..1070993 100644
--- a/www/backends.tmpl
+++ b/www/backends.tmpl
@@ -32,7 +32,7 @@ under the License.
       <th>Memory Limit for Admission</th>
       <th>Memory Reserved</th>
       <th>Memory Admitted by Queries Submitted to this Coordinator</th>
-      <th>Num. Queries Limit for Admission</th>
+      <th>Admission Control Slots In Use</th>
       <th>Num. Queries Admitted by this Coordinator</th>
       <th>Executor Groups</th>
     </tr>
@@ -49,7 +49,7 @@ under the License.
       <td>{{admit_mem_limit}}</td>
       <td>{{mem_reserved}}</td>
       <td>{{mem_admitted}}</td>
-      <td>{{admit_num_queries_limit}}</td>
+      <td>{{admission_slots_in_use}}/{{admission_slots}}</td>
       <td>{{num_admitted}}</td>
       <td>{{executor_groups}}</td>
     </tr>