Posted to commits@impala.apache.org by lv...@apache.org on 2019/07/21 23:36:21 UTC

[impala] 03/05: IMPALA-8484: Run queries on disjoint executor groups

This is an automated email from the ASF dual-hosted git repository.

lv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 2397ae5590d487025fb446efecd34f5395b8997f
Author: Lars Volker <lv...@cloudera.com>
AuthorDate: Fri May 3 09:39:19 2019 -0700

    IMPALA-8484: Run queries on disjoint executor groups
    
    This change adds support for running queries inside a single admission
    control pool on one of several, disjoint sets of executors called
    "executor groups".
    
    Executors can be configured with an executor group through the newly
    added '--executor_groups' flag. Note that in anticipation of future
    changes, the flag already uses the plural form, but only a single
    executor group may be specified for now. Each executor group
    specification can optionally contain a minimum size, separated by a
    ':', e.g. --executor_groups default-pool-1:3. A group is considered
    for admission only when the cluster membership contains at least that
    number of executors for the group.
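
The "name[:min_size]" format described above can be sketched as a small parser. This is only an illustration, not the parsing code from this commit: the struct ExecutorGroupSpec, the function ParseExecutorGroupSpec, and the default minimum size of 1 when no ':' suffix is given are all assumptions made for the example.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical holder for one parsed executor group specification.
struct ExecutorGroupSpec {
  std::string name;
  int64_t min_size = 1;  // Assumed default when no ':<min_size>' suffix is given.
};

// Parses "<name>" or "<name>:<min_size>" into 'out'. Returns false and leaves 'out'
// untouched if the name is empty or the minimum size is not a positive integer.
bool ParseExecutorGroupSpec(const std::string& spec, ExecutorGroupSpec* out) {
  assert(out != nullptr);
  const std::string::size_type colon = spec.find(':');
  const std::string name = spec.substr(0, colon);
  if (name.empty()) return false;
  int64_t min_size = 1;
  if (colon != std::string::npos) {
    const std::string digits = spec.substr(colon + 1);
    // Cap the length so that the accumulation below cannot overflow.
    if (digits.empty() || digits.size() > 9) return false;
    min_size = 0;
    for (char c : digits) {
      if (c < '0' || c > '9') return false;
      min_size = min_size * 10 + (c - '0');
    }
    if (min_size < 1) return false;
  }
  out->name = name;
  out->min_size = min_size;
  return true;
}
```

With the example from the text, "default-pool-1:3" yields the name "default-pool-1" and a minimum size of 3.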
    
    Executor groups are mapped to resource pools by their name: An executor
    group can service queries from a resource pool if the pool name is a
    prefix of the group name separated by a '-'. For example, queries in
    pool poolA can be serviced by executor groups named poolA-1 and poolA-2,
    but not by groups named foo or poolB-1.
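
The naming rule above amounts to a prefix check on "<pool name>-". A minimal sketch follows. The helper name GroupServesPool and the constant kPoolGroupDelimiter are made up for this example; the constant only mirrors the POOL_GROUP_DELIMITER value ('-') that the patch adds to admission-controller.cc.

```cpp
#include <cassert>
#include <string>

// Mirrors the value of POOL_GROUP_DELIMITER from the patch; the name is illustrative.
const char kPoolGroupDelimiter = '-';

// Returns true if 'group_name' starts with "<pool_name>-", i.e. if the group could
// service queries from 'pool_name' under the naming rule described above.
bool GroupServesPool(const std::string& group_name, const std::string& pool_name) {
  assert(!pool_name.empty());
  const std::string prefix = pool_name + kPoolGroupDelimiter;
  // compare() on a shorter 'group_name' never returns 0, so no length check is needed.
  return group_name.compare(0, prefix.size(), prefix) == 0;
}
```

Including the delimiter in the prefix is what keeps a group such as poolAB-1 from matching pool poolA.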
    
    During scheduling, executor groups are considered in alphabetical order.
    This means that one group is filled up entirely before a subsequent
    group is considered for admission. Groups also need to pass a health
    check before being considered. In particular, they must contain at
    least the minimum number of executors specified.
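
The ordering and health check can be illustrated with a sorted map. This is a simplified sketch and not the admission controller's actual data structures: GroupState, IsHealthy, and HealthyGroupsInOrder are hypothetical names, and the health check shown here covers only the minimum-size condition mentioned above.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical per-group state: the configured minimum and the current membership.
struct GroupState {
  int64_t min_size;
  int64_t num_executors;
};

// Simplified health check: the group is non-empty and has reached its minimum size.
bool IsHealthy(const GroupState& group) {
  return group.num_executors > 0 && group.num_executors >= group.min_size;
}

// Returns the names of all healthy groups in the order in which they would be tried.
// std::map iterates in ascending key order, which gives the alphabetical ordering.
std::vector<std::string> HealthyGroupsInOrder(
    const std::map<std::string, GroupState>& groups) {
  std::vector<std::string> result;
  for (const auto& entry : groups) {
    if (IsHealthy(entry.second)) result.push_back(entry.first);
  }
  return result;
}
```

Note that plain string ordering places poolA-10 before poolA-2, which matters when choosing group names for more than nine groups.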
    
    If no group is specified during startup, executors are added to the
    default executor group. If - during admission - no executor group for a
    pool can be found and the default group is non-empty, then the default
    group is considered. The default group does not have a minimum size.
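
The fallback to the default group could look roughly like the sketch below. It makes several assumptions: the function CandidateGroups and the map of executor counts are invented for the example, matching is done on the group name only, and the string "default" is a placeholder for ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME, whose actual value is not shown in this message.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Placeholder for ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME; the real value may differ.
const std::string kDefaultGroupName = "default";

// Returns the groups to consider for 'pool_name', given the number of executors that
// are currently registered in each group. Falls back to the default group only if no
// group matches the pool and the default group is non-empty.
std::vector<std::string> CandidateGroups(const std::string& pool_name,
    const std::map<std::string, int64_t>& executors_per_group) {
  assert(!pool_name.empty());
  const std::string prefix = pool_name + '-';
  std::vector<std::string> result;
  for (const auto& entry : executors_per_group) {
    if (entry.first.compare(0, prefix.size(), prefix) == 0) {
      result.push_back(entry.first);
    }
  }
  if (!result.empty()) return result;
  const auto it = executors_per_group.find(kDefaultGroupName);
  if (it != executors_per_group.end() && it->second > 0) {
    result.push_back(kDefaultGroupName);
  }
  return result;
}
```

Because the default group has no minimum size, a single registered executor is enough for it to be considered in this sketch.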
    
    This change inverts the order of scheduling and admission. Prior to this
    change, queries were scheduled before submitting them to the admission
    controller. Now the admission controller computes schedules for all
    candidate executor groups before each admission attempt. If the cluster
    membership has not changed, then the schedules of the previous attempt
    will be reused. This means that queries will no longer fail if the
    cluster membership changes while they are queued in the admission
    controller.
    
    This change also alters the default behavior when using a dedicated
    coordinator and no executors have registered yet. Prior to this change,
    a query would fail immediately with an error ("No executors registered
    in group"). Now a query will get queued and wait until executors show
    up, or it times out after the pool's queue timeout period.
    
    Testing:
    
    This change adds a new custom cluster test for executor groups. It
    makes use of new capabilities added to start-impala-cluster.py to bring
    up additional executors into an already running cluster.
    
    Additionally, this change adds an instructional implementation of
    executor group based autoscaling, which can be used during development.
    It also adds a helper to run queries concurrently. Both are used in a
    new test to exercise the executor group logic and to prevent regressions
    to these tools.
    
    In addition to these tests, the existing tests for the admission
    controller (both BE and EE tests) thoroughly exercise the changed code.
    Some of them required changes themselves to reflect the new behavior.
    
    I looped the new tests (test_executor_groups and test_auto_scaling) for
    a night (110 iterations each) without any issues.
    
    I also started an autoscaling cluster with a single group and ran
    TPC-DS, TPC-H, and test_queries on it successfully.
    
    Known limitations:
    
    When using executor groups, only a single coordinator and a single AC
    pool (i.e. the default pool) are supported. Executors do not include the
    number of currently running queries in their statestore updates and so
    admission controllers are not aware of the number of queries admitted by
    other controllers per host.
    
    Change-Id: I8a1d0900f2a82bd2fc0a906cc094e442cffa189b
    Reviewed-on: http://gerrit.cloudera.org:8080/13550
    Reviewed-by: Tim Armstrong <ta...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/coordinator.cc                     |   2 +-
 be/src/runtime/exec-env.cc                        |  31 +-
 be/src/runtime/exec-env.h                         |   4 +
 be/src/scheduling/admission-controller-test.cc    |  52 +-
 be/src/scheduling/admission-controller.cc         | 799 +++++++++++++++-------
 be/src/scheduling/admission-controller.h          | 460 +++++++++----
 be/src/scheduling/cluster-membership-mgr-test.cc  |  35 +-
 be/src/scheduling/cluster-membership-mgr.cc       |  57 +-
 be/src/scheduling/cluster-membership-mgr.h        |   7 +-
 be/src/scheduling/cluster-membership-test-util.cc |  21 +-
 be/src/scheduling/cluster-membership-test-util.h  |  18 +-
 be/src/scheduling/executor-group-test.cc          |  74 +-
 be/src/scheduling/executor-group.cc               |  52 +-
 be/src/scheduling/executor-group.h                |  39 +-
 be/src/scheduling/query-schedule.cc               |   5 +
 be/src/scheduling/query-schedule.h                |  11 +
 be/src/scheduling/scheduler-test-util.cc          |  10 +-
 be/src/scheduling/scheduler.cc                    |  57 +-
 be/src/scheduling/scheduler.h                     |  23 +-
 be/src/service/client-request-state.cc            |  28 +-
 be/src/service/client-request-state.h             |   8 +-
 be/src/service/impala-http-handler.cc             |  29 +-
 be/src/service/impala-server.cc                   |  83 ++-
 be/src/service/impala-server.h                    |  25 +-
 be/src/util/runtime-profile.h                     |   2 +-
 bin/start-impala-cluster.py                       |  58 +-
 common/thrift/StatestoreService.thrift            |  16 +
 tests/common/custom_cluster_test_suite.py         |   8 +-
 tests/common/impala_cluster.py                    |   2 +-
 tests/common/impala_service.py                    |  55 +-
 tests/common/resource_pool_config.py              |   2 +-
 tests/custom_cluster/test_admission_controller.py | 114 ++-
 tests/custom_cluster/test_auto_scaling.py         | 212 ++++++
 tests/custom_cluster/test_catalog_wait.py         |   3 +-
 tests/custom_cluster/test_coordinators.py         |   7 +-
 tests/custom_cluster/test_executor_groups.py      | 298 ++++++++
 tests/custom_cluster/test_restart_services.py     |  13 +-
 tests/query_test/test_observability.py            |   7 +
 tests/util/auto_scaler.py                         | 339 +++++++++
 tests/util/concurrent_workload.py                 | 169 +++++
 www/backends.tmpl                                 |   6 +
 41 files changed, 2579 insertions(+), 662 deletions(-)

diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index b1bfae3..b2c88da 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -392,7 +392,7 @@ Status Coordinator::FinishBackendStartup() {
     }
     if (backend_state->rpc_latency() > max_latency) {
       // Find the backend that takes the most time to acknowledge to
-      // the ExecQueryFinstances() RPC.
+      // the ExecQueryFInstances() RPC.
       max_latency = backend_state->rpc_latency();
       max_latency_host = TNetworkAddressToString(backend_state->impalad_address());
     }
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index b4b4abd..44ca63a 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -54,6 +54,7 @@
 #include "service/impala-server.h"
 #include "statestore/statestore-subscriber.h"
 #include "util/cgroup-util.h"
+#include "util/cpu-info.h"
 #include "util/debug-util.h"
 #include "util/default-path-handlers.h"
 #include "util/hdfs-bulk-ops.h"
@@ -65,8 +66,8 @@
 #include "util/parse-util.h"
 #include "util/periodic-counter-updater.h"
 #include "util/pretty-printer.h"
-#include "util/test-info.h"
 #include "util/system-state-info.h"
+#include "util/test-info.h"
 #include "util/thread-pool.h"
 #include "util/webserver.h"
 
@@ -85,6 +86,10 @@ DEFINE_int32(state_store_subscriber_port, 23000,
     "port where StatestoreSubscriberService should be exported");
 DEFINE_int32(num_hdfs_worker_threads, 16,
     "(Advanced) The number of threads in the global HDFS operation pool");
+DEFINE_int32(max_concurrent_queries, 0,
+    "(Advanced) The maximum number of queries to run on this backend concurrently "
+    "(defaults to number of cores / -num_cores for executors, and 8x that value for "
+    "dedicated coordinators).");
 
 DEFINE_bool_hidden(use_local_catalog, false,
     "Use experimental implementation of a local catalog. If this is set, "
@@ -116,6 +121,7 @@ DECLARE_string(buffer_pool_limit);
 DECLARE_string(buffer_pool_clean_pages_limit);
 DECLARE_int64(min_buffer_size);
 DECLARE_bool(is_coordinator);
+DECLARE_bool(is_executor);
 DECLARE_int32(webserver_port);
 DECLARE_int64(tcmalloc_max_total_thread_cache_bytes);
 
@@ -141,6 +147,11 @@ DEFINE_int32(catalog_client_rpc_retry_interval_ms, 10000, "(Advanced) The time t
 
 const static string DEFAULT_FS = "fs.defaultFS";
 
+// The multiplier for how many queries a dedicated coordinator can run compared to an
+// executor. This is only effective when using non-default settings for executor groups
+// and the absolute value can be overridden by the '--max_concurrent_queries' flag.
+const static int COORDINATOR_CONCURRENCY_MULTIPLIER = 8;
+
 namespace impala {
 
 struct ExecEnv::KuduClientPtr {
@@ -197,18 +208,17 @@ ExecEnv::ExecEnv(int backend_port, int krpc_port,
       Substitute("impalad@$0", TNetworkAddressToString(configured_backend_address_)),
       subscriber_address, statestore_address, metrics_.get()));
 
-  cluster_membership_mgr_.reset(new ClusterMembershipMgr(statestore_subscriber_->id(),
-      statestore_subscriber_.get()));
-
   if (FLAGS_is_coordinator) {
     hdfs_op_thread_pool_.reset(
         CreateHdfsOpThreadPool("hdfs-worker-pool", FLAGS_num_hdfs_worker_threads, 1024));
     exec_rpc_thread_pool_.reset(new CallableThreadPool("exec-rpc-pool", "worker",
         FLAGS_coordinator_rpc_threads, numeric_limits<int32_t>::max()));
-    scheduler_.reset(new Scheduler(cluster_membership_mgr_.get(), metrics_.get(),
-        request_pool_service_.get()));
+    scheduler_.reset(new Scheduler(metrics_.get(), request_pool_service_.get()));
   }
 
+  cluster_membership_mgr_.reset(new ClusterMembershipMgr(
+      statestore_subscriber_->id(), statestore_subscriber_.get()));
+
   admission_controller_.reset(
       new AdmissionController(cluster_membership_mgr_.get(), statestore_subscriber_.get(),
           request_pool_service_.get(), metrics_.get(), configured_backend_address_));
@@ -275,6 +285,15 @@ Status ExecEnv::Init() {
   }
   InitBufferPool(FLAGS_min_buffer_size, buffer_pool_limit, clean_pages_limit);
 
+  admit_num_queries_limit_ = CpuInfo::num_cores();
+  if (FLAGS_max_concurrent_queries > 0) {
+    admit_num_queries_limit_ = FLAGS_max_concurrent_queries;
+  } else if (FLAGS_is_coordinator && !FLAGS_is_executor) {
+    // By default we assume that dedicated coordinators can handle more queries than
+    // executors.
+    admit_num_queries_limit_ *= COORDINATOR_CONCURRENCY_MULTIPLIER;
+  }
+
   InitSystemStateInfo();
 
   RETURN_IF_ERROR(metrics_->Init(enable_webserver_ ? webserver_.get() : nullptr));
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 6601fad..4af0815 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -172,6 +172,7 @@ class ExecEnv {
       kudu::client::KuduClient** client) WARN_UNUSED_RESULT;
 
   int64_t admit_mem_limit() const { return admit_mem_limit_; }
+  int64_t admit_num_queries_limit() const { return admit_num_queries_limit_; }
 
  private:
   boost::scoped_ptr<ObjectPool> obj_pool_;
@@ -263,6 +264,9 @@ class ExecEnv {
   /// such as the JVM if --mem_limit_includes_jvm=true. Set in Init().
   int64_t admit_mem_limit_;
 
+  /// The maximum number of queries that this host can run concurrently.
+  int64_t admit_num_queries_limit_;
+
   /// Choose a memory limit (returned in *bytes_limit) based on the --mem_limit flag and
   /// the memory available to the daemon process. Returns an error if the memory limit is
   /// invalid or another error is encountered that should prevent starting up the daemon.
diff --git a/be/src/scheduling/admission-controller-test.cc b/be/src/scheduling/admission-controller-test.cc
index 2537fde..8e8498a 100644
--- a/be/src/scheduling/admission-controller-test.cc
+++ b/be/src/scheduling/admission-controller-test.cc
@@ -23,7 +23,9 @@
 #include "runtime/exec-env.h"
 #include "runtime/runtime-state.h"
 #include "runtime/test-env.h"
+#include "scheduling/query-schedule.h"
 #include "service/fe-support.h"
+#include "service/impala-server.h"
 #include "testutil/gtest-util.h"
 #include "testutil/scoped-flag-setter.h"
 #include "util/metrics.h"
@@ -90,6 +92,7 @@ class AdmissionControllerTest : public testing::Test {
     TQueryOptions* query_options = pool_.Add(new TQueryOptions());
     QuerySchedule* query_schedule =
         pool_.Add(new QuerySchedule(*query_id, *request, *query_options, profile));
+    query_schedule->set_executor_group(ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME);
     query_schedule->UpdateMemoryRequirements(config);
 
     SetHostsInQuerySchedule(*query_schedule, num_hosts);
@@ -251,9 +254,10 @@ TEST_F(AdmissionControllerTest, Simple) {
 
   // Create a QuerySchedule to run on QUEUE_C.
   QuerySchedule* query_schedule = MakeQuerySchedule(QUEUE_C, config_c, 1, 64L * MEGABYTE);
+  query_schedule->UpdateMemoryRequirements(config_c);
 
   // Check that the AdmissionController initially has no data about other hosts.
-  ASSERT_EQ(0, admission_controller->host_mem_reserved_.size());
+  ASSERT_EQ(0, admission_controller->host_stats_.size());
 
   // Check that the query can be admitted.
   string not_admitted_reason;
@@ -287,9 +291,9 @@ TEST_F(AdmissionControllerTest, Simple) {
   admission_controller->UpdatePoolStats(incoming_topic_deltas, &outgoing_topic_updates);
 
   // Check that the AdmissionController has aggregated the remote stats.
-  ASSERT_EQ(3, admission_controller->host_mem_reserved_.size());
-  ASSERT_EQ(6000, admission_controller->host_mem_reserved_[HOST_1]);
-  ASSERT_EQ(5000, admission_controller->host_mem_reserved_[HOST_2]);
+  ASSERT_EQ(3, admission_controller->host_stats_.size());
+  ASSERT_EQ(6000, admission_controller->host_stats_[HOST_1].mem_reserved);
+  ASSERT_EQ(5000, admission_controller->host_stats_[HOST_2].mem_reserved);
 
   // Check the PoolStats for QUEUE_C.
   AdmissionController::PoolStats* pool_stats =
@@ -426,8 +430,9 @@ TEST_F(AdmissionControllerTest, CanAdmitRequestCount) {
       "with 0.5 queries)");
 }
 
-/// Test RejectImmediately using scalable parameters.
-TEST_F(AdmissionControllerTest, RejectImmediately) {
+/// Tests that query rejection works as expected by calling RejectForSchedule() and
+/// RejectForCluster() directly.
+TEST_F(AdmissionControllerTest, QueryRejection) {
   // Pass the paths of the configuration files as command line flags.
   FLAGS_fair_scheduler_allocation_path = GetResourceFile("fair-scheduler-test2.xml");
   FLAGS_llama_site_path = GetResourceFile("llama-site-test2.xml");
@@ -450,10 +455,10 @@ TEST_F(AdmissionControllerTest, RejectImmediately) {
   QuerySchedule* query_schedule =
       MakeQuerySchedule(QUEUE_D, config_d, host_count, 50L * MEGABYTE);
 
-  // Check messages from RejectImmediately().
+  // Check messages from RejectForSchedule().
   string rejected_reason;
-  ASSERT_TRUE(admission_controller->RejectImmediately(
-      *query_schedule, config_d, host_count, &rejected_reason));
+  ASSERT_TRUE(admission_controller->RejectForSchedule(
+      *query_schedule, config_d, host_count, host_count, &rejected_reason));
   EXPECT_STR_CONTAINS(rejected_reason,
       "request memory needed 500.00 MB is greater than pool max mem resources 400.00 MB "
       "(calculated as 10 backends each with 40.00 MB)");
@@ -462,8 +467,8 @@ TEST_F(AdmissionControllerTest, RejectImmediately) {
   // This will be rejected immediately as minimum memory reservation is too high.
   SetHostsInQuerySchedule(*query_schedule, host_count, 45L * MEGABYTE);
   string rejected_reserved_reason;
-  ASSERT_TRUE(admission_controller->RejectImmediately(
-      *query_schedule, config_d, host_count, &rejected_reserved_reason));
+  ASSERT_TRUE(admission_controller->RejectForSchedule(
+      *query_schedule, config_d, host_count, host_count, &rejected_reserved_reason));
   EXPECT_STR_CONTAINS(rejected_reserved_reason,
       "minimum memory reservation needed is greater than pool max mem resources. Pool "
       "max mem resources: 400.00 MB (calculated as 10 backends each with 40.00 MB). "
@@ -475,8 +480,8 @@ TEST_F(AdmissionControllerTest, RejectImmediately) {
   config_d.min_query_mem_limit = 600L * MEGABYTE;
   config_d.max_query_mem_limit = 700L * MEGABYTE;
   string rejected_invalid_config_reason;
-  ASSERT_TRUE(admission_controller->RejectImmediately(
-      *query_schedule, config_d, host_count, &rejected_invalid_config_reason));
+  ASSERT_TRUE(admission_controller->RejectForCluster(QUEUE_D, config_d,
+      /* admit_from_queue=*/false, host_count, &rejected_invalid_config_reason));
   EXPECT_STR_CONTAINS(rejected_invalid_config_reason,
       "The min_query_mem_limit 629145600 is greater than the current max_mem_resources "
       "419430400 (calculated as 10 backends each with 40.00 MB); queries will not be "
@@ -485,8 +490,8 @@ TEST_F(AdmissionControllerTest, RejectImmediately) {
   TPoolConfig config_disabled_queries;
   config_disabled_queries.max_requests = 0;
   string rejected_queries_reason;
-  ASSERT_TRUE(admission_controller->RejectImmediately(
-      *query_schedule, config_disabled_queries, host_count, &rejected_queries_reason));
+  ASSERT_TRUE(admission_controller->RejectForCluster(QUEUE_D, config_disabled_queries,
+      /* admit_from_queue=*/false, host_count, &rejected_queries_reason));
   EXPECT_STR_CONTAINS(rejected_queries_reason, "disabled by requests limit set to 0");
 
   TPoolConfig config_disabled_memory;
@@ -494,8 +499,8 @@ TEST_F(AdmissionControllerTest, RejectImmediately) {
   config_disabled_memory.max_mem_resources = 0;
   config_disabled_memory.max_memory_multiple = 0;
   string rejected_mem_reason;
-  ASSERT_TRUE(admission_controller->RejectImmediately(
-      *query_schedule, config_disabled_memory, host_count, &rejected_mem_reason));
+  ASSERT_TRUE(admission_controller->RejectForCluster(QUEUE_D, config_disabled_memory,
+      /* admit_from_queue=*/false, host_count, &rejected_mem_reason));
   EXPECT_STR_CONTAINS(rejected_mem_reason, "disabled by pool max mem resources set to 0");
 
   TPoolConfig config_queue_small;
@@ -504,8 +509,8 @@ TEST_F(AdmissionControllerTest, RejectImmediately) {
   config_queue_small.max_mem_resources = 600 * MEGABYTE;
   pool_stats->agg_num_queued_ = 3;
   string rejected_queue_length_reason;
-  ASSERT_TRUE(admission_controller->RejectImmediately(
-      *query_schedule, config_queue_small, host_count, &rejected_queue_length_reason));
+  ASSERT_TRUE(admission_controller->RejectForCluster(QUEUE_D, config_queue_small,
+      /* admit_from_queue=*/false, host_count, &rejected_queue_length_reason));
   EXPECT_STR_CONTAINS(rejected_queue_length_reason,
       "queue full, limit=3 (configured statically), num_queued=3.");
 
@@ -513,8 +518,8 @@ TEST_F(AdmissionControllerTest, RejectImmediately) {
   // queries that can run be queued.
   config_queue_small.max_queued_queries_multiple = 0.3;
   string rejected_queue_multiple_reason;
-  ASSERT_TRUE(admission_controller->RejectImmediately(
-      *query_schedule, config_queue_small, host_count, &rejected_queue_multiple_reason));
+  ASSERT_TRUE(admission_controller->RejectForCluster(QUEUE_D, config_queue_small,
+      /* admit_from_queue=*/false, host_count, &rejected_queue_multiple_reason));
   EXPECT_STR_CONTAINS(rejected_queue_multiple_reason,
       "queue full, limit=3 (calculated as 10 backends each with 0.3 queries), "
       "num_queued=3.");
@@ -651,10 +656,10 @@ TEST_F(AdmissionControllerTest, PoolStats) {
   CheckPoolStatsEmpty(pool_stats);
 
   // Show that Queue and Dequeue leave stats at zero.
-  pool_stats->Queue(*query_schedule);
+  pool_stats->Queue();
   ASSERT_EQ(1, pool_stats->agg_num_queued());
   ASSERT_EQ(1, pool_stats->metrics()->agg_num_queued->GetValue());
-  pool_stats->Dequeue(*query_schedule, false);
+  pool_stats->Dequeue(false);
   CheckPoolStatsEmpty(pool_stats);
 
   // Show that Admit and Release leave stats at zero.
@@ -680,4 +685,3 @@ TEST_F(AdmissionControllerTest, PoolDisabled) {
 }
 
 } // end namespace impala
-
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 427e14a..93c3ea0 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -19,12 +19,15 @@
 
 #include <boost/algorithm/string.hpp>
 #include <boost/mem_fn.hpp>
+#include <gutil/strings/stringpiece.h>
 #include <gutil/strings/substitute.h>
 
 #include "common/logging.h"
 #include "runtime/bufferpool/reservation-util.h"
 #include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
+#include "scheduling/cluster-membership-mgr.h"
+#include "scheduling/query-schedule.h"
 #include "scheduling/scheduler.h"
 #include "service/impala-server.h"
 #include "util/bit-util.h"
@@ -70,6 +73,11 @@ string PrintBytes(int64_t value) {
 // case the pool name contains it as well.
 const char TOPIC_KEY_DELIMITER = '!';
 
+// Delimiter used for the resource pool prefix of executor groups. In order to be used for
+// queries in "resource-pool-A", an executor group name must start with
+// "resource-pool-A-".
+const char POOL_GROUP_DELIMITER = '-';
+
 // Define metric key format strings for metrics in PoolMetrics
 // '$0' is replaced with the pool name by strings::Substitute
 const string TOTAL_ADMITTED_METRIC_KEY_FORMAT =
@@ -150,12 +158,22 @@ const string AdmissionController::PROFILE_INFO_KEY_LAST_QUEUED_REASON =
     "Latest admission queue reason";
 const string AdmissionController::PROFILE_INFO_KEY_ADMITTED_MEM =
     "Cluster Memory Admitted";
+const string AdmissionController::PROFILE_INFO_KEY_EXECUTOR_GROUP = "Executor Group";
 const string AdmissionController::PROFILE_INFO_KEY_STALENESS_WARNING =
     "Admission control state staleness";
 const string AdmissionController::PROFILE_TIME_SINCE_LAST_UPDATE_COUNTER_NAME =
     "AdmissionControlTimeSinceLastUpdate";
 
 // Error status string details
+const string REASON_INVALID_POOL_CONFIG_MIN_LIMIT_MAX_MEM_FIXED =
+    "Invalid pool config: the min_query_mem_limit $0 is greater than the "
+    "max_mem_resources $1 (configured statically)";
+const string REASON_INVALID_POOL_CONFIG_MIN_LIMIT_MAX_MEM_MULTIPLE =
+    "The min_query_mem_limit $0 is greater than the current max_mem_resources $1 ($2); "
+    "queries will not be admitted until more executors are available.";
+const string REASON_INVALID_POOL_CONFIG_MIN_LIMIT_MAX_LIMIT =
+    "Invalid pool config: the min_query_mem_limit is greater than the "
+    "max_query_mem_limit ($0 > $1)";
 const string REASON_MEM_LIMIT_TOO_LOW_FOR_RESERVATION =
     "minimum memory reservation is greater than memory available to the query for buffer "
     "reservations. Memory reservation needed given the current plan: $0. Adjust either "
@@ -193,6 +211,10 @@ const string REASON_THREAD_RESERVATION_LIMIT_EXCEEDED =
 const string REASON_THREAD_RESERVATION_AGG_LIMIT_EXCEEDED =
     "sum of thread reservations across all $0 backends is greater than the "
     "THREAD_RESERVATION_AGGREGATE_LIMIT query option value: $1 > $2.";
+// $0 is the error message returned by the scheduler.
+const string REASON_SCHEDULER_ERROR = "Error during scheduling: $0";
+const string REASON_LOCAL_BACKEND_NOT_STARTED = "Local backend has not started up yet.";
+const string REASON_NO_EXECUTOR_GROUPS = "No healthy executor groups found for pool $0.";
 
 // Queue decision details
 // $0 = num running queries, $1 = num queries limit, $2 = num queries limit explanation,
@@ -211,6 +233,10 @@ const string POOL_MEM_NOT_AVAILABLE =
 const string HOST_MEM_NOT_AVAILABLE = "Not enough memory available on host $0."
     "Needed $1 but only $2 out of $3 was available.$4";
 
+// $0 = host name, $1 = num admitted, $2 = max requests
+const string HOST_SLOT_NOT_AVAILABLE = "No query slot available on host $0. "
+                                       "$1/$2 are already admitted.";
+
 // Parses the pool name and backend_id from the topic key if it is valid.
 // Returns true if the topic key is valid and pool_name and backend_id are set.
 static inline bool ParsePoolTopicKey(const string& topic_key, string* pool_name,
@@ -339,7 +365,7 @@ void AdmissionController::PoolStats::Release(
   peak_mem_histogram_[histogram_bucket] = ++(peak_mem_histogram_[histogram_bucket]);
 }
 
-void AdmissionController::PoolStats::Queue(const QuerySchedule& schedule) {
+void AdmissionController::PoolStats::Queue() {
   agg_num_queued_ += 1;
   metrics_.agg_num_queued->Increment(1L);
 
@@ -349,8 +375,7 @@ void AdmissionController::PoolStats::Queue(const QuerySchedule& schedule) {
   metrics_.total_queued->Increment(1L);
 }
 
-void AdmissionController::PoolStats::Dequeue(const QuerySchedule& schedule,
-    bool timed_out) {
+void AdmissionController::PoolStats::Dequeue(bool timed_out) {
   agg_num_queued_ -= 1;
   metrics_.agg_num_queued->Increment(-1L);
 
@@ -366,17 +391,24 @@ void AdmissionController::PoolStats::Dequeue(const QuerySchedule& schedule,
   }
 }
 
-void AdmissionController::UpdateHostMemAdmitted(const QuerySchedule& schedule,
-    int64_t per_node_mem) {
+void AdmissionController::UpdateHostStats(
+    const QuerySchedule& schedule, int64_t per_node_mem, int64_t num_queries) {
   DCHECK_NE(per_node_mem, 0);
+  DCHECK(num_queries == 1 || num_queries == -1)
+      << "Invalid number of queries: " << num_queries;
   for (const auto& entry : schedule.per_backend_exec_params()) {
     const TNetworkAddress& host_addr = entry.first;
     const string host = TNetworkAddressToString(host_addr);
     VLOG_ROW << "Update admitted mem reserved for host=" << host
-             << " prev=" << PrintBytes(host_mem_admitted_[host])
-             << " new="  << PrintBytes(host_mem_admitted_[host] + per_node_mem);
-    host_mem_admitted_[host] += per_node_mem;
-    DCHECK_GE(host_mem_admitted_[host], 0);
+             << " prev=" << PrintBytes(host_stats_[host].mem_admitted)
+             << " new=" << PrintBytes(host_stats_[host].mem_admitted + per_node_mem);
+    host_stats_[host].mem_admitted += per_node_mem;
+    DCHECK_GE(host_stats_[host].mem_admitted, 0);
+    VLOG_ROW << "Update admitted queries for host=" << host
+             << " prev=" << host_stats_[host].num_admitted
+             << " new=" << host_stats_[host].num_admitted + num_queries;
+    host_stats_[host].num_admitted += num_queries;
+    DCHECK_GE(host_stats_[host].num_admitted, 0);
   }
 }
 
@@ -416,8 +448,9 @@ bool AdmissionController::HasAvailableMemResources(const QuerySchedule& schedule
   int64_t cluster_mem_to_admit = schedule.GetClusterMemoryToAdmit();
 
   // Case 1:
-  PoolStats* stats = GetPoolStats(pool_name);
+  PoolStats* stats = GetPoolStats(schedule);
   VLOG_RPC << "Checking agg mem in pool=" << pool_name << " : " << stats->DebugString()
+           << " executor_group=" << schedule.executor_group()
            << " cluster_mem_needed=" << PrintBytes(cluster_mem_to_admit)
            << " pool_max_mem=" << PrintBytes(pool_max_mem) << " ("
            << GetMaxMemForPoolDescription(pool_cfg, cluster_size) << ")";
@@ -435,8 +468,9 @@ bool AdmissionController::HasAvailableMemResources(const QuerySchedule& schedule
     const TNetworkAddress& host = entry.first;
     const string host_id = TNetworkAddressToString(host);
     int64_t admit_mem_limit = entry.second.admit_mem_limit;
-    int64_t mem_reserved = host_mem_reserved_[host_id];
-    int64_t mem_admitted = host_mem_admitted_[host_id];
+    const HostStats& host_stats = host_stats_[host_id];
+    int64_t mem_reserved = host_stats.mem_reserved;
+    int64_t mem_admitted = host_stats.mem_admitted;
     VLOG_ROW << "Checking memory on host=" << host_id
              << " mem_reserved=" << PrintBytes(mem_reserved)
              << " mem_admitted=" << PrintBytes(mem_admitted)
@@ -462,50 +496,119 @@ bool AdmissionController::HasAvailableMemResources(const QuerySchedule& schedule
   return true;
 }
 
+bool AdmissionController::HasAvailableSlot(const QuerySchedule& schedule,
+    const TPoolConfig& pool_cfg, string* unavailable_reason) {
+  for (const auto& entry : schedule.per_backend_exec_params()) {
+    const TNetworkAddress& host = entry.first;
+    const string host_id = TNetworkAddressToString(host);
+    int64_t admit_num_queries_limit = entry.second.admit_num_queries_limit;
+    int64_t num_admitted = host_stats_[host_id].num_admitted;
+    VLOG_ROW << "Checking available slot on host=" << host_id
+             << " num_admitted=" << num_admitted << " needs=" << num_admitted + 1
+             << " admit_num_queries_limit=" << admit_num_queries_limit;
+    if (num_admitted >= admit_num_queries_limit) {
+      *unavailable_reason =
+          Substitute(HOST_SLOT_NOT_AVAILABLE, host_id, num_admitted,
+              admit_num_queries_limit);
+      return false;
+    }
+  }
+  return true;
+}
+
 bool AdmissionController::CanAdmitRequest(const QuerySchedule& schedule,
     const TPoolConfig& pool_cfg, int64_t cluster_size, bool admit_from_queue,
     string* not_admitted_reason) {
   // Can't admit if:
-  //  (a) Pool configuration is invalid
-  //  (b) There are already queued requests (and this is not admitting from the queue).
-  //  (c) Already at the maximum number of requests
-  //  (d) There are not enough memory resources available for the query
-
-  // Queries from a misconfigured pool will remain queued till they either time out or the
-  // pool config is changed to a valid config.
-  if (!IsPoolConfigValidForCluster(pool_cfg, cluster_size, not_admitted_reason))
-    return false;
+  //  (a) There are already queued requests (and this is not admitting from the queue).
+  //  (b) The resource pool is already at the maximum number of requests.
+  //  (c) One of the executors in 'schedule' is already at its maximum number of requests
+  //      (when not using the default executor group).
+  //  (d) There are not enough memory resources available for the query.
 
-  const string& pool_name = schedule.request_pool();
-  PoolStats* stats = GetPoolStats(pool_name);
   const int64_t max_requests = GetMaxRequestsForPool(pool_cfg, cluster_size);
-  if (!admit_from_queue && stats->local_stats().num_queued > 0) {
+  PoolStats* pool_stats = GetPoolStats(schedule);
+  bool default_group =
+      schedule.executor_group() == ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME;
+  if (!admit_from_queue && pool_stats->local_stats().num_queued > 0) {
     *not_admitted_reason = Substitute(QUEUED_QUEUE_NOT_EMPTY,
-        stats->local_stats().num_queued, GetStalenessDetailLocked(" "));
+        pool_stats->local_stats().num_queued, GetStalenessDetailLocked(" "));
     return false;
-  } else if (max_requests >= 0 && stats->agg_num_running() >= max_requests) {
-    *not_admitted_reason = Substitute(QUEUED_NUM_RUNNING, stats->agg_num_running(),
+  }
+  if (max_requests >= 0 && pool_stats->agg_num_running() >= max_requests) {
+    // All executor groups are limited by the aggregate number of queries running in the
+    // pool.
+    *not_admitted_reason = Substitute(QUEUED_NUM_RUNNING, pool_stats->agg_num_running(),
         max_requests, GetMaxRequestsForPoolDescription(pool_cfg, cluster_size),
         GetStalenessDetailLocked(" "));
     return false;
-  } else {
-    bool has_available_mem_resources =
-        HasAvailableMemResources(schedule, pool_cfg, cluster_size, not_admitted_reason);
-    if (!has_available_mem_resources) {
-      return false;
-    }
+  }
+  if (!default_group && !HasAvailableSlot(schedule, pool_cfg, not_admitted_reason)) {
+    // All non-default executor groups are also limited by the number of running queries
+    // per executor.
+    // TODO(IMPALA-8757): Extend slot based admission to default executor group
+    return false;
+  }
+  if (!HasAvailableMemResources(schedule, pool_cfg, cluster_size, not_admitted_reason)) {
+    return false;
   }
   return true;
 }
 
-bool AdmissionController::RejectImmediately(const QuerySchedule& schedule,
-    const TPoolConfig& pool_cfg, int64_t cluster_size, string* rejection_reason) {
+bool AdmissionController::RejectForCluster(const string& pool_name,
+    const TPoolConfig& pool_cfg, bool admit_from_queue, int64_t cluster_size,
+    string* rejection_reason) {
+  DCHECK(rejection_reason != nullptr && rejection_reason->empty());
+
+  // Checks related to pool max_requests:
+  if (GetMaxRequestsForPool(pool_cfg, cluster_size) == 0) {
+    *rejection_reason = REASON_DISABLED_REQUESTS_LIMIT;
+    return true;
+  }
+
+  // Checks related to pool max_mem_resources:
+  int64_t max_mem = GetMaxMemForPool(pool_cfg, cluster_size);
+  if (max_mem == 0) {
+    *rejection_reason = REASON_DISABLED_MAX_MEM_RESOURCES;
+    return true;
+  }
+
+  if (max_mem > 0 && pool_cfg.min_query_mem_limit > max_mem) {
+    if (PoolHasFixedMemoryLimit(pool_cfg)) {
+      *rejection_reason = Substitute(REASON_INVALID_POOL_CONFIG_MIN_LIMIT_MAX_MEM_FIXED,
+          pool_cfg.min_query_mem_limit, max_mem);
+    } else {
+      *rejection_reason =
+          Substitute(REASON_INVALID_POOL_CONFIG_MIN_LIMIT_MAX_MEM_MULTIPLE,
+              pool_cfg.min_query_mem_limit, max_mem,
+              GetMaxMemForPoolDescription(pool_cfg, cluster_size));
+    }
+    return true;
+  }
+
+  if (pool_cfg.max_query_mem_limit > 0
+      && pool_cfg.min_query_mem_limit > pool_cfg.max_query_mem_limit) {
+    *rejection_reason = Substitute(REASON_INVALID_POOL_CONFIG_MIN_LIMIT_MAX_LIMIT,
+        pool_cfg.min_query_mem_limit, pool_cfg.max_query_mem_limit);
+    return true;
+  }
+
+  PoolStats* stats = GetPoolStats(pool_name);
+  int64_t max_queued = GetMaxQueuedForPool(pool_cfg, cluster_size);
+  if (!admit_from_queue && stats->agg_num_queued() >= max_queued) {
+    *rejection_reason = Substitute(REASON_QUEUE_FULL, max_queued,
+        GetMaxQueuedForPoolDescription(pool_cfg, cluster_size), stats->agg_num_queued(),
+        GetStalenessDetailLocked(" "));
+    return true;
+  }
+
+  return false;
+}
+
+bool AdmissionController::RejectForSchedule(const QuerySchedule& schedule,
+    const TPoolConfig& pool_cfg, int64_t cluster_size, int64_t group_size,
+    string* rejection_reason) {
   DCHECK(rejection_reason != nullptr && rejection_reason->empty());
-  // This function checks for a number of cases where the query can be rejected
-  // immediately. The first check that fails is the error that is reported. The order of
-  // the checks isn't particularly important, though some thought was given to ordering
-  // them in a way that might make the sense for a user.
-  if (!IsPoolConfigValidForCluster(pool_cfg, cluster_size, rejection_reason)) return true;
 
   // Compute the max (over all backends) and cluster total (across all backends) for
   // min_mem_reservation_bytes and thread_reservation and the min (over all backends)
@@ -564,14 +667,10 @@ bool AdmissionController::RejectImmediately(const QuerySchedule& schedule,
     return true;
   }
 
-  // Checks related to pool max_requests:
-  if (GetMaxRequestsForPool(pool_cfg, cluster_size) == 0) {
-    *rejection_reason = REASON_DISABLED_REQUESTS_LIMIT;
-    return true;
-  }
-
   // Checks related to pool max_mem_resources:
-  int64_t max_mem = GetMaxMemForPool(pool_cfg, cluster_size);
+  // We perform these checks here against the group_size to prevent queuing up queries
+  // that would never be able to reserve the required memory on an executor group.
+  int64_t max_mem = GetMaxMemForPool(pool_cfg, group_size);
   if (max_mem == 0) {
     *rejection_reason = REASON_DISABLED_MAX_MEM_RESOURCES;
     return true;
@@ -579,7 +678,7 @@ bool AdmissionController::RejectImmediately(const QuerySchedule& schedule,
   if (max_mem > 0) {
     if (cluster_min_mem_reservation_bytes > max_mem) {
       *rejection_reason = Substitute(REASON_MIN_RESERVATION_OVER_POOL_MEM,
-          PrintBytes(max_mem), GetMaxMemForPoolDescription(pool_cfg, cluster_size),
+          PrintBytes(max_mem), GetMaxMemForPoolDescription(pool_cfg, group_size),
           PrintBytes(cluster_min_mem_reservation_bytes));
       return true;
     }
@@ -587,10 +686,13 @@ bool AdmissionController::RejectImmediately(const QuerySchedule& schedule,
     if (cluster_mem_to_admit > max_mem) {
       *rejection_reason =
           Substitute(REASON_REQ_OVER_POOL_MEM, PrintBytes(cluster_mem_to_admit),
-              PrintBytes(max_mem), GetMaxMemForPoolDescription(pool_cfg, cluster_size));
+              PrintBytes(max_mem), GetMaxMemForPoolDescription(pool_cfg, group_size));
       return true;
     }
     int64_t per_backend_mem_to_admit = schedule.per_backend_mem_to_admit();
+    VLOG_ROW << "Checking backend mem with per_backend_mem_to_admit = "
+             << per_backend_mem_to_admit
+             << " and min_admit_mem_limit.second = " << min_admit_mem_limit.second;
     if (per_backend_mem_to_admit > min_admit_mem_limit.second) {
       *rejection_reason = Substitute(REASON_REQ_OVER_NODE_MEM,
           PrintBytes(per_backend_mem_to_admit), PrintBytes(min_admit_mem_limit.second),
@@ -598,17 +700,6 @@ bool AdmissionController::RejectImmediately(const QuerySchedule& schedule,
       return true;
     }
   }
-
-  // Checks related to the pool queue size:
-  PoolStats* stats = GetPoolStats(schedule.request_pool());
-  int64_t max_queued = GetMaxQueuedForPool(pool_cfg, cluster_size);
-  if (stats->agg_num_queued() >= max_queued) {
-    *rejection_reason = Substitute(REASON_QUEUE_FULL, max_queued,
-        GetMaxQueuedForPoolDescription(pool_cfg, cluster_size), stats->agg_num_queued(),
-        GetStalenessDetailLocked(" "));
-    return true;
-  }
-
   return false;
 }
 
@@ -635,95 +726,98 @@ void AdmissionController::PoolStats::UpdateDerivedMetrics(
   metrics_.max_memory_derived->SetValue(GetMaxMemForPool(pool_cfg, cluster_size));
 }
 
-Status AdmissionController::SubmitForAdmission(QuerySchedule* schedule,
-    Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* admit_outcome) {
-  const string& pool_name = schedule->request_pool();
-  const int64_t cluster_size = GetClusterSize();
-  TPoolConfig pool_cfg;
-  RETURN_IF_ERROR(request_pool_service_->GetPoolConfig(pool_name, &pool_cfg));
-  schedule->UpdateMemoryRequirements(pool_cfg);
-  const int64_t max_requests = GetMaxRequestsForPool(pool_cfg, cluster_size);
-  const int64_t max_queued = GetMaxQueuedForPool(pool_cfg, cluster_size);
-  const int64_t max_mem = GetMaxMemForPool(pool_cfg, cluster_size);
+Status AdmissionController::SubmitForAdmission(const AdmissionRequest& request,
+    Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* admit_outcome,
+    std::unique_ptr<QuerySchedule>* schedule_result) {
+  DCHECK(schedule_result->get() == nullptr);
+
+  ClusterMembershipMgr::SnapshotPtr membership_snapshot =
+      ExecEnv::GetInstance()->cluster_membership_mgr()->GetSnapshot();
+  DCHECK(membership_snapshot.get() != nullptr);
 
   // Note the queue_node will not exist in the queue when this method returns.
-  QueueNode queue_node(schedule, admit_outcome, schedule->summary_profile());
-  string not_admitted_reason;
+  QueueNode queue_node(request, admit_outcome, request.summary_profile);
 
-  schedule->query_events()->MarkEvent(QUERY_EVENT_SUBMIT_FOR_ADMISSION);
-  ScopedEvent completedEvent(schedule->query_events(), QUERY_EVENT_COMPLETED_ADMISSION);
+  // Re-resolve the pool name to propagate any resolution errors now that this request is
+  // known to require a valid pool. All executor groups / schedules will use the same pool
+  // name.
+  string pool_name;
+  TPoolConfig pool_cfg;
+  RETURN_IF_ERROR(
+      ResolvePoolAndGetConfig(request.request.query_ctx, &pool_name, &pool_cfg));
+  request.summary_profile->AddInfoString("Request Pool", pool_name);
+
+  const int64_t cluster_size = GetClusterSize(*membership_snapshot);
+  // We track this outside of the queue node so that it is still available after the query
+  // has been dequeued.
+  string initial_queue_reason;
+  ScopedEvent completedEvent(request.query_events, QUERY_EVENT_COMPLETED_ADMISSION);
   {
     // Take lock to ensure the Dequeue thread does not modify the request queue.
     lock_guard<mutex> lock(admission_ctrl_lock_);
-    RequestQueue* queue = &request_queue_map_[pool_name];
+    request.query_events->MarkEvent(QUERY_EVENT_SUBMIT_FOR_ADMISSION);
+
     pool_config_map_[pool_name] = pool_cfg;
     PoolStats* stats = GetPoolStats(pool_name);
     stats->UpdateConfigMetrics(pool_cfg, cluster_size);
     stats->UpdateDerivedMetrics(pool_cfg, cluster_size);
-    VLOG_QUERY << "Schedule for id=" << PrintId(schedule->query_id())
-               << " in pool_name=" << pool_name << " per_host_mem_estimate="
-               << PrintBytes(schedule->GetPerHostMemoryEstimate())
-               << " PoolConfig: max_requests=" << max_requests << " ("
-               << GetMaxRequestsForPoolDescription(pool_cfg, cluster_size) << ")"
-               << " max_queued=" << max_queued << " ("
-               << GetMaxQueuedForPoolDescription(pool_cfg, cluster_size) << ")"
-               << " max_mem=" << PrintBytes(max_mem) << " ("
-               << GetMaxMemForPoolDescription(pool_cfg, cluster_size) << ")";
-    VLOG_QUERY << "Stats: " << stats->DebugString();
-    string rejection_reason;
-    if (RejectImmediately(*schedule, pool_cfg, cluster_size, &rejection_reason)) {
-      AdmissionOutcome outcome =
-          admit_outcome->Set(AdmissionOutcome::REJECTED_OR_TIMED_OUT);
-      if (outcome != AdmissionOutcome::REJECTED_OR_TIMED_OUT) {
+
+    bool must_reject = !FindGroupToAdmitOrReject(cluster_size, membership_snapshot,
+        pool_cfg, /* admit_from_queue=*/false, stats, &queue_node);
+    if (must_reject) {
+      AdmissionOutcome outcome = admit_outcome->Set(AdmissionOutcome::REJECTED);
+      if (outcome != AdmissionOutcome::REJECTED) {
         DCHECK_ENUM_EQ(outcome, AdmissionOutcome::CANCELLED);
         VLOG_QUERY << "Ready to be " << PROFILE_INFO_VAL_REJECTED
-                   << " but already cancelled, query id="
-                   << PrintId(schedule->query_id());
+                   << " but already cancelled, query id=" << PrintId(request.query_id);
         return Status::CANCELLED;
       }
-      schedule->summary_profile()->AddInfoString(PROFILE_INFO_KEY_ADMISSION_RESULT,
-          PROFILE_INFO_VAL_REJECTED);
+      request.summary_profile->AddInfoString(
+          PROFILE_INFO_KEY_ADMISSION_RESULT, PROFILE_INFO_VAL_REJECTED);
       stats->metrics()->total_rejected->Increment(1);
-      const ErrorMsg& rejected_msg = ErrorMsg(TErrorCode::ADMISSION_REJECTED,
-          pool_name, rejection_reason);
+      const ErrorMsg& rejected_msg = ErrorMsg(
+          TErrorCode::ADMISSION_REJECTED, pool_name, queue_node.not_admitted_reason);
       VLOG_QUERY << rejected_msg.msg();
       return Status::Expected(rejected_msg);
     }
-    pools_for_updates_.insert(pool_name);
 
-    if (CanAdmitRequest(*schedule, pool_cfg, cluster_size, false, &not_admitted_reason)) {
+    if (queue_node.admitted_schedule.get() != nullptr) {
+      const string& group_name = queue_node.admitted_schedule->executor_group();
+      VLOG(3) << "Can admit to group " << group_name << " (or cancelled)";
       DCHECK_EQ(stats->local_stats().num_queued, 0);
+      *schedule_result = std::move(queue_node.admitted_schedule);
       AdmissionOutcome outcome = admit_outcome->Set(AdmissionOutcome::ADMITTED);
       if (outcome != AdmissionOutcome::ADMITTED) {
         DCHECK_ENUM_EQ(outcome, AdmissionOutcome::CANCELLED);
         VLOG_QUERY << "Ready to be " << PROFILE_INFO_VAL_ADMIT_IMMEDIATELY
-                   << " but already cancelled, query id="
-                   << PrintId(schedule->query_id());
+                   << " but already cancelled, query id=" << PrintId(request.query_id);
         return Status::CANCELLED;
       }
-      VLOG_QUERY << "Admitted query id=" << PrintId(schedule->query_id());
-      AdmitQuery(schedule, false);
+      VLOG_QUERY << "Admitting query id=" << PrintId(request.query_id);
+      AdmitQuery(schedule_result->get(), false);
       stats->UpdateWaitTime(0);
       VLOG_RPC << "Final: " << stats->DebugString();
       return Status::OK();
     }
 
     // We cannot immediately admit but do not need to reject, so queue the request
-    VLOG_QUERY << "Queuing, query id=" << PrintId(schedule->query_id())
-               << " reason: " << not_admitted_reason;
-    stats->Queue(*schedule);
+    RequestQueue* queue = &request_queue_map_[pool_name];
+    VLOG_QUERY << "Queuing, query id=" << PrintId(request.query_id)
+               << " reason: " << queue_node.not_admitted_reason;
+    initial_queue_reason = queue_node.not_admitted_reason;
+    stats->Queue();
     queue->Enqueue(&queue_node);
   }
 
   // Update the profile info before waiting. These properties will be updated with
   // their final state after being dequeued.
-  schedule->summary_profile()->AddInfoString(
+  request.summary_profile->AddInfoString(
       PROFILE_INFO_KEY_ADMISSION_RESULT, PROFILE_INFO_VAL_QUEUED);
-  schedule->summary_profile()->AddInfoString(
-      PROFILE_INFO_KEY_INITIAL_QUEUE_REASON, not_admitted_reason);
-  schedule->summary_profile()->AddInfoString(
-      PROFILE_INFO_KEY_LAST_QUEUED_REASON, not_admitted_reason);
-  schedule->query_events()->MarkEvent(QUERY_EVENT_QUEUED);
+  request.summary_profile->AddInfoString(
+      PROFILE_INFO_KEY_INITIAL_QUEUE_REASON, initial_queue_reason);
+  request.summary_profile->AddInfoString(
+      PROFILE_INFO_KEY_LAST_QUEUED_REASON, queue_node.not_admitted_reason);
+  request.query_events->MarkEvent(QUERY_EVENT_QUEUED);
 
   int64_t queue_wait_timeout_ms = FLAGS_queue_wait_timeout_ms;
   if (pool_cfg.__isset.queue_timeout_ms) {
@@ -737,48 +831,56 @@ Status AdmissionController::SubmitForAdmission(QuerySchedule* schedule,
   bool timed_out;
   admit_outcome->Get(queue_wait_timeout_ms, &timed_out);
   int64_t wait_time_ms = MonotonicMillis() - wait_start_ms;
-  schedule->summary_profile()->AddInfoString(PROFILE_INFO_KEY_INITIAL_QUEUE_REASON,
+  request.summary_profile->AddInfoString(PROFILE_INFO_KEY_INITIAL_QUEUE_REASON,
       Substitute(
-          PROFILE_INFO_VAL_INITIAL_QUEUE_REASON, wait_time_ms, not_admitted_reason));
+          PROFILE_INFO_VAL_INITIAL_QUEUE_REASON, wait_time_ms, initial_queue_reason));
 
   // Disallow the FAIL action here. It would leave the queue in an inconsistent state.
-  DebugActionNoFail(schedule->query_options(), "AC_AFTER_ADMISSION_OUTCOME");
+  DebugActionNoFail(request.query_options, "AC_AFTER_ADMISSION_OUTCOME");
 
   {
     lock_guard<mutex> lock(admission_ctrl_lock_);
     // If the query has not been admitted or cancelled up till now, it will be considered
     // to be timed out.
-    AdmissionOutcome outcome =
-        admit_outcome->Set(AdmissionOutcome::REJECTED_OR_TIMED_OUT);
+    AdmissionOutcome outcome = admit_outcome->Set(AdmissionOutcome::TIMED_OUT);
     RequestQueue* queue = &request_queue_map_[pool_name];
     pools_for_updates_.insert(pool_name);
-    PoolStats* stats = GetPoolStats(pool_name);
-    stats->UpdateWaitTime(wait_time_ms);
-    if (outcome == AdmissionOutcome::REJECTED_OR_TIMED_OUT) {
-      queue->Remove(&queue_node);
-      stats->Dequeue(*schedule, true);
-      schedule->summary_profile()->AddInfoString(
+    PoolStats* pool_stats = GetPoolStats(pool_name);
+    pool_stats->UpdateWaitTime(wait_time_ms);
+    if (outcome == AdmissionOutcome::REJECTED) {
+      if (queue->Remove(&queue_node)) pool_stats->Dequeue(true);
+      request.summary_profile->AddInfoString(
+          PROFILE_INFO_KEY_ADMISSION_RESULT, PROFILE_INFO_VAL_REJECTED);
+      const ErrorMsg& rejected_msg = ErrorMsg(
+          TErrorCode::ADMISSION_REJECTED, pool_name, queue_node.not_admitted_reason);
+      VLOG_QUERY << rejected_msg.msg();
+      return Status::Expected(rejected_msg);
+    } else if (outcome == AdmissionOutcome::TIMED_OUT) {
+      bool removed = queue->Remove(&queue_node);
+      DCHECK(removed);
+      pool_stats->Dequeue(true);
+      request.summary_profile->AddInfoString(
           PROFILE_INFO_KEY_ADMISSION_RESULT, PROFILE_INFO_VAL_TIME_OUT);
       const ErrorMsg& rejected_msg = ErrorMsg(TErrorCode::ADMISSION_TIMED_OUT,
-          queue_wait_timeout_ms, pool_name, not_admitted_reason);
+          queue_wait_timeout_ms, pool_name, queue_node.not_admitted_reason);
       VLOG_QUERY << rejected_msg.msg();
       return Status::Expected(rejected_msg);
     } else if (outcome == AdmissionOutcome::CANCELLED) {
-      // Only update stats if it has not already been removed and updated by the Dequeue
-      // thread.
-      if (queue->Remove(&queue_node)) stats->Dequeue(*schedule, false);
-      schedule->summary_profile()->AddInfoString(
+      if (queue->Remove(&queue_node)) pool_stats->Dequeue(false);
+      request.summary_profile->AddInfoString(
           PROFILE_INFO_KEY_ADMISSION_RESULT, PROFILE_INFO_VAL_CANCELLED_IN_QUEUE);
       VLOG_QUERY << PROFILE_INFO_VAL_CANCELLED_IN_QUEUE
-                 << ", query id=" << PrintId(schedule->query_id());
+                 << ", query id=" << PrintId(request.query_id);
       return Status::CANCELLED;
     }
     // The dequeue thread updates the stats (to avoid a race condition) so we do
     // not change them here.
     DCHECK_ENUM_EQ(outcome, AdmissionOutcome::ADMITTED);
+    DCHECK(queue_node.admitted_schedule.get() != nullptr);
+    *schedule_result = std::move(queue_node.admitted_schedule);
     DCHECK(!queue->Contains(&queue_node));
-    VLOG_QUERY << "Admitted queued query id=" << PrintId(schedule->query_id());
-    VLOG_RPC << "Final: " << stats->DebugString();
+    VLOG_QUERY << "Admitted queued query id=" << PrintId(request.query_id);
+    VLOG_RPC << "Final: " << pool_stats->DebugString();
     return Status::OK();
   }
 }
@@ -788,9 +890,9 @@ void AdmissionController::ReleaseQuery(
   const string& pool_name = schedule.request_pool();
   {
     lock_guard<mutex> lock(admission_ctrl_lock_);
-    PoolStats* stats = GetPoolStats(pool_name);
+    PoolStats* stats = GetPoolStats(schedule);
     stats->Release(schedule, peak_mem_consumption);
-    UpdateHostMemAdmitted(schedule, -schedule.per_backend_mem_to_admit());
+    UpdateHostStats(schedule, -schedule.per_backend_mem_to_admit(), -1);
     pools_for_updates_.insert(pool_name);
     VLOG_RPC << "Released query id=" << PrintId(schedule.query_id()) << " "
              << stats->DebugString();
@@ -798,6 +900,13 @@ void AdmissionController::ReleaseQuery(
   dequeue_cv_.NotifyOne();
 }
 
+Status AdmissionController::ResolvePoolAndGetConfig(
+    const TQueryCtx& query_ctx, string* pool_name, TPoolConfig* pool_config) {
+  RETURN_IF_ERROR(request_pool_service_->ResolveRequestPool(query_ctx, pool_name));
+  DCHECK_EQ(query_ctx.request_pool, *pool_name);
+  return request_pool_service_->GetPoolConfig(*pool_name, pool_config);
+}
+
 // Statestore subscriber callback for IMPALA_REQUEST_QUEUE_TOPIC.
 void AdmissionController::UpdatePoolStats(
     const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
@@ -815,9 +924,7 @@ void AdmissionController::UpdatePoolStats(
       // and then re-compute the pool stats for any pools that changed.
       if (!delta.is_delta) {
         VLOG_ROW << "Full impala-request-queue stats update";
-        for (PoolStatsMap::value_type& entry: pool_stats_) {
-          entry.second.ClearRemoteStats();
-        }
+        for (auto& entry : pool_stats_) entry.second.ClearRemoteStats();
       }
       HandleTopicUpdates(delta.topic_entries);
     }
@@ -843,7 +950,7 @@ void AdmissionController::PoolStats::UpdateRemoteStats(const string& host_id,
     if (it != remote_stats_.end()) {
       remote_stats_.erase(it);
     } else {
-      VLOG_RPC << "Attempted to remove non-existent remote stats for host=" << host_id;
+      VLOG_QUERY << "Attempted to remove non-existent remote stats for host=" << host_id;
     }
   } else {
     remote_stats_[host_id] = *host_stats;
@@ -879,8 +986,7 @@ void AdmissionController::PoolStats::UpdateAggregates(HostMemMap* host_mem_reser
   int64_t num_running = 0;
   int64_t num_queued = 0;
   int64_t mem_reserved = 0;
-  for (const PoolStats::RemoteStatsMap::value_type& remote_entry:
-       remote_stats_) {
+  for (const PoolStats::RemoteStatsMap::value_type& remote_entry : remote_stats_) {
     const string& host = remote_entry.first;
     // Skip an update from this subscriber as the information may be outdated.
     // The stats from this coordinator will be added below.
@@ -892,10 +998,13 @@ void AdmissionController::PoolStats::UpdateAggregates(HostMemMap* host_mem_reser
     num_running += remote_pool_stats.num_admitted_running;
     num_queued += remote_pool_stats.num_queued;
 
-    // The update the per-pool and per-host aggregates with the mem reserved by this
-    // host in this pool.
+    // Update the per-pool and per-host aggregates with the mem reserved by this host in
+    // this pool.
     mem_reserved += remote_pool_stats.backend_mem_reserved;
     (*host_mem_reserved)[host] += remote_pool_stats.backend_mem_reserved;
+    // TODO(IMPALA-8762): For multiple coordinators, need to track the number of running
+    // queries per executor, i.e. every admission controller needs to send the full map to
+    // everyone else.
   }
   num_running += local_stats_.num_admitted_running;
   num_queued += local_stats_.num_queued;
@@ -908,8 +1017,8 @@ void AdmissionController::PoolStats::UpdateAggregates(HostMemMap* host_mem_reser
   DCHECK_GE(num_running, local_stats_.num_admitted_running);
   DCHECK_GE(num_queued, local_stats_.num_queued);
 
-  if (agg_num_running_ == num_running && agg_num_queued_ == num_queued &&
-      agg_mem_reserved_ == mem_reserved) {
+  if (agg_num_running_ == num_running && agg_num_queued_ == num_queued
+      && agg_mem_reserved_ == mem_reserved) {
     DCHECK_EQ(num_running, metrics_.agg_num_running->GetValue());
     DCHECK_EQ(num_queued, metrics_.agg_num_queued->GetValue());
     DCHECK_EQ(mem_reserved, metrics_.agg_mem_reserved->GetValue());
@@ -926,25 +1035,164 @@ void AdmissionController::PoolStats::UpdateAggregates(HostMemMap* host_mem_reser
 }
 
 void AdmissionController::UpdateClusterAggregates() {
-  // Recompute the host mem reserved.
-  HostMemMap updated_mem_reserved;
-  for (PoolStatsMap::value_type& entry: pool_stats_) {
-    entry.second.UpdateAggregates(&updated_mem_reserved);
-  }
+  // Recompute mem_reserved for all hosts.
+  PoolStats::HostMemMap updated_mem_reserved;
+  for (auto& entry : pool_stats_) entry.second.UpdateAggregates(&updated_mem_reserved);
 
-  if (VLOG_ROW_IS_ON) {
-    stringstream ss;
-    ss << "Updated mem reserved for hosts:";
-    int i = 0;
-    for (const HostMemMap::value_type& e: updated_mem_reserved) {
-      if (host_mem_reserved_[e.first] == e.second) continue;
-      ss << endl << e.first << ": " << PrintBytes(host_mem_reserved_[e.first]);
+  stringstream ss;
+  ss << "Updated mem reserved for hosts:";
+  int i = 0;
+  for (const auto& e : updated_mem_reserved) {
+    int64_t old_mem_reserved = host_stats_[e.first].mem_reserved;
+    if (old_mem_reserved == e.second) continue;
+    host_stats_[e.first].mem_reserved = e.second;
+    if (VLOG_ROW_IS_ON) {
+      ss << endl << e.first << ": " << PrintBytes(old_mem_reserved);
       ss << " -> " << PrintBytes(e.second);
       ++i;
     }
-    if (i > 0) VLOG_ROW << ss.str();
   }
-  host_mem_reserved_ = updated_mem_reserved;
+  if (i > 0) VLOG_ROW << ss.str();
+}
+
+Status AdmissionController::ComputeGroupSchedules(
+    ClusterMembershipMgr::SnapshotPtr membership_snapshot, QueueNode* queue_node) {
+  int64_t previous_membership_version = 0;
+  if (queue_node->membership_snapshot.get() != nullptr) {
+    previous_membership_version = queue_node->membership_snapshot->version;
+  }
+  int64_t current_membership_version = membership_snapshot->version;
+
+  DCHECK_GT(current_membership_version, 0);
+  DCHECK_GE(current_membership_version, previous_membership_version);
+  if (current_membership_version <= previous_membership_version) {
+    VLOG(3) << "No rescheduling necessary, previous membership version: "
+            << previous_membership_version
+            << ", current membership version: " << current_membership_version;
+    return Status::OK();
+  }
+  const AdmissionRequest& request = queue_node->admission_request;
+  VLOG(3) << "Scheduling query " << PrintId(request.query_id)
+          << " with membership version " << current_membership_version;
+
+  queue_node->membership_snapshot = membership_snapshot;
+  std::vector<GroupSchedule>* output_schedules = &queue_node->group_schedules;
+  output_schedules->clear();
+
+  const string& pool_name = request.request.query_ctx.request_pool;
+
+  // If the first statestore update arrives before the local backend has finished starting
+  // up, we might not have a local backend descriptor yet. We return no schedules, which
+  // will result in the query being queued.
+  if (membership_snapshot->local_be_desc == nullptr) {
+    queue_node->not_admitted_reason = REASON_LOCAL_BACKEND_NOT_STARTED;
+    LOG(WARNING) << queue_node->not_admitted_reason;
+    return Status::OK();
+  }
+  const TBackendDescriptor& local_be_desc = *membership_snapshot->local_be_desc;
+
+  vector<const ExecutorGroup*> executor_groups;
+  GetExecutorGroupsForPool(
+      membership_snapshot->executor_groups, pool_name, &executor_groups);
+
+  if (executor_groups.empty()) {
+    queue_node->not_admitted_reason = Substitute(REASON_NO_EXECUTOR_GROUPS, pool_name);
+    LOG(WARNING) << queue_node->not_admitted_reason;
+    return Status::OK();
+  }
+
+  // We loop over the executor groups in a deterministic order. This means we will fill up
+  // each executor group before considering an unused one. In particular, we will not try
+  // to balance queries across executor groups equally.
+  // TODO(IMPALA-8731): balance queries across executor groups more evenly
+  for (const ExecutorGroup* executor_group : executor_groups) {
+    DCHECK(executor_group->IsHealthy());
+    DCHECK_GT(executor_group->NumExecutors(), 0);
+    unique_ptr<QuerySchedule> group_schedule =
+        make_unique<QuerySchedule>(request.query_id, request.request,
+            request.query_options, request.summary_profile, request.query_events);
+    const string& group_name = executor_group->name();
+    VLOG(3) << "Scheduling for executor group: " << group_name << " with "
+            << executor_group->NumExecutors() << " executors";
+    const Scheduler::ExecutorConfig group_config = {*executor_group, local_be_desc};
+    RETURN_IF_ERROR(ExecEnv::GetInstance()->scheduler()->Schedule(
+        group_config, group_schedule.get()));
+    DCHECK(!group_schedule->executor_group().empty());
+    output_schedules->emplace_back(std::move(group_schedule), *executor_group);
+  }
+  DCHECK(!output_schedules->empty());
+  return Status::OK();
+}
+
+bool AdmissionController::FindGroupToAdmitOrReject(int64_t cluster_size,
+    ClusterMembershipMgr::SnapshotPtr membership_snapshot, const TPoolConfig& pool_config,
+    bool admit_from_queue, PoolStats* pool_stats, QueueNode* queue_node) {
+  // Check for rejection based on current cluster size
+  const string& pool_name = pool_stats->name();
+  string rejection_reason;
+  if (RejectForCluster(
+          pool_name, pool_config, admit_from_queue, cluster_size, &rejection_reason)) {
+    DCHECK(!rejection_reason.empty());
+    queue_node->not_admitted_reason = rejection_reason;
+    return false;
+  }
+
+  // Compute schedules
+  Status ret = ComputeGroupSchedules(membership_snapshot, queue_node);
+  if (!ret.ok()) {
+    DCHECK(queue_node->not_admitted_reason.empty());
+    queue_node->not_admitted_reason = Substitute(REASON_SCHEDULER_ERROR, ret.GetDetail());
+    return false;
+  }
+  if (queue_node->group_schedules.empty()) {
+    DCHECK(!queue_node->not_admitted_reason.empty());
+    return true;
+  }
+
+  for (GroupSchedule& group_schedule : queue_node->group_schedules) {
+    const ExecutorGroup& executor_group = group_schedule.executor_group;
+    DCHECK_GT(executor_group.NumExecutors(), 0);
+    QuerySchedule* schedule = group_schedule.schedule.get();
+    schedule->UpdateMemoryRequirements(pool_config);
+
+    const string& group_name = executor_group.name();
+    int64_t group_size = executor_group.NumExecutors();
+    VLOG(3) << "Trying to admit query to pool " << pool_name << " in executor group "
+            << group_name << " (" << group_size << " executors)";
+
+    const int64_t max_queued = GetMaxQueuedForPool(pool_config, cluster_size);
+    const int64_t max_mem = GetMaxMemForPool(pool_config, cluster_size);
+    const int64_t max_requests = GetMaxRequestsForPool(pool_config, cluster_size);
+    VLOG_QUERY << "Trying to admit id=" << PrintId(schedule->query_id())
+               << " in pool_name=" << pool_name << " executor_group_name=" << group_name
+               << " per_host_mem_estimate="
+               << PrintBytes(schedule->GetPerHostMemoryEstimate())
+               << " max_requests=" << max_requests << " ("
+               << GetMaxRequestsForPoolDescription(pool_config, cluster_size) << ")"
+               << " max_queued=" << max_queued << " ("
+               << GetMaxQueuedForPoolDescription(pool_config, cluster_size) << ")"
+               << " max_mem=" << PrintBytes(max_mem) << " ("
+               << GetMaxMemForPoolDescription(pool_config, cluster_size) << ")";
+    VLOG_QUERY << "Stats: " << pool_stats->DebugString();
+
+    // Query is rejected if the rejection check fails on *any* group.
+    if (RejectForSchedule(
+            *schedule, pool_config, cluster_size, group_size, &rejection_reason)) {
+      DCHECK(!rejection_reason.empty());
+      queue_node->not_admitted_reason = rejection_reason;
+      return false;
+    }
+
+    if (CanAdmitRequest(*schedule, pool_config, cluster_size, admit_from_queue,
+            &queue_node->not_admitted_reason)) {
+      queue_node->admitted_schedule = std::move(group_schedule.schedule);
+      return true;
+    } else {
+      VLOG_RPC << "Cannot admit query " << PrintId(queue_node->admission_request.query_id)
+               << " to group " << group_name << ": " << queue_node->not_admitted_reason;
+    }
+  }
+  return true;
 }
 
 void AdmissionController::PoolStats::UpdateMemTrackerStats() {
@@ -969,7 +1217,7 @@ void AdmissionController::PoolStats::UpdateMemTrackerStats() {
 void AdmissionController::AddPoolUpdates(vector<TTopicDelta>* topic_updates) {
   // local_stats_ are updated eagerly except for backend_mem_reserved (which isn't used
   // for local admission control decisions). Update that now before sending local_stats_.
-  for (PoolStatsMap::value_type& entry: pool_stats_) {
+  for (auto& entry : pool_stats_) {
     entry.second.UpdateMemTrackerStats();
   }
   if (pools_for_updates_.empty()) return;
@@ -998,26 +1246,25 @@ void AdmissionController::DequeueLoop() {
     unique_lock<mutex> lock(admission_ctrl_lock_);
     if (done_) break;
     dequeue_cv_.Wait(lock);
-    const int64_t cluster_size = GetClusterSize();
+    ClusterMembershipMgr::SnapshotPtr membership_snapshot =
+        ExecEnv::GetInstance()->cluster_membership_mgr()->GetSnapshot();
+
+    // If a query was queued while the cluster is still starting up but the client-facing
+    // services have already started to accept connections, the whole membership can still
+    // be empty.
+    if (membership_snapshot->executor_groups.empty()) continue;
+    const int64_t cluster_size = GetClusterSize(*membership_snapshot);
+
     for (const PoolConfigMap::value_type& entry: pool_config_map_) {
       const string& pool_name = entry.first;
       const TPoolConfig& pool_config = entry.second;
-      PoolStatsMap::iterator it = pool_stats_.find(pool_name);
-      DCHECK(it != pool_stats_.end());
-      PoolStats* stats = &it->second;
-      RequestQueue& queue = request_queue_map_[pool_name];
+      PoolStats* stats = GetPoolStats(pool_name, /* dcheck_exists=*/true);
       stats->UpdateDerivedMetrics(pool_config, cluster_size);
 
       if (stats->local_stats().num_queued == 0) continue; // Nothing to dequeue
-
-      // Handle the unlikely case that after requests were queued, the pool config was
-      // changed and the pool was disabled. Skip dequeuing them and let them time out.
-      // TODO: Dequeue and reject all requests in this pool.
-      if (PoolDisabled(pool_config)) continue;
-
-      DCHECK_GT(stats->local_stats().num_queued, 0);
       DCHECK_GE(stats->agg_num_queued(), stats->local_stats().num_queued);
 
+      RequestQueue& queue = request_queue_map_[pool_name];
       int64_t max_to_dequeue = GetMaxToDequeue(queue, stats, pool_config, cluster_size);
       VLOG_RPC << "Dequeue thread will try to admit " << max_to_dequeue << " requests"
                << ", pool=" << pool_name
@@ -1028,32 +1275,64 @@ void AdmissionController::DequeueLoop() {
       while (max_to_dequeue > 0 && !queue.empty()) {
         QueueNode* queue_node = queue.head();
         DCHECK(queue_node != nullptr);
-        QuerySchedule* schedule = queue_node->schedule;
-        schedule->UpdateMemoryRequirements(pool_config);
+        // Find a group that can admit the query
         bool is_cancelled = queue_node->admit_outcome->IsSet()
             && queue_node->admit_outcome->Get() == AdmissionOutcome::CANCELLED;
-        string not_admitted_reason;
-        // TODO: Requests further in the queue may be blocked unnecessarily. Consider a
-        // better policy once we have better test scenarios.
-        if (!is_cancelled
-            && !CanAdmitRequest(
-                   *schedule, pool_config, cluster_size, true, &not_admitted_reason)) {
-          LogDequeueFailed(queue_node, not_admitted_reason);
+
+        bool is_rejected = !is_cancelled
+            && !FindGroupToAdmitOrReject(cluster_size, membership_snapshot, pool_config,
+                   /* admit_from_queue=*/true, stats, queue_node);
+
+        if (!is_cancelled && !is_rejected
+            && queue_node->admitted_schedule.get() == nullptr) {
+          // If no group was found, stop trying to dequeue.
+          // TODO(IMPALA-2968): Requests further in the queue may be blocked
+          // unnecessarily. Consider a better policy once we have better test scenarios.
+          LogDequeueFailed(queue_node, queue_node->not_admitted_reason);
           break;
         }
-        VLOG_RPC << "Dequeuing query=" << PrintId(schedule->query_id());
+
+        // At this point we know that the query must be taken off the queue
         queue.Dequeue();
         --max_to_dequeue;
-        stats->Dequeue(*schedule, false);
-        // If query is already cancelled, just dequeue and continue.
-        AdmissionOutcome outcome =
-            queue_node->admit_outcome->Set(AdmissionOutcome::ADMITTED);
-        if (outcome == AdmissionOutcome::CANCELLED) {
-          VLOG_QUERY << "Dequeued cancelled query=" << PrintId(schedule->query_id());
-          continue;
+        VLOG(3) << "Dequeueing from stats for pool " << pool_name;
+        stats->Dequeue(false);
+
+        if (is_rejected) {
+          AdmissionOutcome outcome =
+              queue_node->admit_outcome->Set(AdmissionOutcome::REJECTED);
+          if (outcome == AdmissionOutcome::REJECTED) {
+            stats->metrics()->total_rejected->Increment(1);
+            continue; // next query
+          } else {
+            DCHECK_ENUM_EQ(outcome, AdmissionOutcome::CANCELLED);
+            is_cancelled = true;
+          }
+        }
+        DCHECK(is_cancelled || queue_node->admitted_schedule != nullptr);
+
+        const TUniqueId& query_id = queue_node->admission_request.query_id;
+        if (!is_cancelled) {
+          VLOG_QUERY << "Admitting from queue: query=" << PrintId(query_id);
+          AdmissionOutcome outcome =
+              queue_node->admit_outcome->Set(AdmissionOutcome::ADMITTED);
+          if (outcome != AdmissionOutcome::ADMITTED) {
+            DCHECK_ENUM_EQ(outcome, AdmissionOutcome::CANCELLED);
+            is_cancelled = true;
+          }
         }
-        DCHECK_ENUM_EQ(outcome, AdmissionOutcome::ADMITTED);
-        AdmitQuery(schedule, true);
+
+        if (is_cancelled) {
+          VLOG_QUERY << "Dequeued cancelled query=" << PrintId(query_id);
+          continue; // next query
+        }
+
+        DCHECK(queue_node->admit_outcome->IsSet());
+        DCHECK_ENUM_EQ(queue_node->admit_outcome->Get(), AdmissionOutcome::ADMITTED);
+        DCHECK(!is_cancelled);
+        DCHECK(!is_rejected);
+        DCHECK(queue_node->admitted_schedule != nullptr);
+        AdmitQuery(queue_node->admitted_schedule.get(), true);
       }
       pools_for_updates_.insert(pool_name);
     }
@@ -1087,68 +1366,48 @@ int64_t AdmissionController::GetMaxToDequeue(RequestQueue& queue, PoolStats* sta
     return min(stats->local_stats().num_queued,
         max<int64_t>(1, queue_size_ratio * total_available));
   } else {
-    return stats->agg_num_queued(); // No limit on num running requests
+    return stats->local_stats().num_queued; // No limit on num running requests
   }
 }
 
 void AdmissionController::LogDequeueFailed(QueueNode* node,
     const string& not_admitted_reason) {
-  VLOG_QUERY << "Could not dequeue query id="
-             << PrintId(node->schedule->query_id())
+  VLOG_QUERY << "Could not dequeue query id=" << PrintId(node->admission_request.query_id)
              << " reason: " << not_admitted_reason;
-  node->profile->AddInfoString(PROFILE_INFO_KEY_LAST_QUEUED_REASON,
-      not_admitted_reason);
+  node->admission_request.summary_profile->AddInfoString(
+      PROFILE_INFO_KEY_LAST_QUEUED_REASON, not_admitted_reason);
+}
+
+AdmissionController::PoolStats* AdmissionController::GetPoolStats(
+    const QuerySchedule& schedule) {
+  DCHECK(!schedule.request_pool().empty());
+  return GetPoolStats(schedule.request_pool());
 }
 
-AdmissionController::PoolStats*
-AdmissionController::GetPoolStats(const string& pool_name) {
-  PoolStatsMap::iterator it = pool_stats_.find(pool_name);
+AdmissionController::PoolStats* AdmissionController::GetPoolStats(
+    const string& pool_name, bool dcheck_exists) {
+  DCHECK(!pool_name.empty());
+  auto it = pool_stats_.find(pool_name);
+  DCHECK(!dcheck_exists || it != pool_stats_.end());
   if (it == pool_stats_.end()) {
-    pool_stats_.insert(PoolStatsMap::value_type(pool_name, PoolStats(this, pool_name)));
-    it = pool_stats_.find(pool_name);
+    bool inserted;
+    std::tie(it, inserted) = pool_stats_.emplace(pool_name, PoolStats(this, pool_name));
+    DCHECK(inserted);
   }
   DCHECK(it != pool_stats_.end());
   return &it->second;
 }
 
-bool AdmissionController::IsPoolConfigValidForCluster(
-    const TPoolConfig& pool_cfg, int64_t cluster_size, string* reason) {
-  if (pool_cfg.max_query_mem_limit > 0
-      && pool_cfg.min_query_mem_limit > pool_cfg.max_query_mem_limit) {
-    *reason = Substitute("Invalid pool config: the min_query_mem_limit is greater than "
-                         "the max_query_mem_limit ($0 > $1)",
-        pool_cfg.min_query_mem_limit, pool_cfg.max_query_mem_limit);
-    return false;
-  }
-  int64_t max_mem = GetMaxMemForPool(pool_cfg, cluster_size);
-  if (max_mem > 0 && pool_cfg.min_query_mem_limit > max_mem) {
-    if (PoolHasFixedMemoryLimit(pool_cfg)) {
-      *reason =
-          Substitute("Invalid pool config: the min_query_mem_limit $0 is greater than "
-                     "the max_mem_resources $1 (configured statically)",
-              pool_cfg.min_query_mem_limit, max_mem);
-    } else {
-      *reason =
-          Substitute("The min_query_mem_limit $0 is greater than "
-                     "the current max_mem_resources $1 ($2); queries will not be admitted"
-                     " until more executors are available.",
-              pool_cfg.min_query_mem_limit, max_mem,
-              GetMaxMemForPoolDescription(pool_cfg, cluster_size));
-    }
-    return false;
-  }
-  return true;
-}
-
 void AdmissionController::AdmitQuery(QuerySchedule* schedule, bool was_queued) {
-  PoolStats* pool_stats = GetPoolStats(schedule->request_pool());
-  VLOG_RPC << "For Query " << schedule->query_id() << " per_backend_mem_limit set to: "
+  PoolStats* pool_stats = GetPoolStats(*schedule);
+  VLOG_RPC << "For Query " << PrintId(schedule->query_id())
+           << " per_backend_mem_limit set to: "
            << PrintBytes(schedule->per_backend_mem_limit())
            << " per_backend_mem_to_admit set to: "
            << PrintBytes(schedule->per_backend_mem_to_admit());
-  // Update memory accounting.
+  // Update memory and number of queries.
   pool_stats->Admit(*schedule);
-  UpdateHostMemAdmitted(*schedule, schedule->per_backend_mem_to_admit());
+  UpdateHostStats(*schedule, schedule->per_backend_mem_to_admit(), 1);
   // Update summary profile.
   const string& admission_result =
       was_queued ? PROFILE_INFO_VAL_ADMIT_QUEUED : PROFILE_INFO_VAL_ADMIT_IMMEDIATELY;
@@ -1156,6 +1415,8 @@ void AdmissionController::AdmitQuery(QuerySchedule* schedule, bool was_queued) {
       PROFILE_INFO_KEY_ADMISSION_RESULT, admission_result);
   schedule->summary_profile()->AddInfoString(
       PROFILE_INFO_KEY_ADMITTED_MEM, PrintBytes(schedule->GetClusterMemoryToAdmit()));
+  schedule->summary_profile()->AddInfoString(
+      PROFILE_INFO_KEY_EXECUTOR_GROUP, schedule->executor_group());
   // We may have admitted based on stale information. Include a warning in the profile
   // if this may be the case.
   int64_t time_since_update_ms;
@@ -1168,7 +1429,6 @@ void AdmissionController::AdmitQuery(QuerySchedule* schedule, bool was_queued) {
     schedule->summary_profile()->AddInfoString(
         PROFILE_INFO_KEY_STALENESS_WARNING, staleness_detail);
   }
-
 }
 
 string AdmissionController::GetStalenessDetail(const string& prefix,
@@ -1194,7 +1454,7 @@ string AdmissionController::GetStalenessDetailLocked(const string& prefix,
 
 void AdmissionController::PoolToJsonLocked(const string& pool_name,
     rapidjson::Value* resource_pools, rapidjson::Document* document) {
-  PoolStatsMap::iterator it = pool_stats_.find(pool_name);
+  auto it = pool_stats_.find(pool_name);
   if (it == pool_stats_.end()) return;
   PoolStats* stats = &it->second;
   RequestQueue& queue = request_queue_map_[pool_name];
@@ -1206,7 +1466,16 @@ void AdmissionController::PoolToJsonLocked(const string& pool_name,
   // Get the queued queries
   Value queued_queries(kArrayType);
   queue.Iterate([&queued_queries, document](QueueNode* node) {
-    QuerySchedule* schedule = node->schedule;
+    if (node->group_schedules.empty()) {
+      Value query_info(kObjectType);
+      query_info.AddMember("query_id", "N/A", document->GetAllocator());
+      query_info.AddMember("mem_limit", 0, document->GetAllocator());
+      query_info.AddMember("mem_limit_to_admit", 0, document->GetAllocator());
+      query_info.AddMember("num_backends", 0, document->GetAllocator());
+      queued_queries.PushBack(query_info, document->GetAllocator());
+      return true;
+    }
+    QuerySchedule* schedule = node->group_schedules.begin()->schedule.get();
     Value query_info(kObjectType);
     Value query_id(PrintId(schedule->query_id()).c_str(), document->GetAllocator());
     query_info.AddMember("query_id", query_id, document->GetAllocator());
@@ -1316,7 +1585,7 @@ void AdmissionController::PoolStats::ToJson(
 
 void AdmissionController::ResetPoolInformationalStats(const string& pool_name) {
   lock_guard<mutex> lock(admission_ctrl_lock_);
-  PoolStatsMap::iterator it = pool_stats_.find(pool_name);
+  auto it = pool_stats_.find(pool_name);
   if(it == pool_stats_.end()) return;
   it->second.ResetInformationalStats();
 }
@@ -1400,11 +1669,9 @@ void AdmissionController::PoolStats::InitMetrics() {
 }
 
 void AdmissionController::PopulatePerHostMemReservedAndAdmitted(
-    std::unordered_map<string, pair<int64_t, int64_t>>* mem_map) {
+    PerHostStats* per_host_stats) {
   lock_guard<mutex> l(admission_ctrl_lock_);
-  for (const auto& elem: host_mem_reserved_) {
-    (*mem_map)[elem.first] = make_pair(elem.second, host_mem_admitted_[elem.first]);
-  }
+  *per_host_stats = host_stats_;
 }
 
 string AdmissionController::MakePoolTopicKey(
@@ -1415,16 +1682,50 @@ string AdmissionController::MakePoolTopicKey(
   return Substitute("$0$1$2", pool_name, TOPIC_KEY_DELIMITER, backend_id);
 }
 
-int64_t AdmissionController::GetClusterSize() {
-  const ClusterMembershipMgr::SnapshotPtr membership_snapshot =
-      cluster_membership_mgr_->GetSnapshot();
-  // The count is computed including executors that are quiescing. This is important
-  // as this value is used in calculating the memory needed to run a query.
-  // If we exclude the quiescing executors, and a query at the head of the queue is
-  // scheduled to run on those, then the scaled down limits of the pool would prevent it
-  // from being admitted and hold up the rest of the queue (till it times out), even
-  // though the query can be allowed to start running on those quiescing executors.
-  return max<int64_t>(1, membership_snapshot->current_backends.size());
+void AdmissionController::GetExecutorGroupsForPool(
+    const ClusterMembershipMgr::ExecutorGroups& all_groups, const string& pool_name,
+    vector<const ExecutorGroup*>* matching_groups) {
+  string prefix(pool_name + POOL_GROUP_DELIMITER);
+  // We search for matching groups before the health check so that we don't fall back to
+  // the default group in case there are matching but unhealthy groups.
+  for (const auto& it : all_groups) {
+    StringPiece name(it.first);
+    if (name.starts_with(prefix)) matching_groups->push_back(&it.second);
+  }
+  if (matching_groups->empty()) {
+    auto default_it = all_groups.find(ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME);
+    if (default_it == all_groups.end()) return;
+    VLOG(3) << "Checking default executor group for pool " << pool_name;
+    matching_groups->push_back(&default_it->second);
+  }
+  // Filter out unhealthy groups.
+  auto erase_from = std::remove_if(matching_groups->begin(), matching_groups->end(),
+      [](const ExecutorGroup* g) { return !g->IsHealthy(); });
+  matching_groups->erase(erase_from, matching_groups->end());
+  // Sort executor groups by name.
+  auto cmp = [](const ExecutorGroup* a, const ExecutorGroup* b) {
+    return a->name() < b->name();
+  };
+  sort(matching_groups->begin(), matching_groups->end(), cmp);
+}
+
+int64_t AdmissionController::GetClusterSize(
+    const ClusterMembershipMgr::Snapshot& membership_snapshot) {
+  int64_t sum = 0;
+  for (const auto& it : membership_snapshot.executor_groups) {
+    sum += it.second.NumExecutors();
+  }
+  return sum;
+}
+
+int64_t AdmissionController::GetExecutorGroupSize(
+    const ClusterMembershipMgr::Snapshot& membership_snapshot,
+    const string& group_name) {
+  auto it = membership_snapshot.executor_groups.find(group_name);
+  DCHECK(it != membership_snapshot.executor_groups.end())
+      << "Could not find group " << group_name;
+  if (it == membership_snapshot.executor_groups.end()) return 0;
+  return it->second.NumExecutors();
 }
 
 int64_t AdmissionController::GetMaxMemForPool(
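
The group selection in GetExecutorGroupsForPool() above can be sketched in isolation. This is a minimal illustration, not the code from the commit: GroupInfo, GroupsForPool(), kPoolGroupDelimiter and kDefaultGroupName are hypothetical stand-ins, the delimiter and default group name values are assumptions, and the health rule (group size at least the configured minimum) is taken from the commit message rather than from ExecutorGroup::IsHealthy() itself.

```cpp
#include <algorithm>
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for ExecutorGroup; only the fields this sketch needs.
struct GroupInfo {
  std::string name;
  int num_executors;
  int min_size;
  // Assumed health rule: the group holds at least its configured minimum size.
  bool IsHealthy() const { return num_executors >= min_size; }
};

// Assumed values; the real constants are defined elsewhere in the code base.
const char* const kPoolGroupDelimiter = "-";
const char* const kDefaultGroupName = "default";

// Returns the healthy groups that may serve 'pool_name', sorted by name.
std::vector<const GroupInfo*> GroupsForPool(
    const std::map<std::string, GroupInfo>& all_groups, const std::string& pool_name) {
  assert(!pool_name.empty());
  const std::string prefix = pool_name + kPoolGroupDelimiter;
  std::vector<const GroupInfo*> matching;
  // Match by name prefix before the health check, so that a pool with matching
  // but unhealthy groups does not fall back to the default group.
  for (const auto& entry : all_groups) {
    if (entry.first.compare(0, prefix.size(), prefix) == 0) {
      matching.push_back(&entry.second);
    }
  }
  // Fall back to the default group only if no group name matched at all.
  if (matching.empty()) {
    auto default_it = all_groups.find(kDefaultGroupName);
    if (default_it == all_groups.end()) return matching;
    matching.push_back(&default_it->second);
  }
  // Drop unhealthy groups.
  matching.erase(std::remove_if(matching.begin(), matching.end(),
                     [](const GroupInfo* g) { return !g->IsHealthy(); }),
      matching.end());
  // Sort by name so that admission is always attempted in the same order.
  std::sort(matching.begin(), matching.end(),
      [](const GroupInfo* a, const GroupInfo* b) { return a->name < b->name; });
  return matching;
}
```

With groups poolA-1 (healthy), poolA-2 (below its minimum size) and default, a query in poolA would only be offered poolA-1, while a query in a pool with no matching group name would be offered the default group.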
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index e8fab43..d71327c 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -15,13 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-
 #ifndef SCHEDULING_ADMISSION_CONTROLLER_H
 #define SCHEDULING_ADMISSION_CONTROLLER_H
 
-#include <vector>
-#include <string>
 #include <list>
+#include <string>
+#include <utility>
+#include <vector>
 
 #include <boost/unordered_map.hpp>
 #include <boost/unordered_set.hpp>
@@ -29,11 +29,11 @@
 
 #include "common/status.h"
 #include "scheduling/cluster-membership-mgr.h"
-#include "scheduling/query-schedule.h"
 #include "scheduling/request-pool-service.h"
 #include "statestore/statestore-subscriber.h"
 #include "util/condition-variable.h"
 #include "util/internal-queue.h"
+#include "util/runtime-profile.h"
 #include "util/thread.h"
 
 namespace impala {
@@ -46,31 +46,33 @@ class ExecEnv;
 /// has been made or the caller has initiated a cancellation.
 enum class AdmissionOutcome {
   ADMITTED,
-  REJECTED_OR_TIMED_OUT,
+  REJECTED,
+  TIMED_OUT,
   CANCELLED,
 };
 
 /// The AdmissionController is used to throttle requests (e.g. queries, DML) based
 /// on available cluster resources, which are configured in one or more resource pools. A
 /// request will either be admitted for immediate execution, queued for later execution,
-/// or rejected.  Resource pools can be configured to have maximum number of concurrent
-/// queries, maximum cluster wide memory, maximum queue size, max and min per host memory
-/// limit for every query, and to set whether the mem_limit query option will be clamped
-/// by the previously mentioned max/min per host limits or not. Queries will be queued if
-/// there are already too many queries executing or there isn't enough available memory.
-/// Once the queue reaches the maximum queue size, incoming queries will be rejected.
-/// Requests in the queue will time out after a configurable timeout.
+/// or rejected (either immediately or after being queued). Resource pools can be
+/// configured to have maximum number of concurrent queries, maximum cluster wide memory,
+/// maximum queue size, max and min per host memory limit for every query, and to set
+/// whether the mem_limit query option will be clamped by the previously mentioned max/min
+/// per host limits or not. Queries will be queued if there are already too many queries
+/// executing or there isn't enough available memory. Once the queue reaches the maximum
+/// queue size, incoming queries will be rejected. Requests in the queue will time out
+/// after a configurable timeout.
 ///
-/// Any impalad can act as a coordinator and thus also an admission controller, so some
-/// cluster state must be shared between impalads in order to make admission decisions on
-/// any node. Every impalad maintains some per-pool and per-host statistics related to
-/// the requests it itself is servicing as the admission controller. Some of these
-/// local admission statistics in addition to some backend-specific statistics (i.e.
-/// the backend executor associated with the same impalad process) are disseminated
-/// across the cluster via the statestore using the IMPALA_REQUEST_QUEUE_TOPIC topic.
-/// For example, coordinators will end up sending statestore updates where the admission
-/// statistics reflect the load and all participating backends will have statestore
-/// updates reflecting load they're executing.
+/// Depending on the -is_coordinator startup flag, multiple impalads can act as a
+/// coordinator and thus also an admission controller, so some cluster state must be
+/// shared between impalads in order to make admission decisions on any of them. Every
+/// coordinator maintains some per-pool and per-host statistics related to the requests it
+/// itself is servicing as the admission controller. Some of these local admission
+/// statistics in addition to some backend-specific statistics (i.e. the backend executor
+/// associated with the same impalad process) are disseminated across the cluster via the
+/// statestore using the IMPALA_REQUEST_QUEUE_TOPIC topic. Effectively, coordinators send
+/// statestore updates where the admission statistics reflect the load and all
+/// participating backends send statestore updates reflecting the load they're executing.
 ///
 /// Every <impalad, pool> pair is sent as a topic update at the statestore heartbeat
 /// interval when pool statistics change, and the topic updates from other impalads are
@@ -90,8 +92,8 @@ enum class AdmissionOutcome {
 /// effort and will involve changes outside of the admission controller.
 ///
 /// The memory required for admission for a request is specified as the query option
-/// MEM_LIMIT (either explicitly or via a default value). This is a per-node value. If
-/// there is no memory limit, the per-node estimate from planning is used instead as a
+/// MEM_LIMIT (either explicitly or via a default value). This is a per-host value. If
+/// there is no memory limit, the per-host estimate from planning is used instead as a
 /// memory limit and a lower bound is enforced on it based on the largest initial
 /// reservation of the query. The final memory limit used is also clamped by the max/min
 /// memory limits configured for the pool with an option to not enforce these limits on
@@ -110,7 +112,10 @@ enum class AdmissionOutcome {
 ///  3) All participating backends must have enough memory available. Each impalad has a
 ///     per-process mem limit, and that is the max memory that can be reserved on that
 ///     backend.
-///  4) The final per host memory limit used can accommodate the largest Initial
+///  3b) (optional) When using executor groups (see below) and admitting to the
+///     non-default executor group, then the number of currently running queries must be
+///     below the configured maximum for all participating backends.
+///  4) The final per host memory limit used can accommodate the largest initial
 ///     reservation.
 ///
 /// In order to admit based on these conditions, the admission controller accounts for
@@ -133,6 +138,11 @@ enum class AdmissionOutcome {
 ///     when requests are admitted and released (and NOTE: not via the statestore, so
 ///     there is no latency, but this does not account for memory from requests admitted
 ///     by other impalads).
+///  c) Num Admitted: the number of queries that have been admitted and are therefore
+///     considered to be currently running. Note that there is currently no equivalent to
+///     the reserved memory reporting, i.e. hosts do not report the actual number of
+///     queries that are currently executing (IMPALA-8762). This prevents using multiple
+///     coordinators with executor groups.
 ///
 /// As described, both the 'reserved' and 'admitted' mem accounting mechanisms have
 /// different advantages and disadvantages. The 'reserved' mem accounting works well in
@@ -143,65 +153,125 @@ enum class AdmissionOutcome {
 /// are used or, if there is a wide distribution of requests across impalads, the rate of
 /// submission is low enough that new state is able to be updated by the statestore.
 ///
-/// Example:
+/// Executor Groups:
+/// Executors in a cluster can be assigned to executor groups. Each executor can only be
+/// in one group. A resource pool can have multiple executor groups associated with it.
+/// Each executor group belongs to a single resource pool and will only serve requests
+/// from that pool. I.e. the relationships are 1 resource pool : many executor groups and
+/// 1 executor group : many executors.
+///
+/// Executors that don't specify an executor group name during startup are automatically
+/// added to a default group called DEFAULT_EXECUTOR_GROUP_NAME. The default executor
+/// group does not enforce query concurrency limits per host and as such can be admitted
+/// to by multiple coordinators.
+///
+/// Executor groups are mapped to resource pools implicitly by their name. Queries in a
+/// resource pool can run on all executor groups whose name starts with the pool's name,
+/// separated by a '-'. For example, queries in a pool with name 'q1' can run on all
+/// executor groups starting with 'q1-'. If no matching executor groups can be found for a
+/// resource pool and the default executor group is not empty, then the default group is
+/// used.
+///
+/// In addition to the checks described before, admission to executor groups is bounded by
+/// the maximum number of queries that can run concurrently on an executor
+/// (-max_concurrent_queries). An additional check is performed to ensure that each
+/// executor in the group has an available slot to run the query. Admission controllers
+/// include the number of queries that have been admitted to each executor in the
+/// statestore updates.
+///
+/// In order to find an executor group that can run a query, the admission controller
+/// calls FindGroupToAdmitOrReject(), either during the initial admission attempt or in
+/// DequeueLoop(). If the cluster membership has changed, it (re-)computes schedules for
+/// all executor groups and then tries to admit queries using the list of schedules.
+/// Admission is always attempted in the same order so that executor groups fill up before
+/// further ones are considered. In particular, we don't attempt to balance the queries
+/// across executor groups.
+///
+/// Example without executor groups:
 /// Consider a 10-node cluster with 100gb/node and a resource pool 'q1' configured with
 /// 500gb of aggregate memory and 40gb as the max memory limit. An incoming request with
 /// the MEM_LIMIT query option set to 50gb and scheduled to execute on all backends is
-/// received by AdmitQuery() on an otherwise quiet cluster. Based on the pool
+/// received by SubmitForAdmission() on an otherwise quiet cluster. Based on the pool
 /// configuration, a per host mem limit of 40gb is used for this query and for any
-/// subsequent checks that it needs to pass prior admission. CanAdmitRequest() checks for
-/// a valid pool config and the number of running queries and then calls
+/// subsequent checks that it needs to pass prior to admission. FindGroupToAdmitOrReject()
+/// computes a schedule for the default executor group and performs rejection tests before
+/// calling CanAdmitRequest(), which checks the number of running queries and then calls
 /// HasAvailableMemResources() to check for memory resources. It first checks whether
 /// there is enough memory for the request using PoolStats::EffectiveMemReserved() (which
 /// is the max of the pool's agg_mem_reserved_ and local_mem_admitted_, see #1 above),
-/// then checks for enough memory on each individual host via the max of the values in the
-/// host_mem_reserved_ and host_mem_admitted_ maps (see #2 above) and finally checks if
-/// the memory limit used for this query can accommodate its largest initial reservation.
-/// In this case, ample resources are available so CanAdmitRequest() returns true.
-/// PoolStats::Admit() is called to update q1's PoolStats: it first updates
-/// agg_num_running_ and local_mem_admitted_ which are able to be used immediately for
-/// incoming admission requests, then it updates num_admitted_running in the struct sent
-/// to the statestore (local_stats_). UpdateHostMemAdmitted() is called to update the
-/// per-host admitted mem (stored in the map host_mem_admitted_) for all participating
-/// hosts. Then AdmitQuery() returns to the Scheduler. If another identical admission
-/// request is received by the same coordinator immediately, it will be rejected because
-/// q1's local_mem_admitted_ is already 400gb. If that request were sent to another
-/// impalad at the same time, it would have been admitted because not all updates have
-/// been disseminated yet. The next statestore update will contain the updated value of
+/// then checks for enough memory on each individual host via the max of mem_reserved and
+/// mem_admitted in host_stats_ (see #2 above) and finally checks if the memory limit
+/// used for this query can accommodate its largest initial reservation. In this case,
+/// ample resources are available so CanAdmitRequest() returns true. PoolStats::Admit() is
+/// called to update q1's PoolStats: it first updates agg_num_running_ and
+/// local_mem_admitted_ which are available to be used immediately for incoming admission
+/// requests, then it updates num_admitted_running in the struct sent to the statestore
+/// (local_stats_). UpdateHostStats() is called to update the per-host admitted mem
+/// (stored in the map host_stats_) for all participating hosts. Then SubmitForAdmission()
+/// returns to the ClientRequestState. If another identical admission request is received
+/// by the same coordinator immediately, it will be rejected because q1's
+/// local_mem_admitted_ is already 400gb. If that request were sent to another impalad at
+/// the same time, it would have been admitted because not all updates have been
+/// disseminated yet. The next statestore update will contain the updated value of
 /// num_admitted_running for q1 on this backend. As remote fragments begin execution on
 /// remote impalads, their pool mem trackers will reflect the updated amount of memory
 /// reserved (set in local_stats_.backend_mem_reserved by UpdateMemTrackerStats()) and the
-/// next statestore updates coming from those impalads will send the updated value. As
+/// next statestore updates coming from those impalads will contain the updated value. As
 /// the statestore updates are received (in the subscriber callback fn UpdatePoolStats()),
 /// the incoming per-backend, per-pool mem_reserved values are aggregated to
 /// PoolStats::agg_mem_reserved_ (pool aggregate over all hosts) and backend_mem_reserved_
 /// (per-host aggregates over all pools). Once this has happened, any incoming admission
 /// request now has the updated state required to make correct admission decisions.
 ///
+/// Example with executor groups:
+/// Consider a cluster with a dedicated coordinator and 2 executor groups
+/// "default-pool-group-1" and "default-pool-group-2" (the number of executors per group
+/// does not matter for this example). Both executor groups will be able to serve requests
+/// from the default resource pool. Consider that each executor can only run one query at
+/// a time, i.e. --max_concurrent_queries=1 is specified for all executors. An incoming
+/// query is submitted through SubmitForAdmission(), which calls
+/// FindGroupToAdmitOrReject(). From there we call ComputeGroupSchedules(), which
+/// computes schedules for both executor groups. Then we perform rejection tests and
+/// afterwards call CanAdmitRequest() for each of the schedules. Executor groups are
+/// processed in alphanumerically sorted order, so we attempt admission to group
+/// "default-pool-group-1" first. CanAdmitRequest() calls HasAvailableSlot() to check
+/// whether any of the hosts in the group have reached their maximum number of concurrent
+/// queries and since that is not the case, admission succeeds. The query is admitted and
+/// 'num_admitted' is incremented for each host in that group. When a second query arrives
+/// while the first one is still running, we perform the same steps. In particular we
+/// compute schedules for both groups and consider admission to default-pool-group-1
+/// first. However, the check in HasAvailableSlot() now fails and we will consider group
+/// default-pool-group-2 next. For this group, the check succeeds and the query is
+/// admitted, incrementing the num_admitted counter for each host in group
+/// default-pool-group-2.
+///
 /// Queuing Behavior:
-/// Once the resources in a pool are consumed each coordinator receiving requests will
+/// Once the resources in a pool are consumed, each coordinator receiving requests will
 /// begin queuing. While each individual queue is FIFO, there is no total ordering on the
 /// queued requests between admission controllers and no FIFO behavior is guaranteed for
 /// requests submitted to different coordinators. When resources become available, there
 /// is no synchronous coordination between nodes used to determine which get to dequeue
-/// and
-/// admit requests. Instead, we use a simple heuristic to try to dequeue a number of
+/// and admit requests. Instead, we use a simple heuristic to try to dequeue a number of
 /// requests proportional to the number of requests that are waiting in each individual
 /// admission controller to the total number of requests queued across all admission
 /// controllers (i.e. impalads). This limits the amount of overadmission that may result
-/// from a large amount of resources becoming available at the same time.
-/// When there are requests queued in multiple pools on the same host, the admission
-/// controller simply iterates over the pools in pool_stats_ and attempts to dequeue from
-/// each. This is fine for the max_requests limit, but is unfair for memory-based
-/// admission because the iteration order of pools effectively gives priority to the
-/// queues at the beginning. Requests across queues may be competing for the same
-/// resources on particular hosts, i.e. #2 in the description of memory-based admission
-/// above. Note the pool's max_mem_resources (#1) is not contented.
+/// from a large amount of resources becoming available at the same time. When there are
+/// requests queued in multiple pools on the same host, the admission controller simply
+/// iterates over the pools in pool_stats_ and attempts to dequeue from each. This is fine
+/// for the max_requests limit, but is unfair for memory-based admission because the
+/// iteration order of pools effectively gives priority to the queues at the beginning.
+/// Requests across queues may be competing for the same resources on particular hosts,
+/// i.e. #2 in the description of memory-based admission above. Note the pool's
+/// max_mem_resources (#1) is not contended.
 ///
 /// Cancellation Behavior:
-/// An admission request<schedule, admit_outcome> submitted using AdmitQuery() can be
-/// proactively cancelled by setting the 'admit_outcome' to AdmissionOutcome::CANCELLED.
-/// This is handled asynchronously by AdmitQuery() and DequeueLoop().
+/// An admission request<schedule, admit_outcome> submitted using SubmitForAdmission() can
+/// be proactively cancelled by setting the 'admit_outcome' to
+/// AdmissionOutcome::CANCELLED. This is handled asynchronously by SubmitForAdmission()
+/// and DequeueLoop().
 ///
 /// Pool Configuration Mechanism:
 /// The path to pool config files are specified using the startup flags
@@ -211,7 +281,6 @@ enum class AdmissionOutcome {
 /// are only propagated to Impala when a new query is serviced. See RequestPoolService
 /// class for more details.
 ///
-/// TODO: Improve the dequeuing policy. IMPALA-2968.
 
 class AdmissionController {
  public:
@@ -227,6 +296,7 @@ class AdmissionController {
   static const std::string PROFILE_INFO_VAL_INITIAL_QUEUE_REASON;
   static const std::string PROFILE_INFO_KEY_LAST_QUEUED_REASON;
   static const std::string PROFILE_INFO_KEY_ADMITTED_MEM;
+  static const std::string PROFILE_INFO_KEY_EXECUTOR_GROUP;
   static const std::string PROFILE_INFO_KEY_STALENESS_WARNING;
   static const std::string PROFILE_TIME_SINCE_LAST_UPDATE_COUNTER_NAME;
 
@@ -235,17 +305,30 @@ class AdmissionController {
       MetricGroup* metrics, const TNetworkAddress& host_addr);
   ~AdmissionController();
 
-  /// Submits the request for admission. Returns immediately if rejected, but otherwise
-  /// blocks until the request is either admitted, times out or cancelled by the client
-  /// (by setting 'admit_outcome' to CANCELLED). When this method returns the following
-  /// <admit_outcome, Return Status> pairs are possible:
+  /// This struct contains all information needed to create a QuerySchedule and try to
+  /// admit it. None of the members are owned by the instances of this class (usually they
+  /// are owned by the ClientRequestState).
+  struct AdmissionRequest {
+    const TUniqueId& query_id;
+    const TQueryExecRequest& request;
+    const TQueryOptions& query_options;
+    RuntimeProfile* summary_profile;
+    RuntimeProfile::EventSequence* query_events;
+  };
+
+  /// Submits the request for admission. May return immediately if rejected, but
+  /// otherwise blocks until the request is either admitted, times out, gets rejected
+  /// later, or cancelled by the client (by setting 'admit_outcome' to CANCELLED). When
+  /// this method returns, the following <admit_outcome, Return Status> pairs are
+  /// possible:
   /// - Admitted: <ADMITTED, Status::OK>
-  /// - Rejected or timed out: <REJECTED_OR_TIMED_OUT, Status(msg: reason for the same)>
+  /// - Rejected or timed out: <REJECTED or TIMED_OUT, Status(msg: reason for the same)>
   /// - Cancelled: <CANCELLED, Status::CANCELLED>
   /// If admitted, ReleaseQuery() should also be called after the query completes or gets
   /// cancelled to ensure that the pool statistics are updated.
-  Status SubmitForAdmission(QuerySchedule* schedule,
-      Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* admit_outcome);
+  Status SubmitForAdmission(const AdmissionRequest& request,
+      Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* admit_outcome,
+      std::unique_ptr<QuerySchedule>* schedule_result);
 
   /// Updates the pool statistics when a query completes (either successfully,
   /// is cancelled or failed). This should be called for all requests that have
@@ -274,11 +357,26 @@ class AdmissionController {
   /// Calls ResetInformationalStats on all pools.
   void ResetAllPoolInformationalStats();
 
+  // This struct stores per-host statistics which are used during admission and by HTTP
+  // handlers to query admission control statistics for currently registered backends.
+  struct HostStats {
+    /// The mem reserved for a query that is currently executing is its memory limit, if
+    /// set (which should be the common case with admission control). Otherwise, if the
+    /// query has no limit or the query is finished executing, the current consumption
+    /// (tracked by its query mem tracker) is used.
+    int64_t mem_reserved = 0;
+    /// The per host mem admitted only for the queries admitted locally.
+    int64_t mem_admitted = 0;
+    /// The per host number of queries admitted only for the queries admitted locally.
+    int64_t num_admitted = 0;
+  };
+
+  typedef std::unordered_map<std::string, HostStats> PerHostStats;
+
   // Populates the input map with the per host memory reserved and admitted in the
   // following format: <host_address_str, pair<mem_reserved, mem_admitted>>.
   // Only used for populating the 'backends' debug page.
-  void PopulatePerHostMemReservedAndAdmitted(
-      std::unordered_map<std::string, std::pair<int64_t, int64_t>>* mem_map);
+  void PopulatePerHostMemReservedAndAdmitted(PerHostStats* host_stats);
 
   /// Returns a non-empty string with a warning if the admission control data is stale.
   /// 'prefix' is added to the start of the string. Returns an empty string if not stale.
@@ -321,18 +419,7 @@ class AdmissionController {
   /// MonotonicMillis(), or is 0 if an update was never received.
   int64_t last_topic_update_time_ms_ = 0;
 
-  /// Maps from host id to memory reserved and memory admitted, both aggregates over all
-  /// pools. See the class doc for a detailed definition of reserved and admitted.
-  /// Protected by admission_ctrl_lock_.
-  typedef boost::unordered_map<std::string, int64_t> HostMemMap;
-  /// The mem reserved for a query that is currently executing is its memory limit, if set
-  /// (which should be the common case with admission control). Otherwise, if the query
-  /// has no limit or the query is finished executing, the current consumption (tracked
-  /// by its query mem tracker) is used.
-  HostMemMap host_mem_reserved_;
-
-  /// The per host mem admitted only for the queries admitted locally.
-  HostMemMap host_mem_admitted_;
+  PerHostStats host_stats_;
 
   /// Contains all per-pool statistics and metrics. Accessed via GetPoolStats().
   class PoolStats {
@@ -397,9 +484,9 @@ class AdmissionController {
     /// Updates the pool stats when the request represented by 'schedule' is released.
     void Release(const QuerySchedule& schedule, int64_t peak_mem_consumption);
     /// Updates the pool stats when the request represented by 'schedule' is queued.
-    void Queue(const QuerySchedule& schedule);
+    void Queue();
     /// Updates the pool stats when the request represented by 'schedule' is dequeued.
-    void Dequeue(const QuerySchedule& schedule, bool timed_out);
+    void Dequeue(bool timed_out);
 
     // STATESTORE CALLBACK METHODS
     /// Updates the local_stats_.backend_mem_reserved with the pool mem tracker. Called
@@ -414,6 +501,11 @@ class AdmissionController {
     /// are removed (i.e. topic deletion).
     void UpdateRemoteStats(const std::string& backend_id, TPoolStats* host_stats);
 
+    /// Maps from host id to memory reserved and memory admitted, both aggregates over all
+    /// pools. See the class doc for a detailed definition of reserved and admitted.
+    /// Protected by admission_ctrl_lock_.
+    typedef boost::unordered_map<std::string, int64_t> HostMemMap;
+
     /// Called after updating local_stats_ and remote_stats_ to update the aggregate
     /// values of agg_num_running_, agg_num_queued_, and agg_mem_reserved_. The in/out
     /// parameter host_mem_reserved is a map from host id to memory reserved used to
@@ -445,6 +537,8 @@ class AdmissionController {
     /// average of wait time.
     void ResetInformationalStats();
 
+    const std::string& name() const { return name_; }
+
    private:
     const std::string name_;
     AdmissionController* parent_;
@@ -505,7 +599,7 @@ class AdmissionController {
     FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestMemory);
     FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestCount);
     FRIEND_TEST(AdmissionControllerTest, GetMaxToDequeue);
-    FRIEND_TEST(AdmissionControllerTest, RejectImmediately);
+    FRIEND_TEST(AdmissionControllerTest, QueryRejection);
     friend class AdmissionControllerTest;
   };
 
@@ -514,28 +608,83 @@ class AdmissionController {
   typedef boost::unordered_map<std::string, PoolStats> PoolStatsMap;
   PoolStatsMap pool_stats_;
 
+  /// This struct groups together a schedule and the executor group that it was scheduled
+  /// on. It is used to attempt admission without rescheduling the query in case the
+  /// cluster membership has not changed. Users of the struct must make sure that
+  /// executor_group stays valid.
+  struct GroupSchedule {
+    GroupSchedule(
+        std::unique_ptr<QuerySchedule> schedule, const ExecutorGroup& executor_group)
+      : schedule(std::move(schedule)), executor_group(executor_group) {}
+    std::unique_ptr<QuerySchedule> schedule;
+    const ExecutorGroup& executor_group;
+  };
+
   /// The set of pools that have changed between topic updates that need stats to be sent
   /// to the statestore. The key is the pool name.
   typedef boost::unordered_set<std::string> PoolSet;
   PoolSet pools_for_updates_;
 
-  /// Structure stored in a QueryQueue representing a request. This struct lives only
-  /// during the call to AdmitQuery() but its members live past that and are owned by the
-  /// ClientRequestState object associated with them.
+  /// Structure stored in the RequestQueue representing an admission request. This struct
+  /// lives only during the call to SubmitForAdmission() but its members live past that
+  /// and are owned by the ClientRequestState object associated with them.
+  ///
+  /// Objects of this class progress linearly through the following states.
+  /// - Initialized: The request has been created
+  /// - Admitting: Admission of the request has been attempted at least once and
+  ///   additional intermediate state has been stored in some members
+  /// - Admitted: The request was admitted, cancelled, or rejected and 'admit_outcome' is
+  ///   set. If it was admitted, 'admitted_schedule' is also not nullptr.
   struct QueueNode : public InternalQueue<QueueNode>::Node {
-    QueueNode(QuerySchedule* query_schedule,
+    QueueNode(AdmissionRequest request,
         Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* admission_outcome,
         RuntimeProfile* profile)
-      : schedule(query_schedule), admit_outcome(admission_outcome), profile(profile) {}
+      : admission_request(std::move(request)),
+        profile(profile),
+        admit_outcome(admission_outcome) {}
+
+    /////////////////////////////////////////
+    /// BEGIN: Members that are valid for new objects after initialization
+
+    /// The admission request contains everything required to build schedules.
+    const AdmissionRequest admission_request;
+
+    /// Profile to be updated with information about admission.
+    RuntimeProfile* profile;
+
+    /// END: Members that are valid for new objects after initialization
+    /////////////////////////////////////////
 
-    /// The query schedule of the queued request.
-    QuerySchedule* const schedule;
+    /////////////////////////////////////////
+    /// BEGIN: Members that are only valid while queued, but invalid once dequeued.
+
+    /// The membership snapshot used during the last admission attempt. It can be nullptr
+    /// before the first admission attempt. If any schedules have been created,
+    /// 'group_schedules' will contain the corresponding schedules and executor groups.
+    ClusterMembershipMgr::SnapshotPtr membership_snapshot;
+
+    /// List of schedules and executor groups that can be attempted to be admitted for
+    /// this queue node.
+    std::vector<GroupSchedule> group_schedules;
+
+    /// END: Members that are only valid while queued, but invalid once dequeued.
+    /////////////////////////////////////////
+
+    /////////////////////////////////////////
+    /// BEGIN: Members that are valid after admission / cancellation / rejection
+
+    /// The last reason why this request could not be admitted.
+    std::string not_admitted_reason;
 
     /// The Admission outcome of the queued request.
     Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* const admit_outcome;
 
-    /// Profile to be updated with information about admission.
-    RuntimeProfile* const profile;
+    /// The schedule of the query if it was admitted successfully. Nullptr if it has not
+    /// been admitted or was cancelled or rejected.
+    std::unique_ptr<QuerySchedule> admitted_schedule = nullptr;
+
+    /// END: Members that are valid after admission / cancellation / rejection
+    /////////////////////////////////////////
   };
 
   /// Queue for the queries waiting to be admitted for execution. Once the
@@ -560,6 +709,11 @@ class AdmissionController {
   /// If true, tear down the dequeuing thread. This only happens in unit tests.
   bool done_;
 
+  /// Resolves the resource pool name in 'query_ctx.request_pool' and stores the resulting
+  /// name in 'pool_name' and the resulting config in 'pool_config'.
+  Status ResolvePoolAndGetConfig(const TQueryCtx& query_ctx, std::string* pool_name,
+      TPoolConfig* pool_config);
+
   /// Statestore subscriber callback that sends outgoing topic deltas (see
   /// AddPoolUpdates()) and processes incoming topic deltas, updating the PoolStats
   /// state.
@@ -577,12 +731,34 @@ class AdmissionController {
   /// statestore. Called by UpdatePoolStats(). Must hold admission_ctrl_lock_.
   void HandleTopicUpdates(const std::vector<TTopicItem>& topic_updates);
 
-  /// Re-computes the per-pool aggregate stats and the per-host aggregates in
-  /// host_mem_reserved_ using each pool's remote_stats_ and local_stats_.
+  /// Re-computes the per-pool aggregate stats and the per-host aggregates in host_stats_
+  /// using each pool's remote_stats_ and local_stats_.
   /// Called by UpdatePoolStats() after handling updates and deletions.
   /// Must hold admission_ctrl_lock_.
   void UpdateClusterAggregates();
 
+  /// Computes schedules for all executor groups that can run the query in 'queue_node'.
+  /// For subsequent calls schedules are only re-computed if the membership version inside
+  /// 'membership_snapshot' has changed. Will return any errors that occur during
+  /// scheduling, e.g. if the scan range generation fails. Note that this will not return
+  /// an error if no executor groups are available for scheduling, but will set
+  /// 'queue_node->not_admitted_reason' and leave 'queue_node->group_schedules' empty in
+  /// that case.
+  Status ComputeGroupSchedules(
+      ClusterMembershipMgr::SnapshotPtr membership_snapshot, QueueNode* queue_node);
+
+  /// Reschedules the query if necessary using 'membership_snapshot' and tries to find an
+  /// executor group that the query can be admitted to. If the query is unable to run on
+  /// any of the groups irrespective of their current workload, it is rejected. Returns
+  /// true and sets queue_node->admitted_schedule if the query can be admitted. Returns
+  /// true and keeps queue_node->admitted_schedule unset if the query cannot be admitted
+  /// now, but also does not need to be rejected. If the query must be rejected, this
+  /// method returns false and sets queue_node->not_admitted_reason.
+  bool FindGroupToAdmitOrReject(
+      int64_t cluster_size, ClusterMembershipMgr::SnapshotPtr membership_snapshot,
+      const TPoolConfig& pool_config, bool admit_from_queue, PoolStats* pool_stats,
+      QueueNode* queue_node);
+
   /// Dequeues the queued queries when notified by dequeue_cv_ and admits them if they
   /// have not been cancelled yet.
   void DequeueLoop();
@@ -590,7 +766,7 @@ class AdmissionController {
   /// Returns true if schedule can be admitted to the pool with pool_cfg.
   /// admit_from_queue is true if attempting to admit from the queue. Otherwise, returns
   /// false and not_admitted_reason specifies why the request can not be admitted
-  /// immediately. Caller owns not_admitted_reason.  Must hold admission_ctrl_lock_.
+  /// immediately. Caller owns not_admitted_reason. Must hold admission_ctrl_lock_.
   bool CanAdmitRequest(const QuerySchedule& schedule, const TPoolConfig& pool_cfg,
       int64_t cluster_size, bool admit_from_queue, std::string* not_admitted_reason);
 
@@ -609,35 +785,79 @@ class AdmissionController {
 
   /// Returns true if there is enough memory available to admit the query based on the
   /// schedule, the aggregate pool memory, and the per-host memory. If not, this returns
-  /// false and returns the reason in mem_unavailable_reason. Caller owns
-  /// mem_unavailable_reason. Must hold admission_ctrl_lock_.
+  /// false and returns the reason in 'mem_unavailable_reason'. Caller owns
+  /// 'mem_unavailable_reason'.
+  /// Must hold admission_ctrl_lock_.
   bool HasAvailableMemResources(const QuerySchedule& schedule,
       const TPoolConfig& pool_cfg, int64_t cluster_size,
       std::string* mem_unavailable_reason);
 
-  /// Adds per_node_mem to host_mem_admitted_ for each host in schedule. Must hold
-  /// admission_ctrl_lock_. Note that per_node_mem may be negative when a query completes.
-  void UpdateHostMemAdmitted(const QuerySchedule& schedule, int64_t per_node_mem);
-
-  /// Returns true if this request must be rejected immediately, e.g. requires more
-  /// memory than possible to reserve or the queue is already full. If true,
-  /// rejection_reason is set to a explanation of why the request was rejected.
+  /// Returns true if there is an available slot on all executors in the schedule. The
+  /// number of slots per executor does not change with the group or cluster size and
+  /// instead always uses pool_cfg.max_requests. If a host does not have a free slot, this
+  /// returns false and sets 'unavailable_reason'.
+  /// Must hold admission_ctrl_lock_.
+  bool HasAvailableSlot(const QuerySchedule& schedule, const TPoolConfig& pool_cfg,
+      std::string* unavailable_reason);
+
+  /// Adds 'per_node_mem' and 'num_queries' to the per-host stats in host_stats_ for each
+  /// host in 'schedule'. Must hold admission_ctrl_lock_. Note that 'per_node_mem' and
+  /// 'num_queries' may be negative when a query completes.
+  void UpdateHostStats(
+      const QuerySchedule& schedule, int64_t per_node_mem, int64_t num_queries);
+
+  /// Rejection happens in several stages:
+  /// 1) Based on static pool configuration
+  ///     - Check if the pool is disabled (max_requests = 0, max_mem = 0)
+  ///     - min_query_mem_limit > max_query_mem_limit (From IsPoolConfigValidForCluster)
+  ///
+  /// 2) Based on the entire cluster size
+  ///     - Check for maximum queue size (queue full)
+  ///
+  /// 3) Based on the executor group size
+  ///     - pool.min_query_mem_limit > max_mem (From IsPoolConfigValidForCluster)
+  ///       - max_mem may depend on group size
+  ///
+  /// 4) Based on a schedule
+  ///     - largest_min_mem_reservation > buffer_pool_limit
+  ///     - CanAccommodateMaxInitialReservation
+  ///     - Thread reservation limit (thread_reservation_limit,
+  ///       thread_reservation_aggregate_limit)
+  ///     - cluster_min_mem_reservation_bytes > max_mem
+  ///     - cluster_mem_to_admit > max_mem
+  ///     - per_backend_mem_to_admit > min_admit_mem_limit
+  ///
+  /// We lump together 1 & 2 and 3 & 4. The first two depend on the total cluster size.
+  /// The latter two depend on the executor group size and therefore on the schedule. If
+  /// no executor group is available, the query will be queued.
+
+  /// Returns true if a request must be rejected immediately based on the pool
+  /// configuration and cluster size, e.g. if the pool config is invalid, the pool is
+  /// disabled, or the queue is already full.
   /// Must hold admission_ctrl_lock_.
-  bool RejectImmediately(const QuerySchedule& schedule, const TPoolConfig& pool_cfg,
-      int64_t cluster_size, std::string* rejection_reason);
+  bool RejectForCluster(const std::string& pool_name, const TPoolConfig& pool_cfg,
+      bool admit_from_queue, int64_t cluster_size, std::string* rejection_reason);
+
+  /// Returns true if a request must be rejected immediately based on the pool
+  /// configuration and a particular schedule, e.g. because the memory requirements of the
+  /// query exceed the maximum of the group. This assumes that all executor groups for a
+  /// pool are uniform and that a query rejected for one group will not be able to run on
+  /// other groups, either.
+  /// Must hold admission_ctrl_lock_.
+  bool RejectForSchedule(const QuerySchedule& schedule, const TPoolConfig& pool_cfg,
+      int64_t cluster_size, int64_t group_size, std::string* rejection_reason);
 
   /// Gets or creates the PoolStats for pool_name. Must hold admission_ctrl_lock_.
-  PoolStats* GetPoolStats(const std::string& pool_name);
+  PoolStats* GetPoolStats(const std::string& pool_name, bool dcheck_exists = false);
+
+  /// Gets or creates the PoolStats for query schedule 'schedule'. Scheduling must be done
+  /// already and the schedule must have an associated executor_group.
+  PoolStats* GetPoolStats(const QuerySchedule& schedule);
 
-  /// Log the reason for dequeueing of 'node' failing and add the reason to the query's
+  /// Log the reason for dequeuing of 'node' failing and add the reason to the query's
   /// profile. Must hold admission_ctrl_lock_.
   static void LogDequeueFailed(QueueNode* node, const std::string& not_admitted_reason);
 
-  /// Returns false if pool config is invalid and populates the 'reason' with the reason
-  /// behind invalidity.
-  static bool IsPoolConfigValidForCluster(
-      const TPoolConfig& pool_cfg, int64_t cluster_size, std::string* reason);
-
   /// Sets the per host mem limit and mem admitted in the schedule and does the necessary
   /// accounting and logging on successful submission.
   /// Caller must hold 'admission_ctrl_lock_'.
@@ -700,16 +920,24 @@ class AdmissionController {
   static std::string GetMaxQueuedForPoolDescription(
       const TPoolConfig& pool_config, int64_t cluster_size);
 
+  /// Returns all executor groups from 'all_groups' that can be used to run queries in
+  /// 'pool_name' via 'matching_groups'.
+  void GetExecutorGroupsForPool(const ClusterMembershipMgr::ExecutorGroups& all_groups,
+      const std::string& pool_name, std::vector<const ExecutorGroup*>* matching_groups);
+
   /// Returns the current size of the cluster.
-  /// The minimum cluster size that is returned is 1.
-  int64_t GetClusterSize();
+  int64_t GetClusterSize(const ClusterMembershipMgr::Snapshot& membership_snapshot);
+
+  /// Returns the size of executor group 'group_name' in 'membership_snapshot'.
+  int64_t GetExecutorGroupSize(const ClusterMembershipMgr::Snapshot& membership_snapshot,
+      const std::string& group_name);
 
   FRIEND_TEST(AdmissionControllerTest, Simple);
   FRIEND_TEST(AdmissionControllerTest, PoolStats);
   FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestMemory);
   FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestCount);
   FRIEND_TEST(AdmissionControllerTest, GetMaxToDequeue);
-  FRIEND_TEST(AdmissionControllerTest, RejectImmediately);
+  FRIEND_TEST(AdmissionControllerTest, QueryRejection);
   friend class AdmissionControllerTest;
 };
 
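The executor group example in the class comment of admission-controller.h can be
sketched as a small standalone program. This is a simplified illustration and not
the code from this patch: the types below are hypothetical stand-ins, the parameter
'max_queries_per_host' is an assumed per-host slot limit, and minimum group sizes,
memory-based admission, rejection and queuing are all left out. It only shows the
two ideas from the description: a group serves a pool if the group name starts with
the pool name followed by '-', and groups are tried in sorted order until one has a
free slot on every host.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical, simplified stand-in for the per-host statistics in the patch.
struct HostStats {
  int64_t num_admitted = 0;
};

using PerHostStats = std::map<std::string, HostStats>;
// Maps group name to its hosts. std::map iterates keys in sorted order, which
// models "executor groups are processed in alphanumerically sorted order".
using ExecutorGroups = std::map<std::string, std::vector<std::string>>;

// Returns true if 'group_name' starts with "<pool_name>-".
bool GroupServesPool(const std::string& group_name, const std::string& pool_name) {
  const std::string prefix = pool_name + "-";
  return group_name.compare(0, prefix.size(), prefix) == 0;
}

// Returns true if every host in 'hosts' has fewer than 'max_queries_per_host'
// admitted queries. Hosts without an entry in 'stats' count as idle.
bool HasAvailableSlot(const std::vector<std::string>& hosts,
    const PerHostStats& stats, int64_t max_queries_per_host) {
  for (const std::string& host : hosts) {
    auto it = stats.find(host);
    if (it != stats.end() && it->second.num_admitted >= max_queries_per_host) {
      return false;
    }
  }
  return true;
}

// Tries the matching groups in sorted order. On success, increments
// 'num_admitted' for each host of the chosen group and returns the group name.
// Returns an empty string if no group has a free slot (the real code would
// queue the query in that case).
std::string AdmitToFirstAvailableGroup(const ExecutorGroups& groups,
    const std::string& pool_name, int64_t max_queries_per_host,
    PerHostStats* stats) {
  for (const auto& entry : groups) {
    if (!GroupServesPool(entry.first, pool_name)) continue;
    if (!HasAvailableSlot(entry.second, *stats, max_queries_per_host)) continue;
    for (const std::string& host : entry.second) ++(*stats)[host].num_admitted;
    return entry.first;
  }
  return "";
}
```

With two groups "default-pool-group-1" and "default-pool-group-2" and a limit of
one query per host, the first call picks group 1, the second call picks group 2,
and a third call finds no free group.
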
diff --git a/be/src/scheduling/cluster-membership-mgr-test.cc b/be/src/scheduling/cluster-membership-mgr-test.cc
index bacd8e8..1e5e6db 100644
--- a/be/src/scheduling/cluster-membership-mgr-test.cc
+++ b/be/src/scheduling/cluster-membership-mgr-test.cc
@@ -23,6 +23,7 @@
 #include "gen-cpp/StatestoreService_types.h"
 #include "scheduling/cluster-membership-mgr.h"
 #include "scheduling/cluster-membership-test-util.h"
+#include "service/impala-server.h"
 #include "testutil/gtest-util.h"
 #include "testutil/rand-util.h"
 
@@ -56,6 +57,12 @@ class ClusterMembershipMgrTest : public testing::Test {
  protected:
   ClusterMembershipMgrTest() {}
 
+  /// Returns the size of the default executor group of the current membership in 'cmm'.
+  int GetDefaultGroupSize(const ClusterMembershipMgr& cmm) const {
+    const string& group_name = ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME;
+    return cmm.GetSnapshot()->executor_groups.find(group_name)->second.NumExecutors();
+  }
+
   /// A struct to hold information related to a simulated backend during the test.
   struct Backend {
     string backend_id;
@@ -309,20 +316,17 @@ TEST_F(ClusterMembershipMgrTest, TwoInstances) {
   // The mgr will return its changed TBackendDescriptor
   ASSERT_EQ(1, returned_topic_deltas.size());
   // It will also remove itself from the executor group (but not the current backends).
-  ASSERT_EQ(1,
-      cmm1.GetSnapshot()->executor_groups.find("default")->second.NumExecutors());
+  ASSERT_EQ(1, GetDefaultGroupSize(cmm1));
   ASSERT_EQ(2, cmm1.GetSnapshot()->current_backends.size());
 
   // Propagate the quiescing to the 2nd mgr
   *ss_topic_delta = returned_topic_deltas[0];
   returned_topic_deltas.clear();
-  ASSERT_EQ(2,
-      cmm2.GetSnapshot()->executor_groups.find("default")->second.NumExecutors());
+  ASSERT_EQ(2, GetDefaultGroupSize(cmm2));
   cmm2.UpdateMembership(topic_delta_map, &returned_topic_deltas);
   ASSERT_EQ(0, returned_topic_deltas.size());
   ASSERT_EQ(2, cmm2.GetSnapshot()->current_backends.size());
-  ASSERT_EQ(1,
-      cmm2.GetSnapshot()->executor_groups.find("default")->second.NumExecutors());
+  ASSERT_EQ(1, GetDefaultGroupSize(cmm2));
 
   // Delete the 1st backend from the 2nd one
   ASSERT_EQ(1, ss_topic_delta->topic_entries.size());
@@ -330,9 +334,7 @@ TEST_F(ClusterMembershipMgrTest, TwoInstances) {
   cmm2.UpdateMembership(topic_delta_map, &returned_topic_deltas);
   ASSERT_EQ(0, returned_topic_deltas.size());
   ASSERT_EQ(1, cmm2.GetSnapshot()->current_backends.size());
-  ASSERT_EQ(1,
-      cmm2.GetSnapshot()->executor_groups.find("default")->second.NumExecutors());
-
+  ASSERT_EQ(1, GetDefaultGroupSize(cmm2));
 }
 
 // This test runs a group of 20 backends through their full lifecycle, validating that
@@ -357,8 +359,7 @@ TEST_F(ClusterMembershipMgrTest, FullLifecycleMultipleBackends) {
   // group.
   for (Backend* be : running_) {
     EXPECT_EQ(running_.size(), be->cmm->GetSnapshot()->current_backends.size());
-    EXPECT_EQ(running_.size(),
-        be->cmm->GetSnapshot()->executor_groups.find("default")->second.NumExecutors());
+    EXPECT_EQ(running_.size(), GetDefaultGroupSize(*be->cmm));
   }
 
   // Quiesce half of the backends.
@@ -367,12 +368,10 @@ TEST_F(ClusterMembershipMgrTest, FullLifecycleMultipleBackends) {
     // All backends must still remain online
     EXPECT_EQ(NUM_BACKENDS, be->cmm->GetSnapshot()->current_backends.size());
 
-    EXPECT_EQ(NUM_BACKENDS - i,
-        be->cmm->GetSnapshot()->executor_groups.find("default")->second.NumExecutors());
+    EXPECT_EQ(NUM_BACKENDS - i, GetDefaultGroupSize(*be->cmm));
     QuiesceBackend(be);
     // Make sure that the numbers drop
-    EXPECT_EQ(NUM_BACKENDS - i - 1,
-        be->cmm->GetSnapshot()->executor_groups.find("default")->second.NumExecutors());
+    EXPECT_EQ(NUM_BACKENDS - i - 1, GetDefaultGroupSize(*be->cmm));
   }
   int num_still_running = NUM_BACKENDS - quiescing_.size();
   ASSERT_EQ(num_still_running, running_.size());
@@ -382,8 +381,7 @@ TEST_F(ClusterMembershipMgrTest, FullLifecycleMultipleBackends) {
     // All backends are still registered
     EXPECT_EQ(backends_.size(), be->cmm->GetSnapshot()->current_backends.size());
     // Executor groups now show half of the backends remaining
-    EXPECT_EQ(num_still_running,
-        be->cmm->GetSnapshot()->executor_groups.find("default")->second.NumExecutors());
+    EXPECT_EQ(num_still_running, GetDefaultGroupSize(*be->cmm));
   }
 
   // Delete half of the backends and make sure that the other half learned about it.
@@ -408,8 +406,7 @@ TEST_F(ClusterMembershipMgrTest, FullLifecycleMultipleBackends) {
   while (!running_.empty()) QuiesceBackend(running_.front());
   for (auto& be : backends_) {
     // Executor groups now are empty
-    EXPECT_EQ(0,
-        be->cmm->GetSnapshot()->executor_groups.find("default")->second.NumExecutors());
+    EXPECT_EQ(0, GetDefaultGroupSize(*be->cmm));
   }
 }
 
diff --git a/be/src/scheduling/cluster-membership-mgr.cc b/be/src/scheduling/cluster-membership-mgr.cc
index 09a3678..ce859e2 100644
--- a/be/src/scheduling/cluster-membership-mgr.cc
+++ b/be/src/scheduling/cluster-membership-mgr.cc
@@ -21,11 +21,28 @@
 #include "common/names.h"
 #include "util/test-info.h"
 
-namespace impala {
+namespace {
+using namespace impala;
+
+/// Looks for an executor group with name 'group.name' in 'executor_groups' and returns it. If
+/// the group doesn't exist yet, it creates a new one and inserts it into
+/// 'executor_groups'.
+ExecutorGroup* FindOrInsertExecutorGroup(const TExecutorGroupDesc& group,
+    ClusterMembershipMgr::ExecutorGroups* executor_groups) {
+  auto it = executor_groups->find(group.name);
+  if (it != executor_groups->end()) {
+    DCHECK_EQ(group.name, it->second.name());
+    return &it->second;
+  }
+  bool inserted;
+  tie(it, inserted) = executor_groups->emplace(group.name, ExecutorGroup(group));
+  DCHECK(inserted);
+  return &it->second;
+}
 
-const string ClusterMembershipMgr::DEFAULT_EXECUTOR_GROUP = "default";
-static const vector<string> DEFAULT_EXECUTOR_GROUPS =
-    {ClusterMembershipMgr::DEFAULT_EXECUTOR_GROUP};
+}
+
+namespace impala {
 
 ClusterMembershipMgr::ClusterMembershipMgr(string local_backend_id,
     StatestoreSubscriber* subscriber) :
@@ -152,6 +169,7 @@ void ClusterMembershipMgr::UpdateMembership(
     }
   }
   if (local_be_desc.get() != nullptr) new_state->local_be_desc = local_be_desc;
+  new_state->version += 1;
 
   // Process removed, new, and updated entries from the topic update and apply the changes
   // to the new backend map and executor groups.
@@ -163,11 +181,11 @@ void ClusterMembershipMgr::UpdateMembership(
       if (new_backend_map->find(item.key) != new_backend_map->end()) {
         const TBackendDescriptor& be_desc = (*new_backend_map)[item.key];
         if (be_desc.is_executor && !be_desc.is_quiescing) {
-          const vector<string>& groups = DEFAULT_EXECUTOR_GROUPS;
-          for (const string& group : groups) {
+          for (const auto& group : be_desc.executor_groups) {
             VLOG(1) << "Removing backend " << item.key << " from group " << group
                     << " (deleted)";
-            (*new_executor_groups)[group].RemoveExecutor(be_desc);
+            FindOrInsertExecutorGroup(
+                group, new_executor_groups)->RemoveExecutor(be_desc);
           }
         }
         new_backend_map->erase(item.key);
@@ -220,11 +238,10 @@ void ClusterMembershipMgr::UpdateMembership(
       TBackendDescriptor& existing = it->second;
       if (be_desc.is_quiescing && !existing.is_quiescing && existing.is_executor) {
         // Executor needs to be removed from its groups
-        const vector<string>& groups = DEFAULT_EXECUTOR_GROUPS;
-        for (const string& group : groups) {
+        for (const auto& group : be_desc.executor_groups) {
           VLOG(1) << "Removing backend " << item.key << " from group " << group
                   << " (quiescing)";
-          (*new_executor_groups)[group].RemoveExecutor(be_desc);
+          FindOrInsertExecutorGroup(group, new_executor_groups)->RemoveExecutor(be_desc);
         }
       }
       existing = be_desc;
@@ -232,10 +249,9 @@ void ClusterMembershipMgr::UpdateMembership(
       // Create
       new_backend_map->insert(make_pair(item.key, be_desc));
       if (!be_desc.is_quiescing && be_desc.is_executor) {
-        const vector<string>& groups = DEFAULT_EXECUTOR_GROUPS;
-        for (const string& group : groups) {
+        for (const auto& group : be_desc.executor_groups) {
           VLOG(1) << "Adding backend " << item.key << " to group " << group;
-          (*new_executor_groups)[group].AddExecutor(be_desc);
+          FindOrInsertExecutorGroup(group, new_executor_groups)->AddExecutor(be_desc);
         }
       }
     }
@@ -246,19 +262,16 @@ void ClusterMembershipMgr::UpdateMembership(
   // in case it was reset to empty above.
   if (NeedsLocalBackendUpdate(*new_state, local_be_desc)) {
     // We need to update both the new membership state and the statestore
-    new_state->current_backends[local_backend_id_] = *local_be_desc;
-    const vector<string>& groups = DEFAULT_EXECUTOR_GROUPS;
-    for (const string& group : groups) {
+    (*new_backend_map)[local_backend_id_] = *local_be_desc;
+    for (const auto& group : local_be_desc->executor_groups) {
       if (local_be_desc->is_quiescing) {
         VLOG(1) << "Removing local backend from group " << group;
-        (*new_executor_groups)[group].RemoveExecutor(*local_be_desc);
+        FindOrInsertExecutorGroup(
+            group, new_executor_groups)->RemoveExecutor(*local_be_desc);
       } else if (local_be_desc->is_executor) {
         VLOG(1) << "Adding local backend to group " << group;
-        (*new_executor_groups)[group].AddExecutor(*local_be_desc);
-      } else {
-        //TODO(IMPALA-8484): Remove this when it's no longer needed
-        VLOG(1) << "Creating empty default executor group";
-        new_executor_groups->emplace(group, ExecutorGroup());
+        FindOrInsertExecutorGroup(
+            group, new_executor_groups)->AddExecutor(*local_be_desc);
       }
     }
     AddLocalBackendToStatestore(*local_be_desc, subscriber_topic_updates);
diff --git a/be/src/scheduling/cluster-membership-mgr.h b/be/src/scheduling/cluster-membership-mgr.h
index fc8b58e..a6439d9 100644
--- a/be/src/scheduling/cluster-membership-mgr.h
+++ b/be/src/scheduling/cluster-membership-mgr.h
@@ -94,6 +94,10 @@ class ClusterMembershipMgr {
     BackendIdMap current_backends;
     /// A map of executor groups by their names.
     ExecutorGroups executor_groups;
+
+    /// The version of this Snapshot. It is incremented every time the cluster membership
+    /// changes.
+    int64_t version = 0;
   };
 
   /// An immutable shared membership snapshot.
@@ -148,9 +152,6 @@ class ClusterMembershipMgr {
   /// updates and the status of the local backend.
   SnapshotPtr GetSnapshot() const;
 
-  /// The default executor group name.
-  static const std::string DEFAULT_EXECUTOR_GROUP;
-
   /// Handler for statestore updates, called asynchronously when an update is received
   /// from the subscription manager. This method processes incoming updates from the
   /// statestore and applies them to the current membership state. It also ensures that
diff --git a/be/src/scheduling/cluster-membership-test-util.cc b/be/src/scheduling/cluster-membership-test-util.cc
index cd70761..0b99641 100644
--- a/be/src/scheduling/cluster-membership-test-util.cc
+++ b/be/src/scheduling/cluster-membership-test-util.cc
@@ -18,6 +18,8 @@
 #include "scheduling/cluster-membership-test-util.h"
 #include "common/logging.h"
 #include "common/names.h"
+#include "scheduling/executor-group.h"
+#include "service/impala-server.h"
 
 static const int BACKEND_PORT = 1000;
 static const int KRPC_PORT = 2000;
@@ -42,7 +44,8 @@ string HostIdxToIpAddr(int host_idx) {
   return IP_PREFIX + suffix;
 }
 
-TBackendDescriptor MakeBackendDescriptor(int idx, int port_offset) {
+TBackendDescriptor MakeBackendDescriptor(
+    int idx, const TExecutorGroupDesc& group_desc, int port_offset) {
   TBackendDescriptor be_desc;
   be_desc.address.hostname = HostIdxToHostname(idx);
   be_desc.address.port = BACKEND_PORT + port_offset;
@@ -53,8 +56,24 @@ TBackendDescriptor MakeBackendDescriptor(int idx, int port_offset) {
   be_desc.__set_is_coordinator(true);
   be_desc.__set_is_executor(true);
   be_desc.is_quiescing = false;
+  be_desc.executor_groups.push_back(group_desc);
   return be_desc;
 }
 
+TBackendDescriptor MakeBackendDescriptor(
+    int idx, const ExecutorGroup& group, int port_offset) {
+  TExecutorGroupDesc group_desc;
+  group_desc.name = group.name();
+  group_desc.min_size = group.min_size();
+  return MakeBackendDescriptor(idx, group_desc, port_offset);
+}
+
+TBackendDescriptor MakeBackendDescriptor(int idx, int port_offset) {
+  TExecutorGroupDesc group_desc;
+  group_desc.name = ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME;
+  group_desc.min_size = 1;
+  return MakeBackendDescriptor(idx, group_desc, port_offset);
+}
+
 }  // end namespace test
 }  // end namespace impala
diff --git a/be/src/scheduling/cluster-membership-test-util.h b/be/src/scheduling/cluster-membership-test-util.h
index 2ce858c..3b282a7 100644
--- a/be/src/scheduling/cluster-membership-test-util.h
+++ b/be/src/scheduling/cluster-membership-test-util.h
@@ -21,6 +21,8 @@
 #include <string>
 
 namespace impala {
+class ExecutorGroup;
+
 namespace test {
 
 /// Convert a host index to a hostname.
@@ -30,9 +32,19 @@ std::string HostIdxToHostname(int host_idx);
 /// will specify the lower 24 bits of the IPv4 address (the lower 3 octets).
 std::string HostIdxToIpAddr(int host_idx);
 
-/// Builds a new backend descriptor. 'idx' is used to determine its name and IP address
-/// and the caller must make sure that it is unique across sets of hosts. To create
-/// backends on the same host, an optional port offset can be specified.
+/// Various methods to build a new backend descriptor. 'idx' is used to determine its name
+/// and IP address and the caller must make sure that it is unique across sets of hosts.
+/// To create backends on the same host, an optional port offset can be specified.
+///
+/// Make a backend descriptor for group 'group_desc'.
+TBackendDescriptor MakeBackendDescriptor(
+    int idx, const TExecutorGroupDesc& group_desc, int port_offset = 0);
+
+/// Make a backend descriptor for 'group'.
+TBackendDescriptor MakeBackendDescriptor(
+    int idx, const ExecutorGroup& group, int port_offset = 0);
+
+/// Make a backend descriptor for the default executor group.
 TBackendDescriptor MakeBackendDescriptor(int idx, int port_offset = 0);
 
 }  // end namespace test
diff --git a/be/src/scheduling/executor-group-test.cc b/be/src/scheduling/executor-group-test.cc
index 4eba14b..b47d44f 100644
--- a/be/src/scheduling/executor-group-test.cc
+++ b/be/src/scheduling/executor-group-test.cc
@@ -29,52 +29,76 @@ using namespace impala::test;
 
 /// Test adding multiple backends on different hosts.
 TEST(ExecutorGroupTest, AddExecutors) {
-  ExecutorGroup executor_group;
-  executor_group.AddExecutor(MakeBackendDescriptor(1));
-  executor_group.AddExecutor(MakeBackendDescriptor(2));
-  ASSERT_EQ(2, executor_group.NumExecutors());
+  ExecutorGroup group1("group1");
+  group1.AddExecutor(MakeBackendDescriptor(1, group1));
+  group1.AddExecutor(MakeBackendDescriptor(2, group1));
+  ASSERT_EQ(2, group1.NumExecutors());
   IpAddr backend_ip;
-  ASSERT_TRUE(executor_group.LookUpExecutorIp("host_1", &backend_ip));
+  ASSERT_TRUE(group1.LookUpExecutorIp("host_1", &backend_ip));
   EXPECT_EQ("10.0.0.1", backend_ip);
-  ASSERT_TRUE(executor_group.LookUpExecutorIp("host_2", &backend_ip));
+  ASSERT_TRUE(group1.LookUpExecutorIp("host_2", &backend_ip));
   EXPECT_EQ("10.0.0.2", backend_ip);
 }
 
 /// Test adding multiple backends on the same host.
 TEST(ExecutorGroupTest, MultipleExecutorsOnSameHost) {
-  ExecutorGroup executor_group;
-  executor_group.AddExecutor(MakeBackendDescriptor(1, /* port_offset=*/0));
-  executor_group.AddExecutor(MakeBackendDescriptor(1, /* port_offset=*/1));
+  ExecutorGroup group1("group1");
+  group1.AddExecutor(MakeBackendDescriptor(1, group1, /* port_offset=*/0));
+  group1.AddExecutor(MakeBackendDescriptor(1, group1, /* port_offset=*/1));
   IpAddr backend_ip;
-  ASSERT_TRUE(executor_group.LookUpExecutorIp("host_1", &backend_ip));
+  ASSERT_TRUE(group1.LookUpExecutorIp("host_1", &backend_ip));
   EXPECT_EQ("10.0.0.1", backend_ip);
-  const ExecutorGroup::Executors& backend_list =
-      executor_group.GetExecutorsForHost("10.0.0.1");
+  const ExecutorGroup::Executors& backend_list = group1.GetExecutorsForHost("10.0.0.1");
   EXPECT_EQ(2, backend_list.size());
 }
 
 /// Test removing a backend.
 TEST(ExecutorGroupTest, RemoveExecutor) {
-  ExecutorGroup executor_group;
-  executor_group.AddExecutor(MakeBackendDescriptor(1));
-  executor_group.AddExecutor(MakeBackendDescriptor(2));
-  executor_group.RemoveExecutor(MakeBackendDescriptor(2));
+  ExecutorGroup group1("group1");
+  group1.AddExecutor(MakeBackendDescriptor(1, group1));
+  group1.AddExecutor(MakeBackendDescriptor(2, group1));
+  group1.RemoveExecutor(MakeBackendDescriptor(2, group1));
   IpAddr backend_ip;
-  ASSERT_TRUE(executor_group.LookUpExecutorIp("host_1", &backend_ip));
+  ASSERT_TRUE(group1.LookUpExecutorIp("host_1", &backend_ip));
   EXPECT_EQ("10.0.0.1", backend_ip);
-  ASSERT_FALSE(executor_group.LookUpExecutorIp("host_2", &backend_ip));
+  ASSERT_FALSE(group1.LookUpExecutorIp("host_2", &backend_ip));
 }
 
 /// Test removing one of multiple backends on the same host (IMPALA-3944).
 TEST(ExecutorGroupTest, RemoveExecutorOnSameHost) {
-  ExecutorGroup executor_group;
-  executor_group.AddExecutor(MakeBackendDescriptor(1, /* port_offset=*/0));
-  executor_group.AddExecutor(MakeBackendDescriptor(1, /* port_offset=*/1));
-  executor_group.RemoveExecutor(MakeBackendDescriptor(1, /* port_offset=*/1));
+  ExecutorGroup group1("group1");
+  group1.AddExecutor(MakeBackendDescriptor(1, group1, /* port_offset=*/0));
+  group1.AddExecutor(MakeBackendDescriptor(1, group1, /* port_offset=*/1));
+  group1.RemoveExecutor(MakeBackendDescriptor(1, group1, /* port_offset=*/1));
   IpAddr backend_ip;
-  ASSERT_TRUE(executor_group.LookUpExecutorIp("host_1", &backend_ip));
+  ASSERT_TRUE(group1.LookUpExecutorIp("host_1", &backend_ip));
   EXPECT_EQ("10.0.0.1", backend_ip);
-  const ExecutorGroup::Executors& backend_list =
-      executor_group.GetExecutorsForHost("10.0.0.1");
+  const ExecutorGroup::Executors& backend_list = group1.GetExecutorsForHost("10.0.0.1");
   EXPECT_EQ(1, backend_list.size());
 }
+
+/// Test that exercises the size-based group health check.
+TEST(ExecutorGroupTest, HealthCheck) {
+  ExecutorGroup group1("group1", 2);
+  group1.AddExecutor(MakeBackendDescriptor(1, group1));
+  ASSERT_FALSE(group1.IsHealthy());
+  group1.AddExecutor(MakeBackendDescriptor(2, group1));
+  ASSERT_TRUE(group1.IsHealthy());
+  group1.RemoveExecutor(MakeBackendDescriptor(2, group1));
+  ASSERT_FALSE(group1.IsHealthy());
+}
+
+/// Tests that adding an inconsistent backend to a group fails.
+TEST(ExecutorGroupTest, TestAddInconsistent) {
+  ExecutorGroup group1("group1", 2);
+  group1.AddExecutor(MakeBackendDescriptor(1, group1));
+  // Backend for a group with a matching name but mismatching size is not allowed.
+  ExecutorGroup group_size_mismatch("group1", 3);
+  group1.AddExecutor(MakeBackendDescriptor(3, group_size_mismatch));
+  ASSERT_EQ(1, group1.NumExecutors());
+  // Backend for a group with a mismatching name can be added (See AddExecutor() for
+  // details).
+  ExecutorGroup group_name_mismatch("group_name_mismatch", 2);
+  group1.AddExecutor(MakeBackendDescriptor(2, group_name_mismatch));
+  ASSERT_EQ(2, group1.NumExecutors());
+}
diff --git a/be/src/scheduling/executor-group.cc b/be/src/scheduling/executor-group.cc
index f4e1354..5661154 100644
--- a/be/src/scheduling/executor-group.cc
+++ b/be/src/scheduling/executor-group.cc
@@ -26,8 +26,15 @@ namespace impala {
 // TODO: This can be tuned further with real world tests
 static const uint32_t NUM_HASH_RING_REPLICAS = 25;
 
-ExecutorGroup::ExecutorGroup()
-  : executor_ip_hash_ring_(NUM_HASH_RING_REPLICAS) {}
+ExecutorGroup::ExecutorGroup(string name) : ExecutorGroup(name, 1) {}
+
+ExecutorGroup::ExecutorGroup(string name, int64_t min_size)
+  : name_(name), min_size_(min_size), executor_ip_hash_ring_(NUM_HASH_RING_REPLICAS) {
+  DCHECK_GT(min_size_, 0);
+}
+
+ExecutorGroup::ExecutorGroup(const TExecutorGroupDesc& desc)
+  : ExecutorGroup(desc.name, desc.min_size) {}
 
 const ExecutorGroup::Executors& ExecutorGroup::GetExecutorsForHost(
     const IpAddr& ip) const {
@@ -38,7 +45,7 @@ const ExecutorGroup::Executors& ExecutorGroup::GetExecutorsForHost(
 
 ExecutorGroup::IpAddrs ExecutorGroup::GetAllExecutorIps() const {
   IpAddrs ips;
-  ips.reserve(NumExecutors());
+  ips.reserve(NumHosts());
   for (auto& it: executor_map_) ips.push_back(it.first);
   return ips;
 }
@@ -67,6 +74,11 @@ void ExecutorGroup::AddExecutor(const TBackendDescriptor& be_desc) {
         << be_desc.krpc_address;
     return;
   }
+  if (!CheckConsistencyOrWarn(be_desc)) {
+    LOG(WARNING) << "Ignoring inconsistent backend for executor group: "
+                 << be_desc.krpc_address;
+    return;
+  }
   if (be_descs.empty()) {
     executor_ip_hash_ring_.AddNode(be_desc.ip_address);
   }
@@ -128,4 +140,38 @@ const TBackendDescriptor* ExecutorGroup::LookUpBackendDesc(
   return nullptr;
 }
 
+int ExecutorGroup::NumExecutors() const {
+  int count = 0;
+  for (const auto& executor_list : executor_map_) count += executor_list.second.size();
+  return count;
+}
+
+bool ExecutorGroup::IsHealthy() const {
+  int num_executors = NumExecutors();
+  if (num_executors < min_size_) {
+    LOG(WARNING) << "Executor group " << name_ << " is unhealthy: " << num_executors
+                 << " out of " << min_size_ << " are available.";
+    return false;
+  }
+  return true;
+}
+
+bool ExecutorGroup::CheckConsistencyOrWarn(const TBackendDescriptor& be_desc) const {
+  // Check if the executor's group configuration matches this group.
+  for (const TExecutorGroupDesc& desc : be_desc.executor_groups) {
+    if (desc.name == name_) {
+      if (desc.min_size == min_size_) {
+        return true;
+      } else {
+        LOG(WARNING) << "Backend " << be_desc << " is configured for executor group "
+                     << desc << " but group has minimum size " << min_size_;
+        return false;
+      }
+    }
+  }
+  // If the backend does not mention the group we consider it consistent to allow backends
+  // to be added to unrelated groups, e.g. for the coordinator-only scheduling.
+  return true;
+}
+
 }  // end ns impala
diff --git a/be/src/scheduling/executor-group.h b/be/src/scheduling/executor-group.h
index 01c4839..6e2fee7 100644
--- a/be/src/scheduling/executor-group.h
+++ b/be/src/scheduling/executor-group.h
@@ -37,11 +37,16 @@ namespace impala {
 /// getter methods return references and the membership must not be changed while client
 /// code holds those references.
 ///
+/// Executor groups can optionally have a minimum size assigned to them and will be
+/// considered unhealthy if they contain fewer than that number of executors.
+///
 /// Note that only during tests objects of this class will store more than one backend per
 /// host/IP address.
 class ExecutorGroup {
  public:
-  ExecutorGroup();
+  explicit ExecutorGroup(std::string name);
+  explicit ExecutorGroup(std::string name, int64_t min_size);
+  explicit ExecutorGroup(const TExecutorGroupDesc& desc);
   ExecutorGroup(const ExecutorGroup& other) = default;
 
   /// List of backends, in this case they're all executors.
@@ -60,7 +65,11 @@ class ExecutorGroup {
   Executors GetAllExecutorDescriptors() const;
 
   /// Adds an executor to the group. If it already exists, it is ignored. Backend
-  /// descriptors are identified by their IP address and port.
+  /// descriptors are identified by their IP address and port. Backends that fail the
+  /// consistency check (CheckConsistencyOrWarn()) are ignored. Note that executors can be
+  /// added to an executor group even if they don't have a matching TExecutorGroupDesc in
+  /// their 'executor_groups' list. This is required when building the coordinator-only
+  /// group during scheduling.
   void AddExecutor(const TBackendDescriptor& be_desc);
 
   /// Removes an executor from the group if it exists. Otherwise does nothing. Backend
@@ -86,10 +95,32 @@ class ExecutorGroup {
   const HashRing* GetHashRing() const { return &executor_ip_hash_ring_; }
 
   /// Returns the number of executor hosts in this group. During tests, hosts can run
-  /// multiple executor backend descriptors.
-  int NumExecutors() const { return executor_map_.size(); }
+  /// multiple executor backend descriptors, but will only be counted here once.
+  int NumHosts() const { return executor_map_.size(); }
+
+  /// Returns the number of executors (backend descriptors) in this group. Multiple
+  /// executors running on the same host (e.g. during tests) are counted individually.
+  int NumExecutors() const;
+
+  /// Returns true if the group is healthy, i.e. contains at least 'min_size_' executors.
+  /// Returns false otherwise.
+  bool IsHealthy() const;
+
+  const string& name() const { return name_; }
+  int64_t min_size() const { return min_size_; }
 
  private:
+  /// Finds the entry in 'be_desc.executor_groups' whose name matches this group and
+  /// validates its minimum size. Returns true if its min size matches, or if no such
+  /// entry is found. Returns false and logs a warning otherwise.
+  bool CheckConsistencyOrWarn(const TBackendDescriptor& be_desc) const;
+
+  const std::string name_;
+
+  /// The minimum number of executors in this group to be considered healthy. A group must
+  /// not be empty to be considered healthy and this value must be non-zero.
+  int64_t min_size_;
+
   /// Map from a host's IP address to a list of executors running on that node.
   typedef std::unordered_map<IpAddr, Executors> ExecutorMap;
   ExecutorMap executor_map_;
diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc
index 1bb5f02..84d939a 100644
--- a/be/src/scheduling/query-schedule.cc
+++ b/be/src/scheduling/query-schedule.cc
@@ -286,4 +286,9 @@ void QuerySchedule::UpdateMemoryRequirements(const TPoolConfig& pool_cfg) {
   }
 }
 
+void QuerySchedule::set_executor_group(string executor_group) {
+  DCHECK(executor_group_.empty());
+  executor_group_ = std::move(executor_group);
+}
+
 }
diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h
index 2449f87..4b82a79 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -76,6 +76,9 @@ struct BackendExecParams {
   // admission controller. Obtained from the scheduler's executors configuration
   // which is updated by membership updates from the statestore.
   int64_t admit_mem_limit = 0;
+
+  // The maximum number of queries that this backend can execute concurrently.
+  int64_t admit_num_queries_limit = 0;
 };
 
 /// Map from an impalad host address to the list of assigned fragment instance params.
@@ -261,6 +264,10 @@ class QuerySchedule {
   /// GetClusterMemoryToAdmit().
   void UpdateMemoryRequirements(const TPoolConfig& pool_cfg);
 
+  const string& executor_group() const { return executor_group_; }
+
+  void set_executor_group(string executor_group);
+
  private:
   /// These references are valid for the lifetime of this query schedule because they
   /// are all owned by the enclosing QueryExecState.
@@ -309,6 +316,10 @@ class QuerySchedule {
   /// successfully.
   int64_t per_backend_mem_to_admit_ = 0;
 
+  /// The name of the executor group that this schedule was computed for. Set by the
+  /// Scheduler and only valid after scheduling completes successfully.
+  string executor_group_;
+
   /// Populate fragment_exec_params_ from request_.plan_exec_info.
   /// Sets is_coord_fragment and input_fragments.
   /// Also populates plan_node_to_fragment_idx_ and plan_node_to_plan_node_idx_.
diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc
index 1dd1b8c..09a87fb 100644
--- a/be/src/scheduling/scheduler-test-util.cc
+++ b/be/src/scheduling/scheduler-test-util.cc
@@ -26,6 +26,7 @@
 #include "scheduling/cluster-membership-test-util.h"
 #include "scheduling/scheduler.h"
 #include "util/hash-util.h"
+#include "service/impala-server.h"
 
 using namespace impala;
 using namespace impala::test;
@@ -99,6 +100,9 @@ ClusterMembershipMgr::BeDescSharedPtr BuildBackendDescriptor(const Host& host) {
   be_desc->__set_is_coordinator(host.is_coordinator);
   be_desc->__set_is_executor(host.is_executor);
   be_desc->is_quiescing = false;
+  be_desc->executor_groups.push_back(TExecutorGroupDesc());
+  be_desc->executor_groups.back().name = ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME;
+  be_desc->executor_groups.back().min_size = 1;
   return be_desc;
 }
 
@@ -633,11 +637,11 @@ Status SchedulerWrapper::Compute(bool exec_at_coord, Result* result) {
   ClusterMembershipMgr::SnapshotPtr membership_snapshot =
       cluster_membership_mgr_->GetSnapshot();
   auto it = membership_snapshot->executor_groups.find(
-      ClusterMembershipMgr::DEFAULT_EXECUTOR_GROUP);
+      ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME);
   // If a group does not exist (e.g. no executors are registered), we pass an empty group
   // to the scheduler to exercise its error handling logic.
   bool no_executor_group = it == membership_snapshot->executor_groups.end();
-  ExecutorGroup empty_group;
+  ExecutorGroup empty_group("empty-group");
   DCHECK(membership_snapshot->local_be_desc.get() != nullptr);
   Scheduler::ExecutorConfig executor_config =
       {no_executor_group ? empty_group : it->second, *membership_snapshot->local_be_desc};
@@ -696,7 +700,7 @@ void SchedulerWrapper::InitializeScheduler() {
   });
   Status status = cluster_membership_mgr_->Init();
   DCHECK(status.ok()) << "Cluster membership manager init failed in test";
-  scheduler_.reset(new Scheduler(cluster_membership_mgr_.get(), &metrics_, nullptr));
+  scheduler_.reset(new Scheduler(&metrics_, nullptr));
   // Initialize the cluster membership manager
   SendFullMembershipMap();
 }
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 12871ef..ae7b329 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -32,7 +32,6 @@
 #include "gen-cpp/ImpalaInternalService_constants.h"
 #include "gen-cpp/Types_types.h"
 #include "runtime/exec-env.h"
-#include "scheduling/cluster-membership-mgr.h"
 #include "scheduling/hash-ring.h"
 #include "statestore/statestore-subscriber.h"
 #include "thirdparty/pcg-cpp-0.98/include/pcg_random.hpp"
@@ -57,10 +56,8 @@ static const string LOCAL_ASSIGNMENTS_KEY("simple-scheduler.local-assignments.to
 static const string ASSIGNMENTS_KEY("simple-scheduler.assignments.total");
 static const string SCHEDULER_INIT_KEY("simple-scheduler.initialized");
 
-Scheduler::Scheduler(ClusterMembershipMgr* cluster_membership_mgr,
-    MetricGroup* metrics, RequestPoolService* request_pool_service)
+Scheduler::Scheduler(MetricGroup* metrics, RequestPoolService* request_pool_service)
   : metrics_(metrics->GetOrCreateChildGroup("scheduler")),
-    cluster_membership_mgr_(cluster_membership_mgr),
     request_pool_service_(request_pool_service) {
   LOG(INFO) << "Starting scheduler";
   if (metrics_ != nullptr) {
@@ -75,10 +72,9 @@ const TBackendDescriptor& Scheduler::LookUpBackendDesc(
   const TBackendDescriptor* desc = executor_config.group.LookUpBackendDesc(host);
   if (desc == nullptr) {
     // Local host may not be in executor_config's executor group if it's a dedicated
-    // coordinator.
+    // coordinator, or if it is configured to be in a different executor group.
     const TBackendDescriptor& local_be_desc = executor_config.local_be_desc;
     DCHECK(host == local_be_desc.address);
-    DCHECK(!local_be_desc.is_executor);
     desc = &local_be_desc;
   }
   return *desc;
@@ -456,7 +452,7 @@ Status Scheduler::ComputeScanRangeAssignment(const ExecutorConfig& executor_conf
   bool random_replica = query_options.schedule_random_replica || node_random_replica;
 
   // TODO: Build this one from executor_group
-  ExecutorGroup coord_only_executor_group;
+  ExecutorGroup coord_only_executor_group("coordinator-only-group");
   const TBackendDescriptor& local_be_desc = executor_config.local_be_desc;
   coord_only_executor_group.AddExecutor(local_be_desc);
   VLOG_QUERY << "Exec at coord is " << (exec_at_coord ? "true" : "false");
@@ -647,49 +643,16 @@ void Scheduler::GetScanHosts(const TBackendDescriptor& local_be_desc, TPlanNodeI
   }
 }
 
-Status Scheduler::Schedule(QuerySchedule* schedule) {
-  // Use a snapshot of the cluster membership state upfront to avoid using inconsistent
-  // views throughout scheduling.
-  ClusterMembershipMgr::SnapshotPtr membership_snapshot =
-      cluster_membership_mgr_->GetSnapshot();
-  if (membership_snapshot->local_be_desc.get() == nullptr) {
-    // This can happen in the short time period after the ImpalaServer has finished
-    // starting up (which makes the local backend available) and the next statestore
-    // update that pulls the local backend descriptor into the membership snapshot.
-    return Status("Local backend has not been registered in the cluster membership");
-  }
-  const string& group_name = ClusterMembershipMgr::DEFAULT_EXECUTOR_GROUP;
-  VLOG_QUERY << "Scheduling query " << PrintId(schedule->query_id())
-      << " on executor group: " << group_name;
-
-  auto it = membership_snapshot->executor_groups.find(group_name);
-  if (it == membership_snapshot->executor_groups.end()) {
-    return Status(Substitute("Unknown executor group: $0", group_name));
-  }
-
-  const ExecutorGroup& executor_group = it->second;
-  if (executor_group.NumExecutors() == 0) {
-    return Status(Substitute("No executors registered in group: $0", group_name));
-  }
-
-  ExecutorConfig executor_config =
-      {executor_group, *membership_snapshot->local_be_desc};
+Status Scheduler::Schedule(
+    const ExecutorConfig& executor_config, QuerySchedule* schedule) {
+  RETURN_IF_ERROR(DebugAction(schedule->query_options(), "SCHEDULER_SCHEDULE"));
   RETURN_IF_ERROR(ComputeScanRangeAssignment(executor_config, schedule));
   ComputeFragmentExecParams(executor_config, schedule);
   ComputeBackendExecParams(executor_config, schedule);
 #ifndef NDEBUG
   schedule->Validate();
 #endif
-
-  // TODO: Move to admission control, it doesn't need to be in the Scheduler.
-  string resolved_pool;
-  // Re-resolve the pool name to propagate any resolution errors now that this request
-  // is known to require a valid pool.
-  RETURN_IF_ERROR(request_pool_service_->ResolveRequestPool(
-          schedule->request().query_ctx, &resolved_pool));
-  // Resolved pool name should have been set in the TQueryCtx and shouldn't have changed.
-  DCHECK_EQ(resolved_pool, schedule->request_pool());
-  schedule->summary_profile()->AddInfoString("Request Pool", schedule->request_pool());
+  schedule->set_executor_group(executor_config.group.name());
   return Status::OK();
 }
 
@@ -718,6 +681,8 @@ void Scheduler::ComputeBackendExecParams(
     const TNetworkAddress& host = backend.first;
     backend.second.admit_mem_limit =
         LookUpBackendDesc(executor_config, host).admit_mem_limit;
+    backend.second.admit_num_queries_limit =
+        LookUpBackendDesc(executor_config, host).admit_num_queries_limit;
     largest_min_reservation =
         max(largest_min_reservation, backend.second.min_mem_reservation_bytes);
   }
@@ -831,8 +796,8 @@ const IpAddr* Scheduler::AssignmentCtx::SelectRemoteExecutor() {
   } else {
     // Pick next executor from assignment_heap. All executors must have been inserted into
     // the heap at this point.
-    DCHECK_GT(executor_group_.NumExecutors(), 0);
-    DCHECK_EQ(executor_group_.NumExecutors(), assignment_heap_.size());
+    DCHECK_GT(executor_group_.NumHosts(), 0);
+    DCHECK_EQ(executor_group_.NumHosts(), assignment_heap_.size());
     candidate_ip = &(assignment_heap_.top().ip);
   }
   DCHECK(candidate_ip != nullptr);
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index 502029a..01ebb3f 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -42,14 +42,13 @@
 #include "util/runtime-profile.h"
 
 namespace impala {
-class ClusterMembershipMgr;
 
 namespace test {
 class SchedulerWrapper;
 }
 
-/// Performs simple scheduling by matching between a list of executor backends that it
-/// retrieves from the cluster membership manager, and a list of target data locations.
+/// Performs simple scheduling by matching between a list of executor backends that is
+/// supplied by the users of this class, and a list of target data locations.
 ///
 /// TODO: Track assignments (assignment_ctx in ComputeScanRangeAssignment) per query
 ///       instead of per plan node?
@@ -61,20 +60,19 @@ class SchedulerWrapper;
 ///           configuration.
 class Scheduler {
  public:
-  Scheduler(ClusterMembershipMgr* cluster_membership_mgr, MetricGroup* metrics,
-      RequestPoolService* request_pool_service);
+  Scheduler(MetricGroup* metrics, RequestPoolService* request_pool_service);
 
-  /// Populates given query schedule and assigns fragments to hosts based on scan
-  /// ranges in the query exec request.
-  Status Schedule(QuerySchedule* schedule);
-
- private:
   /// Current snapshot of executors to be used for scheduling a scan.
   struct ExecutorConfig {
     const ExecutorGroup& group;
     const TBackendDescriptor& local_be_desc;
   };
 
+  /// Populates given query schedule and assigns fragments to hosts based on scan
+  /// ranges in the query exec request.
+  Status Schedule(const ExecutorConfig& executor_config, QuerySchedule* schedule);
+
+ private:
   /// Map from a host's IP address to the next executor to be round-robin scheduled for
   /// that host (needed for setups with multiple executors on a single host)
   typedef boost::unordered_map<IpAddr, ExecutorGroup::Executors::const_iterator>
@@ -249,11 +247,6 @@ class Scheduler {
   /// MetricGroup subsystem access
   MetricGroup* metrics_;
 
-  /// Pointer to the cluster membership manager. It provides information about backends
-  /// and executors in the cluster that the scheduler uses to assign fragment instances to
-  /// backends.
-  ClusterMembershipMgr* cluster_membership_mgr_;
-
   /// Locality metrics
   IntCounter* total_assignments_ = nullptr;
   IntCounter* total_local_assignments_ = nullptr;
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index afdfb36..4e848e8 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -481,13 +481,6 @@ Status ClientRequestState::ExecAsyncQueryOrDmlRequest(
     lock_guard<mutex> l(lock_);
     // Don't start executing the query if Cancel() was called concurrently with Exec().
     if (is_cancelled_) return Status::CANCELLED;
-    schedule_.reset(new QuerySchedule(query_id(), query_exec_request,
-        exec_request_.query_options, summary_profile_, query_events_));
-  }
-  Status status = exec_env_->scheduler()->Schedule(schedule_.get());
-  {
-    lock_guard<mutex> l(lock_);
-    RETURN_IF_ERROR(UpdateQueryStatus(status));
   }
   RETURN_IF_ERROR(Thread::Create("query-exec-state", "async-exec-thread",
       &ClientRequestState::FinishExecQueryOrDmlRequest, this, &async_exec_thread_, true));
@@ -495,16 +488,31 @@ Status ClientRequestState::ExecAsyncQueryOrDmlRequest(
 }
 
 void ClientRequestState::FinishExecQueryOrDmlRequest() {
-  DebugActionNoFail(schedule_->query_options(), "CRS_BEFORE_ADMISSION");
+  DebugActionNoFail(exec_request_.query_options, "CRS_BEFORE_ADMISSION");
 
   DCHECK(exec_env_->admission_controller() != nullptr);
+  DCHECK(exec_request_.__isset.query_exec_request);
   Status admit_status =
       ExecEnv::GetInstance()->admission_controller()->SubmitForAdmission(
-          schedule_.get(), &admit_outcome_);
+          {query_id(), exec_request_.query_exec_request, exec_request_.query_options,
+              summary_profile_, query_events_},
+          &admit_outcome_, &schedule_);
   {
     lock_guard<mutex> l(lock_);
     if (!UpdateQueryStatus(admit_status).ok()) return;
   }
+  DCHECK(schedule_.get() != nullptr);
+  DCHECK_EQ(schedule_->query_id(), query_id());
+  // Note that we don't need to check for cancellation between admission and query
+  // startup. The query was not cancelled right before being admitted and the window here
+  // is small enough to not require special handling. Instead we start the query and then
+  // cancel it through the check below if necessary.
+  DebugActionNoFail(schedule_->query_options(), "CRS_BEFORE_COORD_STARTS");
+  // Register the query with the server to support cancellation. This happens after
+  // admission because now the set of executors is fixed and an executor failure will
+  // cause a query failure.
+  parent_server_->RegisterQueryLocations(
+      schedule_->per_backend_exec_params(), query_id());
   coord_.reset(new Coordinator(this, *schedule_, query_events_));
   Status exec_status = coord_->Exec();
 
@@ -523,7 +531,7 @@ void ClientRequestState::FinishExecQueryOrDmlRequest() {
       coord_exec_called_.Store(true);
     } else {
       VLOG_QUERY << "Cancelled right after starting the coordinator query id="
-                 << PrintId(schedule_->query_id());
+                 << PrintId(query_id());
       discard_result(UpdateQueryStatus(Status::CANCELLED));
     }
   }
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index 56a901d..a06442d 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -183,7 +183,7 @@ class ClientRequestState {
   const TUniqueId& session_id() const { return query_ctx_.session.session_id; }
   const std::string& default_db() const { return query_ctx_.session.database; }
   bool eos() const { return eos_; }
-  QuerySchedule* schedule() { return schedule_.get(); }
+  const QuerySchedule* schedule() const { return schedule_.get(); }
 
   /// Returns the Coordinator for 'QUERY' and 'DML' requests once Coordinator::Exec()
   /// completes successfully. Otherwise returns null.
@@ -338,7 +338,7 @@ protected:
   std::shared_ptr<ImpalaServer::SessionState> session_;
 
   /// Resource assignment determined by scheduler. Owned by obj_pool_.
-  boost::scoped_ptr<QuerySchedule> schedule_;
+  std::unique_ptr<QuerySchedule> schedule_;
 
   /// Thread for asynchronously running the admission control code-path and starting
   /// execution in the following cases:
@@ -439,8 +439,8 @@ protected:
   /// To get access to UpdateCatalog, LOAD, and DDL methods. Not owned.
   Frontend* frontend_;
 
-  /// The parent ImpalaServer; called to wait until the the impalad has processed a
-  /// catalog update request. Not owned.
+  /// The parent ImpalaServer; called to wait until the impalad has processed a catalog
+  /// update request. Not owned.
   ImpalaServer* parent_server_;
 
   /// Start/end time of the query, in Unix microseconds.
diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index 61e6f69..c5b77a4 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -17,15 +17,17 @@
 
 #include "service/impala-http-handler.h"
 
+#include <algorithm>
 #include <sstream>
 #include <boost/lexical_cast.hpp>
 #include <boost/thread/mutex.hpp>
-#include <rapidjson/stringbuffer.h>
 #include <rapidjson/prettywriter.h>
-#include <gutil/strings/substitute.h>
+#include <rapidjson/stringbuffer.h>
 
 #include "catalog/catalog-util.h"
 #include "gen-cpp/beeswax_types.h"
+#include "gutil/strings/join.h"
+#include "gutil/strings/substitute.h"
 #include "runtime/coordinator.h"
 #include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
@@ -869,9 +871,9 @@ void ImpalaHttpHandler::QuerySummaryHandler(bool include_json_plan, bool include
 
 void ImpalaHttpHandler::BackendsHandler(const Webserver::WebRequest& req,
     Document* document) {
-  std::unordered_map<string, pair<int64_t, int64_t>> host_mem_map;
+  AdmissionController::PerHostStats host_stats;
   ExecEnv::GetInstance()->admission_controller()->PopulatePerHostMemReservedAndAdmitted(
-      &host_mem_map);
+      &host_stats);
   Value backends_list(kArrayType);
   ClusterMembershipMgr* cluster_membership_mgr =
       ExecEnv::GetInstance()->cluster_membership_mgr();
@@ -895,14 +897,23 @@ void ImpalaHttpHandler::BackendsHandler(const Webserver::WebRequest& req,
     Value admit_mem_limit(PrettyPrinter::PrintBytes(backend.admit_mem_limit).c_str(),
         document->GetAllocator());
     backend_obj.AddMember("admit_mem_limit", admit_mem_limit, document->GetAllocator());
-    // If the host address does not exist in the 'host_mem_map', this would ensure that a
+    // If the host address does not exist in the 'host_stats', this would ensure that a
     // value of zero is used for those addresses.
-    Value mem_reserved(PrettyPrinter::PrintBytes(host_mem_map[address].first).c_str(),
-        document->GetAllocator());
+    Value mem_reserved(PrettyPrinter::PrintBytes(
+        host_stats[address].mem_reserved).c_str(), document->GetAllocator());
     backend_obj.AddMember("mem_reserved", mem_reserved, document->GetAllocator());
-    Value mem_admitted(PrettyPrinter::PrintBytes(host_mem_map[address].second).c_str(),
-        document->GetAllocator());
+    Value mem_admitted(PrettyPrinter::PrintBytes(
+        host_stats[address].mem_admitted).c_str(), document->GetAllocator());
     backend_obj.AddMember("mem_admitted", mem_admitted, document->GetAllocator());
+    backend_obj.AddMember("admit_num_queries_limit", backend.admit_num_queries_limit,
+        document->GetAllocator());
+    backend_obj.AddMember("num_admitted", host_stats[address].num_admitted,
+        document->GetAllocator());
+    vector<string> group_names;
+    for (const auto& group : backend.executor_groups) group_names.push_back(group.name);
+    Value executor_groups(JoinStrings(group_names, ", ").c_str(),
+        document->GetAllocator());
+    backend_obj.AddMember("executor_groups", executor_groups, document->GetAllocator());
     backends_list.PushBack(backend_obj, document->GetAllocator());
   }
   document->AddMember("backends", backends_list, document->GetAllocator());
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index d7ee0df..bf3e93f 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -17,29 +17,31 @@
 
 #include "service/impala-server.h"
 
+#include <netdb.h>
+#include <unistd.h>
 #include <algorithm>
 #include <exception>
+#include <boost/algorithm/string.hpp>
 #include <boost/algorithm/string/join.hpp>
 #include <boost/algorithm/string/replace.hpp>
-#include <boost/filesystem.hpp>
-#include <boost/date_time/posix_time/posix_time_types.hpp>
-#include <boost/unordered_set.hpp>
-#include <boost/bind.hpp>
-#include <boost/algorithm/string.hpp>
 #include <boost/algorithm/string/trim.hpp>
+#include <boost/bind.hpp>
+#include <boost/date_time/posix_time/posix_time_types.hpp>
+#include <boost/filesystem.hpp>
 #include <boost/lexical_cast.hpp>
+#include <boost/unordered_set.hpp>
 #include <gperftools/malloc_extension.h>
+#include <gutil/strings/numbers.h>
+#include <gutil/strings/split.h>
 #include <gutil/strings/substitute.h>
 #include <gutil/walltime.h>
-#include <openssl/evp.h>
 #include <openssl/err.h>
+#include <openssl/evp.h>
 #include <rapidjson/rapidjson.h>
 #include <rapidjson/stringbuffer.h>
 #include <rapidjson/writer.h>
-#include <sys/types.h>
 #include <sys/socket.h>
-#include <netdb.h>
-#include <unistd.h>
+#include <sys/types.h>
 
 #include "catalog/catalog-server.h"
 #include "catalog/catalog-util.h"
@@ -125,6 +127,7 @@ DECLARE_string(authorized_proxy_user_config);
 DECLARE_string(authorized_proxy_user_config_delimiter);
 DECLARE_string(authorized_proxy_group_config);
 DECLARE_string(authorized_proxy_group_config_delimiter);
+DECLARE_string(debug_actions);
 DECLARE_bool(abort_on_config_error);
 DECLARE_bool(disk_spill_encryption);
 DECLARE_bool(enable_ldap_auth);
@@ -239,6 +242,8 @@ DEFINE_bool(is_coordinator, true, "If true, this Impala daemon can accept and co
     "queries from clients. If false, it will refuse client connections.");
 DEFINE_bool(is_executor, true, "If true, this Impala daemon will execute query "
     "fragments.");
+DEFINE_string(executor_groups, "", "List of executor groups, separated by comma. "
+    "Currently only a single group may be specified.");
 
 // TODO: can we automatically choose a startup grace period based on the max admission
 // control queue timeout + some margin for error?
@@ -274,6 +279,36 @@ DEFINE_int32(query_event_hook_nthreads, 1, "Number of threads to use for "
 
 DECLARE_bool(compact_catalog_topic);
 
+namespace {
+using namespace impala;
+
+vector<impala::TExecutorGroupDesc> GetExecutorGroups(const string& flag) {
+  vector<impala::TExecutorGroupDesc> result;
+  vector<StringPiece> groups;
+  groups = Split(flag, ",", SkipEmpty());
+  if (groups.empty()) groups.push_back(ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME);
+  DCHECK_EQ(1, groups.size());
+  // Name and optional minimum group size are separated by ':'.
+  for (const StringPiece& group : groups) {
+    int colon_idx = group.find_first_of(':');
+    TExecutorGroupDesc group_desc;
+    group_desc.name = group.substr(0, colon_idx).as_string();
+    group_desc.min_size = 1;
+    if (colon_idx != StringPiece::npos) {
+      StringParser::ParseResult result;
+      group_desc.min_size = StringParser::StringToInt<int64_t>(
+          group.data() + colon_idx + 1, group.length() - colon_idx - 1, &result);
+      if (result != StringParser::PARSE_SUCCESS) {
+        LOG(FATAL) << "Failed to parse minimum executor group size from group: "
+                     << group.ToString();
+      }
+    }
+    result.push_back(group_desc);
+  }
+  return result;
+}
+} // end anonymous namespace
+
 namespace impala {
 
 // Prefix of profile, event and lineage log filenames. The version number is
@@ -294,6 +329,8 @@ const string BEESWAX_SERVER_NAME = "beeswax-frontend";
 const string HS2_SERVER_NAME = "hiveserver2-frontend";
 const string HS2_HTTP_SERVER_NAME = "hiveserver2-http-frontend";
 
+const string ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME = "default";
+
 const char* ImpalaServer::SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION = "42000";
 const char* ImpalaServer::SQLSTATE_GENERAL_ERROR = "HY000";
 const char* ImpalaServer::SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED = "HYC00";
@@ -1071,19 +1108,6 @@ Status ImpalaServer::ExecuteInternal(
   if (!status.ok()) {
     VLOG_QUERY << "Couldn't update catalog metrics: " << status.GetDetail();
   }
-
-  if ((*request_state)->schedule() != nullptr) {
-    const PerBackendExecParams& per_backend_params =
-        (*request_state)->schedule()->per_backend_exec_params();
-    if (!per_backend_params.empty()) {
-      lock_guard<mutex> l(query_locations_lock_);
-      for (const auto& entry : per_backend_params) {
-        const TNetworkAddress& host = entry.first;
-        query_locations_[host].insert((*request_state)->query_id());
-      }
-    }
-  }
-
   return Status::OK();
 }
 
@@ -1829,6 +1853,18 @@ Status ImpalaServer::ProcessCatalogUpdateResult(
   return Status::OK();
 }
 
+void ImpalaServer::RegisterQueryLocations(
+    const PerBackendExecParams& per_backend_params, const TUniqueId& query_id) {
+  VLOG_QUERY << "Registering query locations";
+  if (!per_backend_params.empty()) {
+    lock_guard<mutex> l(query_locations_lock_);
+    for (const auto& entry : per_backend_params) {
+      const TNetworkAddress& host = entry.first;
+      query_locations_[host].insert(query_id);
+    }
+  }
+}
+
 void ImpalaServer::CancelQueriesOnFailedBackends(
     const std::unordered_set<TNetworkAddress>& current_membership) {
   // Maps from query id (to be cancelled) to a list of failed Impalads that are
@@ -1936,7 +1972,9 @@ void ImpalaServer::BuildLocalBackendDescriptorInternal(TBackendDescriptor* be_de
   be_desc->__set_krpc_address(krpc_address);
 
   be_desc->__set_admit_mem_limit(exec_env_->admit_mem_limit());
+  be_desc->__set_admit_num_queries_limit(exec_env_->admit_num_queries_limit());
   be_desc->__set_is_quiescing(is_quiescing);
+  be_desc->executor_groups = GetExecutorGroups(FLAGS_executor_groups);
 }
 
 ImpalaServer::QueryStateRecord::QueryStateRecord(const ClientRequestState& request_state,
@@ -2606,6 +2644,7 @@ Status ImpalaServer::Start(int32_t thrift_be_port, int32_t beeswax_port, int32_t
     RETURN_IF_ERROR(beeswax_server_->Start());
     LOG(INFO) << "Impala Beeswax Service listening on " << beeswax_server_->port();
   }
+  RETURN_IF_ERROR(DebugAction(FLAGS_debug_actions, "IMPALA_SERVER_END_OF_START"));
   services_started_ = true;
   ImpaladMetrics::IMPALA_SERVER_READY->SetValue(true);
   LOG(INFO) << "Impala has started.";
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 6131659..a052b5a 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -29,14 +29,18 @@
 #include <boost/uuid/uuid_io.hpp>
 #include <unordered_map>
 
-#include "gen-cpp/ImpalaService.h"
+#include "common/status.h"
+#include "gen-cpp/Frontend_types.h"
 #include "gen-cpp/ImpalaHiveServer2Service.h"
 #include "gen-cpp/ImpalaInternalService.h"
-#include "gen-cpp/Frontend_types.h"
+#include "gen-cpp/ImpalaService.h"
 #include "kudu/util/random.h"
 #include "rpc/thrift-server.h"
-#include "common/status.h"
+#include "runtime/timestamp-value.h"
+#include "runtime/types.h"
+#include "scheduling/query-schedule.h"
 #include "service/query-options.h"
+#include "statestore/statestore-subscriber.h"
 #include "util/condition-variable.h"
 #include "util/container-util.h"
 #include "util/runtime-profile.h"
@@ -44,9 +48,6 @@
 #include "util/simple-logger.h"
 #include "util/thread-pool.h"
 #include "util/time.h"
-#include "runtime/timestamp-value.h"
-#include "runtime/types.h"
-#include "statestore/statestore-subscriber.h"
 
 namespace impala {
 using kudu::ThreadSafeRandom;
@@ -442,6 +443,12 @@ class ImpalaServer : public ImpalaServiceIf,
   /// Returns a current snapshot of the local backend descriptor.
   std::shared_ptr<const TBackendDescriptor> GetLocalBackendDescriptor();
 
+  /// Adds the query_id to the map from backend to the list of queries running or expected
+  /// to run there (query_locations_). After calling this function, the server will cancel
+  /// a query with an error if one of its backends fails.
+  void RegisterQueryLocations(
+      const PerBackendExecParams& per_backend_params, const TUniqueId& query_id);
+
   /// Takes a set of network addresses of active backends and cancels all the queries
   /// running on failed ones (that is, addresses not in the active set).
   void CancelQueriesOnFailedBackends(
@@ -475,6 +482,10 @@ class ImpalaServer : public ImpalaServiceIf,
   /// The prefix of audit event log filename.
   static const string AUDIT_EVENT_LOG_FILE_PREFIX;
 
+  /// The default executor group name for executors that do not explicitly belong to a
+  /// specific executor group.
+  static const string DEFAULT_EXECUTOR_GROUP_NAME;
+
   /// Per-session state.  This object is reference counted using shared_ptrs.  There
   /// is one ref count in the SessionStateMap for as long as the session is active.
   /// All queries running from this session also have a reference.
@@ -1240,7 +1251,7 @@ class ImpalaServer : public ImpalaServiceIf,
   /// Protects query_locations_. Not held in conjunction with other locks.
   boost::mutex query_locations_lock_;
 
-  /// A map from backend to the list of queries currently running or scheduled to run
+  /// A map from backend to the list of queries currently running or expected to run
   /// there.
   typedef boost::unordered_map<TNetworkAddress, boost::unordered_set<TUniqueId>>
       QueryLocations;
diff --git a/be/src/util/runtime-profile.h b/be/src/util/runtime-profile.h
index 9203785..5c79669 100644
--- a/be/src/util/runtime-profile.h
+++ b/be/src/util/runtime-profile.h
@@ -144,7 +144,7 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
   /// structures owned by the profile are allocated from 'pool'.
   /// If 'is_averaged_profile' is true, the counters in this profile will be derived
   /// averages (of unit AveragedCounter) from other profiles, so the counter map will
-  /// be left empty Otherwise, the counter map is initialized with a single entry for
+  /// be left empty. Otherwise, the counter map is initialized with a single entry for
   /// TotalTime.
   static RuntimeProfile* Create(ObjectPool* pool, const std::string& name,
       bool is_averaged_profile = false);
diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index df32bf1..910483d 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -76,6 +76,10 @@ parser.add_option("--kill", "--kill_only", dest="kill_only", action="store_true"
                   " the running impalads and the statestored.")
 parser.add_option("--force_kill", dest="force_kill", action="store_true", default=False,
                   help="Force kill impalad and statestore processes.")
+parser.add_option("-a", "--add_executors", dest="add_executors",
+                  action="store_true", default=False,
+                  help="Start additional impalad processes. The executor group name must "
+                  "be specified using --impalad_args")
 parser.add_option("-r", "--restart_impalad_only", dest="restart_impalad_only",
                   action="store_true", default=False,
                   help="Restarts only the impalad processes")
@@ -267,7 +271,7 @@ def build_catalogd_arg_list():
 
 
 def build_impalad_arg_lists(cluster_size, num_coordinators, use_exclusive_coordinators,
-    remap_ports):
+    remap_ports, start_idx=0):
   """Build the argument lists for impala daemons in the cluster. Returns a list of
   argument lists, one for each impala daemon in the cluster. Each argument list is
   a list of strings. 'num_coordinators' and 'use_exclusive_coordinators' allow setting
@@ -295,7 +299,7 @@ def build_impalad_arg_lists(cluster_size, num_coordinators, use_exclusive_coordi
 
   # Build args for each each impalad instance.
   impalad_args = []
-  for i in range(cluster_size):
+  for i in range(start_idx, start_idx + cluster_size):
     service_name = impalad_service_name(i)
 
     impala_port_args = ""
@@ -431,7 +435,8 @@ class MiniClusterOperations(object):
       raise RuntimeError("Unable to start catalogd. Check log or file permissions"
                          " for more details.")
 
-  def start_impalads(self, cluster_size, num_coordinators, use_exclusive_coordinators):
+  def start_impalads(self, cluster_size, num_coordinators, use_exclusive_coordinators,
+                     start_idx=0):
     """Start 'cluster_size' impalad instances. The first 'num_coordinator' instances will
       act as coordinators. 'use_exclusive_coordinators' specifies whether the coordinators
       will only execute coordinator fragments."""
@@ -439,16 +444,21 @@ class MiniClusterOperations(object):
       # No impalad instances should be started.
       return
 
+    # The current TCP port allocation of the minicluster allows up to 10 impalads before
+    # the backend port (25000 + idx) will collide with the statestore (25010).
+    assert start_idx + cluster_size <= 10, "Must not start more than 10 impalads"
+
     impalad_arg_lists = build_impalad_arg_lists(
-        cluster_size, num_coordinators, use_exclusive_coordinators, remap_ports=True)
+        cluster_size, num_coordinators, use_exclusive_coordinators, remap_ports=True,
+        start_idx=start_idx)
     assert cluster_size == len(impalad_arg_lists)
-    for i in xrange(cluster_size):
+    for i in xrange(start_idx, start_idx + cluster_size):
       service_name = impalad_service_name(i)
       LOG.info("Starting Impala Daemon logging to {log_dir}/{service_name}.INFO".format(
           log_dir=options.log_dir, service_name=service_name))
       output_file = os.path.join(
           options.log_dir, "{service_name}-out.log".format(service_name=service_name))
-      run_daemon_with_options("impalad", impalad_arg_lists[i],
+      run_daemon_with_options("impalad", impalad_arg_lists[i - start_idx],
           jvm_debug_port=DEFAULT_IMPALAD_JVM_DEBUG_PORT + i, output_file=output_file)
 
 
@@ -548,6 +558,7 @@ class DockerMiniClusterOperations(object):
                    for src, dst in port_map.iteritems()]
     # Impersonate the current user for operations against the minicluster. This is
     # necessary because the user name inside the container is "root".
+    # TODO: pass in the actual options
     env_args = ["-e", "HADOOP_USER_NAME={0}".format(getpass.getuser()),
                 "-e", "JAVA_TOOL_OPTIONS={0}".format(
                     build_java_tool_options(DEFAULT_IMPALAD_JVM_DEBUG_PORT))]
@@ -618,9 +629,9 @@ def validate_options():
     LOG.error("Please specify a cluster size >= 0")
     sys.exit(1)
 
-  if options.num_coordinators <= 0:
-    LOG.error("Please specify a valid number of coordinators > 0")
-    sys.exit(1)
+  if (options.use_exclusive_coordinators and
+      options.num_coordinators >= options.cluster_size):
+    LOG.info("Starting impala cluster without executors")
 
   if not os.path.isdir(options.log_dir):
     LOG.error("Log dir does not exist or is not a directory: {log_dir}".format(
@@ -629,10 +640,12 @@ def validate_options():
 
   restart_only_count = len([opt for opt in [options.restart_impalad_only,
                                             options.restart_statestored_only,
-                                            options.restart_catalogd_only] if opt])
+                                            options.restart_catalogd_only,
+                                            options.add_executors] if opt])
   if restart_only_count > 1:
-    LOG.error("--restart_impalad_only, --restart_catalogd_only, and "
-              "--restart_statestored_only options are mutually exclusive")
+    LOG.error("--restart_impalad_only, --restart_catalogd_only, "
+              "--restart_statestored_only, and --add_executors options are mutually "
+              "exclusive")
     sys.exit(1)
   elif restart_only_count == 1:
     if options.inprocess:
@@ -665,6 +678,8 @@ if __name__ == "__main__":
     cluster_ops.kill_catalogd(force=options.force_kill)
   elif options.restart_statestored_only:
     cluster_ops.kill_statestored(force=options.force_kill)
+  elif options.add_executors:
+    pass
   else:
     cluster_ops.kill_all_daemons(force=options.force_kill)
 
@@ -678,6 +693,9 @@ if __name__ == "__main__":
           "Restarting entire cluster.")
       options.restart_impalad_only = False
 
+  existing_cluster_size = len(cluster_ops.get_cluster().impalads)
+  expected_cluster_size = options.cluster_size
+  num_coordinators = options.num_coordinators
   try:
     if options.restart_catalogd_only:
       cluster_ops.start_catalogd()
@@ -685,12 +703,18 @@ if __name__ == "__main__":
       cluster_ops.start_statestore()
     elif options.restart_impalad_only:
       cluster_ops.start_impalads(options.cluster_size, options.num_coordinators,
-                              options.use_exclusive_coordinators)
+                                 options.use_exclusive_coordinators)
+    elif options.add_executors:
+      num_coordinators = 0
+      use_exclusive_coordinators = False
+      cluster_ops.start_impalads(options.cluster_size, num_coordinators,
+                                 use_exclusive_coordinators, existing_cluster_size)
+      expected_cluster_size += existing_cluster_size
     else:
       cluster_ops.start_statestore()
       cluster_ops.start_catalogd()
       cluster_ops.start_impalads(options.cluster_size, options.num_coordinators,
-                              options.use_exclusive_coordinators)
+                                 options.use_exclusive_coordinators)
     # Sleep briefly to reduce log spam: the cluster takes some time to start up.
     sleep(3)
 
@@ -700,8 +724,8 @@ if __name__ == "__main__":
       for delay in options.catalog_init_delays.split(","):
         if int(delay.strip()) != 0: expected_catalog_delays += 1
     # Check for the cluster to be ready.
-    impala_cluster.wait_until_ready(options.cluster_size,
-        options.cluster_size - expected_catalog_delays)
+    impala_cluster.wait_until_ready(expected_cluster_size,
+        expected_cluster_size - expected_catalog_delays)
   except Exception, e:
     LOG.exception("Error starting cluster")
     sys.exit(1)
@@ -713,5 +737,5 @@ if __name__ == "__main__":
   LOG.info(("Impala Cluster Running with {num_nodes} nodes "
       "({num_coordinators} coordinators, {num_executors} executors).").format(
           num_nodes=options.cluster_size,
-          num_coordinators=min(options.cluster_size, options.num_coordinators),
+          num_coordinators=min(options.cluster_size, num_coordinators),
           num_executors=executors))
diff --git a/common/thrift/StatestoreService.thrift b/common/thrift/StatestoreService.thrift
index 203518e..67a482f 100644
--- a/common/thrift/StatestoreService.thrift
+++ b/common/thrift/StatestoreService.thrift
@@ -44,6 +44,15 @@ struct TPoolStats {
   3: required i64 backend_mem_reserved;
 }
 
+// Structure to describe an executor group. We use this to configure the executor group
+// for backends during startup and during cluster membership management.
+struct TExecutorGroupDesc {
+  // The name of the executor group.
+  1: required string name;
+  // The minimum size of the executor group to be considered healthy.
+  2: required i64 min_size;
+}
+
 // Structure serialised in the Impala backend topic. Each Impalad
 // constructs one TBackendDescriptor, and registers it in the cluster-membership
 // topic. Impalads subscribe to this topic to learn of the location of
@@ -78,6 +87,13 @@ struct TBackendDescriptor {
   // True if fragment instances should not be scheduled on this daemon because the
   // daemon has been quiescing, e.g. if it shutting down.
   9: required bool is_quiescing;
+
+  // The list of executor groups that this backend belongs to. Only valid if is_executor
+  // is set, and currently must contain exactly one entry.
+  10: required list<TExecutorGroupDesc> executor_groups;
+
+  // The number of queries that can be admitted to this backend.
+  11: required i64 admit_num_queries_limit;
 }
 
 // Description of a single entry in a topic
diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py
index d3dc3f9..22489fe 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -234,8 +234,10 @@ class CustomClusterTestSuite(ImpalaTestSuite):
                             cluster_size=DEFAULT_CLUSTER_SIZE,
                             num_coordinators=NUM_COORDINATORS,
                             use_exclusive_coordinators=False,
+                            add_executors=False,
                             log_level=1,
                             expected_num_executors=DEFAULT_CLUSTER_SIZE,
+                            expected_subscribers=0,
                             default_query_options=None,
                             statestored_timeout_s=60,
                             impalad_timeout_s=60):
@@ -252,6 +254,9 @@ class CustomClusterTestSuite(ImpalaTestSuite):
     if use_exclusive_coordinators:
       cmd.append("--use_exclusive_coordinators")
 
+    if add_executors:
+      cmd.append("--add_executors")
+
     if pytest.config.option.use_local_catalog:
       cmd.append("--impalad_args=--use_local_catalog=1")
       cmd.append("--catalogd_args=--catalog_topic_mode=minimal")
@@ -281,7 +286,8 @@ class CustomClusterTestSuite(ImpalaTestSuite):
 
     # The number of statestore subscribers is
     # cluster_size (# of impalad) + 1 (for catalogd).
-    expected_subscribers = cluster_size + 1
+    if expected_subscribers == 0:
+      expected_subscribers = expected_num_executors + 1
 
     statestored.service.wait_for_live_subscribers(expected_subscribers,
                                                   timeout=statestored_timeout_s)
diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py
index c8836e7..ea5daf5 100644
--- a/tests/common/impala_cluster.py
+++ b/tests/common/impala_cluster.py
@@ -96,7 +96,7 @@ class ImpalaCluster(object):
     else:
       self.__impalads, self.__statestoreds, self.__catalogd =\
           self.__find_docker_containers()
-    LOG.info("Found %d impalad/%d statestored/%d catalogd process(es)" %
+    LOG.debug("Found %d impalad/%d statestored/%d catalogd process(es)" %
         (len(self.__impalads), len(self.__statestoreds), 1 if self.__catalogd else 0))
 
   @property
diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py
index e5aca37..80685c4 100644
--- a/tests/common/impala_service.py
+++ b/tests/common/impala_service.py
@@ -19,6 +19,7 @@
 # programmatically interact with the services and perform operations such as querying
 # the debug webpage, getting metric values, or creating client connections.
 
+from collections import defaultdict
 import json
 import logging
 import re
@@ -163,17 +164,38 @@ class ImpaladService(BaseImpalaService):
     self.be_port = be_port
     self.hs2_port = hs2_port
 
-  def get_num_known_live_backends(self, timeout=30, interval=1,
+  def get_num_known_live_executors(self, timeout=30, interval=1,
       include_shutting_down=True):
-    LOG.info("Getting num_known_live_backends from %s:%s" %
+    return self.get_num_known_live_backends(timeout=timeout, interval=interval,
+        include_shutting_down=include_shutting_down, only_executors=True)
+
+  def get_num_known_live_backends(self, timeout=30, interval=1,
+      include_shutting_down=True, only_coordinators=False, only_executors=False):
+    LOG.debug("Getting num_known_live_backends from %s:%s" %
         (self.hostname, self.webserver_port))
     result = json.loads(self.read_debug_webpage('backends?json', timeout, interval))
     count = 0
     for backend in result['backends']:
-      if include_shutting_down or not backend['is_quiescing']:
-        count += 1
+      if backend['is_quiescing'] and not include_shutting_down:
+        continue
+      if only_coordinators and not backend['is_coordinator']:
+        continue
+      if only_executors and not backend['is_executor']:
+        continue
+      count += 1
     return count
 
+  def get_executor_groups(self, timeout=30, interval=1):
+    """Returns a mapping from executor group name to a list of all KRPC endpoints of a
+    group's executors."""
+    LOG.debug("Getting executor groups from %s:%s" %
+        (self.hostname, self.webserver_port))
+    result = json.loads(self.read_debug_webpage('backends?json', timeout, interval))
+    groups = defaultdict(list)
+    for backend in result['backends']:
+      groups[backend['executor_groups']].append(backend['krpc_address'])
+    return groups
+
   def get_query_locations(self):
     # Returns a dictionary of the format <host_address, num_of_queries_running_there>
     result = json.loads(self.read_debug_webpage('queries?json', timeout=30, interval=1))
@@ -185,6 +207,31 @@ class ImpaladService(BaseImpalaService):
     result = json.loads(self.read_debug_webpage('queries?json', timeout, interval))
     return result['in_flight_queries']
 
+  def _get_pool_counter(self, pool_name, counter_name, timeout=30, interval=1):
+    """Returns the value of the field 'counter_name' in pool 'pool_name' or 0 if the pool
+    doesn't exist."""
+    result = json.loads(self.read_debug_webpage('admission?json', timeout, interval))
+    pools = result['resource_pools']
+    for pool in pools:
+      if pool['pool_name'] == pool_name:
+        return pool[counter_name]
+    return 0
+
+  def get_num_queued_queries(self, pool_name, timeout=30, interval=1):
+    """Returns the number of queued queries in pool 'pool_name' or 0 if the pool doesn't
+    exist."""
+    return self._get_pool_counter(pool_name, 'agg_num_queued', timeout, interval)
+
+  def get_total_admitted_queries(self, pool_name, timeout=30, interval=1):
+    """Returns the total number of queries that have been admitted to pool 'pool_name' or
+    0 if the pool doesn't exist."""
+    return self._get_pool_counter(pool_name, 'total_admitted', timeout, interval)
+
+  def get_num_running_queries(self, pool_name, timeout=30, interval=1):
+    """Returns the number of queries currently running in pool 'pool_name' or 0 if the
+    pool doesn't exist."""
+    return self._get_pool_counter(pool_name, 'agg_num_running', timeout, interval)
+
   def get_num_in_flight_queries(self, timeout=30, interval=1):
     LOG.info("Getting num_in_flight_queries from %s:%s" %
         (self.hostname, self.webserver_port))
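Reviewer note: the backend filtering and grouping added to impala_service.py above can be tried in isolation. The following is a minimal sketch, assuming the 'backends?json' payload has the fields the diff reads (is_quiescing, is_coordinator, is_executor, executor_groups, krpc_address). The function names and the sample payload are hypothetical and are not part of the commit.

```python
from collections import defaultdict


def count_backends(backends, include_shutting_down=True,
                   only_coordinators=False, only_executors=False):
    """Counts the backends that pass every requested filter."""
    count = 0
    for backend in backends:
        if backend['is_quiescing'] and not include_shutting_down:
            continue
        if only_coordinators and not backend['is_coordinator']:
            continue
        if only_executors and not backend['is_executor']:
            continue
        count += 1
    return count


def group_by_executor_group(backends):
    """Maps each executor group name to the KRPC addresses of its backends."""
    groups = defaultdict(list)
    for backend in backends:
        groups[backend['executor_groups']].append(backend['krpc_address'])
    return groups


# Hypothetical payload: one dedicated coordinator and two executors, one of
# which is shutting down. Host names and ports are made up for illustration.
SAMPLE_BACKENDS = [
    {'krpc_address': 'host0:27000', 'is_coordinator': True, 'is_executor': False,
     'is_quiescing': False, 'executor_groups': 'coordinator'},
    {'krpc_address': 'host1:27000', 'is_coordinator': False, 'is_executor': True,
     'is_quiescing': False, 'executor_groups': 'default-pool-group1'},
    {'krpc_address': 'host2:27000', 'is_coordinator': False, 'is_executor': True,
     'is_quiescing': True, 'executor_groups': 'default-pool-group1'},
]
```

The filters are applied conjunctively, so passing both only_coordinators and only_executors counts only backends that fill both roles.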
diff --git a/tests/common/resource_pool_config.py b/tests/common/resource_pool_config.py
index b160163..ee785b7 100644
--- a/tests/common/resource_pool_config.py
+++ b/tests/common/resource_pool_config.py
@@ -61,7 +61,7 @@ class ResourcePoolConfig(object):
     metric_key = "admission-controller.{0}.root.{1}".format(metric_str, pool_name)
     start_time = time()
     while (time() - start_time < timeout):
-      handle = client.execute_async("select 1")
+      handle = client.execute_async("select 'wait_for_config_change'")
       client.close_query(handle)
       current_val = str(self.impala_service.get_metric_value(metric_key))
       if current_val == target_val:
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index e139eec..b416270 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -377,8 +377,11 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       execute_statement_req.sessionHandle = open_session_resp.sessionHandle
       execute_statement_req.statement = "select count(1) from functional.alltypes"
       execute_statement_resp = self.hs2_client.ExecuteStatement(execute_statement_req)
-      TestAdmissionController.check_response(execute_statement_resp,
-          TCLIService.TStatusCode.ERROR_STATUS, "User must be specified")
+      self.wait_for_operation_state(execute_statement_resp.operationHandle,
+                                    TCLIService.TOperationState.ERROR_STATE)
+      get_operation_status_resp = self.get_operation_status(
+          execute_statement_resp.operationHandle)
+      assert "User must be specified" in get_operation_status_resp.errorMessage
     finally:
       close_req = TCLIService.TCloseSessionReq()
       close_req.sessionHandle = open_session_resp.sessionHandle
@@ -553,25 +556,32 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       client.clear_configuration()
 
       client.set_configuration_option("debug_action", "CRS_BEFORE_ADMISSION:SLEEP@2000")
-      handle = client.execute_async("select 1")
+      handle = client.execute_async("select 2")
       sleep(1)
       client.close_query(handle)
       self.assert_impalad_log_contains('INFO',
           "Ready to be Admitted immediately but already cancelled, query id=")
 
       client.set_configuration_option("debug_action",
-          "CRS_AFTER_COORD_STARTS:SLEEP@2000")
-      handle = client.execute_async("select 1")
+          "CRS_BEFORE_COORD_STARTS:SLEEP@2000")
+      handle = client.execute_async("select 3")
       sleep(1)
       client.close_query(handle)
       self.assert_impalad_log_contains('INFO',
           "Cancelled right after starting the coordinator query id=")
 
+      client.set_configuration_option("debug_action", "CRS_AFTER_COORD_STARTS:SLEEP@2000")
+      handle = client.execute_async("select 4")
+      sleep(1)
+      client.close_query(handle)
+      self.assert_impalad_log_contains('INFO',
+          "Cancelled right after starting the coordinator query id=", 2)
+
       client.clear_configuration()
       handle = client.execute_async("select sleep(10000)")
       client.set_configuration_option("debug_action",
           "AC_AFTER_ADMISSION_OUTCOME:SLEEP@2000")
-      queued_query_handle = client.execute_async("select 1")
+      queued_query_handle = client.execute_async("select 5")
       sleep(1)
       assert client.get_state(queued_query_handle) == QueryState.COMPILED
       assert "Admission result: Queued" in client.get_runtime_profile(queued_query_handle)
@@ -587,7 +597,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       client.clear_configuration()
 
       handle = client.execute_async("select sleep(10000)")
-      queued_query_handle = client.execute_async("select 1")
+      queued_query_handle = client.execute_async("select 6")
       sleep(1)
       assert client.get_state(queued_query_handle) == QueryState.COMPILED
       assert "Admission result: Queued" in client.get_runtime_profile(queued_query_handle)
@@ -600,7 +610,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       assert self.cluster.impalads[0].service.get_metric_value(
         "admission-controller.agg-num-running.default-pool") == 0
       assert self.cluster.impalads[0].service.get_metric_value(
-        "admission-controller.total-admitted.default-pool") == 3
+        "admission-controller.total-admitted.default-pool") == 4
       assert self.cluster.impalads[0].service.get_metric_value(
         "admission-controller.total-queued.default-pool") == 2
     finally:
@@ -695,9 +705,11 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     handle_running = self.client.execute_async(query)
     self.client.wait_for_admission_control(handle_running)
     handle_queued = self.client.execute_async(query)
+    self.client.wait_for_admission_control(handle_queued)
     self.impalad_test_service.wait_for_metric_value(
       "admission-controller.total-queued.default-pool", 1)
-    self.__assert_num_queries_accounted(2)
+    # Queued queries don't show up on backends
+    self.__assert_num_queries_accounted(1, 1)
     # First close the queued query
     self.close_query(handle_queued)
     self.close_query(handle_running)
@@ -708,15 +720,16 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     self.execute_query_expect_failure(self.client, query, exec_options)
     self.__assert_num_queries_accounted(0)
 
-  def __assert_num_queries_accounted(self, expected_num):
+  def __assert_num_queries_accounted(self, num_running, num_queued=0):
     """Checks if the num of queries accounted by query_locations and in-flight are as
     expected"""
     # Wait for queries to start/un-register.
-    assert self.impalad_test_service.wait_for_num_in_flight_queries(expected_num)
+    num_inflight = num_running + num_queued
+    assert self.impalad_test_service.wait_for_num_in_flight_queries(num_inflight)
     query_locations = self.impalad_test_service.get_query_locations()
     for host, num_q in query_locations.items():
-      assert num_q == expected_num, "There should be {0} running queries on either " \
-                                    "impalads: {0}".format(query_locations)
+      assert num_q == num_running, "There should be {0} running queries on either " \
+                                   "impalads: {1}".format(num_running, query_locations)
 
   @SkipIfNotHdfsMinicluster.tuned_for_minicluster
   @pytest.mark.execute_serially
@@ -743,9 +756,8 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       additional_args="-default_pool_max_requests 1", make_copy=True),
     statestored_args=_STATESTORED_ARGS)
   def test_pool_config_change_while_queued(self, vector):
-    """Tests if the invalid checks work even if the query is queued. Makes sure the query
-    is not dequeued if the config is invalid and is promptly dequeued when it goes back
-    to being valid"""
+    """Tests that the invalid checks work even if the query is queued. Makes sure that a
+    queued query is dequeued and rejected if the config is invalid."""
     pool_name = "invalidTestPool"
     config_str = "max-query-mem-limit"
     self.client.set_configuration_option('request_pool', pool_name)
@@ -754,7 +766,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     self.client.wait_for_admission_control(sleep_query_handle)
     self.__wait_for_change_to_profile(sleep_query_handle,
                                       "Admission result: Admitted immediately")
-    queued_query_handle = self.client.execute_async("select 1")
+    queued_query_handle = self.client.execute_async("select 2")
     self.__wait_for_change_to_profile(queued_query_handle, "Admission result: Queued")
 
     # Change config to be invalid.
@@ -764,16 +776,12 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     # Close running query so the queued one gets a chance.
     self.client.close_query(sleep_query_handle)
 
-    # Check latest queued reason changed
-    queued_reason = "Latest admission queue reason: Invalid pool config: the " \
-                    "min_query_mem_limit is greater than the max_query_mem_limit" \
-                    " (26214400 > 1)"
-    self.__wait_for_change_to_profile(queued_query_handle, queued_reason)
+    # Observe that the queued query fails.
+    self.wait_for_state(queued_query_handle, QueryState.EXCEPTION, 20)
+    self.close_query(queued_query_handle)
 
-    # Now change the config back to valid value and make sure the query is allowed to run.
+    # Change the config back to a valid value
     config.set_config_value(pool_name, config_str, 0)
-    self.client.wait_for_finished_timeout(queued_query_handle, 20)
-    self.close_query(queued_query_handle)
 
     # Now do the same thing for change to pool.max-query-mem-limit such that it can no
     # longer accommodate the largest min_reservation.
@@ -788,18 +796,9 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     config.set_config_value(pool_name, config_str, 25 * 1024 * 1024)
     # Close running query so the queued one gets a chance.
     self.client.close_query(sleep_query_handle)
-    # Check latest queued reason changed
-    queued_reason = "minimum memory reservation is greater than memory available to " \
-                    "the query for buffer reservations. Memory reservation needed given" \
-                    " the current plan: 88.00 KB. Adjust either the mem_limit or the" \
-                    " pool config (max-query-mem-limit, min-query-mem-limit) for the" \
-                    " query to allow the query memory limit to be at least 32.09 MB."
-    self.__wait_for_change_to_profile(queued_query_handle, queued_reason, 5)
-    # Now change the config back to a reasonable value.
-    config.set_config_value(pool_name, config_str, 0)
-    self.client.wait_for_finished_timeout(queued_query_handle, 20)
-    self.__wait_for_change_to_profile(queued_query_handle,
-                                      "Admission result: Admitted (queued)")
+
+    # Observe that the queued query fails.
+    self.wait_for_state(queued_query_handle, QueryState.EXCEPTION, 20)
     self.close_query(queued_query_handle)
 
   def __wait_for_change_to_profile(self, query_handle, search_string, timeout=20):
@@ -1090,6 +1089,47 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
         rejected_reasons_return.append(query_statuses[0])
     return num_rejected, rejected_reasons_return
 
+  @pytest.mark.execute_serially
+  def test_impala_server_startup_delay(self):
+    """This test verifies that queries get queued when the coordinator has already started
+    accepting client connections during startup, but the local backend descriptor is not
+    yet available."""
+    server_start_delay_s = 20
+    # We need to start the cluster here instead of during setup_method() so we can launch
+    # it from a separate thread.
+
+    def start_cluster():
+      LOG.info("Starting cluster")
+      impalad_args = "--debug_actions=IMPALA_SERVER_END_OF_START:SLEEP@%s" % (
+          1000 * server_start_delay_s)
+      self._start_impala_cluster(['--impalad_args=%s' % impalad_args])
+
+    # Initiate the cluster start
+    start_cluster_thread = threading.Thread(target=start_cluster)
+    start_cluster_thread.start()
+
+    # Wait some time to arrive at IMPALA_SERVER_END_OF_START
+    sleep(server_start_delay_s)
+
+    # With a new client, execute a query and observe that it gets queued and ultimately
+    # succeeds.
+    client = self.create_impala_client()
+    result = self.execute_query_expect_success(client, "select 1")
+    start_cluster_thread.join()
+    profile = result.runtime_profile
+    reasons = self.__extract_init_queue_reasons([profile])
+    assert len(reasons) == 1
+    assert "Local backend has not started up yet." in reasons[0]
+
+  @pytest.mark.execute_serially
+  def test_scheduler_error(self):
+    """This test verifies that the admission controller handles scheduler errors
+    correctly."""
+    client = self.create_impala_client()
+    client.set_configuration_option("debug_action", "SCHEDULER_SCHEDULE:FAIL")
+    result = self.execute_query_expect_failure(client, "select 1")
+    assert "Error during scheduling" in str(result)
+
 
 class TestAdmissionControllerStress(TestAdmissionControllerBase):
   """Submits a number of queries (parameterized) with some delay between submissions
diff --git a/tests/custom_cluster/test_auto_scaling.py b/tests/custom_cluster/test_auto_scaling.py
new file mode 100644
index 0000000..d0ab577
--- /dev/null
+++ b/tests/custom_cluster/test_auto_scaling.py
@@ -0,0 +1,212 @@
+#!/usr/bin/env impala-python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+from time import sleep, time
+
+from tests.util.auto_scaler import AutoScaler
+from tests.util.concurrent_workload import ConcurrentWorkload
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+LOG = logging.getLogger("test_auto_scaling")
+
+
+class TestAutoScaling(CustomClusterTestSuite):
+  """This class contains tests that exercise the logic related to scaling clusters up and
+  down by adding and removing groups of executors."""
+  INITIAL_STARTUP_TIME_S = 10
+  STATE_CHANGE_TIMEOUT_S = 45
+  # This query will scan two partitions (month = 1, 2) and thus will have 1 fragment
+  # instance per executor on groups of size 2. Each partition has 2 rows, so it performs
+  # two comparisons and should take around 1 second to complete.
+  QUERY = """select * from functional_parquet.alltypestiny where month < 3
+             and id + random() < sleep(500)"""
+
+  def _get_total_admitted_queries(self):
+    return self.impalad_test_service.get_total_admitted_queries("default-pool")
+
+  def _get_num_executors(self):
+    return self.impalad_test_service.get_num_known_live_backends(only_executors=True)
+
+  def _get_num_running_queries(self):
+    return self.impalad_test_service.get_num_running_queries("default-pool")
+
+  def test_single_workload(self):
+    """This test exercises the auto-scaling logic in the admission controller. It spins up
+    a base cluster (coordinator, catalog, statestore), runs some queries to observe that
+    new executors are started, then stops the workload and observes that the cluster gets
+    shut down."""
+    GROUP_SIZE = 2
+    EXECUTOR_SLOTS = 3
+    auto_scaler = AutoScaler(executor_slots=EXECUTOR_SLOTS, group_size=GROUP_SIZE)
+    workload = None
+    try:
+      auto_scaler.start()
+      sleep(self.INITIAL_STARTUP_TIME_S)
+
+      workload = ConcurrentWorkload(self.QUERY, num_streams=5)
+      LOG.info("Starting workload")
+      workload.start()
+
+      # Wait for workers to spin up
+      assert any(self._get_num_executors() >= GROUP_SIZE or sleep(1)
+                 for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
+          "Number of backends did not increase within %s s" % self.STATE_CHANGE_TIMEOUT_S
+
+      # Wait until we admitted at least 10 queries
+      assert any(self._get_total_admitted_queries() >= 10 or sleep(1)
+                 for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
+          "Did not admit enough queries within %s s" % self.STATE_CHANGE_TIMEOUT_S
+
+      # Wait for second executor group to start
+      num_expected = 2 * GROUP_SIZE
+      assert any(self._get_num_executors() == num_expected or sleep(1)
+                 for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
+                     "Number of backends did not reach %s within %s s" % (
+                     num_expected, self.STATE_CHANGE_TIMEOUT_S)
+
+      # Wait for query rate to surpass the maximum for a single executor group plus 20%
+      min_query_rate = 1.2 * EXECUTOR_SLOTS
+      assert any(workload.get_query_rate() > min_query_rate or sleep(1)
+                 for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
+                     "Query rate did not surpass %s within %s s" % (
+                     min_query_rate, self.STATE_CHANGE_TIMEOUT_S)
+
+      LOG.info("Stopping workload")
+      workload.stop()
+
+      # Wait for workers to spin down
+      assert any(self._get_num_executors() == 0 or sleep(1)
+                 for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
+          "Backends did not shut down within %s s" % self.STATE_CHANGE_TIMEOUT_S
+
+    finally:
+      if workload:
+        workload.stop()
+      LOG.info("Stopping auto scaler")
+      auto_scaler.stop()
+
+  def test_single_group_maxed_out(self):
+    """This test starts an auto scaler and limits it to a single executor group. It then
+    makes sure that the query throughput does not exceed the expected limit."""
+    GROUP_SIZE = 2
+    EXECUTOR_SLOTS = 3
+    auto_scaler = AutoScaler(executor_slots=EXECUTOR_SLOTS, group_size=GROUP_SIZE,
+                             max_groups=1)
+    workload = None
+    try:
+      auto_scaler.start()
+      sleep(self.INITIAL_STARTUP_TIME_S)
+
+      workload = ConcurrentWorkload(self.QUERY, num_streams=5)
+      LOG.info("Starting workload")
+      workload.start()
+
+      # Wait for workers to spin up
+      assert any(self._get_num_executors() >= GROUP_SIZE or sleep(1)
+                 for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
+          "Number of backends did not increase within %s s" % self.STATE_CHANGE_TIMEOUT_S
+
+      # Wait until we admitted at least 10 queries
+      assert any(self._get_total_admitted_queries() >= 10 or sleep(1)
+                 for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
+          "Did not admit enough queries within %s s" % self.STATE_CHANGE_TIMEOUT_S
+
+      # Sample the number of running queries for a while
+      SAMPLE_NUM_RUNNING_S = 30
+      end_time = time() + SAMPLE_NUM_RUNNING_S
+      num_running = []
+      while time() < end_time:
+        num_running.append(self._get_num_running_queries())
+        sleep(1)
+
+      # Must reach EXECUTOR_SLOTS but not exceed it
+      assert max(num_running) == EXECUTOR_SLOTS, \
+          "Unexpected number of running queries: %s" % num_running
+
+      # Check that only a single group started
+      assert self._get_num_executors() == GROUP_SIZE
+
+      LOG.info("Stopping workload")
+      workload.stop()
+
+      # Wait for workers to spin down
+      assert any(self._get_num_executors() == 0 or sleep(1)
+                 for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
+          "Backends did not shut down within %s s" % self.STATE_CHANGE_TIMEOUT_S
+
+    finally:
+      if workload:
+        workload.stop()
+      LOG.info("Stopping auto scaler")
+      auto_scaler.stop()
+
+  def test_sequential_startup(self):
+    """This test starts an executor group sequentially and observes that no queries are
+    admitted until the group has been fully started."""
+    # Larger group size so it takes a while to start up
+    GROUP_SIZE = 4
+    EXECUTOR_SLOTS = 3
+    auto_scaler = AutoScaler(executor_slots=EXECUTOR_SLOTS, group_size=GROUP_SIZE,
+                             start_batch_size=1, max_groups=1)
+    workload = None
+    try:
+      auto_scaler.start()
+      sleep(self.INITIAL_STARTUP_TIME_S)
+
+      workload = ConcurrentWorkload(self.QUERY, num_streams=5)
+      LOG.info("Starting workload")
+      workload.start()
+
+      # Wait for first executor to start up
+      assert any(self._get_num_executors() >= 1 or sleep(1)
+                 for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
+          "Number of backends did not increase within %s s" % self.STATE_CHANGE_TIMEOUT_S
+
+      # Wait for remaining executors to start up and make sure that no queries are
+      # admitted during startup
+      end_time = time() + self.STATE_CHANGE_TIMEOUT_S
+      startup_complete = False
+      while time() < end_time:
+        num_admitted = self._get_total_admitted_queries()
+        num_backends = self._get_num_executors()
+        if num_backends < GROUP_SIZE:
+          assert num_admitted == 0, "%s/%s backends started but %s queries have " \
+              "already been admitted." % (num_backends, GROUP_SIZE, num_admitted)
+        if num_admitted > 0:
+          assert num_backends == GROUP_SIZE
+          startup_complete = True
+          break
+        sleep(1)
+
+      assert startup_complete, "Did not start up in %s s" % self.STATE_CHANGE_TIMEOUT_S
+
+      LOG.info("Stopping workload")
+      workload.stop()
+
+      # Wait for workers to spin down
+      assert any(self._get_num_executors() == 0 or sleep(1)
+                 for _ in range(self.STATE_CHANGE_TIMEOUT_S)), \
+          "Backends did not shut down within %s s" % self.STATE_CHANGE_TIMEOUT_S
+
+    finally:
+      if workload:
+        workload.stop()
+      LOG.info("Stopping auto scaler")
+      auto_scaler.stop()
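Reviewer note: the tests in test_auto_scaling.py above poll with the idiom `any(cond() or sleep(1) for _ in range(timeout))`. It works because `sleep()` returns None, which is falsy, so `any()` moves on to the next attempt and stops at the first truthy condition. A minimal sketch of an equivalent explicit helper follows; `wait_until` and its parameters are hypothetical names and are not part of the commit.

```python
from time import sleep


def wait_until(predicate, max_attempts, interval_s=1):
    """Calls predicate up to max_attempts times and sleeps interval_s after
    each failed attempt. Returns True as soon as predicate() is truthy and
    False if every attempt fails."""
    for _ in range(max_attempts):
        if predicate():
            return True
        sleep(interval_s)
    return False


class CountingCondition(object):
    """Test double that becomes truthy on the n-th call."""

    def __init__(self, n):
        self.n = n
        self.calls = 0

    def __call__(self):
        self.calls += 1
        return self.calls >= self.n
```

With such a helper, a check like the first one in test_single_workload could read `assert wait_until(lambda: self._get_num_executors() >= GROUP_SIZE, self.STATE_CHANGE_TIMEOUT_S)`. Like the idiom, this bounds the number of attempts rather than wall-clock time, so slow predicates extend the total wait.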
diff --git a/tests/custom_cluster/test_catalog_wait.py b/tests/custom_cluster/test_catalog_wait.py
index 502bef0..b60f4e2 100644
--- a/tests/custom_cluster/test_catalog_wait.py
+++ b/tests/custom_cluster/test_catalog_wait.py
@@ -47,7 +47,8 @@ class TestCatalogWait(CustomClusterTestSuite):
 
     # On startup, expect only two executors to be registered.
     self._start_impala_cluster(["--catalog_init_delays=0,0,200000"],
-                               expected_num_executors=2)
+                               expected_num_executors=2,
+                               expected_subscribers=4)
 
     # Expect that impalad[2] is not ready.
     self.cluster.impalads[2].service.wait_for_metric_value('impala-server.ready', 0);
diff --git a/tests/custom_cluster/test_coordinators.py b/tests/custom_cluster/test_coordinators.py
index fdf3004..c6f92f9 100644
--- a/tests/custom_cluster/test_coordinators.py
+++ b/tests/custom_cluster/test_coordinators.py
@@ -291,5 +291,10 @@ class TestCoordinators(CustomClusterTestSuite):
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(cluster_size=1, num_exclusive_coordinators=1)
   def test_dedicated_coordinator_without_executors(self):
+    """This test verifies that a query gets queued and times out when no executors are
+    present."""
     result = self.execute_query_expect_failure(self.client, "select 2")
-    assert "No executors registered in group: default" in str(result)
+    expected_error = "Query aborted:Admission for query exceeded timeout 60000ms in " \
+                     "pool default-pool. Queued reason: No healthy executor groups " \
+                     "found for pool default-pool."
+    assert expected_error in str(result)
diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py
new file mode 100644
index 0000000..5a5a4b1
--- /dev/null
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -0,0 +1,298 @@
+#!/usr/bin/env impala-python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.util.concurrent_workload import ConcurrentWorkload
+
+import logging
+import pytest
+from time import sleep
+
+LOG = logging.getLogger("test_executor_groups")
+
+
+class TestExecutorGroups(CustomClusterTestSuite):
+  """This class contains tests that exercise the logic related to scaling clusters up and
+  down by adding and removing groups of executors. All tests start with a base cluster
+  containing a dedicated coordinator, catalog, and statestore. Tests will then start
+  executor groups and run queries to validate the behavior."""
+
+  def setup_method(self, method):
+    # Always start the base cluster with the coordinator in its own executor group.
+    existing_args = method.func_dict.get("impalad_args", "")
+    method.func_dict["impalad_args"] = "%s -executor_groups=coordinator" % existing_args
+    method.func_dict["cluster_size"] = 1
+    method.func_dict["num_exclusive_coordinators"] = 1
+    self.num_groups = 1
+    self.num_executors = 1
+    super(TestExecutorGroups, self).setup_method(method)
+
+  def _group_name(self, name):
+    # By convention, group names must start with their associated resource pool name
+    # followed by a "-". Tests in this class all use the default resource pool.
+    return "default-pool-%s" % name
+
+  def _add_executor_group(self, name_suffix, min_size, num_executors=0,
+                          max_concurrent_queries=0):
+    """Adds an executor group to the cluster. 'min_size' specifies the minimum size for
+    the new group to be considered healthy. 'num_executors' specifies the number of
+    executors to start and defaults to 'min_size' but can be different from 'min_size' to
+    start an unhealthy group. 'max_concurrent_queries' can be used to override the default
+    (num cores). If 'name_suffix' is empty, no executor group is specified for the new
+    backends and they will end up in the default group."""
+    self.num_groups += 1
+    if num_executors == 0:
+      num_executors = min_size
+    self.num_executors += num_executors
+    name = self._group_name(name_suffix)
+    LOG.info("Adding %s executors to group %s with minimum size %s" %
+             (num_executors, name, min_size))
+    cluster_args = ["--impalad_args=-max_concurrent_queries=%s" % max_concurrent_queries]
+    if len(name_suffix) > 0:
+      cluster_args.append("--impalad_args=-executor_groups=%s:%s" % (name, min_size))
+    self._start_impala_cluster(options=cluster_args,
+                               cluster_size=num_executors,
+                               num_coordinators=0,
+                               add_executors=True,
+                               expected_num_executors=self.num_executors)
+
+  def _get_total_admitted_queries(self):
+    """Returns the total number of queries that have been admitted to the default resource
+    pool."""
+    return self.impalad_test_service.get_total_admitted_queries("default-pool")
+
+  def _get_num_running_queries(self):
+    """Returns the number of queries that are currently running in the default resource
+    pool."""
+    return self.impalad_test_service.get_num_running_queries("default-pool")
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(impalad_args="-queue_wait_timeout_ms=2000")
+  def test_no_group_timeout(self):
+    """Tests that a query submitted to a coordinator with no executor group times out."""
+    result = self.execute_query_expect_failure(self.client, "select sleep(2)")
+    assert "Admission for query exceeded timeout" in str(result)
+
+  @pytest.mark.execute_serially
+  def test_single_group(self):
+    """Tests that we can start a single executor group and run a simple query."""
+    QUERY = "select count(*) from functional.alltypestiny"
+    self._add_executor_group("group1", 2)
+    self.execute_query_expect_success(self.client, QUERY)
+
+  @pytest.mark.execute_serially
+  def test_executor_group_starts_while_queued(self):
+    """Tests that a query can stay in the queue of an empty cluster until an executor
+    group comes online."""
+    QUERY = "select count(*) from functional.alltypestiny"
+    client = self.client
+    handle = client.execute_async(QUERY)
+    profile = client.get_runtime_profile(handle)
+    assert "No healthy executor groups found for pool" in profile
+    self._add_executor_group("group1", 2)
+    client.wait_for_finished_timeout(handle, 20)
+
+  @pytest.mark.execute_serially
+  def test_executor_group_health(self):
+    """Tests that an unhealthy executor group will not run queries."""
+    QUERY = "select count(*) from functional.alltypestiny"
+    # Start cluster and group
+    self._add_executor_group("group1", 2)
+    client = self.client
+    # Run query to validate
+    self.execute_query_expect_success(client, QUERY)
+    # Kill an executor
+    coordinator = self.cluster.impalads[0]
+    executor = self.cluster.impalads[1]
+    executor.kill()
+    coordinator.service.wait_for_num_known_live_backends(2)
+    # Run query and observe that it gets queued
+    handle = client.execute_async(QUERY)
+    profile = client.get_runtime_profile(handle)
+    assert "No healthy executor groups found for pool" in profile, profile
+    # Restart executor
+    executor.start()
+    # Query should now finish
+    client.wait_for_finished_timeout(handle, 20)
+    # Run query and observe success
+    self.execute_query_expect_success(client, QUERY)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(impalad_args="-default_pool_max_requests=1")
+  def test_executor_group_shutdown(self):
+    """Tests that an executor group can shut down and a query in the queue can still run
+    successfully when the group gets restored."""
+    self._add_executor_group("group1", 2)
+    client = self.client
+    q1 = client.execute_async("select sleep(5000)")
+    q2 = client.execute_async("select sleep(3)")
+    # Verify that q2 is queued up behind q1
+    profile = client.get_runtime_profile(q2)
+    assert "Initial admission queue reason: number of running queries" in profile, profile
+    # Kill an executor
+    coordinator = self.cluster.impalads[0]
+    executor = self.cluster.impalads[1]
+    executor.kill()
+    coordinator.service.wait_for_num_known_live_backends(2)
+    # Wait for q1 to finish (sleep runs on the coordinator)
+    client.wait_for_finished_timeout(q1, 20)
+    # Check that q2 still hasn't run
+    profile = client.get_runtime_profile(q2)
+    assert "Admission result: Queued" in profile, profile
+    # Restore executor group health
+    executor.start()
+    # Query should now finish
+    client.wait_for_finished_timeout(q2, 20)
+
+  @pytest.mark.execute_serially
+  def test_max_concurrent_queries(self):
+    """Tests that the max_concurrent_queries flag works as expected."""
+    self._add_executor_group("group1", 2, max_concurrent_queries=1)
+    # Query that runs on every executor
+    QUERY = "select * from functional_parquet.alltypestiny \
+             where month < 3 and id + random() < sleep(500);"
+    client = self.client
+    q1 = client.execute_async(QUERY)
+    client.wait_for_admission_control(q1)
+    q2 = client.execute_async(QUERY)
+    profile = client.get_runtime_profile(q2)
+    assert "Initial admission queue reason: No query slot available on host" in profile
+    client.cancel(q1)
+    client.cancel(q2)
+
+  @pytest.mark.execute_serially
+  def test_multiple_executor_groups(self):
+    """Tests that two queries can run on two separate executor groups simultaneously."""
+    # Query that runs on every executor
+    QUERY = "select * from functional_parquet.alltypestiny \
+             where month < 3 and id + random() < sleep(500);"
+    self._add_executor_group("group1", 2, max_concurrent_queries=1)
+    self._add_executor_group("group2", 2, max_concurrent_queries=1)
+    client = self.client
+    q1 = client.execute_async(QUERY)
+    client.wait_for_admission_control(q1)
+    q2 = client.execute_async(QUERY)
+    client.wait_for_admission_control(q2)
+    profiles = [client.get_runtime_profile(q) for q in (q1, q2)]
+    assert not any("Initial admission queue reason" in p for p in profiles), profiles
+    client.cancel(q1)
+    client.cancel(q2)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(impalad_args="-max_concurrent_queries=1")
+  def test_coordinator_concurrency(self):
+    """Tests that the command line flag to limit the coordinator concurrency works as
+    expected."""
+    QUERY = "select sleep(1000)"
+    # Add group with more slots than coordinator
+    self._add_executor_group("group2", 2, max_concurrent_queries=3)
+    # Try to run two queries and observe that one gets queued
+    client = self.client
+    q1 = client.execute_async(QUERY)
+    client.wait_for_admission_control(q1)
+    q2 = client.execute_async(QUERY)
+    profile = client.get_runtime_profile(q2)
+    assert "Initial admission queue reason" in profile
+    client.cancel(q1)
+    client.cancel(q2)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(impalad_args="-max_concurrent_queries=16")
+  def test_executor_concurrency(self):
+    """Tests that the command line flag to limit query concurrency on executors works as
+    expected."""
+    # Query that runs on every executor
+    QUERY = "select * from functional_parquet.alltypestiny \
+             where month < 3 and id + random() < sleep(500);"
+    self._add_executor_group("group1", 2, max_concurrent_queries=3)
+
+    workload = None
+    try:
+      workload = ConcurrentWorkload(QUERY, num_streams=5)
+      LOG.info("Starting workload")
+      workload.start()
+
+      RAMP_UP_TIMEOUT_S = 60
+      # Wait until we have admitted at least 10 queries
+      assert any(self._get_total_admitted_queries() >= 10 or sleep(1)
+                  for _ in range(RAMP_UP_TIMEOUT_S)), \
+          "Did not admit enough queries within %s s" % RAMP_UP_TIMEOUT_S
+
+      # Sample the number of running queries for a while
+      NUM_RUNNING_SAMPLES = 30
+      num_running = []
+      for _ in xrange(NUM_RUNNING_SAMPLES):
+        num_running.append(self._get_num_running_queries())
+        sleep(1)
+
+      # Must reach 3 but not exceed it
+      assert max(num_running) == 3, \
+          "Unexpected number of running queries: %s" % num_running
+
+    finally:
+      LOG.info("Stopping workload")
+      if workload:
+        workload.stop()
+
+  @pytest.mark.execute_serially
+  def test_sequential_startup_wait(self):
+    """Tests that starting an executor group sequentially works as expected, i.e. queries
+    don't fail and no queries are admitted until the group is in a healthy state."""
+    QUERY = "select sleep(4)"
+    # Start first executor
+    self._add_executor_group("group1", 3, num_executors=1)
+    # Run query and observe that it gets queued
+    client = self.client
+    handle = client.execute_async(QUERY)
+    profile = client.get_runtime_profile(handle)
+    assert "Initial admission queue reason: No healthy executor groups found for pool" \
+        in profile
+    initial_state = client.get_state(handle)
+    # Start another executor and observe that the query stays queued
+    self._add_executor_group("group1", 3, num_executors=1)
+    profile = client.get_runtime_profile(handle)
+    assert client.get_state(handle) == initial_state
+    # Start the remaining executor and observe that the query finishes
+    self._add_executor_group("group1", 3, num_executors=1)
+    client.wait_for_finished_timeout(handle, 20)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(impalad_args="-queue_wait_timeout_ms=2000")
+  def test_empty_default_group(self):
+    """Tests that an empty default group is correctly marked as non-healthy and excluded
+    from scheduling."""
+    # Start default executor group
+    self._add_executor_group("", min_size=2, num_executors=2,
+                             max_concurrent_queries=3)
+    # Run query to make sure things work
+    QUERY = "select count(*) from functional.alltypestiny"
+    self.execute_query_expect_success(self.client, QUERY)
+    # Kill executors to make group empty
+    impalads = self.cluster.impalads
+    impalads[1].kill()
+    impalads[2].kill()
+    impalads[0].service.wait_for_num_known_live_backends(1)
+    # Run query to make sure it times out
+    result = self.execute_query_expect_failure(self.client, QUERY)
+    print str(result)
+    expected_error = "Query aborted:Admission for query exceeded timeout 2000ms in " \
+                     "pool default-pool. Queued reason: No healthy executor groups " \
+                     "found for pool default-pool."
+    assert expected_error in str(result)
+
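The ramp-up check in test_executor_concurrency polls with the idiom
any(cond() or sleep(1) for _ in range(N)). It works because sleep() returns
None, which is falsy, so any() keeps pulling from the generator until the
condition holds or the attempts run out. A minimal standalone sketch of the same
idiom follows; the helper name wait_until and its parameters are hypothetical
and not part of this patch.

```python
from time import sleep


def wait_until(predicate, attempts, interval_s=1):
  """Calls 'predicate' up to 'attempts' times, sleeping 'interval_s' seconds after
  each failed call. Returns True as soon as the predicate holds, False otherwise."""
  # sleep() returns None, so "predicate() or sleep(...)" is falsy whenever the
  # predicate fails, and any() moves on to the next attempt.
  return any(predicate() or sleep(interval_s) for _ in range(attempts))
```

Because any() short-circuits, no further calls or sleeps happen once the
predicate returns a truthy value.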
diff --git a/tests/custom_cluster/test_restart_services.py b/tests/custom_cluster/test_restart_services.py
index 62fbf93..781ec5f 100644
--- a/tests/custom_cluster/test_restart_services.py
+++ b/tests/custom_cluster/test_restart_services.py
@@ -297,8 +297,8 @@ class TestGracefulShutdown(CustomClusterTestSuite, HS2TestSuite):
     before_shutdown_handle = self.__exec_and_wait_until_running(QUERY)
 
     # Run this query which simulates getting stuck in admission control until after
-    # the shutdown grace period expires. This exercises the code path where the
-    # coordinator terminates the query before it has started up.
+    # the shutdown grace period expires. This demonstrates that queries don't get
+    # cancelled if the cluster membership changes while they're waiting for admission.
     before_shutdown_admission_handle = self.execute_query_async(QUERY,
         {'debug_action': 'CRS_BEFORE_ADMISSION:SLEEP@30000'})
 
@@ -332,8 +332,9 @@ class TestGracefulShutdown(CustomClusterTestSuite, HS2TestSuite):
     assert self.__fetch_and_get_num_backends(
         QUERY, before_shutdown_handle, delay_s=fetch_delay_s) == 3
 
-    # Confirm that the query stuck in admission failed.
-    self.__check_deadline_expired(QUERY, before_shutdown_admission_handle)
+    # Confirm that the query stuck in admission succeeded.
+    assert self.__fetch_and_get_num_backends(
+        QUERY, before_shutdown_admission_handle, timeout_s=30) == 2
 
     # Start the impalad back up and run another query, which should be scheduled on it
     # again.
@@ -475,11 +476,11 @@ class TestGracefulShutdown(CustomClusterTestSuite, HS2TestSuite):
                 self.client.QUERY_STATES['RUNNING'], timeout=20)
     return handle
 
-  def __fetch_and_get_num_backends(self, query, handle, delay_s=0):
+  def __fetch_and_get_num_backends(self, query, handle, delay_s=0, timeout_s=20):
     """Fetch the results of 'query' from the beeswax handle 'handle', close the
     query and return the number of backends obtained from the profile."""
     self.impalad_test_service.wait_for_query_state(self.client, handle,
-                self.client.QUERY_STATES['FINISHED'], timeout=20)
+                self.client.QUERY_STATES['FINISHED'], timeout=timeout_s)
     if delay_s > 0:
       LOG.info("sleeping for {0}s".format(delay_s))
       time.sleep(delay_s)
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index 3d12033..2012bab 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -627,3 +627,10 @@ class TestObservability(ImpalaTestSuite):
     query = "select count (*) from functional.alltypes"
     runtime_profile = self.execute_query(query).runtime_profile
     self.__verify_profile_event_sequence(event_regexes, runtime_profile)
+
+  def test_query_profile_contains_executor_group(self):
+    """Test that the profile contains an info string with the executor group that was
+    picked by admission control."""
+    query = "select count (*) from functional.alltypes"
+    runtime_profile = self.execute_query(query).runtime_profile
+    assert "Executor Group:" in runtime_profile
diff --git a/tests/util/auto_scaler.py b/tests/util/auto_scaler.py
new file mode 100755
index 0000000..e3b851e
--- /dev/null
+++ b/tests/util/auto_scaler.py
@@ -0,0 +1,339 @@
+#!/usr/bin/env impala-python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import time
+import logging
+import os
+import pipes
+from subprocess import check_call
+from tests.common.impala_cluster import ImpalaCluster
+from threading import Event, Thread
+
+IMPALA_HOME = os.environ["IMPALA_HOME"]
+
+
+class AutoScaler(object):
+  """This class implements a simple autoscaling algorithm: if queries queue up for a
+  configurable duration, a new executor group is started. Likewise, if the number of
+  concurrently running queries indicates that an executor group can be removed, the
+  most recently added group is stopped.
+
+  Users of this class can start an auto scaler by calling start() and must call stop()
+  before exiting (see main() below for an example).
+
+  This class only uses the default admission control pool.
+  """
+  DEFAULT_POOL_NAME = "default-pool"
+
+  def __init__(self, executor_slots, group_size, start_batch_size=0, max_groups=0,
+               wait_up_s=0, wait_down_s=0):
+    # Number of queries that can run concurrently on each executor
+    self.executor_slots = executor_slots
+    self.coordinator_slots = 128
+    # Number of executors per executor group
+    self.group_size = group_size
+    # New executor groups will be started in increments of this size
+    self.start_batch_size = group_size
+    if start_batch_size > 0:
+      self.start_batch_size = start_batch_size
+    # Maximum number of executor groups. We only have 10 TCP ports free on our
+    # miniclusters and we need one for the dedicated coordinator.
+    self.max_groups = 9 // self.group_size
+    # max_groups can further bound the maximum number of groups we are going to start,
+    # but we won't start more than possible.
+    if max_groups > 0 and max_groups < self.max_groups:
+      self.max_groups = max_groups
+    # Number of seconds to wait before scaling up/down
+    self.scale_wait_up_s = 5
+    if wait_up_s > 0:
+      self.scale_wait_up_s = wait_up_s
+    self.scale_wait_down_s = 5
+    if wait_down_s > 0:
+      self.scale_wait_down_s = wait_down_s
+    self.groups = []
+    self.num_groups = 0
+    # Stopwatches to track how long the conditions for scaling up/down have been met.
+    self.scale_up_sw = time.time()
+    self.scale_down_sw = time.time()
+
+    self.loop_thread = None
+    # Event to signal that the control loop should exit
+    self.stop_ev = Event()
+
+  def get_cluster(self):
+    return ImpalaCluster.get_e2e_test_cluster()
+
+  def get_coordinator(self):
+    cluster = self.get_cluster()
+    assert len(cluster.impalads) > 0
+    return cluster.get_first_impalad()
+
+  def get_service(self):
+    return self.get_coordinator().service
+
+  def get_client(self):
+    return self.get_coordinator().service.create_hs2_client()
+
+  def group_name(self, idx):
+    # By convention, group names must start with their associated resource pool name
+    # followed by a "-".
+    return "%s-group-%s" % (self.DEFAULT_POOL_NAME, idx)
+
+  def start_base_cluster(self):
+    """Starts the base cluster consisting of an exclusive coordinator, catalog, and
+    statestore. Does not add any executors."""
+    logging.info("Starting base cluster (coordinator, catalog, statestore)")
+    cluster_args = ["--impalad_args=-executor_groups=coordinator"]
+    self._start_impala_cluster(cluster_args, cluster_size=1,
+                               executor_slots=self.coordinator_slots,
+                               expected_num_executors=0, add_executors=False)
+    logging.info("Done, number of running executor groups: %s" % self.num_groups)
+
+  def start_group(self):
+    """Starts an executor group. The name of the group is automatically determined based
+    on the current number of total executor groups. Executors in the group will be started
+    in batches."""
+    self.num_groups += 1
+    name = self.group_name(self.num_groups)
+    desc = "%s:%s" % (name, self.group_size)
+    logging.info("Starting executor group %s with %s members" % (name, self.group_size))
+    cluster_args = ["--impalad_args=-executor_groups=%s" % desc]
+    batch_size = self.start_batch_size
+    num_started = 0
+    num_expected = (self.num_groups - 1) * self.group_size
+    while (num_started < self.group_size):
+      to_start = min(batch_size, self.group_size - num_started)
+      num_expected += to_start
+      if to_start == 1:
+        start_msg = "Starting executor %s" % (num_started + 1)
+      else:
+        start_msg = "Starting executors %s-%s" % (num_started + 1,
+                                                  num_started + to_start)
+      logging.info(start_msg)
+      self._start_impala_cluster(cluster_args, cluster_size=to_start,
+                                 executor_slots=self.executor_slots,
+                                 expected_num_executors=num_expected, add_executors=True)
+      num_started += to_start
+    logging.info("Done, number of running executor groups: %s" % self.num_groups)
+
+  def stop_group(self):
+    """Stops the executor group that was added last."""
+    name = self.group_name(self.num_groups)
+    group_hosts = self.get_groups()[name]
+    logging.info("Stopping executor group %s" % name)
+    for host in group_hosts:
+      logging.debug("Stopping host %s" % host)
+      query = ":shutdown('%s');" % host
+      self.execute(query)
+    self.wait_for_group_gone(name)
+    self.num_groups -= 1
+    logging.info("Done, number of running executor groups: %s" % self.num_groups)
+
+  def wait_for_group_gone(self, group_name, timeout=120):
+    """Waits until all executors in group 'group_name' have unregistered themselves from
+    the coordinator's cluster membership view."""
+    end = time.time() + timeout
+    while time.time() < end:
+      groups = self.get_groups()
+      if group_name not in groups:
+        return
+      time.sleep(0.5)
+    assert False, "Timeout waiting for group %s to shut down" % group_name
+
+  def get_groups(self):
+    return self.get_service().get_executor_groups()
+
+  def execute(self, query):
+    return self.get_client().execute(query)
+
+  def get_num_queued_queries(self):
+    """Returns the number of queries currently queued in the default pool on the
+    coordinator."""
+    return self.get_service().get_num_queued_queries(pool_name=self.DEFAULT_POOL_NAME)
+
+  def get_num_running_queries(self):
+    """Returns the number of queries currently running in the default pool on the
+    coordinator."""
+    return self.get_service().get_num_running_queries(self.DEFAULT_POOL_NAME)
+
+  def loop(self):
+    """Controls whether new executor groups need to be started or existing ones need to be
+    stopped, based on the number of queries that are currently queued and running.
+    """
+    while not self.stop_ev.is_set():
+      now = time.time()
+      num_queued = self.get_num_queued_queries()
+      num_running = self.get_num_running_queries()
+      capacity = self.executor_slots * self.num_groups
+
+      logging.debug("queued: %s, running: %s, capacity: %s" % (num_queued, num_running,
+                                                               capacity))
+
+      if num_queued == 0:
+        self.scale_up_sw = now
+
+      scale_up = self.scale_up_sw < now - self.scale_wait_up_s
+      if scale_up and self.num_groups < self.max_groups:
+        self.start_group()
+        self.scale_up_sw = time.time()
+        self.scale_down_sw = self.scale_up_sw
+        continue
+
+      surplus = capacity - num_running
+      if surplus < self.executor_slots:
+        self.scale_down_sw = now
+
+      if self.scale_down_sw < now - self.scale_wait_down_s:
+        self.stop_group()
+        self.scale_up_sw = time.time()
+        self.scale_down_sw = self.scale_up_sw
+        continue
+
+      time.sleep(1)
+
+  def start(self):
+    """Starts a base cluster with coordinator and statestore and the control loop to start
+    and stop additional executor groups."""
+    self.start_base_cluster()
+    assert self.loop_thread is None
+    self.loop_thread = Thread(target=self.loop)
+    self.loop_thread.start()
+
+  def stop(self):
+    """Stops the AutoScaler and its cluster."""
+    if self.stop_ev.is_set():
+      return
+    self.stop_ev.set()
+    if self.loop_thread:
+      self.loop_thread.join()
+      self.loop_thread = None
+    self._kill_whole_cluster()
+
+  def _start_impala_cluster(self, options, cluster_size, executor_slots,
+                            expected_num_executors, add_executors):
+    """Starts an Impala cluster and waits for all impalads to come online.
+
+    If 'add_executors' is True, new executors will be added to the cluster and the
+    existing daemons will not be restarted. In that case 'cluster_size' must specify the
+    number of nodes that will be added and 'expected_num_executors' must be the total
+    expected number of executors after the additional ones have started.
+
+    If 'add_executors' is false, 'cluster_size' must be 1 and a single exclusive
+    coordinator will be started (together with catalog and statestore).
+    """
+    assert cluster_size > 0, "cluster_size cannot be 0"
+    impala_log_dir = os.getenv("LOG_DIR", "/tmp/")
+    cmd = [os.path.join(IMPALA_HOME, "bin/start-impala-cluster.py"),
+           "--cluster_size=%d" % cluster_size,
+           "--log_dir=%s" % impala_log_dir,
+           "--log_level=1"]
+    if add_executors:
+      cmd.append("--add_executors")
+    else:
+      assert expected_num_executors == 0
+      assert cluster_size == 1
+      cmd.append("--use_exclusive_coordinators")
+
+    impalad_args = [
+        "-vmodule=admission-controller=3,cluster-membership-mgr=3",
+        "-max_concurrent_queries=%s" % executor_slots,
+        "-shutdown_grace_period_s=2"]
+
+    options += ["--impalad_args=%s" % a for a in impalad_args]
+
+    logging.debug("Starting cluster with command: %s" %
+                 " ".join(pipes.quote(arg) for arg in cmd + options))
+    log_debug = logging.getLogger().getEffectiveLevel() == logging.DEBUG
+    log_file = None
+    if not log_debug:
+      log_file = open("/dev/null", "w")
+
+    check_call(cmd + options, close_fds=True, stdout=log_file, stderr=log_file)
+
+    # The statestore subscribers are the coordinator, the executors and catalogd. The
+    # coordinator also counts as a backend, hence expected_num_executors + 1 backends.
+    if expected_num_executors > 0:
+      expected_subscribers = expected_num_executors + 2
+      expected_backends = expected_num_executors + 1
+    else:
+      expected_subscribers = cluster_size + 1
+      expected_backends = 1
+
+    cluster = self.get_cluster()
+    statestored = cluster.statestored
+    if statestored is None:
+      raise Exception("statestored was not found")
+
+    logging.debug("Waiting for %s subscribers to come online" % expected_subscribers)
+    statestored.service.wait_for_live_subscribers(expected_subscribers, timeout=60)
+    for impalad in cluster.impalads:
+      logging.debug("Waiting for %s executors to come online" % expected_backends)
+      impalad.service.wait_for_num_known_live_backends(expected_backends, timeout=60)
+
+  def _kill_whole_cluster(self):
+    """Terminates the whole cluster, i.e. all impalads, catalogd, and statestored."""
+    logging.info("terminating cluster")
+    check_call([os.path.join(IMPALA_HOME, "bin/start-impala-cluster.py"), "--kill_only"])
+
+
+def main():
+  parser = argparse.ArgumentParser()
+  parser.add_argument("-n", "--executor_slots", help="Concurrent queries per executor "
+                      "group", type=int, default=3)
+  parser.add_argument("-g", "--group_size", help="Number of executors per group",
+                      type=int, default=2)
+  parser.add_argument("-b", "--batch_size", help="Start executors of a group "
+                      "in batches instead of all at once", type=int, default=0)
+  parser.add_argument("-m", "--max_groups", help="Maximum number of groups to start",
+                      type=int, default=0)
+  parser.add_argument("-d", "--wait_down", help="Time to wait before scaling down (s)",
+                      type=int, default=5)
+  parser.add_argument("-u", "--wait_up", help="Time to wait before scaling up (s)",
+                      type=int, default=5)
+  parser.add_argument("-v", "--verbose", help="Verbose logging", action="store_true")
+  args = parser.parse_args()
+
+  # Restrict some logging for command line usage
+  logging.getLogger("impala_cluster").setLevel(logging.INFO)
+  logging.getLogger("requests").setLevel(logging.WARNING)
+  if args.verbose:
+    logging.basicConfig(level=logging.DEBUG)
+    logging.getLogger("impala.hiveserver2").setLevel(logging.INFO)
+  else:
+    logging.basicConfig(level=logging.INFO)
+    # Also restrict other modules' debug output
+    logging.getLogger("impala_connection").setLevel(logging.WARNING)
+    logging.getLogger("impala_service").setLevel(logging.WARNING)
+    logging.getLogger("impala.hiveserver2").setLevel(logging.WARNING)
+
+  a = AutoScaler(executor_slots=args.executor_slots, group_size=args.group_size,
+                 start_batch_size=args.batch_size, max_groups=args.max_groups,
+                 wait_up_s=args.wait_up, wait_down_s=args.wait_down)
+  a.start()
+  try:
+    while True:
+      time.sleep(1)
+  except KeyboardInterrupt:
+    logging.debug("Caught KeyboardInterrupt, stopping autoscaler")
+    a.stop()
+
+
+if __name__ == "__main__":
+  main()
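The scaling decision in AutoScaler.loop() can be read in isolation as a pure
function over the two stopwatches. The following is a minimal sketch under that
assumption, not code from this patch: ScalerState and decide are hypothetical
names, and the sketch resets both stopwatches to 'now' after an action, whereas
loop() re-reads time.time() once the group has actually started or stopped.

```python
from collections import namedtuple

# Hypothetical container for the two stopwatches kept by AutoScaler.
ScalerState = namedtuple("ScalerState", ["scale_up_sw", "scale_down_sw"])


def decide(state, now, num_queued, num_running, num_groups, executor_slots,
           max_groups, wait_up_s=5, wait_down_s=5):
  """Returns (action, new_state), where action is "up", "down" or None."""
  up_sw, down_sw = state
  # The scale-up stopwatch restarts whenever nothing is queued.
  if num_queued == 0:
    up_sw = now
  if up_sw < now - wait_up_s and num_groups < max_groups:
    return "up", ScalerState(now, now)
  # The scale-down stopwatch restarts unless at least one group's worth of
  # slots is idle.
  capacity = executor_slots * num_groups
  if capacity - num_running < executor_slots:
    down_sw = now
  if down_sw < now - wait_down_s:
    return "down", ScalerState(now, now)
  return None, ScalerState(up_sw, down_sw)
```

With zero groups the capacity is 0, so the surplus can never reach
executor_slots and the sketch, like loop(), never tries to stop a group that
does not exist.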
diff --git a/tests/util/concurrent_workload.py b/tests/util/concurrent_workload.py
new file mode 100755
index 0000000..b982806
--- /dev/null
+++ b/tests/util/concurrent_workload.py
@@ -0,0 +1,169 @@
+#!/usr/bin/env impala-python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# This module can be used to drive a concurrent workload against a local minicluster.
+
+import argparse
+import logging
+# Needed to work around datetime threading bug:
+# https://stackoverflow.com/questions/32245560/module-object-has-no-attribute-strptime-with-several-threads-python
+import _strptime  # noqa: F401
+import sys
+import time
+from Queue import Queue
+from threading import current_thread, Event, Thread
+
+from tests.common.impala_cluster import ImpalaCluster
+
+
+class ConcurrentWorkload(object):
+  """This class can be used to drive concurrent streams of queries against a cluster. It
+  is useful when trying to carefully control the number of queries running on a cluster
+  concurrently. The queries typically involve some sleep statement to allow for larger
+  numbers of concurrently running queries.
+
+  This class should not be used for performance benchmarks, e.g. to evaluate query
+  throughput.
+
+  Users of this class need to make sure to call start() and stop(). Optionally, the class
+  supports printing the current throughput rate. The class also requires that the first
+  node in the cluster is a dedicated coordinator and it must already be running when
+  calling start().
+  """
+  def __init__(self, query, num_streams):
+    self.query = query
+    self.num_streams = num_streams
+    self.stop_ev = Event()
+    self.output_q = Queue()
+    self.threads = []
+    self.query_rate = 0
+    self.query_rate_thread = Thread(target=self.compute_query_rate,
+                                    args=(self.output_q, self.stop_ev))
+
+  def execute(self, query):
+    """Executes a query on the coordinator of the local minicluster."""
+    cluster = ImpalaCluster.get_e2e_test_cluster()
+    if len(cluster.impalads) == 0:
+      raise Exception("Coordinator not running")
+    client = cluster.get_first_impalad().service.create_hs2_client()
+    return client.execute(query)
+
+  def loop_query(self, query, output_q, stop_ev):
+    """Executes 'query' in a loop while 'stop_ev' is not set and inserts the result into
+    'output_q'."""
+    while not stop_ev.is_set():
+      try:
+        output_q.put(self.execute(query))
+      except Exception:
+        if not stop_ev.is_set():
+          stop_ev.set()
+          logging.exception("Caught error, stopping")
+    logging.info("%s exiting" % current_thread().name)
+
+  def compute_query_rate(self, queue, stop_ev):
+    """Computes the query throughput rate in queries per second averaged over the last 5
+    seconds. This method only returns when 'stop_ev' is set by the caller."""
+    AVG_WINDOW_S = 5
+    times = []
+    while not stop_ev.is_set():
+      # Don't block to check for stop_ev
+      if queue.empty():
+        time.sleep(0.1)
+        continue
+      queue.get()
+      now = time.time()
+      times.append(now)
+      # Keep only timestamps within the averaging window
+      start = now - AVG_WINDOW_S
+      times = [t for t in times if t >= start]
+      self.query_rate = float(len(times)) / AVG_WINDOW_S
+
+  def get_query_rate(self):
+    """Returns the query rate as computed by compute_query_rate. This is thread-safe
+    because attribute assignments in CPython are atomic."""
+    return self.query_rate
+
+  def start(self):
+    """Starts worker threads to execute queries."""
+    # Start workers
+    for i in xrange(self.num_streams):
+      t = Thread(target=self.loop_query, args=(self.query, self.output_q, self.stop_ev))
+      self.threads.append(t)
+      t.start()
+    self.query_rate_thread.start()
+
+  def print_query_rate(self):
+    """Prints the current query throughput until user presses ctrl-c."""
+    try:
+      self._print_query_rate(self.output_q, self.stop_ev)
+    except KeyboardInterrupt:
+      self.stop()
+    assert self.stop_ev.is_set(), "Stop event expected to be set but it isn't"
+
+  def _print_query_rate(self, queue, stop_ev):
+    """Prints the query throughput rate until 'stop_ev' is set by the caller."""
+    PERIOD_S = 1
+
+    print_time = time.time()
+    while not stop_ev.is_set():
+      sys.stdout.write("\rQuery rate %.2f/s" % self.query_rate)
+      sys.stdout.flush()
+      print_time += PERIOD_S
+      time.sleep(max(0, print_time - time.time()))
+    sys.stdout.write("\n")
+
+  def stop(self):
+    """Stops all worker threads and waits for them to finish."""
+    if self.stop_ev is None or self.stop_ev.is_set():
+      return
+    self.stop_ev.set()
+    # Wait for all workers to exit
+    for t in self.threads:
+      logging.info("Waiting for %s" % t.name)
+      t.join()
+    self.threads = []
+    if self.query_rate_thread:
+      self.query_rate_thread.join()
+      self.query_rate = None
+
+
+def main():
+  parser = argparse.ArgumentParser()
+  parser.add_argument("-q", "--query", help="Run this query",
+                      default="select * from functional_parquet.alltypestiny "
+                              "where month < 3 and id + random() < sleep(500);")
+  parser.add_argument("-n", "--num_streams", help="Run this many in parallel", type=int,
+                      default=5)
+  args = parser.parse_args()
+
+  # Restrict logging so it doesn't interfere with print_query_rate()
+  logging.basicConfig(level=logging.INFO)
+  # Also restrict other modules' debug output
+  logging.getLogger("impala_cluster").setLevel(logging.INFO)
+  logging.getLogger("impala_connection").setLevel(logging.WARNING)
+  logging.getLogger("impala.hiveserver2").setLevel(logging.CRITICAL)
+
+  s = ConcurrentWorkload(args.query, args.num_streams)
+
+  s.start()
+  s.print_query_rate()
+
+
+if __name__ == "__main__":
+  main()
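The averaging step in ConcurrentWorkload.compute_query_rate() is a sliding
window over completion timestamps: timestamps older than the window are dropped
and the remaining count is divided by the window length. A minimal sketch of
just that step, with the hypothetical helper name windowed_rate, could look like
this:

```python
def windowed_rate(timestamps, now, window_s=5):
  """Returns (rate, kept): the rate in events per second over the last 'window_s'
  seconds and the timestamps that still fall inside the window."""
  start = now - window_s
  # Keep only timestamps within the averaging window.
  kept = [t for t in timestamps if t >= start]
  return float(len(kept)) / window_s, kept
```

Note that the divisor is always the full window length, so the reported rate
ramps up gradually during the first window_s seconds of a workload.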
diff --git a/www/backends.tmpl b/www/backends.tmpl
index d29f4a1..aa164b7 100644
--- a/www/backends.tmpl
+++ b/www/backends.tmpl
@@ -31,6 +31,9 @@ under the License.
       <th>Memory Limit for Admission</th>
       <th>Memory Reserved</th>
       <th>Memory Admitted by Queries Submitted to this Coordinator</th>
+      <th>Num. Queries Limit for Admission</th>
+      <th>Num. Queries Admitted by this Coordinator</th>
+      <th>Executor Groups</th>
     </tr>
   </thead>
   <tbody>
@@ -44,6 +47,9 @@ under the License.
       <td>{{admit_mem_limit}}</td>
       <td>{{mem_reserved}}</td>
       <td>{{mem_admitted}}</td>
+      <td>{{admit_num_queries_limit}}</td>
+      <td>{{num_admitted}}</td>
+      <td>{{executor_groups}}</td>
     </tr>
     {{/backends}}
   </tbody>