You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2017/06/27 23:03:45 UTC

[5/5] kudu git commit: threadpool: new test for pools with no max_threads

threadpool: new test for pools with no max_threads

This test ensures that a pool created with effectively no limit on max_threads
(i.e. max_threads set to the maximum int value) works as expected. That is:
1. Tokenless tasks should trigger the creation of a new thread.
2. Tasks submitted through serial tokens can also create new threads, but at
   most one thread per token that has had tasks submitted to it.

I intend to use this "feature" to consolidate some Raft thread pools where a
num_cpus upper bound may be undesirable (i.e. where tasks submitted to the
pools may result in blocking IO).

Change-Id: I37d0bd0a05098bcc1a81ec9f1ac33e74e98781e4
Reviewed-on: http://gerrit.cloudera.org:8080/6945
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/cbd2fb67
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/cbd2fb67
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/cbd2fb67

Branch: refs/heads/master
Commit: cbd2fb6730ec4f2fabc97fc60757c4ce2203d7b4
Parents: 428a34b
Author: Adar Dembo <ad...@cloudera.com>
Authored: Sun May 21 20:54:19 2017 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue Jun 27 22:55:10 2017 +0000

----------------------------------------------------------------------
 src/kudu/util/threadpool-test.cc | 82 +++++++++++++++++++++++++++--------
 src/kudu/util/threadpool.h       |  1 +
 2 files changed, 66 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/cbd2fb67/src/kudu/util/threadpool-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/threadpool-test.cc b/src/kudu/util/threadpool-test.cc
index 0beb962..a66dfde 100644
--- a/src/kudu/util/threadpool-test.cc
+++ b/src/kudu/util/threadpool-test.cc
@@ -30,6 +30,7 @@
 #include "kudu/gutil/atomicops.h"
 #include "kudu/gutil/bind.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/sysinfo.h"
 #include "kudu/util/barrier.h"
 #include "kudu/util/countdown_latch.h"
 #include "kudu/util/locks.h"
@@ -157,6 +158,10 @@ class SlowTask : public Runnable {
     latch_->Wait();
   }
 
+  static shared_ptr<Runnable> NewSlowTask(CountDownLatch* latch) {
+    return std::make_shared<SlowTask>(latch);
+  }
+
  private:
   CountDownLatch* latch_;
 };
@@ -171,13 +176,16 @@ TEST_F(ThreadPoolTest, TestThreadPoolWithNoMinimum) {
   ASSERT_TRUE(pool_->num_threads_ == 0);
   // We get up to 3 threads when submitting work.
   CountDownLatch latch(1);
-  ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
-  ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
+  auto cleanup = MakeScopedCleanup([&]() {
+    latch.CountDown();
+  });
+  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
   ASSERT_EQ(2, pool_->num_threads_);
-  ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
+  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
   ASSERT_EQ(3, pool_->num_threads_);
   // The 4th piece of work gets queued.
-  ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
+  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
   ASSERT_EQ(3, pool_->num_threads_);
   // Finish all work
   latch.CountDown();
@@ -187,6 +195,46 @@ TEST_F(ThreadPoolTest, TestThreadPoolWithNoMinimum) {
   ASSERT_EQ(0, pool_->num_threads_);
 }
 
+TEST_F(ThreadPoolTest, TestThreadPoolWithNoMaxThreads) {
+  // By default a threadpool's max_threads is set to the number of CPUs, so
+  // this test submits more tasks than that to ensure that the number of CPUs
+  // isn't some kind of upper bound.
+  const int kNumCPUs = base::NumCPUs();
+
+  // Build a threadpool with no limit on the maximum number of threads.
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+                                   .set_max_threads(std::numeric_limits<int>::max())));
+  CountDownLatch latch(1);
+  auto cleanup_latch = MakeScopedCleanup([&]() {
+    latch.CountDown();
+  });
+
+  // Submit tokenless tasks. Each should create a new thread.
+  for (int i = 0; i < kNumCPUs * 2; i++) {
+    ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  }
+  ASSERT_EQ((kNumCPUs * 2), pool_->num_threads_);
+
+  // Submit tasks on two tokens. Only two threads should be created.
+  unique_ptr<ThreadPoolToken> t1 = pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
+  unique_ptr<ThreadPoolToken> t2 = pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
+  for (int i = 0; i < kNumCPUs * 2; i++) {
+    ThreadPoolToken* t = (i % 2 == 0) ? t1.get() : t2.get();
+    ASSERT_OK(t->Submit(SlowTask::NewSlowTask(&latch)));
+  }
+  ASSERT_EQ((kNumCPUs * 2) + 2, pool_->num_threads_);
+
+  // Submit more tokenless tasks. Each should create a new thread.
+  for (int i = 0; i < kNumCPUs; i++) {
+    ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  }
+  ASSERT_EQ((kNumCPUs * 3) + 2, pool_->num_threads_);
+
+  latch.CountDown();
+  pool_->Wait();
+  pool_->Shutdown();
+}
+
 // Regression test for a bug where a task is submitted exactly
 // as a thread is about to exit. Previously this could hang forever.
 TEST_F(ThreadPoolTest, TestRace) {
@@ -219,16 +267,16 @@ TEST_F(ThreadPoolTest, TestVariableSizeThreadPool) {
   ASSERT_EQ(1, pool_->num_threads_);
   // We get up to 4 threads when submitting work.
   CountDownLatch latch(1);
-  ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
+  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
   ASSERT_EQ(1, pool_->num_threads_);
-  ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
+  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
   ASSERT_EQ(2, pool_->num_threads_);
-  ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
+  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
   ASSERT_EQ(3, pool_->num_threads_);
-  ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
+  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
   ASSERT_EQ(4, pool_->num_threads_);
   // The 5th piece of work gets queued.
-  ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
+  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
   ASSERT_EQ(4, pool_->num_threads_);
   // Finish all work
   latch.CountDown();
@@ -247,9 +295,9 @@ TEST_F(ThreadPoolTest, TestMaxQueueSize) {
   CountDownLatch latch(1);
   // We will be able to submit two tasks: one for max_threads == 1 and one for
   // max_queue_size == 1.
-  ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
-  ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
-  Status s = pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch)));
+  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  Status s = pool_->Submit(SlowTask::NewSlowTask(&latch));
   CHECK(s.IsServiceUnavailable()) << "Expected failure due to queue blowout:" << s.ToString();
   latch.CountDown();
   pool_->Wait();
@@ -266,9 +314,9 @@ TEST_F(ThreadPoolTest, TestZeroQueueSize) {
 
   CountDownLatch latch(1);
   for (int i = 0; i < kMaxThreads; i++) {
-    ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
+    ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
   }
-  Status s = pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch)));
+  Status s = pool_->Submit(SlowTask::NewSlowTask(&latch));
   ASSERT_TRUE(s.IsServiceUnavailable()) << s.ToString();
   ASSERT_STR_CONTAINS(s.ToString(), "Thread pool is at capacity");
   latch.CountDown();
@@ -629,9 +677,9 @@ TEST_P(ThreadPoolTestTokenTypes, TestTokenSubmissionsAdhereToMaxQueueSize) {
   });
   // We will be able to submit two tasks: one for max_threads == 1 and one for
   // max_queue_size == 1.
-  ASSERT_OK(t->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
-  ASSERT_OK(t->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
-  Status s = t->Submit(shared_ptr<Runnable>(new SlowTask(&latch)));
+  ASSERT_OK(t->Submit(SlowTask::NewSlowTask(&latch)));
+  ASSERT_OK(t->Submit(SlowTask::NewSlowTask(&latch)));
+  Status s = t->Submit(SlowTask::NewSlowTask(&latch));
   ASSERT_TRUE(s.IsServiceUnavailable());
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/cbd2fb67/src/kudu/util/threadpool.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/threadpool.h b/src/kudu/util/threadpool.h
index e15c991..5abe0b4 100644
--- a/src/kudu/util/threadpool.h
+++ b/src/kudu/util/threadpool.h
@@ -217,6 +217,7 @@ class ThreadPool {
   void SetRunTimeMicrosHistogram(const scoped_refptr<Histogram>& hist);
 
  private:
+  FRIEND_TEST(ThreadPoolTest, TestThreadPoolWithNoMaxThreads);
   FRIEND_TEST(ThreadPoolTest, TestThreadPoolWithNoMinimum);
   FRIEND_TEST(ThreadPoolTest, TestVariableSizeThreadPool);