You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/12/01 02:00:34 UTC
[1/2] kudu git commit: Make threadpool-test less flaky
Repository: kudu
Updated Branches:
refs/heads/master b39491de6 -> c52cc1630
Make threadpool-test less flaky
Under heavy load, threadpool-test may take more than 10 seconds in rare
cases and cause a test failure. This change increases the timeout.
Also reliably disable the alarm, even if an assertion fires, using a
ScopedCleanup object.
Change-Id: I6465922e5b43dc9aff80fa2b6e6549061bfd0425
Reviewed-on: http://gerrit.cloudera.org:8080/5293
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Kudu Jenkins
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/730bae3b
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/730bae3b
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/730bae3b
Branch: refs/heads/master
Commit: 730bae3b5828875dab818eac191f526ec61b6424
Parents: b39491d
Author: Mike Percy <mp...@apache.org>
Authored: Wed Nov 30 23:04:45 2016 +0000
Committer: Todd Lipcon <to...@apache.org>
Committed: Thu Dec 1 01:48:19 2016 +0000
----------------------------------------------------------------------
src/kudu/util/threadpool-test.cc | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/730bae3b/src/kudu/util/threadpool-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/threadpool-test.cc b/src/kudu/util/threadpool-test.cc
index b6bcd55..2f8e2fa 100644
--- a/src/kudu/util/threadpool-test.cc
+++ b/src/kudu/util/threadpool-test.cc
@@ -25,6 +25,7 @@
#include "kudu/util/countdown_latch.h"
#include "kudu/util/metrics.h"
#include "kudu/util/promise.h"
+#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/threadpool.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/trace.h"
@@ -161,7 +162,10 @@ TEST(TestThreadPool, TestThreadPoolWithNoMinimum) {
// Regression test for a bug where a task is submitted exactly
// as a thread is about to exit. Previously this could hang forever.
TEST(TestThreadPool, TestRace) {
- alarm(10);
+ alarm(60);
+ auto cleanup = MakeScopedCleanup([]() {
+ alarm(0); // Disable alarm on test exit.
+ });
MonoDelta idle_timeout = MonoDelta::FromMicroseconds(1);
gscoped_ptr<ThreadPool> thread_pool;
ASSERT_OK(ThreadPoolBuilder("test")
@@ -176,7 +180,6 @@ TEST(TestThreadPool, TestRace) {
// the bug.
SleepFor(MonoDelta::FromMicroseconds(i));
}
- alarm(0);
}
TEST(TestThreadPool, TestVariableSizeThreadPool) {
[2/2] kudu git commit: threadpool: Allow zero-size task queue
Posted by to...@apache.org.
threadpool: Allow zero-size task queue
Previously, our threadpool implementation's concept of max_queue_size
was not very useful, and one could not specify a zero-size queue, because
we took the meaning of queue size quite literally, thus leaking an
implementation detail: we use the task queue to hand off tasks from the
user's thread to our worker threads.
Now, max_queue_size is more intuitive: the user is allowed to submit
(max_queue_size + max_threads) tasks before new submissions are
rejected (assuming no task completes in the meantime).
In this paradigm, a zero-size queue is a useful thing. It implies that
the total number of tasks running or queued at a given time will never
exceed max_threads, which under typical circumstances means that no
successfully-submitted task is left waiting for an executor for very
long.
Added a new functional test for max_queue_size = 0 and updated an
existing queue-related test that is now deterministic instead of being
racy.
Change-Id: I5abf40473ee813c625e0a02232d714aab2e65109
Reviewed-on: http://gerrit.cloudera.org:8080/5275
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Kudu Jenkins
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/c52cc163
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/c52cc163
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/c52cc163
Branch: refs/heads/master
Commit: c52cc1630d76c123619e40578b82cb2c7e8560c9
Parents: 730bae3
Author: Mike Percy <mp...@apache.org>
Authored: Wed Nov 30 14:47:26 2016 +0000
Committer: Todd Lipcon <to...@apache.org>
Committed: Thu Dec 1 01:48:24 2016 +0000
----------------------------------------------------------------------
src/kudu/util/threadpool-test.cc | 32 +++++++++++++++++++++++++-------
src/kudu/util/threadpool.cc | 20 ++++++++++----------
2 files changed, 35 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/c52cc163/src/kudu/util/threadpool-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/threadpool-test.cc b/src/kudu/util/threadpool-test.cc
index 2f8e2fa..444fbf4 100644
--- a/src/kudu/util/threadpool-test.cc
+++ b/src/kudu/util/threadpool-test.cc
@@ -223,20 +223,39 @@ TEST(TestThreadPool, TestMaxQueueSize) {
.set_max_queue_size(1).Build(&thread_pool));
CountDownLatch latch(1);
+ // We will be able to submit two tasks: one for max_threads == 1 and one for
+ // max_queue_size == 1.
+ ASSERT_OK(thread_pool->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
ASSERT_OK(thread_pool->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
Status s = thread_pool->Submit(shared_ptr<Runnable>(new SlowTask(&latch)));
- // We race against the worker thread to re-enqueue.
- // If we get there first, we fail on the 2nd Submit().
- // If the worker dequeues first, we fail on the 3rd.
- if (s.ok()) {
- s = thread_pool->Submit(shared_ptr<Runnable>(new SlowTask(&latch)));
- }
CHECK(s.IsServiceUnavailable()) << "Expected failure due to queue blowout:" << s.ToString();
latch.CountDown();
thread_pool->Wait();
thread_pool->Shutdown();
}
+// Test that when we specify a zero-sized queue, the maximum number of threads
+// running is used for enforcement.
+TEST(TestThreadPool, TestZeroQueueSize) {
+ gscoped_ptr<ThreadPool> thread_pool;
+ const int kMaxThreads = 4;
+ ASSERT_OK(ThreadPoolBuilder("test")
+ .set_max_queue_size(0)
+ .set_max_threads(kMaxThreads)
+ .Build(&thread_pool));
+
+ CountDownLatch latch(1);
+ for (int i = 0; i < kMaxThreads; i++) {
+ ASSERT_OK(thread_pool->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
+ }
+ Status s = thread_pool->Submit(shared_ptr<Runnable>(new SlowTask(&latch)));
+ ASSERT_TRUE(s.IsServiceUnavailable()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "Thread pool is at capacity");
+ latch.CountDown();
+ thread_pool->Wait();
+ thread_pool->Shutdown();
+}
+
// Test that setting a promise from another thread yields
// a value on the current thread.
TEST(TestThreadPool, TestPromises) {
@@ -252,7 +271,6 @@ TEST(TestThreadPool, TestPromises) {
thread_pool->Shutdown();
}
-
METRIC_DEFINE_entity(test_entity);
METRIC_DEFINE_histogram(test_entity, queue_length, "queue length",
MetricUnit::kTasks, "queue length", 1000, 1);
http://git-wip-us.apache.org/repos/asf/kudu/blob/c52cc163/src/kudu/util/threadpool.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/threadpool.cc b/src/kudu/util/threadpool.cc
index 00cd6f8..b2def9c 100644
--- a/src/kudu/util/threadpool.cc
+++ b/src/kudu/util/threadpool.cc
@@ -82,7 +82,6 @@ ThreadPoolBuilder& ThreadPoolBuilder::set_max_threads(int max_threads) {
}
ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int max_queue_size) {
- CHECK_GT(max_queue_size, 0);
max_queue_size_ = max_queue_size;
return *this;
}
@@ -191,9 +190,12 @@ Status ThreadPool::Submit(const std::shared_ptr<Runnable>& task) {
}
// Size limit check.
- if (queue_size_ == max_queue_size_) {
- return Status::ServiceUnavailable(Substitute("Thread pool queue is full ($0 items)",
- queue_size_));
+ int64_t capacity_remaining = static_cast<int64_t>(max_threads_) - active_threads_ +
+ static_cast<int64_t>(max_queue_size_) - queue_size_;
+ if (capacity_remaining < 1) {
+ return Status::ServiceUnavailable(
+ Substitute("Thread pool is at capacity ($0/$1 tasks running, $2/$3 tasks queued)",
+ num_threads_, max_threads_, queue_size_, max_queue_size_));
}
// Should we create another thread?
@@ -213,12 +215,11 @@ Status ThreadPool::Submit(const std::shared_ptr<Runnable>& task) {
if (num_threads_ == 0) {
// If we have no threads, we can't do any work.
return status;
- } else {
- // If we failed to create a thread, but there are still some other
- // worker threads, log a warning message and continue.
- LOG(WARNING) << "Thread pool failed to create thread: "
- << status.ToString();
}
+ // If we failed to create a thread, but there are still some other
+ // worker threads, log a warning message and continue.
+ LOG(ERROR) << "Thread pool failed to create thread: "
+ << status.ToString();
}
}
@@ -281,7 +282,6 @@ void ThreadPool::SetRunTimeMicrosHistogram(const scoped_refptr<Histogram>& hist)
run_time_us_histogram_ = hist;
}
-
void ThreadPool::DispatchThread(bool permanent) {
MutexLock unique_lock(lock_);
while (true) {