You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by lv...@apache.org on 2017/10/29 18:44:22 UTC

[3/5] incubator-impala git commit: IMPALA-1575: Part 1: eagerly release query exec resources

IMPALA-1575: Part 1: eagerly release query exec resources

Release of backend resources for query execution on the coordinator
daemon should occur eagerly as soon as query execution is finished
or cancelled. Before this patch some resources managed by QueryState,
like scratch files, were only released when the query was closed
and all of the query's control structures were torn down.

These resources are referred to as "ExecResources" in various places to
distinguish them from resources associated with the client request (like
the result cache) that are still required after the query finishes
executing.

This first change does not solve the admission control problem for two
reasons:
* We don't release the "admitted" memory on the coordinator until
  the query is unregistered.
* Admission control still considers the memory reserved until the
  query memtracker is unregistered, which happens only when the
  QueryState is destroyed: see MemTracker::GetPoolMemReserved().

The flow is mostly similar to that of the existing initial_reservation_refcnt_
(which this patch replaces with exec_resource_refcnt_), except the
coordinator also holds onto a reference count, which is released
when either the final row is returned or cancellation is initiated.
After the coordinator releases its refcount, the resources can be
freed as soon as local fragments also release their refcounts.

Also clean up Coordinator slightly by preventing runtime_state() from
leaking out to ClientRequestState - instead it's possible to log
the query MemTracker by following parent links in the MemTracker.

This patch is partially based on Joe McDonnell's IMPALA-1575 patch.

Testing:
Ran core tests.

Change-Id: I41ff374b0403f10a145f7fee9b3145953ee32341
Reviewed-on: http://gerrit.cloudera.org:8080/8303
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/9174af38
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/9174af38
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/9174af38

Branch: refs/heads/master
Commit: 9174af38c85c080c776ecc5b1aff0aee9aef2f57
Parents: 5810c75
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Mon Oct 16 22:59:03 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sat Oct 28 02:45:35 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/coordinator.cc          | 43 ++++++++++++++--------
 be/src/runtime/coordinator.h           | 30 +++++++--------
 be/src/runtime/mem-tracker.cc          | 17 +++++++--
 be/src/runtime/mem-tracker.h           | 15 +++++++-
 be/src/runtime/query-exec-mgr.cc       |  6 +--
 be/src/runtime/query-state.cc          | 57 +++++++++++++++++------------
 be/src/runtime/query-state.h           | 48 +++++++++++++-----------
 be/src/runtime/runtime-state.cc        |  8 +++-
 be/src/runtime/test-env.cc             |  2 +-
 be/src/service/client-request-state.cc |  3 +-
 10 files changed, 140 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9174af38/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index af3ec21..5f6bd08 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -77,8 +77,8 @@ Coordinator::Coordinator(
     query_events_(events) {}
 
 Coordinator::~Coordinator() {
-  DCHECK(released_resources_)
-      << "ReleaseResources() must be called before Coordinator is destroyed";
+  DCHECK(released_exec_resources_)
+      << "ReleaseExecResources() must be called before Coordinator is destroyed";
   if (query_state_ != nullptr) {
     ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(query_state_);
   }
@@ -122,6 +122,7 @@ Status Coordinator::Exec() {
   lock_guard<mutex> l(lock_);
 
   query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(query_ctx_);
+  query_state_->AcquireExecResourceRefcount(); // Decremented in ReleaseExecResources().
   filter_mem_tracker_ = query_state_->obj_pool()->Add(new MemTracker(
       -1, "Runtime Filter (Coordinator)", query_state_->query_mem_tracker(), false));
 
@@ -813,6 +814,10 @@ Status Coordinator::Wait() {
   Status status = WaitForBackendCompletion();
   if (!needs_finalization_ && !status.ok()) return status;
 
+  // Execution of query fragments has finished. We don't need to hold onto query execution
+  // resources while we finalize the query.
+  ReleaseExecResources();
+
   // Query finalization is required only for HDFS table sinks
   if (needs_finalization_) RETURN_IF_ERROR(FinalizeQuery());
 
@@ -849,11 +854,8 @@ Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) {
 
   if (*eos) {
     returned_all_results_ = true;
-    // release resources here, since we won't be fetching more result rows
-    {
-      lock_guard<mutex> l(lock_);
-      ReleaseResources();
-    }
+    // release query execution resources here, since we won't be fetching more result rows
+    ReleaseExecResources();
 
     // wait for all backends to complete before computing the summary
     // TODO: relocate this so GetNext() won't have to wait for backends to complete?
@@ -895,10 +897,10 @@ void Coordinator::CancelInternal() {
       PrintId(query_id()), num_cancelled);
   backend_completion_cv_.notify_all();
 
+  ReleaseExecResourcesLocked();
+
   // Report the summary with whatever progress the query made before being cancelled.
   ComputeQuerySummary();
-
-  ReleaseResources();
 }
 
 Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& params) {
@@ -1039,9 +1041,14 @@ string Coordinator::GetErrorLog() {
   return PrintErrorMapToString(merged);
 }
 
-void Coordinator::ReleaseResources() {
-  if (released_resources_) return;
-  released_resources_ = true;
+void Coordinator::ReleaseExecResources() {
+  lock_guard<mutex> l(lock_);
+  ReleaseExecResourcesLocked();
+}
+
+void Coordinator::ReleaseExecResourcesLocked() {
+  if (released_exec_resources_) return;
+  released_exec_resources_ = true;
   if (filter_routing_table_.size() > 0) {
     query_profile_->AddInfoString("Final filter table", FilterDebugString());
   }
@@ -1056,9 +1063,11 @@ void Coordinator::ReleaseResources() {
   // This may be NULL while executing UDFs.
   if (filter_mem_tracker_ != nullptr) filter_mem_tracker_->Close();
   // Need to protect against failed Prepare(), where root_sink() would not be set.
-  if (coord_sink_ != nullptr) {
-    coord_sink_->CloseConsumer();
-  }
+  if (coord_sink_ != nullptr) coord_sink_->CloseConsumer();
+  // Now that we've released our own resources, can release query-wide resources.
+  if (query_state_ != nullptr) query_state_->ReleaseExecResourceRefcount();
+  // At this point some tracked memory may still be used in the coordinator for result
+  // caching. The query MemTracker will be cleaned up later.
 }
 
 void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
@@ -1084,12 +1093,14 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
     DCHECK(state->desc().has_remote_targets)
           << "Coordinator received filter that has only local targets";
 
-    // Check if the filter has already been sent, which could happen in three cases:
+    // Check if the filter has already been sent, which could happen in four cases:
     //   * if one local filter had always_true set - no point waiting for other local
     //     filters that can't affect the aggregated global filter
     //   * if this is a broadcast join, and another local filter was already received
     //   * if the filter could not be allocated and so an always_true filter was sent
     //     immediately.
+    //   * query execution finished and resources were released: filters do not need
+    //     to be processed.
     if (state->disabled()) return;
 
     if (filter_updates_received_->value() == 0) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9174af38/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 5802c83..611e4ae 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -139,14 +139,6 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// Only valid to call after Exec().
   QueryState* query_state() const { return query_state_; }
 
-  /// Only valid *after* calling Exec(). Return nullptr if the running query does not
-  /// produce any rows.
-  ///
-  /// TODO: The only dependency on this is QueryExecState, used to track memory for the
-  /// result cache. Remove this dependency, possibly by moving result caching inside this
-  /// class.
-  RuntimeState* runtime_state();
-
   /// Get cumulative profile aggregated over all fragments of the query.
   /// This is a snapshot of the current state of execution and will change in
   /// the future if not all fragments have finished execution.
@@ -286,8 +278,10 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// time.
   /// TODO: clarify to what extent the fields below need to be protected by lock_
   /// Lock ordering is
-  /// 1. lock_
-  /// 2. BackendState::lock_
+  /// 1. wait_lock_
+  /// 2. lock_
+  /// 3. BackendState::lock_
+  /// 4. filter_lock_
   boost::mutex lock_;
 
   /// Overall status of the entire query; set to the first reported fragment error
@@ -349,12 +343,16 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// safe to concurrently read from filter_routing_table_.
   bool filter_routing_table_complete_ = false;
 
-  /// True if and only if ReleaseResources() has been called.
-  bool released_resources_ = false;
+  /// True if and only if ReleaseExecResources() has been called.
+  bool released_exec_resources_ = false;
 
   /// Returns a local object pool.
   ObjectPool* obj_pool() { return obj_pool_.get(); }
 
+  /// Only valid *after* calling Exec(). Return nullptr if the running query does not
+  /// produce any rows.
+  RuntimeState* runtime_state();
+
   /// Returns a pretty-printed table of the current filter state.
   std::string FilterDebugString();
 
@@ -434,9 +432,11 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// filters that they either produce or consume.
   void InitFilterRoutingTable();
 
-  /// Releases filter resources, unregisters the filter mem tracker, and calls
-  /// CloseConsumer() on coord_sink_. Requires lock_ to be held. Idempotent.
-  void ReleaseResources();
+  /// Releases all resources associated with query execution. Acquires lock_. Idempotent.
+  void ReleaseExecResources();
+
+  /// Same as ReleaseExecResources() except the lock must be held by the caller.
+  void ReleaseExecResourcesLocked();
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9174af38/be/src/runtime/mem-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc
index 670a62e..a8cb37e 100644
--- a/be/src/runtime/mem-tracker.cc
+++ b/be/src/runtime/mem-tracker.cc
@@ -193,6 +193,7 @@ MemTracker* MemTracker::CreateQueryMemTracker(const TUniqueId& id,
           pool_name, true);
   MemTracker* tracker = obj_pool->Add(new MemTracker(
       byte_limit, Substitute("Query($0)", lexical_cast<string>(id)), pool_tracker));
+  tracker->is_query_mem_tracker_ = true;
   tracker->query_id_ = id;
   return tracker;
 }
@@ -316,6 +317,14 @@ string MemTracker::LogUsage(int max_recursive_depth, const string& prefix,
   return join(usage_strings, "\n");
 }
 
+MemTracker* MemTracker::GetQueryMemTracker() {
+  MemTracker* tracker = this;
+  while (tracker != nullptr && !tracker->is_query_mem_tracker_) {
+    tracker = tracker->parent_;
+  }
+  return tracker;
+}
+
 Status MemTracker::MemLimitExceeded(RuntimeState* state, const std::string& details,
     int64_t failed_allocation_size) {
   DCHECK_GE(failed_allocation_size, 0);
@@ -336,8 +345,8 @@ Status MemTracker::MemLimitExceeded(RuntimeState* state, const std::string& deta
      << PrettyPrinter::Print(process_capacity, TUnit::BYTES) << endl;
 
   // Always log the query tracker (if available).
-  if (state != nullptr) {
-    MemTracker* query_tracker = state->query_mem_tracker();
+  MemTracker* query_tracker = GetQueryMemTracker();
+  if (query_tracker != nullptr) {
     if (query_tracker->has_limit()) {
       const int64_t query_capacity = query_tracker->limit() - query_tracker->consumption();
       ss << "Memory left in query limit: "
@@ -347,8 +356,8 @@ Status MemTracker::MemLimitExceeded(RuntimeState* state, const std::string& deta
   }
 
   // Log the process level if the process tracker is close to the limit or
-  // if the query tracker was not available.
-  if (process_capacity < failed_allocation_size || state == nullptr) {
+  // if this tracker is not within a query's MemTracker hierarchy.
+  if (process_capacity < failed_allocation_size || query_tracker == nullptr) {
     // IMPALA-5598: For performance reasons, limit the levels of recursion when
     // dumping the process tracker to only two layers.
     ss << process_tracker->LogUsage(PROCESS_MEMTRACKER_LIMITED_DEPTH);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9174af38/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index 644353d..1260351 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -347,6 +347,7 @@ class MemTracker {
   /// details of the allocation which caused the limit to be exceeded.
   /// If 'failed_allocation_size' is greater than zero, logs the allocation size. If
   /// 'failed_allocation_size' is zero, nothing about the allocation size is logged.
+  /// If 'state' is non-NULL, logs the error to 'state'.
   Status MemLimitExceeded(RuntimeState* state, const std::string& details,
       int64_t failed_allocation = 0) WARN_UNUSED_RESULT;
 
@@ -375,9 +376,16 @@ class MemTracker {
   static std::string LogUsage(int max_recursive_depth, const std::string& prefix,
       const std::list<MemTracker*>& trackers, int64_t* logged_consumption);
 
+  /// If this tracker or one of its ancestors is a query MemTracker, return that tracker.
+  /// Otherwise return NULL.
+  MemTracker* GetQueryMemTracker();
+
   /// Lock to protect GcMemory(). This prevents many GCs from occurring at once.
   boost::mutex gc_lock_;
 
+  /// True if this is a Query MemTracker returned from CreateQueryMemTracker().
+  bool is_query_mem_tracker_ = false;
+
   /// Only valid for MemTrackers returned from CreateQueryMemTracker()
   TUniqueId query_id_;
 
@@ -386,10 +394,13 @@ class MemTracker {
 
   /// Hard limit on memory consumption, in bytes. May not be exceeded. If limit_ == -1,
   /// there is no consumption limit.
-  int64_t limit_;
+  const int64_t limit_;
 
   std::string label_;
-  MemTracker* parent_;
+
+  /// The parent of this tracker. The pointer is never modified, even after this tracker
+  /// is unregistered.
+  MemTracker* const parent_;
 
   /// in bytes; not owned
   RuntimeProfile::HighWaterMarkCounter* consumption_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9174af38/be/src/runtime/query-exec-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc
index 071c5dd..4f30f4e 100644
--- a/be/src/runtime/query-exec-mgr.cc
+++ b/be/src/runtime/query-exec-mgr.cc
@@ -50,6 +50,7 @@ Status QueryExecMgr::StartQuery(const TExecQueryFInstancesParams& params) {
   QueryState* qs = GetOrCreateQueryState(params.query_ctx, &dummy);
   Status status = qs->Init(params);
   if (!status.ok()) {
+    qs->ReleaseExecResourceRefcount(); // Release refcnt acquired in Init().
     ReleaseQueryState(qs);
     return status;
   }
@@ -61,7 +62,7 @@ Status QueryExecMgr::StartQuery(const TExecQueryFInstancesParams& params) {
           &QueryExecMgr::StartQueryHelper, this, qs, &t, true);
   if (!status.ok()) {
     // decrement refcount taken in QueryState::Init()
-    qs->ReleaseInitialReservationRefcount();
+    qs->ReleaseExecResourceRefcount();
     // decrement refcount taken in GetOrCreateQueryState()
     ReleaseQueryState(qs);
     return status;
@@ -133,7 +134,7 @@ void QueryExecMgr::StartQueryHelper(QueryState* qs) {
 #endif
 
   // decrement refcount taken in QueryState::Init();
-  qs->ReleaseInitialReservationRefcount();
+  qs->ReleaseExecResourceRefcount();
   // decrement refcount taken in StartQuery()
   ReleaseQueryState(qs);
 }
@@ -166,6 +167,5 @@ void QueryExecMgr::ReleaseQueryState(QueryState* qs) {
     qs_map_.erase(it);
   }
   // TODO: send final status report during gc, but do this from a different thread
-  qs_from_map->ReleaseResources();
   delete qs_from_map;
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9174af38/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index ea24411..6796c82 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -51,7 +51,7 @@ QueryState::ScopedRef::~ScopedRef() {
 
 QueryState::QueryState(const TQueryCtx& query_ctx, const string& request_pool)
   : query_ctx_(query_ctx),
-    initial_reservation_refcnt_(0),
+    exec_resource_refcnt_(0),
     refcnt_(0),
     is_cancelled_(0),
     query_spilled_(0) {
@@ -74,31 +74,39 @@ QueryState::QueryState(const TQueryCtx& query_ctx, const string& request_pool)
   InitMemTrackers();
 }
 
-void QueryState::ReleaseResources() {
-  DCHECK(!released_resources_);
+void QueryState::ReleaseExecResources() {
+  DCHECK(!released_exec_resources_);
   // Clean up temporary files.
   if (file_group_ != nullptr) file_group_->Close();
   // Release any remaining reservation.
   if (initial_reservations_ != nullptr) initial_reservations_->ReleaseResources();
   if (buffer_reservation_ != nullptr) buffer_reservation_->Close();
-  if (query_mem_tracker_ != nullptr) {
-    // No more tracked memory should be used by the query after this point, so we can
-    // close the MemTracker and remove the whole query subtree of MemTrackers from the
-    // global tree. After this point nothing should be touching this query's MemTrackers
-    // and they can be safely destroyed.
-    query_mem_tracker_->CloseAndUnregisterFromParent();
-  }
   if (desc_tbl_ != nullptr) desc_tbl_->ReleaseResources();
-  released_resources_ = true;
+  // At this point query execution should not be consuming any resources but some tracked
+  // memory may still be used by the ClientRequestState for result caching. The query
+  // MemTracker will be closed later when this QueryState is torn down.
+  released_exec_resources_ = true;
 }
 
 QueryState::~QueryState() {
-  DCHECK(released_resources_);
   DCHECK_EQ(refcnt_.Load(), 0);
-  DCHECK_EQ(initial_reservation_refcnt_.Load(), 0);
+  DCHECK_EQ(exec_resource_refcnt_.Load(), 0);
+  DCHECK(released_exec_resources_);
+  if (query_mem_tracker_ != nullptr) {
+    // Disconnect the query MemTracker hierarchy from the global hierarchy. After this
+    // point nothing must touch this query's MemTracker and all tracked memory associated
+    // with the query must be released. The whole query subtree of MemTrackers can
+    // therefore be safely destroyed.
+    query_mem_tracker_->CloseAndUnregisterFromParent();
+  }
 }
 
 Status QueryState::Init(const TExecQueryFInstancesParams& rpc_params) {
+  // Decremented in QueryExecMgr::StartQueryHelper() on success or by the caller of
+  // Init() on failure. We need to do this before any returns because Init() always
+  // returns a resource refcount to its caller.
+  AcquireExecResourceRefcount();
+
   // Starting a new query creates threads and consumes a non-trivial amount of memory.
   // If we are already starved for memory, fail as early as possible to avoid consuming
   // more resources.
@@ -129,8 +137,6 @@ Status QueryState::Init(const TExecQueryFInstancesParams& rpc_params) {
       rpc_params.initial_reservation_total_claims));
   RETURN_IF_ERROR(
       initial_reservations_->Init(query_id(), rpc_params.min_reservation_bytes));
-  DCHECK_EQ(0, initial_reservation_refcnt_.Load());
-  initial_reservation_refcnt_.Add(1); // Decremented in QueryExecMgr::StartQueryHelper().
   return Status::OK();
 }
 
@@ -278,7 +284,7 @@ void QueryState::StartFInstances() {
   VLOG_QUERY << "StartFInstances(): query_id=" << PrintId(query_id())
       << " #instances=" << rpc_params_.fragment_instance_ctxs.size();
   DCHECK_GT(refcnt_.Load(), 0);
-  DCHECK_GT(initial_reservation_refcnt_.Load(), 0) << "Should have been taken in Init()";
+  DCHECK_GT(exec_resource_refcnt_.Load(), 0) << "Should have been taken in Init()";
 
   // set up desc tbl
   DCHECK(query_ctx().__isset.desc_tbl);
@@ -308,8 +314,8 @@ void QueryState::StartFInstances() {
         new FragmentInstanceState(this, *fragment_ctx, instance_ctx));
 
     // start new thread to execute instance
-    refcnt_.Add(1);  // decremented in ExecFInstance()
-    initial_reservation_refcnt_.Add(1);  // decremented in ExecFInstance()
+    refcnt_.Add(1); // decremented in ExecFInstance()
+    AcquireExecResourceRefcount(); // decremented in ExecFInstance()
     string thread_name = Substitute(
         "exec-finstance (finst:$0)", PrintId(instance_ctx.fragment_instance_id));
     unique_ptr<Thread> t;
@@ -319,7 +325,7 @@ void QueryState::StartFInstances() {
       // Undo refcnt increments done immediately prior to Thread::Create(). The
       // reference counts were both greater than zero before the increments, so
       // neither of these decrements will free any structures.
-      ReleaseInitialReservationRefcount();
+      ReleaseExecResourceRefcount();
       ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(this);
       break;
     }
@@ -352,10 +358,15 @@ void QueryState::StartFInstances() {
   }
 }
 
-void QueryState::ReleaseInitialReservationRefcount() {
-  int32_t new_val = initial_reservation_refcnt_.Add(-1);
+void QueryState::AcquireExecResourceRefcount() {
+  DCHECK(!released_exec_resources_);
+  exec_resource_refcnt_.Add(1);
+}
+
+void QueryState::ReleaseExecResourceRefcount() {
+  int32_t new_val = exec_resource_refcnt_.Add(-1);
   DCHECK_GE(new_val, 0);
-  if (new_val == 0) initial_reservations_->ReleaseResources();
+  if (new_val == 0) ReleaseExecResources();
 }
 
 void QueryState::ExecFInstance(FragmentInstanceState* fis) {
@@ -374,7 +385,7 @@ void QueryState::ExecFInstance(FragmentInstanceState* fis) {
   // initiate cancellation if nobody has done so yet
   if (!status.ok()) Cancel();
   // decrement refcount taken in StartFInstances()
-  ReleaseInitialReservationRefcount();
+  ReleaseExecResourceRefcount();
   // decrement refcount taken in StartFInstances()
   ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(this);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9174af38/be/src/runtime/query-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index d9606be..82f2c52 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -27,7 +27,6 @@
 #include "gen-cpp/ImpalaInternalService_types.h"
 #include "gen-cpp/Types_types.h"
 #include "runtime/tmp-file-mgr.h"
-#include "util/spinlock.h"
 #include "util/uid-util.h"
 #include "util/promise.h"
 
@@ -55,6 +54,10 @@ class RuntimeState;
 /// structures (contained either in this class or accessible through this class, such
 /// as the FragmentInstanceStates) are guaranteed to be alive.
 ///
+/// Query execution resources (non-control-structure memory, scratch files, threads, etc)
+/// are also managed via a separate resource reference count, which should be released as
+/// soon as the resources are no longer needed, so that they can be freed promptly.
+///
 /// When any fragment instance execution returns with an error status, all
 /// fragment instances are automatically cancelled.
 ///
@@ -66,10 +69,6 @@ class RuntimeState;
 ///
 /// TODO:
 /// - set up kudu clients in Init(), remove related locking
-/// - release resources (those referenced directly or indirectly by the query result
-///   set) automatically when all instances have finished execution
-///   (either by returning all rows or by being cancelled), rather than waiting for an
-///   explicit call to ReleaseResources()
 /// - when ReportExecStatus() encounters an error, query execution at this node
 ///   gets aborted, but it's possible for the coordinator not to find out about that;
 ///   fix the coordinator to periodically ping the backends (should the coordinator
@@ -120,9 +119,10 @@ class QueryState {
   const DescriptorTbl& desc_tbl() const { return *desc_tbl_; }
 
   /// Sets up state required for fragment execution: memory reservations, etc. Fails
-  /// if resources could not be acquired. On success, acquires an initial reservation
-  /// refcount for the caller, which the caller must release by calling
-  /// ReleaseInitialReservationRefcount().
+  /// if resources could not be acquired. Acquires a resource refcount and returns it
+  /// to the caller on both success and failure. The caller must release it by calling
+  /// ReleaseExecResourceRefcount().
+  ///
   /// Uses few cycles and never blocks. Not idempotent, not thread-safe.
   /// The remaining public functions must be called only after Init().
   Status Init(const TExecQueryFInstancesParams& rpc_params) WARN_UNUSED_RESULT;
@@ -149,10 +149,16 @@ class QueryState {
   /// instances have finished their Prepare phase. Idempotent.
   void Cancel();
 
-  /// Called once the query is complete to release any resources.
-  /// Must be called only once and before destroying the QueryState.
-  /// Not idempotent, not thread-safe.
-  void ReleaseResources();
+  /// Increment the resource refcount. Must be decremented before the query state
+  /// reference is released. A refcount should be held by a fragment or other entity
+  /// for as long as it is consuming query execution resources (e.g. memory).
+  void AcquireExecResourceRefcount();
+
+  /// Decrement the execution resource refcount and release resources if it goes to zero.
+  /// All resource refcounts must be released before query state references are released.
+  /// Should be called by the owner of the refcount after it is done consuming query
+  /// execution resources.
+  void ReleaseExecResourceRefcount();
 
   /// Sends a ReportExecStatus rpc to the coordinator. If fis == nullptr, the
   /// status must be an error. If fis is given, the content will depend on whether
@@ -198,10 +204,10 @@ class QueryState {
   /// 'buffer_reservation_'. Owned by 'obj_pool_'. Set in Prepare().
   InitialReservations* initial_reservations_ = nullptr;
 
-  /// Number of fragment instances executing, which may need to claim
-  /// from 'initial_reservations_'.
-  /// TODO: not needed if we call ReleaseResources() in a timely manner (IMPALA-1575).
-  AtomicInt32 initial_reservation_refcnt_;
+  /// Number of active fragment instances and coordinators for this query that may consume
+  /// resources for query execution (i.e. threads, memory) on the Impala daemon.
+  /// Query-wide execution resources for this query are released once this goes to zero.
+  AtomicInt32 exec_resource_refcnt_;
 
   /// Temporary files for this query (owned by obj_pool_). Non-null if spilling is
   /// enabled. Set in Prepare().
@@ -230,8 +236,8 @@ class QueryState {
   /// initiate cancellation exactly once
   AtomicInt32 is_cancelled_;
 
-  /// True if and only if ReleaseResources() has been called.
-  bool released_resources_ = false;
+  /// True if and only if ReleaseExecResources() has been called.
+  bool released_exec_resources_ = false;
 
   /// Whether the query has spilled. 0 if the query has not spilled. Atomically set to 1
   /// when the query first starts to spill. Required to correctly maintain the
@@ -252,9 +258,9 @@ class QueryState {
   /// Called from Init() to set up buffer reservations and the file group.
   Status InitBufferPoolState() WARN_UNUSED_RESULT;
 
-  /// Decrement 'initial_reservation_refcnt_' and release the initial reservation if it
-  /// goes to zero.
-  void ReleaseInitialReservationRefcount();
+  /// Releases resources used for query execution. Guaranteed to be called only once.
+  /// Must be called before destroying the QueryState. Not idempotent and not thread-safe.
+  void ReleaseExecResources();
 
   /// Same behavior as ReportExecStatus().
   /// Cancel on error only if instances_started is true.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9174af38/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 0565cf5..be393f1 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -75,6 +75,8 @@ RuntimeState::RuntimeState(QueryState* query_state, const TPlanFragmentCtx& frag
   Init();
 }
 
+// Constructor for standalone RuntimeState for test execution and fe-support.cc.
+// Sets up a dummy local QueryState to allow evaluating exprs, etc.
 RuntimeState::RuntimeState(
     const TQueryCtx& qctx, ExecEnv* exec_env, DescriptorTbl* desc_tbl)
   : query_state_(new QueryState(qctx, "test-pool")),
@@ -85,6 +87,9 @@ RuntimeState::RuntimeState(
     utc_timestamp_(new TimestampValue(TimestampValue::Parse(qctx.utc_timestamp_string))),
     exec_env_(exec_env),
     profile_(RuntimeProfile::Create(obj_pool(), "<unnamed>")) {
+  // We may use execution resources while evaluating exprs, etc. Decremented in
+  // ReleaseResources() to release resources.
+  local_query_state_->AcquireExecResourceRefcount();
   if (query_ctx().request_pool.empty()) {
     const_cast<TQueryCtx&>(query_ctx()).request_pool = "test-pool";
   }
@@ -251,8 +256,7 @@ void RuntimeState::ReleaseResources() {
   instance_mem_tracker_->Close();
 
   if (local_query_state_.get() != nullptr) {
-    // if we created this QueryState, we must call ReleaseResources()
-    local_query_state_->ReleaseResources();
+    local_query_state_->ReleaseExecResourceRefcount();
   }
   released_resources_ = true;
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9174af38/be/src/runtime/test-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index 69bbc50..770eaba 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -86,7 +86,7 @@ void TestEnv::TearDownQueries() {
   for (RuntimeState* runtime_state : runtime_states_) runtime_state->ReleaseResources();
   runtime_states_.clear();
   for (QueryState* query_state : query_states_) {
-    query_state->ReleaseInitialReservationRefcount();
+    query_state->ReleaseExecResourceRefcount();
     exec_env_->query_exec_mgr()->ReleaseQueryState(query_state);
   }
   query_states_.clear();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9174af38/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index ec4768e..e6fd618 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -824,8 +824,7 @@ Status ClientRequestState::FetchRowsInternal(const int32_t max_rows,
     // Count the cached rows towards the mem limit.
     if (UNLIKELY(!query_mem_tracker->TryConsume(delta_bytes))) {
       string details("Failed to allocate memory for result cache.");
-      return query_mem_tracker->MemLimitExceeded(coord_->runtime_state(), details,
-          delta_bytes);
+      return query_mem_tracker->MemLimitExceeded(nullptr, details, delta_bytes);
     }
     // Append all rows fetched from the coordinator into the cache.
     int num_rows_added = result_cache_->AddRows(