You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2020/08/28 06:17:59 UTC
[kudu] 02/02: KUDU-1587 part 1: load meter for ThreadPool
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit fc8615c37eb4e28f3cc6bea0fcd5a8732451e883
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Mon Aug 10 18:16:53 2020 -0700
KUDU-1587 part 1: load meter for ThreadPool
This patch introduces a load meter for ThreadPool, aiming to use
active queue management techniques (AQM) such as CoDel [1] in scenarios
where thread pool queue load metrics are applicable (e.g., KUDU-1587).
[1] https://en.wikipedia.org/wiki/CoDel
Change-Id: I640716dc32f193e68361ca623ee7b9271e661d8b
Reviewed-on: http://gerrit.cloudera.org:8080/16332
Tested-by: Kudu Jenkins
Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
src/kudu/util/threadpool-test.cc | 274 +++++++++++++++++++++++++++++++++++++--
src/kudu/util/threadpool.cc | 203 ++++++++++++++++++++++++++---
src/kudu/util/threadpool.h | 154 ++++++++++++++++++++++
3 files changed, 603 insertions(+), 28 deletions(-)
diff --git a/src/kudu/util/threadpool-test.cc b/src/kudu/util/threadpool-test.cc
index 93806f8..cecd01b 100644
--- a/src/kudu/util/threadpool-test.cc
+++ b/src/kudu/util/threadpool-test.cc
@@ -162,10 +162,12 @@ TEST_F(ThreadPoolTest, TestSubmitAfterShutdown) {
}
TEST_F(ThreadPoolTest, TestThreadPoolWithNoMinimum) {
- ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
- .set_min_threads(0)
- .set_max_threads(3)
- .set_idle_timeout(MonoDelta::FromMilliseconds(1))));
+ constexpr int kIdleTimeoutMs = 1;
+ ASSERT_OK(RebuildPoolWithBuilder(
+ ThreadPoolBuilder(kDefaultPoolName)
+ .set_min_threads(0)
+ .set_max_threads(3)
+ .set_idle_timeout(MonoDelta::FromMilliseconds(kIdleTimeoutMs))));
// There are no threads to start with.
ASSERT_TRUE(pool_->num_threads() == 0);
@@ -186,6 +188,11 @@ TEST_F(ThreadPoolTest, TestThreadPoolWithNoMinimum) {
latch.CountDown();
pool_->Wait();
ASSERT_EQ(0, pool_->active_threads_);
+ // Wait for longer than the idle timeout: the threads should be gone since
+ // min_threads is set to 0.
+ SleepFor(MonoDelta::FromMilliseconds(10 * kIdleTimeoutMs));
+ ASSERT_EQ(0, pool_->num_threads());
+ ASSERT_EQ(0, pool_->active_threads_);
pool_->Shutdown();
ASSERT_EQ(0, pool_->num_threads());
}
@@ -252,10 +259,12 @@ TEST_F(ThreadPoolTest, TestRace) {
}
TEST_F(ThreadPoolTest, TestVariableSizeThreadPool) {
- ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
- .set_min_threads(1)
- .set_max_threads(4)
- .set_idle_timeout(MonoDelta::FromMilliseconds(1))));
+ constexpr int kIdleTimeoutMs = 1;
+ ASSERT_OK(RebuildPoolWithBuilder(
+ ThreadPoolBuilder(kDefaultPoolName)
+ .set_min_threads(1)
+ .set_max_threads(4)
+ .set_idle_timeout(MonoDelta::FromMilliseconds(kIdleTimeoutMs))));
// There is 1 thread to start with.
ASSERT_EQ(1, pool_->num_threads());
@@ -276,6 +285,9 @@ TEST_F(ThreadPoolTest, TestVariableSizeThreadPool) {
latch.CountDown();
pool_->Wait();
ASSERT_EQ(0, pool_->active_threads_);
+ SleepFor(MonoDelta::FromMilliseconds(10 * kIdleTimeoutMs));
+ ASSERT_EQ(0, pool_->active_threads_);
+ ASSERT_EQ(1, pool_->num_threads());
pool_->Shutdown();
ASSERT_EQ(0, pool_->num_threads());
}
@@ -448,6 +460,252 @@ TEST_F(ThreadPoolTest, TestMetrics) {
ASSERT_EQ(6, all_metrics[0].run_time_us_histogram->TotalCount());
}
+// Test scenario to verify the functionality of the QueueLoadMeter.
+TEST_F(ThreadPoolTest, QueueLoadMeter) {
+ const auto kQueueTimeThresholdMs = 100;
+ const auto kIdleThreadTimeoutMs = 200;
+ constexpr auto kMaxThreads = 3;
+ ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+ .set_min_threads(0)
+ .set_max_threads(kMaxThreads)
+ .set_queue_overload_threshold(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs))
+ .set_idle_timeout(MonoDelta::FromMilliseconds(kIdleThreadTimeoutMs))));
+ // An idle pool must not have its queue overloaded.
+ ASSERT_FALSE(pool_->QueueOverloaded());
+
+ // A single instant task cannot make the pool's queue overloaded.
+ ASSERT_OK(pool_->Submit([](){}));
+ ASSERT_FALSE(pool_->QueueOverloaded());
+ pool_->Wait();
+ ASSERT_FALSE(pool_->QueueOverloaded());
+
+ for (auto i = 0; i < kMaxThreads; ++i) {
+ ASSERT_OK(pool_->Submit([](){
+ SleepFor(MonoDelta::FromMilliseconds(2 * kQueueTimeThresholdMs));
+ }));
+ }
+ ASSERT_FALSE(pool_->QueueOverloaded());
+ pool_->Wait();
+ ASSERT_FALSE(pool_->QueueOverloaded());
+
+ for (auto i = 0; i < 2 * kMaxThreads; ++i) {
+ ASSERT_OK(pool_->Submit([](){
+ SleepFor(MonoDelta::FromMilliseconds(2 * kQueueTimeThresholdMs));
+ }));
+ }
+ ASSERT_FALSE(pool_->QueueOverloaded());
+ SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs + 10));
+ ASSERT_TRUE(pool_->QueueOverloaded());
+ // Should still be overloaded after first kMaxThreads tasks are processed.
+ SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs + 10));
+ ASSERT_TRUE(pool_->QueueOverloaded());
+ pool_->Wait();
+ ASSERT_FALSE(pool_->QueueOverloaded());
+
+ // Many instant tasks cannot make pool overloaded.
+ for (auto i = 0; i < kMaxThreads; ++i) {
+ ASSERT_OK(pool_->Submit([](){}));
+ }
+ ASSERT_FALSE(pool_->QueueOverloaded());
+ pool_->Wait();
+ // Wait for the threads to be shut down due to inactivity.
+ SleepFor(MonoDelta::FromMilliseconds(2 * kIdleThreadTimeoutMs));
+ // Even if all threads are shutdown, an idle pool with empty queue should not
+ // be overloaded.
+ ASSERT_FALSE(pool_->QueueOverloaded());
+
+ // Shovel some light tasks once again: this should not overload the queue.
+ for (auto i = 0; i < 10 * kMaxThreads; ++i) {
+ ASSERT_OK(pool_->Submit([](){
+ SleepFor(MonoDelta::FromMilliseconds(1));
+ }));
+ }
+ ASSERT_FALSE(pool_->QueueOverloaded());
+ pool_->Wait();
+ ASSERT_FALSE(pool_->QueueOverloaded());
+
+ // Submit a bunch of instant tasks via a single token: the queue should not
+ // become overloaded.
+ {
+ unique_ptr<ThreadPoolToken> t = pool_->NewToken(
+ ThreadPool::ExecutionMode::SERIAL);
+ ASSERT_OK(t->Submit([](){}));
+ ASSERT_FALSE(pool_->QueueOverloaded());
+ pool_->Wait();
+ ASSERT_FALSE(pool_->QueueOverloaded());
+
+ for (auto i = 0; i < 100; ++i) {
+ ASSERT_OK(t->Submit([](){}));
+ }
+ ASSERT_FALSE(pool_->QueueOverloaded());
+ SleepFor(MonoDelta::FromMilliseconds(1));
+ ASSERT_FALSE(pool_->QueueOverloaded());
+ pool_->Wait();
+ ASSERT_FALSE(pool_->QueueOverloaded());
+ }
+
+ // Submit many instant tasks via multiple tokens (more than the maximum
+ // number of worker threads in a pool) and many lightweight tasks which can
+ // run concurrently: the queue should not become overloaded.
+ {
+ constexpr auto kNumTokens = 2 * kMaxThreads;
+ vector<unique_ptr<ThreadPoolToken>> tokens;
+ tokens.reserve(kNumTokens);
+ for (auto i = 0; i < kNumTokens; ++i) {
+ tokens.emplace_back(pool_->NewToken(ThreadPool::ExecutionMode::SERIAL));
+ }
+
+ for (auto& t : tokens) {
+ for (auto i = 0; i < 50; ++i) {
+ ASSERT_OK(t->Submit([](){}));
+ }
+ for (auto i = 0; i < 10; ++i) {
+ ASSERT_OK(pool_->Submit([](){}));
+ }
+ }
+ ASSERT_FALSE(pool_->QueueOverloaded());
+ SleepFor(MonoDelta::FromMilliseconds(1));
+ ASSERT_FALSE(pool_->QueueOverloaded());
+ pool_->Wait();
+ ASSERT_FALSE(pool_->QueueOverloaded());
+ }
+
+ // Submit many long running tasks via serial tokens where the number of tokens
+ // is less than the maximum number of worker threads in the pool. The queue
+ // of the pool should not become overloaded since the pool has one spare
+ // thread to spawn.
+ {
+ constexpr auto kNumTokens = kMaxThreads - 1;
+ ASSERT_GT(kNumTokens, 0);
+ vector<unique_ptr<ThreadPoolToken>> tokens;
+ tokens.reserve(kNumTokens);
+ for (auto i = 0; i < kNumTokens; ++i) {
+ tokens.emplace_back(pool_->NewToken(ThreadPool::ExecutionMode::SERIAL));
+ }
+
+ ASSERT_FALSE(pool_->QueueOverloaded());
+ for (auto& t : tokens) {
+ for (auto i = 0; i < kMaxThreads; ++i) {
+ ASSERT_OK(t->Submit([](){
+ SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs));
+ }));
+ }
+ }
+ ASSERT_FALSE(pool_->QueueOverloaded());
+ SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs));
+ ASSERT_FALSE(pool_->QueueOverloaded());
+ pool_->Wait();
+ ASSERT_FALSE(pool_->QueueOverloaded());
+ }
+
+ // Submit many long running tasks via serial tokens where the number of tokens
+ // is greater or equal to the maximum number of worker threads in the pool.
+ // The queue of the pool should become overloaded since the pool is running
+ // at its capacity and queue times are over the threshold.
+ {
+ constexpr auto kNumTokens = kMaxThreads;
+ vector<unique_ptr<ThreadPoolToken>> tokens;
+ tokens.reserve(kNumTokens);
+ for (auto i = 0; i < kNumTokens; ++i) {
+ tokens.emplace_back(pool_->NewToken(ThreadPool::ExecutionMode::SERIAL));
+ }
+
+ ASSERT_FALSE(pool_->QueueOverloaded());
+ for (auto& t : tokens) {
+ for (auto i = 0; i < kMaxThreads; ++i) {
+ ASSERT_OK(t->Submit([](){
+ SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs));
+ }));
+ }
+ }
+ // Since there are exactly kMaxThreads serial pool tokens with tasks,
+ // the queue is empty most of the time. This is because active serial tokens
+ // are not kept in the queue. So, the status of the queue cannot be reliably
+ // determined by peeking at the submission times of the elements in the
+ // queue. Then the only way to detect overload of the queue is the history
+ // of queue times. The latter will reflect long queue times only after
+ // processing two tasks in each of the serial tokens. So, it's expected
+ // to get a stable report on the queue status only after two
+ // kQueueTimeThresholdMs time intervals.
+ SleepFor(MonoDelta::FromMilliseconds(2 * kQueueTimeThresholdMs));
+ ASSERT_TRUE(pool_->QueueOverloaded());
+ pool_->Wait();
+ ASSERT_FALSE(pool_->QueueOverloaded());
+ }
+
+ // A mixed case: submit many long running tasks via serial tokens where the
+ // number of tokens is less than the maximum number of worker threads in the
+ // pool and submit many instant tasks that can run concurrently.
+ {
+ constexpr auto kNumTokens = kMaxThreads - 1;
+ ASSERT_GT(kNumTokens, 0);
+ vector<unique_ptr<ThreadPoolToken>> tokens;
+ tokens.reserve(kNumTokens);
+ for (auto i = 0; i < kNumTokens; ++i) {
+ tokens.emplace_back(pool_->NewToken(ThreadPool::ExecutionMode::SERIAL));
+ }
+
+ ASSERT_FALSE(pool_->QueueOverloaded());
+ for (auto& t : tokens) {
+ for (auto i = 0; i < kMaxThreads; ++i) {
+ ASSERT_OK(t->Submit([](){
+ SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs));
+ }));
+ }
+ }
+ ASSERT_FALSE(pool_->QueueOverloaded());
+
+ // Add several light tasks in addition to the scheduled serial ones. This
+ // should not overload the queue.
+ for (auto i = 0; i < 10; ++i) {
+ ASSERT_OK(pool_->Submit([](){
+ SleepFor(MonoDelta::FromMilliseconds(1));
+ }));
+ }
+ ASSERT_FALSE(pool_->QueueOverloaded());
+ SleepFor(MonoDelta::FromMilliseconds(1));
+ ASSERT_FALSE(pool_->QueueOverloaded());
+ SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs));
+ ASSERT_FALSE(pool_->QueueOverloaded());
+ pool_->Wait();
+ ASSERT_FALSE(pool_->QueueOverloaded());
+ }
+
+ // Another mixed case: submit many long running tasks via a serial token
+ // and many long running tasks that can run concurrently. The queue should
+ // become overloaded.
+ {
+ constexpr auto kNumTokens = 1;
+ vector<unique_ptr<ThreadPoolToken>> tokens;
+ tokens.reserve(kNumTokens);
+ for (auto i = 0; i < kNumTokens; ++i) {
+ tokens.emplace_back(pool_->NewToken(ThreadPool::ExecutionMode::SERIAL));
+ }
+
+ ASSERT_FALSE(pool_->QueueOverloaded());
+ for (auto& t : tokens) {
+ for (auto i = 0; i < kMaxThreads; ++i) {
+ ASSERT_OK(t->Submit([](){
+ SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs));
+ }));
+ }
+ }
+ ASSERT_FALSE(pool_->QueueOverloaded());
+
+ // Add several long running tasks in addition to the scheduled serial
+ // ones: together these should overload the queue.
+ for (auto i = 0; i < 2 * kMaxThreads; ++i) {
+ ASSERT_OK(pool_->Submit([](){
+ SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs));
+ }));
+ }
+ SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs));
+ ASSERT_TRUE(pool_->QueueOverloaded());
+ pool_->Wait();
+ ASSERT_FALSE(pool_->QueueOverloaded());
+ }
+}
+
// Test that a thread pool will crash if asked to run its own blocking
// functions in a pool thread.
//
diff --git a/src/kudu/util/threadpool.cc b/src/kudu/util/threadpool.cc
index 9d8cc4a..6e2097d 100644
--- a/src/kudu/util/threadpool.cc
+++ b/src/kudu/util/threadpool.cc
@@ -17,6 +17,7 @@
#include "kudu/util/threadpool.h"
+#include <algorithm>
#include <cstdint>
#include <deque>
#include <functional>
@@ -24,7 +25,6 @@
#include <memory>
#include <ostream>
#include <string>
-#include <utility>
#include <glog/logging.h>
@@ -90,6 +90,12 @@ ThreadPoolBuilder& ThreadPoolBuilder::set_metrics(ThreadPoolMetrics metrics) {
return *this;
}
+ThreadPoolBuilder& ThreadPoolBuilder::set_queue_overload_threshold(
+ const MonoDelta& threshold) {
+ queue_overload_threshold_ = threshold;
+ return *this;
+}
+
Status ThreadPoolBuilder::Build(unique_ptr<ThreadPool>* pool) const {
pool->reset(new ThreadPool(*this));
return (*pool)->Init();
@@ -266,20 +272,20 @@ const char* ThreadPoolToken::StateToString(State s) {
////////////////////////////////////////////////////////
ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
- : name_(builder.name_),
- min_threads_(builder.min_threads_),
- max_threads_(builder.max_threads_),
- max_queue_size_(builder.max_queue_size_),
- idle_timeout_(builder.idle_timeout_),
- pool_status_(Status::Uninitialized("The pool was not initialized.")),
- idle_cond_(&lock_),
- no_threads_cond_(&lock_),
- num_threads_(0),
- num_threads_pending_start_(0),
- active_threads_(0),
- total_queued_tasks_(0),
- tokenless_(NewToken(ExecutionMode::CONCURRENT)),
- metrics_(builder.metrics_) {
+ : name_(builder.name_),
+ min_threads_(builder.min_threads_),
+ max_threads_(builder.max_threads_),
+ max_queue_size_(builder.max_queue_size_),
+ idle_timeout_(builder.idle_timeout_),
+ pool_status_(Status::Uninitialized("The pool was not initialized.")),
+ idle_cond_(&lock_),
+ no_threads_cond_(&lock_),
+ num_threads_(0),
+ num_threads_pending_start_(0),
+ active_threads_(0),
+ total_queued_tasks_(0),
+ tokenless_(NewToken(ExecutionMode::CONCURRENT)),
+ metrics_(builder.metrics_) {
string prefix = !builder.trace_metric_prefix_.empty() ?
builder.trace_metric_prefix_ : builder.name_;
@@ -289,6 +295,11 @@ ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
prefix + ".run_wall_time_us");
run_cpu_time_trace_metric_name_ = TraceMetrics::InternName(
prefix + ".run_cpu_time_us");
+
+ const auto& ovt = builder.queue_overload_threshold_;
+ if (ovt.Initialized() && ovt.ToNanoseconds() > 0) {
+ load_meter_.reset(new QueueLoadMeter(*this, ovt, max_threads_));
+ }
}
ThreadPool::~ThreadPool() {
@@ -397,6 +408,17 @@ unique_ptr<ThreadPoolToken> ThreadPool::NewTokenWithMetrics(
return t;
}
+bool ThreadPool::QueueOverloaded(MonoDelta* overloaded_time,
+ MonoDelta* threshold) const {
+ if (!load_meter_) {
+ return false;
+ }
+ if (threshold) {
+ *threshold = load_meter_->queue_time_threshold();
+ }
+ return load_meter_->overloaded(overloaded_time);
+}
+
void ThreadPool::ReleaseToken(ThreadPoolToken* t) {
MutexLock guard(lock_);
CHECK(!t->IsActive()) << Substitute("Token with state $0 may not be released",
@@ -410,7 +432,7 @@ Status ThreadPool::Submit(std::function<void()> f) {
Status ThreadPool::DoSubmit(std::function<void()> f, ThreadPoolToken* token) {
DCHECK(token);
- MonoTime submit_time = MonoTime::Now();
+ const MonoTime submit_time = MonoTime::Now();
MutexLock guard(lock_);
if (PREDICT_FALSE(!pool_status_.ok())) {
@@ -493,6 +515,7 @@ Status ThreadPool::DoSubmit(std::function<void()> f, ThreadPoolToken* token) {
idle_threads_.front().not_empty.Signal();
idle_threads_.pop_front();
}
+ NotifyLoadMeterUnlocked();
guard.Unlock();
if (metrics_.queue_length_histogram) {
@@ -518,7 +541,6 @@ Status ThreadPool::DoSubmit(std::function<void()> f, ThreadPoolToken* token) {
}
}
-
return Status::OK();
}
@@ -570,6 +592,7 @@ void ThreadPool::DispatchThread() {
//
// Note: if FIFO behavior is desired, it's as simple as changing this to push_back().
idle_threads_.push_front(me);
+ NotifyLoadMeterUnlocked();
SCOPED_CLEANUP({
// For some wake ups (i.e. Shutdown or DoSubmit) this thread is
// guaranteed to be unlinked after being awakened. In others (i.e.
@@ -609,6 +632,10 @@ void ThreadPool::DispatchThread() {
--total_queued_tasks_;
++active_threads_;
+ const MonoTime now(MonoTime::Now());
+ const MonoDelta queue_time = now - task.submit_time;
+ NotifyLoadMeterUnlocked(queue_time);
+
unique_lock.Unlock();
// Release the reference which was held by the queued item.
@@ -617,9 +644,8 @@ void ThreadPool::DispatchThread() {
task.trace->Release();
}
- // Update metrics
- MonoTime now(MonoTime::Now());
- int64_t queue_time_us = (now - task.submit_time).ToMicroseconds();
+ // Update metrics.
+ const int64_t queue_time_us = queue_time.ToMicroseconds();
TRACE_COUNTER_INCREMENT(queue_time_trace_metric_name_, queue_time_us);
if (metrics_.queue_time_us_histogram) {
metrics_.queue_time_us_histogram->Increment(queue_time_us);
@@ -673,6 +699,15 @@ void ThreadPool::DispatchThread() {
queue_.emplace_back(token);
}
}
+ if (!queue_.empty()) {
+ // If the queue is empty, the LoadMeter is notified on next iteration of
+ // the outer while() loop under the 'if (queue_.empty())' clause. Here
+ // it's crucial to call NotifyLoadMeter() _before_ decrementing
+ // 'active_threads_' to avoid reporting this thread as a spare one despite
+ // the fact that it will run a task from the non-empty queue immediately
+ // on next iteration of the outer while() loop.
+ NotifyLoadMeterUnlocked();
+ }
if (--active_threads_ == 0) {
idle_cond_.Broadcast();
}
@@ -709,6 +744,134 @@ void ThreadPool::CheckNotPoolThreadUnlocked() {
}
}
+void ThreadPool::NotifyLoadMeterUnlocked(const MonoDelta& queue_time) {
+ if (!load_meter_) {
+ return;
+ }
+
+ lock_.AssertAcquired();
+ MonoTime queue_head_submit_time;
+ if (!queue_.empty()) {
+ DCHECK(!queue_.front()->entries_.empty());
+ queue_head_submit_time = queue_.front()->entries_.front().submit_time;
+ }
+ load_meter_->UpdateQueueInfoUnlocked(queue_time,
+ queue_head_submit_time,
+ active_threads_ < max_threads_);
+}
+
+////////////////////////////////////////////////////////
+// ThreadPool::QueueLoadMeter
+////////////////////////////////////////////////////////
+
+ThreadPool::QueueLoadMeter::QueueLoadMeter(
+ const ThreadPool& tpool,
+ const MonoDelta& queue_time_threshold,
+ size_t queue_time_history_length)
+ : tpool_(tpool),
+ queue_time_threshold_(queue_time_threshold),
+ queue_time_history_length_(queue_time_history_length),
+ queue_head_submit_time_(MonoTime()),
+ overloaded_since_(MonoTime()),
+ has_spare_thread_(true) {
+}
+
+bool ThreadPool::QueueLoadMeter::overloaded(MonoDelta* time_overloaded) {
+ // First, check whether the queue was overloaded upon the latest update
+ // on the queue from the ThreadPool's activity. If that's the case, there is
+ // no need to dig further.
+ MonoTime overloaded_since = overloaded_since_.load();
+ if (overloaded_since.Initialized()) {
+ if (time_overloaded) {
+ *time_overloaded = MonoTime::Now() - overloaded_since;
+ }
+ return true;
+ }
+
+ // Even if the queue was not overloaded upon the latest update on the
+ // ThreadPool's activity, the queue might have stalled because all its worker
+ // threads have been busy for long time. If so, 'overloaded_since_' hasn't
+ // been updated by the activity on the thread pool itself. However, it's
+ // possible to detect whether the queue has stalled by checking if the task
+ // at the head of the queue hasn't been dispatched for longer than
+ // queue_time_threshold_ time interval.
+ MonoTime queue_head_submit_time = queue_head_submit_time_.load();
+ if (!queue_head_submit_time.Initialized() || has_spare_thread_.load()) {
+ return false;
+ }
+
+ const auto now = MonoTime::Now();
+ const MonoDelta queue_time = now - queue_head_submit_time;
+ if (queue_time > queue_time_threshold_) {
+ MonoTime overloaded_since;
+ if (overloaded_since_.compare_exchange_strong(overloaded_since, now)) {
+ VLOG(3) << Substitute("state transition: normal --> overloaded");
+ if (time_overloaded) {
+ *time_overloaded = queue_time - queue_time_threshold_;
+ }
+ return true;
+ }
+ DCHECK(overloaded_since.Initialized());
+ if (time_overloaded) {
+ *time_overloaded = now - overloaded_since;
+ }
+ return true;
+ }
+
+ return false;
+}
+
+void ThreadPool::QueueLoadMeter::UpdateQueueInfoUnlocked(
+ const MonoDelta& task_queue_time,
+ const MonoTime& queue_head_submit_time,
+ bool has_spare_thread) {
+ tpool_.lock_.AssertAcquired();
+ if (task_queue_time.Initialized()) {
+ // TODO(aserbin): any better way of tracking the running minimum of N numbers?
+ queue_times_.emplace_back(task_queue_time);
+ queue_times_ordered_.insert(task_queue_time);
+ if (queue_times_.size() > queue_time_history_length_) {
+ const auto& elem = queue_times_.front();
+ auto it = queue_times_ordered_.find(elem);
+ DCHECK(it != queue_times_ordered_.end());
+ queue_times_ordered_.erase(it);
+ queue_times_.pop_front();
+ }
+ min_queue_time_ = *queue_times_ordered_.begin();
+ }
+ queue_head_submit_time_.store(queue_head_submit_time);
+ has_spare_thread_.store(has_spare_thread);
+
+ // Update the load state of the queue, storing appropriate information
+ // in the 'overloaded_since_' field.
+ const auto now = MonoTime::Now();
+ const bool queue_empty = !queue_head_submit_time.Initialized();
+ const auto queue_time = queue_empty
+ ? MonoDelta::FromSeconds(0) : now - queue_head_submit_time;
+ const auto min_queue_time = min_queue_time_.Initialized()
+ ? min_queue_time_ : MonoDelta::FromSeconds(0);
+ const bool was_overloaded = overloaded_since_.load().Initialized();
+ const bool overloaded = !has_spare_thread &&
+ std::max(min_queue_time, queue_time) > queue_time_threshold_;
+ // Track the state transitions and update overloaded_since_.
+ if (!was_overloaded && overloaded) {
+ VLOG(3) << Substitute("state transition: normal --> overloaded");
+ overloaded_since_.store(now);
+ } else if (was_overloaded && !overloaded) {
+ VLOG(3) << Substitute("state transition: overloaded --> normal");
+ overloaded_since_.store(MonoTime());
+ }
+ VLOG(4) << Substitute("state refreshed: overloaded since $0, queue $1, "
+ "has $2 thread, queue head submit time $3, "
+ "queue time $4, min queue time $5",
+ overloaded_since_.load().ToString(),
+ queue_empty ? "empty" : "not empty",
+ has_spare_thread ? "spare" : "no spare",
+ queue_head_submit_time.ToString(),
+ queue_time.ToString(),
+ min_queue_time.ToString());
+}
+
std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s) {
return o << ThreadPoolToken::StateToString(s);
}
diff --git a/src/kudu/util/threadpool.h b/src/kudu/util/threadpool.h
index a1037e8..403acbd 100644
--- a/src/kudu/util/threadpool.h
+++ b/src/kudu/util/threadpool.h
@@ -16,10 +16,13 @@
// under the License.
#pragma once
+#include <atomic>
+#include <cstddef>
#include <deque>
#include <functional>
#include <iosfwd>
#include <memory>
+#include <set>
#include <string>
#include <unordered_set>
@@ -43,6 +46,7 @@ class ThreadPool;
class ThreadPoolToken;
class Trace;
+
// Interesting thread pool metrics. Can be applied to the entire pool (see
// ThreadPoolBuilder) or to individual tokens.
struct ThreadPoolMetrics {
@@ -112,6 +116,7 @@ class ThreadPoolBuilder {
ThreadPoolBuilder& set_max_threads(int max_threads);
ThreadPoolBuilder& set_max_queue_size(int max_queue_size);
ThreadPoolBuilder& set_idle_timeout(const MonoDelta& idle_timeout);
+ ThreadPoolBuilder& set_queue_overload_threshold(const MonoDelta& threshold);
ThreadPoolBuilder& set_metrics(ThreadPoolMetrics metrics);
// Instantiate a new ThreadPool with the existing builder arguments.
@@ -125,6 +130,7 @@ class ThreadPoolBuilder {
int max_threads_;
int max_queue_size_;
MonoDelta idle_timeout_;
+ MonoDelta queue_overload_threshold_;
ThreadPoolMetrics metrics_;
DISALLOW_COPY_AND_ASSIGN(ThreadPoolBuilder);
@@ -215,6 +221,13 @@ class ThreadPool {
return num_threads_ + num_threads_pending_start_;
}
+ // Whether the ThreadPool's queue is overloaded. If queue overload threshold
+ // isn't set, returns 'false'. Otherwise, returns whether the queue is
+ // overloaded. If overloaded, 'overloaded_time' and 'threshold' are both
+ // populated with corresponding information (if not null).
+ bool QueueOverloaded(MonoDelta* overloaded_time = nullptr,
+ MonoDelta* threshold = nullptr) const;
+
private:
FRIEND_TEST(ThreadPoolTest, TestThreadPoolWithNoMinimum);
FRIEND_TEST(ThreadPoolTest, TestVariableSizeThreadPool);
@@ -231,6 +244,127 @@ class ThreadPool {
MonoTime submit_time;
};
+ // This utility class is used to track how busy the ThreadPool is by
+ // monitoring its task queue timings. This class uses logic modeled after
+ // CoDel algorithm (see [1], [2], [3]), detecting whether the queue
+ // of a thread pool with the maximum of M worker threads is overloaded
+ // (it's assumed M > 0). The idea is to keep an eye on the queue times when
+ // the thread pool works at its full capacity. Even if there aren't any idle
+ // threads, we don't want to declare the queue overloaded when it's still able
+ // to dispatch many lightweight tasks pretty fast once they arrived. Instead,
+ // we start declaring the queue overloaded only when we see the evidence of
+ // the newly arrived tasks being stuck in the queue for a time interval longer
+ // than the configured threshold.
+ //
+ // Let's denote the minimum of the queue times of the last N tasks dispatched
+ // by min(QT_historic(N)). If the history of already dispatched tasks is
+ // empty, min(QT_historic(N)) is defined to be 0. The time interval that the
+ // very first element of the queue has been waiting to be dispatched is
+ // denoted by QT_head. The queue time threshold is denoted by T_overload.
+ //
+ // With that, the criterion to detect the 'overloaded' state of a ThreadPool's
+ // queue is defined as the following:
+ //
+ // all available worker threads are busy
+ // AND
+ // max(QT_head, min(QT_historic(M))) > T_overload
+ //
+ // The maximum number of worker threads (M) in a thread pool naturally
+ // provides the proper length of the queue time history. To illustrate, let's
+ // examine one edge case. It's a case of continuous periodic workload of
+ // batches of M tasks, where all M tasks in a batch are scheduled at the same
+ // time and batches of M tasks are separated by T_overload time interval.
+ // Every batch contains (M - 1) heavyweight tasks and a single lightweight one.
+ // Let's assume it takes T_overload to complete a heavyweight task, and a
+ // lightweight one completes instantly. With such a workload running against a
+ // thread pool, it's able to handle many extra lightweight tasks with
+ // resulting queue times well under T_overload threshold. Apparently, the
+ // queue should not be declared overloaded in such case. So, if the queue time
+ // history length were less than M (e.g. (M - 1)), then the pool would be
+ // considered overloaded if capturing a moment when all worker threads are
+ // busy handling a newly arrived batch of M tasks, assuming the thread pool
+ // has already handled at least two batches of tasks since its start (the
+ // history of queue times would be { 0, T_overload, ..., T_overload } repeated
+ // many times).
+ //
+ // From the other side, if the size of the queue time history were greater
+ // than M, it would include not-so-relevant information for some patterns
+ // of scheduled tasks.
+ //
+ // References:
+ // [1] https://queue.acm.org/detail.cfm?id=2209336
+ // [2] https://en.wikipedia.org/wiki/CoDel
+ // [3] https://man7.org/linux/man-pages/man8/CoDel.8.html
+ class QueueLoadMeter {
+ public:
+ // Create an instance of QueueLoadMeter class. The pool to attach to
+ // is specified by the 'tpool' parameter. The 'queue_time_threshold'
+ // parameter corresponds to 'T_overload' parameter from the algorithm
+ // description above, and 'queue_time_history_length' corresponds to 'N',
+ // respectively.
+ explicit QueueLoadMeter(const ThreadPool& tpool,
+ const MonoDelta& queue_time_threshold,
+ size_t queue_time_history_length);
+
+ const MonoDelta& queue_time_threshold() const {
+ return queue_time_threshold_;
+ }
+
+ // Check whether the queue is overloaded. If the queue is not overloaded,
+ // this method returns 'false'. If the queue is overloaded, this method
+ // returns 'true'. In the latter case, if 'time_overloaded' is not null,
+ // it is populated with the information on how long the queue has been
+ // in the overloaded state. This method is lock-free.
+ bool overloaded(MonoDelta* time_overloaded = nullptr);
+
+ // Notify the meter about updates on the task queue. If a task being
+ // dequeued, the queue time of the task dequeued is specified via the
+ // 'task_queue_time' parameter, otherwise it's not initialized.
+ // Non-initialized 'queue_head_submit_time' means there isn't next task
+ // in the queue. The 'has_spare_thread' parameter conveys information
+ // on whether a "spare" worker thread is available.
+ void UpdateQueueInfoUnlocked(const MonoDelta& task_queue_time,
+ const MonoTime& queue_head_submit_time,
+ bool has_spare_thread);
+ private:
+ // The pool this meter is attached to.
+ const ThreadPool& tpool_;
+
+ // The threshold for the minimum queue times when determining whether the
+ // thread pool's tasks queue is in overloaded state. This corresponds to the
+ // parameter 'T_overload' in the description of the algorithm above.
+ const MonoDelta queue_time_threshold_;
+
+ // The measurement window to track task queue times. That's the number
+ // of the most recent tasks to check for the queue times. This corresponds
+ // to the parameter 'N' in the description of the algorithm above.
+ const size_t queue_time_history_length_;
+
+ // Queue timings of the most recent samples. The size of these containers
+ // is kept under queue_time_history_length_ limit.
+ std::deque<MonoDelta> queue_times_;
+ std::multiset<MonoDelta> queue_times_ordered_;
+ MonoDelta min_queue_time_;
+
+ // Below fields are to store the latest snapshot of the information about
+ // the task queue of the pool the meter is attached to.
+
+ // Time when the next queued task was submitted. Set to empty MonoTime() if
+ // there isn't a single element in the queue (i.e. Initialized() returns
+ // false).
+ std::atomic<MonoTime> queue_head_submit_time_;
+
+ // Time when the queue has entered the overloaded state. If the queue isn't
+ // in overloaded state, this member field isn't initialized
+ // (i.e. overloaded_since_.load().Initialized() returns false).
+ std::atomic<MonoTime> overloaded_since_;
+
+ // Whether the ThreadPool has at least one "spare" thread: a thread that can
+ // be spawned before reaching the maximum allowed number of threads,
+ // or one of those already spawned but currently idle.
+ std::atomic<bool> has_spare_thread_;
+ };
+
// Creates a new thread pool using a builder.
explicit ThreadPool(const ThreadPoolBuilder& builder);
@@ -255,6 +389,22 @@ class ThreadPool {
// Releases token 't' and invalidates it.
void ReleaseToken(ThreadPoolToken* t);
+ // Notify the load meter (if enabled) on relevant updates. If no information
+ // on dequeued task is available, 'queue_time' should be omitted (or be an
+ // uninitialized MonoDelta instance).
+ //
+ // The LoadMeter should be notified about events which affect the criterion
+ // to evaluate the state of the queue (overloaded vs normal). The criterion
+ // uses the following information:
+ // * queue time of a newly dequeued task
+ // * availability of spare worker threads
+ // * submit time of the task at the head of the queue
+ // This means that LoadMeter should be notified about the following events:
+ // * a task at the head of the queue has been dispatched to be run
+ // * a worker thread completes running a task
+ // * a new task has been scheduled (i.e. added into the queue)
+ void NotifyLoadMeterUnlocked(const MonoDelta& queue_time = MonoDelta());
+
const std::string name_;
const int min_threads_;
const int max_threads_;
@@ -341,6 +491,10 @@ class ThreadPool {
// ExecutionMode::CONCURRENT token used by the pool for tokenless submission.
std::unique_ptr<ThreadPoolToken> tokenless_;
+ // The meter to track whether the pool's queue is stalled/overloaded.
+ // It's nullptr/none if the queue overload threshold is unset.
+ std::unique_ptr<QueueLoadMeter> load_meter_;
+
// Metrics for the entire thread pool.
const ThreadPoolMetrics metrics_;