You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2018/01/17 23:27:13 UTC

kudu git commit: KUDU-1913: LIFO wake up ordering for threadpool worker threads

Repository: kudu
Updated Branches:
  refs/heads/master e453f7e5a -> bec5a200c


KUDU-1913: LIFO wake up ordering for threadpool worker threads

This patch changes the wake-up ordering for idle pool worker threads
to be LIFO. Previously all worker threads idled on a single
ConditionVariable which, by virtue of using pthread_cond_t under the hood,
was FIFO ordered.

In the abstract, FIFO ordering ensures a fair distribution of work amongst a
set of threads, but that's totally undesirable in a thread pool where the
goal is to get work done as quickly and with as few threads as possible. For
example, a fast stream of cheap tasks (e.g. RPCs) should be serviceable via
a single thread or something close to that.

The new unit test was looped 1000 times in TSAN mode with 8 stress threads
to shake out any flakes. Additionally, I ran through a variation of Todd's
test from a past code review [1]. I spun up three tservers, two of which had
failure detection disabled so that the third would get all of the leader
replicas. I created 240 tablets and looked at the size of the Raft thread
pool on that tserver. With this patch, the steady state thread count was
mildly lower at first (4 vs. 8 without the patch) but much lower (4 vs. 30)
after a SIGSTOP+SIGCONT cycle.

1. https://gerrit.cloudera.org/c/7331

Change-Id: I8036bf0d15f9ffcb3fa76579e3bfe0a340d38320
Reviewed-on: http://gerrit.cloudera.org:8080/9021
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/bec5a200
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/bec5a200
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/bec5a200

Branch: refs/heads/master
Commit: bec5a200c0b7e5349a488a71985547820feebbdc
Parents: e453f7e
Author: Adar Dembo <ad...@cloudera.com>
Authored: Fri Jan 12 15:17:52 2018 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Jan 17 23:26:13 2018 +0000

----------------------------------------------------------------------
 src/kudu/util/test_util.cc       | 15 ++++++++++++--
 src/kudu/util/test_util.h        | 13 +++++++++---
 src/kudu/util/threadpool-test.cc | 37 ++++++++++++++++++++++++++++++++++
 src/kudu/util/threadpool.cc      | 38 +++++++++++++++++++++++++++++------
 src/kudu/util/threadpool.h       | 26 ++++++++++++++++++++----
 5 files changed, 114 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/bec5a200/src/kudu/util/test_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/test_util.cc b/src/kudu/util/test_util.cc
index 744ef62..c5acbbb 100644
--- a/src/kudu/util/test_util.cc
+++ b/src/kudu/util/test_util.cc
@@ -234,7 +234,8 @@ string GetTestDataDirectory() {
 }
 
 void AssertEventually(const std::function<void(void)>& f,
-                      const MonoDelta& timeout) {
+                      const MonoDelta& timeout,
+                      AssertBackoff backoff) {
   const MonoTime deadline = MonoTime::Now() + timeout;
   {
     // Disable --gtest_break_on_failure, or else the assertion failures
@@ -265,7 +266,17 @@ void AssertEventually(const std::function<void(void)>& f,
       }
 
       // If they had failures, sleep and try again.
-      int sleep_ms = (attempts < 10) ? (1 << attempts) : 1000;
+      int sleep_ms;
+      switch (backoff) {
+        case AssertBackoff::EXPONENTIAL:
+          sleep_ms = (attempts < 10) ? (1 << attempts) : 1000;
+          break;
+        case AssertBackoff::NONE:
+          sleep_ms = 1;
+          break;
+        default:
+          LOG(FATAL) << "Unknown backoff type";
+      }
       SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
     }
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/bec5a200/src/kudu/util/test_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/test_util.h b/src/kudu/util/test_util.h
index b279160..f79daa6 100644
--- a/src/kudu/util/test_util.h
+++ b/src/kudu/util/test_util.h
@@ -103,14 +103,21 @@ std::string GetTestDataDirectory();
 //     ASSERT_GT(ReadValueOfMetric(), 10);
 //   });
 //
-// The function is run in a loop with exponential backoff, capped at once
-// a second.
+// The function is run in a loop with optional back-off.
 //
 // To check whether AssertEventually() eventually succeeded, call
 // NO_PENDING_FATALS() afterward, or use ASSERT_EVENTUALLY() which performs
 // this check automatically.
+enum class AssertBackoff {
+  // Use exponential back-off while looping, capped at one second.
+  EXPONENTIAL,
+
+  // Sleep for a millisecond while looping.
+  NONE,
+};
 void AssertEventually(const std::function<void(void)>& f,
-                      const MonoDelta& timeout = MonoDelta::FromSeconds(30));
+                      const MonoDelta& timeout = MonoDelta::FromSeconds(30),
+                      AssertBackoff backoff = AssertBackoff::EXPONENTIAL);
 
 // Count the number of open file descriptors in use by this process.
 int CountOpenFds(Env* env);

http://git-wip-us.apache.org/repos/asf/kudu/blob/bec5a200/src/kudu/util/threadpool-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/threadpool-test.cc b/src/kudu/util/threadpool-test.cc
index b92407b..23fc45c 100644
--- a/src/kudu/util/threadpool-test.cc
+++ b/src/kudu/util/threadpool-test.cc
@@ -901,4 +901,41 @@ TEST_F(ThreadPoolTest, TestTokenConcurrency) {
                           kSubmitThreads, total_num_tokens_submitted.load());
 }
 
+TEST_F(ThreadPoolTest, TestLIFOThreadWakeUps) {
+  const int kNumThreads = 10;
+
+  // Test with a pool that allows for kNumThreads concurrent threads.
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+                                   .set_max_threads(kNumThreads)));
+
+  // Submit kNumThreads slow tasks and unblock them, in order to produce
+  // kNumThreads worker threads.
+  CountDownLatch latch(1);
+  SCOPED_CLEANUP({
+    latch.CountDown();
+  });
+  for (int i = 0; i < kNumThreads; i++) {
+    ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  }
+  ASSERT_EQ(kNumThreads, pool_->num_threads());
+  latch.CountDown();
+  pool_->Wait();
+
+  // The kNumThreads threads are idle and waiting for the idle timeout.
+
+  // Submit a slow trickle of lightning fast tasks.
+  //
+  // If the threads are woken up in FIFO order, this trickle is enough to
+  // prevent all of them from idling and the AssertEventually will time out.
+  //
+  // If LIFO order is used, the same thread will be reused for each task and
+  // the other threads will eventually time out.
+  AssertEventually([&]() {
+    ASSERT_OK(pool_->SubmitFunc([](){}));
+    SleepFor(MonoDelta::FromMilliseconds(10));
+    ASSERT_EQ(1, pool_->num_threads());
+  }, MonoDelta::FromSeconds(10), AssertBackoff::NONE);
+  NO_PENDING_FATALS();
+}
+
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/bec5a200/src/kudu/util/threadpool.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/threadpool.cc b/src/kudu/util/threadpool.cc
index 59a0650..3d14766 100644
--- a/src/kudu/util/threadpool.cc
+++ b/src/kudu/util/threadpool.cc
@@ -35,6 +35,7 @@
 #include "kudu/gutil/sysinfo.h"
 #include "kudu/gutil/walltime.h"
 #include "kudu/util/metrics.h"
+#include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/thread.h"
 #include "kudu/util/trace.h"
 #include "kudu/util/trace_metrics.h"
@@ -315,7 +316,6 @@ ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
     pool_status_(Status::Uninitialized("The pool was not initialized.")),
     idle_cond_(&lock_),
     no_threads_cond_(&lock_),
-    not_empty_(&lock_),
     num_threads_(0),
     num_threads_pending_start_(0),
     active_threads_(0),
@@ -400,7 +400,10 @@ void ThreadPool::Shutdown() {
   // of them to exit. Some worker threads will exit immediately upon waking,
   // while others will exit after they finish executing an outstanding task.
   total_queued_tasks_ = 0;
-  not_empty_.Broadcast();
+  while (!idle_threads_.empty()) {
+    idle_threads_.front().not_empty.Signal();
+    idle_threads_.pop_front();
+  }
   while (num_threads_ + num_threads_pending_start_ > 0) {
     no_threads_cond_.Wait();
   }
@@ -528,10 +531,18 @@ Status ThreadPool::DoSubmit(shared_ptr<Runnable> r, ThreadPoolToken* token) {
   }
   int length_at_submit = total_queued_tasks_++;
 
+  // Wake up an idle thread for this task. Choosing the thread at the front of
+  // the list ensures LIFO semantics as idling threads are also added to the front.
+  //
+  // If there are no idle threads, the new task remains on the queue and is
+  // processed by an active thread (or a thread we're about to create) at some
+  // point in the future.
+  if (!idle_threads_.empty()) {
+    idle_threads_.front().not_empty.Signal();
+    idle_threads_.pop_front();
+  }
   guard.Unlock();
 
-  not_empty_.Signal();
-
   if (metrics_.queue_length_histogram) {
     metrics_.queue_length_histogram->Increment(length_at_submit);
   }
@@ -592,6 +603,9 @@ void ThreadPool::DispatchThread() {
   // a "permanent" thread.
   bool permanent = num_threads_ <= min_threads_;
 
+  // Owned by this worker thread and added/removed from idle_threads_ as needed.
+  IdleThread me(&lock_);
+
   while (true) {
     // Note: Status::Aborted() is used to indicate normal shutdown.
     if (!pool_status_.ok()) {
@@ -600,10 +614,22 @@ void ThreadPool::DispatchThread() {
     }
 
     if (queue_.empty()) {
+      // There's no work to do, let's go idle.
+      //
+      // Note: if FIFO behavior is desired, it's as simple as changing this to push_back().
+      idle_threads_.push_front(me);
+      SCOPED_CLEANUP({
+        // For some wake ups (i.e. Shutdown or DoSubmit) this thread is
+        // guaranteed to be unlinked after being awakened. In others (i.e.
+        // spurious wake-up or TimedWait timeout), it'll still be linked.
+        if (me.is_linked()) {
+          idle_threads_.erase(idle_threads_.iterator_to(me));
+        }
+      });
       if (permanent) {
-        not_empty_.Wait();
+        me.not_empty.Wait();
       } else {
-        if (!not_empty_.TimedWait(idle_timeout_)) {
+        if (!me.not_empty.TimedWait(idle_timeout_)) {
           // After much investigation, it appears that pthread condition variables have
           // a weird behavior in which they can return ETIMEDOUT from timed_wait even if
           // another thread did in fact signal. Apparently after a timeout there is some

http://git-wip-us.apache.org/repos/asf/kudu/blob/bec5a200/src/kudu/util/threadpool.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/threadpool.h b/src/kudu/util/threadpool.h
index 392130f..f1a7a37 100644
--- a/src/kudu/util/threadpool.h
+++ b/src/kudu/util/threadpool.h
@@ -23,6 +23,8 @@
 #include <string>
 #include <unordered_set>
 
+#include <boost/intrusive/list.hpp>
+#include <boost/intrusive/list_hook.hpp>
 #include <gtest/gtest_prod.h>
 
 #include "kudu/gutil/callback.h"
@@ -297,10 +299,6 @@ class ThreadPool {
   // num_threads_ and num_pending_threads_ are both 0.
   ConditionVariable no_threads_cond_;
 
-  // Condition variable for "queue is not empty". Waiters wake up when
-  // a new task is queued.
-  ConditionVariable not_empty_;
-
   // Number of threads currently running.
   //
   // Protected by lock_.
@@ -341,6 +339,26 @@ class ThreadPool {
   // Protected by lock_.
   std::unordered_set<Thread*> threads_;
 
+  // List of all threads currently waiting for work.
+  //
+  // A thread is added to the front of the list when it goes idle and is
+  // removed from the front and signaled when new work arrives. This produces a
+  // LIFO usage pattern that is more efficient than idling on a single
+  // ConditionVariable (which yields FIFO semantics).
+  //
+  // Protected by lock_.
+  struct IdleThread : public boost::intrusive::list_base_hook<> {
+    explicit IdleThread(Mutex* m)
+        : not_empty(m) {}
+
+    // Condition variable for "queue is not empty". Waiters wake up when a new
+    // task is queued.
+    ConditionVariable not_empty;
+
+    DISALLOW_COPY_AND_ASSIGN(IdleThread);
+  };
+  boost::intrusive::list<IdleThread> idle_threads_; // NOLINT(build/include_what_you_use)
+
   // ExecutionMode::CONCURRENT token used by the pool for tokenless submission.
   std::unique_ptr<ThreadPoolToken> tokenless_;