Posted to commits@kudu.apache.org by to...@apache.org on 2016/12/01 02:00:34 UTC

[1/2] kudu git commit: Make threadpool-test less flaky

Repository: kudu
Updated Branches:
  refs/heads/master b39491de6 -> c52cc1630


Make threadpool-test less flaky

Under heavy load, threadpool-test may in rare cases take more than 10
seconds, tripping the alarm and failing the test. This change increases
the timeout.

Also reliably disable the alarm, even if an assertion fires, using a
ScopedCleanup object.
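
The idiom is a simple RAII guard. Below is a minimal, self-contained
sketch of the pattern (illustrative only; it is not Kudu's actual
ScopedCleanup from scoped_cleanup.h): the functor runs when the guard
goes out of scope, so alarm(0) executes even if an ASSERT fails and the
test returns early.

  #include <utility>

  // Generic scope guard: runs the given functor in its destructor.
  template <typename F>
  class ScopedCleanupSketch {
   public:
    explicit ScopedCleanupSketch(F f) : f_(std::move(f)) {}
    ~ScopedCleanupSketch() { f_(); }
   private:
    F f_;
  };

  // Helper so the functor type can be deduced from a lambda.
  template <typename F>
  ScopedCleanupSketch<F> MakeScopedCleanupSketch(F f) {
    return ScopedCleanupSketch<F>(std::move(f));
  }

  // Usage mirroring the change below:
  //   alarm(60);
  //   auto cleanup = MakeScopedCleanupSketch([]() { alarm(0); });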

Change-Id: I6465922e5b43dc9aff80fa2b6e6549061bfd0425
Reviewed-on: http://gerrit.cloudera.org:8080/5293
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/730bae3b
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/730bae3b
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/730bae3b

Branch: refs/heads/master
Commit: 730bae3b5828875dab818eac191f526ec61b6424
Parents: b39491d
Author: Mike Percy <mp...@apache.org>
Authored: Wed Nov 30 23:04:45 2016 +0000
Committer: Todd Lipcon <to...@apache.org>
Committed: Thu Dec 1 01:48:19 2016 +0000

----------------------------------------------------------------------
 src/kudu/util/threadpool-test.cc | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/730bae3b/src/kudu/util/threadpool-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/threadpool-test.cc b/src/kudu/util/threadpool-test.cc
index b6bcd55..2f8e2fa 100644
--- a/src/kudu/util/threadpool-test.cc
+++ b/src/kudu/util/threadpool-test.cc
@@ -25,6 +25,7 @@
 #include "kudu/util/countdown_latch.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/promise.h"
+#include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/threadpool.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/trace.h"
@@ -161,7 +162,10 @@ TEST(TestThreadPool, TestThreadPoolWithNoMinimum) {
 // Regression test for a bug where a task is submitted exactly
 // as a thread is about to exit. Previously this could hang forever.
 TEST(TestThreadPool, TestRace) {
-  alarm(10);
+  alarm(60);
+  auto cleanup = MakeScopedCleanup([]() {
+    alarm(0); // Disable alarm on test exit.
+  });
   MonoDelta idle_timeout = MonoDelta::FromMicroseconds(1);
   gscoped_ptr<ThreadPool> thread_pool;
   ASSERT_OK(ThreadPoolBuilder("test")
@@ -176,7 +180,6 @@ TEST(TestThreadPool, TestRace) {
     // the bug.
     SleepFor(MonoDelta::FromMicroseconds(i));
   }
-  alarm(0);
 }
 
 TEST(TestThreadPool, TestVariableSizeThreadPool) {


[2/2] kudu git commit: threadpool: Allow zero-size task queue

Posted by to...@apache.org.
threadpool: Allow zero-size task queue

Previously, our threadpool implementation's notion of max_queue_size was
not very useful, and a zero-size queue could not be specified, because we
interpreted the queue size quite literally and thereby leaked an
implementation detail: the task queue is used to hand off tasks from the
user's thread to our worker threads.

Now, max_queue_size is more intuitive: the user is allowed to submit
(max_queue_size + max_threads) tasks before new submissions are
rejected (assuming no task completes in the meantime).
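
As a standalone sketch (illustrative names, not Kudu's internals), the
new capacity rule accepts a submission only while running + queued stays
below max_threads + max_queue_size:

  #include <cstdint>

  // Returns true if one more task may be submitted under the new rule.
  bool HasCapacitySketch(int64_t max_threads, int64_t active_threads,
                         int64_t max_queue_size, int64_t queue_size) {
    int64_t capacity_remaining = (max_threads - active_threads) +
                                 (max_queue_size - queue_size);
    return capacity_remaining >= 1;
  }

  // Worked example with max_threads = 1, max_queue_size = 1 (the updated
  // TestMaxQueueSize test below):
  //   1st Submit(): 0 tasks in the pool -> remaining 2, accepted
  //   2nd Submit(): 1 task  in the pool -> remaining 1, accepted
  //   3rd Submit(): 2 tasks in the pool -> remaining 0, rejected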

In this paradigm, a zero-size queue is a useful thing. It implies that
the total number of tasks running or queued at any given time will never
exceed max_threads, which under typical circumstances means that no
successfully submitted task is left waiting for an executor for very
long.
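
For example (mirroring the new TestZeroQueueSize case in the diff below,
and assuming the SlowTask helper already defined in threadpool-test.cc),
a pool with a zero-size queue and four threads accepts exactly four
concurrent tasks:

  gscoped_ptr<ThreadPool> thread_pool;
  ASSERT_OK(ThreadPoolBuilder("test")
      .set_max_queue_size(0)
      .set_max_threads(4)
      .Build(&thread_pool));
  // The first four submissions each occupy a worker thread; a fifth is
  // rejected with Status::ServiceUnavailable until one of them completes.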

Added a new functional test for max_queue_size = 0 and updated an
existing queue-related test so that it is now deterministic rather than
racy.

Change-Id: I5abf40473ee813c625e0a02232d714aab2e65109
Reviewed-on: http://gerrit.cloudera.org:8080/5275
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/c52cc163
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/c52cc163
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/c52cc163

Branch: refs/heads/master
Commit: c52cc1630d76c123619e40578b82cb2c7e8560c9
Parents: 730bae3
Author: Mike Percy <mp...@apache.org>
Authored: Wed Nov 30 14:47:26 2016 +0000
Committer: Todd Lipcon <to...@apache.org>
Committed: Thu Dec 1 01:48:24 2016 +0000

----------------------------------------------------------------------
 src/kudu/util/threadpool-test.cc | 32 +++++++++++++++++++++++++-------
 src/kudu/util/threadpool.cc      | 20 ++++++++++----------
 2 files changed, 35 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/c52cc163/src/kudu/util/threadpool-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/threadpool-test.cc b/src/kudu/util/threadpool-test.cc
index 2f8e2fa..444fbf4 100644
--- a/src/kudu/util/threadpool-test.cc
+++ b/src/kudu/util/threadpool-test.cc
@@ -223,20 +223,39 @@ TEST(TestThreadPool, TestMaxQueueSize) {
       .set_max_queue_size(1).Build(&thread_pool));
 
   CountDownLatch latch(1);
+  // We will be able to submit two tasks: one for max_threads == 1 and one for
+  // max_queue_size == 1.
+  ASSERT_OK(thread_pool->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
   ASSERT_OK(thread_pool->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
   Status s = thread_pool->Submit(shared_ptr<Runnable>(new SlowTask(&latch)));
-  // We race against the worker thread to re-enqueue.
-  // If we get there first, we fail on the 2nd Submit().
-  // If the worker dequeues first, we fail on the 3rd.
-  if (s.ok()) {
-    s = thread_pool->Submit(shared_ptr<Runnable>(new SlowTask(&latch)));
-  }
   CHECK(s.IsServiceUnavailable()) << "Expected failure due to queue blowout:" << s.ToString();
   latch.CountDown();
   thread_pool->Wait();
   thread_pool->Shutdown();
 }
 
+// Test that when we specify a zero-sized queue, the maximum number of threads
+// running is used for enforcement.
+TEST(TestThreadPool, TestZeroQueueSize) {
+  gscoped_ptr<ThreadPool> thread_pool;
+  const int kMaxThreads = 4;
+  ASSERT_OK(ThreadPoolBuilder("test")
+      .set_max_queue_size(0)
+      .set_max_threads(kMaxThreads)
+      .Build(&thread_pool));
+
+  CountDownLatch latch(1);
+  for (int i = 0; i < kMaxThreads; i++) {
+    ASSERT_OK(thread_pool->Submit(shared_ptr<Runnable>(new SlowTask(&latch))));
+  }
+  Status s = thread_pool->Submit(shared_ptr<Runnable>(new SlowTask(&latch)));
+  ASSERT_TRUE(s.IsServiceUnavailable()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "Thread pool is at capacity");
+  latch.CountDown();
+  thread_pool->Wait();
+  thread_pool->Shutdown();
+}
+
 // Test that setting a promise from another thread yields
 // a value on the current thread.
 TEST(TestThreadPool, TestPromises) {
@@ -252,7 +271,6 @@ TEST(TestThreadPool, TestPromises) {
   thread_pool->Shutdown();
 }
 
-
 METRIC_DEFINE_entity(test_entity);
 METRIC_DEFINE_histogram(test_entity, queue_length, "queue length",
                         MetricUnit::kTasks, "queue length", 1000, 1);

http://git-wip-us.apache.org/repos/asf/kudu/blob/c52cc163/src/kudu/util/threadpool.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/threadpool.cc b/src/kudu/util/threadpool.cc
index 00cd6f8..b2def9c 100644
--- a/src/kudu/util/threadpool.cc
+++ b/src/kudu/util/threadpool.cc
@@ -82,7 +82,6 @@ ThreadPoolBuilder& ThreadPoolBuilder::set_max_threads(int max_threads) {
 }
 
 ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int max_queue_size) {
-  CHECK_GT(max_queue_size, 0);
   max_queue_size_ = max_queue_size;
   return *this;
 }
@@ -191,9 +190,12 @@ Status ThreadPool::Submit(const std::shared_ptr<Runnable>& task) {
   }
 
   // Size limit check.
-  if (queue_size_ == max_queue_size_) {
-    return Status::ServiceUnavailable(Substitute("Thread pool queue is full ($0 items)",
-                                                 queue_size_));
+  int64_t capacity_remaining = static_cast<int64_t>(max_threads_) - active_threads_ +
+                               static_cast<int64_t>(max_queue_size_) - queue_size_;
+  if (capacity_remaining < 1) {
+    return Status::ServiceUnavailable(
+        Substitute("Thread pool is at capacity ($0/$1 tasks running, $2/$3 tasks queued)",
+                   num_threads_, max_threads_, queue_size_, max_queue_size_));
   }
 
   // Should we create another thread?
@@ -213,12 +215,11 @@ Status ThreadPool::Submit(const std::shared_ptr<Runnable>& task) {
       if (num_threads_ == 0) {
         // If we have no threads, we can't do any work.
         return status;
-      } else {
-        // If we failed to create a thread, but there are still some other
-        // worker threads, log a warning message and continue.
-        LOG(WARNING) << "Thread pool failed to create thread: "
-                     << status.ToString();
       }
+      // If we failed to create a thread, but there are still some other
+      // worker threads, log a warning message and continue.
+      LOG(ERROR) << "Thread pool failed to create thread: "
+                 << status.ToString();
     }
   }
 
@@ -281,7 +282,6 @@ void ThreadPool::SetRunTimeMicrosHistogram(const scoped_refptr<Histogram>& hist)
   run_time_us_histogram_ = hist;
 }
 
-
 void ThreadPool::DispatchThread(bool permanent) {
   MutexLock unique_lock(lock_);
   while (true) {