You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2017/06/28 00:05:38 UTC
[2/3] kudu git commit: threadpool: per-token metrics
threadpool: per-token metrics
This patch makes thread pool metrics configurable on a token-by-token basis.
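The idea is simple: if a pool is shared by multiple entities in a server
(e.g. one token per tablet), pool operation metrics should reflect those
entities rather than be server-wide. Per-token metrics are updated in
addition to any pool-wide metrics configured via ThreadPoolBuilder.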
Change-Id: I29e3f78d0c033ff24f675cd84e7f7d65bf19954b
Reviewed-on: http://gerrit.cloudera.org:8080/7149
Tested-by: Adar Dembo <ad...@cloudera.com>
Reviewed-by: Todd Lipcon <to...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/31b852f4
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/31b852f4
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/31b852f4
Branch: refs/heads/master
Commit: 31b852f4ae3979401e6cee0f021e4ff20c467d2c
Parents: 3fb84a7
Author: Adar Dembo <ad...@cloudera.com>
Authored: Mon Jun 12 00:38:17 2017 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue Jun 27 23:58:59 2017 +0000
----------------------------------------------------------------------
src/kudu/kserver/kserver.cc | 12 ++++----
src/kudu/tablet/tablet_replica.cc | 16 +++++-----
src/kudu/util/threadpool-test.cc | 54 ++++++++++++++++++++++-----------
src/kudu/util/threadpool.cc | 55 ++++++++++++++++++++--------------
src/kudu/util/threadpool.h | 53 +++++++++++++++++++-------------
5 files changed, 117 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/31b852f4/src/kudu/kserver/kserver.cc
----------------------------------------------------------------------
diff --git a/src/kudu/kserver/kserver.cc b/src/kudu/kserver/kserver.cc
index df9da48..47ffd0e 100644
--- a/src/kudu/kserver/kserver.cc
+++ b/src/kudu/kserver/kserver.cc
@@ -65,14 +65,14 @@ KuduServer::KuduServer(string name,
Status KuduServer::Init() {
RETURN_NOT_OK(ServerBase::Init());
+ ThreadPoolMetrics metrics = {
+ METRIC_op_apply_queue_length.Instantiate(metric_entity_),
+ METRIC_op_apply_queue_time.Instantiate(metric_entity_),
+ METRIC_op_apply_run_time.Instantiate(metric_entity_)
+ };
RETURN_NOT_OK(ThreadPoolBuilder("apply")
+ .set_metrics(std::move(metrics))
.Build(&tablet_apply_pool_));
- tablet_apply_pool_->SetQueueLengthHistogram(
- METRIC_op_apply_queue_length.Instantiate(metric_entity_));
- tablet_apply_pool_->SetQueueTimeMicrosHistogram(
- METRIC_op_apply_queue_time.Instantiate(metric_entity_));
- tablet_apply_pool_->SetRunTimeMicrosHistogram(
- METRIC_op_apply_run_time.Instantiate(metric_entity_));
// This pool is shared by all replicas hosted by this server.
//
http://git-wip-us.apache.org/repos/asf/kudu/blob/31b852f4/src/kudu/tablet/tablet_replica.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc
index b6295ee..9698098 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -134,13 +134,15 @@ Status TabletReplica::Init(const shared_ptr<Tablet>& tablet,
DCHECK(tablet) << "A TabletReplica must be provided with a Tablet";
DCHECK(log) << "A TabletReplica must be provided with a Log";
- RETURN_NOT_OK(ThreadPoolBuilder("prepare").set_max_threads(1).Build(&prepare_pool_));
- prepare_pool_->SetQueueLengthHistogram(
- METRIC_op_prepare_queue_length.Instantiate(metric_entity));
- prepare_pool_->SetQueueTimeMicrosHistogram(
- METRIC_op_prepare_queue_time.Instantiate(metric_entity));
- prepare_pool_->SetRunTimeMicrosHistogram(
- METRIC_op_prepare_run_time.Instantiate(metric_entity));
+ ThreadPoolMetrics metrics = {
+ METRIC_op_prepare_queue_length.Instantiate(metric_entity),
+ METRIC_op_prepare_queue_time.Instantiate(metric_entity),
+ METRIC_op_prepare_run_time.Instantiate(metric_entity)
+ };
+ RETURN_NOT_OK(ThreadPoolBuilder("prepare")
+ .set_max_threads(1)
+ .set_metrics(std::move(metrics))
+ .Build(&prepare_pool_));
{
std::lock_guard<simple_spinlock> lock(lock_);
http://git-wip-us.apache.org/repos/asf/kudu/blob/31b852f4/src/kudu/util/threadpool-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/threadpool-test.cc b/src/kudu/util/threadpool-test.cc
index a66dfde..cec1b93 100644
--- a/src/kudu/util/threadpool-test.cc
+++ b/src/kudu/util/threadpool-test.cc
@@ -351,29 +351,49 @@ METRIC_DEFINE_histogram(test_entity, run_time, "run time",
TEST_F(ThreadPoolTest, TestMetrics) {
MetricRegistry registry;
- scoped_refptr<MetricEntity> entity = METRIC_ENTITY_test_entity.Instantiate(
- &registry, "test entity");
- ASSERT_OK(RebuildPoolWithMinMax(1, 1));
+ vector<ThreadPoolMetrics> all_metrics;
+ for (int i = 0; i < 3; i++) {
+ scoped_refptr<MetricEntity> entity = METRIC_ENTITY_test_entity.Instantiate(
+ &registry, Substitute("test $0", i));
+ all_metrics.emplace_back(ThreadPoolMetrics{
+ METRIC_queue_length.Instantiate(entity),
+ METRIC_queue_time.Instantiate(entity),
+ METRIC_run_time.Instantiate(entity)
+ });
+ }
// Enable metrics for the thread pool.
- scoped_refptr<Histogram> queue_length = METRIC_queue_length.Instantiate(entity);
- scoped_refptr<Histogram> queue_time = METRIC_queue_time.Instantiate(entity);
- scoped_refptr<Histogram> run_time = METRIC_run_time.Instantiate(entity);
- pool_->SetQueueLengthHistogram(queue_length);
- pool_->SetQueueTimeMicrosHistogram(queue_time);
- pool_->SetRunTimeMicrosHistogram(run_time);
+ ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+ .set_min_threads(1)
+ .set_max_threads(1)
+ .set_metrics(all_metrics[0])));
- int kNumItems = 500;
- for (int i = 0; i < kNumItems; i++) {
- ASSERT_OK(pool_->SubmitFunc(boost::bind(&usleep, i)));
- }
+ unique_ptr<ThreadPoolToken> t1 = pool_->NewTokenWithMetrics(
+ ThreadPool::ExecutionMode::SERIAL, all_metrics[1]);
+ unique_ptr<ThreadPoolToken> t2 = pool_->NewTokenWithMetrics(
+ ThreadPool::ExecutionMode::SERIAL, all_metrics[2]);
+ // Submit once to t1, twice to t2, and three times without a token.
+ ASSERT_OK(t1->SubmitFunc([](){}));
+ ASSERT_OK(t2->SubmitFunc([](){}));
+ ASSERT_OK(t2->SubmitFunc([](){}));
+ ASSERT_OK(pool_->SubmitFunc([](){}));
+ ASSERT_OK(pool_->SubmitFunc([](){}));
+ ASSERT_OK(pool_->SubmitFunc([](){}));
pool_->Wait();
- // Check that all histograms were incremented once per submitted item.
- ASSERT_EQ(kNumItems, queue_length->TotalCount());
- ASSERT_EQ(kNumItems, queue_time->TotalCount());
- ASSERT_EQ(kNumItems, run_time->TotalCount());
+ // The total counts should reflect the number of submissions to each token.
+ ASSERT_EQ(1, all_metrics[1].queue_length_histogram->TotalCount());
+ ASSERT_EQ(1, all_metrics[1].queue_time_us_histogram->TotalCount());
+ ASSERT_EQ(1, all_metrics[1].run_time_us_histogram->TotalCount());
+ ASSERT_EQ(2, all_metrics[2].queue_length_histogram->TotalCount());
+ ASSERT_EQ(2, all_metrics[2].queue_time_us_histogram->TotalCount());
+ ASSERT_EQ(2, all_metrics[2].run_time_us_histogram->TotalCount());
+
+ // And the counts on the pool-wide metrics should reflect all submissions.
+ ASSERT_EQ(6, all_metrics[0].queue_length_histogram->TotalCount());
+ ASSERT_EQ(6, all_metrics[0].queue_time_us_histogram->TotalCount());
+ ASSERT_EQ(6, all_metrics[0].run_time_us_histogram->TotalCount());
}
// Test that a thread pool will crash if asked to run its own blocking
http://git-wip-us.apache.org/repos/asf/kudu/blob/31b852f4/src/kudu/util/threadpool.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/threadpool.cc b/src/kudu/util/threadpool.cc
index eda052f..d133b4b 100644
--- a/src/kudu/util/threadpool.cc
+++ b/src/kudu/util/threadpool.cc
@@ -115,6 +115,11 @@ ThreadPoolBuilder& ThreadPoolBuilder::set_idle_timeout(const MonoDelta& idle_tim
return *this;
}
+ThreadPoolBuilder& ThreadPoolBuilder::set_metrics(ThreadPoolMetrics metrics) {
+ metrics_ = std::move(metrics);
+ return *this;
+}
+
Status ThreadPoolBuilder::Build(gscoped_ptr<ThreadPool>* pool) const {
pool->reset(new ThreadPool(*this));
RETURN_NOT_OK((*pool)->Init());
@@ -126,8 +131,10 @@ Status ThreadPoolBuilder::Build(gscoped_ptr<ThreadPool>* pool) const {
////////////////////////////////////////////////////////
ThreadPoolToken::ThreadPoolToken(ThreadPool* pool,
- ThreadPool::ExecutionMode mode)
+ ThreadPool::ExecutionMode mode,
+ ThreadPoolMetrics metrics)
: mode_(mode),
+ metrics_(std::move(metrics)),
pool_(pool),
state_(State::IDLE),
not_running_cond_(&pool->lock_),
@@ -310,8 +317,8 @@ ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
num_threads_(0),
active_threads_(0),
total_queued_tasks_(0),
- tokenless_(NewToken(ExecutionMode::CONCURRENT)) {
-
+ tokenless_(NewToken(ExecutionMode::CONCURRENT)),
+ metrics_(builder.metrics_) {
string prefix = !builder.trace_metric_prefix_.empty() ?
builder.trace_metric_prefix_ : builder.name_;
@@ -413,8 +420,15 @@ void ThreadPool::Shutdown() {
}
unique_ptr<ThreadPoolToken> ThreadPool::NewToken(ExecutionMode mode) {
+ return NewTokenWithMetrics(mode, {});
+}
+
+unique_ptr<ThreadPoolToken> ThreadPool::NewTokenWithMetrics(
+ ExecutionMode mode, ThreadPoolMetrics metrics) {
MutexLock guard(lock_);
- unique_ptr<ThreadPoolToken> t(new ThreadPoolToken(this, mode));
+ unique_ptr<ThreadPoolToken> t(new ThreadPoolToken(this,
+ mode,
+ std::move(metrics)));
InsertOrDie(&tokens_, t.get());
return t;
}
@@ -514,8 +528,11 @@ Status ThreadPool::DoSubmit(shared_ptr<Runnable> r, ThreadPoolToken* token) {
guard.Unlock();
not_empty_.Signal();
- if (queue_length_histogram_) {
- queue_length_histogram_->Increment(length_at_submit);
+ if (metrics_.queue_length_histogram) {
+ metrics_.queue_length_histogram->Increment(length_at_submit);
+ }
+ if (token->metrics_.queue_length_histogram) {
+ token->metrics_.queue_length_histogram->Increment(length_at_submit);
}
return Status::OK();
@@ -544,18 +561,6 @@ bool ThreadPool::WaitFor(const MonoDelta& delta) {
return true;
}
-void ThreadPool::SetQueueLengthHistogram(const scoped_refptr<Histogram>& hist) {
- queue_length_histogram_ = hist;
-}
-
-void ThreadPool::SetQueueTimeMicrosHistogram(const scoped_refptr<Histogram>& hist) {
- queue_time_us_histogram_ = hist;
-}
-
-void ThreadPool::SetRunTimeMicrosHistogram(const scoped_refptr<Histogram>& hist) {
- run_time_us_histogram_ = hist;
-}
-
void ThreadPool::DispatchThread(bool permanent) {
MutexLock unique_lock(lock_);
while (true) {
@@ -609,8 +614,11 @@ void ThreadPool::DispatchThread(bool permanent) {
MonoTime now(MonoTime::Now());
int64_t queue_time_us = (now - task.submit_time).ToMicroseconds();
TRACE_COUNTER_INCREMENT(queue_time_trace_metric_name_, queue_time_us);
- if (queue_time_us_histogram_) {
- queue_time_us_histogram_->Increment(queue_time_us);
+ if (metrics_.queue_time_us_histogram) {
+ metrics_.queue_time_us_histogram->Increment(queue_time_us);
+ }
+ if (token->metrics_.queue_time_us_histogram) {
+ token->metrics_.queue_time_us_histogram->Increment(queue_time_us);
}
// Execute the task
@@ -623,8 +631,11 @@ void ThreadPool::DispatchThread(bool permanent) {
int64_t wall_us = GetMonoTimeMicros() - start_wall_us;
int64_t cpu_us = GetThreadCpuTimeMicros() - start_cpu_us;
- if (run_time_us_histogram_) {
- run_time_us_histogram_->Increment(wall_us);
+ if (metrics_.run_time_us_histogram) {
+ metrics_.run_time_us_histogram->Increment(wall_us);
+ }
+ if (token->metrics_.run_time_us_histogram) {
+ token->metrics_.run_time_us_histogram->Increment(wall_us);
}
TRACE_COUNTER_INCREMENT(run_wall_time_trace_metric_name_, wall_us);
TRACE_COUNTER_INCREMENT(run_cpu_time_trace_metric_name_, cpu_us);
http://git-wip-us.apache.org/repos/asf/kudu/blob/31b852f4/src/kudu/util/threadpool.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/threadpool.h b/src/kudu/util/threadpool.h
index 5abe0b4..69fed97 100644
--- a/src/kudu/util/threadpool.h
+++ b/src/kudu/util/threadpool.h
@@ -32,13 +32,13 @@
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/util/condition_variable.h"
+#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/mutex.h"
#include "kudu/util/status.h"
namespace kudu {
-class Histogram;
class Thread;
class ThreadPool;
class ThreadPoolToken;
@@ -50,6 +50,19 @@ class Runnable {
virtual ~Runnable() {}
};
+// Interesting thread pool metrics. Can be applied to the entire pool (see
+// ThreadPoolBuilder) or to individual tokens.
+struct ThreadPoolMetrics {
+ // Measures the queue length seen by tasks when they enter the queue.
+ scoped_refptr<Histogram> queue_length_histogram;
+
+ // Measures the amount of time that tasks spend waiting in a queue.
+ scoped_refptr<Histogram> queue_time_us_histogram;
+
+ // Measures the amount of time that tasks spend running.
+ scoped_refptr<Histogram> run_time_us_histogram;
+};
+
// ThreadPool takes a lot of arguments. We provide sane defaults with a builder.
//
// name: Used for debugging output and default names of the worker threads.
@@ -92,6 +105,9 @@ class Runnable {
// We always keep at least min_threads.
// Default: 500 milliseconds.
//
+// metrics: Histograms, counters, etc. to update on various threadpool events.
+// Default: not set.
+//
class ThreadPoolBuilder {
public:
explicit ThreadPoolBuilder(std::string name);
@@ -103,12 +119,7 @@ class ThreadPoolBuilder {
ThreadPoolBuilder& set_max_threads(int max_threads);
ThreadPoolBuilder& set_max_queue_size(int max_queue_size);
ThreadPoolBuilder& set_idle_timeout(const MonoDelta& idle_timeout);
-
- const std::string& name() const { return name_; }
- int min_threads() const { return min_threads_; }
- int max_threads() const { return max_threads_; }
- int max_queue_size() const { return max_queue_size_; }
- const MonoDelta& idle_timeout() const { return idle_timeout_; }
+ ThreadPoolBuilder& set_metrics(ThreadPoolMetrics metrics);
// Instantiate a new ThreadPool with the existing builder arguments.
Status Build(gscoped_ptr<ThreadPool>* pool) const;
@@ -121,6 +132,7 @@ class ThreadPoolBuilder {
int max_threads_;
int max_queue_size_;
MonoDelta idle_timeout_;
+ ThreadPoolMetrics metrics_;
DISALLOW_COPY_AND_ASSIGN(ThreadPoolBuilder);
};
@@ -205,16 +217,11 @@ class ThreadPool {
};
std::unique_ptr<ThreadPoolToken> NewToken(ExecutionMode mode);
- // Attaches a histogram which measures the queue length seen by tasks when they enter
- // the thread pool's queue.
- void SetQueueLengthHistogram(const scoped_refptr<Histogram>& hist);
-
- // Attaches a histogram which measures the amount of time that tasks spend waiting in
- // the queue.
- void SetQueueTimeMicrosHistogram(const scoped_refptr<Histogram>& hist);
-
- // Attaches a histogram which measures the amount of time that tasks spend running.
- void SetRunTimeMicrosHistogram(const scoped_refptr<Histogram>& hist);
+ // Like NewToken(), but lets the caller provide metrics for the token. These
+ // metrics are incremented/decremented in addition to the configured
+ // pool-wide metrics (if any).
+ std::unique_ptr<ThreadPoolToken> NewTokenWithMetrics(ExecutionMode mode,
+ ThreadPoolMetrics metrics);
private:
FRIEND_TEST(ThreadPoolTest, TestThreadPoolWithNoMaxThreads);
@@ -317,9 +324,8 @@ class ThreadPool {
// ExecutionMode::CONCURRENT token used by the pool for tokenless submission.
std::unique_ptr<ThreadPoolToken> tokenless_;
- scoped_refptr<Histogram> queue_length_histogram_;
- scoped_refptr<Histogram> queue_time_us_histogram_;
- scoped_refptr<Histogram> run_time_us_histogram_;
+ // Metrics for the entire thread pool.
+ const ThreadPoolMetrics metrics_;
const char* queue_time_trace_metric_name_;
const char* run_wall_time_trace_metric_name_;
@@ -408,7 +414,9 @@ class ThreadPoolToken {
// Constructs a new token.
//
// The token may not outlive its thread pool ('pool').
- ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode mode);
+ ThreadPoolToken(ThreadPool* pool,
+ ThreadPool::ExecutionMode mode,
+ ThreadPoolMetrics metrics);
// Changes this token's state to 'new_state' taking actions as needed.
void Transition(State new_state);
@@ -432,6 +440,9 @@ class ThreadPoolToken {
// Token's configured execution mode.
const ThreadPool::ExecutionMode mode_;
+ // Metrics for just this token.
+ const ThreadPoolMetrics metrics_;
+
// Pointer to the token's thread pool.
ThreadPool* pool_;