Posted to commits@kudu.apache.org by to...@apache.org on 2017/06/28 00:05:38 UTC

[2/3] kudu git commit: threadpool: per-token metrics

threadpool: per-token metrics

This patch makes thread pool metrics configurable on a token-by-token
basis. The idea is simple: if a pool is shared by multiple entities in a
server (e.g. one token per tablet), pool operation metrics should reflect
those entities rather than being server-wide.

Change-Id: I29e3f78d0c033ff24f675cd84e7f7d65bf19954b
Reviewed-on: http://gerrit.cloudera.org:8080/7149
Tested-by: Adar Dembo <ad...@cloudera.com>
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/31b852f4
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/31b852f4
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/31b852f4

Branch: refs/heads/master
Commit: 31b852f4ae3979401e6cee0f021e4ff20c467d2c
Parents: 3fb84a7
Author: Adar Dembo <ad...@cloudera.com>
Authored: Mon Jun 12 00:38:17 2017 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue Jun 27 23:58:59 2017 +0000

----------------------------------------------------------------------
 src/kudu/kserver/kserver.cc       | 12 ++++----
 src/kudu/tablet/tablet_replica.cc | 16 +++++-----
 src/kudu/util/threadpool-test.cc  | 54 ++++++++++++++++++++++-----------
 src/kudu/util/threadpool.cc       | 55 ++++++++++++++++++++--------------
 src/kudu/util/threadpool.h        | 53 +++++++++++++++++++-------------
 5 files changed, 117 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/31b852f4/src/kudu/kserver/kserver.cc
----------------------------------------------------------------------
diff --git a/src/kudu/kserver/kserver.cc b/src/kudu/kserver/kserver.cc
index df9da48..47ffd0e 100644
--- a/src/kudu/kserver/kserver.cc
+++ b/src/kudu/kserver/kserver.cc
@@ -65,14 +65,14 @@ KuduServer::KuduServer(string name,
 Status KuduServer::Init() {
   RETURN_NOT_OK(ServerBase::Init());
 
+  ThreadPoolMetrics metrics = {
+      METRIC_op_apply_queue_length.Instantiate(metric_entity_),
+      METRIC_op_apply_queue_time.Instantiate(metric_entity_),
+      METRIC_op_apply_run_time.Instantiate(metric_entity_)
+  };
   RETURN_NOT_OK(ThreadPoolBuilder("apply")
+                .set_metrics(std::move(metrics))
                 .Build(&tablet_apply_pool_));
-  tablet_apply_pool_->SetQueueLengthHistogram(
-      METRIC_op_apply_queue_length.Instantiate(metric_entity_));
-  tablet_apply_pool_->SetQueueTimeMicrosHistogram(
-      METRIC_op_apply_queue_time.Instantiate(metric_entity_));
-  tablet_apply_pool_->SetRunTimeMicrosHistogram(
-      METRIC_op_apply_run_time.Instantiate(metric_entity_));
 
   // This pool is shared by all replicas hosted by this server.
   //

http://git-wip-us.apache.org/repos/asf/kudu/blob/31b852f4/src/kudu/tablet/tablet_replica.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc
index b6295ee..9698098 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -134,13 +134,15 @@ Status TabletReplica::Init(const shared_ptr<Tablet>& tablet,
   DCHECK(tablet) << "A TabletReplica must be provided with a Tablet";
   DCHECK(log) << "A TabletReplica must be provided with a Log";
 
-  RETURN_NOT_OK(ThreadPoolBuilder("prepare").set_max_threads(1).Build(&prepare_pool_));
-  prepare_pool_->SetQueueLengthHistogram(
-      METRIC_op_prepare_queue_length.Instantiate(metric_entity));
-  prepare_pool_->SetQueueTimeMicrosHistogram(
-      METRIC_op_prepare_queue_time.Instantiate(metric_entity));
-  prepare_pool_->SetRunTimeMicrosHistogram(
-      METRIC_op_prepare_run_time.Instantiate(metric_entity));
+  ThreadPoolMetrics metrics = {
+      METRIC_op_prepare_queue_length.Instantiate(metric_entity),
+      METRIC_op_prepare_queue_time.Instantiate(metric_entity),
+      METRIC_op_prepare_run_time.Instantiate(metric_entity)
+  };
+  RETURN_NOT_OK(ThreadPoolBuilder("prepare")
+                .set_max_threads(1)
+                .set_metrics(std::move(metrics))
+                .Build(&prepare_pool_));
 
   {
     std::lock_guard<simple_spinlock> lock(lock_);

http://git-wip-us.apache.org/repos/asf/kudu/blob/31b852f4/src/kudu/util/threadpool-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/threadpool-test.cc b/src/kudu/util/threadpool-test.cc
index a66dfde..cec1b93 100644
--- a/src/kudu/util/threadpool-test.cc
+++ b/src/kudu/util/threadpool-test.cc
@@ -351,29 +351,49 @@ METRIC_DEFINE_histogram(test_entity, run_time, "run time",
 
 TEST_F(ThreadPoolTest, TestMetrics) {
   MetricRegistry registry;
-  scoped_refptr<MetricEntity> entity = METRIC_ENTITY_test_entity.Instantiate(
-      &registry, "test entity");
-  ASSERT_OK(RebuildPoolWithMinMax(1, 1));
+  vector<ThreadPoolMetrics> all_metrics;
+  for (int i = 0; i < 3; i++) {
+    scoped_refptr<MetricEntity> entity = METRIC_ENTITY_test_entity.Instantiate(
+        &registry, Substitute("test $0", i));
+    all_metrics.emplace_back(ThreadPoolMetrics{
+      METRIC_queue_length.Instantiate(entity),
+      METRIC_queue_time.Instantiate(entity),
+      METRIC_run_time.Instantiate(entity)
+    });
+  }
 
   // Enable metrics for the thread pool.
-  scoped_refptr<Histogram> queue_length = METRIC_queue_length.Instantiate(entity);
-  scoped_refptr<Histogram> queue_time = METRIC_queue_time.Instantiate(entity);
-  scoped_refptr<Histogram> run_time = METRIC_run_time.Instantiate(entity);
-  pool_->SetQueueLengthHistogram(queue_length);
-  pool_->SetQueueTimeMicrosHistogram(queue_time);
-  pool_->SetRunTimeMicrosHistogram(run_time);
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+                                   .set_min_threads(1)
+                                   .set_max_threads(1)
+                                   .set_metrics(all_metrics[0])));
 
-  int kNumItems = 500;
-  for (int i = 0; i < kNumItems; i++) {
-    ASSERT_OK(pool_->SubmitFunc(boost::bind(&usleep, i)));
-  }
+  unique_ptr<ThreadPoolToken> t1 = pool_->NewTokenWithMetrics(
+      ThreadPool::ExecutionMode::SERIAL, all_metrics[1]);
+  unique_ptr<ThreadPoolToken> t2 = pool_->NewTokenWithMetrics(
+      ThreadPool::ExecutionMode::SERIAL, all_metrics[2]);
 
+  // Submit once to t1, twice to t2, and three times without a token.
+  ASSERT_OK(t1->SubmitFunc([](){}));
+  ASSERT_OK(t2->SubmitFunc([](){}));
+  ASSERT_OK(t2->SubmitFunc([](){}));
+  ASSERT_OK(pool_->SubmitFunc([](){}));
+  ASSERT_OK(pool_->SubmitFunc([](){}));
+  ASSERT_OK(pool_->SubmitFunc([](){}));
   pool_->Wait();
 
-  // Check that all histograms were incremented once per submitted item.
-  ASSERT_EQ(kNumItems, queue_length->TotalCount());
-  ASSERT_EQ(kNumItems, queue_time->TotalCount());
-  ASSERT_EQ(kNumItems, run_time->TotalCount());
+  // The total counts should reflect the number of submissions to each token.
+  ASSERT_EQ(1, all_metrics[1].queue_length_histogram->TotalCount());
+  ASSERT_EQ(1, all_metrics[1].queue_time_us_histogram->TotalCount());
+  ASSERT_EQ(1, all_metrics[1].run_time_us_histogram->TotalCount());
+  ASSERT_EQ(2, all_metrics[2].queue_length_histogram->TotalCount());
+  ASSERT_EQ(2, all_metrics[2].queue_time_us_histogram->TotalCount());
+  ASSERT_EQ(2, all_metrics[2].run_time_us_histogram->TotalCount());
+
+  // And the counts on the pool-wide metrics should reflect all submissions.
+  ASSERT_EQ(6, all_metrics[0].queue_length_histogram->TotalCount());
+  ASSERT_EQ(6, all_metrics[0].queue_time_us_histogram->TotalCount());
+  ASSERT_EQ(6, all_metrics[0].run_time_us_histogram->TotalCount());
 }
 
 // Test that a thread pool will crash if asked to run its own blocking

http://git-wip-us.apache.org/repos/asf/kudu/blob/31b852f4/src/kudu/util/threadpool.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/threadpool.cc b/src/kudu/util/threadpool.cc
index eda052f..d133b4b 100644
--- a/src/kudu/util/threadpool.cc
+++ b/src/kudu/util/threadpool.cc
@@ -115,6 +115,11 @@ ThreadPoolBuilder& ThreadPoolBuilder::set_idle_timeout(const MonoDelta& idle_tim
   return *this;
 }
 
+ThreadPoolBuilder& ThreadPoolBuilder::set_metrics(ThreadPoolMetrics metrics) {
+  metrics_ = std::move(metrics);
+  return *this;
+}
+
 Status ThreadPoolBuilder::Build(gscoped_ptr<ThreadPool>* pool) const {
   pool->reset(new ThreadPool(*this));
   RETURN_NOT_OK((*pool)->Init());
@@ -126,8 +131,10 @@ Status ThreadPoolBuilder::Build(gscoped_ptr<ThreadPool>* pool) const {
 ////////////////////////////////////////////////////////
 
 ThreadPoolToken::ThreadPoolToken(ThreadPool* pool,
-                                 ThreadPool::ExecutionMode mode)
+                                 ThreadPool::ExecutionMode mode,
+                                 ThreadPoolMetrics metrics)
     : mode_(mode),
+      metrics_(std::move(metrics)),
       pool_(pool),
       state_(State::IDLE),
       not_running_cond_(&pool->lock_),
@@ -310,8 +317,8 @@ ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
     num_threads_(0),
     active_threads_(0),
     total_queued_tasks_(0),
-    tokenless_(NewToken(ExecutionMode::CONCURRENT)) {
-
+    tokenless_(NewToken(ExecutionMode::CONCURRENT)),
+    metrics_(builder.metrics_) {
   string prefix = !builder.trace_metric_prefix_.empty() ?
       builder.trace_metric_prefix_ : builder.name_;
 
@@ -413,8 +420,15 @@ void ThreadPool::Shutdown() {
 }
 
 unique_ptr<ThreadPoolToken> ThreadPool::NewToken(ExecutionMode mode) {
+  return NewTokenWithMetrics(mode, {});
+}
+
+unique_ptr<ThreadPoolToken> ThreadPool::NewTokenWithMetrics(
+    ExecutionMode mode, ThreadPoolMetrics metrics) {
   MutexLock guard(lock_);
-  unique_ptr<ThreadPoolToken> t(new ThreadPoolToken(this, mode));
+  unique_ptr<ThreadPoolToken> t(new ThreadPoolToken(this,
+                                                    mode,
+                                                    std::move(metrics)));
   InsertOrDie(&tokens_, t.get());
   return t;
 }
@@ -514,8 +528,11 @@ Status ThreadPool::DoSubmit(shared_ptr<Runnable> r, ThreadPoolToken* token) {
   guard.Unlock();
   not_empty_.Signal();
 
-  if (queue_length_histogram_) {
-    queue_length_histogram_->Increment(length_at_submit);
+  if (metrics_.queue_length_histogram) {
+    metrics_.queue_length_histogram->Increment(length_at_submit);
+  }
+  if (token->metrics_.queue_length_histogram) {
+    token->metrics_.queue_length_histogram->Increment(length_at_submit);
   }
 
   return Status::OK();
@@ -544,18 +561,6 @@ bool ThreadPool::WaitFor(const MonoDelta& delta) {
   return true;
 }
 
-void ThreadPool::SetQueueLengthHistogram(const scoped_refptr<Histogram>& hist) {
-  queue_length_histogram_ = hist;
-}
-
-void ThreadPool::SetQueueTimeMicrosHistogram(const scoped_refptr<Histogram>& hist) {
-  queue_time_us_histogram_ = hist;
-}
-
-void ThreadPool::SetRunTimeMicrosHistogram(const scoped_refptr<Histogram>& hist) {
-  run_time_us_histogram_ = hist;
-}
-
 void ThreadPool::DispatchThread(bool permanent) {
   MutexLock unique_lock(lock_);
   while (true) {
@@ -609,8 +614,11 @@ void ThreadPool::DispatchThread(bool permanent) {
     MonoTime now(MonoTime::Now());
     int64_t queue_time_us = (now - task.submit_time).ToMicroseconds();
     TRACE_COUNTER_INCREMENT(queue_time_trace_metric_name_, queue_time_us);
-    if (queue_time_us_histogram_) {
-      queue_time_us_histogram_->Increment(queue_time_us);
+    if (metrics_.queue_time_us_histogram) {
+      metrics_.queue_time_us_histogram->Increment(queue_time_us);
+    }
+    if (token->metrics_.queue_time_us_histogram) {
+      token->metrics_.queue_time_us_histogram->Increment(queue_time_us);
     }
 
     // Execute the task
@@ -623,8 +631,11 @@ void ThreadPool::DispatchThread(bool permanent) {
       int64_t wall_us = GetMonoTimeMicros() - start_wall_us;
       int64_t cpu_us = GetThreadCpuTimeMicros() - start_cpu_us;
 
-      if (run_time_us_histogram_) {
-        run_time_us_histogram_->Increment(wall_us);
+      if (metrics_.run_time_us_histogram) {
+        metrics_.run_time_us_histogram->Increment(wall_us);
+      }
+      if (token->metrics_.run_time_us_histogram) {
+        token->metrics_.run_time_us_histogram->Increment(wall_us);
       }
       TRACE_COUNTER_INCREMENT(run_wall_time_trace_metric_name_, wall_us);
       TRACE_COUNTER_INCREMENT(run_cpu_time_trace_metric_name_, cpu_us);

http://git-wip-us.apache.org/repos/asf/kudu/blob/31b852f4/src/kudu/util/threadpool.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/threadpool.h b/src/kudu/util/threadpool.h
index 5abe0b4..69fed97 100644
--- a/src/kudu/util/threadpool.h
+++ b/src/kudu/util/threadpool.h
@@ -32,13 +32,13 @@
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/util/condition_variable.h"
+#include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/mutex.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
 
-class Histogram;
 class Thread;
 class ThreadPool;
 class ThreadPoolToken;
@@ -50,6 +50,19 @@ class Runnable {
   virtual ~Runnable() {}
 };
 
+// Interesting thread pool metrics. Can be applied to the entire pool (see
+// ThreadPoolBuilder) or to individual tokens.
+struct ThreadPoolMetrics {
+  // Measures the queue length seen by tasks when they enter the queue.
+  scoped_refptr<Histogram> queue_length_histogram;
+
+  // Measures the amount of time that tasks spend waiting in a queue.
+  scoped_refptr<Histogram> queue_time_us_histogram;
+
+  // Measures the amount of time that tasks spend running.
+  scoped_refptr<Histogram> run_time_us_histogram;
+};
+
 // ThreadPool takes a lot of arguments. We provide sane defaults with a builder.
 //
 // name: Used for debugging output and default names of the worker threads.
@@ -92,6 +105,9 @@ class Runnable {
 //    We always keep at least min_threads.
 //    Default: 500 milliseconds.
 //
+// metrics: Histograms, counters, etc. to update on various threadpool events.
+//    Default: not set.
+//
 class ThreadPoolBuilder {
  public:
   explicit ThreadPoolBuilder(std::string name);
@@ -103,12 +119,7 @@ class ThreadPoolBuilder {
   ThreadPoolBuilder& set_max_threads(int max_threads);
   ThreadPoolBuilder& set_max_queue_size(int max_queue_size);
   ThreadPoolBuilder& set_idle_timeout(const MonoDelta& idle_timeout);
-
-  const std::string& name() const { return name_; }
-  int min_threads() const { return min_threads_; }
-  int max_threads() const { return max_threads_; }
-  int max_queue_size() const { return max_queue_size_; }
-  const MonoDelta& idle_timeout() const { return idle_timeout_; }
+  ThreadPoolBuilder& set_metrics(ThreadPoolMetrics metrics);
 
   // Instantiate a new ThreadPool with the existing builder arguments.
   Status Build(gscoped_ptr<ThreadPool>* pool) const;
@@ -121,6 +132,7 @@ class ThreadPoolBuilder {
   int max_threads_;
   int max_queue_size_;
   MonoDelta idle_timeout_;
+  ThreadPoolMetrics metrics_;
 
   DISALLOW_COPY_AND_ASSIGN(ThreadPoolBuilder);
 };
@@ -205,16 +217,11 @@ class ThreadPool {
   };
   std::unique_ptr<ThreadPoolToken> NewToken(ExecutionMode mode);
 
-  // Attaches a histogram which measures the queue length seen by tasks when they enter
-  // the thread pool's queue.
-  void SetQueueLengthHistogram(const scoped_refptr<Histogram>& hist);
-
-  // Attaches a histogram which measures the amount of time that tasks spend waiting in
-  // the queue.
-  void SetQueueTimeMicrosHistogram(const scoped_refptr<Histogram>& hist);
-
-  // Attaches a histogram which measures the amount of time that tasks spend running.
-  void SetRunTimeMicrosHistogram(const scoped_refptr<Histogram>& hist);
+  // Like NewToken(), but lets the caller provide metrics for the token. These
+  // metrics are incremented/decremented in addition to the configured
+  // pool-wide metrics (if any).
+  std::unique_ptr<ThreadPoolToken> NewTokenWithMetrics(ExecutionMode mode,
+                                                       ThreadPoolMetrics metrics);
 
  private:
   FRIEND_TEST(ThreadPoolTest, TestThreadPoolWithNoMaxThreads);
@@ -317,9 +324,8 @@ class ThreadPool {
   // ExecutionMode::CONCURRENT token used by the pool for tokenless submission.
   std::unique_ptr<ThreadPoolToken> tokenless_;
 
-  scoped_refptr<Histogram> queue_length_histogram_;
-  scoped_refptr<Histogram> queue_time_us_histogram_;
-  scoped_refptr<Histogram> run_time_us_histogram_;
+  // Metrics for the entire thread pool.
+  const ThreadPoolMetrics metrics_;
 
   const char* queue_time_trace_metric_name_;
   const char* run_wall_time_trace_metric_name_;
@@ -408,7 +414,9 @@ class ThreadPoolToken {
   // Constructs a new token.
   //
   // The token may not outlive its thread pool ('pool').
-  ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode mode);
+  ThreadPoolToken(ThreadPool* pool,
+                  ThreadPool::ExecutionMode mode,
+                  ThreadPoolMetrics metrics);
 
   // Changes this token's state to 'new_state' taking actions as needed.
   void Transition(State new_state);
@@ -432,6 +440,9 @@ class ThreadPoolToken {
   // Token's configured execution mode.
   const ThreadPool::ExecutionMode mode_;
 
+  // Metrics for just this token.
+  const ThreadPoolMetrics metrics_;
+
   // Pointer to the token's thread pool.
   ThreadPool* pool_;