You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2017/06/27 23:03:45 UTC
[5/5] kudu git commit: threadpool: new test for pools with no
max_threads
threadpool: new test for pools with no max_threads
This test ensures that a pool created with effectively no max_threads works
as expected. That is:
1. Tokenless tasks should trigger the creation of a new thread.
2. Serial token-based tasks can create new threads, but only up to the
number of tokens submitted against.
I intend to use this "feature" to consolidate some Raft thread pools where a
num_cpus upper bound may be undesirable (i.e. where tasks submitted to the
pools may result in blocking IO).
Change-Id: I37d0bd0a05098bcc1a81ec9f1ac33e74e98781e4
Reviewed-on: http://gerrit.cloudera.org:8080/6945
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/cbd2fb67
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/cbd2fb67
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/cbd2fb67
Branch: refs/heads/master
Commit: cbd2fb6730ec4f2fabc97fc60757c4ce2203d7b4
Parents: 428a34b
Author: Adar Dembo <ad...@cloudera.com>
Authored: Sun May 21 20:54:19 2017 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue Jun 27 22:55:10 2017 +0000
----------------------------------------------------------------------
src/kudu/util/threadpool-test.cc | 82 +++++++++++++++++++++++++++--------
src/kudu/util/threadpool.h | 1 +
2 files changed, 66 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/cbd2fb67/src/kudu/util/threadpool-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/threadpool-test.cc b/src/kudu/util/threadpool-test.cc
index 0beb962..a66dfde 100644
--- a/src/kudu/util/threadpool-test.cc
+++ b/src/kudu/util/threadpool-test.cc
@@ -30,6 +30,7 @@
#include "kudu/gutil/atomicops.h"
#include "kudu/gutil/bind.h"
#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/sysinfo.h"
#include "kudu/util/barrier.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/locks.h"
@@ -157,6 +158,10 @@ class SlowTask : public Runnable {
latch_->Wait();
}
+ static shared_ptr<Runnable> NewSlowTask(CountDownLatch* latch) {
+ return std::make_shared<SlowTask>(latch);
+ }
+
private:
CountDownLatch* latch_;
};
@@ -171,13 +176,16 @@ TEST_F(ThreadPoolTest, TestThreadPoolWithNoMinimum) {
ASSERT_TRUE(pool_->num_threads_ == 0);
// We get up to 3 threads when submitting work.
CountDownLatch latch(1);
- ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
- ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
+ auto cleanup = MakeScopedCleanup([&]() {
+ latch.CountDown();
+ });
+ ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+ ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
ASSERT_EQ(2, pool_->num_threads_);
- ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
+ ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
ASSERT_EQ(3, pool_->num_threads_);
// The 4th piece of work gets queued.
- ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
+ ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
ASSERT_EQ(3, pool_->num_threads_);
// Finish all work
latch.CountDown();
@@ -187,6 +195,46 @@ TEST_F(ThreadPoolTest, TestThreadPoolWithNoMinimum) {
ASSERT_EQ(0, pool_->num_threads_);
}
+TEST_F(ThreadPoolTest, TestThreadPoolWithNoMaxThreads) {
+ // By default a threadpool's max_threads is set to the number of CPUs, so
+ // this test submits more tasks than that to ensure that the number of CPUs
+ // isn't some kind of upper bound.
+ const int kNumCPUs = base::NumCPUs();
+
+ // Build a threadpool with no limit on the maximum number of threads.
+ ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+ .set_max_threads(std::numeric_limits<int>::max())));
+ CountDownLatch latch(1);
+ auto cleanup_latch = MakeScopedCleanup([&]() {
+ latch.CountDown();
+ });
+
+ // Submit tokenless tasks. Each should create a new thread.
+ for (int i = 0; i < kNumCPUs * 2; i++) {
+ ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+ }
+ ASSERT_EQ((kNumCPUs * 2), pool_->num_threads_);
+
+ // Submit tasks on two tokens. Only two threads should be created.
+ unique_ptr<ThreadPoolToken> t1 = pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
+ unique_ptr<ThreadPoolToken> t2 = pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
+ for (int i = 0; i < kNumCPUs * 2; i++) {
+ ThreadPoolToken* t = (i % 2 == 0) ? t1.get() : t2.get();
+ ASSERT_OK(t->Submit(SlowTask::NewSlowTask(&latch)));
+ }
+ ASSERT_EQ((kNumCPUs * 2) + 2, pool_->num_threads_);
+
+ // Submit more tokenless tasks. Each should create a new thread.
+ for (int i = 0; i < kNumCPUs; i++) {
+ ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+ }
+ ASSERT_EQ((kNumCPUs * 3) + 2, pool_->num_threads_);
+
+ latch.CountDown();
+ pool_->Wait();
+ pool_->Shutdown();
+}
+
// Regression test for a bug where a task is submitted exactly
// as a thread is about to exit. Previously this could hang forever.
TEST_F(ThreadPoolTest, TestRace) {
@@ -219,16 +267,16 @@ TEST_F(ThreadPoolTest, TestVariableSizeThreadPool) {
ASSERT_EQ(1, pool_->num_threads_);
// We get up to 4 threads when submitting work.
CountDownLatch latch(1);
- ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
+ ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
ASSERT_EQ(1, pool_->num_threads_);
- ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
+ ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
ASSERT_EQ(2, pool_->num_threads_);
- ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
+ ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
ASSERT_EQ(3, pool_->num_threads_);
- ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
+ ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
ASSERT_EQ(4, pool_->num_threads_);
// The 5th piece of work gets queued.
- ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
+ ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
ASSERT_EQ(4, pool_->num_threads_);
// Finish all work
latch.CountDown();
@@ -247,9 +295,9 @@ TEST_F(ThreadPoolTest, TestMaxQueueSize) {
CountDownLatch latch(1);
// We will be able to submit two tasks: one for max_threads == 1 and one for
// max_queue_size == 1.
- ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
- ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
- Status s = pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch)));
+ ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+ ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+ Status s = pool_->Submit(SlowTask::NewSlowTask(&latch));
CHECK(s.IsServiceUnavailable()) << "Expected failure due to queue blowout:" << s.ToString();
latch.CountDown();
pool_->Wait();
@@ -266,9 +314,9 @@ TEST_F(ThreadPoolTest, TestZeroQueueSize) {
CountDownLatch latch(1);
for (int i = 0; i < kMaxThreads; i++) {
- ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
+ ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
}
- Status s = pool_->Submit(shared_ptr<Runnable>(new SlowTask(&latch)));
+ Status s = pool_->Submit(SlowTask::NewSlowTask(&latch));
ASSERT_TRUE(s.IsServiceUnavailable()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(), "Thread pool is at capacity");
latch.CountDown();
@@ -629,9 +677,9 @@ TEST_P(ThreadPoolTestTokenTypes, TestTokenSubmissionsAdhereToMaxQueueSize) {
});
// We will be able to submit two tasks: one for max_threads == 1 and one for
// max_queue_size == 1.
- ASSERT_OK(t->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
- ASSERT_OK(t->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
- Status s = t->Submit(shared_ptr<Runnable>(new SlowTask(&latch)));
+ ASSERT_OK(t->Submit(SlowTask::NewSlowTask(&latch)));
+ ASSERT_OK(t->Submit(SlowTask::NewSlowTask(&latch)));
+ Status s = t->Submit(SlowTask::NewSlowTask(&latch));
ASSERT_TRUE(s.IsServiceUnavailable());
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/cbd2fb67/src/kudu/util/threadpool.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/threadpool.h b/src/kudu/util/threadpool.h
index e15c991..5abe0b4 100644
--- a/src/kudu/util/threadpool.h
+++ b/src/kudu/util/threadpool.h
@@ -217,6 +217,7 @@ class ThreadPool {
void SetRunTimeMicrosHistogram(const scoped_refptr<Histogram>& hist);
private:
+ FRIEND_TEST(ThreadPoolTest, TestThreadPoolWithNoMaxThreads);
FRIEND_TEST(ThreadPoolTest, TestThreadPoolWithNoMinimum);
FRIEND_TEST(ThreadPoolTest, TestVariableSizeThreadPool);