You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bh...@apache.org on 2019/09/07 23:27:14 UTC

[impala] 01/02: IMPALA-8803: Coordinator should release admitted memory per-backend

This is an automated email from the ASF dual-hosted git repository.

bharathv pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit df365a8e306adc9113a000e427fa86b385754804
Author: Sahil Takiar <ta...@gmail.com>
AuthorDate: Fri Jul 26 11:52:01 2019 -0700

    IMPALA-8803: Coordinator should release admitted memory per-backend
    
    Changes the Coordinator to release admitted memory when each Backend
    completes, rather than waiting for the entire query to complete before
    releasing admitted memory. When the Coordinator detects that a Backend
    has completed (via ControlService::ReportExecStatus) it updates the
    state of the Backend in Coordinator::BackendResourceState.
    BackendResourceState tracks the state of the admitted resources for
    each Backend and decides when the resources for a group of Backend
    states should be released. BackendResourceState defines a state machine
    to help coordinate the state of the admitted memory for each Backend.
    It guarantees that by the time the query is shutdown, all Backends
    release their admitted memory.
    
    BackendResourceState implements three rules to control the rate at
    which the Coordinator releases admitted memory from the
    AdmissionController:
    * Resources are released at most once every 1 second, this prevents
    short lived queries from causing high load on the admission controller
    * Resources are released at most O(log(num_backends)) times; the
    BackendResourceStates can release multiple BackendStates from the
    AdmissionController at a time
    * All pending resources are released if the only remaining Backend is
    the Coordinator Backend; this is useful for result spooling where all
    Backends may complete, except for the Coordinator Backend
    
    Exposes the following hidden startup flags to help tune the heuristics
    above:
    
    --batched_release_decay_factor
    * Defaults to 2
    * Controls the base value for the O(log(num_backends)) bound when
    batching the release of Backends.
    
    --release_backend_states_delay_ms
    * Defaults to 1000 milliseconds
    * Controls how often Backends can release their resources.
    
    Testing:
    * Ran core tests
    * Added new tests to test_result_spooling.py and
    test_admission_controller.py
    
    Change-Id: I88bb11e0ede7574568020e0277dd8ac8d2586dc9
    Reviewed-on: http://gerrit.cloudera.org:8080/14104
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/CMakeLists.txt                      |   3 +
 .../runtime/coordinator-backend-resource-state.cc  | 145 +++++++++++
 be/src/runtime/coordinator-backend-state-test.cc   | 286 +++++++++++++++++++++
 be/src/runtime/coordinator-backend-state.cc        |  43 ++--
 be/src/runtime/coordinator-backend-state.h         | 154 ++++++++++-
 be/src/runtime/coordinator.cc                      |  57 +++-
 be/src/runtime/coordinator.h                       |  35 ++-
 be/src/scheduling/admission-controller-test.cc     |  10 +-
 be/src/scheduling/admission-controller.cc          | 125 +++++++--
 be/src/scheduling/admission-controller.h           |  58 ++++-
 tests/custom_cluster/test_admission_controller.py  |  77 ++++--
 tests/custom_cluster/test_auto_scaling.py          |   2 +-
 tests/custom_cluster/test_executor_groups.py       |   2 +-
 tests/custom_cluster/test_result_spooling.py       |  85 ++++++
 tests/util/auto_scaler.py                          |   4 +-
 tests/util/web_pages_util.py                       |  52 ++++
 16 files changed, 1044 insertions(+), 94 deletions(-)

diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 0c8b11c..cf7a3bd 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -34,6 +34,7 @@ add_library(Runtime
   collection-value.cc
   coordinator.cc
   coordinator-backend-state.cc
+  coordinator-backend-resource-state.cc
   datetime-parse-util.cc
   date-parse-util.cc
   date-value.cc
@@ -84,6 +85,7 @@ add_library(Runtime
 add_dependencies(Runtime gen-deps)
 
 add_library(RuntimeTests STATIC
+  coordinator-backend-state-test.cc
   date-test.cc
   decimal-test.cc
   free-pool-test.cc
@@ -103,6 +105,7 @@ add_library(RuntimeTests STATIC
 )
 add_dependencies(RuntimeTests gen-deps)
 
+ADD_UNIFIED_BE_LSAN_TEST(coordinator-backend-state-test CoordinatorBackendStateTest.*)
 ADD_UNIFIED_BE_LSAN_TEST(mem-pool-test MemPoolTest.*)
 ADD_UNIFIED_BE_LSAN_TEST(free-pool-test FreePoolTest.*)
 ADD_UNIFIED_BE_LSAN_TEST(string-buffer-test StringBufferTest.*)
diff --git a/be/src/runtime/coordinator-backend-resource-state.cc b/be/src/runtime/coordinator-backend-resource-state.cc
new file mode 100644
index 0000000..b721199
--- /dev/null
+++ b/be/src/runtime/coordinator-backend-resource-state.cc
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/coordinator-backend-state.h"
+
+#include "common/names.h"
+
+using namespace impala;
+
+DEFINE_uint32_hidden(batched_release_decay_factor, 2,
+    "The exponential decay factor for the 'Batched Release' of backends. The default of "
+    "2 ensures that the number of times resources are released is bounded by O(log2(n)). "
+    "Setting this to another value, such as 10, would bound the number of times "
+    "resources are released by O(log10(n)).");
+DEFINE_uint64_hidden(release_backend_states_delay_ms, 1000,
+    "The timeout for the 'Timed Release' of backends. If more than this many "
+    "milliseconds has passed since the last time any Backends were released, then "
+    "release all pending Backends. Set to 1000 milliseconds by default.");
+
+DECLARE_uint32(batched_release_decay_factor);
+DECLARE_uint64(release_backend_states_delay_ms);
+
+Coordinator::BackendResourceState::BackendResourceState(
+    const vector<BackendState*>& backend_states, const QuerySchedule& schedule)
+  : num_in_use_(backend_states.size()),
+    backend_states_(backend_states),
+    num_backends_(backend_states.size()),
+    schedule_(schedule),
+    release_backend_states_delay_ns_(FLAGS_release_backend_states_delay_ms * 1000000),
+    batched_release_decay_value_(FLAGS_batched_release_decay_factor) {
+  DCHECK_GT(batched_release_decay_value_, 0)
+      << "Invalid value for --batched_release_decay_factor, must greater than 0";
+  // Populate the backend_resource_states_ map and mark all BackendStates as
+  // IN_USE.
+  for (auto backend_state : backend_states_) {
+    backend_resource_states_[backend_state] = ResourceState::IN_USE;
+  }
+  // Start the 'Timed Release' timer.
+  released_timer_.Start();
+}
+
+Coordinator::BackendResourceState::~BackendResourceState() {
+  // Assert that all BackendStates have been released. We use num_backends_ instead of
+  // backend_states_.size() so backend_states_ does not need to be alive when the
+  // destructor runs.
+  DCHECK(closed_);
+  DCHECK_EQ(num_released_, num_backends_);
+}
+
+void Coordinator::BackendResourceState::MarkBackendFinished(
+    BackendState* backend_state, vector<BackendState*>* releasable_backend_states) {
+  lock_guard<SpinLock> lock(lock_);
+  if (!closed_
+      && backend_resource_states_.at(backend_state) == ResourceState::IN_USE) {
+    // Transition the BackendState to PENDING and update any related counters.
+    backend_resource_states_.at(backend_state) = ResourceState::PENDING;
+    ++num_pending_;
+    --num_in_use_;
+
+    // If the coordinator backend has not been released, but all other have, then the only
+    // running Backend must be the coordinator. The Coordinator fragment should buffer
+    // enough rows to allow all other fragments to be released (if result spooling is
+    // enabled this is especially true, but even without spooling many queries have a
+    // coordinator fragment that buffers multiple RowBatches). If the client does not
+    // fetch all rows immediately, then the Coordinator Backend will be long lived
+    // compared to the rest of the Backends.
+    bool is_coordinator_the_last_unreleased_backend =
+        !released_coordinator_ && num_in_use_ == 1;
+
+    // True if the 'Timed Release' heuristic should be triggered.
+    bool release_backends_timeout_expired =
+        released_timer_.ElapsedTime() > release_backend_states_delay_ns_;
+
+    // True if the 'Batched Release' heuristic should be triggered.
+    bool unreleased_backend_threshold_reached = num_pending_
+        >= std::max(floor(num_backends_ / batched_release_decay_value_), 1.0);
+
+    // If no Backends are running or if only the Coordinator Backend is running or if both
+    // the 'Timed Release' and 'Batched Release' heuristic are true, then transition all
+    // PENDING BackendStates to RELEASABLE and update any state necessary for the
+    // heuristics.
+    if (is_coordinator_the_last_unreleased_backend
+        || (release_backends_timeout_expired && unreleased_backend_threshold_reached)) {
+      released_timer_.Reset();
+      batched_release_decay_value_ *= FLAGS_batched_release_decay_factor;
+      for (auto backend_resource_state : backend_resource_states_) {
+        if (backend_resource_state.second == ResourceState::PENDING) {
+          releasable_backend_states->push_back(backend_resource_state.first);
+          backend_resource_states_[backend_resource_state.first] =
+              ResourceState::RELEASABLE;
+          --num_pending_;
+        }
+      }
+      DCHECK_GE(num_pending_, 0);
+    }
+  }
+}
+
+void Coordinator::BackendResourceState::BackendsReleased(
+    const vector<BackendState*>& released_backend_states) {
+  lock_guard<SpinLock> lock(lock_);
+  // Mark all given BackendStates as RELEASED. A BackendState must be either RELEASABLE
+  // or IN_USE before it can marked as RELEASED.
+  for (auto backend_state : released_backend_states) {
+    DCHECK_NE(backend_resource_states_.at(backend_state), ResourceState::RELEASED);
+    if (backend_resource_states_.at(backend_state) == ResourceState::IN_USE) {
+      --num_in_use_;
+    }
+    backend_resource_states_.at(backend_state) = ResourceState::RELEASED;
+    // If the Backend running the Coordinator has completed and been released, set
+    // released_coordinator_ to true.
+    if (backend_state->exec_params()->is_coord_backend) {
+      released_coordinator_ = true;
+    }
+  }
+  num_released_ += released_backend_states.size();
+}
+
+vector<Coordinator::BackendState*>
+Coordinator::BackendResourceState::CloseAndGetUnreleasedBackends() {
+  lock_guard<SpinLock> lock(lock_);
+  vector<BackendState*> unreleased_backend_states;
+  for (auto backend_resource_state : backend_resource_states_) {
+    if (backend_resource_state.second == ResourceState::IN_USE
+        || backend_resource_state.second == ResourceState::PENDING) {
+      unreleased_backend_states.push_back(backend_resource_state.first);
+    }
+  }
+  closed_ = true;
+  return unreleased_backend_states;
+}
diff --git a/be/src/runtime/coordinator-backend-state-test.cc b/be/src/runtime/coordinator-backend-state-test.cc
new file mode 100644
index 0000000..806c626
--- /dev/null
+++ b/be/src/runtime/coordinator-backend-state-test.cc
@@ -0,0 +1,286 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/coordinator.h"
+
+#include "common/names.h"
+#include "common/object-pool.h"
+#include "runtime/coordinator-backend-state.h"
+#include "testutil/gtest-util.h"
+#include "util/network-util.h"
+#include "util/runtime-profile.h"
+
+DECLARE_uint64(release_backend_states_delay_ms);
+
+namespace impala {
+
+class CoordinatorBackendStateTest : public testing::Test {
+ protected:
+  // Amount of time to wait, in milliseconds, to trigger the 'Timed Release' condition.
+  const int64_t timeout_release_ms_ = 1.5 * FLAGS_release_backend_states_delay_ms;
+
+  // Pool for objects to be destroyed during test teardown.
+  ObjectPool pool_;
+
+  RuntimeProfile* profile_ = nullptr;
+
+  virtual void SetUp() { profile_ = RuntimeProfile::Create(&pool_, "pool1"); }
+
+  /// Utility function to create a dummy QuerySchedule.
+  void MakeQuerySchedule(QuerySchedule** query_schedule) {
+    TUniqueId* query_id = pool_.Add(new TUniqueId());
+    TQueryExecRequest* request = pool_.Add(new TQueryExecRequest());
+    TQueryOptions* query_options = pool_.Add(new TQueryOptions());
+
+    *query_schedule =
+        pool_.Add(new QuerySchedule(*query_id, *request, *query_options, profile_));
+  }
+
+  /// Utility function to create a specified number of dummy BackendStates and add them
+  /// to the supplied vector. 'coordinator_backend' points to the Backend running the
+  /// Coordinator. The 'coordinator_backend' is added to the supplied vector as well.
+  void MakeBackendStates(int num_states, QuerySchedule* query_schedule,
+      Coordinator::BackendState** coordinator_backend,
+      std::vector<Coordinator::BackendState*>* backend_states) {
+    TQueryCtx* query_ctx = pool_.Add(new TQueryCtx());
+
+    PerBackendExecParams* per_backend_exec_params = pool_.Add(new PerBackendExecParams());
+    query_schedule->set_per_backend_exec_params(*per_backend_exec_params);
+
+    for (int i = 0; i < num_states; ++i) {
+      TNetworkAddress addr = MakeNetworkAddress(strings::Substitute("host-$0", i), 25000);
+      BackendExecParams* backend_exec_params = pool_.Add(new BackendExecParams());
+      per_backend_exec_params->emplace(addr, *backend_exec_params);
+      Coordinator::BackendState* backend_state = pool_.Add(new Coordinator::BackendState(
+          *query_schedule, *query_ctx, 0, TRuntimeFilterMode::OFF, *backend_exec_params));
+      backend_states->push_back(backend_state);
+      // Mark the first BackendState as the Coordinator Backend.
+      if (i == 0) {
+        backend_exec_params->is_coord_backend = true;
+        *coordinator_backend = backend_state;
+      }
+    }
+  }
+
+  virtual void TearDown() { pool_.Clear(); }
+};
+
+/// Validate the state machine of BackendResourceState by testing that the following
+/// conditions hold:
+///     * All BackendStates are initially marked as IN_USE.
+///     * Releasing all Backends transitions all Backends to the RELEASED state.
+TEST_F(CoordinatorBackendStateTest, StateMachine) {
+  // Initialize a BackendResourceState with three BackendStates.
+  int num_backends = 3;
+  QuerySchedule* query_schedule = nullptr;
+  MakeQuerySchedule(&query_schedule);
+
+  std::vector<Coordinator::BackendState*> backend_states;
+  Coordinator::BackendState* coordinator_backend = nullptr;
+  MakeBackendStates(num_backends, query_schedule, &coordinator_backend, &backend_states);
+  Coordinator::BackendResourceState* backend_resource_state =
+      pool_.Add(new Coordinator::BackendResourceState(backend_states, *query_schedule));
+
+  // Assert that all BackendStates are initially in the IN_USE state.
+  ASSERT_EQ(backend_resource_state->backend_resource_states_.size(), num_backends);
+  for (auto backend_state : backend_resource_state->backend_resource_states_) {
+    ASSERT_EQ(
+        backend_state.second, Coordinator::BackendResourceState::ResourceState::IN_USE);
+  }
+
+  // Assert that releasing an empty vector of BackendStates does not transition any
+  // BackendState to the RELEASED state.
+  std::vector<Coordinator::BackendState*> empty_backend_states;
+  backend_resource_state->BackendsReleased(empty_backend_states);
+  ASSERT_TRUE(empty_backend_states.empty());
+  ASSERT_EQ(backend_resource_state->backend_resource_states_.size(), num_backends);
+  for (auto backend_state : backend_resource_state->backend_resource_states_) {
+    ASSERT_EQ(
+        backend_state.second, Coordinator::BackendResourceState::ResourceState::IN_USE);
+  }
+
+  // Assert that releasing all BackendStates transitions all BackendStates from
+  // IN_USE to RELEASED.
+  backend_resource_state->BackendsReleased(backend_states);
+  std::vector<Coordinator::BackendState*> unreleased_backend_states =
+      backend_resource_state->CloseAndGetUnreleasedBackends();
+  ASSERT_EQ(unreleased_backend_states.size(), 0);
+}
+
+/// Validate the 'Coordinator Only' heuristic.
+TEST_F(CoordinatorBackendStateTest, CoordinatorOnly) {
+  // Initialize a BackendResourceState with eight BackendStates.
+  int num_backends = 8;
+  QuerySchedule* query_schedule = nullptr;
+  MakeQuerySchedule(&query_schedule);
+
+  std::vector<Coordinator::BackendState*> backend_states;
+  Coordinator::BackendState* coordinator_backend = nullptr;
+  MakeBackendStates(num_backends, query_schedule, &coordinator_backend, &backend_states);
+  Coordinator::BackendResourceState* backend_resource_state =
+      pool_.Add(new Coordinator::BackendResourceState(backend_states, *query_schedule));
+
+  // Create a vector of non-Coordinator Backends.
+  std::vector<Coordinator::BackendState*> non_coord_backend_states;
+  for (auto backend_state : backend_states) {
+    if (!backend_state->exec_params()->is_coord_backend) {
+      non_coord_backend_states.push_back(backend_state);
+    }
+  }
+  ASSERT_EQ(non_coord_backend_states.size(), num_backends - 1);
+
+  // Release all but the last non-Coordinator Backend. Each call to MarkBackendFinished
+  // should not cause any PENDING Backends to transition to RELEASABLE because the
+  // 'Timeout Release' heuristic should not have be triggered.
+  std::vector<Coordinator::BackendState*> releasable_backend_states;
+  for (int i = 0; i < non_coord_backend_states.size() - 1; ++i) {
+    backend_resource_state->MarkBackendFinished(
+        non_coord_backend_states[i], &releasable_backend_states);
+    ASSERT_EQ(releasable_backend_states.size(), 0);
+  }
+
+  // Release the last non-Coordinator Backend, this should trigger all PENDING Bbackends
+  // to transition to the RELEASABLE state.
+  backend_resource_state->MarkBackendFinished(
+      non_coord_backend_states[non_coord_backend_states.size() - 1],
+      &releasable_backend_states);
+
+  // Assert that all non-Coordinator Backends are RELEASABLE and then mark them as
+  // RELEASED.
+  ASSERT_EQ(releasable_backend_states.size(), non_coord_backend_states.size());
+  backend_resource_state->BackendsReleased(releasable_backend_states);
+
+  // Release the remaining BackendStates.
+  std::vector<Coordinator::BackendState*> unreleased_backend_states =
+      backend_resource_state->CloseAndGetUnreleasedBackends();
+  ASSERT_EQ(unreleased_backend_states.size(), 1);
+  backend_resource_state->BackendsReleased(unreleased_backend_states);
+}
+
+/// Validate the 'Timed Release' heuristic.
+TEST_F(CoordinatorBackendStateTest, TimedRelease) {
+  // Initialize a BackendResourceState with eight BackendStates.
+  int num_backends = 8;
+  QuerySchedule* query_schedule = nullptr;
+  MakeQuerySchedule(&query_schedule);
+
+  std::vector<Coordinator::BackendState*> backend_states;
+  Coordinator::BackendState* coordinator_backend = nullptr;
+  MakeBackendStates(num_backends, query_schedule, &coordinator_backend, &backend_states);
+  Coordinator::BackendResourceState* backend_resource_state =
+      pool_.Add(new Coordinator::BackendResourceState(backend_states, *query_schedule));
+
+  // Sleep until the 'Timed Release' timeout is hit.
+  SleepForMs(timeout_release_ms_);
+
+  // Mark half of the BackendStates as finished. Marking all Backends up to
+  // num_backends / 2 - 1 should result in 0 releasable Backends, and marking the
+  // num_backends / 2 Backend should result in num_backends / 2 releasable Backends.
+  std::vector<Coordinator::BackendState*> releasable_backend_states;
+  for (int i = 0; i < num_backends / 2 - 1; ++i) {
+    backend_resource_state->MarkBackendFinished(
+        backend_states.at(i), &releasable_backend_states);
+    ASSERT_EQ(releasable_backend_states.size(), 0);
+  }
+  backend_resource_state->MarkBackendFinished(
+      backend_states.at(num_backends / 2 - 1), &releasable_backend_states);
+
+  // Assert that half of the BackendStates transitioned to RELEASABLE.
+  ASSERT_EQ(releasable_backend_states.size(), num_backends / 2);
+
+  // Release half of the BackendStates.
+  backend_resource_state->BackendsReleased(releasable_backend_states);
+
+  // Mark the remaining half of the BackendStates as finished and assert that no Backends
+  // transition to RELEASABLE (the 'Timed Release' timeout should not be hit so no
+  // backends should be released).
+  releasable_backend_states.clear();
+  for (int i = num_backends / 2; i < num_backends / 2 + num_backends / 4; ++i) {
+    backend_resource_state->MarkBackendFinished(
+        backend_states.at(i), &releasable_backend_states);
+    ASSERT_EQ(releasable_backend_states.size(), 0);
+  }
+
+  // Release the remaining BackendStates.
+  std::vector<Coordinator::BackendState*> unreleased_backend_states =
+      backend_resource_state->CloseAndGetUnreleasedBackends();
+  ASSERT_EQ(unreleased_backend_states.size(), num_backends / 2);
+  backend_resource_state->BackendsReleased(unreleased_backend_states);
+}
+
+/// Validate the 'Batched Release' heuristic.
+TEST_F(CoordinatorBackendStateTest, BatchedRelease) {
+  // Initialize a BackendResouceState with 128 BackendStates.
+  int num_backends = 128;
+  QuerySchedule* query_schedule = nullptr;
+  MakeQuerySchedule(&query_schedule);
+
+  std::vector<Coordinator::BackendState*> backend_states;
+  Coordinator::BackendState* coordinator_backend = nullptr;
+  MakeBackendStates(num_backends, query_schedule, &coordinator_backend, &backend_states);
+  Coordinator::BackendResourceState* backend_resource_state =
+      pool_.Add(new Coordinator::BackendResourceState(backend_states, *query_schedule));
+
+  // Sleep until the 'Timed Release' timeout is hit.
+  SleepForMs(timeout_release_ms_);
+
+  // Mark (num_backends / 2 - 1) of the BackendStates as finished. Marking all Backends
+  // up to num_backends / 2 - 1 should result in 0 releasable Backends, and marking the
+  // num_backends / 2 backend should result in num_backends / 2 releasable Backends.
+  std::vector<Coordinator::BackendState*> releasable_backend_states;
+  for (int i = 0; i < num_backends / 2 - 1; ++i) {
+    backend_resource_state->MarkBackendFinished(
+        backend_states.at(i), &releasable_backend_states);
+    ASSERT_EQ(releasable_backend_states.size(), 0);
+  }
+  backend_resource_state->MarkBackendFinished(
+      backend_states.at(num_backends / 2 - 1), &releasable_backend_states);
+
+  // Assert that (num_backends / 2) of the BackendStates transitioned to RELEASABLE.
+  ASSERT_EQ(releasable_backend_states.size(), num_backends / 2);
+
+  // Release half of the BackendStates.
+  backend_resource_state->BackendsReleased(releasable_backend_states);
+
+  // Sleep until the 'Timed Release' timeout is hit.
+  SleepForMs(timeout_release_ms_);
+
+  // Mark (num_backends / 4 - 1) of the BackendStates as finished.
+  releasable_backend_states.clear();
+  for (int i = num_backends / 2; i < num_backends / 2 + num_backends / 4 - 1; ++i) {
+    backend_resource_state->MarkBackendFinished(
+        backend_states.at(i), &releasable_backend_states);
+    ASSERT_EQ(releasable_backend_states.size(), 0);
+  }
+  backend_resource_state->MarkBackendFinished(
+      backend_states.at(num_backends / 2 + num_backends / 4 - 1),
+      &releasable_backend_states);
+
+  // Assert that (num_backends / 4) of the BackendStates transitioned to RELEASABLE.
+  ASSERT_EQ(releasable_backend_states.size(), num_backends / 4);
+
+  // Release a fourth of the BackendStates.
+  backend_resource_state->BackendsReleased(releasable_backend_states);
+
+  // Release the remaining BackendStates
+  std::vector<Coordinator::BackendState*> unreleased_backend_states =
+      backend_resource_state->CloseAndGetUnreleasedBackends();
+  ASSERT_EQ(unreleased_backend_states.size(),
+      num_backends - num_backends / 2 - num_backends / 4);
+  backend_resource_state->BackendsReleased(unreleased_backend_states);
+}
+} // namespace impala
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index f3d81a9..224372c 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -58,17 +58,18 @@ const char* Coordinator::BackendState::InstanceStats::LAST_REPORT_TIME_DESC =
 DECLARE_int32(backend_client_rpc_timeout_ms);
 DECLARE_int64(rpc_max_message_size);
 
-Coordinator::BackendState::BackendState(
-    const Coordinator& coord, int state_idx, TRuntimeFilterMode::type filter_mode)
-  : coord_(coord),
+Coordinator::BackendState::BackendState(const QuerySchedule& schedule,
+    const TQueryCtx& query_ctx, int state_idx,
+    TRuntimeFilterMode::type filter_mode, const BackendExecParams& exec_params)
+  : schedule_(schedule),
     state_idx_(state_idx),
-    filter_mode_(filter_mode) {
-}
+    filter_mode_(filter_mode),
+    backend_exec_params_(&exec_params),
+    query_ctx_(query_ctx),
+    query_id_(schedule.query_id()) {}
 
-void Coordinator::BackendState::Init(
-    const BackendExecParams& exec_params, const vector<FragmentStats*>& fragment_stats,
+void Coordinator::BackendState::Init(const vector<FragmentStats*>& fragment_stats,
     RuntimeProfile* host_profile_parent, ObjectPool* obj_pool) {
-  backend_exec_params_ = &exec_params;
   host_ = backend_exec_params_->be_desc.address;
   krpc_host_ = backend_exec_params_->be_desc.krpc_address;
   num_remaining_instances_ = backend_exec_params_->instance_params.size();
@@ -105,7 +106,7 @@ void Coordinator::BackendState::SetRpcParams(const DebugOptions& debug_options,
   request->set_min_mem_reservation_bytes(backend_exec_params_->min_mem_reservation_bytes);
   request->set_initial_mem_reservation_total_claims(
       backend_exec_params_->initial_mem_reservation_total_claims);
-  request->set_per_backend_mem_limit(coord_.schedule_.per_backend_mem_limit());
+  request->set_per_backend_mem_limit(schedule_.per_backend_mem_limit());
 
   // set fragment_ctxs and fragment_instance_ctxs
   sidecar->__isset.fragment_ctxs = true;
@@ -173,7 +174,7 @@ void Coordinator::BackendState::SetRpcParams(const DebugOptions& debug_options,
 void Coordinator::BackendState::SetExecError(const Status& status) {
   const string ERR_TEMPLATE = "ExecQueryFInstances rpc query_id=$0 failed: $1";
   const string& err_msg =
-      Substitute(ERR_TEMPLATE, PrintId(query_id()), status.msg().GetFullMessageDetails());
+      Substitute(ERR_TEMPLATE, PrintId(query_id_), status.msg().GetFullMessageDetails());
   LOG(ERROR) << err_msg;
   status_ = Status::Expected(err_msg);
 }
@@ -204,7 +205,7 @@ void Coordinator::BackendState::Exec(
 
   ExecQueryFInstancesRequestPB request;
   TExecQueryFInstancesSidecar sidecar;
-  sidecar.__set_query_ctx(query_ctx());
+  sidecar.__set_query_ctx(query_ctx_);
   SetRpcParams(debug_options, filter_routing_table, &request, &sidecar);
 
   RpcController rpc_controller;
@@ -242,7 +243,7 @@ void Coordinator::BackendState::Exec(
 
   VLOG_FILE << "making rpc: ExecQueryFInstances"
       << " host=" << TNetworkAddressToString(impalad_address()) << " query_id="
-      << PrintId(query_id());
+      << PrintId(query_id_);
 
   // guard against concurrent UpdateBackendExecStatus() that may arrive after RPC returns
   lock_guard<mutex> l(lock_);
@@ -268,7 +269,7 @@ void Coordinator::BackendState::Exec(
   }
 
   for (const auto& entry: instance_stats_map_) entry.second->stopwatch_.Start();
-  VLOG_FILE << "rpc succeeded: ExecQueryFInstances query_id=" << PrintId(query_id());
+  VLOG_FILE << "rpc succeeded: ExecQueryFInstances query_id=" << PrintId(query_id_);
 }
 
 Status Coordinator::BackendState::GetStatus(bool* is_fragment_failure,
@@ -334,7 +335,7 @@ void Coordinator::BackendState::LogFirstInProgress(
     std::vector<Coordinator::BackendState*> backend_states) {
   for (Coordinator::BackendState* backend_state : backend_states) {
     if (!backend_state->IsDone()) {
-      VLOG_QUERY << "query_id=" << PrintId(backend_state->query_id())
+      VLOG_QUERY << "query_id=" << PrintId(backend_state->query_id_)
                  << ": first in-progress backend: "
                  << TNetworkAddressToString(backend_state->impalad_address());
       break;
@@ -500,21 +501,21 @@ bool Coordinator::BackendState::Cancel() {
   // set an error status to make sure we only cancel this once
   if (status_.ok()) status_ = Status::CANCELLED;
 
-  VLOG_QUERY << "Sending CancelQueryFInstances rpc for query_id=" << PrintId(query_id())
+  VLOG_QUERY << "Sending CancelQueryFInstances rpc for query_id=" << PrintId(query_id_)
              << " backend=" << TNetworkAddressToString(krpc_host_);
 
   std::unique_ptr<ControlServiceProxy> proxy;
   Status get_proxy_status = ControlService::GetProxy(krpc_host_, host_.hostname, &proxy);
   if (!get_proxy_status.ok()) {
     status_.MergeStatus(get_proxy_status);
-    VLOG_QUERY << "Cancel query_id= " << PrintId(query_id()) << " could not get proxy to "
+    VLOG_QUERY << "Cancel query_id= " << PrintId(query_id_) << " could not get proxy to "
                << TNetworkAddressToString(krpc_host_)
                << " failure: " << get_proxy_status.msg().msg();
     return true;
   }
 
   CancelQueryFInstancesRequestPB request;
-  TUniqueIdToUniqueIdPB(query_id(), request.mutable_query_id());
+  TUniqueIdToUniqueIdPB(query_id_, request.mutable_query_id());
   CancelQueryFInstancesResponsePB response;
 
   const int num_retries = 3;
@@ -522,12 +523,12 @@ bool Coordinator::BackendState::Cancel() {
   const int64_t backoff_time_ms = 3 * MILLIS_PER_SEC;
   Status rpc_status =
       RpcMgr::DoRpcWithRetry(proxy, &ControlServiceProxy::CancelQueryFInstances, request,
-          &response, query_ctx(), "Cancel() RPC failed", num_retries, timeout_ms,
+          &response, query_ctx_, "Cancel() RPC failed", num_retries, timeout_ms,
           backoff_time_ms, "COORD_CANCEL_QUERY_FINSTANCES_RPC");
 
   if (!rpc_status.ok()) {
     status_.MergeStatus(rpc_status);
-    VLOG_QUERY << "Cancel query_id= " << PrintId(query_id()) << " could not do rpc to "
+    VLOG_QUERY << "Cancel query_id= " << PrintId(query_id_) << " could not do rpc to "
                << TNetworkAddressToString(krpc_host_)
                << " failure: " << rpc_status.msg().msg();
     return true;
@@ -535,7 +536,7 @@ bool Coordinator::BackendState::Cancel() {
   Status cancel_status = Status(response.status());
   if (!cancel_status.ok()) {
     status_.MergeStatus(cancel_status);
-    VLOG_QUERY << "Cancel query_id= " << PrintId(query_id())
+    VLOG_QUERY << "Cancel query_id= " << PrintId(query_id_)
                << " got failure after rpc to " << TNetworkAddressToString(krpc_host_)
                << " failure: " << cancel_status.msg().msg();
     return true;
@@ -544,7 +545,7 @@ bool Coordinator::BackendState::Cancel() {
 }
 
 void Coordinator::BackendState::PublishFilter(const TPublishFilterParams& rpc_params) {
-  DCHECK(rpc_params.dst_query_id == query_id());
+  DCHECK(rpc_params.dst_query_id == query_id_);
   // If the backend is already done, it's not waiting for this filter, so we skip
   // sending it in this case.
   if (IsDone()) return;
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index 4b0cfad..ceb7f77 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -58,16 +58,15 @@ struct FInstanceExecParams;
 /// Thread-safe unless pointed out otherwise.
 class Coordinator::BackendState {
  public:
-  BackendState(const Coordinator& coord, int state_idx,
-      TRuntimeFilterMode::type filter_mode);
+  BackendState(const QuerySchedule& schedule, const TQueryCtx& query_ctx, int state_idx,
+      TRuntimeFilterMode::type filter_mode, const BackendExecParams& exec_params);
 
   /// Creates InstanceStats for all instance in backend_exec_params in obj_pool
   /// and installs the instance profiles as children of the corresponding FragmentStats'
   /// root profile. Also creates a child profile below 'host_profile_parent' that contains
   /// counters for the backend.
   /// Separated from c'tor to simplify future handling of out-of-mem errors.
-  void Init(const BackendExecParams& backend_exec_params,
-      const std::vector<FragmentStats*>& fragment_stats,
+  void Init(const std::vector<FragmentStats*>& fragment_stats,
       RuntimeProfile* host_profile_parent, ObjectPool* obj_pool);
 
   /// Starts query execution at this backend by issuing an ExecQueryFInstances rpc and
@@ -250,7 +249,8 @@ class Coordinator::BackendState {
     void InitCounters();
   };
 
-  const Coordinator& coord_; /// Coordinator object that owns this BackendState
+  /// QuerySchedule associated with the Coordinator that owns this BackendState.
+  const QuerySchedule& schedule_;
 
   const int state_idx_;  /// index of 'this' in Coordinator::backend_states_
   const TRuntimeFilterMode::type filter_mode_;
@@ -315,8 +315,11 @@ class Coordinator::BackendState {
   /// Uses GenerateReportTimeout().
   int64_t last_report_time_ms_ = 0;
 
-  const TQueryCtx& query_ctx() const { return coord_.query_ctx(); }
-  const TUniqueId& query_id() const { return coord_.query_id(); }
+  /// The query context of the Coordinator that owns this BackendState.
+  const TQueryCtx& query_ctx_;
+
+  /// The query id of the Coordinator that owns this BackendState.
+  const TUniqueId& query_id_;
 
   /// Fill in 'request' and 'sidecar' based on state. Uses filter_routing_table to remove
   /// filters that weren't selected during its construction.
@@ -388,6 +391,143 @@ class Coordinator::FragmentStats {
   SummaryStats rates_;
 };
 
+/// Tracks the state of the resources of all BackendStates for a Coordinator. Implements
+/// throttling logic to limit the rate at which BackendStates release their admission
+/// controller resources. The class is initialized with all the BackendStates running for
+/// a query. 'MarkBackendFinished' and 'BackendsReleased' should be called when a Backend
+/// finishes and is released, respectively. MarkBackendFinished returns a vector of
+/// BackendStates that should be released.
+///
+/// Each BackendState has an associated ResourceState that can take on the values:
+///     * IN_USE:     All BackendStates start out in this state as their resources are
+///                   being used and have not been released yet.
+///     * PENDING:    The BackendState has completed, but should not be released yet.
+///     * RELEASABLE: The BackendState has completed, and should be released.
+///     * RELEASED:   The BackendState has been completed and released.
+///
+/// Each BackendState starts as IN_USE, and can either transition to PENDING or
+/// RELEASED. Any PENDING states must transition to RELEASABLE and then to RELEASED.
+/// All BackendStates must eventually transition to RELEASED.
+///
+/// BackendStates passed into the MarkBackendFinished method transition to the PENDING
+/// state. BackendStates returned by MarkBackendFinished are in the RELEASABLE state
+/// until they are released by BackendsReleased, after which they transition to the
+/// RELEASED state.
+///
+/// Throttling is necessary because the AdmissionController is currently protected by a
+/// single global lock, so releasing resources per-Backend per-query can overwhelm the
+/// AdmissionController on large clusters. Throttling is done using the following
+/// heuristics to limit the rate at which the Coordinator releases admission control
+/// resources:
+///     * Coordinator-Only Release: If the only running Backend is the Coordinator,
+///     release all PENDING backends. This is particularly useful when combined with
+///     result spooling because the Coordinator backend may be long lived. When result
+///     spooling is enabled, and clients don't immediately fetch query results, the
+///     coordinator fragment stays alive until the results are fetched or the query is
+///     closed.
+///     * Timed Release: If more than 'FLAGS_release_backend_states_delay_ms' milliseconds
+///     have elapsed since the last time a Backend completed, release all PENDING
+///     backends. This is useful for queries that are long running, and whose Backends
+///     complete incrementally (perhaps because of skew or fan-in). It also helps decrease
+///     the rate at which Backends are released, especially for short lived queries.
+///     * Batched Release: If more than half the remaining Backends have been released
+///     since the last time Backends were released, release all PENDING backends. This
+///     bounds the number of times resources are released to O(log(n)) where n is the
+///     number of backends. The base value of the logarithm is controlled by
+///     FLAGS_batched_release_decay_factor.
+///
+/// This class has a 'CloseAndGetUnreleasedBackends' method that must be called before the
+/// object is destroyed. The 'CloseAndGetUnreleasedBackends' method returns any remaining
+/// unreleased Backends (e.g. Backends in either the IN_USE or PENDING state). Backends in
+/// the RELEASABLE state are assumed to be released by the client, and any RELEASABLE
+/// Backends must be marked as RELEASED by a call to 'BackendsReleased' before the
+/// destructor is called. It is valid to call 'MarkBackendFinished' or 'BackendsReleased'
+/// after the BackendResourceState is closed. Once a BackendResourceState is closed,
+/// BackendStates can no longer transition to the PENDING or RELEASABLE state.
+///
+/// This class is thread-safe unless pointed out otherwise.
+class Coordinator::BackendResourceState {
+ public:
+  /// Create the BackendResourceState with the given vector of BackendStates. All
+  /// BackendStates are initially in the IN_USE state.
+  BackendResourceState(
+      const std::vector<BackendState*>& backend_states, const QuerySchedule& schedule);
+  ~BackendResourceState();
+
+  /// Mark a BackendState as finished and transition it to the PENDING state. Applies
+  /// above mentioned heuristics to determine if all PENDING BackendStates should
+  /// transition to the RELEASABLE state. If the transition to RELEASABLE occurs, this
+  /// method returns a list of RELEASABLE states that should be released by the caller
+  /// and then passed to BackendsReleased. Returns an empty list if no PENDING Backends
+  /// should be released. A no-op if the BackendResourceState is closed already.
+  void MarkBackendFinished(
+      BackendState* backend_state, std::vector<BackendState*>* releasable_backend_states);
+
+  /// Marks the BackendStates as RELEASED. Must be called after the resources for the
+  /// BackendStates have been released. This can be called after
+  /// CloseAndGetUnreleasedBackends() has been called. If CloseAndGetUnreleasedBackends()
+  /// returns any BackendStates, they must be passed to this method so they can be marked
+  /// as RELEASED.
+  void BackendsReleased(const std::vector<BackendState*>& released_backend_states);
+
+  /// Closes the state machine and returns a vector of IN_USE or PENDING BackendStates.
+  /// This method is idempotent. The caller is expected to mark all returned
+  /// BackendStates as released using BackendReleased().
+  std::vector<BackendState*> CloseAndGetUnreleasedBackends();
+
+ private:
+  /// Represents the state of the admission control resources associated with a
+  /// BackendState. Each BackendState starts off as IN_USE and eventually transitions
+  /// to RELEASED.
+  enum ResourceState { IN_USE, PENDING, RELEASABLE, RELEASED };
+
+  /// Protects all member variables below.
+  SpinLock lock_;
+
+  /// A timer used to track how frequently calls to MarkBackendFinished transition
+  /// Backends to the RELEASABLE state. Used by the 'Timed Release' heuristic.
+  MonotonicStopWatch released_timer_;
+
+  /// Counts the number of Backends in the IN_USE state.
+  int num_in_use_;
+
+  /// Counts the number of Backends in the PENDING state.
+  int num_pending_ = 0;
+
+  /// Counts the number of Backends in the RELEASED state.
+  int num_released_ = 0;
+
+  /// True if the Backend running the Coordinator fragment has been released, false
+  /// otherwise.
+  bool released_coordinator_ = false;
+
+  /// Tracks all BackendStates for a given query along with the state of their admission
+  /// control resources.
+  std::unordered_map<BackendState*, ResourceState> backend_resource_states_;
+
+  /// The BackendStates for a given query. Owned by the Coordinator.
+  const std::vector<BackendState*>& backend_states_;
+
+  /// The total number of BackendStates for a given query.
+  const int num_backends_;
+
+  // True if the BackendResourceState is closed, false otherwise.
+  bool closed_ = false;
+
+  /// QuerySchedule associated with the Coordinator that owns this BackendResourceState.
+  const QuerySchedule& schedule_;
+
+  /// Configured value of FLAGS_release_backend_states_delay_ms in nanoseconds.
+  const int64_t release_backend_states_delay_ns_;
+
+  /// The initial value of the decay factor for the 'Batched Release'. Increases by
+  /// *FLAGS_batched_released_decay_factor on each batched release.
+  int64_t batched_release_decay_value_;
+
+  // Requires access to RELEASE_BACKEND_STATES_DELAY_NS and backend_resource_states_.
+  friend class CoordinatorBackendStateTest;
+  FRIEND_TEST(CoordinatorBackendStateTest, StateMachine);
+};
 }
 
 #endif
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 4df15d9..85b48c2 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -77,7 +77,8 @@ Coordinator::Coordinator(ClientRequestState* parent, const QuerySchedule& schedu
     filter_mode_(schedule.query_options().runtime_filter_mode),
     obj_pool_(new ObjectPool()),
     query_events_(events),
-    exec_rpcs_complete_barrier_(schedule_.per_backend_exec_params().size()) {}
+    exec_rpcs_complete_barrier_(schedule_.per_backend_exec_params().size()),
+    backend_released_barrier_(schedule_.per_backend_exec_params().size()) {}
 
 Coordinator::~Coordinator() {
   // Must have entered a terminal exec state guaranteeing resources were released.
@@ -206,12 +207,16 @@ void Coordinator::InitBackendStates() {
 
   // create BackendStates
   int backend_idx = 0;
-  for (const auto& entry: schedule_.per_backend_exec_params()) {
-    BackendState* backend_state = obj_pool()->Add(
-        new BackendState(*this, backend_idx, filter_mode_));
-    backend_state->Init(entry.second, fragment_stats_, host_profiles_, obj_pool());
+  for (const auto& entry : schedule_.per_backend_exec_params()) {
+    BackendState* backend_state = obj_pool()->Add(new BackendState(
+        schedule_, query_ctx(), backend_idx, filter_mode_, entry.second));
+    backend_state->Init(fragment_stats_, host_profiles_, obj_pool());
     backend_states_[backend_idx++] = backend_state;
   }
+  backend_resource_state_ =
+      obj_pool()->Add(new BackendResourceState(backend_states_, schedule_));
+  num_completed_backends_ =
+      ADD_COUNTER(query_profile_, "NumCompletedBackends", TUnit::UNIT);
 }
 
 void Coordinator::ExecSummary::Init(const QuerySchedule& schedule) {
@@ -408,6 +413,7 @@ Status Coordinator::FinishBackendStartup() {
     // Mark backend complete if no fragment instances were assigned to it.
     if (backend_state->IsEmptyBackend()) {
       backend_exec_complete_barrier_->Notify();
+      num_completed_backends_->Add(1);
     }
   }
   query_profile_->AddInfoString(
@@ -571,7 +577,7 @@ void Coordinator::HandleExecStateTransition(
   } else {
     CancelBackends();
   }
-  ReleaseAdmissionControlResources();
+  ReleaseQueryAdmissionControlResources();
   // Once the query has released its admission control resources, update its end time.
   parent_request_state_->UpdateEndTime();
   // Can compute summary only after we stop accepting reports from the backends. Both
@@ -755,6 +761,19 @@ Status Coordinator::UpdateBackendExecStatus(const ReportExecStatusRequestPB& req
     }
     // We've applied all changes from the final status report - notify waiting threads.
     discard_result(backend_exec_complete_barrier_->Notify());
+
+    // Mark backend_state as closed and release the backend_state's resources if
+    // necessary.
+    vector<BackendState*> releasable_backends;
+    backend_resource_state_->MarkBackendFinished(backend_state, &releasable_backends);
+    if (!releasable_backends.empty()) {
+      ReleaseBackendAdmissionControlResources(releasable_backends);
+      backend_resource_state_->BackendsReleased(releasable_backends);
+      for (int i = 0; i < releasable_backends.size(); ++i) {
+        backend_released_barrier_.Notify();
+      }
+    }
+    num_completed_backends_->Add(1);
   }
   // If query execution has terminated, return a cancelled status to force the fragment
   // instance to stop executing.
@@ -898,7 +917,19 @@ void Coordinator::ReleaseExecResources() {
   // caching. The query MemTracker will be cleaned up later.
 }
 
-void Coordinator::ReleaseAdmissionControlResources() {
+void Coordinator::ReleaseQueryAdmissionControlResources() {
+  vector<BackendState*> unreleased_backends =
+      backend_resource_state_->CloseAndGetUnreleasedBackends();
+  if (!unreleased_backends.empty()) {
+    ReleaseBackendAdmissionControlResources(unreleased_backends);
+    backend_resource_state_->BackendsReleased(unreleased_backends);
+    for (int i = 0; i < unreleased_backends.size(); ++i) {
+      backend_released_barrier_.Notify();
+    }
+  }
+  // Wait for all backends to be released before calling
+  // AdmissionController::ReleaseQuery.
+  backend_released_barrier_.Wait();
   LOG(INFO) << "Release admission control resources for query_id=" << PrintId(query_id());
   AdmissionController* admission_controller =
       ExecEnv::GetInstance()->admission_controller();
@@ -908,6 +939,18 @@ void Coordinator::ReleaseAdmissionControlResources() {
   query_events_->MarkEvent("Released admission control resources");
 }
 
+void Coordinator::ReleaseBackendAdmissionControlResources(
+    const vector<BackendState*>& backend_states) {
+  AdmissionController* admission_controller =
+      ExecEnv::GetInstance()->admission_controller();
+  DCHECK(admission_controller != nullptr);
+  vector<TNetworkAddress> host_addrs;
+  for (auto backend_state : backend_states) {
+    host_addrs.push_back(backend_state->impalad_address());
+  }
+  admission_controller->ReleaseQueryBackends(schedule_, host_addrs);
+}
+
 Coordinator::ResourceUtilization Coordinator::ComputeQueryResourceUtilization() {
   ResourceUtilization query_resource_utilization;
   for (BackendState* backend_state: backend_states_) {
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 72ab3af..953a111 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -25,6 +25,8 @@
 #include <boost/unordered_map.hpp>
 #include <rapidjson/document.h>
 
+#include <gtest/gtest_prod.h> // for FRIEND_TEST
+
 #include "common/global-types.h"
 #include "common/status.h"
 #include "gen-cpp/Frontend_types.h"
@@ -222,6 +224,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
 
  private:
   class BackendState;
+  class BackendResourceState;
   struct FilterTarget;
   class FilterState;
   class FragmentStats;
@@ -268,6 +271,8 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
 
   bool has_called_wait_ = false;  // if true, Wait() was called; protected by wait_lock_
 
+  BackendResourceState* backend_resource_state_ = nullptr;
+
   /// Keeps track of number of completed ranges and total scan ranges. Initialized by
   /// Exec().
   ProgressUpdater progress_;
@@ -288,6 +293,12 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// GLOBAL). Excludes repeated broadcast filter updates. Set in Exec().
   RuntimeProfile::Counter* filter_updates_received_ = nullptr;
 
+  /// A RuntimeProfile Counter of the number of completed backends. Updated for each
+  /// Backend in 'UpdateBackendExecStatus' when 'ApplyExecStatusReport' returns true.
+  /// Only valid after InitBackendStates() is called. Does not count the number of
+  /// CANCELLED Backends.
+  RuntimeProfile::Counter* num_completed_backends_ = nullptr;
+
   /// The filtering mode for this query. Set in constructor.
   TRuntimeFilterMode::type filter_mode_;
 
@@ -341,6 +352,10 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// cancellation. Initialized in StartBackendExec().
   boost::scoped_ptr<CountingBarrier> backend_exec_complete_barrier_;
 
+  /// Barrier that is released when all Backends have released their admission control
+  /// resources.
+  CountingBarrier backend_released_barrier_;
+
   // Protects exec_state_ and exec_status_. exec_state_ can be read independently via
   // the atomic, but the lock is held when writing either field and when reading both
   // fields together.
@@ -499,7 +514,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// use by other queries. This should only be called if one of following
   /// preconditions is satisfied for each backend on which the query is executing:
   ///
-  /// * The backend finished execution.  Rationale: the backend isn't consuming
+  /// * The backend finished execution. Rationale: the backend isn't consuming
   ///   resources.
   /// * A cancellation RPC was delivered to the backend.
   ///   Rationale: the backend will be cancelled and release resources soon. By the
@@ -521,10 +536,26 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// after that completes to avoid over-admitting queries.
   ///
   /// The ExecState state-machine ensures this is called exactly once.
-  void ReleaseAdmissionControlResources();
+  void ReleaseQueryAdmissionControlResources();
+
+  /// Helper method to release admission control resource for the given vector of
+  /// BackendStates. Resources are released using
+  /// AdmissionController::ReleaseQueryBackends which releases the admitted memory used
+  /// by each BackendState and decrements the number of running queries on the host
+  /// running the BackendState.
+  void ReleaseBackendAdmissionControlResources(
+      const std::vector<BackendState*>& backend_states);
 
   /// Checks the exec_state_ of the query and returns true if the query is executing.
   bool IsExecuting();
+
+  /// BackendState and BackendResourceState are private to the Coordinator class, so mark
+  /// all tests in CoordinatorBackendStateTest as friends.
+  friend class CoordinatorBackendStateTest;
+  FRIEND_TEST(CoordinatorBackendStateTest, StateMachine);
+  FRIEND_TEST(CoordinatorBackendStateTest, CoordinatorOnly);
+  FRIEND_TEST(CoordinatorBackendStateTest, TimedRelease);
+  FRIEND_TEST(CoordinatorBackendStateTest, BatchedRelease);
 };
 
 }
diff --git a/be/src/scheduling/admission-controller-test.cc b/be/src/scheduling/admission-controller-test.cc
index 17c8457..706d2d0 100644
--- a/be/src/scheduling/admission-controller-test.cc
+++ b/be/src/scheduling/admission-controller-test.cc
@@ -689,7 +689,15 @@ TEST_F(AdmissionControllerTest, PoolStats) {
   pool_stats->Admit(*query_schedule);
   ASSERT_EQ(1, pool_stats->agg_num_running());
   ASSERT_EQ(1, pool_stats->metrics()->agg_num_running->GetValue());
-  pool_stats->Release(*query_schedule, false);
+  int64_t mem_to_release = 0;
+  vector<TNetworkAddress> host_addrs;
+  for (auto backend_state : query_schedule->per_backend_exec_params()) {
+    host_addrs.push_back(backend_state.first);
+    mem_to_release +=
+        admission_controller->GetMemToAdmit(*query_schedule, backend_state.second);
+  }
+  pool_stats->ReleaseMem(mem_to_release);
+  pool_stats->ReleaseQuery(*query_schedule, 0);
   CheckPoolStatsEmpty(pool_stats);
 }
 
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 666659b..ff0ea4c 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -342,13 +342,9 @@ void AdmissionController::PoolStats::Admit(const QuerySchedule& schedule) {
   metrics_.total_admitted->Increment(1L);
 }
 
-void AdmissionController::PoolStats::Release(
+void AdmissionController::PoolStats::ReleaseQuery(
     const QuerySchedule& schedule, int64_t peak_mem_consumption) {
-  int64_t cluster_mem_admitted = schedule.GetClusterMemoryToAdmit();
-  DCHECK_GT(cluster_mem_admitted, 0);
-  local_mem_admitted_ -= cluster_mem_admitted;
-  metrics_.local_mem_admitted->Increment(-cluster_mem_admitted);
-
+  // Update stats tracking the number of running and admitted queries.
   agg_num_running_ -= 1;
   metrics_.agg_num_running->Increment(-1L);
 
@@ -358,13 +354,23 @@ void AdmissionController::PoolStats::Release(
   metrics_.total_released->Increment(1L);
   DCHECK_GE(local_stats_.num_admitted_running, 0);
   DCHECK_GE(agg_num_running_, 0);
-  DCHECK_GE(local_mem_admitted_, 0);
+
+  // Update the 'peak_mem_histogram' based on the given peak memory consumption of the
+  // query.
   int64_t histogram_bucket =
       BitUtil::RoundUp(peak_mem_consumption, HISTOGRAM_BIN_SIZE) / HISTOGRAM_BIN_SIZE;
   histogram_bucket = std::max(std::min(histogram_bucket, HISTOGRAM_NUM_OF_BINS), 1L) - 1;
   peak_mem_histogram_[histogram_bucket] = ++(peak_mem_histogram_[histogram_bucket]);
 }
 
+void AdmissionController::PoolStats::ReleaseMem(int64_t mem_to_release) {
+  // Update stats tracking memory admitted.
+  DCHECK_GT(mem_to_release, 0);
+  local_mem_admitted_ -= mem_to_release;
+  DCHECK_GE(local_mem_admitted_, 0);
+  metrics_.local_mem_admitted->Increment(-mem_to_release);
+}
+
 void AdmissionController::PoolStats::Queue() {
   agg_num_queued_ += 1;
   metrics_.agg_num_queued->Increment(1L);
@@ -391,29 +397,52 @@ void AdmissionController::PoolStats::Dequeue(bool timed_out) {
   }
 }
 
+int64_t AdmissionController::UpdateHostStatsForQueryBackends(
+    const QuerySchedule& schedule, const vector<TNetworkAddress>& host_addrs) {
+  int64_t total_mem_to_release = 0;
+  for (auto host_addr : host_addrs) {
+    auto backend_exec_params = schedule.per_backend_exec_params().find(host_addr);
+    if (backend_exec_params == schedule.per_backend_exec_params().end()) {
+      string err_msg =
+          strings::Substitute("Error: Cannot find exec params of host $0 for query $1.",
+              PrintThrift(host_addr), PrintId(schedule.query_id()));
+      DCHECK(false) << err_msg;
+      LOG(ERROR) << err_msg;
+      continue;
+    }
+    int64_t mem_to_release = GetMemToAdmit(schedule, backend_exec_params->second);
+    UpdateHostStats(host_addr, -mem_to_release, -1);
+    total_mem_to_release += mem_to_release;
+  }
+  return total_mem_to_release;
+}
+
 void AdmissionController::UpdateHostStats(
     const QuerySchedule& schedule, bool is_admitting) {
-  int64_t per_backend_mem_to_admit = schedule.per_backend_mem_to_admit();
   for (const auto& entry : schedule.per_backend_exec_params()) {
     const TNetworkAddress& host_addr = entry.first;
-    int64_t mem_to_admit = entry.second.is_coord_backend ?
-        schedule.coord_backend_mem_to_admit() : per_backend_mem_to_admit;
+    int64_t mem_to_admit = GetMemToAdmit(schedule, entry.second);
     if (!is_admitting) mem_to_admit *= -1;
-    const string host = TNetworkAddressToString(host_addr);
-    VLOG_ROW << "Update admitted mem reserved for host=" << host
-             << " prev=" << PrintBytes(host_stats_[host].mem_admitted)
-             << " new=" << PrintBytes(host_stats_[host].mem_admitted + mem_to_admit);
-    host_stats_[host].mem_admitted += mem_to_admit;
-    DCHECK_GE(host_stats_[host].mem_admitted, 0);
     int num_queries = is_admitting ? 1 : -1;
-    VLOG_ROW << "Update admitted queries for host=" << host
-             << " prev=" << host_stats_[host].num_admitted
-             << " new=" << host_stats_[host].num_admitted + num_queries;
-    host_stats_[host].num_admitted += num_queries;
-    DCHECK_GE(host_stats_[host].num_admitted, 0);
+    UpdateHostStats(host_addr, mem_to_admit, num_queries);
   }
 }
 
+void AdmissionController::UpdateHostStats(
+    const TNetworkAddress& host_addr, int64_t mem_to_admit, int num_queries_to_admit) {
+  const string host = TNetworkAddressToString(host_addr);
+  VLOG_ROW << "Update admitted mem reserved for host=" << host
+           << " prev=" << PrintBytes(host_stats_[host].mem_admitted)
+           << " new=" << PrintBytes(host_stats_[host].mem_admitted + mem_to_admit);
+  host_stats_[host].mem_admitted += mem_to_admit;
+  DCHECK_GE(host_stats_[host].mem_admitted, 0);
+  VLOG_ROW << "Update admitted queries for host=" << host
+           << " prev=" << host_stats_[host].num_admitted
+           << " new=" << host_stats_[host].num_admitted + num_queries_to_admit;
+  host_stats_[host].num_admitted += num_queries_to_admit;
+  DCHECK_GE(host_stats_[host].num_admitted, 0);
+}
+
 // Helper method used by CanAccommodateMaxInitialReservation(). Returns true if the given
 // 'mem_limit' can accommodate 'buffer_reservation'. If not, returns false and the
 // details about the memory shortage in 'mem_unavailable_reason'.
@@ -475,8 +504,6 @@ bool AdmissionController::HasAvailableMemResources(const QuerySchedule& schedule
   }
 
   // Case 2:
-  int64_t executor_mem_to_admit = schedule.per_backend_mem_to_admit();
-  int64_t coord_mem_to_admit = schedule.coord_backend_mem_to_admit();
   for (const auto& entry : schedule.per_backend_exec_params()) {
     const TNetworkAddress& host = entry.first;
     const string host_id = TNetworkAddressToString(host);
@@ -484,8 +511,7 @@ bool AdmissionController::HasAvailableMemResources(const QuerySchedule& schedule
     const HostStats& host_stats = host_stats_[host_id];
     int64_t mem_reserved = host_stats.mem_reserved;
     int64_t mem_admitted = host_stats.mem_admitted;
-    int64_t mem_to_admit = entry.second.is_coord_backend ?
-        coord_mem_to_admit : executor_mem_to_admit;
+    int64_t mem_to_admit = GetMemToAdmit(schedule, entry.second);
     VLOG_ROW << "Checking memory on host=" << host_id
              << " mem_reserved=" << PrintBytes(mem_reserved)
              << " mem_admitted=" << PrintBytes(mem_admitted)
@@ -925,9 +951,12 @@ void AdmissionController::ReleaseQuery(
   const string& pool_name = schedule.request_pool();
   {
     lock_guard<mutex> lock(admission_ctrl_lock_);
+    DCHECK_EQ(num_released_backends_.at(schedule.query_id()), 0);
+    num_released_backends_.erase(num_released_backends_.find(schedule.query_id()));
     PoolStats* stats = GetPoolStats(schedule);
-    stats->Release(schedule, peak_mem_consumption);
-    UpdateHostStats(schedule, /*is_admitting=*/false);
+    stats->ReleaseQuery(schedule, peak_mem_consumption);
+    // No need to update the Host Stats as they should have been updated in
+    // ReleaseQueryBackends.
     pools_for_updates_.insert(pool_name);
     VLOG_RPC << "Released query id=" << PrintId(schedule.query_id()) << " "
              << stats->DebugString();
@@ -935,6 +964,39 @@ void AdmissionController::ReleaseQuery(
   dequeue_cv_.NotifyOne();
 }
 
+void AdmissionController::ReleaseQueryBackends(
+    const QuerySchedule& schedule, const vector<TNetworkAddress>& host_addrs) {
+  const string& pool_name = schedule.request_pool();
+  {
+    lock_guard<mutex> lock(admission_ctrl_lock_);
+    PoolStats* stats = GetPoolStats(schedule);
+    int64_t mem_to_release = UpdateHostStatsForQueryBackends(schedule, host_addrs);
+    stats->ReleaseMem(mem_to_release);
+    pools_for_updates_.insert(pool_name);
+
+    // Update num_released_backends_.
+    auto released_backends = num_released_backends_.find(schedule.query_id());
+    if (released_backends != num_released_backends_.end()) {
+      released_backends->second -= host_addrs.size();
+    } else {
+      string err_msg = Substitute("Unable to find num released backends for query $0",
+          PrintId(schedule.query_id()));
+      DCHECK(false) << err_msg;
+      LOG(ERROR) << err_msg;
+    }
+
+    if (VLOG_IS_ON(2)) {
+      stringstream ss;
+      ss << "Released query backend(s) ";
+      for (auto host_addr : host_addrs) ss << PrintThrift(host_addr) << " ";
+      ss << "for query id=" << PrintId(schedule.query_id()) << " "
+         << stats->DebugString();
+      VLOG(2) << ss.str();
+    }
+  }
+  dequeue_cv_.NotifyOne();
+}
+
 Status AdmissionController::ResolvePoolAndGetConfig(
     const TQueryCtx& query_ctx, string* pool_name, TPoolConfig* pool_config) {
   RETURN_IF_ERROR(request_pool_service_->ResolveRequestPool(query_ctx, pool_name));
@@ -1470,6 +1532,10 @@ void AdmissionController::AdmitQuery(QuerySchedule* schedule, bool was_queued) {
     schedule->summary_profile()->AddInfoString(
         PROFILE_INFO_KEY_STALENESS_WARNING, staleness_detail);
   }
+  DCHECK(
+      num_released_backends_.find(schedule->query_id()) == num_released_backends_.end());
+  num_released_backends_[schedule->query_id()] =
+      schedule->per_backend_exec_params().size();
 }
 
 string AdmissionController::GetStalenessDetail(const string& prefix,
@@ -1837,4 +1903,9 @@ bool AdmissionController::PoolHasFixedMemoryLimit(const TPoolConfig& pool_config
   return pool_config.max_mem_resources > 0 && pool_config.max_memory_multiple <= 0;
 }
 
+int64_t AdmissionController::GetMemToAdmit(
+    const QuerySchedule& schedule, const BackendExecParams& backend_exec_params) {
+  return backend_exec_params.is_coord_backend ? schedule.coord_backend_mem_to_admit() :
+                                                schedule.per_backend_mem_to_admit();
+}
 } // namespace impala
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index 7cf2343..a71f99a 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -30,6 +30,7 @@
 #include "common/status.h"
 #include "scheduling/cluster-membership-mgr.h"
 #include "scheduling/request-pool-service.h"
+#include "scheduling/query-schedule.h"
 #include "statestore/statestore-subscriber.h"
 #include "util/condition-variable.h"
 #include "util/internal-queue.h"
@@ -38,7 +39,6 @@
 
 namespace impala {
 
-class QuerySchedule;
 class ExecEnv;
 
 /// Represents the admission outcome of a query. It is stored in the 'admit_outcome'
@@ -153,6 +153,16 @@ enum class AdmissionOutcome {
 /// are used or, if there is a wide distribution of requests across impalads, the rate of
 /// submission is low enough that new state is able to be updated by the statestore.
 ///
+/// Releasing Queries:
+/// When queries complete they must be explicitly released from the admission controller
+/// using the methods 'ReleaseQuery' and 'ReleaseQueryBackends'. These methods release
+/// the admitted memory and decrement the number of admitted queries for the resource
+/// pool. All Backends for a query must be released via 'ReleaseQueryBackends' before the
+/// query is released using 'ReleaseQuery'. Releasing Backends releases the admitted
+/// memory used by that Backend and decrements the number of running queries on the host
+/// running that Backend. Releasing a query does not release any admitted memory, it only
+/// decrements the number of running queries in the resource pool.
+///
 /// Executor Groups:
 /// Executors in a cluster can be assigned to executor groups. Each executor can only be
 /// in one group. A resource pool can have multiple executor groups associated with it.
@@ -332,10 +342,20 @@ class AdmissionController {
 
   /// Updates the pool statistics when a query completes (either successfully,
   /// is cancelled or failed). This should be called for all requests that have
-  /// been submitted via AdmitQuery().
+  /// been submitted via AdmitQuery(). 'schedule' is the QuerySchedule of the completed
+  /// query and 'peak_mem_consumption' is the peak memory consumption of the query.
   /// This does not block.
   void ReleaseQuery(const QuerySchedule& schedule, int64_t peak_mem_consumption);
 
+  /// Updates the pool statistics when a Backend running a query completes (either
+  /// successfully, is cancelled or failed). This should be called for all Backends part
+  /// of a query for all queries that have been submitted via AdmitQuery().
+  /// 'schedule' is the QuerySchedule of the associated query and the vector of
+  /// TNetworkAddresses identify the completed Backends.
+  /// This does not block.
+  void ReleaseQueryBackends(
+      const QuerySchedule& schedule, const vector<TNetworkAddress>& host_addr);
+
   /// Registers the request queue topic with the statestore.
   Status Init();
 
@@ -482,7 +502,9 @@ class AdmissionController {
     /// Updates the pool stats when the request represented by 'schedule' is admitted.
     void Admit(const QuerySchedule& schedule);
     /// Updates the pool stats when the request represented by 'schedule' is released.
-    void Release(const QuerySchedule& schedule, int64_t peak_mem_consumption);
+    void ReleaseQuery(const QuerySchedule& schedule, int64_t peak_mem_consumption);
+    /// Releases the specified memory from the pool stats.
+    void ReleaseMem(int64_t mem_to_release);
     /// Updates the pool stats when the request represented by 'schedule' is queued.
     void Queue();
     /// Updates the pool stats when the request represented by 'schedule' is dequeued.
@@ -709,6 +731,12 @@ class AdmissionController {
   /// If true, tear down the dequeuing thread. This only happens in unit tests.
   bool done_;
 
+  /// Tracks the number of released Backends for each active query. Used purely for
+  /// internal state validation. Used to ensure that all Backends are released before
+  /// the query is released.
+  typedef boost::unordered_map<TUniqueId, int> NumReleasedBackends;
+  NumReleasedBackends num_released_backends_;
+
   /// Resolves the resource pool name in 'query_ctx.request_pool' and stores the resulting
   /// name in 'pool_name' and the resulting config in 'pool_config'.
   Status ResolvePoolAndGetConfig(const TQueryCtx& query_ctx, std::string* pool_name,
@@ -805,9 +833,25 @@ class AdmissionController {
 
   /// Updates the memory admitted and the num of queries running for each host in
   /// 'schedule'. If 'is_admitting' is true, the memory admitted and the num of queries is
-  /// increased, otherwise it is decreased.
+  /// increased, otherwise it is decreased. This method updates the host stats for every
+  /// Backend running the query.
   void UpdateHostStats(const QuerySchedule& schedule, bool is_admitting);
 
+  /// Updates the memory admitted and the num of queries running for each host in
+  /// 'schedule' based on the completed Backends. 'host_addrs' is a vector of completed
+  /// Backends each represented by a TNetworkAddress. Unlike UpdateHostStats, this method
+  /// only updates the host stats for the Backends specified in the 'host_addrs' vector.
+  /// Moreover, it is only used to release queries, and thus always decrements the amount
+  /// of memory admitted and the number of queries running per host. Returns the total
+  /// amount of memory to release for all the Backends.
+  int64_t UpdateHostStatsForQueryBackends(
+      const QuerySchedule& schedule, const std::vector<TNetworkAddress>& host_addrs);
+
+  /// Updates the memory admitted and the num of queries running on the specified host by
+  /// adding the specified mem and num_queries to the host stats.
+  void UpdateHostStats(
+      const TNetworkAddress& host_addr, int64_t mem_to_admit, int num_queries_to_admit);
+
   /// Rejection happens in several stages
   /// 1) Based on static pool configuration
   ///     - Check if the pool is disabled (max_requests = 0, max_mem = 0)
@@ -934,6 +978,12 @@ class AdmissionController {
   int64_t GetExecutorGroupSize(const ClusterMembershipMgr::Snapshot& membership_snapshot,
       const std::string& group_name);
 
+  /// Get the amount of memory to admit for the Backend with the given BackendExecParams.
+  /// This method may return different values depending on if the Backend is an Executor
+  /// or a Coordinator.
+  static int64_t GetMemToAdmit(
+      const QuerySchedule& schedule, const BackendExecParams& backend_exec_params);
+
   FRIEND_TEST(AdmissionControllerTest, Simple);
   FRIEND_TEST(AdmissionControllerTest, PoolStats);
   FRIEND_TEST(AdmissionControllerTest, CanAdmitRequestMemory);
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index 11a2316..4698e77 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -48,6 +48,9 @@ from tests.common.test_dimensions import (
     create_uncompressed_text_dimension)
 from tests.common.test_vector import ImpalaTestDimension
 from tests.hs2.hs2_test_suite import HS2TestSuite, needs_session
+from tests.util.web_pages_util import (
+    get_num_completed_backends,
+    get_mem_admitted_backends_debug_page)
 from tests.verifiers.mem_usage_verifier import MemUsageVerifier
 from ImpalaService import ImpalaHiveServer2Service
 from TCLIService import TCLIService
@@ -571,7 +574,7 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     self.client.wait_for_finished_timeout(handle, 1000)
     expected_mem_limits = self.__get_mem_limits_admission_debug_page()
     actual_mem_limits = self.__get_mem_limits_memz_debug_page(handle.get_handle().id)
-    mem_admitted = self.__get_mem_admitted_backends_debug_page()
+    mem_admitted = get_mem_admitted_backends_debug_page(self.cluster)
     debug_string = " expected_mem_limits:" + str(
       expected_mem_limits) + " actual_mem_limits:" + str(
       actual_mem_limits) + " mem_admitted:" + str(mem_admitted)
@@ -587,7 +590,8 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     assert abs(mem_admitted['coordinator'] - expected_mem_limits[
       'coordinator']) < 5 * MB, debug_string
     assert abs(
-      mem_admitted['executor'] - expected_mem_limits['executor']) < 5 * MB, debug_string
+      mem_admitted['executor'][0] - expected_mem_limits['executor']) < 5 * MB, \
+        debug_string
 
   def __get_mem_limits_admission_debug_page(self):
     """Helper method assumes a 2 node cluster using a dedicated coordinator. Returns the
@@ -619,25 +623,6 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     return {'coordinator': float(mem_trackers[0]['limit']),
             'executor': float(mem_trackers[1]['limit'])}
 
-  def __get_mem_admitted_backends_debug_page(self):
-    """Helper method assumes a 2 node cluster using a dedicated coordinator. Returns the
-    mem admitted to the backends extracted from the backends debug page of the coordinator
-    impala daemon. Returns a dictionary with the keys 'coordinator' and 'executor' and
-    their respective mem values in bytes."""
-    # Based on how the cluster is setup, the first impalad in the cluster is the
-    # coordinator.
-    response_json = self.cluster.impalads[0].service.get_debug_webpage_json("backends")
-    assert 'backends' in response_json
-    assert len(response_json['backends']) == 2
-    ret = dict()
-    from tests.verifiers.mem_usage_verifier import parse_mem_value
-    for backend in response_json['backends']:
-      if backend['is_coordinator']:
-        ret['coordinator'] = parse_mem_value(backend['mem_admitted'])
-      else:
-        ret['executor'] = parse_mem_value(backend['mem_admitted'])
-    return ret
-
   @SkipIfNotHdfsMinicluster.tuned_for_minicluster
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(num_exclusive_coordinators=1)
@@ -1300,6 +1285,56 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     assert len(reasons) == 1
     assert "Local backend has not started up yet." in reasons[0]
 
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(num_exclusive_coordinators=1)
+  def test_release_backends(self, vector):
+    """Test that executor backends are shutdown when they complete, that completed
+    executor backends release their admitted memory, and that
+    NumCompletedBackends is updated each time an executor backend completes."""
+    if self.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
+
+    # Craft a query where part of the executor backends completes, while the rest remain
+    # running indefinitely. The query forces the 'lineitem' table to be treated as the
+    # small table even though it is bigger than the 'customer' table. This forces the
+    # small table scan ('lineitem' scan) to run on two nodes and the big table scan
+    # ('customers' scan) to run on a single node. By using debug actions to force the
+    # big table scan to hang indefinitely, the small table scan should finish quickly.
+    # This causes one executor backend to complete quickly, and causes the other one to
+    # hang.
+    vector.get_value('exec_option')['debug_action'] = '0:GETNEXT:WAIT'
+    query = "select STRAIGHT_JOIN * from tpch.customer JOIN /* +BROADCAST */ " \
+            "tpch.lineitem where customer.c_custkey = lineitem.l_orderkey limit 100"
+
+    # Amount of time to wait for the query to reach the running state before throwing a
+    # Timeout exception.
+    timeout = 10
+
+    handle = self.execute_query_async(query, vector.get_value('exec_option'))
+    try:
+      # Wait for the query to reach the running state (it should never reach the finished
+      # state because of the 'WAIT' debug action), wait for the 'lineitem' scan to
+      # complete, and then validate that one of the executor backends shutdowns and
+      # releases its admitted memory.
+      self.wait_for_state(handle, self.client.QUERY_STATES['RUNNING'], timeout)
+      sleep(10)  # Wait for the 'lineitem' scan to complete
+      assert "NumCompletedBackends: 1 (1)" in self.client.get_runtime_profile(handle)
+      get_num_completed_backends(self.cluster.impalads[0].service,
+        handle.get_handle().id) == 1
+      mem_admitted = get_mem_admitted_backends_debug_page(self.cluster)
+      num_executor_zero_admitted = 0
+      for executor_mem_admitted in mem_admitted['executor']:
+        if executor_mem_admitted == 0:
+          num_executor_zero_admitted += 1
+      assert num_executor_zero_admitted == 1
+    finally:
+      # Once the query is closed, validate that all backends have shutdown.
+      self.client.close_query(handle)
+      mem_admitted = get_mem_admitted_backends_debug_page(self.cluster)
+      assert mem_admitted['coordinator'] == 0
+      for executor_mem_admitted in mem_admitted['executor']:
+        assert executor_mem_admitted == 0
+
 
 class TestAdmissionControllerStress(TestAdmissionControllerBase):
   """Submits a number of queries (parameterized) with some delay between submissions
diff --git a/tests/custom_cluster/test_auto_scaling.py b/tests/custom_cluster/test_auto_scaling.py
index 5662575..deab85a 100644
--- a/tests/custom_cluster/test_auto_scaling.py
+++ b/tests/custom_cluster/test_auto_scaling.py
@@ -140,7 +140,7 @@ class TestAutoScaling(CustomClusterTestSuite):
     GROUP_SIZE = 2
     EXECUTOR_SLOTS = 3
     auto_scaler = AutoScaler(executor_slots=EXECUTOR_SLOTS, group_size=GROUP_SIZE,
-                             max_groups=1)
+                             max_groups=1, coordinator_slots=EXECUTOR_SLOTS)
     workload = None
     try:
       auto_scaler.start()
diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py
index 9c0315b..32e0087 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -233,7 +233,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
     client.cancel(q2)
 
   @pytest.mark.execute_serially
-  @CustomClusterTestSuite.with_args(impalad_args="-max_concurrent_queries=16")
+  @CustomClusterTestSuite.with_args(impalad_args="-max_concurrent_queries=3")
   def test_executor_concurrency(self):
     """Tests that the command line flag to limit query concurrency on executors works as
     expected."""
diff --git a/tests/custom_cluster/test_result_spooling.py b/tests/custom_cluster/test_result_spooling.py
new file mode 100644
index 0000000..0d808a4
--- /dev/null
+++ b/tests/custom_cluster/test_result_spooling.py
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.test_dimensions import (
+    create_single_exec_option_dimension,
+    create_uncompressed_text_dimension)
+from tests.util.web_pages_util import (
+    get_num_completed_backends,
+    get_mem_admitted_backends_debug_page)
+from time import sleep
+
+
+class TestDedicatedCoordinator(CustomClusterTestSuite):
+  """A custom cluster test that tests result spooling against a cluster with a dedicated
+  coordinator."""
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestDedicatedCoordinator, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
+    # There's no reason to test this on other file formats/compression codecs right now
+    cls.ImpalaTestMatrix.add_dimension(
+      create_uncompressed_text_dimension(cls.get_workload()))
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(cluster_size=2, num_exclusive_coordinators=1)
+  def test_dedicated_coordinator(self, vector):
+    """Test the following when result spooling is enabled on a cluster with a dedicated
+    coordinator when all results are spooled: (1) all backends are shutdown besides the
+    coordinator and (2) all non-coordinator memory is released."""
+    query = "select id from functional_parquet.alltypes order by id limit 2000"
+    vector.get_value('exec_option')['spool_query_results'] = 'true'
+
+    # Amount of time to wait for the query to reach the finished state before throwing a
+    # Timeout exception.
+    timeout = 10
+
+    handle = self.execute_query_async(query, vector.get_value('exec_option'))
+    try:
+      # Wait for the query to finish (all rows are spooled). Assert that the executor
+      # has been shutdown and its memory has been released.
+      self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], timeout)
+      # Since FINISHED does not necessarily mean all results are spooled, sleep for
+      # enough time to allow for all results to be spooled. This can be better enforced
+      # once the metrics in IMPALA-8825 have been added.
+      sleep(1)
+      assert "NumCompletedBackends: 1 (1)" in self.client.get_runtime_profile(handle)
+      mem_admitted = get_mem_admitted_backends_debug_page(self.cluster)
+      assert mem_admitted['executor'][0] == 0
+      assert mem_admitted['coordinator'] > 0
+      assert get_num_completed_backends(self.cluster.impalads[0].service,
+               handle.get_handle().id) == 1
+
+      # Fetch all results from the query and assert that the coordinator and the executor
+      # have been shutdown and their memory has been released.
+      self.client.fetch(query, handle)
+      assert "NumCompletedBackends: 2 (2)" in self.client.get_runtime_profile(handle)
+      mem_admitted = get_mem_admitted_backends_debug_page(self.cluster)
+      assert mem_admitted['executor'][0] == 0
+      assert mem_admitted['coordinator'] == 0
+      assert get_num_completed_backends(self.cluster.impalads[0].service,
+               handle.get_handle().id) == 2
+    finally:
+      self.client.close_query(handle)
diff --git a/tests/util/auto_scaler.py b/tests/util/auto_scaler.py
index e3b851e..95c0f77 100755
--- a/tests/util/auto_scaler.py
+++ b/tests/util/auto_scaler.py
@@ -43,10 +43,10 @@ class AutoScaler(object):
   DEFAULT_POOL_NAME = "default-pool"
 
   def __init__(self, executor_slots, group_size, start_batch_size=0, max_groups=0,
-               wait_up_s=0, wait_down_s=0):
+               wait_up_s=0, wait_down_s=0, coordinator_slots=128):
     # Number of queries that can run concurrently on each executor
     self.executor_slots = executor_slots
-    self.coordinator_slots = 128
+    self.coordinator_slots = coordinator_slots
     # Number of executors per executor group
     self.group_size = group_size
     # New executor groups will be started in increments of this size
diff --git a/tests/util/web_pages_util.py b/tests/util/web_pages_util.py
new file mode 100644
index 0000000..5bf97f7
--- /dev/null
+++ b/tests/util/web_pages_util.py
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+
+
+def get_num_completed_backends(service, query_id):
+    """Get the number of completed backends for the given query_id from the
+    'query_backends' web page."""
+    query_backend_url = 'query_backends?query_id={0}&json'.format(query_id)
+    query_backends_json = json.loads(service.read_debug_webpage(query_backend_url))
+    assert 'backend_states' in query_backends_json
+    num_complete_backends = 0
+    for backend_state in query_backends_json['backend_states']:
+      if backend_state['done']: num_complete_backends += 1
+    return num_complete_backends
+
+
+def get_mem_admitted_backends_debug_page(cluster):
+  """Helper method assumes a cluster using a dedicated coordinator. Returns the mem
+  admitted to the backends extracted from the backends debug page of the coordinator
+  impala daemon. Returns a dictionary with the keys 'coordinator' and 'executor' and
+  their respective mem values in bytes. The entry for 'executor' is a list of the mem
+  admitted for each executor."""
+  # Based on how the cluster is setup, the first impalad in the cluster is the
+  # coordinator.
+  response_json = cluster.impalads[0].service.get_debug_webpage_json('backends')
+  assert 'backends' in response_json
+  assert len(response_json['backends']) >= 2
+  ret = dict()
+  ret['executor'] = []
+  from tests.verifiers.mem_usage_verifier import parse_mem_value
+  for backend in response_json['backends']:
+    if backend['is_coordinator']:
+      ret['coordinator'] = parse_mem_value(backend['mem_admitted'])
+    else:
+      ret['executor'].append(parse_mem_value(backend['mem_admitted']))
+  return ret