You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2020/08/28 06:17:57 UTC

[kudu] branch master updated (4eff1e7 -> fc8615c)

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from 4eff1e7  [net] KUDU-3184: Fix GetFQDN() when canonical name returns null
     new c6d438a  [tserver] add test to reproduce KUDU-1587 conditions
     new fc8615c  KUDU-1587 part 1: load meter for ThreadPool

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/kudu/tserver/tablet_server-test.cc | 133 ++++++++++++++++
 src/kudu/util/threadpool-test.cc       | 274 ++++++++++++++++++++++++++++++++-
 src/kudu/util/threadpool.cc            | 203 +++++++++++++++++++++---
 src/kudu/util/threadpool.h             | 154 ++++++++++++++++++
 4 files changed, 736 insertions(+), 28 deletions(-)


[kudu] 02/02: KUDU-1587 part 1: load meter for ThreadPool

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit fc8615c37eb4e28f3cc6bea0fcd5a8732451e883
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Mon Aug 10 18:16:53 2020 -0700

    KUDU-1587 part 1: load meter for ThreadPool
    
    This patch introduces a load meter for ThreadPool, aiming to use
    active queue management (AQM) techniques such as CoDel [1] in scenarios
    where thread pool queue load metrics are applicable (e.g., KUDU-1587).
    
    [1] https://en.wikipedia.org/wiki/CoDel
    
    Change-Id: I640716dc32f193e68361ca623ee7b9271e661d8b
    Reviewed-on: http://gerrit.cloudera.org:8080/16332
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/util/threadpool-test.cc | 274 +++++++++++++++++++++++++++++++++++++--
 src/kudu/util/threadpool.cc      | 203 ++++++++++++++++++++++++++---
 src/kudu/util/threadpool.h       | 154 ++++++++++++++++++++++
 3 files changed, 603 insertions(+), 28 deletions(-)

diff --git a/src/kudu/util/threadpool-test.cc b/src/kudu/util/threadpool-test.cc
index 93806f8..cecd01b 100644
--- a/src/kudu/util/threadpool-test.cc
+++ b/src/kudu/util/threadpool-test.cc
@@ -162,10 +162,12 @@ TEST_F(ThreadPoolTest, TestSubmitAfterShutdown) {
 }
 
 TEST_F(ThreadPoolTest, TestThreadPoolWithNoMinimum) {
-  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
-                                   .set_min_threads(0)
-                                   .set_max_threads(3)
-                                   .set_idle_timeout(MonoDelta::FromMilliseconds(1))));
+  constexpr int kIdleTimeoutMs = 1;
+  ASSERT_OK(RebuildPoolWithBuilder(
+      ThreadPoolBuilder(kDefaultPoolName)
+      .set_min_threads(0)
+      .set_max_threads(3)
+      .set_idle_timeout(MonoDelta::FromMilliseconds(kIdleTimeoutMs))));
 
   // There are no threads to start with.
   ASSERT_TRUE(pool_->num_threads() == 0);
@@ -186,6 +188,11 @@ TEST_F(ThreadPoolTest, TestThreadPoolWithNoMinimum) {
   latch.CountDown();
   pool_->Wait();
   ASSERT_EQ(0, pool_->active_threads_);
+  // Wait for more than idle timeout: so threads should be gone since
+  // min_threads is set to 0.
+  SleepFor(MonoDelta::FromMilliseconds(10 * kIdleTimeoutMs));
+  ASSERT_EQ(0, pool_->num_threads());
+  ASSERT_EQ(0, pool_->active_threads_);
   pool_->Shutdown();
   ASSERT_EQ(0, pool_->num_threads());
 }
@@ -252,10 +259,12 @@ TEST_F(ThreadPoolTest, TestRace) {
 }
 
 TEST_F(ThreadPoolTest, TestVariableSizeThreadPool) {
-  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
-                                   .set_min_threads(1)
-                                   .set_max_threads(4)
-                                   .set_idle_timeout(MonoDelta::FromMilliseconds(1))));
+  constexpr int kIdleTimeoutMs = 1;
+  ASSERT_OK(RebuildPoolWithBuilder(
+      ThreadPoolBuilder(kDefaultPoolName)
+      .set_min_threads(1)
+      .set_max_threads(4)
+      .set_idle_timeout(MonoDelta::FromMilliseconds(kIdleTimeoutMs))));
 
   // There is 1 thread to start with.
   ASSERT_EQ(1, pool_->num_threads());
@@ -276,6 +285,9 @@ TEST_F(ThreadPoolTest, TestVariableSizeThreadPool) {
   latch.CountDown();
   pool_->Wait();
   ASSERT_EQ(0, pool_->active_threads_);
+  SleepFor(MonoDelta::FromMilliseconds(10 * kIdleTimeoutMs));
+  ASSERT_EQ(0, pool_->active_threads_);
+  ASSERT_EQ(1, pool_->num_threads());
   pool_->Shutdown();
   ASSERT_EQ(0, pool_->num_threads());
 }
@@ -448,6 +460,252 @@ TEST_F(ThreadPoolTest, TestMetrics) {
   ASSERT_EQ(6, all_metrics[0].run_time_us_histogram->TotalCount());
 }
 
+// Test scenario to verify the functionality of the QueueLoadMeter.
+TEST_F(ThreadPoolTest, QueueLoadMeter) {
+  const auto kQueueTimeThresholdMs = 100;
+  const auto kIdleThreadTimeoutMs = 200;
+  constexpr auto kMaxThreads = 3;
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+      .set_min_threads(0)
+      .set_max_threads(kMaxThreads)
+      .set_queue_overload_threshold(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs))
+      .set_idle_timeout(MonoDelta::FromMilliseconds(kIdleThreadTimeoutMs))));
+  // An idle pool must not have its queue overloaded.
+  ASSERT_FALSE(pool_->QueueOverloaded());
+
+  // One instant task cannot make the pool's queue overloaded.
+  ASSERT_OK(pool_->Submit([](){}));
+  ASSERT_FALSE(pool_->QueueOverloaded());
+  pool_->Wait();
+  ASSERT_FALSE(pool_->QueueOverloaded());
+
+  for (auto i = 0; i < kMaxThreads; ++i) {
+    ASSERT_OK(pool_->Submit([](){
+      SleepFor(MonoDelta::FromMilliseconds(2 * kQueueTimeThresholdMs));
+    }));
+  }
+  ASSERT_FALSE(pool_->QueueOverloaded());
+  pool_->Wait();
+  ASSERT_FALSE(pool_->QueueOverloaded());
+
+  for (auto i = 0; i < 2 * kMaxThreads; ++i) {
+    ASSERT_OK(pool_->Submit([](){
+      SleepFor(MonoDelta::FromMilliseconds(2 * kQueueTimeThresholdMs));
+    }));
+  }
+  ASSERT_FALSE(pool_->QueueOverloaded());
+  SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs + 10));
+  ASSERT_TRUE(pool_->QueueOverloaded());
+  // Should still be overloaded after first kMaxThreads tasks are processed.
+  SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs + 10));
+  ASSERT_TRUE(pool_->QueueOverloaded());
+  pool_->Wait();
+  ASSERT_FALSE(pool_->QueueOverloaded());
+
+  // Many instant tasks cannot make pool overloaded.
+  for (auto i = 0; i < kMaxThreads; ++i) {
+    ASSERT_OK(pool_->Submit([](){}));
+  }
+  ASSERT_FALSE(pool_->QueueOverloaded());
+  pool_->Wait();
+  // Wait for the threads to be shut down due to inactivity.
+  SleepFor(MonoDelta::FromMilliseconds(2 * kIdleThreadTimeoutMs));
+  // Even if all threads are shut down, an idle pool with empty queue should
+  // not be overloaded.
+  ASSERT_FALSE(pool_->QueueOverloaded());
+
+  // Shovel some light tasks once again: this should not overload the queue.
+  for (auto i = 0; i < 10 * kMaxThreads; ++i) {
+    ASSERT_OK(pool_->Submit([](){
+      SleepFor(MonoDelta::FromMilliseconds(1));
+    }));
+  }
+  ASSERT_FALSE(pool_->QueueOverloaded());
+  pool_->Wait();
+  ASSERT_FALSE(pool_->QueueOverloaded());
+
+  // Submit a bunch of instant tasks via a single token: the queue should not
+  // become overloaded.
+  {
+    unique_ptr<ThreadPoolToken> t = pool_->NewToken(
+        ThreadPool::ExecutionMode::SERIAL);
+    ASSERT_OK(t->Submit([](){}));
+    ASSERT_FALSE(pool_->QueueOverloaded());
+    pool_->Wait();
+    ASSERT_FALSE(pool_->QueueOverloaded());
+
+    for (auto i = 0; i < 100; ++i) {
+      ASSERT_OK(t->Submit([](){}));
+    }
+    ASSERT_FALSE(pool_->QueueOverloaded());
+    SleepFor(MonoDelta::FromMilliseconds(1));
+    ASSERT_FALSE(pool_->QueueOverloaded());
+    pool_->Wait();
+    ASSERT_FALSE(pool_->QueueOverloaded());
+  }
+
+  // Submit many instant tasks via multiple tokens (more than the maximum
+  // number of worker threads in a pool) and many lightweight tasks which can
+  // run concurrently: the queue should not become overloaded.
+  {
+    constexpr auto kNumTokens = 2 * kMaxThreads;
+    vector<unique_ptr<ThreadPoolToken>> tokens;
+    tokens.reserve(kNumTokens);
+    for (auto i = 0; i < kNumTokens; ++i) {
+      tokens.emplace_back(pool_->NewToken(ThreadPool::ExecutionMode::SERIAL));
+    }
+
+    for (auto& t : tokens) {
+      for (auto i = 0; i < 50; ++i) {
+        ASSERT_OK(t->Submit([](){}));
+      }
+      for (auto i = 0; i < 10; ++i) {
+        ASSERT_OK(pool_->Submit([](){}));
+      }
+    }
+    ASSERT_FALSE(pool_->QueueOverloaded());
+    SleepFor(MonoDelta::FromMilliseconds(1));
+    ASSERT_FALSE(pool_->QueueOverloaded());
+    pool_->Wait();
+    ASSERT_FALSE(pool_->QueueOverloaded());
+  }
+
+  // Submit many long running tasks via serial tokens where the number of tokens
+  // is less than the maximum number of worker threads in the pool. The queue
+  // of the pool should not become overloaded since the pool has one spare
+  // thread to spawn.
+  {
+    constexpr auto kNumTokens = kMaxThreads - 1;
+    ASSERT_GT(kNumTokens, 0);
+    vector<unique_ptr<ThreadPoolToken>> tokens;
+    tokens.reserve(kNumTokens);
+    for (auto i = 0; i < kNumTokens; ++i) {
+      tokens.emplace_back(pool_->NewToken(ThreadPool::ExecutionMode::SERIAL));
+    }
+
+    ASSERT_FALSE(pool_->QueueOverloaded());
+    for (auto& t : tokens) {
+      for (auto i = 0; i < kMaxThreads; ++i) {
+        ASSERT_OK(t->Submit([](){
+          SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs));
+        }));
+      }
+    }
+    ASSERT_FALSE(pool_->QueueOverloaded());
+    SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs));
+    ASSERT_FALSE(pool_->QueueOverloaded());
+    pool_->Wait();
+    ASSERT_FALSE(pool_->QueueOverloaded());
+  }
+
+  // Submit many long running tasks via serial tokens where the number of tokens
+  // is greater than or equal to the maximum number of worker threads in the pool.
+  // The queue of the pool should become overloaded since the pool is running
+  // at its capacity and queue times are over the threshold.
+  {
+    constexpr auto kNumTokens = kMaxThreads;
+    vector<unique_ptr<ThreadPoolToken>> tokens;
+    tokens.reserve(kNumTokens);
+    for (auto i = 0; i < kNumTokens; ++i) {
+      tokens.emplace_back(pool_->NewToken(ThreadPool::ExecutionMode::SERIAL));
+    }
+
+    ASSERT_FALSE(pool_->QueueOverloaded());
+    for (auto& t : tokens) {
+      for (auto i = 0; i < kMaxThreads; ++i) {
+        ASSERT_OK(t->Submit([](){
+          SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs));
+        }));
+      }
+    }
+    // Since there are exactly kMaxThreads serial pool tokens with tasks,
+    // the queue is empty most of the time. This is because active serial tokens
+    // are not kept in the queue. So, the status of the queue cannot be reliably
+    // determined by peeking at the submission times of the elements in the
+    // queue. Then the only way to detect overload of the queue is the history
+    // of queue times. The latter will reflect long queue times only after
+    // processing two tasks in each of the serial tokens. So, it's expected
+    // to get a stable report on the queue status only after two
+    // kQueueTimeThresholdMs time intervals.
+    SleepFor(MonoDelta::FromMilliseconds(2 * kQueueTimeThresholdMs));
+    ASSERT_TRUE(pool_->QueueOverloaded());
+    pool_->Wait();
+    ASSERT_FALSE(pool_->QueueOverloaded());
+  }
+
+  // A mixed case: submit many long running tasks via serial tokens where the
+  // number of tokens is less than the maximum number of worker threads in the
+  // pool and submit many instant tasks that can run concurrently.
+  {
+    constexpr auto kNumTokens = kMaxThreads - 1;
+    ASSERT_GT(kNumTokens, 0);
+    vector<unique_ptr<ThreadPoolToken>> tokens;
+    tokens.reserve(kNumTokens);
+    for (auto i = 0; i < kNumTokens; ++i) {
+      tokens.emplace_back(pool_->NewToken(ThreadPool::ExecutionMode::SERIAL));
+    }
+
+    ASSERT_FALSE(pool_->QueueOverloaded());
+    for (auto& t : tokens) {
+      for (auto i = 0; i < kMaxThreads; ++i) {
+        ASSERT_OK(t->Submit([](){
+          SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs));
+        }));
+      }
+    }
+    ASSERT_FALSE(pool_->QueueOverloaded());
+
+    // Add several light tasks in addition to the scheduled serial ones. This
+    // should not overload the queue.
+    for (auto i = 0; i < 10; ++i) {
+      ASSERT_OK(pool_->Submit([](){
+        SleepFor(MonoDelta::FromMilliseconds(1));
+      }));
+    }
+    ASSERT_FALSE(pool_->QueueOverloaded());
+    SleepFor(MonoDelta::FromMilliseconds(1));
+    ASSERT_FALSE(pool_->QueueOverloaded());
+    SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs));
+    ASSERT_FALSE(pool_->QueueOverloaded());
+    pool_->Wait();
+    ASSERT_FALSE(pool_->QueueOverloaded());
+  }
+
+  // Another mixed case: submit many long running tasks via a serial token
+  // and many long running tasks that can run concurrently. The queue should
+  // become overloaded.
+  {
+    constexpr auto kNumTokens = 1;
+    vector<unique_ptr<ThreadPoolToken>> tokens;
+    tokens.reserve(kNumTokens);
+    for (auto i = 0; i < kNumTokens; ++i) {
+      tokens.emplace_back(pool_->NewToken(ThreadPool::ExecutionMode::SERIAL));
+    }
+
+    ASSERT_FALSE(pool_->QueueOverloaded());
+    for (auto& t : tokens) {
+      for (auto i = 0; i < kMaxThreads; ++i) {
+        ASSERT_OK(t->Submit([](){
+          SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs));
+        }));
+      }
+    }
+    ASSERT_FALSE(pool_->QueueOverloaded());
+
+    // Add several long running tasks in addition to the scheduled serial
+    // ones. This should overload the queue.
+    for (auto i = 0; i < 2 * kMaxThreads; ++i) {
+      ASSERT_OK(pool_->Submit([](){
+        SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs));
+      }));
+    }
+    SleepFor(MonoDelta::FromMilliseconds(kQueueTimeThresholdMs));
+    ASSERT_TRUE(pool_->QueueOverloaded());
+    pool_->Wait();
+    ASSERT_FALSE(pool_->QueueOverloaded());
+  }
+}
+
 // Test that a thread pool will crash if asked to run its own blocking
 // functions in a pool thread.
 //
diff --git a/src/kudu/util/threadpool.cc b/src/kudu/util/threadpool.cc
index 9d8cc4a..6e2097d 100644
--- a/src/kudu/util/threadpool.cc
+++ b/src/kudu/util/threadpool.cc
@@ -17,6 +17,7 @@
 
 #include "kudu/util/threadpool.h"
 
+#include <algorithm>
 #include <cstdint>
 #include <deque>
 #include <functional>
@@ -24,7 +25,6 @@
 #include <memory>
 #include <ostream>
 #include <string>
-#include <utility>
 
 #include <glog/logging.h>
 
@@ -90,6 +90,12 @@ ThreadPoolBuilder& ThreadPoolBuilder::set_metrics(ThreadPoolMetrics metrics) {
   return *this;
 }
 
+ThreadPoolBuilder& ThreadPoolBuilder::set_queue_overload_threshold(
+    const MonoDelta& threshold) {
+  queue_overload_threshold_ = threshold;  // unset/non-positive: no load meter
+  return *this;
+}
+
 Status ThreadPoolBuilder::Build(unique_ptr<ThreadPool>* pool) const {
   pool->reset(new ThreadPool(*this));
   return (*pool)->Init();
@@ -266,20 +272,20 @@ const char* ThreadPoolToken::StateToString(State s) {
 ////////////////////////////////////////////////////////
 
 ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
-  : name_(builder.name_),
-    min_threads_(builder.min_threads_),
-    max_threads_(builder.max_threads_),
-    max_queue_size_(builder.max_queue_size_),
-    idle_timeout_(builder.idle_timeout_),
-    pool_status_(Status::Uninitialized("The pool was not initialized.")),
-    idle_cond_(&lock_),
-    no_threads_cond_(&lock_),
-    num_threads_(0),
-    num_threads_pending_start_(0),
-    active_threads_(0),
-    total_queued_tasks_(0),
-    tokenless_(NewToken(ExecutionMode::CONCURRENT)),
-    metrics_(builder.metrics_) {
+    : name_(builder.name_),
+      min_threads_(builder.min_threads_),
+      max_threads_(builder.max_threads_),
+      max_queue_size_(builder.max_queue_size_),
+      idle_timeout_(builder.idle_timeout_),
+      pool_status_(Status::Uninitialized("The pool was not initialized.")),
+      idle_cond_(&lock_),
+      no_threads_cond_(&lock_),
+      num_threads_(0),
+      num_threads_pending_start_(0),
+      active_threads_(0),
+      total_queued_tasks_(0),
+      tokenless_(NewToken(ExecutionMode::CONCURRENT)),
+      metrics_(builder.metrics_) {
   string prefix = !builder.trace_metric_prefix_.empty() ?
       builder.trace_metric_prefix_ : builder.name_;
 
@@ -289,6 +295,11 @@ ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
       prefix + ".run_wall_time_us");
   run_cpu_time_trace_metric_name_ = TraceMetrics::InternName(
       prefix + ".run_cpu_time_us");
+
+  const auto& ovt = builder.queue_overload_threshold_;
+  if (ovt.Initialized() && ovt.ToNanoseconds() > 0) {
+    load_meter_.reset(new QueueLoadMeter(*this, ovt, max_threads_));
+  }
 }
 
 ThreadPool::~ThreadPool() {
@@ -397,6 +408,17 @@ unique_ptr<ThreadPoolToken> ThreadPool::NewTokenWithMetrics(
   return t;
 }
 
+bool ThreadPool::QueueOverloaded(MonoDelta* overloaded_time,
+                                 MonoDelta* threshold) const {
+  if (!load_meter_) {  // meter disabled: threshold unset or non-positive
+    return false;
+  }
+  if (threshold) {  // reported whenever the meter is enabled
+    *threshold = load_meter_->queue_time_threshold();
+  }
+  return load_meter_->overloaded(overloaded_time);
+}
+
 void ThreadPool::ReleaseToken(ThreadPoolToken* t) {
   MutexLock guard(lock_);
   CHECK(!t->IsActive()) << Substitute("Token with state $0 may not be released",
@@ -410,7 +432,7 @@ Status ThreadPool::Submit(std::function<void()> f) {
 
 Status ThreadPool::DoSubmit(std::function<void()> f, ThreadPoolToken* token) {
   DCHECK(token);
-  MonoTime submit_time = MonoTime::Now();
+  const MonoTime submit_time = MonoTime::Now();
 
   MutexLock guard(lock_);
   if (PREDICT_FALSE(!pool_status_.ok())) {
@@ -493,6 +515,7 @@ Status ThreadPool::DoSubmit(std::function<void()> f, ThreadPoolToken* token) {
     idle_threads_.front().not_empty.Signal();
     idle_threads_.pop_front();
   }
+  NotifyLoadMeterUnlocked();
   guard.Unlock();
 
   if (metrics_.queue_length_histogram) {
@@ -518,7 +541,6 @@ Status ThreadPool::DoSubmit(std::function<void()> f, ThreadPoolToken* token) {
     }
   }
 
-
   return Status::OK();
 }
 
@@ -570,6 +592,7 @@ void ThreadPool::DispatchThread() {
       //
       // Note: if FIFO behavior is desired, it's as simple as changing this to push_back().
       idle_threads_.push_front(me);
+      NotifyLoadMeterUnlocked();
       SCOPED_CLEANUP({
         // For some wake ups (i.e. Shutdown or DoSubmit) this thread is
         // guaranteed to be unlinked after being awakened. In others (i.e.
@@ -609,6 +632,10 @@ void ThreadPool::DispatchThread() {
     --total_queued_tasks_;
     ++active_threads_;
 
+    const MonoTime now(MonoTime::Now());
+    const MonoDelta queue_time = now - task.submit_time;
+    NotifyLoadMeterUnlocked(queue_time);
+
     unique_lock.Unlock();
 
     // Release the reference which was held by the queued item.
@@ -617,9 +644,8 @@ void ThreadPool::DispatchThread() {
       task.trace->Release();
     }
 
-    // Update metrics
-    MonoTime now(MonoTime::Now());
-    int64_t queue_time_us = (now - task.submit_time).ToMicroseconds();
+    // Update metrics.
+    const int64_t queue_time_us = queue_time.ToMicroseconds();
     TRACE_COUNTER_INCREMENT(queue_time_trace_metric_name_, queue_time_us);
     if (metrics_.queue_time_us_histogram) {
       metrics_.queue_time_us_histogram->Increment(queue_time_us);
@@ -673,6 +699,15 @@ void ThreadPool::DispatchThread() {
         queue_.emplace_back(token);
       }
     }
+    if (!queue_.empty()) {
+      // If the queue is empty, the LoadMeter is notified on next iteration of
+      // the outer while() loop under the 'if (queue_.empty())' clause. Here
+      // it's crucial to call NotifyLoadMeter() _before_ decrementing
+      // 'active_threads_' to avoid reporting this thread as a spare one despite
+      // the fact that it will run a task from the non-empty queue immediately
+      // on next iteration of the outer while() loop.
+      NotifyLoadMeterUnlocked();
+    }
     if (--active_threads_ == 0) {
       idle_cond_.Broadcast();
     }
@@ -709,6 +744,134 @@ void ThreadPool::CheckNotPoolThreadUnlocked() {
   }
 }
 
+void ThreadPool::NotifyLoadMeterUnlocked(const MonoDelta& queue_time) {
+  if (!load_meter_) {
+    return;
+  }
+
+  lock_.AssertAcquired();  // the caller must hold the pool's lock
+  MonoTime queue_head_submit_time;  // stays uninitialized if the queue is empty
+  if (!queue_.empty()) {
+    DCHECK(!queue_.front()->entries_.empty());
+    queue_head_submit_time = queue_.front()->entries_.front().submit_time;
+  }
+  load_meter_->UpdateQueueInfoUnlocked(queue_time,
+                                       queue_head_submit_time,
+                                       active_threads_ < max_threads_);  // spare thread?
+}
+
+////////////////////////////////////////////////////////
+// ThreadPool::QueueLoadMeter
+////////////////////////////////////////////////////////
+
+ThreadPool::QueueLoadMeter::QueueLoadMeter(
+    const ThreadPool& tpool,
+    const MonoDelta& queue_time_threshold,
+    size_t queue_time_history_length)
+    : tpool_(tpool),
+      queue_time_threshold_(queue_time_threshold),
+      queue_time_history_length_(queue_time_history_length),
+      queue_head_submit_time_(MonoTime()),  // uninitialized: queue is empty
+      overloaded_since_(MonoTime()),        // uninitialized: not overloaded
+      has_spare_thread_(true) {
+}
+
+bool ThreadPool::QueueLoadMeter::overloaded(MonoDelta* time_overloaded) {
+  // First, check whether the queue was overloaded upon the latest update
+  // on the queue from the ThreadPool's activity. If that's the case, there is
+  // no need to dig further.
+  MonoTime overloaded_since = overloaded_since_.load();
+  if (overloaded_since.Initialized()) {
+    if (time_overloaded) {
+      *time_overloaded = MonoTime::Now() - overloaded_since;
+    }
+    return true;
+  }
+
+  // Even if the queue was not overloaded upon the latest update on the
+  // ThreadPool's activity, the queue might have stalled because all its worker
+  // threads have been busy for long time. If so, 'overloaded_since_' hasn't
+  // been updated by the activity on the thread pool itself. However, it's
+  // possible to detect whether the queue has stalled by checking if the task
+  // at the head of the queue hasn't been dispatched for longer than
+  // queue_time_threshold_ time interval.
+  MonoTime queue_head_submit_time = queue_head_submit_time_.load();
+  if (!queue_head_submit_time.Initialized() || has_spare_thread_.load()) {
+    return false;
+  }
+
+  const auto now = MonoTime::Now();
+  const MonoDelta queue_time = now - queue_head_submit_time;
+  if (queue_time > queue_time_threshold_) {
+    MonoTime overloaded_since;  // CAS 'expected': not overloaded (shadows above)
+    if (overloaded_since_.compare_exchange_strong(overloaded_since, now)) {
+      VLOG(3) << Substitute("state transition: normal --> overloaded");
+      if (time_overloaded) {
+        *time_overloaded = queue_time - queue_time_threshold_;
+      }
+      return true;
+    }
+    DCHECK(overloaded_since.Initialized());  // failed CAS loaded the current value
+    if (time_overloaded) {
+      *time_overloaded = now - overloaded_since;
+    }
+    return true;
+  }
+
+  return false;
+}
+
+void ThreadPool::QueueLoadMeter::UpdateQueueInfoUnlocked(
+    const MonoDelta& task_queue_time,
+    const MonoTime& queue_head_submit_time,
+    bool has_spare_thread) {
+  tpool_.lock_.AssertAcquired();  // the caller must hold the pool's lock
+  if (task_queue_time.Initialized()) {
+    // TODO(aserbin): any better way of tracking the running minimum of N numbers?
+    queue_times_.emplace_back(task_queue_time);  // arrival order, for eviction
+    queue_times_ordered_.insert(task_queue_time);  // sorted, for the minimum
+    if (queue_times_.size() > queue_time_history_length_) {
+      const auto& elem = queue_times_.front();
+      auto it = queue_times_ordered_.find(elem);
+      DCHECK(it != queue_times_ordered_.end());
+      queue_times_ordered_.erase(it);
+      queue_times_.pop_front();
+    }
+    min_queue_time_ = *queue_times_ordered_.begin();  // sorted: begin() is the min
+  }
+  queue_head_submit_time_.store(queue_head_submit_time);
+  has_spare_thread_.store(has_spare_thread);
+
+  // Update the load state of the queue, storing appropriate information
+  // in the 'overloaded_since_' field.
+  const auto now = MonoTime::Now();
+  const bool queue_empty = !queue_head_submit_time.Initialized();
+  const auto queue_time = queue_empty
+      ? MonoDelta::FromSeconds(0) : now - queue_head_submit_time;
+  const auto min_queue_time = min_queue_time_.Initialized()
+      ? min_queue_time_ : MonoDelta::FromSeconds(0);
+  const bool was_overloaded = overloaded_since_.load().Initialized();
+  const bool overloaded = !has_spare_thread &&
+      std::max(min_queue_time, queue_time) > queue_time_threshold_;
+  // Track the state transitions and update overloaded_since_.
+  if (!was_overloaded && overloaded) {
+    VLOG(3) << Substitute("state transition: normal --> overloaded");
+    overloaded_since_.store(now);
+  } else if (was_overloaded && !overloaded) {
+    VLOG(3) << Substitute("state transition: overloaded --> normal");
+    overloaded_since_.store(MonoTime());
+  }
+  VLOG(4) << Substitute("state refreshed: overloaded since $0, queue $1, "
+                        "has $2 thread, queue head submit time $3, "
+                        "queue time $4, min queue time $5",
+                        overloaded_since_.load().ToString(),
+                        queue_empty ? "empty" : "not empty",
+                        has_spare_thread ? "spare" : "no spare",
+                        queue_head_submit_time.ToString(),
+                        queue_time.ToString(),
+                        min_queue_time.ToString());
+}
+
 std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s) {
   return o << ThreadPoolToken::StateToString(s);
 }
diff --git a/src/kudu/util/threadpool.h b/src/kudu/util/threadpool.h
index a1037e8..403acbd 100644
--- a/src/kudu/util/threadpool.h
+++ b/src/kudu/util/threadpool.h
@@ -16,10 +16,13 @@
 // under the License.
 #pragma once
 
+#include <atomic>
+#include <cstddef>
 #include <deque>
 #include <functional>
 #include <iosfwd>
 #include <memory>
+#include <set>
 #include <string>
 #include <unordered_set>
 
@@ -43,6 +46,7 @@ class ThreadPool;
 class ThreadPoolToken;
 class Trace;
 
+
 // Interesting thread pool metrics. Can be applied to the entire pool (see
 // ThreadPoolBuilder) or to individual tokens.
 struct ThreadPoolMetrics {
@@ -112,6 +116,7 @@ class ThreadPoolBuilder {
   ThreadPoolBuilder& set_max_threads(int max_threads);
   ThreadPoolBuilder& set_max_queue_size(int max_queue_size);
   ThreadPoolBuilder& set_idle_timeout(const MonoDelta& idle_timeout);
+  ThreadPoolBuilder& set_queue_overload_threshold(const MonoDelta& threshold);
   ThreadPoolBuilder& set_metrics(ThreadPoolMetrics metrics);
 
   // Instantiate a new ThreadPool with the existing builder arguments.
@@ -125,6 +130,7 @@ class ThreadPoolBuilder {
   int max_threads_;
   int max_queue_size_;
   MonoDelta idle_timeout_;
+  MonoDelta queue_overload_threshold_;
   ThreadPoolMetrics metrics_;
 
   DISALLOW_COPY_AND_ASSIGN(ThreadPoolBuilder);
@@ -215,6 +221,13 @@ class ThreadPool {
     return num_threads_ + num_threads_pending_start_;
   }
 
+  // Whether the ThreadPool's queue is overloaded. If the queue overload
+  // threshold isn't set (or isn't positive), returns 'false'. Otherwise,
+  // populates 'threshold' (if not null) and returns whether the queue is
+  // overloaded; if overloaded, 'overloaded_time' (if not null) is populated.
+  bool QueueOverloaded(MonoDelta* overloaded_time = nullptr,
+                       MonoDelta* threshold = nullptr) const;
+
  private:
   FRIEND_TEST(ThreadPoolTest, TestThreadPoolWithNoMinimum);
   FRIEND_TEST(ThreadPoolTest, TestVariableSizeThreadPool);
@@ -231,6 +244,127 @@ class ThreadPool {
     MonoTime submit_time;
   };
 
+  // This utility class is used to track how busy the ThreadPool is by
+  // monitoring its task queue timings. This class uses logic modeled after
+  // CoDel algorithm (see [1], [2], [3]), detecting whether the queue
+  // of a thread pool with the maximum of M worker threads is overloaded
+  // (it's assumed M > 0). The idea is to keep an eye on the queue times when
+  // the thread pool works at its full capacity. Even if there aren't any idle
+  // threads, we don't want to declare the queue overloaded when it's still able
+  // to dispatch many lightweight tasks pretty fast once they arrive. Instead,
+  // we start declaring the queue overloaded only when we see the evidence of
+  // the newly arrived tasks being stuck in the queue for a time interval longer
+  // than the configured threshold.
+  //
+  // Let's denote the minimum of the queue times of the last N tasks dispatched
+  // by min(QT_historic(N)). If the history of already dispatched tasks is
+  // empty, min(QT_historic(N)) is defined to be 0. The time interval that the
+  // very first element of the queue has been waiting to be dispatched is
+  // denoted by QT_head. The queue time threshold is denoted by T_overload.
+  //
+  // With that, the criterion to detect the 'overloaded' state of a ThreadPool's
+  // queue is defined as the following:
+  //
+  //   all available worker threads are busy
+  //     AND
+  //   max(QT_head, min(QT_historic(M))) > T_overload
+  //
+  // The maximum number of worker threads (M) in a thread pool naturally
+  // provides the proper length of the queue time history. To illustrate, let's
+  // examine one edge case. It's a case of continuous periodic workload of
+  // batches of M tasks, where all M tasks in a batch are scheduled at the same
+  // time and batches of M tasks are separated by T_overload time interval.
+  // Every batch contains (M - 1) heavyweight tasks and a single lightweight one.
+  // Let's assume it takes T_overload to complete a heavyweight task, and a
+  // lightweight one completes instantly. With such a workload running against a
+  // thread pool, it's able to handle many extra lightweight tasks with
+  // resulting queue times well under the T_overload threshold. Clearly, the
+  // queue should not be declared overloaded in such case. So, if the queue time
+  // history length were less than M (e.g. (M - 1)), then the pool would be
+  // considered overloaded if capturing a moment when all worker threads are
+  // busy handling a newly arrived batch of M tasks, assuming the thread pool
+  // has already handled at least two batches of tasks since its start (the
+  // history of queue times would be { 0, T_overload, ..., T_overload } repeated
+  // many times).
+  //
+  // On the other hand, if the size of the queue time history were greater
+  // than M, it would include not-so-relevant information for some patterns
+  // of scheduled tasks.
+  //
+  // References:
+  //   [1] https://queue.acm.org/detail.cfm?id=2209336
+  //   [2] https://en.wikipedia.org/wiki/CoDel
+  //   [3] https://man7.org/linux/man-pages/man8/CoDel.8.html
+  class QueueLoadMeter {
+   public:
+    // Create an instance of QueueLoadMeter class. The pool to attach to
+    // is specified by the 'tpool' parameter. The 'queue_time_threshold'
+    // parameter corresponds to 'T_overload' parameter from the algorithm
+    // description above, and 'queue_time_history_length' corresponds to 'N',
+    // respectively; ThreadPool passes its max_threads_ as 'N'.
+    explicit QueueLoadMeter(const ThreadPool& tpool,
+                            const MonoDelta& queue_time_threshold,
+                            size_t queue_time_history_length);
+
+    const MonoDelta& queue_time_threshold() const {
+      return queue_time_threshold_;
+    }
+
+    // Check whether the queue is overloaded. If the queue is not overloaded,
+    // this method returns 'false'. If the queue is overloaded, this method
+    // returns 'true'. In the latter case, if 'time_overloaded' is not null,
+    // it is populated with the information on how long the queue has been
+    // in the overloaded state. This method doesn't acquire the pool's lock.
+    bool overloaded(MonoDelta* time_overloaded = nullptr);
+
+    // Notify the meter about updates on the task queue. If a task is being
+    // dequeued, the queue time of the task dequeued is specified via the
+    // 'task_queue_time' parameter, otherwise it's not initialized.
+    // An uninitialized 'queue_head_submit_time' means the queue is empty.
+    // 'has_spare_thread' conveys whether a "spare" worker thread is available.
+    // The caller must hold the pool's lock.
+    void UpdateQueueInfoUnlocked(const MonoDelta& task_queue_time,
+                                 const MonoTime& queue_head_submit_time,
+                                 bool has_spare_thread);
+   private:
+    // The pool this meter is attached to.
+    const ThreadPool& tpool_;
+
+    // The threshold for the minimum queue times when determining whether the
+    // thread pool's tasks queue is in overloaded state. This corresponds to the
+    // parameter 'T_overload' in the description of the algorithm above.
+    const MonoDelta queue_time_threshold_;
+
+    // The measurement window to track task queue times. That's the number
+    // of the most recent tasks to check for the queue times. This corresponds
+    // to the parameter 'N' in the description of the algorithm above.
+    const size_t queue_time_history_length_;
+
+    // Queue timings of the most recent samples. The size of these containers
+    // is kept within the queue_time_history_length_ limit.
+    std::deque<MonoDelta> queue_times_;             // in dequeue order
+    std::multiset<MonoDelta> queue_times_ordered_;  // same samples, sorted
+    MonoDelta min_queue_time_;  // minimum of the samples; unset until the first
+
+    // Below fields are to store the latest snapshot of the information about
+    // the task queue of the pool the meter is attached to.
+
+    // Submit time of the task at the head of the queue. Set to an empty
+    // MonoTime() (i.e. Initialized() returns false) if the queue is
+    // empty.
+    std::atomic<MonoTime> queue_head_submit_time_;
+
+    // Time when the queue has entered the overloaded state. If the queue isn't
+    // in overloaded state, this member field isn't initialized
+    // (i.e. overloaded_since_.Initialized() returns false).
+    std::atomic<MonoTime> overloaded_since_;
+
+    // Whether the ThreadPool has at least one "spare" thread: a thread that can
+    // be spawned before reaching the maximum allowed number of threads,
+    // or one of those already spawned but currently idle.
+    std::atomic<bool> has_spare_thread_;
+  };
+
   // Creates a new thread pool using a builder.
   explicit ThreadPool(const ThreadPoolBuilder& builder);
 
@@ -255,6 +389,22 @@ class ThreadPool {
   // Releases token 't' and invalidates it.
   void ReleaseToken(ThreadPoolToken* t);
 
+  // Notify the load meter (if enabled) about relevant updates. If there is no
+  // information on a dequeued task, 'queue_time' should be omitted (or be an
+  // uninitialized MonoDelta instance).
+  //
+  // The QueueLoadMeter should be notified about events which affect the
+  // criterion used to evaluate the state of the queue (overloaded vs normal).
+  // The criterion uses the following information:
+  //   * queue time of a newly dequeued task
+  //   * availability of spare worker threads
+  //   * submit time of the task at the head of the queue
+  // Hence, the QueueLoadMeter should be notified about the following events:
+  //   * a task at the head of the queue has been dispatched to be run
+  //   * a worker thread completes running a task
+  //   * a new task has been scheduled (i.e. added into the queue)
+  void NotifyLoadMeterUnlocked(const MonoDelta& queue_time = MonoDelta());
+
   const std::string name_;
   const int min_threads_;
   const int max_threads_;
@@ -341,6 +491,10 @@ class ThreadPool {
   // ExecutionMode::CONCURRENT token used by the pool for tokenless submission.
   std::unique_ptr<ThreadPoolToken> tokenless_;
 
+  // The meter to track whether the pool's queue is stalled/overloaded.
+  // It's nullptr if the queue overload threshold is unset.
+  std::unique_ptr<QueueLoadMeter> load_meter_;
+
   // Metrics for the entire thread pool.
   const ThreadPoolMetrics metrics_;
 


[kudu] 01/02: [tserver] add test to reproduce KUDU-1587 conditions

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit c6d438ab417009e8007a1de274178d0bcf0dfb63
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Fri Aug 7 20:51:04 2020 -0700

    [tserver] add test to reproduce KUDU-1587 conditions
    
    Added a test to reproduce conditions described in KUDU-1587.
    As of now, the test is disabled: it will be enabled once
    KUDU-1587 is addressed.
    
    Change-Id: I515a1b26152680ee9b9361afcf84fec39b8f962d
    Reviewed-on: http://gerrit.cloudera.org:8080/16312
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/tserver/tablet_server-test.cc | 133 +++++++++++++++++++++++++++++++++
 1 file changed, 133 insertions(+)

diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index 62de1d8..dcab726 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -73,6 +73,7 @@
 #include "kudu/gutil/strings/escaping.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/sysinfo.h"
 #include "kudu/rpc/messenger.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rpc_header.pb.h"
@@ -139,9 +140,11 @@ using kudu::tablet::Tablet;
 using kudu::tablet::TabletReplica;
 using kudu::tablet::TabletStatePB;
 using kudu::tablet::TabletSuperBlockPB;
+using std::endl;
 using std::make_shared;
 using std::map;
 using std::pair;
+using std::ostringstream;
 using std::set;
 using std::shared_ptr;
 using std::string;
@@ -184,9 +187,11 @@ DECLARE_int32(maintenance_manager_num_threads);
 DECLARE_int32(maintenance_manager_polling_interval_ms);
 DECLARE_int32(memory_pressure_percentage);
 DECLARE_int32(metrics_retirement_age_ms);
+DECLARE_int32(rpc_service_queue_length);
 DECLARE_int32(scanner_batch_size_rows);
 DECLARE_int32(scanner_gc_check_interval_us);
 DECLARE_int32(scanner_ttl_ms);
+DECLARE_int32(tablet_inject_latency_on_apply_write_op_ms);
 DECLARE_int32(workload_stats_rate_collection_min_interval_ms);
 DECLARE_int32(workload_stats_metric_collection_interval_ms);
 DECLARE_string(block_manager);
@@ -199,6 +204,8 @@ METRIC_DECLARE_counter(log_block_manager_holes_punched);
 METRIC_DECLARE_counter(rows_inserted);
 METRIC_DECLARE_counter(rows_updated);
 METRIC_DECLARE_counter(rows_deleted);
+METRIC_DECLARE_counter(rpcs_queue_overflow);
+METRIC_DECLARE_counter(rpcs_timed_out_in_queue);
 METRIC_DECLARE_counter(scanners_expired);
 METRIC_DECLARE_gauge_uint64(log_block_manager_blocks_under_management);
 METRIC_DECLARE_gauge_uint64(log_block_manager_containers);
@@ -206,6 +213,9 @@ METRIC_DECLARE_gauge_size(active_scanners);
 METRIC_DECLARE_gauge_size(tablet_active_scanners);
 METRIC_DECLARE_gauge_size(num_rowsets_on_disk);
 METRIC_DECLARE_histogram(flush_dms_duration);
+METRIC_DECLARE_histogram(op_apply_queue_length);
+METRIC_DECLARE_histogram(op_apply_queue_time);
+
 
 namespace kudu {
 
@@ -4351,5 +4361,128 @@ TEST_F(TabletServerTest, TestStarvePerfImprovementOpsInColdTablet) {
   });
 }
 
+class OpApplyQueueTest : public TabletServerTestBase {
+ public:
+  // Starts the tablet server right away; override to defer the start.
+  void SetUp() override {
+    // Since scenarios of this test make bursts of requests, set the maximum
+    // length of the service queue to accommodate many requests.
+    FLAGS_rpc_service_queue_length = 1000;
+    NO_FATALS(TabletServerTestBase::SetUp());
+    NO_FATALS(StartTabletServer(/*num_data_dirs=*/1));
+  }
+
+  static constexpr int32_t kInjectedLatencyMs = 100;  // WriteOp::Apply() delay
+};
+
+// This is a regression test for KUDU-1587.
+// TODO(aserbin): enable the test once KUDU-1587 is addressed.
+TEST_F(OpApplyQueueTest, DISABLED_ApplyQueueBackpressure) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  constexpr size_t kNumCalls = 1000;
+  const size_t num_cpus = static_cast<size_t>(base::NumCPUs());
+  WriteRequestPB req;
+  req.set_tablet_id(kTabletId);
+  ASSERT_OK(SchemaToPB(schema_, req.mutable_schema()));
+
+  // Inject latency into WriteOp::Apply().
+  FLAGS_tablet_inject_latency_on_apply_write_op_ms = kInjectedLatencyMs;
+
+  // Send many calls to the tablet server without waiting for responses.
+  // After sending the first chunk, pause to let the operations propagate
+  // to the apply queue, so that the apply queue times start growing.
+  vector<unique_ptr<RpcController>> controllers;
+  vector<unique_ptr<WriteResponsePB>> responses;
+  CountDownLatch latch(kNumCalls);
+  for (size_t idx = 0; idx < kNumCalls; ++idx) {
+    controllers.emplace_back(new RpcController);
+    responses.emplace_back(new WriteResponsePB);
+
+    req.clear_row_operations();
+    auto* data = req.mutable_row_operations();
+    AddTestRowWithNullableStringToPB(
+        RowOperationsPB::INSERT, schema_, idx, idx, nullptr, data);
+    proxy_->AsyncRequest("Write", req, responses.back().get(), controllers.back().get(),
+                         [&latch]() { latch.CountDown(); });
+    if (idx == 10 * num_cpus) {
+      // Give the queued ops time to reach the apply queue, so that the current
+      // apply queue times become noticeable.
+      SleepFor(MonoDelta::FromMilliseconds(2 * kInjectedLatencyMs));
+    }
+  }
+
+  // Wait for calls to be processed before capturing the apply queue stats.
+  latch.Wait();
+
+  size_t num_ok = 0;
+  size_t num_error = 0;
+  for (const auto& ctl : controllers) {
+    if (ctl->status().ok()) {
+      ++num_ok;
+    } else {
+      ++num_error;
+    }
+  }
+  ASSERT_EQ(kNumCalls, num_ok + num_error);
+
+  // Not all requests should succeed -- due to long apply times, some should
+  // be rejected.
+  EXPECT_GT(num_error, 0);
+
+  {
+    // No RPC queue overflows are expected.
+    auto rpc_queue_overflows = METRIC_rpcs_queue_overflow.Instantiate(
+        mini_server_->server()->metric_entity());
+    ASSERT_EQ(0, rpc_queue_overflows->value());
+  }
+
+  {
+    // No requests should time out while in the queue.
+    auto timed_out_in_rpc_queue = METRIC_rpcs_timed_out_in_queue.Instantiate(
+        mini_server_->server()->metric_entity());
+    ASSERT_EQ(0, timed_out_in_rpc_queue->value());
+  }
+
+  {
+    // Some calls should be rejected due to the overloaded apply queue, so the
+    // corresponding queue times should not get too close to the value of
+    // kNumCalls * kInjectedLatencyMs / number_of_apply_threads (~num_cpus).
+    auto qt = METRIC_op_apply_queue_time.Instantiate(
+        mini_server_->server()->metric_entity());
+    ostringstream ostr;
+    ostr << qt->prototype()->name() << ":" << endl;
+    const auto* h = qt->histogram();
+    h->DumpHumanReadable(&ostr);
+    LOG(INFO) << ostr.str();
+
+    // These are simple heuristics rather than exact theoretical thresholds.
+    // They depend on the injected latency and the 'overloaded' threshold
+    // for the apply queue.
+    EXPECT_LT(h->MaxValue(),
+              kInjectedLatencyMs * 1000 * kNumCalls * 4 / (4 * num_cpus));
+    EXPECT_LT(h->ValueAtPercentile(99),
+              kInjectedLatencyMs * 1000 * kNumCalls * 3 / (4 * num_cpus));
+    EXPECT_LT(h->ValueAtPercentile(75),
+              kInjectedLatencyMs * 1000 * kNumCalls * 2 / (4 * num_cpus));
+  }
+
+  {
+    // With the injected apply latency, the queue should not get too long.
+    auto ql = METRIC_op_apply_queue_length.Instantiate(
+        mini_server_->server()->metric_entity());
+    ostringstream ostr;
+    ostr << ql->prototype()->name() << ":" << endl;
+    const auto* h = ql->histogram();
+    h->DumpHumanReadable(&ostr);
+    LOG(INFO) << ostr.str();
+
+    // These are simple heuristics as well. They depend on the injected latency
+    // and the 'overloaded' threshold for the apply queue.
+    EXPECT_LT(h->MaxValue(), 3 * kNumCalls / 4);
+    EXPECT_LT(h->MeanValue(), kNumCalls / 2);
+  }
+}
+
 } // namespace tserver
 } // namespace kudu