You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2020/08/28 06:17:59 UTC

[kudu] 02/02: KUDU-1587 part 1: load meter for ThreadPool

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit fc8615c37eb4e28f3cc6bea0fcd5a8732451e883
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Mon Aug 10 18:16:53 2020 -0700

    KUDU-1587 part 1: load meter for ThreadPool
    
    This patch introduces a load meter for ThreadPool, aiming to use
    active queue management techniques (AQM) such as CoDel [1] in scenarios
    where thread pool queue load metrics are applicable (e.g., KUDU-1587).
    
    [1] https://en.wikipedia.org/wiki/CoDel
    
    Change-Id: I640716dc32f193e68361ca623ee7b9271e661d8b
    Reviewed-on: http://gerrit.cloudera.org:8080/16332
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/util/threadpool-test.cc | 274 +++++++++++++++++++++++++++++++++++++--
 src/kudu/util/threadpool.cc      | 203 ++++++++++++++++++++++++++---
 src/kudu/util/threadpool.h       | 154 ++++++++++++++++++++++
 3 files changed, 603 insertions(+), 28 deletions(-)

diff --git a/src/kudu/util/threadpool-test.cc b/src/kudu/util/threadpool-test.cc
index 93806f8..cecd01b 100644
--- a/src/kudu/util/threadpool-test.cc
+++ b/src/kudu/util/threadpool-test.cc
@@ -162,10 +162,12 @@ TEST_F(ThreadPoolTest, TestSubmitAfterShutdown) {
 }
 
 TEST_F(ThreadPoolTest, TestThreadPoolWithNoMinimum) {
-  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
-                                   .set_min_threads(0)
-                                   .set_max_threads(3)
-                                   .set_idle_timeout(MonoDelta::FromMilliseconds(1))));
+  constexpr int kIdleTimeoutMs = 1;
+  ASSERT_OK(RebuildPoolWithBuilder(
+      ThreadPoolBuilder(kDefaultPoolName)
+      .set_min_threads(0)
+      .set_max_threads(3)
+      .set_idle_timeout(MonoDelta::FromMilliseconds(kIdleTimeoutMs))));
 
   // There are no threads to start with.
   ASSERT_TRUE(pool_->num_threads() == 0);
@@ -186,6 +188,11 @@ TEST_F(ThreadPoolTest, TestThreadPoolWithNoMinimum) {
   latch.CountDown();
   pool_->Wait();
   ASSERT_EQ(0, pool_->active_threads_);
+  // Wait for more than idle timeout: so threads should be gone since
+  // min_threads is set to 0.
+  SleepFor(MonoDelta::FromMilliseconds(10 * kIdleTimeoutMs));
+  ASSERT_EQ(0, pool_->num_threads());
+  ASSERT_EQ(0, pool_->active_threads_);
   pool_->Shutdown();
   ASSERT_EQ(0, pool_->num_threads());
 }
@@ -252,10 +259,12 @@ TEST_F(ThreadPoolTest, TestRace) {
 }
 
 TEST_F(ThreadPoolTest, TestVariableSizeThreadPool) {
-  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
-                                   .set_min_threads(1)
-                                   .set_max_threads(4)
-                                   .set_idle_timeout(MonoDelta::FromMilliseconds(1))));
+  constexpr int kIdleTimeoutMs = 1;
+  ASSERT_OK(RebuildPoolWithBuilder(
+      ThreadPoolBuilder(kDefaultPoolName)
+      .set_min_threads(1)
+      .set_max_threads(4)
+      .set_idle_timeout(MonoDelta::FromMilliseconds(kIdleTimeoutMs))));
 
   // There is 1 thread to start with.
   ASSERT_EQ(1, pool_->num_threads());
@@ -276,6 +285,9 @@ TEST_F(ThreadPoolTest, TestVariableSizeThreadPool) {
   latch.CountDown();
   pool_->Wait();
   ASSERT_EQ(0, pool_->active_threads_);
+  SleepFor(MonoDelta::FromMilliseconds(10 * kIdleTimeoutMs));
+  ASSERT_EQ(0, pool_->active_threads_);
+  ASSERT_EQ(1, pool_->num_threads());
   pool_->Shutdown();
   ASSERT_EQ(0, pool_->num_threads());
 }
@@ -448,6 +460,252 @@ TEST_F(ThreadPoolTest, TestMetrics) {
   ASSERT_EQ(6, all_metrics[0].run_time_us_histogram->TotalCount());
 }
 
+// Test scenario to verify the functionality of the QueueLoadMeter.
+TEST_F(ThreadPoolTest, QueueLoadMeter) {
+  const auto kQueueTimeThresholdMs = 100;
+  const auto kIdleThreadTimeoutMs = 200;
+  constexpr auto kMaxThreads = 3;
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+      .set_min_threads(0)
+      .set_max_threads(kMaxThreads)
+      .set_queue_overload_threshold(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs))
+      .set_idle_timeout(MonoDelta::FromMilliseconds(kIdleThreadTimeoutMs))));
+  // An idle pool must not have its queue overloaded.
+  ASSERT_FALSE(pool_->QueueOverloaded());
+
+  // One instant task cannot make the pool's queue overloaded.
+  ASSERT_OK(pool_->Submit([](){}));
+  ASSERT_FALSE(pool_->QueueOverloaded());
+  pool_->Wait();
+  ASSERT_FALSE(pool_->QueueOverloaded());
+
+  for (auto i = 0; i < kMaxThreads; ++i) {
+    ASSERT_OK(pool_->Submit([](){
+      SleepFor(MonoDelta::FromMilliseconds(2 * kQueueTimeThresholdMs));
+    }));
+  }
+  ASSERT_FALSE(pool_->QueueOverloaded());
+  pool_->Wait();
+  ASSERT_FALSE(pool_->QueueOverloaded());
+
+  for (auto i = 0; i < 2 * kMaxThreads; ++i) {
+    ASSERT_OK(pool_->Submit([](){
+      SleepFor(MonoDelta::FromMilliseconds(2 * kQueueTimeThresholdMs));
+    }));
+  }
+  ASSERT_FALSE(pool_->QueueOverloaded());
+  SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs + 10));
+  ASSERT_TRUE(pool_->QueueOverloaded());
+  // Should still be overloaded after first kMaxThreads tasks are processed.
+  SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs + 10));
+  ASSERT_TRUE(pool_->QueueOverloaded());
+  pool_->Wait();
+  ASSERT_FALSE(pool_->QueueOverloaded());
+
+  // Many instant tasks cannot make pool overloaded.
+  for (auto i = 0; i < kMaxThreads; ++i) {
+    ASSERT_OK(pool_->Submit([](){}));
+  }
+  ASSERT_FALSE(pool_->QueueOverloaded());
+  pool_->Wait();
+  // Wait for the threads to be shut down due to inactivity.
+  SleepFor(MonoDelta::FromMilliseconds(2 * kIdleThreadTimeoutMs));
+  // Even if all threads are shutdown, an idle pool with empty queue should not
+  // be overloaded.
+  ASSERT_FALSE(pool_->QueueOverloaded());
+
+  // Shovel some light tasks once again: this should not overload the queue.
+  for (auto i = 0; i < 10 * kMaxThreads; ++i) {
+    ASSERT_OK(pool_->Submit([](){
+      SleepFor(MonoDelta::FromMilliseconds(1));
+    }));
+  }
+  ASSERT_FALSE(pool_->QueueOverloaded());
+  pool_->Wait();
+  ASSERT_FALSE(pool_->QueueOverloaded());
+
+  // Submit a bunch of instant tasks via a single token: the queue should not
+  // become overloaded.
+  {
+    unique_ptr<ThreadPoolToken> t = pool_->NewToken(
+        ThreadPool::ExecutionMode::SERIAL);
+    ASSERT_OK(t->Submit([](){}));
+    ASSERT_FALSE(pool_->QueueOverloaded());
+    pool_->Wait();
+    ASSERT_FALSE(pool_->QueueOverloaded());
+
+    for (auto i = 0; i < 100; ++i) {
+      ASSERT_OK(t->Submit([](){}));
+    }
+    ASSERT_FALSE(pool_->QueueOverloaded());
+    SleepFor(MonoDelta::FromMilliseconds(1));
+    ASSERT_FALSE(pool_->QueueOverloaded());
+    pool_->Wait();
+    ASSERT_FALSE(pool_->QueueOverloaded());
+  }
+
+  // Submit many instant tasks via multiple tokens (more than the maximum
+  // number of worker threads in a pool) and many lightweight tasks which can
+  // run concurrently: the queue should not become overloaded.
+  {
+    constexpr auto kNumTokens = 2 * kMaxThreads;
+    vector<unique_ptr<ThreadPoolToken>> tokens;
+    tokens.reserve(kNumTokens);
+    for (auto i = 0; i < kNumTokens; ++i) {
+      tokens.emplace_back(pool_->NewToken(ThreadPool::ExecutionMode::SERIAL));
+    }
+
+    for (auto& t : tokens) {
+      for (auto i = 0; i < 50; ++i) {
+        ASSERT_OK(t->Submit([](){}));
+      }
+      for (auto i = 0; i < 10; ++i) {
+        ASSERT_OK(pool_->Submit([](){}));
+      }
+    }
+    ASSERT_FALSE(pool_->QueueOverloaded());
+    SleepFor(MonoDelta::FromMilliseconds(1));
+    ASSERT_FALSE(pool_->QueueOverloaded());
+    pool_->Wait();
+    ASSERT_FALSE(pool_->QueueOverloaded());
+  }
+
+  // Submit many long running tasks via serial tokens where the number of tokens
+  // is less than the maximum number of worker threads in the pool. The queue
+  // of the pool should not become overloaded since the pool has one spare
+  // thread to spawn.
+  {
+    constexpr auto kNumTokens = kMaxThreads - 1;
+    ASSERT_GT(kNumTokens, 0);
+    vector<unique_ptr<ThreadPoolToken>> tokens;
+    tokens.reserve(kNumTokens);
+    for (auto i = 0; i < kNumTokens; ++i) {
+      tokens.emplace_back(pool_->NewToken(ThreadPool::ExecutionMode::SERIAL));
+    }
+
+    ASSERT_FALSE(pool_->QueueOverloaded());
+    for (auto& t : tokens) {
+      for (auto i = 0; i < kMaxThreads; ++i) {
+        ASSERT_OK(t->Submit([](){
+          SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs));
+        }));
+      }
+    }
+    ASSERT_FALSE(pool_->QueueOverloaded());
+    SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs));
+    ASSERT_FALSE(pool_->QueueOverloaded());
+    pool_->Wait();
+    ASSERT_FALSE(pool_->QueueOverloaded());
+  }
+
+  // Submit many long running tasks via serial tokens where the number of tokens
+  // is greater or equal to the maximum number of worker threads in the pool.
+  // The queue of the pool should become overloaded since the pool is running
+  // at its capacity and queue times are over the threshold.
+  {
+    constexpr auto kNumTokens = kMaxThreads;
+    vector<unique_ptr<ThreadPoolToken>> tokens;
+    tokens.reserve(kNumTokens);
+    for (auto i = 0; i < kNumTokens; ++i) {
+      tokens.emplace_back(pool_->NewToken(ThreadPool::ExecutionMode::SERIAL));
+    }
+
+    ASSERT_FALSE(pool_->QueueOverloaded());
+    for (auto& t : tokens) {
+      for (auto i = 0; i < kMaxThreads; ++i) {
+        ASSERT_OK(t->Submit([](){
+          SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs));
+        }));
+      }
+    }
+    // Since there is exactly kMaxThreads serial pool tokens with tasks,
+    // the queue is empty most of the time. This is because active serial tokens
+    // are not kept in the queue. So, the status of the queue cannot be reliably
+    // determined by peeking at the submission times of the elements in the
+    // queue. Then the only way to detect overload of the queue is the history
+    // of queue times. The latter will reflect long queue times only after
+    // processing two tasks in each of the serial tokens. So, it's expected
+    // to get a stable report on the queue status only after two
+    // kQueueTimeThresholdMs time intervals.
+    SleepFor(MonoDelta::FromMilliseconds(2 * kQueueTimeThresholdMs));
+    ASSERT_TRUE(pool_->QueueOverloaded());
+    pool_->Wait();
+    ASSERT_FALSE(pool_->QueueOverloaded());
+  }
+
+  // A mixed case: submit many long running tasks via serial tokens where the
+  // number of tokens is less than the maximum number of worker threads in the
+  // pool and submit many instant tasks that can run concurrently.
+  {
+    constexpr auto kNumTokens = kMaxThreads - 1;
+    ASSERT_GT(kNumTokens, 0);
+    vector<unique_ptr<ThreadPoolToken>> tokens;
+    tokens.reserve(kNumTokens);
+    for (auto i = 0; i < kNumTokens; ++i) {
+      tokens.emplace_back(pool_->NewToken(ThreadPool::ExecutionMode::SERIAL));
+    }
+
+    ASSERT_FALSE(pool_->QueueOverloaded());
+    for (auto& t : tokens) {
+      for (auto i = 0; i < kMaxThreads; ++i) {
+        ASSERT_OK(t->Submit([](){
+          SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs));
+        }));
+      }
+    }
+    ASSERT_FALSE(pool_->QueueOverloaded());
+
+    // Add several light tasks in addition to the scheduled serial ones. This
+    // should not overload the queue.
+    for (auto i = 0; i < 10; ++i) {
+      ASSERT_OK(pool_->Submit([](){
+        SleepFor(MonoDelta::FromMilliseconds(1));
+      }));
+    }
+    ASSERT_FALSE(pool_->QueueOverloaded());
+    SleepFor(MonoDelta::FromMilliseconds(1));
+    ASSERT_FALSE(pool_->QueueOverloaded());
+    SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs));
+    ASSERT_FALSE(pool_->QueueOverloaded());
+    pool_->Wait();
+    ASSERT_FALSE(pool_->QueueOverloaded());
+  }
+
+  // Another mixed case: submit many long running tasks via a serial token
+  // and many long running tasks that can run concurrently. The queue should
+  // become overloaded.
+  {
+    constexpr auto kNumTokens = 1;
+    vector<unique_ptr<ThreadPoolToken>> tokens;
+    tokens.reserve(kNumTokens);
+    for (auto i = 0; i < kNumTokens; ++i) {
+      tokens.emplace_back(pool_->NewToken(ThreadPool::ExecutionMode::SERIAL));
+    }
+
+    ASSERT_FALSE(pool_->QueueOverloaded());
+    for (auto& t : tokens) {
+      for (auto i = 0; i < kMaxThreads; ++i) {
+        ASSERT_OK(t->Submit([](){
+          SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs));
+        }));
+      }
+    }
+    ASSERT_FALSE(pool_->QueueOverloaded());
+
+    // Add several long-running tasks in addition to the scheduled serial
+    // ones. This should overload the queue.
+    for (auto i = 0; i < 2 * kMaxThreads; ++i) {
+      ASSERT_OK(pool_->Submit([](){
+        SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs));
+      }));
+    }
+    SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs));
+    ASSERT_TRUE(pool_->QueueOverloaded());
+    pool_->Wait();
+    ASSERT_FALSE(pool_->QueueOverloaded());
+  }
+}
+
 // Test that a thread pool will crash if asked to run its own blocking
 // functions in a pool thread.
 //
diff --git a/src/kudu/util/threadpool.cc b/src/kudu/util/threadpool.cc
index 9d8cc4a..6e2097d 100644
--- a/src/kudu/util/threadpool.cc
+++ b/src/kudu/util/threadpool.cc
@@ -17,6 +17,7 @@
 
 #include "kudu/util/threadpool.h"
 
+#include <algorithm>
 #include <cstdint>
 #include <deque>
 #include <functional>
@@ -24,7 +25,6 @@
 #include <memory>
 #include <ostream>
 #include <string>
-#include <utility>
 
 #include <glog/logging.h>
 
@@ -90,6 +90,12 @@ ThreadPoolBuilder& ThreadPoolBuilder::set_metrics(ThreadPoolMetrics metrics) {
   return *this;
 }
 
+ThreadPoolBuilder& ThreadPoolBuilder::set_queue_overload_threshold(
+    const MonoDelta& threshold) {
+  queue_overload_threshold_ = threshold;
+  return *this;
+}
+
 Status ThreadPoolBuilder::Build(unique_ptr<ThreadPool>* pool) const {
   pool->reset(new ThreadPool(*this));
   return (*pool)->Init();
@@ -266,20 +272,20 @@ const char* ThreadPoolToken::StateToString(State s) {
 ////////////////////////////////////////////////////////
 
 ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
-  : name_(builder.name_),
-    min_threads_(builder.min_threads_),
-    max_threads_(builder.max_threads_),
-    max_queue_size_(builder.max_queue_size_),
-    idle_timeout_(builder.idle_timeout_),
-    pool_status_(Status::Uninitialized("The pool was not initialized.")),
-    idle_cond_(&lock_),
-    no_threads_cond_(&lock_),
-    num_threads_(0),
-    num_threads_pending_start_(0),
-    active_threads_(0),
-    total_queued_tasks_(0),
-    tokenless_(NewToken(ExecutionMode::CONCURRENT)),
-    metrics_(builder.metrics_) {
+    : name_(builder.name_),
+      min_threads_(builder.min_threads_),
+      max_threads_(builder.max_threads_),
+      max_queue_size_(builder.max_queue_size_),
+      idle_timeout_(builder.idle_timeout_),
+      pool_status_(Status::Uninitialized("The pool was not initialized.")),
+      idle_cond_(&lock_),
+      no_threads_cond_(&lock_),
+      num_threads_(0),
+      num_threads_pending_start_(0),
+      active_threads_(0),
+      total_queued_tasks_(0),
+      tokenless_(NewToken(ExecutionMode::CONCURRENT)),
+      metrics_(builder.metrics_) {
   string prefix = !builder.trace_metric_prefix_.empty() ?
       builder.trace_metric_prefix_ : builder.name_;
 
@@ -289,6 +295,11 @@ ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
       prefix + ".run_wall_time_us");
   run_cpu_time_trace_metric_name_ = TraceMetrics::InternName(
       prefix + ".run_cpu_time_us");
+
+  const auto& ovt = builder.queue_overload_threshold_;
+  if (ovt.Initialized() && ovt.ToNanoseconds() > 0) {
+    load_meter_.reset(new QueueLoadMeter(*this, ovt, max_threads_));
+  }
 }
 
 ThreadPool::~ThreadPool() {
@@ -397,6 +408,17 @@ unique_ptr<ThreadPoolToken> ThreadPool::NewTokenWithMetrics(
   return t;
 }
 
+bool ThreadPool::QueueOverloaded(MonoDelta* overloaded_time,
+                                 MonoDelta* threshold) const {
+  if (!load_meter_) {
+    return false;
+  }
+  if (threshold) {
+    *threshold = load_meter_->queue_time_threshold();
+  }
+  return load_meter_->overloaded(overloaded_time);
+}
+
 void ThreadPool::ReleaseToken(ThreadPoolToken* t) {
   MutexLock guard(lock_);
   CHECK(!t->IsActive()) << Substitute("Token with state $0 may not be released",
@@ -410,7 +432,7 @@ Status ThreadPool::Submit(std::function<void()> f) {
 
 Status ThreadPool::DoSubmit(std::function<void()> f, ThreadPoolToken* token) {
   DCHECK(token);
-  MonoTime submit_time = MonoTime::Now();
+  const MonoTime submit_time = MonoTime::Now();
 
   MutexLock guard(lock_);
   if (PREDICT_FALSE(!pool_status_.ok())) {
@@ -493,6 +515,7 @@ Status ThreadPool::DoSubmit(std::function<void()> f, ThreadPoolToken* token) {
     idle_threads_.front().not_empty.Signal();
     idle_threads_.pop_front();
   }
+  NotifyLoadMeterUnlocked();
   guard.Unlock();
 
   if (metrics_.queue_length_histogram) {
@@ -518,7 +541,6 @@ Status ThreadPool::DoSubmit(std::function<void()> f, ThreadPoolToken* token) {
     }
   }
 
-
   return Status::OK();
 }
 
@@ -570,6 +592,7 @@ void ThreadPool::DispatchThread() {
       //
       // Note: if FIFO behavior is desired, it's as simple as changing this to push_back().
       idle_threads_.push_front(me);
+      NotifyLoadMeterUnlocked();
       SCOPED_CLEANUP({
         // For some wake ups (i.e. Shutdown or DoSubmit) this thread is
         // guaranteed to be unlinked after being awakened. In others (i.e.
@@ -609,6 +632,10 @@ void ThreadPool::DispatchThread() {
     --total_queued_tasks_;
     ++active_threads_;
 
+    const MonoTime now(MonoTime::Now());
+    const MonoDelta queue_time = now - task.submit_time;
+    NotifyLoadMeterUnlocked(queue_time);
+
     unique_lock.Unlock();
 
     // Release the reference which was held by the queued item.
@@ -617,9 +644,8 @@ void ThreadPool::DispatchThread() {
       task.trace->Release();
     }
 
-    // Update metrics
-    MonoTime now(MonoTime::Now());
-    int64_t queue_time_us = (now - task.submit_time).ToMicroseconds();
+    // Update metrics.
+    const int64_t queue_time_us = queue_time.ToMicroseconds();
     TRACE_COUNTER_INCREMENT(queue_time_trace_metric_name_, queue_time_us);
     if (metrics_.queue_time_us_histogram) {
       metrics_.queue_time_us_histogram->Increment(queue_time_us);
@@ -673,6 +699,15 @@ void ThreadPool::DispatchThread() {
         queue_.emplace_back(token);
       }
     }
+    if (!queue_.empty()) {
+      // If the queue is empty, the LoadMeter is notified on next iteration of
+      // the outer while() loop under the 'if (queue_.empty())' clause. Here
+      // it's crucial to call NotifyLoadMeter() _before_ decrementing
+      // 'active_threads_' to avoid reporting this thread as a spare one despite
+      // the fact that it will run a task from the non-empty queue immediately
+      // on next iteration of the outer while() loop.
+      NotifyLoadMeterUnlocked();
+    }
     if (--active_threads_ == 0) {
       idle_cond_.Broadcast();
     }
@@ -709,6 +744,134 @@ void ThreadPool::CheckNotPoolThreadUnlocked() {
   }
 }
 
+void ThreadPool::NotifyLoadMeterUnlocked(const MonoDelta& queue_time) {
+  if (!load_meter_) {
+    return;
+  }
+
+  lock_.AssertAcquired();
+  MonoTime queue_head_submit_time;
+  if (!queue_.empty()) {
+    DCHECK(!queue_.front()->entries_.empty());
+    queue_head_submit_time = queue_.front()->entries_.front().submit_time;
+  }
+  load_meter_->UpdateQueueInfoUnlocked(queue_time,
+                                       queue_head_submit_time,
+                                       active_threads_ < max_threads_);
+}
+
+////////////////////////////////////////////////////////
+// ThreadPool::QueueLoadMeter
+////////////////////////////////////////////////////////
+
+ThreadPool::QueueLoadMeter::QueueLoadMeter(
+    const ThreadPool& tpool,
+    const MonoDelta& queue_time_threshold,
+    size_t queue_time_history_length)
+    : tpool_(tpool),
+      queue_time_threshold_(queue_time_threshold),
+      queue_time_history_length_(queue_time_history_length),
+      queue_head_submit_time_(MonoTime()),
+      overloaded_since_(MonoTime()),
+      has_spare_thread_(true) {
+}
+
+bool ThreadPool::QueueLoadMeter::overloaded(MonoDelta* time_overloaded) {
+  // First, check whether the queue was overloaded upon the latest update
+  // on the queue from the ThreadPool's activity. If that's the case, there is
+  // no need to dig further.
+  MonoTime overloaded_since = overloaded_since_.load();
+  if (overloaded_since.Initialized()) {
+    if (time_overloaded) {
+      *time_overloaded = MonoTime::Now() - overloaded_since;
+    }
+    return true;
+  }
+
+  // Even if the queue was not overloaded upon the latest update on the
+  // ThreadPool's activity, the queue might have stalled because all its worker
+  // threads have been busy for long time. If so, 'overloaded_since_' hasn't
+  // been updated by the activity on the thread pool itself. However, it's
+  // possible to detect whether the queue has stalled by checking if the task
+  // at the head of the queue hasn't been dispatched for longer than
+  // queue_time_threshold_ time interval.
+  MonoTime queue_head_submit_time = queue_head_submit_time_.load();
+  if (!queue_head_submit_time.Initialized() || has_spare_thread_.load()) {
+    return false;
+  }
+
+  const auto now = MonoTime::Now();
+  const MonoDelta queue_time = now - queue_head_submit_time;
+  if (queue_time > queue_time_threshold_) {
+    MonoTime overloaded_since;
+    if (overloaded_since_.compare_exchange_strong(overloaded_since, now)) {
+      VLOG(3) << Substitute("state transition: normal --> overloaded");
+      if (time_overloaded) {
+        *time_overloaded = queue_time - queue_time_threshold_;
+      }
+      return true;
+    }
+    DCHECK(overloaded_since.Initialized());
+    if (time_overloaded) {
+      *time_overloaded = now - overloaded_since;
+    }
+    return true;
+  }
+
+  return false;
+}
+
+// Record a queue update and refresh the overloaded/normal state. The pool's
+// lock must be held; a sample is stored only if 'task_queue_time' is set.
+void ThreadPool::QueueLoadMeter::UpdateQueueInfoUnlocked(
+    const MonoDelta& task_queue_time,
+    const MonoTime& queue_head_submit_time,
+    bool has_spare_thread) {
+  tpool_.lock_.AssertAcquired();
+  if (task_queue_time.Initialized()) {
+    // TODO(aserbin): any better way of tracking the running minimum of N numbers?
+    queue_times_.emplace_back(task_queue_time);
+    queue_times_ordered_.insert(task_queue_time);
+    if (queue_times_.size() > queue_time_history_length_) {
+      auto it = queue_times_ordered_.find(queue_times_.front());
+      DCHECK(it != queue_times_ordered_.end());
+      queue_times_ordered_.erase(it);
+      queue_times_.pop_front();
+    }
+    min_queue_time_ = *queue_times_ordered_.begin();  // window minimum
+  }
+  queue_head_submit_time_.store(queue_head_submit_time);
+  has_spare_thread_.store(has_spare_thread);
+
+  // Update the load state of the queue, storing appropriate information
+  // in the 'overloaded_since_' field.
+  const auto now = MonoTime::Now();
+  const bool queue_empty = !queue_head_submit_time.Initialized();
+  const auto queue_time = queue_empty
+      ? MonoDelta::FromSeconds(0) : now - queue_head_submit_time;
+  const auto min_queue_time = min_queue_time_.Initialized()
+      ? min_queue_time_ : MonoDelta::FromSeconds(0);
+  const bool was_overloaded = overloaded_since_.load().Initialized();
+  const bool overloaded = !has_spare_thread &&
+      std::max(min_queue_time, queue_time) > queue_time_threshold_;
+  // Track the state transitions and update overloaded_since_.
+  if (!was_overloaded && overloaded) {
+    VLOG(3) << Substitute("state transition: normal --> overloaded");
+    overloaded_since_.store(now);
+  } else if (was_overloaded && !overloaded) {
+    VLOG(3) << Substitute("state transition: overloaded --> normal");
+    overloaded_since_.store(MonoTime());
+  }
+  VLOG(4) << Substitute("state refreshed: overloaded since $0, queue $1, "
+                        "has $2 thread, queue head submit time $3, "
+                        "queue time $4, min queue time $5",
+                        overloaded_since_.load().ToString(),
+                        queue_empty ? "empty" : "not empty",
+                        has_spare_thread ? "spare" : "no spare",
+                        queue_head_submit_time.ToString(),
+                        queue_time.ToString(), min_queue_time.ToString());
+}
+
 std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s) {
   return o << ThreadPoolToken::StateToString(s);
 }
diff --git a/src/kudu/util/threadpool.h b/src/kudu/util/threadpool.h
index a1037e8..403acbd 100644
--- a/src/kudu/util/threadpool.h
+++ b/src/kudu/util/threadpool.h
@@ -16,10 +16,13 @@
 // under the License.
 #pragma once
 
+#include <atomic>
+#include <cstddef>
 #include <deque>
 #include <functional>
 #include <iosfwd>
 #include <memory>
+#include <set>
 #include <string>
 #include <unordered_set>
 
@@ -43,6 +46,7 @@ class ThreadPool;
 class ThreadPoolToken;
 class Trace;
 
+
 // Interesting thread pool metrics. Can be applied to the entire pool (see
 // ThreadPoolBuilder) or to individual tokens.
 struct ThreadPoolMetrics {
@@ -112,6 +116,7 @@ class ThreadPoolBuilder {
   ThreadPoolBuilder& set_max_threads(int max_threads);
   ThreadPoolBuilder& set_max_queue_size(int max_queue_size);
   ThreadPoolBuilder& set_idle_timeout(const MonoDelta& idle_timeout);
+  ThreadPoolBuilder& set_queue_overload_threshold(const MonoDelta& threshold);
   ThreadPoolBuilder& set_metrics(ThreadPoolMetrics metrics);
 
   // Instantiate a new ThreadPool with the existing builder arguments.
@@ -125,6 +130,7 @@ class ThreadPoolBuilder {
   int max_threads_;
   int max_queue_size_;
   MonoDelta idle_timeout_;
+  MonoDelta queue_overload_threshold_;
   ThreadPoolMetrics metrics_;
 
   DISALLOW_COPY_AND_ASSIGN(ThreadPoolBuilder);
@@ -215,6 +221,13 @@ class ThreadPool {
     return num_threads_ + num_threads_pending_start_;
   }
 
+  // Whether the ThreadPool's queue is overloaded. If queue overload threshold
+  // isn't set, returns 'false'. Otherwise, returns whether the queue is
+  // overloaded. If overloaded, 'overloaded_time' and 'threshold' are both
+  // populated with corresponding information (if not null).
+  bool QueueOverloaded(MonoDelta* overloaded_time = nullptr,
+                       MonoDelta* threshold = nullptr) const;
+
  private:
   FRIEND_TEST(ThreadPoolTest, TestThreadPoolWithNoMinimum);
   FRIEND_TEST(ThreadPoolTest, TestVariableSizeThreadPool);
@@ -231,6 +244,127 @@ class ThreadPool {
     MonoTime submit_time;
   };
 
+  // This utility class is used to track how busy the ThreadPool is by
+  // monitoring its task queue timings. This class uses logic modeled after
+  // CoDel algorithm (see [1], [2], [3]), detecting whether the queue
+  // of a thread pool with the maximum of M worker threads is overloaded
+  // (it's assumed M > 0). The idea is to keep an eye on the queue times when
+  // the thread pool works at its full capacity. Even if there aren't any idle
+  // threads, we don't want to declare the queue overloaded when it's still able
+  // to dispatch many lightweight tasks pretty fast once they arrived. Instead,
+  // we start declaring the queue overloaded only when we see the evidence of
+  // the newly arrived tasks being stuck in the queue for a time interval longer
+  // than the configured threshold.
+  //
+  // Let's denote the minimum of the queue times of the last N tasks dispatched
+  // by min(QT_historic(N)). If the history of already dispatched tasks is
+  // empty, min(QT_historic(N)) is defined to be 0. The time interval that the
+  // very first element of the queue has been waiting to be dispatched is
+  // denoted by QT_head. The queue time threshold is denoted by T_overload.
+  //
+  // With that, the criterion to detect the 'overloaded' state of a ThreadPool's
+  // queue is defined as the following:
+  //
+  //   all available worker threads are busy
+  //     AND
+  //   max(QT_head, min(QT_historic(M))) > T_overload
+  //
+  // The maximum number of worker threads (M) in a thread pool naturally
+  // provides the proper length of the queue time history. To illustrate, let's
+  // examine one edge case. It's a case of continuous periodic workload of
+  // batches of M tasks, where all M tasks in a batch are scheduled at the same
+  // time and batches of M tasks are separated by T_overload time interval.
+  // Every batch contains (M - 1) heavyweight tasks and a single lightweight one.
+  // Let's assume it takes T_overload to complete a heavyweight task, and a
+  // lightweight one completes instantly. With such a workload running against a
+  // thread pool, it's able to handle many extra lightweight tasks with
+  // resulting queue times well under T_overload threshold. Apparently, the
+  // queue should not be declared overloaded in such case. So, if the queue time
+  // history length were less than M (e.g. (M - 1)), then the pool would be
+  // considered overloaded if capturing a moment when all worker threads are
+  // busy handling a newly arrived batch of M tasks, assuming the thread pool
+  // has already handled at least two batches of tasks since its start (the
+  // history of queue times would be { 0, T_overload, ..., T_overload } repeated
+  // many times).
+  //
+  // From the other side, if the size of the queue time history were greater
+  // than M, it would include not-so-relevant information for some patterns
+  // of scheduled tasks.
+  //
+  // References:
+  //   [1] https://queue.acm.org/detail.cfm?id=2209336
+  //   [2] https://en.wikipedia.org/wiki/CoDel
+  //   [3] https://man7.org/linux/man-pages/man8/CoDel.8.html
+  class QueueLoadMeter {
+   public:
+    // Create an instance of QueueLoadMeter class. The pool to attach to
+    // is specified by the 'tpool' parameter. The 'queue_time_threshold'
+    // parameter corresponds to 'T_overload' parameter from the algorithm
+    // description above, and 'queue_time_history_length' corresponds to 'N',
+    // respectively.
+    explicit QueueLoadMeter(const ThreadPool& tpool,
+                            const MonoDelta& queue_time_threshold,
+                            size_t queue_time_history_length);
+
+    const MonoDelta& queue_time_threshold() const {
+      return queue_time_threshold_;
+    }
+
+    // Check whether the queue is overloaded. If the queue is not overloaded,
+    // this method returns 'false'. If the queue is overloaded, this method
+    // returns 'true'. In the latter case, if 'time_overloaded' is not null,
+    // it is populated with the information on how long the queue has been
+    // in the overloaded state. This method is lock-free.
+    bool overloaded(MonoDelta* time_overloaded = nullptr);
+
+    // Notify the meter about updates on the task queue. If a task is being
+    // dequeued, the queue time of the task dequeued is specified via the
+    // 'task_queue_time' parameter, otherwise it's not initialized.
+    // Non-initialized 'queue_head_submit_time' means there isn't next task
+    // in the queue. The 'has_spare_thread' parameter conveys information
+    // on whether a "spare" worker thread is available.
+    void UpdateQueueInfoUnlocked(const MonoDelta& task_queue_time,
+                                 const MonoTime& queue_head_submit_time,
+                                 bool has_spare_thread);
+   private:
+    // The pool this meter is attached to.
+    const ThreadPool& tpool_;
+
+    // The threshold for the minimum queue times when determining whether the
+    // thread pool's tasks queue is in overloaded state. This corresponds to the
+    // parameter 'T_overload' in the description of the algorithm above.
+    const MonoDelta queue_time_threshold_;
+
+    // The measurement window to track task queue times. That's the number
+    // of the most recent tasks to check for the queue times. This corresponds
+    // to the parameter 'N' in the description of the algorithm above.
+    const size_t queue_time_history_length_;
+
+    // Queue timings of the most recent samples. The size of these containers
+    // is kept under queue_time_history_length_ limit.
+    std::deque<MonoDelta> queue_times_;
+    std::multiset<MonoDelta> queue_times_ordered_;
+    MonoDelta min_queue_time_;
+
+    // Below fields are to store the latest snapshot of the information about
+    // the task queue of the pool the meter is attached to.
+
+    // Time when the next queue task was submitted. Set to empty MonoTime() if
+    // there isn't a single element in the queue (i.e. Initialized() returns
+    // false).
+    std::atomic<MonoTime> queue_head_submit_time_;
+
+    // Time when the queue has entered the overloaded state. If the queue isn't
+    // in overloaded state, this member field isn't initialized
+    // (i.e. overloaded_since.Initialized() returns false).
+    std::atomic<MonoTime> overloaded_since_;
+
+    // Whether the ThreadPool has at least one "spare" thread: a thread that can
+    // be spawned before reaching the maximum allowed number of threads,
+    // or one of those already spawned but currently idle.
+    std::atomic<bool> has_spare_thread_;
+  };
+
   // Creates a new thread pool using a builder.
   explicit ThreadPool(const ThreadPoolBuilder& builder);
 
@@ -255,6 +389,22 @@ class ThreadPool {
   // Releases token 't' and invalidates it.
   void ReleaseToken(ThreadPoolToken* t);
 
+  // Notify the load meter (if enabled) on relevant updates. If no information
+  // on dequeued task is available, 'queue_time' should be omitted (or be an
+  // uninitialized MonoDelta instance).
+  //
+  // The LoadMeter should be notified about events which affect the criterion
+  // to evaluate the state of the queue (overloaded vs normal). The criterion
+  // uses the following information:
+  //   * queue time of a newly dequeued task
+  //   * availability of spare worker threads
+  //   * submit time of the task at the head of the queue
+  // This means that LoadMeter should be notified about the following events:
+  //  * a task at the head of the queue has been dispatched to be run
+  //  * a worker thread completes running a task
+  //  * a new task has been scheduled (i.e. added into the queue)
+  void NotifyLoadMeterUnlocked(const MonoDelta& queue_time = MonoDelta());
+
   const std::string name_;
   const int min_threads_;
   const int max_threads_;
@@ -341,6 +491,10 @@ class ThreadPool {
   // ExecutionMode::CONCURRENT token used by the pool for tokenless submission.
   std::unique_ptr<ThreadPoolToken> tokenless_;
 
+  // The meter to track whether the pool's queue is stalled/overloaded.
+  // It's nullptr/none if the queue overload threshold is unset.
+  std::unique_ptr<QueueLoadMeter> load_meter_;
+
   // Metrics for the entire thread pool.
   const ThreadPoolMetrics metrics_;