You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/04/28 05:28:47 UTC

[4/4] impala git commit: IMPALA-6920: fix inconsistencies with scanner thread tokens

IMPALA-6920: fix inconsistencies with scanner thread tokens

The first scanner thread to start now takes a "required" token,
which always succeeds. Only additional threads try to get
"optional" tokens, which can fail. Previously, threads always
requested optional tokens, which could fail and leave the scan
node without any running threads until its callback was invoked.

This allows us to remove the "reserved optional token" and
set_max_quota() interfaces from ThreadResourceMgr. There should
be no behavioural changes in ThreadResourceMgr in cases where those
features are not used.

Also switch Kudu to using the same logic for implementing
NUM_SCANNER_THREADS (it was not switched over to the improved
HDFS scanner logic added in IMPALA-2831).

Do some cleanup in ThreadResourceMgr code while we're here:
* Fix some benign data races in ThreadResourceMgr by switching to
  AtomicInt* classes.
* Remove pointless object caching (TCMalloc will do better).
* Reduce dependencies on the thread-resource-mgr.h header.

Testing:
Ran core tests.

Ran a few queries under TSAN, checked that it didn't report any more
races in this code after fixing those data races.

I couldn't construct a regression test because there are no easily
testable consequences of the change. The main difference is that
some scanner threads start earlier when there is pressure on scanner
thread tokens, but it is hard to construct a robust test around that.

Change-Id: I16d31d72441aff7293759281d0248e641df43704
Reviewed-on: http://gerrit.cloudera.org:8080/10186
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/789c5aac
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/789c5aac
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/789c5aac

Branch: refs/heads/master
Commit: 789c5aac23480acc6e18c057b767b65fdd791c97
Parents: d0f838b
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Tue Apr 24 15:36:41 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Sat Apr 28 04:30:55 2018 +0000

----------------------------------------------------------------------
 be/src/exec/blocking-join-node.cc          |   1 +
 be/src/exec/hdfs-scan-node.cc              |  25 +-
 be/src/exec/hdfs-scan-node.h               |   7 +-
 be/src/exec/kudu-scan-node.cc              |  31 +-
 be/src/exec/kudu-scan-node.h               |  21 +-
 be/src/runtime/fragment-instance-state.cc  |   5 +-
 be/src/runtime/io/disk-io-mgr-internal.h   |   1 -
 be/src/runtime/io/disk-io-mgr.h            |   1 -
 be/src/runtime/runtime-state.cc            |   7 +-
 be/src/runtime/runtime-state.h             |   6 +-
 be/src/runtime/thread-resource-mgr-test.cc |  36 +--
 be/src/runtime/thread-resource-mgr.cc      | 112 +++----
 be/src/runtime/thread-resource-mgr.h       | 374 ++++++++++--------------
 13 files changed, 290 insertions(+), 337 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/789c5aac/be/src/exec/blocking-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/blocking-join-node.cc b/be/src/exec/blocking-join-node.cc
index cfaf91a..57b7723 100644
--- a/be/src/exec/blocking-join-node.cc
+++ b/be/src/exec/blocking-join-node.cc
@@ -26,6 +26,7 @@
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "runtime/tuple-row.h"
+#include "runtime/thread-resource-mgr.h"
 #include "util/debug-util.h"
 #include "util/runtime-profile-counters.h"
 #include "util/time.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/789c5aac/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 7c64338..045ae18 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -30,6 +30,7 @@
 #include "runtime/runtime-state.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
+#include "runtime/thread-resource-mgr.h"
 #include "util/debug-util.h"
 #include "util/disk-info.h"
 #include "util/runtime-profile-counters.h"
@@ -211,9 +212,6 @@ Status HdfsScanNode::Open(RuntimeState* state) {
 
   if (file_descs_.empty() || progress_.done()) return Status::OK();
 
-  // We need at least one scanner thread to make progress. We need to make this
-  // reservation before any ranges are issued.
-  runtime_state_->resource_pool()->ReserveOptionalTokens(1);
   if (runtime_state_->query_options().num_scanner_threads > 0) {
     max_num_scanner_threads_ = runtime_state_->query_options().num_scanner_threads;
   }
@@ -295,7 +293,7 @@ bool HdfsScanNode::EnoughMemoryForScannerThread(bool new_thread) {
   return est_additional_scanner_mem < mem_tracker()->SpareCapacity();
 }
 
-void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool) {
+void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
   // This is called to start up new scanner threads. It's not a big deal if we
   // spin up more than strictly necessary since they will go through and terminate
   // promptly. However, we want to minimize that by checking a conditions.
@@ -328,17 +326,20 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
       break;
     }
 
+    bool first_thread = active_scanner_thread_counter_.value() == 0;
     // Cases 5 and 6.
-    if (active_scanner_thread_counter_.value() > 0 &&
+    if (!first_thread &&
         (materialized_row_batches_->Size() >= max_materialized_row_batches_ ||
          !EnoughMemoryForScannerThread(true))) {
       break;
     }
 
     // Case 7 and 8.
-    bool is_reserved = false;
-    if (active_scanner_thread_counter_.value() >= max_num_scanner_threads_ ||
-        !pool->TryAcquireThreadToken(&is_reserved)) {
+    if (first_thread) {
+      // The first thread is required to make progress on the scan.
+      pool->AcquireThreadToken();
+    } else if (active_scanner_thread_counter_.value() >= max_num_scanner_threads_
+        || !pool->TryAcquireThreadToken()) {
       break;
     }
 
@@ -347,7 +348,7 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
         PrintId(runtime_state_->fragment_instance_id()), id(),
         num_scanner_threads_started_counter_->value());
 
-    auto fn = [this]() { this->ScannerThread(); };
+    auto fn = [this, first_thread]() { this->ScannerThread(first_thread); };
     std::unique_ptr<Thread> t;
     status =
       Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn, &t, true);
@@ -357,7 +358,7 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
       // serves two purposes. First, it prevents a mutual recursion between this function
       // and ReleaseThreadToken()->InvokeCallbacks(). Second, Thread::Create() failed and
       // is likely to continue failing for future callbacks.
-      pool->ReleaseThreadToken(false, true);
+      pool->ReleaseThreadToken(first_thread, true);
 
       // Abort the query. This is still holding the lock_, so done_ is known to be
       // false and status_ must be ok.
@@ -372,7 +373,7 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
   }
 }
 
-void HdfsScanNode::ScannerThread() {
+void HdfsScanNode::ScannerThread(bool first_thread) {
   SCOPED_THREAD_COUNTER_MEASUREMENT(scanner_thread_counters());
   SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
 
@@ -474,7 +475,7 @@ void HdfsScanNode::ScannerThread() {
   COUNTER_ADD(&active_scanner_thread_counter_, -1);
 
 exit:
-  runtime_state_->resource_pool()->ReleaseThreadToken(false);
+  runtime_state_->resource_pool()->ReleaseThreadToken(first_thread);
   for (auto& ctx: filter_ctxs) ctx.expr_eval->Close(runtime_state_);
   filter_mem_pool.FreeAll();
   expr_results_pool.FreeAll();

http://git-wip-us.apache.org/repos/asf/impala/blob/789c5aac/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index a1c97cf..a9be94e 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -39,6 +39,7 @@ class DescriptorTbl;
 class ObjectPool;
 class RuntimeState;
 class RowBatch;
+class ThreadResourcePool;
 class TPlanNode;
 
 /// Legacy ScanNode implementation used in the non-multi-threaded execution mode
@@ -155,12 +156,14 @@ class HdfsScanNode : public HdfsScanNodeBase {
 
   /// Tries to spin up as many scanner threads as the quota allows. Called explicitly
   /// (e.g., when adding new ranges) or when threads are available for this scan node.
-  void ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool);
+  void ThreadTokenAvailableCb(ThreadResourcePool* pool);
 
   /// Main function for scanner thread. This thread pulls the next range to be
   /// processed from the IoMgr and then processes the entire range end to end.
   /// This thread terminates when all scan ranges are complete or an error occurred.
-  void ScannerThread();
+  /// 'first_thread' is true if this was the first scanner thread to start and
+  /// it acquired a "required" thread token from ThreadResourceMgr.
+  void ScannerThread(bool first_thread);
 
   /// Process the entire scan range with a new scanner object. Executed in scanner
   /// thread. 'filter_ctxs' is a clone of the class-wide filter_ctxs_, used to filter rows

http://git-wip-us.apache.org/repos/asf/impala/blob/789c5aac/be/src/exec/kudu-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index 6d5e085..16e0633 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -27,6 +27,7 @@
 #include "runtime/mem-pool.h"
 #include "runtime/runtime-state.h"
 #include "runtime/row-batch.h"
+#include "runtime/thread-resource-mgr.h"
 #include "runtime/tuple-row.h"
 #include "util/disk-info.h"
 #include "util/runtime-profile-counters.h"
@@ -43,6 +44,7 @@ KuduScanNode::KuduScanNode(ObjectPool* pool, const TPlanNode& tnode,
     : KuduScanNodeBase(pool, tnode, descs),
       num_active_scanners_(0),
       done_(false),
+      max_num_scanner_threads_(CpuInfo::num_cores()),
       thread_avail_cb_id_(-1) {
   DCHECK(KuduIsAvailable());
 
@@ -68,12 +70,10 @@ Status KuduScanNode::Open(RuntimeState* state) {
   num_scanner_threads_started_counter_ =
       ADD_COUNTER(runtime_profile(), NUM_SCANNER_THREADS_STARTED, TUnit::UNIT);
 
-  // Reserve one thread.
-  state->resource_pool()->ReserveOptionalTokens(1);
   if (state->query_options().num_scanner_threads > 0) {
-    state->resource_pool()->set_max_quota(
-        state->query_options().num_scanner_threads);
+    max_num_scanner_threads_ = runtime_state_->query_options().num_scanner_threads;
   }
+  DCHECK_GT(max_num_scanner_threads_, 0);
 
   if (filter_ctxs_.size() > 0) WaitForRuntimeFilters();
 
@@ -138,14 +138,20 @@ void KuduScanNode::Close(RuntimeState* state) {
   KuduScanNodeBase::Close(state);
 }
 
-void KuduScanNode::ThreadAvailableCb(ThreadResourceMgr::ResourcePool* pool) {
+void KuduScanNode::ThreadAvailableCb(ThreadResourcePool* pool) {
   while (true) {
     unique_lock<mutex> lock(lock_);
     // All done or all tokens are assigned.
     if (done_ || !HasScanToken()) break;
+    bool first_thread = active_scanner_thread_counter_.value() == 0;
 
-    // Check if we can get a token.
-    if (!pool->TryAcquireThreadToken()) break;
+    // Check if we can get a token. We need at least one thread to run.
+    if (first_thread) {
+      pool->AcquireThreadToken();
+    } else if (active_scanner_thread_counter_.value() >= max_num_scanner_threads_
+        || !pool->TryAcquireThreadToken()) {
+      break;
+    }
 
     string name = Substitute(
         "kudu-scanner-thread (finst:$0, plan-node-id:$1, thread-idx:$2)",
@@ -154,7 +160,9 @@ void KuduScanNode::ThreadAvailableCb(ThreadResourceMgr::ResourcePool* pool) {
 
     // Reserve the first token so no other thread picks it up.
     const string* token = GetNextScanToken();
-    auto fn = [this, token, name]() { this->RunScannerThread(name, token); };
+    auto fn = [this, first_thread, token, name]() {
+      this->RunScannerThread(first_thread, name, token);
+    };
     std::unique_ptr<Thread> t;
     Status status =
       Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn, &t, true);
@@ -163,7 +171,7 @@ void KuduScanNode::ThreadAvailableCb(ThreadResourceMgr::ResourcePool* pool) {
       // serves two purposes. First, it prevents a mutual recursion between this function
       // and ReleaseThreadToken()->InvokeCallbacks(). Second, Thread::Create() failed and
       // is likely to continue failing for future callbacks.
-      pool->ReleaseThreadToken(false, true);
+      pool->ReleaseThreadToken(first_thread, true);
 
       // Abort the query. This is still holding the lock_, so done_ is known to be
       // false and status_ must be ok.
@@ -201,7 +209,8 @@ Status KuduScanNode::ProcessScanToken(KuduScanner* scanner, const string& scan_t
   return Status::OK();
 }
 
-void KuduScanNode::RunScannerThread(const string& name, const string* initial_token) {
+void KuduScanNode::RunScannerThread(
+    bool first_thread, const string& name, const string* initial_token) {
   DCHECK(initial_token != NULL);
   SCOPED_THREAD_COUNTER_MEASUREMENT(scanner_thread_counters());
   SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
@@ -257,7 +266,7 @@ void KuduScanNode::RunScannerThread(const string& name, const string* initial_to
   // lock_ is released before calling ThreadResourceMgr::ReleaseThreadToken() which
   // invokes ThreadAvailableCb() which attempts to take the same lock.
   VLOG_RPC << "Thread done: " << name;
-  runtime_state_->resource_pool()->ReleaseThreadToken(false);
+  runtime_state_->resource_pool()->ReleaseThreadToken(first_thread);
 }
 
 void KuduScanNode::SetDoneInternal() {

http://git-wip-us.apache.org/repos/asf/impala/blob/789c5aac/be/src/exec/kudu-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.h b/be/src/exec/kudu-scan-node.h
index 4759f0a..2f8d808 100644
--- a/be/src/exec/kudu-scan-node.h
+++ b/be/src/exec/kudu-scan-node.h
@@ -23,13 +23,13 @@
 #include <kudu/client/client.h>
 
 #include "exec/kudu-scan-node-base.h"
-#include "runtime/thread-resource-mgr.h"
 #include "gutil/gscoped_ptr.h"
 #include "util/thread.h"
 
 namespace impala {
 
 class KuduScanner;
+class ThreadResourcePool;
 
 /// A scan node that scans a Kudu table.
 ///
@@ -77,6 +77,12 @@ class KuduScanNode : public KuduScanNodeBase {
   /// Thread group for all scanner worker threads
   ThreadGroup scanner_threads_;
 
+  /// Maximum number of scanner threads. Set to 'NUM_SCANNER_THREADS' if that query
+  /// option is set. Otherwise, it's set to the number of cpu cores. Scanner threads
+  /// are generally cpu bound so there is no benefit in spinning up more threads than
+  /// the number of cores.
+  int max_num_scanner_threads_;
+
   /// The id of the callback added to the thread resource manager when a thread
   /// is available. Used to remove the callback before this scan node is destroyed.
   /// -1 if no callback is registered.
@@ -84,13 +90,16 @@ class KuduScanNode : public KuduScanNodeBase {
 
   /// Called when scanner threads are available for this scan node. This will
   /// try to spin up as many scanner threads as the quota allows.
-  void ThreadAvailableCb(ThreadResourceMgr::ResourcePool* pool);
+  void ThreadAvailableCb(ThreadResourcePool* pool);
 
   /// Main function for scanner thread which executes a KuduScanner. Begins by processing
-  /// 'initial_token', and continues processing scan tokens returned by
-  /// 'GetNextScanToken()' until there are none left, an error occurs, or the limit is
-  /// reached.
-  void RunScannerThread(const std::string& name, const std::string* initial_token);
+  /// 'initial_token', and continues processing scan tokens returned by GetNextScanToken()
+  /// until there are none left, an error occurs, or the limit is reached. The caller must
+  /// have acquired a thread token from the ThreadResourceMgr for this thread. The token
+  /// is released before this function returns. 'first_thread' is true if this was the
+  /// first scanner thread to start and it acquired a "required" thread token.
+  void RunScannerThread(
+      bool first_thread, const std::string& name, const std::string* initial_token);
 
   /// Processes a single scan token. Row batches are fetched using 'scanner' and enqueued
   /// in 'materialized_row_batches_' until the scanner reports eos, an error occurs, or

http://git-wip-us.apache.org/repos/asf/impala/blob/789c5aac/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index a6ae1ff..a14bf31 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -42,6 +42,7 @@
 #include "runtime/query-state.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
+#include "runtime/thread-resource-mgr.h"
 #include "scheduling/query-schedule.h"
 #include "util/debug-util.h"
 #include "util/container-util.h"
@@ -141,7 +142,7 @@ Status FragmentInstanceState::Prepare() {
   // Reserve one main thread from the pool
   runtime_state_->resource_pool()->AcquireThreadToken();
   avg_thread_tokens_ = profile()->AddSamplingCounter("AverageThreadTokens",
-      bind<int64_t>(mem_fn(&ThreadResourceMgr::ResourcePool::num_threads),
+      bind<int64_t>(mem_fn(&ThreadResourcePool::num_threads),
           runtime_state_->resource_pool()));
   mem_usage_sampled_counter_ = profile()->AddTimeSeriesCounter("MemoryUsage",
       TUnit::BYTES,
@@ -149,7 +150,7 @@ Status FragmentInstanceState::Prepare() {
           runtime_state_->instance_mem_tracker()));
   thread_usage_sampled_counter_ = profile()->AddTimeSeriesCounter("ThreadUsage",
       TUnit::UNIT,
-      bind<int64_t>(mem_fn(&ThreadResourceMgr::ResourcePool::num_threads),
+      bind<int64_t>(mem_fn(&ThreadResourcePool::num_threads),
           runtime_state_->resource_pool()));
 
   // set up plan

http://git-wip-us.apache.org/repos/asf/impala/blob/789c5aac/be/src/runtime/io/disk-io-mgr-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-internal.h b/be/src/runtime/io/disk-io-mgr-internal.h
index 3fc3895..e6962ea 100644
--- a/be/src/runtime/io/disk-io-mgr-internal.h
+++ b/be/src/runtime/io/disk-io-mgr-internal.h
@@ -27,7 +27,6 @@
 #include "runtime/io/request-context.h"
 #include "runtime/io/disk-io-mgr.h"
 #include "runtime/mem-tracker.h"
-#include "runtime/thread-resource-mgr.h"
 #include "util/condition-variable.h"
 #include "util/cpu-info.h"
 #include "util/debug-util.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/789c5aac/be/src/runtime/io/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.h b/be/src/runtime/io/disk-io-mgr.h
index cfac328..52d6993 100644
--- a/be/src/runtime/io/disk-io-mgr.h
+++ b/be/src/runtime/io/disk-io-mgr.h
@@ -33,7 +33,6 @@
 #include "runtime/io/handle-cache.h"
 #include "runtime/io/local-file-system.h"
 #include "runtime/io/request-ranges.h"
-#include "runtime/thread-resource-mgr.h"
 #include "util/aligned-new.h"
 #include "util/bit-util.h"
 #include "util/condition-variable.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/789c5aac/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 29ea737..c8776ac 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -40,6 +40,7 @@
 #include "runtime/mem-tracker.h"
 #include "runtime/query-state.h"
 #include "runtime/runtime-filter-bank.h"
+#include "runtime/thread-resource-mgr.h"
 #include "runtime/timestamp-value.h"
 #include "util/auth-util.h" // for GetEffectiveUser()
 #include "util/bitmap.h"
@@ -106,8 +107,8 @@ void RuntimeState::Init() {
   SCOPED_TIMER(profile_->total_time_counter());
 
   // Register with the thread mgr
-  resource_pool_ = exec_env_->thread_mgr()->RegisterPool();
-  DCHECK(resource_pool_ != NULL);
+  resource_pool_ = exec_env_->thread_mgr()->CreatePool();
+  DCHECK(resource_pool_ != nullptr);
 
   total_thread_statistics_ = ADD_THREAD_COUNTERS(runtime_profile(), "TotalThreads");
   total_storage_wait_timer_ = ADD_TIMER(runtime_profile(), "TotalStorageWaitTime");
@@ -229,7 +230,7 @@ void RuntimeState::ReleaseResources() {
   DCHECK(!released_resources_);
   if (filter_bank_ != nullptr) filter_bank_->Close();
   if (resource_pool_ != nullptr) {
-    exec_env_->thread_mgr()->UnregisterPool(resource_pool_);
+    exec_env_->thread_mgr()->DestroyPool(move(resource_pool_));
   }
   // Release any memory associated with codegen.
   if (codegen_ != nullptr) codegen_->Close();

http://git-wip-us.apache.org/repos/asf/impala/blob/789c5aac/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 4b005b2..359afc5 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -26,7 +26,6 @@
 // NOTE: try not to add more headers here: runtime-state.h is included in many many files.
 #include "common/global-types.h"  // for PlanNodeId
 #include "runtime/client-cache-types.h"
-#include "runtime/thread-resource-mgr.h"
 #include "runtime/dml-exec-state.h"
 #include "util/runtime-profile.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
@@ -45,6 +44,7 @@ class RuntimeFilterBank;
 class ScalarFnCall;
 class Status;
 class TimestampValue;
+class ThreadResourcePool;
 class TUniqueId;
 class ExecEnv;
 class DataStreamMgrBase;
@@ -116,7 +116,7 @@ class RuntimeState {
   ReservationTracker* instance_buffer_reservation() {
     return instance_buffer_reservation_.get();
   }
-  ThreadResourceMgr::ResourcePool* resource_pool() { return resource_pool_; }
+  ThreadResourcePool* resource_pool() { return resource_pool_.get(); }
 
   void set_fragment_root_id(PlanNodeId id) {
     DCHECK_EQ(root_node_id_, -1) << "Should not set this twice.";
@@ -322,7 +322,7 @@ class RuntimeState {
 
   /// Thread resource management object for this fragment's execution.  The runtime
   /// state is responsible for returning this pool to the thread mgr.
-  ThreadResourceMgr::ResourcePool* resource_pool_ = nullptr;
+  std::unique_ptr<ThreadResourcePool> resource_pool_;
 
   /// Execution state for DML statements.
   DmlExecState dml_exec_state_;

http://git-wip-us.apache.org/repos/asf/impala/blob/789c5aac/be/src/runtime/thread-resource-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/thread-resource-mgr-test.cc b/be/src/runtime/thread-resource-mgr-test.cc
index f480ecd..66c6d14 100644
--- a/be/src/runtime/thread-resource-mgr-test.cc
+++ b/be/src/runtime/thread-resource-mgr-test.cc
@@ -31,7 +31,7 @@ class NotifiedCounter {
   NotifiedCounter() : counter_(0) {
   }
 
-  void Notify(ThreadResourceMgr::ResourcePool* consumer) {
+  void Notify(ThreadResourcePool* consumer) {
     ASSERT_TRUE(consumer != NULL);
     ASSERT_LT(consumer->num_threads(), consumer->quota());
     ++counter_;
@@ -47,7 +47,7 @@ TEST(ThreadResourceMgr, BasicTest) {
   ThreadResourceMgr mgr(5);
   NotifiedCounter counter1, counter2;
 
-  ThreadResourceMgr::ResourcePool* c1 = mgr.RegisterPool();
+  unique_ptr<ThreadResourcePool> c1 = mgr.CreatePool();
   int callback1 = c1->AddThreadAvailableCb(bind<void>(mem_fn(&NotifiedCounter::Notify),
       &counter1, _1));
   c1->AcquireThreadToken();
@@ -62,16 +62,10 @@ TEST(ThreadResourceMgr, BasicTest) {
   EXPECT_EQ(c1->num_required_threads(), 2);
   EXPECT_EQ(c1->num_optional_threads(), 0);
   EXPECT_EQ(counter1.counter(), 1);
-  bool is_reserved = false;
-  c1->ReserveOptionalTokens(1);
-  EXPECT_TRUE(c1->TryAcquireThreadToken(&is_reserved));
-  EXPECT_TRUE(is_reserved);
-  EXPECT_TRUE(c1->TryAcquireThreadToken(&is_reserved));
-  EXPECT_FALSE(is_reserved);
-  EXPECT_TRUE(c1->TryAcquireThreadToken(&is_reserved));
-  EXPECT_FALSE(is_reserved);
-  EXPECT_FALSE(c1->TryAcquireThreadToken(&is_reserved));
-  EXPECT_FALSE(is_reserved);
+  EXPECT_TRUE(c1->TryAcquireThreadToken());
+  EXPECT_TRUE(c1->TryAcquireThreadToken());
+  EXPECT_TRUE(c1->TryAcquireThreadToken());
+  EXPECT_FALSE(c1->TryAcquireThreadToken());
   EXPECT_EQ(c1->num_threads(), 5);
   EXPECT_EQ(c1->num_required_threads(), 2);
   EXPECT_EQ(c1->num_optional_threads(), 3);
@@ -80,7 +74,7 @@ TEST(ThreadResourceMgr, BasicTest) {
   EXPECT_EQ(counter1.counter(), 3);
 
   // Register a new consumer, quota is cut in half
-  ThreadResourceMgr::ResourcePool* c2 = mgr.RegisterPool();
+  unique_ptr<ThreadResourcePool> c2 = mgr.CreatePool();
   int callback2 = c2->AddThreadAvailableCb(bind<void>(mem_fn(&NotifiedCounter::Notify),
       &counter2, _1));
   EXPECT_FALSE(c1->TryAcquireThreadToken());
@@ -91,9 +85,9 @@ TEST(ThreadResourceMgr, BasicTest) {
   EXPECT_EQ(c1->num_optional_threads(), 2);
 
   c1->RemoveThreadAvailableCb(callback1);
-  mgr.UnregisterPool(c1);
+  mgr.DestroyPool(move(c1));
   c2->RemoveThreadAvailableCb(callback2);
-  mgr.UnregisterPool(c2);
+  mgr.DestroyPool(move(c2));
   EXPECT_EQ(counter1.counter(), 3);
   EXPECT_EQ(counter2.counter(), 1);
 }
@@ -102,7 +96,7 @@ TEST(ThreadResourceMgr, MultiCallbacks) {
   ThreadResourceMgr mgr(6);
   NotifiedCounter counter1, counter2, counter3;
 
-  ThreadResourceMgr::ResourcePool* c1 = mgr.RegisterPool();
+  unique_ptr<ThreadResourcePool> c1 = mgr.CreatePool();
   int callback1 = c1->AddThreadAvailableCb(
       bind<void>(mem_fn(&NotifiedCounter::Notify), &counter1, _1));
   int callback2 = c1->AddThreadAvailableCb(
@@ -155,13 +149,15 @@ TEST(ThreadResourceMgr, MultiCallbacks) {
   EXPECT_EQ(counter1.counter(), 6);
   EXPECT_EQ(counter2.counter(), 3);
 
-  // Also verify UnregisterPool() will invoke the callback.
-  ThreadResourceMgr::ResourcePool* c2 = mgr.RegisterPool();
-  c2->AddThreadAvailableCb(
+  // Also verify DestroyPool() will invoke the callback.
+  unique_ptr<ThreadResourcePool> c2 = mgr.CreatePool();
+  int callback3 = c2->AddThreadAvailableCb(
       bind<void>(mem_fn(&NotifiedCounter::Notify), &counter3, _1));
   EXPECT_EQ(counter3.counter(), 0);
-  mgr.UnregisterPool(c1);
+  mgr.DestroyPool(move(c1));
   EXPECT_EQ(counter3.counter(), 1);
+  c2->RemoveThreadAvailableCb(callback3);
+  mgr.DestroyPool(move(c2));
 }
 
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/789c5aac/be/src/runtime/thread-resource-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/thread-resource-mgr.cc b/be/src/runtime/thread-resource-mgr.cc
index 72fdf42..86ea794 100644
--- a/be/src/runtime/thread-resource-mgr.cc
+++ b/be/src/runtime/thread-resource-mgr.cc
@@ -42,87 +42,65 @@ ThreadResourceMgr::ThreadResourceMgr(int threads_quota) {
   } else {
     system_threads_quota_ = threads_quota;
   }
-  per_pool_quota_ = 0;
 }
 
-ThreadResourceMgr::ResourcePool::ResourcePool(ThreadResourceMgr* parent)
+ThreadResourcePool::ThreadResourcePool(ThreadResourceMgr* parent)
   : parent_(parent) {
 }
 
-void ThreadResourceMgr::ResourcePool::Reset() {
-  num_threads_ = 0;
-  num_reserved_optional_threads_ = 0;
-  thread_callbacks_.clear();
-  num_callbacks_ = 0;
-  next_callback_idx_ = 0;
-  max_quota_ = INT_MAX;
-}
-
-void ThreadResourceMgr::ResourcePool::ReserveOptionalTokens(int num) {
-  DCHECK_GE(num, 0);
-  num_reserved_optional_threads_ = num;
-}
-
-ThreadResourceMgr::ResourcePool* ThreadResourceMgr::RegisterPool() {
+unique_ptr<ThreadResourcePool> ThreadResourceMgr::CreatePool() {
   unique_lock<mutex> l(lock_);
-  ResourcePool* pool = NULL;
-  if (free_pool_objs_.empty()) {
-    pool = new ResourcePool(this);
-  } else {
-    pool = free_pool_objs_.front();
-    free_pool_objs_.pop_front();
-  }
-
-  DCHECK(pool != NULL);
-  DCHECK(pools_.find(pool) == pools_.end());
-  pools_.insert(pool);
-  pool->Reset();
+  unique_ptr<ThreadResourcePool> pool =
+      unique_ptr<ThreadResourcePool>(new ThreadResourcePool(this));
+  pools_.insert(pool.get());
 
   // Added a new pool, update the quotas for each pool.
-  UpdatePoolQuotas(pool);
+  UpdatePoolQuotas(pool.get());
   return pool;
 }
 
-void ThreadResourceMgr::UnregisterPool(ResourcePool* pool) {
-  DCHECK(pool != NULL);
-  DCHECK_EQ(pool->num_callbacks_, 0);
+void ThreadResourceMgr::DestroyPool(unique_ptr<ThreadResourcePool> pool) {
+  DCHECK(pool != nullptr);
+  DCHECK(pool->parent_ != nullptr) << "Already unregistered";
+  DCHECK_EQ(pool->num_callbacks_.Load(), 0);
   unique_lock<mutex> l(lock_);
-  DCHECK(pools_.find(pool) != pools_.end());
-  pools_.erase(pool);
-  free_pool_objs_.push_back(pool);
+  DCHECK(pools_.find(pool.get()) != pools_.end());
+  pools_.erase(pool.get());
+  pool->parent_ = nullptr;
+  pool.reset();
   UpdatePoolQuotas();
 }
 
-int ThreadResourceMgr::ResourcePool::AddThreadAvailableCb(ThreadAvailableCb fn) {
+int ThreadResourcePool::AddThreadAvailableCb(ThreadAvailableCb fn) {
   unique_lock<mutex> l(lock_);
   // The id is unique for each callback and is monotonically increasing.
   int id = thread_callbacks_.size();
   thread_callbacks_.push_back(fn);
-  ++num_callbacks_;
+  num_callbacks_.Add(1);
   return id;
 }
 
-void ThreadResourceMgr::ResourcePool::RemoveThreadAvailableCb(int id) {
+void ThreadResourcePool::RemoveThreadAvailableCb(int id) {
   unique_lock<mutex> l(lock_);
-  DCHECK(thread_callbacks_[id] != NULL);
-  DCHECK_GT(num_callbacks_, 0);
-  thread_callbacks_[id] = NULL;
-  --num_callbacks_;
+  DCHECK(!thread_callbacks_[id].empty());
+  DCHECK_GT(num_callbacks_.Load(), 0);
+  thread_callbacks_[id].clear();
+  num_callbacks_.Add(-1);
 }
 
-void ThreadResourceMgr::ResourcePool::InvokeCallbacks() {
+void ThreadResourcePool::InvokeCallbacks() {
   // We need to grab a lock before issuing the callbacks to prevent the
   // them from being removed while it is happening.
   // Note: this is unlikely to be a big deal for performance currently
   // since this is only called with any frequency on (1) the scanner thread
   // completion path and (2) pool unregistration.
-  if (num_available_threads() > 0 && num_callbacks_ > 0) {
+  if (num_available_threads() > 0 && num_callbacks_.Load() > 0) {
     int num_invoked = 0;
     unique_lock<mutex> l(lock_);
-    while (num_available_threads() > 0 && num_invoked < num_callbacks_) {
+    while (num_available_threads() > 0 && num_invoked < num_callbacks_.Load()) {
       DCHECK_LT(next_callback_idx_, thread_callbacks_.size());
       ThreadAvailableCb fn = thread_callbacks_[next_callback_idx_];
-      if (LIKELY(fn != NULL)) {
+      if (LIKELY(!fn.empty())) {
         ++num_invoked;
         fn(this);
       }
@@ -132,15 +110,45 @@ void ThreadResourceMgr::ResourcePool::InvokeCallbacks() {
   }
 }
 
-void ThreadResourceMgr::UpdatePoolQuotas(ResourcePool* new_pool) {
+void ThreadResourceMgr::UpdatePoolQuotas(ThreadResourcePool* new_pool) {
   if (pools_.empty()) return;
-  per_pool_quota_ =
-      ceil(static_cast<double>(system_threads_quota_) / pools_.size());
+  per_pool_quota_.Store(
+      ceil(static_cast<double>(system_threads_quota_) / pools_.size()));
   // Only invoke callbacks on pool unregistration.
   if (new_pool == NULL) {
-    for (Pools::iterator it = pools_.begin(); it != pools_.end(); ++it) {
-      ResourcePool* pool = *it;
+    for (ThreadResourcePool* pool : pools_) {
       pool->InvokeCallbacks();
     }
   }
 }
+
+bool ThreadResourcePool::TryAcquireThreadToken() {
+  while (true) {
+    int64_t previous_num_threads = num_threads_.Load();
+    int64_t new_optional_threads = (previous_num_threads >> 32) + 1;
+    int64_t new_required_threads = previous_num_threads & 0xFFFFFFFF;
+    if (new_optional_threads + new_required_threads > quota()) return false;
+    int64_t new_value = new_optional_threads << 32 | new_required_threads;
+    // Atomically swap the new value if no one updated num_threads_.  We do not
+    // care about the ABA problem here.
+    if (num_threads_.CompareAndSwap(previous_num_threads, new_value)) return true;
+  }
+}
+
+void ThreadResourcePool::ReleaseThreadToken(
+    bool required, bool skip_callbacks) {
+  if (required) {
+    DCHECK_GT(num_required_threads(), 0);
+    num_threads_.Add(-1);
+  } else {
+    DCHECK_GT(num_optional_threads(), 0);
+    while (true) {
+      int64_t previous_num_threads = num_threads_.Load();
+      int64_t new_optional_threads = (previous_num_threads >> 32) - 1;
+      int64_t new_required_threads = previous_num_threads & 0xFFFFFFFF;
+      int64_t new_value = new_optional_threads << 32 | new_required_threads;
+      if (num_threads_.CompareAndSwap(previous_num_threads, new_value)) break;
+    }
+  }
+  if (!skip_callbacks) InvokeCallbacks();
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/789c5aac/be/src/runtime/thread-resource-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/thread-resource-mgr.h b/be/src/runtime/thread-resource-mgr.h
index bf601bd..e6c73a6 100644
--- a/be/src/runtime/thread-resource-mgr.h
+++ b/be/src/runtime/thread-resource-mgr.h
@@ -25,269 +25,195 @@
 
 #include <list>
 
+#include "common/atomic.h"
 #include "common/status.h"
 
 namespace impala {
 
 /// Singleton object to manage CPU (aka thread) resources for the process.
-/// Conceptually, there is a fixed pool of threads that are shared between
-/// query fragments.  If there is only one fragment running, it can use the
+/// Implements a soft limit on the total number of threads being used across running
+/// fragment instances. If there is only one fragment instance running, it can use the
 /// entire pool, spinning up the maximum number of threads to saturate the
-/// hardware.  If there are multiple fragments, the CPU pool must be shared
-/// between them.  Currently, the total system pool is split evenly between
-/// all consumers.  Each consumer gets ceil(total_system_threads / num_consumers).
+/// hardware. If there are multiple fragment instances, we try to share evenly
+/// between them. Currently, the total system pool is split evenly between
+/// all consumers. Each consumer gets ceil(total_system_threads / num_consumers).
 //
-/// Each fragment must register with the ThreadResourceMgr to request threads
-/// (in the form of tokens).  The fragment has required threads (it can't run
-/// with fewer threads) and optional threads.  If the fragment is running on its
-/// own, it will be able to spin up more optional threads.  When the system
-/// is under load, the ThreadResourceMgr will stop giving out tokens for optional
-/// threads.
-/// Pools should not use this for threads that are almost always idle (e.g.
+/// Each fragment instance must register with the ThreadResourceMgr to request threads
+/// (in the form of tokens). The fragment instance has required threads (it can't run
+/// with fewer threads) and optional threads. If the fragment instance is running on its
+/// own, it will be able to spin up more optional threads. When the system is under load,
+/// the ThreadResourceMgr will stop giving out tokens for optional threads.
+///
+/// ThreadResourcePools should not be used for threads that are almost always idle (e.g.
 /// periodic reporting threads).
-/// Pools will temporarily go over the quota regularly and this is very
-/// much by design.  For example, if a pool is running on its own with
-/// 4 required threads and 28 optional and another pool is added to the
-/// system, the first pool's quota is then cut by half (16 total) and will
-/// over time drop the optional threads.
+/// ThreadResourcePools will temporarily go over the quota regularly and this is very
+/// much by design. For example, if a fragment instance is running on its own with
+/// 4 required threads and 28 optional and another fragment instance starts, the first
+/// pool's quota is then cut by half (16 total) and will over time drop the optional
+/// threads.
+///
 /// This class is thread safe.
-/// TODO: this is an initial simple version to improve the behavior with
-/// concurrency.  This will need to be expanded post GA.  These include:
-///  - More places where threads are optional (e.g. hash table build side,
-///    data stream threads, etc).
-///  - Admission control
-///  - Integration with other nodes/statestore
-///  - Priorities for different pools
-/// If both the mgr and pool locks need to be taken, the mgr lock must
-/// be taken first.
 ///
-/// TODO: make ResourcePool a stand-alone class
+/// Note: this is a fairly limited way to manage CPU consumption and has flaws, including:
+/// * non-deterministic decisions about resource allocation
+/// * lack of integration with admission control
+/// * lack of any non-trivial policies such as hierarchical limits or priorities.
+
+class ThreadResourcePool;
+
 class ThreadResourceMgr {
  public:
-  class ResourcePool;
-
-  /// This function will be called whenever the pool has more threads it can run on.
-  /// This can happen on ReleaseThreadToken or if the quota for this pool increases.
-  /// This is a good place, for example, to wake up anything blocked on available threads.
-  /// This callback must not block.
-  /// Note that this is not called once for each available thread or even guaranteed that
-  /// when it is called, a thread is available (the quota could have changed again in
-  /// between).  It is simply that something might have happened (similar to condition
-  /// variable semantics).
-  typedef boost::function<void (ResourcePool*)> ThreadAvailableCb;
-
-  /// Pool abstraction for a single resource pool.
-  /// TODO: this is not quite sufficient going forward.  We need a hierarchy of pools,
-  /// one for the entire query, and a sub pool for each component that needs threads,
-  /// all of which share a quota.  Currently, the way state is tracked here, it would
-  /// be impossible to have two components both want optional threads (e.g. two things
-  /// that have 1+ thread usage).
-  class ResourcePool {
-   public:
-    /// Acquire a thread for the pool.  This will always succeed; the
-    /// pool will go over the quota.
-    /// Pools should use this API to reserve threads they need in order
-    /// to make progress.
-    void AcquireThreadToken();
-
-    /// Try to acquire a thread for this pool.  If the pool is at
-    /// the quota, this will return false and the pool should not run.
-    /// Pools should use this API for resources they can use but don't
-    /// need (e.g. scanner threads).
-    bool TryAcquireThreadToken(bool* is_reserved = NULL);
-
-    /// Set a reserved optional number of threads for this pool.  This can be
-    /// used to implement that a component needs n+ number of threads.  The
-    /// first 'num' threads are guaranteed to be acquirable (via TryAcquireThreadToken)
-    /// but anything beyond can fail.
-    /// This can also be done with:
-    ///  if (pool->num_optional_threads() < num) AcquireThreadToken();
-    ///  else TryAcquireThreadToken();
-    /// and similar tracking on the Release side but this is common enough to
-    /// abstract it away.
-    void ReserveOptionalTokens(int num);
-
-    /// Release a thread for the pool.  This must be called once for
-    /// each call to AcquireThreadToken and each successful call to TryAcquireThreadToken
-    /// If the thread token is from AcquireThreadToken, required must be true; false
-    /// if from TryAcquireThreadToken.
-    /// If 'skip_callbacks' is true, ReleaseThreadToken() will not run callbacks to find
-    /// a replacement for this thread. This is dangerous and can lead to underutilization
-    /// of the system.
-    void ReleaseThreadToken(bool required, bool skip_callbacks = false);
-
-    /// Register a callback to be notified when a thread is available.
-    /// Returns a unique id to be used when removing the callback.
-    /// TODO: rethink this.  How we do coordinate when we have multiple places in
-    /// the execution that all need threads (e.g. do we use that thread for
-    /// the scanner or for the join).
-    int AddThreadAvailableCb(ThreadAvailableCb fn);
-
-    /// Unregister the callback corresponding to 'id'.
-    void RemoveThreadAvailableCb(int id);
-
-    /// Returns the number of threads that are from AcquireThreadToken.
-    int num_required_threads() const { return num_threads_ & 0xFFFFFFFF; }
-
-    /// Returns the number of thread resources returned by successful calls
-    /// to TryAcquireThreadToken.
-    int num_optional_threads() const { return num_threads_ >> 32; }
-
-    /// Returns the total number of thread resources for this pool
-    /// (i.e. num_optional_threads + num_required_threads).
-    int64_t num_threads() const {
-      return num_required_threads() + num_optional_threads();
-    }
-
-    int num_reserved_optional_threads() { return num_reserved_optional_threads_; }
-
-    /// Returns true if the number of optional threads has now exceeded the quota.
-    bool optional_exceeded() {
-      // Cache this so optional/required are computed based on the same value.
-      volatile int64_t num_threads = num_threads_;
-      int64_t optional_threads = num_threads >> 32;
-      int64_t required_threads = num_threads & 0xFFFFFFFF;
-      return optional_threads > num_reserved_optional_threads_ &&
-             optional_threads + required_threads > quota();
-    }
-
-    /// Returns the number of optional threads that can still be used.
-    int num_available_threads() const {
-      int value = std::max(quota() - static_cast<int>(num_threads()),
-          num_reserved_optional_threads_ - num_optional_threads());
-      return std::max(0, value);
-    }
-
-    /// Returns the quota for this pool.  Note this changes dynamically
-    /// based on system load.
-    int quota() const { return std::min(max_quota_, parent_->per_pool_quota_); }
-
-    /// Sets the max thread quota for this pool.
-    /// The actual quota is the min of this value and the dynamic value.
-    void set_max_quota(int quota) { max_quota_ = quota; }
-
-   private:
-    friend class ThreadResourceMgr;
-
-    ResourcePool(ThreadResourceMgr* parent);
-
-    /// Resets internal state.
-    void Reset();
-
-    /// Invoke registered callbacks in round-robin manner until the quota is exhausted.
-    void InvokeCallbacks();
-
-    ThreadResourceMgr* parent_;
-
-    int max_quota_;
-    int num_reserved_optional_threads_;
-
-    /// A single 64 bit value to store both the number of optional and
-    /// required threads.  This is combined to allow using compare and
-    /// swap operations.  The number of required threads is the lower
-    /// 32 bits and the number of optional threads is the upper 32 bits.
-    int64_t num_threads_;
-
-    /// Lock for the fields below.  This lock is taken when the callback
-    /// function is called.
-    /// TODO: reconsider this.
-    boost::mutex lock_;
-
-    /// A vector of registered callback functions. Entries will be NULL
-    /// for unregistered functions.
-    std::vector<ThreadAvailableCb> thread_callbacks_;
-
-    /// The number of registered callbacks (i.e. the number of non-NULL entries in
-    /// thread_callbacks_).
-    int num_callbacks_;
-
-    /// The index into thread_callbacks_ of the next callback to invoke.
-    int next_callback_idx_;
-  };
-
-  /// Create a thread mgr object.  If threads_quota is non-zero, it will be
+  /// Create a thread mgr object. If threads_quota is non-zero, it will be
   /// the number of threads for the system, otherwise it will be determined
   /// based on the hardware.
   ThreadResourceMgr(int threads_quota = 0);
 
   int system_threads_quota() const { return system_threads_quota_; }
 
-  /// Register a new pool with the thread mgr.  Registering a pool
+  /// Create a new pool and register with the thread mgr. Registering a pool
   /// will update the quotas for all existing pools.
-  ResourcePool* RegisterPool();
+  std::unique_ptr<ThreadResourcePool> CreatePool();
 
-  /// Unregisters the pool.  'pool' is no longer valid after this.
-  /// This updates the quotas for the remaining pools.
-  void UnregisterPool(ResourcePool* pool);
+  /// Destroy the pool and unregister with the thread mgr. This updates the quotas for
+  /// the remaining pools.
+  void DestroyPool(std::unique_ptr<ThreadResourcePool> pool);
 
  private:
+  friend class ThreadResourcePool;
+
   /// 'Optimal' number of threads for the entire process.
   int system_threads_quota_;
 
-  /// Lock for the entire object.  Protects all fields below.
+  /// Lock for the entire object. Protects all fields below. Must be acquired before
+  /// ThreadResourcePool::lock_ if both are held at the same time.
   boost::mutex lock_;
 
   /// Pools currently being managed
-  typedef std::set<ResourcePool*> Pools;
+  typedef std::set<ThreadResourcePool*> Pools;
   Pools pools_;
 
-  /// Each pool currently gets the same share.  This is the ceil of the
+  /// Each pool currently gets the same share. This is the ceil of the
   /// system quota divided by the number of pools.
-  int per_pool_quota_;
-
-  /// Recycled list of pool objects
-  std::list<ResourcePool*> free_pool_objs_;
+  AtomicInt32 per_pool_quota_{0};
 
   /// Updates the per pool quota and notifies any pools that now have
-  /// more threads they can use.  Must be called with lock_ taken.
+  /// more threads they can use. Must be called with lock_ taken.
   /// If new_pool is non-null, new_pool will *not* be notified.
-  void UpdatePoolQuotas(ResourcePool* new_pool = NULL);
+  void UpdatePoolQuotas(ThreadResourcePool* new_pool = nullptr);
 };
 
-inline void ThreadResourceMgr::ResourcePool::AcquireThreadToken() {
-  __sync_fetch_and_add(&num_threads_, 1);
-}
-
-inline bool ThreadResourceMgr::ResourcePool::TryAcquireThreadToken(bool* is_reserved) {
-  while (true) {
-    int64_t previous_num_threads = num_threads_;
-    int64_t new_optional_threads = (previous_num_threads >> 32) + 1;
-    int64_t new_required_threads = previous_num_threads & 0xFFFFFFFF;
-    if (new_optional_threads > num_reserved_optional_threads_ &&
-        new_optional_threads + new_required_threads > quota()) {
-      return false;
-    }
-    bool thread_is_reserved = new_optional_threads <= num_reserved_optional_threads_;
-    int64_t new_value = new_optional_threads << 32 | new_required_threads;
-    // Atomically swap the new value if no one updated num_threads_.  We do not
-    // not care about the ABA problem here.
-    if (__sync_bool_compare_and_swap(&num_threads_, previous_num_threads, new_value)) {
-      if (is_reserved != NULL) *is_reserved = thread_is_reserved;
-      return true;
-    }
+/// Pool abstraction for a single resource pool.
+/// Note: there is no concept of hierarchy - all pools are treated equally even if
+/// they belong to the same query.
+class ThreadResourcePool {
+ public:
+  /// This function will be called whenever the pool has more threads it can run on.
+  /// This can happen on ReleaseThreadToken or if the quota for this pool increases.
+  /// This is a good place, for example, to wake up anything blocked on available threads.
+  /// This callback must not block.
+  /// Note that this is not called once for each available thread or even guaranteed that
+  /// when it is called, a thread is available (the quota could have changed again in
+  /// between). It is simply that something might have happened (similar to condition
+  /// variable semantics).
+  typedef boost::function<void (ThreadResourcePool*)> ThreadAvailableCb;
+
+  ~ThreadResourcePool() { DCHECK(parent_ == nullptr) << "Must unregister pool"; }
+
+  /// Acquire a thread for the pool. This will always succeed; the pool will go over the
+  /// quota if needed. Pools should use this API to reserve threads they need in order to
+  /// make progress.
+  void AcquireThreadToken() { num_threads_.Add(1); }
+
+  /// Try to acquire a thread for this pool. If the pool is at the quota, this will
+  /// return false and the pool should not run. Pools should use this API for resources
+  /// they can use but don't need (e.g. extra scanner threads).
+  bool TryAcquireThreadToken();
+
+  /// Release a thread for the pool. This must be called once for each call to
+  /// AcquireThreadToken() and each successful call to TryAcquireThreadToken().
+  /// If the thread token is from AcquireThreadToken(), required must be true; false
+  /// if from TryAcquireThreadToken().
+  /// If 'skip_callbacks' is true, ReleaseThreadToken() will not run callbacks to find
+  /// a replacement for this thread. This is dangerous and can lead to underutilization
+  /// of the system.
+  void ReleaseThreadToken(bool required, bool skip_callbacks = false);
+
+  /// Register a callback to be notified when a thread is available.
+  /// Returns a unique id to be used when removing the callback.
+  /// Note: this is limited because we can't coordinate between multiple places in
+  /// execution that could use extra threads (e.g. do we use that thread for a
+  /// scanner or for a join).
+  int AddThreadAvailableCb(ThreadAvailableCb fn);
+
+  /// Unregister the callback corresponding to 'id'.
+  void RemoveThreadAvailableCb(int id);
+
+  /// Returns the number of threads that are from AcquireThreadToken.
+  int num_required_threads() const { return num_threads_.Load() & 0xFFFFFFFF; }
+
+  /// Returns the number of thread resources returned by successful calls
+  /// to TryAcquireThreadToken.
+  int num_optional_threads() const { return num_threads_.Load() >> 32; }
+
+  /// Returns the total number of thread resources for this pool
+  /// (i.e. num_optional_threads + num_required_threads).
+  int64_t num_threads() const {
+    return num_required_threads() + num_optional_threads();
+  }
+
+  /// Returns true if the number of optional threads has now exceeded the quota.
+  bool optional_exceeded() {
+    // Cache this so optional/required are computed based on the same value.
+    int64_t num_threads = num_threads_.Load();
+    int64_t optional_threads = num_threads >> 32;
+    int64_t required_threads = num_threads & 0xFFFFFFFF;
+    return optional_threads + required_threads > quota();
   }
-}
-
-inline void ThreadResourceMgr::ResourcePool::ReleaseThreadToken(
-    bool required, bool skip_callbacks) {
-  if (required) {
-    DCHECK_GT(num_required_threads(), 0);
-    __sync_fetch_and_add(&num_threads_, -1);
-  } else {
-    DCHECK_GT(num_optional_threads(), 0);
-    while (true) {
-      int64_t previous_num_threads = num_threads_;
-      int64_t new_optional_threads = (previous_num_threads >> 32) - 1;
-      int64_t new_required_threads = previous_num_threads & 0xFFFFFFFF;
-      int64_t new_value = new_optional_threads << 32 | new_required_threads;
-      if (__sync_bool_compare_and_swap(&num_threads_, previous_num_threads, new_value)) {
-        break;
-      }
-    }
+
+  /// Returns the number of optional threads that can still be used.
+  int num_available_threads() const {
+    return std::max(0, quota() - static_cast<int>(num_threads()));
   }
-  if (!skip_callbacks) InvokeCallbacks();
-}
 
+  /// Returns the quota for this pool. Note this changes dynamically based on the global
+  /// number of registered resource pools.
+  int quota() const { return parent_->per_pool_quota_.Load(); }
+
+ private:
+  friend class ThreadResourceMgr;
+
+  ThreadResourcePool(ThreadResourceMgr* parent);
+
+  /// Invoke registered callbacks in round-robin manner until the quota is exhausted.
+  void InvokeCallbacks();
+
+  /// The parent resource manager. Set to nullptr when the pool is unregistered.
+  ThreadResourceMgr* parent_;
+
+  /// A single 64 bit value to store both the number of optional and required threads.
+  /// This is combined to allow atomic compare-and-swap of both fields. The number of
+  /// required threads is the lower 32 bits and the number of optional threads is the
+  /// upper 32 bits.
+  AtomicInt64 num_threads_{0};
+
+  /// Lock for the fields below. This lock is taken when the callback function is called.
+  /// Must be acquired after ThreadResourceMgr::lock_ if both are held at the same time.
+  boost::mutex lock_;
+
+  /// A vector of registered callback functions. Entries will be set to "empty" function
+  /// objects, which can be constructed with the default ThreadAvailableCb() constructor,
+  /// when the function is unregistered.
+  std::vector<ThreadAvailableCb> thread_callbacks_;
+
+  /// The number of registered callbacks (i.e. the number of non-empty entries in
+  /// 'thread_callbacks_'). Must hold 'lock_' to write, but can read without holding
+  /// 'lock_'.
+  AtomicInt32 num_callbacks_{0};
+
+  /// The index of the next callback to invoke in 'thread_callbacks_'. Protected by
+  /// 'lock_'.
+  int next_callback_idx_ = 0;
+};
 } // namespace impala
 
 #endif