You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:15 UTC
[03/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from
kudu@334ecafd
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/threadpool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/threadpool-test.cc b/be/src/kudu/util/threadpool-test.cc
new file mode 100644
index 0000000..23fc45c
--- /dev/null
+++ b/be/src/kudu/util/threadpool-test.cc
@@ -0,0 +1,941 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <unistd.h>
+
+#include <atomic>
+#include <cstdint>
+#include <iterator>
+#include <limits>
+#include <memory>
+#include <mutex>
+#include <ostream>
+#include <string>
+#include <thread>
+#include <utility>
+#include <vector>
+
+#include <boost/bind.hpp> // IWYU pragma: keep
+#include <boost/smart_ptr/shared_ptr.hpp>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/gutil/bind.h"
+#include "kudu/gutil/bind_helpers.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/sysinfo.h"
+#include "kudu/util/barrier.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/promise.h"
+#include "kudu/util/random.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/threadpool.h"
+#include "kudu/util/trace.h"
+
+using std::atomic;
+using std::shared_ptr;
+using std::string;
+using std::thread;
+using std::unique_ptr;
+using std::vector;
+
+using strings::Substitute;
+
+DECLARE_int32(thread_inject_start_latency_ms);
+
+namespace kudu {
+
+// Name given to every pool that the fixture below builds. The pointer itself
+// is const so the name cannot be re-pointed by accident.
+static const char* const kDefaultPoolName = "test";
+
+// Fixture shared by the tests in this file. SetUp() builds a pool from a
+// ThreadPoolBuilder with no options set; a test that needs different limits
+// calls one of the RebuildPool* helpers, which build into the same 'pool_'.
+class ThreadPoolTest : public KuduTest {
+ public:
+
+  void SetUp() override {
+    KuduTest::SetUp();
+    ASSERT_OK(ThreadPoolBuilder(kDefaultPoolName).Build(&pool_));
+  }
+
+  // Builds a pool from 'builder' into 'pool_' and returns the build status.
+  Status RebuildPoolWithBuilder(const ThreadPoolBuilder& builder) {
+    return builder.Build(&pool_);
+  }
+
+  // Builds a pool named kDefaultPoolName with the given thread limits into
+  // 'pool_' and returns the build status.
+  Status RebuildPoolWithMinMax(int min_threads, int max_threads) {
+    return ThreadPoolBuilder(kDefaultPoolName)
+        .set_min_threads(min_threads)
+        .set_max_threads(max_threads)
+        .Build(&pool_);
+  }
+
+ protected:
+  // The pool under test.
+  gscoped_ptr<ThreadPool> pool_;
+};
+
+// A fixed-size pool can be built and shut down without ever receiving work.
+TEST_F(ThreadPoolTest, TestNoTaskOpenClose) {
+  const int kNumThreads = 4;
+  ASSERT_OK(RebuildPoolWithMinMax(kNumThreads, kNumThreads));
+  pool_->Shutdown();
+}
+
+// Adds 1 to '*counter' 'n' times, calling boost::detail::yield() after each
+// increment. A zero or negative 'n' performs no increments; the previous
+// 'while (n--)' form would have kept decrementing a negative 'n' until it
+// overflowed.
+static void SimpleTaskMethod(int n, Atomic32* counter) {
+  while (n-- > 0) {
+    base::subtle::NoBarrier_AtomicIncrement(counter, 1);
+    boost::detail::yield(n);
+  }
+}
+
+// Runnable wrapper around SimpleTaskMethod(): each Run() adds 'n' to the
+// counter supplied at construction. The counter is not owned and must
+// outlive every run of the task.
+class SimpleTask : public Runnable {
+ public:
+  SimpleTask(int n, Atomic32* counter)
+      : n_(n), counter_(counter) {
+  }
+
+  // Uses the 'override' keyword, as the other Runnable subclasses and the
+  // fixture in this file do.
+  void Run() override {
+    SimpleTaskMethod(n_, counter_);
+  }
+
+ private:
+  int n_;
+  Atomic32* counter_;
+};
+
+// Submits work through each entry point (boost::bind functor, shared
+// Runnable, and Closure) and checks that every increment landed.
+TEST_F(ThreadPoolTest, TestSimpleTasks) {
+  ASSERT_OK(RebuildPoolWithMinMax(4, 4));
+
+  Atomic32 counter(0);
+  // The same Runnable instance is submitted twice below.
+  std::shared_ptr<Runnable> shared_task = std::make_shared<SimpleTask>(15, &counter);
+  const int kExpectedTotal = 10 + 15 + 20 + 15 + 123;
+
+  ASSERT_OK(pool_->SubmitFunc(boost::bind(&SimpleTaskMethod, 10, &counter)));
+  ASSERT_OK(pool_->Submit(shared_task));
+  ASSERT_OK(pool_->SubmitFunc(boost::bind(&SimpleTaskMethod, 20, &counter)));
+  ASSERT_OK(pool_->Submit(shared_task));
+  ASSERT_OK(pool_->SubmitClosure(Bind(&SimpleTaskMethod, 123, &counter)));
+
+  // All tasks have finished once Wait() returns, so the sum is final.
+  pool_->Wait();
+  ASSERT_EQ(kExpectedTotal, base::subtle::NoBarrier_Load(&counter));
+  pool_->Shutdown();
+}
+
+// Task body that records a message through the TRACE() macro.
+static void IssueTraceStatement() {
+  TRACE("hello from task");
+}
+
+// Checks that a trace adopted by the submitting thread receives the TRACE()
+// output of a task that runs on a pool thread.
+TEST_F(ThreadPoolTest, TestTracePropagation) {
+  ASSERT_OK(RebuildPoolWithMinMax(1, 1));
+
+  scoped_refptr<Trace> trace(new Trace);
+  {
+    // The trace is adopted only for the duration of the submission.
+    ADOPT_TRACE(trace.get());
+    ASSERT_OK(pool_->SubmitFunc(&IssueTraceStatement));
+  }
+  pool_->Wait();
+
+  const string dump = trace->DumpToString();
+  ASSERT_STR_CONTAINS(dump, "hello from task");
+}
+
+// Submitting to a pool that has already been shut down fails with a
+// "Service unavailable" status carrying a fixed message.
+TEST_F(ThreadPoolTest, TestSubmitAfterShutdown) {
+  ASSERT_OK(RebuildPoolWithMinMax(1, 1));
+  pool_->Shutdown();
+
+  const Status submit_status = pool_->SubmitFunc(&IssueTraceStatement);
+  const string kExpectedMessage = "Service unavailable: The pool has been shut down.";
+  ASSERT_EQ(kExpectedMessage, submit_status.ToString());
+}
+
+// Runnable that blocks in Run() until the latch passed at construction has
+// counted down. The latch is not owned and must outlive the task.
+class SlowTask : public Runnable {
+ public:
+  explicit SlowTask(CountDownLatch* latch)
+      : latch_(latch) {
+  }
+
+  // Uses the 'override' keyword, as the other Runnable subclasses and the
+  // fixture in this file do.
+  void Run() override {
+    latch_->Wait();
+  }
+
+  // Convenience factory returning a SlowTask as a shared Runnable, ready to
+  // be passed to Submit().
+  static shared_ptr<Runnable> NewSlowTask(CountDownLatch* latch) {
+    return std::make_shared<SlowTask>(latch);
+  }
+
+ private:
+  CountDownLatch* latch_;
+};
+
+// Exercises a pool with min_threads == 0: it starts with no threads, grows
+// as blocked tasks are submitted up to max_threads, queues further work, and
+// reports zero threads after shutdown.
+TEST_F(ThreadPoolTest, TestThreadPoolWithNoMinimum) {
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+      .set_min_threads(0)
+      .set_max_threads(3)
+      .set_idle_timeout(MonoDelta::FromMilliseconds(1))));
+
+  // There are no threads to start with. ASSERT_EQ is used instead of
+  // ASSERT_TRUE on a comparison so a failure prints the actual count.
+  ASSERT_EQ(0, pool_->num_threads());
+  // We get up to 3 threads when submitting work.
+  CountDownLatch latch(1);
+  // Unblock the tasks even if an assertion below returns early.
+  SCOPED_CLEANUP({
+    latch.CountDown();
+  });
+  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  ASSERT_EQ(2, pool_->num_threads());
+  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  ASSERT_EQ(3, pool_->num_threads());
+  // The 4th piece of work gets queued.
+  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  ASSERT_EQ(3, pool_->num_threads());
+  // Finish all work
+  latch.CountDown();
+  pool_->Wait();
+  ASSERT_EQ(0, pool_->active_threads_);
+  pool_->Shutdown();
+  ASSERT_EQ(0, pool_->num_threads());
+}
+
+// With max_threads set to the largest int, every blocked tokenless task gets
+// its own thread, while each serial token accounts for one thread regardless
+// of how many tasks are submitted through it.
+TEST_F(ThreadPoolTest, TestThreadPoolWithNoMaxThreads) {
+  // By default a threadpool's max_threads is set to the number of CPUs, so
+  // more tasks than that are submitted to show that the CPU count is not
+  // some kind of upper bound.
+  const int kNumCPUs = base::NumCPUs();
+  const int kFirstBatch = kNumCPUs * 2;
+
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+      .set_max_threads(std::numeric_limits<int>::max())));
+  CountDownLatch latch(1);
+  // Unblock the tasks even if an assertion below returns early.
+  auto release_tasks = MakeScopedCleanup([&]() {
+    latch.CountDown();
+  });
+
+  // Tokenless tasks: one new thread apiece.
+  for (int n = 0; n < kFirstBatch; ++n) {
+    ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  }
+  ASSERT_EQ(kFirstBatch, pool_->num_threads());
+
+  // Tasks alternating between two serial tokens: only two more threads.
+  unique_ptr<ThreadPoolToken> even_token = pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
+  unique_ptr<ThreadPoolToken> odd_token = pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
+  for (int n = 0; n < kFirstBatch; ++n) {
+    ThreadPoolToken* target = (n % 2 == 0) ? even_token.get() : odd_token.get();
+    ASSERT_OK(target->Submit(SlowTask::NewSlowTask(&latch)));
+  }
+  ASSERT_EQ(kFirstBatch + 2, pool_->num_threads());
+
+  // More tokenless tasks: again one new thread apiece.
+  for (int n = 0; n < kNumCPUs; ++n) {
+    ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  }
+  ASSERT_EQ((kNumCPUs * 3) + 2, pool_->num_threads());
+
+  latch.CountDown();
+  pool_->Wait();
+  pool_->Shutdown();
+}
+
+// Regression test: a task submitted at the very moment the pool's only
+// thread is about to exit on idle timeout used to be able to hang forever.
+TEST_F(ThreadPoolTest, TestRace) {
+  // The alarm turns a reintroduced hang into a test failure.
+  alarm(60);
+  auto disarm_alarm = MakeScopedCleanup([]() {
+    alarm(0); // Disable alarm on test exit.
+  });
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+      .set_min_threads(0)
+      .set_max_threads(1)
+      .set_idle_timeout(MonoDelta::FromMicroseconds(1))));
+
+  const int kIterations = 500;
+  for (int pause_us = 0; pause_us < kIterations; ++pause_us) {
+    CountDownLatch task_ran(1);
+    ASSERT_OK(pool_->SubmitFunc(boost::bind(&CountDownLatch::CountDown, &task_ran)));
+    task_ran.Wait();
+    // A different pause on every iteration makes it more likely that a
+    // submission coincides with the idle thread's exit.
+    SleepFor(MonoDelta::FromMicroseconds(pause_us));
+  }
+}
+
+// Exercises a pool with min_threads == 1 and max_threads == 4: the initial
+// thread takes the first task, each further blocked task adds a thread up to
+// the maximum, and work beyond that is queued.
+TEST_F(ThreadPoolTest, TestVariableSizeThreadPool) {
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+      .set_min_threads(1)
+      .set_max_threads(4)
+      .set_idle_timeout(MonoDelta::FromMilliseconds(1))));
+
+  // There is 1 thread to start with.
+  ASSERT_EQ(1, pool_->num_threads());
+  // We get up to 4 threads when submitting work.
+  CountDownLatch latch(1);
+  // Unblock the tasks even if an assertion below returns early; otherwise
+  // they would stay blocked on a latch that has gone out of scope. The other
+  // latch-based tests in this file use the same guard.
+  SCOPED_CLEANUP({
+    latch.CountDown();
+  });
+  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  ASSERT_EQ(1, pool_->num_threads());
+  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  ASSERT_EQ(2, pool_->num_threads());
+  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  ASSERT_EQ(3, pool_->num_threads());
+  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  ASSERT_EQ(4, pool_->num_threads());
+  // The 5th piece of work gets queued.
+  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  ASSERT_EQ(4, pool_->num_threads());
+  // Finish all work
+  latch.CountDown();
+  pool_->Wait();
+  ASSERT_EQ(0, pool_->active_threads_);
+  pool_->Shutdown();
+  ASSERT_EQ(0, pool_->num_threads());
+}
+
+// With one thread and a queue of size one, exactly two blocked tasks are
+// accepted and the third submission is rejected as ServiceUnavailable.
+TEST_F(ThreadPoolTest, TestMaxQueueSize) {
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+      .set_min_threads(1)
+      .set_max_threads(1)
+      .set_max_queue_size(1)));
+
+  CountDownLatch latch(1);
+  // Unblock the tasks even if an assertion below returns early.
+  SCOPED_CLEANUP({
+    latch.CountDown();
+  });
+  // We will be able to submit two tasks: one for max_threads == 1 and one for
+  // max_queue_size == 1.
+  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  Status s = pool_->Submit(SlowTask::NewSlowTask(&latch));
+  // A gtest assertion fails only this test; CHECK would abort the whole
+  // test binary.
+  ASSERT_TRUE(s.IsServiceUnavailable())
+      << "Expected failure due to queue blowout:" << s.ToString();
+  latch.CountDown();
+  pool_->Wait();
+  pool_->Shutdown();
+}
+
+// Test that when we specify a zero-sized queue, the maximum number of threads
+// running is used for enforcement: kMaxThreads blocked tasks are accepted and
+// the next submission is rejected.
+TEST_F(ThreadPoolTest, TestZeroQueueSize) {
+  const int kMaxThreads = 4;
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+      .set_max_queue_size(0)
+      .set_max_threads(kMaxThreads)));
+
+  CountDownLatch latch(1);
+  // Unblock the tasks even if an assertion below returns early; otherwise
+  // they would stay blocked on a latch that has gone out of scope.
+  SCOPED_CLEANUP({
+    latch.CountDown();
+  });
+  for (int i = 0; i < kMaxThreads; i++) {
+    ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  }
+  Status s = pool_->Submit(SlowTask::NewSlowTask(&latch));
+  ASSERT_TRUE(s.IsServiceUnavailable()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "Thread pool is at capacity");
+  latch.CountDown();
+  pool_->Wait();
+  pool_->Shutdown();
+}
+
+// Regression test for KUDU-2187:
+//
+// If a threadpool thread is slow to start up, it shouldn't block progress of
+// other tasks on the same pool.
+TEST_F(ThreadPoolTest, TestSlowThreadStart) {
+ // Start a pool of threads from which we'll submit tasks.
+ gscoped_ptr<ThreadPool> submitter_pool;
+ ASSERT_OK(ThreadPoolBuilder("submitter")
+ .set_min_threads(5)
+ .set_max_threads(5)
+ .Build(&submitter_pool));
+
+ // Start the actual test pool, which starts with one thread
+ // but will start a second one on-demand.
+ ASSERT_OK(RebuildPoolWithMinMax(1, 2));
+ // Ensure that the second thread will take a long time to start.
+ FLAGS_thread_inject_start_latency_ms = 3000; // Set after the rebuild, so the pool's first thread is not delayed.
+
+ // Now submit 10 tasks to the 'submitter' pool, each of which
+ // submits a single task to 'pool_'. The 'pool_' task sleeps
+ // for 10ms.
+ //
+ // Because the 'submitter' tasks submit faster than they can be
+ // processed on a single thread (due to the sleep), we expect that
+ // this will trigger 'pool_' to start up its second worker thread.
+ // The thread startup will have some latency injected.
+ //
+ // We expect that the thread startup will block only one of the
+ // tasks in the 'submitter' pool after it submits its task. Other
+ // tasks will continue to be processed by the other (already-running)
+ // thread on 'pool_'.
+ std::atomic<int32_t> total_queue_time_ms(0); // Sum over all 'pool_' tasks of the time between submission and start.
+ for (int i = 0; i < 10; i++) {
+ ASSERT_OK(submitter_pool->SubmitFunc([&]() {
+ auto submit_time = MonoTime::Now();
+ CHECK_OK(pool_->SubmitFunc([&,submit_time]() { // 'submit_time' is copied: the outer lambda may return before this task runs.
+ auto queue_time = MonoTime::Now() - submit_time;
+ total_queue_time_ms += queue_time.ToMilliseconds();
+ SleepFor(MonoDelta::FromMilliseconds(10));
+ }));
+ }));
+ }
+ submitter_pool->Wait(); // After this, every 'pool_' task has been submitted.
+ pool_->Wait(); // After this, every 'pool_' task has run and added its queue time.
+
+ // Since the total amount of work submitted was only 100ms, we expect
+ // that the performance would be equivalent to a single-threaded
+ // threadpool. So, we expect the total queue time to be approximately
+ // 0 + 10 + 20 ... + 80 + 90 = 450ms.
+ //
+ // If, instead, throughput had been blocked while starting threads,
+ // we'd get something closer to 18000ms (3000ms delay * 5 submitter threads). NOTE(review): 3000 * 5 is 15000, not 18000 -- confirm the intended estimate.
+ ASSERT_GE(total_queue_time_ms, 400); // Slightly below the ~450ms single-thread estimate.
+ ASSERT_LE(total_queue_time_ms, 10000); // Well above 450ms, yet below the blocked-startup estimate.
+}
+
+// A value stored into a Promise by a pool thread can be read back on the
+// thread that submitted the task.
+TEST_F(ThreadPoolTest, TestPromises) {
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+      .set_min_threads(1)
+      .set_max_threads(1)
+      .set_max_queue_size(1)));
+
+  Promise<int> answer;
+  ASSERT_OK(pool_->SubmitClosure(Bind(&Promise<int>::Set, Unretained(&answer), 5)));
+
+  // The test relies on Get() returning only after the task has called Set().
+  const int received = answer.Get();
+  ASSERT_EQ(5, received);
+  pool_->Shutdown();
+}
+
+METRIC_DEFINE_entity(test_entity); // Entity type instantiated by TestMetrics as METRIC_ENTITY_test_entity.
+METRIC_DEFINE_histogram(test_entity, queue_length, "queue length",
+ MetricUnit::kTasks, "queue length", 1000, 1); // Used as METRIC_queue_length. NOTE(review): trailing numbers presumably bound/precision -- confirm in metrics.h.
+
+METRIC_DEFINE_histogram(test_entity, queue_time, "queue time",
+ MetricUnit::kMicroseconds, "queue time", 1000000, 1); // Used as METRIC_queue_time.
+
+METRIC_DEFINE_histogram(test_entity, run_time, "run time",
+ MetricUnit::kMicroseconds, "run time", 1000, 1); // Used as METRIC_run_time.
+
+// Verifies histogram accounting: each token's metrics count only that
+// token's submissions, while the pool-wide metrics count every submission.
+TEST_F(ThreadPoolTest, TestMetrics) {
+  MetricRegistry registry;
+  // Index 0 is the pool-wide set; indexes 1 and 2 belong to the two tokens.
+  vector<ThreadPoolMetrics> all_metrics;
+  for (int i = 0; i < 3; i++) {
+    // The address-of operator is restored here: the archived text showed a
+    // registered-trademark sign where "&reg" had been decoded as an HTML
+    // entity.
+    scoped_refptr<MetricEntity> entity = METRIC_ENTITY_test_entity.Instantiate(
+        &registry, Substitute("test $0", i));
+    all_metrics.emplace_back(ThreadPoolMetrics{
+        METRIC_queue_length.Instantiate(entity),
+        METRIC_queue_time.Instantiate(entity),
+        METRIC_run_time.Instantiate(entity)
+    });
+  }
+
+  // Enable metrics for the thread pool.
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+      .set_min_threads(1)
+      .set_max_threads(1)
+      .set_metrics(all_metrics[0])));
+
+  unique_ptr<ThreadPoolToken> t1 = pool_->NewTokenWithMetrics(
+      ThreadPool::ExecutionMode::SERIAL, all_metrics[1]);
+  unique_ptr<ThreadPoolToken> t2 = pool_->NewTokenWithMetrics(
+      ThreadPool::ExecutionMode::SERIAL, all_metrics[2]);
+
+  // Submit once to t1, twice to t2, and three times without a token.
+  ASSERT_OK(t1->SubmitFunc([](){}));
+  ASSERT_OK(t2->SubmitFunc([](){}));
+  ASSERT_OK(t2->SubmitFunc([](){}));
+  ASSERT_OK(pool_->SubmitFunc([](){}));
+  ASSERT_OK(pool_->SubmitFunc([](){}));
+  ASSERT_OK(pool_->SubmitFunc([](){}));
+  pool_->Wait();
+
+  // The total counts should reflect the number of submissions to each token.
+  ASSERT_EQ(1, all_metrics[1].queue_length_histogram->TotalCount());
+  ASSERT_EQ(1, all_metrics[1].queue_time_us_histogram->TotalCount());
+  ASSERT_EQ(1, all_metrics[1].run_time_us_histogram->TotalCount());
+  ASSERT_EQ(2, all_metrics[2].queue_length_histogram->TotalCount());
+  ASSERT_EQ(2, all_metrics[2].queue_time_us_histogram->TotalCount());
+  ASSERT_EQ(2, all_metrics[2].run_time_us_histogram->TotalCount());
+
+  // And the counts on the pool-wide metrics should reflect all submissions.
+  ASSERT_EQ(6, all_metrics[0].queue_length_histogram->TotalCount());
+  ASSERT_EQ(6, all_metrics[0].queue_time_us_histogram->TotalCount());
+  ASSERT_EQ(6, all_metrics[0].run_time_us_histogram->TotalCount());
+}
+
+// Test that a thread pool will crash if asked to run its own blocking
+// functions (Shutdown() and Wait() are the two exercised) in a pool thread.
+//
+// In a multi-threaded application, TSAN is unsafe to use following a fork().
+// After a fork(), TSAN will:
+// 1. Disable verification, expecting an exec() soon anyway, and
+// 2. Die on future thread creation.
+// For some reason, this test triggers behavior #2. We could disable it with
+// the TSAN option die_after_fork=0, but this can (supposedly) lead to
+// deadlocks, so we'll disable the entire test instead.
+#ifndef THREAD_SANITIZER
+TEST_F(ThreadPoolTest, TestDeadlocks) {
+ const char* death_msg = "called pool function that would result in deadlock"; // Expected in the output of both death cases.
+ ASSERT_DEATH({ // Case 1: a pool task calls Shutdown() on its own pool.
+ ASSERT_OK(RebuildPoolWithMinMax(1, 1));
+ ASSERT_OK(pool_->SubmitClosure(
+ Bind(&ThreadPool::Shutdown, Unretained(pool_.get()))));
+ pool_->Wait();
+ }, death_msg);
+
+ ASSERT_DEATH({ // Case 2: a pool task calls Wait() on its own pool.
+ ASSERT_OK(RebuildPoolWithMinMax(1, 1));
+ ASSERT_OK(pool_->SubmitClosure(
+ Bind(&ThreadPool::Wait, Unretained(pool_.get()))));
+ pool_->Wait();
+ }, death_msg);
+}
+#endif // THREAD_SANITIZER
+
+// Runnable whose Run() does nothing but whose destructor sleeps for 100ms.
+class SlowDestructorRunnable : public Runnable {
+ public:
+  void Run() override {}
+
+  virtual ~SlowDestructorRunnable() {
+    SleepFor(MonoDelta::FromMilliseconds(100));
+  }
+};
+
+// Test that if a task's destructor is slow, it doesn't cause serialization
+// of the tasks in the queue: 100 tasks with 100ms destructors would need
+// about 10 seconds one after another, and the whole batch must take under 5.
+TEST_F(ThreadPoolTest, TestSlowDestructor) {
+  ASSERT_OK(RebuildPoolWithMinMax(1, 20));
+  const MonoTime begin = MonoTime::Now();
+  for (int n = 0; n < 100; ++n) {
+    // Ownership moves into the pool so the pool drops the last reference.
+    shared_ptr<Runnable> slow_task = std::make_shared<SlowDestructorRunnable>();
+    ASSERT_OK(pool_->Submit(std::move(slow_task)));
+  }
+  pool_->Wait();
+  const auto elapsed = MonoTime::Now() - begin;
+  ASSERT_LT(elapsed.ToSeconds(), 5);
+}
+
+// Parameterized fixture for test cases that should run with both kinds of tokens; GetParam() supplies the token's ExecutionMode.
+class ThreadPoolTestTokenTypes : public ThreadPoolTest,
+ public testing::WithParamInterface<ThreadPool::ExecutionMode> {};
+
+INSTANTIATE_TEST_CASE_P(Tokens, ThreadPoolTestTokenTypes,
+ ::testing::Values(ThreadPool::ExecutionMode::SERIAL,
+ ThreadPool::ExecutionMode::CONCURRENT)); // The two modes every TEST_P on this fixture is run with.
+
+
+// A single task submitted through a token has completed by the time the
+// token's Wait() returns.
+TEST_P(ThreadPoolTestTokenTypes, TestTokenSubmitAndWait) {
+  unique_ptr<ThreadPoolToken> token = pool_->NewToken(GetParam());
+  int run_count = 0;
+  ASSERT_OK(token->SubmitFunc([&]() {
+    // The short sleep makes it likely that Wait() is reached while the task
+    // is still running.
+    SleepFor(MonoDelta::FromMilliseconds(1));
+    run_count++;
+  }));
+  token->Wait();
+  ASSERT_EQ(1, run_count);
+}
+
+// Tasks submitted through a SERIAL token run in submission order: each task
+// appends one letter and the result must come out as "abcde".
+TEST_F(ThreadPoolTest, TestTokenSubmitsProcessedSerially) {
+  const string kLetters = "abcde";
+  unique_ptr<ThreadPoolToken> token = pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
+  Random rng(SeedRandom());
+  string appended;
+  for (char letter : kLetters) {
+    // A random sleep before the append raises the chance of out-of-order
+    // appends if the submissions were to execute in parallel.
+    int delay_ms = rng.Next() % 5;
+    ASSERT_OK(token->SubmitFunc([&appended, letter, delay_ms]() {
+      SleepFor(MonoDelta::FromMilliseconds(delay_ms));
+      appended += letter;
+    }));
+  }
+  token->Wait();
+  ASSERT_EQ("abcde", appended);
+}
+
+// Tasks submitted through different tokens run at the same time: one task
+// per token plus the test thread must all arrive at a shared barrier.
+TEST_P(ThreadPoolTestTokenTypes, TestTokenSubmitsProcessedConcurrently) {
+  const int kNumTokens = 5;
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+      .set_max_threads(kNumTokens)));
+  vector<unique_ptr<ThreadPoolToken>> tokens;
+
+  // A violation of the tested invariant would show up as a deadlock, so an
+  // alarm is armed to bail the test out.
+  alarm(60);
+  SCOPED_CLEANUP({
+    alarm(0); // Disable alarm on test exit.
+  });
+  shared_ptr<Barrier> rendezvous = std::make_shared<Barrier>(kNumTokens + 1);
+  for (int n = 0; n < kNumTokens; ++n) {
+    tokens.emplace_back(pool_->NewToken(GetParam()));
+    ASSERT_OK(tokens.back()->SubmitFunc([rendezvous]() {
+      rendezvous->Wait();
+    }));
+  }
+
+  // This will deadlock if the above tasks weren't all running concurrently.
+  rendezvous->Wait();
+}
+
+// Tasks submitted through one CONCURRENT token run at the same time: all of
+// them plus the test thread must arrive at a shared barrier.
+TEST_F(ThreadPoolTest, TestTokenSubmitsNonSequential) {
+  const int kNumSubmissions = 5;
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+      .set_max_threads(kNumSubmissions)));
+
+  // A violation of the tested invariant would show up as a deadlock, so an
+  // alarm is armed to bail the test out.
+  alarm(60);
+  SCOPED_CLEANUP({
+    alarm(0); // Disable alarm on test exit.
+  });
+  shared_ptr<Barrier> rendezvous = std::make_shared<Barrier>(kNumSubmissions + 1);
+  unique_ptr<ThreadPoolToken> token = pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT);
+  for (int n = 0; n < kNumSubmissions; ++n) {
+    ASSERT_OK(token->SubmitFunc([rendezvous]() {
+      rendezvous->Wait();
+    }));
+  }
+
+  // This will deadlock if the above tasks weren't all running concurrently.
+  rendezvous->Wait();
+}
+
+// Shutting down one token waits only for that token's tasks and leaves other
+// tokens on the same pool usable.
+TEST_P(ThreadPoolTestTokenTypes, TestTokenShutdown) {
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+      .set_max_threads(4)));
+
+  unique_ptr<ThreadPoolToken> first(pool_->NewToken(GetParam()));
+  unique_ptr<ThreadPoolToken> second(pool_->NewToken(GetParam()));
+  CountDownLatch first_gate(1);
+  CountDownLatch second_gate(1);
+
+  // A violation of the tested invariant would show up as a deadlock, so an
+  // alarm is armed to bail the test out.
+  alarm(60);
+  SCOPED_CLEANUP({
+    alarm(0); // Disable alarm on test exit.
+  });
+
+  // Three tasks per token, each blocked on that token's gate.
+  for (int n = 0; n < 3; ++n) {
+    ASSERT_OK(first->SubmitFunc([&]() {
+      first_gate.Wait();
+    }));
+  }
+  for (int n = 0; n < 3; ++n) {
+    ASSERT_OK(second->SubmitFunc([&]() {
+      second_gate.Wait();
+    }));
+  }
+
+  // Unblock all of the first token's tasks, but not the second token's.
+  first_gate.CountDown();
+
+  // If this also waited for the second token's tasks, it would deadlock.
+  first->Shutdown();
+
+  // The first token no longer accepts work; the second still does.
+  ASSERT_TRUE(first->SubmitFunc([](){}).IsServiceUnavailable());
+  ASSERT_OK(second->SubmitFunc([](){}));
+
+  // Unblock the second token's tasks.
+  second_gate.CountDown();
+  second->Shutdown();
+}
+
+// The pool's Wait() covers every outstanding task, whether it was submitted
+// directly to the pool or through one of its tokens.
+TEST_P(ThreadPoolTestTokenTypes, TestTokenWaitForAll) {
+  const int kNumTokens = 3;
+  const int kNumSubmissions = 20;
+  Random rng(SeedRandom());
+  vector<unique_ptr<ThreadPoolToken>> tokens;
+  for (int n = 0; n < kNumTokens; ++n) {
+    tokens.emplace_back(pool_->NewToken(GetParam()));
+  }
+
+  atomic<int32_t> completed(0);
+  for (int n = 0; n < kNumSubmissions; ++n) {
+    // A short random sleep raises the likelihood of the test thread reaching
+    // Wait() before the submissions finish.
+    int delay_ms = rng.Next() % 5;
+
+    auto work = [&completed, delay_ms]() {
+      SleepFor(MonoDelta::FromMilliseconds(delay_ms));
+      completed++;
+    };
+
+    // Even submissions go straight to the pool; odd ones go through a
+    // randomly chosen token.
+    const bool tokenless = (n % 2 == 0);
+    if (tokenless) {
+      ASSERT_OK(pool_->SubmitFunc(work));
+    } else {
+      int chosen = rng.Next() % tokens.size();
+      ASSERT_OK(tokens[chosen]->SubmitFunc(work));
+    }
+  }
+  pool_->Wait();
+  ASSERT_EQ(kNumSubmissions, completed);
+}
+
+TEST_F(ThreadPoolTest, TestFuzz) { // Runs a random sequence of pool and token operations on the default pool.
+ const int kNumOperations = 1000;
+ Random r(SeedRandom());
+ vector<unique_ptr<ThreadPoolToken>> tokens; // Live tokens; may include tokens that were already shut down.
+
+ for (int i = 0; i < kNumOperations; i++) {
+ // Operation distribution:
+ //
+ // - Submit without a token: 40%
+ // - Submit with a randomly selected token: 35%
+ // - Allocate a new token: 10%
+ // - Wait on a randomly selected token: 7%
+ // - Shutdown a randomly selected token: 4%
+ // - Deallocate a randomly selected token: 2%
+ // - Wait for all submissions: 2%
+ int op = r.Next() % 100; // Uniform in [0, 100); the thresholds below carve out the percentages above.
+ if (op < 40) {
+ // Submit without a token.
+ int sleep_ms = r.Next() % 5;
+ ASSERT_OK(pool_->SubmitFunc([sleep_ms]() {
+ // Sleep a little first to increase task overlap.
+ SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
+ }));
+ } else if (op < 75) {
+ // Submit with a randomly selected token.
+ if (tokens.empty()) {
+ continue; // Nothing to pick from; the iteration still counts toward kNumOperations.
+ }
+ int sleep_ms = r.Next() % 5;
+ int token_idx = r.Next() % tokens.size();
+ Status s = tokens[token_idx]->SubmitFunc([sleep_ms]() {
+ // Sleep a little first to increase task overlap.
+ SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
+ });
+ ASSERT_TRUE(s.ok() || s.IsServiceUnavailable()); // ServiceUnavailable is accepted: the token may have been shut down by an earlier operation.
+ } else if (op < 85) {
+ // Allocate a token with a randomly selected policy.
+ ThreadPool::ExecutionMode mode = r.Next() % 2 ?
+ ThreadPool::ExecutionMode::SERIAL :
+ ThreadPool::ExecutionMode::CONCURRENT;
+ tokens.emplace_back(pool_->NewToken(mode));
+ } else if (op < 92) {
+ // Wait on a randomly selected token.
+ if (tokens.empty()) {
+ continue;
+ }
+ int token_idx = r.Next() % tokens.size();
+ tokens[token_idx]->Wait();
+ } else if (op < 96) {
+ // Shutdown a randomly selected token.
+ if (tokens.empty()) {
+ continue;
+ }
+ int token_idx = r.Next() % tokens.size();
+ tokens[token_idx]->Shutdown(); // The token stays in 'tokens', so later operations may pick it again.
+ } else if (op < 98) {
+ // Deallocate a randomly selected token.
+ if (tokens.empty()) {
+ continue;
+ }
+ auto it = tokens.begin();
+ int token_idx = r.Next() % tokens.size();
+ std::advance(it, token_idx);
+ tokens.erase(it); // Erasing the unique_ptr destroys the token.
+ } else {
+ // Wait on everything.
+ ASSERT_LT(op, 100); // Sanity checks that the branches above cover exactly [0, 98).
+ ASSERT_GE(op, 98);
+ pool_->Wait();
+ }
+ }
+
+ // Some test runs will shut down the pool before the tokens, and some won't.
+ // Either way should be safe.
+ if (r.Next() % 2 == 0) {
+ pool_->Shutdown();
+ }
+}
+
+// The pool's queue limit applies to submissions made through a token: with
+// one thread and a queue of size one, the third blocked task is rejected.
+TEST_P(ThreadPoolTestTokenTypes, TestTokenSubmissionsAdhereToMaxQueueSize) {
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+      .set_min_threads(1)
+      .set_max_threads(1)
+      .set_max_queue_size(1)));
+
+  CountDownLatch gate(1);
+  unique_ptr<ThreadPoolToken> token = pool_->NewToken(GetParam());
+  // Unblock the accepted tasks when the test body exits.
+  SCOPED_CLEANUP({
+    gate.CountDown();
+  });
+
+  // Two tasks are accepted: one for max_threads == 1 and one for
+  // max_queue_size == 1.
+  ASSERT_OK(token->Submit(SlowTask::NewSlowTask(&gate)));
+  ASSERT_OK(token->Submit(SlowTask::NewSlowTask(&gate)));
+
+  // The third has nowhere to go.
+  const Status overflow = token->Submit(SlowTask::NewSlowTask(&gate));
+  ASSERT_TRUE(overflow.IsServiceUnavailable());
+}
+
+TEST_F(ThreadPoolTest, TestTokenConcurrency) { // Runs cycle, shutdown, wait and submit threads against a shared set of tokens for kTestRuntimeSecs.
+ const int kNumTokens = 20;
+ const int kTestRuntimeSecs = 1;
+ const int kCycleThreads = 2;
+ const int kShutdownThreads = 2;
+ const int kWaitThreads = 2;
+ const int kSubmitThreads = 8;
+
+ vector<shared_ptr<ThreadPoolToken>> tokens;
+ Random rng(SeedRandom());
+
+ // Protects 'tokens' and 'rng'.
+ simple_spinlock lock;
+
+ // Fetch a token from 'tokens' at random.
+ auto GetRandomToken = [&]() -> shared_ptr<ThreadPoolToken> {
+ std::lock_guard<simple_spinlock> l(lock);
+ int idx = rng.Uniform(kNumTokens);
+ return tokens[idx]; // Returned by value, so the caller holds its own reference after the lock is released.
+ };
+
+ // Preallocate all of the tokens.
+ for (int i = 0; i < kNumTokens; i++) {
+ ThreadPool::ExecutionMode mode;
+ {
+ std::lock_guard<simple_spinlock> l(lock);
+ mode = rng.Next() % 2 ?
+ ThreadPool::ExecutionMode::SERIAL :
+ ThreadPool::ExecutionMode::CONCURRENT;
+ }
+ tokens.emplace_back(pool_->NewToken(mode).release()); // Ownership moves from the returned unique_ptr into a shared_ptr.
+ }
+
+ atomic<int64_t> total_num_tokens_cycled(0); // Totals below are summed from each thread's local counter when it exits.
+ atomic<int64_t> total_num_tokens_shutdown(0);
+ atomic<int64_t> total_num_tokens_waited(0);
+ atomic<int64_t> total_num_tokens_submitted(0);
+
+ CountDownLatch latch(1); // Stop signal: every worker loops while latch.count() is non-zero.
+ vector<thread> threads;
+
+ for (int i = 0; i < kCycleThreads; i++) {
+ // Pick a token at random and replace it.
+ //
+ // The replaced token is only destroyed when the last ref is dropped,
+ // possibly by another thread.
+ threads.emplace_back([&]() {
+ int num_tokens_cycled = 0;
+ while (latch.count()) {
+ {
+ std::lock_guard<simple_spinlock> l(lock);
+ int idx = rng.Uniform(kNumTokens);
+ ThreadPool::ExecutionMode mode = rng.Next() % 2 ?
+ ThreadPool::ExecutionMode::SERIAL :
+ ThreadPool::ExecutionMode::CONCURRENT;
+ tokens[idx] = shared_ptr<ThreadPoolToken>(pool_->NewToken(mode).release());
+ }
+ num_tokens_cycled++;
+
+ // Sleep a bit, otherwise this thread outpaces the other threads and
+ // nothing interesting happens to most tokens.
+ SleepFor(MonoDelta::FromMicroseconds(10));
+ }
+ total_num_tokens_cycled += num_tokens_cycled;
+ });
+ }
+
+ for (int i = 0; i < kShutdownThreads; i++) {
+ // Pick a token at random and shut it down. Submitting a task to a shut
+ // down token will return a ServiceUnavailable error.
+ threads.emplace_back([&]() {
+ int num_tokens_shutdown = 0;
+ while (latch.count()) {
+ GetRandomToken()->Shutdown();
+ num_tokens_shutdown++;
+ }
+ total_num_tokens_shutdown += num_tokens_shutdown;
+ });
+ }
+
+ for (int i = 0; i < kWaitThreads; i++) {
+ // Pick a token at random and wait for any outstanding tasks.
+ threads.emplace_back([&]() {
+ int num_tokens_waited = 0;
+ while (latch.count()) {
+ GetRandomToken()->Wait();
+ num_tokens_waited++;
+ }
+ total_num_tokens_waited += num_tokens_waited;
+ });
+ }
+
+ for (int i = 0; i < kSubmitThreads; i++) {
+ // Pick a token at random and submit a task to it.
+ threads.emplace_back([&]() {
+ int num_tokens_submitted = 0;
+ Random rng(SeedRandom()); // Per-thread generator that shadows the shared 'rng', so drawing sleep_ms takes no lock.
+ while (latch.count()) {
+ int sleep_ms = rng.Next() % 5;
+ Status s = GetRandomToken()->SubmitFunc([sleep_ms]() {
+ // Sleep a little first so that tasks are running during other events.
+ SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
+ });
+ CHECK(s.ok() || s.IsServiceUnavailable()); // ServiceUnavailable is expected when a shutdown thread got to the token first.
+ num_tokens_submitted++;
+ }
+ total_num_tokens_submitted += num_tokens_submitted;
+ });
+ }
+
+ SleepFor(MonoDelta::FromSeconds(kTestRuntimeSecs)); // Let the workers run for the configured duration.
+ latch.CountDown(); // Tell every worker loop to stop.
+ for (auto& t : threads) {
+ t.join();
+ }
+
+ LOG(INFO) << Substitute("Tokens cycled ($0 threads): $1",
+ kCycleThreads, total_num_tokens_cycled.load());
+ LOG(INFO) << Substitute("Tokens shutdown ($0 threads): $1",
+ kShutdownThreads, total_num_tokens_shutdown.load());
+ LOG(INFO) << Substitute("Tokens waited ($0 threads): $1",
+ kWaitThreads, total_num_tokens_waited.load());
+ LOG(INFO) << Substitute("Tokens submitted ($0 threads): $1",
+ kSubmitThreads, total_num_tokens_submitted.load());
+}
+
+TEST_F(ThreadPoolTest, TestLIFOThreadWakeUps) { // Checks that idle threads time out while a slow trickle of tasks keeps arriving.
+ const int kNumThreads = 10;
+
+ // Test with a pool that allows for kNumThreads concurrent threads.
+ ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+ .set_max_threads(kNumThreads)));
+
+ // Submit kNumThreads slow tasks and unblock them, in order to produce
+ // kNumThreads worker threads.
+ CountDownLatch latch(1);
+ SCOPED_CLEANUP({
+ latch.CountDown(); // Unblocks the tasks even if an assertion below returns early.
+ });
+ for (int i = 0; i < kNumThreads; i++) {
+ ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+ }
+ ASSERT_EQ(kNumThreads, pool_->num_threads());
+ latch.CountDown();
+ pool_->Wait();
+
+ // The kNumThreads threads are idle and waiting for the idle timeout.
+
+ // Submit a slow trickle of lightning fast tasks.
+ //
+ // If the threads are woken up in FIFO order, this trickle is enough to
+ // prevent all of them from idling and the AssertEventually will time out.
+ //
+ // If LIFO order is used, the same thread will be reused for each task and
+ // the other threads will eventually time out.
+ AssertEventually([&]() {
+ ASSERT_OK(pool_->SubmitFunc([](){}));
+ SleepFor(MonoDelta::FromMilliseconds(10));
+ ASSERT_EQ(1, pool_->num_threads()); // Passes once every thread but one has timed out.
+ }, MonoDelta::FromSeconds(10), AssertBackoff::NONE);
+ NO_PENDING_FATALS(); // NOTE(review): presumably stops the test if AssertEventually recorded a fatal failure -- confirm in test_macros.h.
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/threadpool.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/threadpool.cc b/be/src/kudu/util/threadpool.cc
new file mode 100644
index 0000000..23dda3d
--- /dev/null
+++ b/be/src/kudu/util/threadpool.cc
@@ -0,0 +1,766 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/threadpool.h"
+
+#include <cstdint>
+#include <deque>
+#include <limits>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <utility>
+
+#include <boost/function.hpp> // IWYU pragma: keep
+#include <glog/logging.h>
+
+#include "kudu/gutil/callback.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/sysinfo.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/thread.h"
+#include "kudu/util/trace.h"
+#include "kudu/util/trace_metrics.h"
+
+namespace kudu {
+
+using std::deque;
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+using strings::Substitute;
+
+////////////////////////////////////////////////////////
+// FunctionRunnable
+////////////////////////////////////////////////////////
+
+// Adapts a boost::function<void()> to the Runnable interface so that it can
+// be submitted to a thread pool.
+class FunctionRunnable : public Runnable {
+ public:
+ // Takes ownership of 'func' by moving it into the runnable.
+ explicit FunctionRunnable(boost::function<void()> func) : func_(std::move(func)) {}
+
+ // Invokes the wrapped function.
+ void Run() OVERRIDE {
+ func_();
+ }
+
+ private:
+ boost::function<void()> func_;
+};
+
+////////////////////////////////////////////////////////
+// ClosureRunnable
+////////////////////////////////////////////////////////
+
+// Adapts a Closure to the Runnable interface so that it can be submitted to
+// a thread pool.
+class ClosureRunnable : public Runnable {
+ public:
+ // Takes ownership of 'cl' by moving it into the runnable.
+ explicit ClosureRunnable(Closure cl) : cl_(std::move(cl)) {}
+
+ // Runs the wrapped closure.
+ void Run() OVERRIDE {
+ cl_.Run();
+ }
+
+ private:
+ Closure cl_;
+};
+
+////////////////////////////////////////////////////////
+// ThreadPoolBuilder
+////////////////////////////////////////////////////////
+
+// Creates a builder for a pool called 'name' with these defaults: no minimum
+// number of threads, at most base::NumCPUs() threads, a queue bounded only by
+// the maximum int value, and a 500 ms idle timeout.
+ThreadPoolBuilder::ThreadPoolBuilder(string name)
+ : name_(std::move(name)),
+ min_threads_(0),
+ max_threads_(base::NumCPUs()),
+ max_queue_size_(std::numeric_limits<int>::max()),
+ idle_timeout_(MonoDelta::FromMilliseconds(500)) {}
+
+// Sets the prefix used for the pool's trace metric names. Returns *this so
+// that calls can be chained.
+ThreadPoolBuilder& ThreadPoolBuilder::set_trace_metric_prefix(const string& prefix) {
+ trace_metric_prefix_ = prefix;
+ return *this;
+}
+
+// Sets the minimum number of threads; CHECK-fails if 'min_threads' is
+// negative. Returns *this so that calls can be chained.
+ThreadPoolBuilder& ThreadPoolBuilder::set_min_threads(int min_threads) {
+ CHECK_GE(min_threads, 0);
+ min_threads_ = min_threads;
+ return *this;
+}
+
+// Sets the maximum number of threads; CHECK-fails unless 'max_threads' is
+// strictly positive. Returns *this so that calls can be chained.
+ThreadPoolBuilder& ThreadPoolBuilder::set_max_threads(int max_threads) {
+ CHECK_GT(max_threads, 0);
+ max_threads_ = max_threads;
+ return *this;
+}
+
+// Sets the maximum queue size. The value is stored as given, with no
+// validation. Returns *this so that calls can be chained.
+ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int max_queue_size) {
+ max_queue_size_ = max_queue_size;
+ return *this;
+}
+
+// Sets how long a non-permanent worker may stay idle before it exits.
+// Returns *this so that calls can be chained.
+ThreadPoolBuilder& ThreadPoolBuilder::set_idle_timeout(const MonoDelta& idle_timeout) {
+ idle_timeout_ = idle_timeout;
+ return *this;
+}
+
+// Sets the pool-wide metrics, taking 'metrics' by move. Returns *this so
+// that calls can be chained.
+ThreadPoolBuilder& ThreadPoolBuilder::set_metrics(ThreadPoolMetrics metrics) {
+ metrics_ = std::move(metrics);
+ return *this;
+}
+
+// Constructs a ThreadPool from this builder's settings, stores it in '*pool'
+// and initializes it. A failed initialization is returned to the caller;
+// '*pool' still holds the newly created pool in that case.
+Status ThreadPoolBuilder::Build(gscoped_ptr<ThreadPool>* pool) const {
+ ThreadPool* new_pool = new ThreadPool(*this);
+ pool->reset(new_pool);
+ RETURN_NOT_OK(new_pool->Init());
+ return Status::OK();
+}
+
+////////////////////////////////////////////////////////
+// ThreadPoolToken
+////////////////////////////////////////////////////////
+
+// Creates a token bound to 'pool'. The token starts in the IDLE state with
+// no active threads, and its condition variable is tied to the pool's lock.
+ThreadPoolToken::ThreadPoolToken(ThreadPool* pool,
+ ThreadPool::ExecutionMode mode,
+ ThreadPoolMetrics metrics)
+ : mode_(mode),
+ metrics_(std::move(metrics)),
+ pool_(pool),
+ state_(State::IDLE),
+ not_running_cond_(&pool->lock_),
+ active_threads_(0) {
+}
+
+// Shuts the token down first (which waits for any of its running tasks) and
+// only then unregisters it from the pool, since ReleaseToken() CHECK-fails
+// on a token that is still active.
+ThreadPoolToken::~ThreadPoolToken() {
+ Shutdown();
+ pool_->ReleaseToken(this);
+}
+
+// Wraps 'c' in a ClosureRunnable and submits it through this token.
+Status ThreadPoolToken::SubmitClosure(Closure c) {
+ return Submit(std::make_shared<ClosureRunnable>(std::move(c)));
+}
+
+// Wraps 'f' in a FunctionRunnable and submits it through this token.
+Status ThreadPoolToken::SubmitFunc(boost::function<void()> f) {
+ return Submit(std::make_shared<FunctionRunnable>(std::move(f)));
+}
+
+// Submits 'r' to the owning pool on behalf of this token.
+Status ThreadPoolToken::Submit(shared_ptr<Runnable> r) {
+ return pool_->DoSubmit(std::move(r), this);
+}
+
+// Shuts down this token: discards its queued tasks, removes its entries from
+// the pool's queue, and blocks until any of its tasks that are currently
+// executing have finished. On return the token is QUIESCED.
+//
+// LOG(FATAL)s (via CheckNotPoolThreadUnlocked) if called from one of the
+// pool's own worker threads.
+void ThreadPoolToken::Shutdown() {
+ MutexLock unique_lock(pool_->lock_);
+ pool_->CheckNotPoolThreadUnlocked();
+
+ // Clear the queue under the lock, but defer the releasing of the tasks
+ // outside the lock, in case there are concurrent threads wanting to access
+ // the ThreadPool. The task's destructors may acquire locks, etc, so this
+ // also prevents lock inversions.
+ //
+ // NOTE(review): the debug checks in Transition() expect entries_ to be
+ // empty after this move, i.e. they rely on the moved-from deque being empty.
+ std::deque<ThreadPool::Task> to_release = std::move(entries_);
+ pool_->total_queued_tasks_ -= to_release.size();
+
+ switch (state()) {
+ case State::IDLE:
+ // There were no tasks outstanding; we can quiesce the token immediately.
+ Transition(State::QUIESCED);
+ break;
+ case State::RUNNING:
+ // There were outstanding tasks. If any are still running, switch to
+ // QUIESCING and wait for them to finish (the worker thread executing
+ // the token's last task will switch the token to QUIESCED). Otherwise,
+ // we can quiesce the token immediately.
+
+ // Note: this is an O(n) operation, but it's expected to be infrequent.
+ // Plus doing it this way (rather than switching to QUIESCING and waiting
+ // for a worker thread to process the queue entry) helps retain state
+ // transition symmetry with ThreadPool::Shutdown.
+ for (auto it = pool_->queue_.begin(); it != pool_->queue_.end();) {
+ if (*it == this) {
+ it = pool_->queue_.erase(it);
+ } else {
+ it++;
+ }
+ }
+
+ if (active_threads_ == 0) {
+ Transition(State::QUIESCED);
+ break;
+ }
+ Transition(State::QUIESCING);
+ FALLTHROUGH_INTENDED;
+ case State::QUIESCING:
+ // The token is already quiescing. Just wait for a worker thread to
+ // switch it to QUIESCED.
+ while (state() != State::QUIESCED) {
+ not_running_cond_.Wait();
+ }
+ break;
+ default:
+ // QUIESCED: the token was already shut down; nothing left to do.
+ break;
+ }
+
+ // Finally release the queued tasks, outside the lock.
+ unique_lock.Unlock();
+ for (auto& t : to_release) {
+ if (t.trace) {
+ t.trace->Release();
+ }
+ }
+}
+
+// Blocks the caller until this token is no longer active. Must not be called
+// from one of the pool's worker threads (that is a fatal error).
+void ThreadPoolToken::Wait() {
+ MutexLock l(pool_->lock_);
+ pool_->CheckNotPoolThreadUnlocked();
+ for (;;) {
+ if (!IsActive()) {
+ return;
+ }
+ not_running_cond_.Wait();
+ }
+}
+
+// Blocks until this token is no longer active or the deadline 'until' is
+// reached. Returns true if the token became inactive, and false if the
+// condition variable wait reported a timeout first.
+bool ThreadPoolToken::WaitUntil(const MonoTime& until) {
+ MutexLock l(pool_->lock_);
+ pool_->CheckNotPoolThreadUnlocked();
+ bool woken_in_time = true;
+ while (woken_in_time && IsActive()) {
+ woken_in_time = not_running_cond_.WaitUntil(until);
+ }
+ return woken_in_time;
+}
+
+// Like WaitUntil(), with the deadline expressed as 'delta' from now.
+bool ThreadPoolToken::WaitFor(const MonoDelta& delta) {
+ const MonoTime deadline = MonoTime::Now() + delta;
+ return WaitUntil(deadline);
+}
+
+// Moves the token to 'new_state'.
+//
+// In debug builds (NDEBUG not defined) the transition is first validated
+// against the token's state machine, and invariants on entries_ and
+// active_threads_ are CHECKed. Entering IDLE or QUIESCED wakes every thread
+// blocked on not_running_cond_.
+//
+// NOTE(review): callers in this file invoke this with the pool lock held;
+// nothing here acquires it.
+void ThreadPoolToken::Transition(State new_state) {
+#ifndef NDEBUG
+ CHECK_NE(state_, new_state);
+
+ switch (state_) {
+ case State::IDLE:
+ CHECK(new_state == State::RUNNING ||
+ new_state == State::QUIESCED);
+ if (new_state == State::RUNNING) {
+ CHECK(!entries_.empty());
+ } else {
+ CHECK(entries_.empty());
+ CHECK_EQ(active_threads_, 0);
+ }
+ break;
+ case State::RUNNING:
+ CHECK(new_state == State::IDLE ||
+ new_state == State::QUIESCING ||
+ new_state == State::QUIESCED);
+ CHECK(entries_.empty());
+ if (new_state == State::QUIESCING) {
+ CHECK_GT(active_threads_, 0);
+ }
+ break;
+ case State::QUIESCING:
+ CHECK(new_state == State::QUIESCED);
+ CHECK_EQ(active_threads_, 0);
+ break;
+ case State::QUIESCED:
+ CHECK(false); // QUIESCED is a terminal state
+ break;
+ default:
+ LOG(FATAL) << "Unknown token state: " << state_;
+ }
+#endif
+
+ // Take actions based on the state we're entering.
+ switch (new_state) {
+ case State::IDLE:
+ case State::QUIESCED:
+ // Both states mean the token has stopped running: wake all waiters.
+ not_running_cond_.Broadcast();
+ break;
+ default:
+ break;
+ }
+
+ state_ = new_state;
+}
+
+// Returns the name of token state 's' as a string literal.
+const char* ThreadPoolToken::StateToString(State s) {
+ switch (s) {
+ case State::IDLE:
+ return "IDLE";
+ case State::RUNNING:
+ return "RUNNING";
+ case State::QUIESCING:
+ return "QUIESCING";
+ case State::QUIESCED:
+ return "QUIESCED";
+ }
+ // Only reached if 's' does not match any of the cases above.
+ return "<cannot reach here>";
+}
+
+////////////////////////////////////////////////////////
+// ThreadPool
+////////////////////////////////////////////////////////
+
+// Copies the configuration out of 'builder', creates the CONCURRENT token
+// used for tokenless submissions, and interns the trace metric names. The
+// pool is not usable until Init() has been called: its status starts out as
+// Uninitialized.
+ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
+ : name_(builder.name_),
+ min_threads_(builder.min_threads_),
+ max_threads_(builder.max_threads_),
+ max_queue_size_(builder.max_queue_size_),
+ idle_timeout_(builder.idle_timeout_),
+ pool_status_(Status::Uninitialized("The pool was not initialized.")),
+ idle_cond_(&lock_),
+ no_threads_cond_(&lock_),
+ num_threads_(0),
+ num_threads_pending_start_(0),
+ active_threads_(0),
+ total_queued_tasks_(0),
+ tokenless_(NewToken(ExecutionMode::CONCURRENT)),
+ metrics_(builder.metrics_) {
+ // Use the explicitly configured trace metric prefix when there is one, and
+ // fall back to the pool's name otherwise.
+ const string& metric_prefix = builder.trace_metric_prefix_.empty() ?
+ builder.name_ : builder.trace_metric_prefix_;
+
+ queue_time_trace_metric_name_ =
+ TraceMetrics::InternName(metric_prefix + ".queue_time_us");
+ run_wall_time_trace_metric_name_ =
+ TraceMetrics::InternName(metric_prefix + ".run_wall_time_us");
+ run_cpu_time_trace_metric_name_ =
+ TraceMetrics::InternName(metric_prefix + ".run_cpu_time_us");
+}
+
+// Destroys the pool. CHECK-fails if any token other than the internal
+// tokenless one is still registered, then shuts the pool down.
+ThreadPool::~ThreadPool() {
+ // There should only be one live token: the one used in tokenless submission.
+ CHECK_EQ(1, tokens_.size()) << Substitute(
+ "Threadpool $0 destroyed with $1 allocated tokens",
+ name_, tokens_.size());
+ Shutdown();
+}
+
+// Marks the pool as usable and starts its 'min_threads_' initial workers.
+//
+// Returns NotSupported if the pool has already been initialized. If a worker
+// thread cannot be created, the pool is shut down and the thread creation
+// error is returned.
+Status ThreadPool::Init() {
+ if (!pool_status_.IsUninitialized()) {
+ return Status::NotSupported("The thread pool is already initialized");
+ }
+ pool_status_ = Status::OK();
+ num_threads_pending_start_ = min_threads_;
+ for (int i = 0; i < min_threads_; i++) {
+ Status status = CreateThread();
+ if (!status.ok()) {
+ {
+ // Workers i..min_threads_-1 will never be started, so they must no
+ // longer be counted as pending. Otherwise Shutdown() would wait forever
+ // for num_threads_ + num_threads_pending_start_ to drop to zero. The
+ // lock is needed because workers that did start update this counter
+ // under lock_ in DispatchThread().
+ MutexLock guard(lock_);
+ num_threads_pending_start_ -= min_threads_ - i;
+ }
+ Shutdown();
+ return status;
+ }
+ }
+ return Status::OK();
+}
+
+// Shuts down the whole pool: rejects further submissions, discards all queued
+// tasks, wakes the idle workers, and blocks until every worker thread
+// (running or still pending start) has exited.
+//
+// LOG(FATAL)s (via CheckNotPoolThreadUnlocked) if called from one of the
+// pool's own worker threads.
+void ThreadPool::Shutdown() {
+ MutexLock unique_lock(lock_);
+ CheckNotPoolThreadUnlocked();
+
+ // Note: this is the same error seen at submission if the pool is at
+ // capacity, so clients can't tell them apart. This isn't really a practical
+ // concern though because shutting down a pool typically requires clients to
+ // be quiesced first, so there's no danger of a client getting confused.
+ pool_status_ = Status::ServiceUnavailable("The pool has been shut down.");
+
+ // Clear the various queues under the lock, but defer the releasing
+ // of the tasks outside the lock, in case there are concurrent threads
+ // wanting to access the ThreadPool. The task's destructors may acquire
+ // locks, etc, so this also prevents lock inversions.
+ queue_.clear();
+ std::deque<std::deque<Task>> to_release;
+ for (auto* t : tokens_) {
+ if (!t->entries_.empty()) {
+ // NOTE(review): the debug checks in Transition() expect entries_ to be
+ // empty afterwards, i.e. they rely on the moved-from deque being empty.
+ to_release.emplace_back(std::move(t->entries_));
+ }
+ switch (t->state()) {
+ case ThreadPoolToken::State::IDLE:
+ // The token is idle; we can quiesce it immediately.
+ t->Transition(ThreadPoolToken::State::QUIESCED);
+ break;
+ case ThreadPoolToken::State::RUNNING:
+ // The token has tasks associated with it. If they're merely queued
+ // (i.e. there are no active threads), the tasks will have been removed
+ // above and we can quiesce immediately. Otherwise, we need to wait for
+ // the threads to finish.
+ t->Transition(t->active_threads_ > 0 ?
+ ThreadPoolToken::State::QUIESCING :
+ ThreadPoolToken::State::QUIESCED);
+ break;
+ default:
+ // QUIESCING or QUIESCED: the token is already shutting down or done.
+ break;
+ }
+ }
+
+ // The queues are empty. Wake any sleeping worker threads and wait for all
+ // of them to exit. Some worker threads will exit immediately upon waking,
+ // while others will exit after they finish executing an outstanding task.
+ total_queued_tasks_ = 0;
+ while (!idle_threads_.empty()) {
+ idle_threads_.front().not_empty.Signal();
+ idle_threads_.pop_front();
+ }
+ while (num_threads_ + num_threads_pending_start_ > 0) {
+ no_threads_cond_.Wait();
+ }
+
+ // All the threads have exited. Check the state of each token.
+ for (auto* t : tokens_) {
+ DCHECK(t->state() == ThreadPoolToken::State::IDLE ||
+ t->state() == ThreadPoolToken::State::QUIESCED);
+ }
+
+ // Finally release the queued tasks, outside the lock.
+ unique_lock.Unlock();
+ for (auto& token : to_release) {
+ for (auto& t : token) {
+ if (t.trace) {
+ t.trace->Release();
+ }
+ }
+ }
+}
+
+// Creates a new token with the given execution mode and default-constructed
+// (empty) metrics.
+unique_ptr<ThreadPoolToken> ThreadPool::NewToken(ExecutionMode mode) {
+ return NewTokenWithMetrics(mode, ThreadPoolMetrics());
+}
+
+// Creates a new token with the given execution mode and metrics, and
+// registers it in tokens_ under the pool lock. InsertOrDie() aborts if the
+// token is somehow already registered.
+unique_ptr<ThreadPoolToken> ThreadPool::NewTokenWithMetrics(
+ ExecutionMode mode, ThreadPoolMetrics metrics) {
+ MutexLock l(lock_);
+ unique_ptr<ThreadPoolToken> token(
+ new ThreadPoolToken(this, mode, std::move(metrics)));
+ InsertOrDie(&tokens_, token.get());
+ return token;
+}
+
+// Unregisters token 't' from the pool. CHECK-fails if the token is still
+// active or if it was not registered with this pool.
+void ThreadPool::ReleaseToken(ThreadPoolToken* t) {
+ MutexLock l(lock_);
+ CHECK(!t->IsActive()) << Substitute("Token with state $0 may not be released",
+ ThreadPoolToken::StateToString(t->state()));
+ const auto num_erased = tokens_.erase(t);
+ CHECK_EQ(1, num_erased);
+}
+
+// Wraps 'c' in a ClosureRunnable and submits it without an explicit token.
+Status ThreadPool::SubmitClosure(Closure c) {
+ return Submit(std::make_shared<ClosureRunnable>(std::move(c)));
+}
+
+// Wraps 'f' in a FunctionRunnable and submits it without an explicit token.
+Status ThreadPool::SubmitFunc(boost::function<void()> f) {
+ return Submit(std::make_shared<FunctionRunnable>(std::move(f)));
+}
+
+// Submits 'r' through the pool's internal tokenless (CONCURRENT) token.
+Status ThreadPool::Submit(shared_ptr<Runnable> r) {
+ return DoSubmit(std::move(r), tokenless_.get());
+}
+
+// Enqueues runnable 'r' on behalf of 'token' and makes sure a worker will
+// pick it up, either by waking an idle thread or by creating a new one.
+//
+// Returns:
+// - the pool's status if the pool is not OK (uninitialized or shut down),
+// - ServiceUnavailable if the token has been shut down or the pool is at
+// capacity,
+// - the thread creation error if a thread was needed, could not be created,
+// and the pool has no other threads,
+// - OK otherwise.
+Status ThreadPool::DoSubmit(shared_ptr<Runnable> r, ThreadPoolToken* token) {
+ DCHECK(token);
+ // Sampled before taking the lock, so time spent waiting for the lock counts
+ // towards the task's queue time.
+ MonoTime submit_time = MonoTime::Now();
+
+ MutexLock guard(lock_);
+ if (PREDICT_FALSE(!pool_status_.ok())) {
+ return pool_status_;
+ }
+
+ if (PREDICT_FALSE(!token->MaySubmitNewTasks())) {
+ return Status::ServiceUnavailable("Thread pool token was shut down");
+ }
+
+ // Size limit check.
+ // The arithmetic is done in 64 bits so that adding two int-sized limits
+ // cannot overflow.
+ int64_t capacity_remaining = static_cast<int64_t>(max_threads_) - active_threads_ +
+ static_cast<int64_t>(max_queue_size_) - total_queued_tasks_;
+ if (capacity_remaining < 1) {
+ return Status::ServiceUnavailable(
+ Substitute("Thread pool is at capacity ($0/$1 tasks running, $2/$3 tasks queued)",
+ num_threads_ + num_threads_pending_start_, max_threads_,
+ total_queued_tasks_, max_queue_size_));
+ }
+
+ // Should we create another thread?
+
+ // We assume that each current inactive thread will grab one item from the
+ // queue. If it seems like we'll need another thread, we create one.
+ //
+ // Rather than creating the thread here, while holding the lock, we defer
+ // it to down below. This is because thread creation can be rather slow
+ // (hundreds of milliseconds in some cases) and we'd like to allow the
+ // existing threads to continue to process tasks while we do so.
+ //
+ // In theory, a currently active thread could finish immediately after this
+ // calculation but before our new worker starts running. This would mean we
+ // created a thread we didn't really need. However, this race is unavoidable
+ // and harmless.
+ //
+ // Of course, we never create more than max_threads_ threads no matter what.
+ int threads_from_this_submit =
+ token->IsActive() && token->mode() == ExecutionMode::SERIAL ? 0 : 1;
+ int inactive_threads = num_threads_ + num_threads_pending_start_ - active_threads_;
+ int additional_threads = static_cast<int>(queue_.size())
+ + threads_from_this_submit
+ - inactive_threads;
+ bool need_a_thread = false;
+ if (additional_threads > 0 && num_threads_ + num_threads_pending_start_ < max_threads_) {
+ need_a_thread = true;
+ num_threads_pending_start_++;
+ }
+
+ Task task;
+ task.runnable = std::move(r);
+ task.trace = Trace::CurrentTrace();
+ // Need to AddRef, since the thread which submitted the task may go away,
+ // and we don't want the trace to be destructed while waiting in the queue.
+ if (task.trace) {
+ task.trace->AddRef();
+ }
+ task.submit_time = submit_time;
+
+ // Add the task to the token's queue.
+ ThreadPoolToken::State state = token->state();
+ DCHECK(state == ThreadPoolToken::State::IDLE ||
+ state == ThreadPoolToken::State::RUNNING);
+ token->entries_.emplace_back(std::move(task));
+ // The token gets a slot in the pool's queue for every task of a CONCURRENT
+ // token, but only when going from IDLE to RUNNING for a SERIAL token.
+ if (state == ThreadPoolToken::State::IDLE ||
+ token->mode() == ExecutionMode::CONCURRENT) {
+ queue_.emplace_back(token);
+ if (state == ThreadPoolToken::State::IDLE) {
+ token->Transition(ThreadPoolToken::State::RUNNING);
+ }
+ }
+ // Post-increment: records the queue length as seen before this task.
+ int length_at_submit = total_queued_tasks_++;
+
+ // Wake up an idle thread for this task. Choosing the thread at the front of
+ // the list ensures LIFO semantics as idling threads are also added to the front.
+ //
+ // If there are no idle threads, the new task remains on the queue and is
+ // processed by an active thread (or a thread we're about to create) at some
+ // point in the future.
+ if (!idle_threads_.empty()) {
+ idle_threads_.front().not_empty.Signal();
+ idle_threads_.pop_front();
+ }
+ guard.Unlock();
+
+ if (metrics_.queue_length_histogram) {
+ metrics_.queue_length_histogram->Increment(length_at_submit);
+ }
+ if (token->metrics_.queue_length_histogram) {
+ token->metrics_.queue_length_histogram->Increment(length_at_submit);
+ }
+
+ if (need_a_thread) {
+ Status status = CreateThread();
+ if (!status.ok()) {
+ // Re-take the lock to undo the pending-start reservation made above.
+ guard.Lock();
+ num_threads_pending_start_--;
+ if (num_threads_ + num_threads_pending_start_ == 0) {
+ // If we have no threads, we can't do any work.
+ // NOTE(review): the task enqueued above is not removed on this path, so
+ // it stays queued even though the caller receives an error.
+ return status;
+ }
+ // If we failed to create a thread, but there are still some other
+ // worker threads, log a warning message and continue.
+ LOG(ERROR) << "Thread pool failed to create thread: "
+ << status.ToString();
+ }
+ }
+
+
+ return Status::OK();
+}
+
+// Blocks the caller until the pool has neither queued nor running tasks.
+// Must not be called from one of the pool's worker threads (that is a fatal
+// error).
+void ThreadPool::Wait() {
+ MutexLock l(lock_);
+ CheckNotPoolThreadUnlocked();
+ for (;;) {
+ if (total_queued_tasks_ <= 0 && active_threads_ <= 0) {
+ return;
+ }
+ idle_cond_.Wait();
+ }
+}
+
+// Blocks until the pool has neither queued nor running tasks, or until the
+// deadline 'until' is reached. Returns true if the pool drained, and false
+// if the condition variable wait reported a timeout first.
+bool ThreadPool::WaitUntil(const MonoTime& until) {
+ MutexLock l(lock_);
+ CheckNotPoolThreadUnlocked();
+ bool woken_in_time = true;
+ while (woken_in_time && (total_queued_tasks_ > 0 || active_threads_ > 0)) {
+ woken_in_time = idle_cond_.WaitUntil(until);
+ }
+ return woken_in_time;
+}
+
+// Like WaitUntil(), with the deadline expressed as 'delta' from now.
+bool ThreadPool::WaitFor(const MonoDelta& delta) {
+ const MonoTime deadline = MonoTime::Now() + delta;
+ return WaitUntil(deadline);
+}
+
+// Main loop of a worker thread.
+//
+// The worker registers itself with the pool, then repeatedly takes the next
+// token from queue_, runs that token's oldest task with the lock released,
+// and updates the token and pool bookkeeping. When there is no work it parks
+// itself on idle_threads_; a non-permanent worker exits after idle_timeout_
+// without work. The loop also ends as soon as the pool status is not OK. On
+// exit the worker unregisters itself and, if it was the last one, wakes the
+// threads waiting on no_threads_cond_.
+void ThreadPool::DispatchThread() {
+ MutexLock unique_lock(lock_);
+ InsertOrDie(&threads_, Thread::current_thread());
+ DCHECK_GT(num_threads_pending_start_, 0);
+ num_threads_++;
+ num_threads_pending_start_--;
+ // If we are one of the first 'min_threads_' to start, we must be
+ // a "permanent" thread.
+ bool permanent = num_threads_ <= min_threads_;
+
+ // Owned by this worker thread and added/removed from idle_threads_ as needed.
+ IdleThread me(&lock_);
+
+ while (true) {
+ // Note: any non-OK pool status (e.g. the ServiceUnavailable status set by
+ // Shutdown()) tells the worker to exit.
+ if (!pool_status_.ok()) {
+ VLOG(2) << "DispatchThread exiting: " << pool_status_.ToString();
+ break;
+ }
+
+ if (queue_.empty()) {
+ // There's no work to do, let's go idle.
+ //
+ // Note: if FIFO behavior is desired, it's as simple as changing this to push_back().
+ idle_threads_.push_front(me);
+ SCOPED_CLEANUP({
+ // For some wake ups (i.e. Shutdown or DoSubmit) this thread is
+ // guaranteed to be unlinked after being awakened. In others (i.e.
+ // spurious wake-up or Wait timeout), it'll still be linked.
+ if (me.is_linked()) {
+ idle_threads_.erase(idle_threads_.iterator_to(me));
+ }
+ });
+ if (permanent) {
+ me.not_empty.Wait();
+ } else {
+ if (!me.not_empty.WaitFor(idle_timeout_)) {
+ // After much investigation, it appears that pthread condition variables have
+ // a weird behavior in which they can return ETIMEDOUT from timed_wait even if
+ // another thread did in fact signal. Apparently after a timeout there is some
+ // brief period during which another thread may actually grab the internal mutex
+ // protecting the state, signal, and release again before we get the mutex. So,
+ // we'll recheck the empty queue case regardless.
+ if (queue_.empty()) {
+ VLOG(3) << "Releasing worker thread from pool " << name_ << " after "
+ << idle_timeout_.ToMilliseconds() << "ms of idle time.";
+ break;
+ }
+ }
+ }
+ continue;
+ }
+
+ // Get the next token and task to execute.
+ ThreadPoolToken* token = queue_.front();
+ queue_.pop_front();
+ DCHECK_EQ(ThreadPoolToken::State::RUNNING, token->state());
+ DCHECK(!token->entries_.empty());
+ Task task = std::move(token->entries_.front());
+ token->entries_.pop_front();
+ token->active_threads_++;
+ --total_queued_tasks_;
+ ++active_threads_;
+
+ unique_lock.Unlock();
+
+ // Release the reference which was held by the queued item.
+ ADOPT_TRACE(task.trace);
+ if (task.trace) {
+ task.trace->Release();
+ }
+
+ // Update metrics
+ MonoTime now(MonoTime::Now());
+ int64_t queue_time_us = (now - task.submit_time).ToMicroseconds();
+ TRACE_COUNTER_INCREMENT(queue_time_trace_metric_name_, queue_time_us);
+ if (metrics_.queue_time_us_histogram) {
+ metrics_.queue_time_us_histogram->Increment(queue_time_us);
+ }
+ if (token->metrics_.queue_time_us_histogram) {
+ token->metrics_.queue_time_us_histogram->Increment(queue_time_us);
+ }
+
+ // Execute the task
+ {
+ MicrosecondsInt64 start_wall_us = GetMonoTimeMicros();
+ MicrosecondsInt64 start_cpu_us = GetThreadCpuTimeMicros();
+
+ task.runnable->Run();
+
+ int64_t wall_us = GetMonoTimeMicros() - start_wall_us;
+ int64_t cpu_us = GetThreadCpuTimeMicros() - start_cpu_us;
+
+ // The run time histograms record wall time; CPU time is only exported
+ // as a trace counter.
+ if (metrics_.run_time_us_histogram) {
+ metrics_.run_time_us_histogram->Increment(wall_us);
+ }
+ if (token->metrics_.run_time_us_histogram) {
+ token->metrics_.run_time_us_histogram->Increment(wall_us);
+ }
+ TRACE_COUNTER_INCREMENT(run_wall_time_trace_metric_name_, wall_us);
+ TRACE_COUNTER_INCREMENT(run_cpu_time_trace_metric_name_, cpu_us);
+ }
+ // Destruct the task while we do not hold the lock.
+ //
+ // The task's destructor may be expensive if it has a lot of bound
+ // objects, and we don't want to block submission of the threadpool.
+ // In the worst case, the destructor might even try to do something
+ // with this threadpool, and produce a deadlock.
+ task.runnable.reset();
+ unique_lock.Lock();
+
+ // Possible states once this token has no active threads left:
+ // 1. The token was shut down while we ran its task. Transition to QUIESCED.
+ // 2. The token has no more queued tasks. Transition back to IDLE.
+ // 3. The token has more tasks. It stays RUNNING; a SERIAL token is requeued.
+ ThreadPoolToken::State state = token->state();
+ DCHECK(state == ThreadPoolToken::State::RUNNING ||
+ state == ThreadPoolToken::State::QUIESCING);
+ if (--token->active_threads_ == 0) {
+ if (state == ThreadPoolToken::State::QUIESCING) {
+ DCHECK(token->entries_.empty());
+ token->Transition(ThreadPoolToken::State::QUIESCED);
+ } else if (token->entries_.empty()) {
+ token->Transition(ThreadPoolToken::State::IDLE);
+ } else if (token->mode() == ExecutionMode::SERIAL) {
+ queue_.emplace_back(token);
+ }
+ }
+ // Wake ThreadPool::Wait*() callers when the last running task finishes.
+ if (--active_threads_ == 0) {
+ idle_cond_.Broadcast();
+ }
+ }
+
+ // It's important that we hold the lock between exiting the loop and dropping
+ // num_threads_. Otherwise it's possible someone else could come along here
+ // and add a new task just as the last running thread is about to exit.
+ CHECK(unique_lock.OwnsLock());
+
+ CHECK_EQ(threads_.erase(Thread::current_thread()), 1);
+ num_threads_--;
+ if (num_threads_ + num_threads_pending_start_ == 0) {
+ no_threads_cond_.Broadcast();
+
+ // Sanity check: if we're the last thread exiting, the queue ought to be
+ // empty. Otherwise it will never get processed.
+ CHECK(queue_.empty());
+ DCHECK_EQ(0, total_queued_tasks_);
+ }
+}
+
+// Starts a new worker thread named "<pool name> [worker]" in the
+// "thread pool" category, running DispatchThread() on this pool. Returns the
+// status reported by Thread::Create().
+Status ThreadPool::CreateThread() {
+ return kudu::Thread::Create("thread pool", strings::Substitute("$0 [worker]", name_),
+ &ThreadPool::DispatchThread, this, nullptr);
+}
+
+// Aborts the process with LOG(FATAL) if the calling thread is one of this
+// pool's worker threads; such a thread must not call the blocking pool
+// functions that perform this check.
+void ThreadPool::CheckNotPoolThreadUnlocked() {
+ Thread* self = Thread::current_thread();
+ if (!ContainsKey(threads_, self)) {
+ return;
+ }
+ LOG(FATAL) << Substitute("Thread belonging to thread pool '$0' with "
+ "name '$1' called pool function that would result in deadlock",
+ name_, self->name());
+}
+
+// Streams the name of token state 's', as given by StateToString().
+std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s) {
+ o << ThreadPoolToken::StateToString(s);
+ return o;
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/threadpool.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/threadpool.h b/be/src/kudu/util/threadpool.h
new file mode 100644
index 0000000..1557486
--- /dev/null
+++ b/be/src/kudu/util/threadpool.h
@@ -0,0 +1,505 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_THREAD_POOL_H
+#define KUDU_UTIL_THREAD_POOL_H
+
+#include <deque>
+#include <iosfwd>
+#include <memory>
+#include <string>
+#include <unordered_set>
+
+#include <boost/intrusive/list.hpp>
+#include <boost/intrusive/list_hook.hpp>
+#include <gtest/gtest_prod.h>
+
+#include "kudu/gutil/callback.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/condition_variable.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/status.h"
+
+namespace boost {
+template <typename Signature>
+class function;
+} // namespace boost
+
+namespace kudu {
+
+class Thread;
+class ThreadPool;
+class ThreadPoolToken;
+class Trace;
+
+class Runnable {  // Abstract unit of work accepted by ThreadPool::Submit().
+ public:
+ virtual void Run() = 0;  // Pure virtual: subclasses put the task body here.
+ virtual ~Runnable() {}  // Virtual so a subclass can be destroyed through a Runnable pointer.
+};
+
+// Interesting thread pool metrics. Can be applied to the entire pool (see
+// ThreadPoolBuilder::set_metrics()) or to one token (NewTokenWithMetrics()).
+struct ThreadPoolMetrics {
+ // Measures the queue length seen by tasks when they enter the queue.
+ scoped_refptr<Histogram> queue_length_histogram;
+
+ // Measures the time tasks spend waiting in a queue (presumably microseconds, per '_us').
+ scoped_refptr<Histogram> queue_time_us_histogram;
+
+ // Measures the time tasks spend running (presumably microseconds, per '_us').
+ scoped_refptr<Histogram> run_time_us_histogram;
+};
+
+// ThreadPool takes a lot of arguments. We provide sane defaults with a builder.
+//
+// name: Used for debugging output and default names of the worker threads.
+// Since thread names are limited to 16 characters on Linux, it's good to
+// choose a short name here.
+// Required.
+//
+// trace_metric_prefix: used to prefix the names of TraceMetric counters.
+// When a task on a thread pool has an associated trace, the thread pool
+// implementation will increment TraceMetric counters to indicate the
+// amount of time spent waiting in the queue as well as the amount of wall
+// and CPU time spent executing. By default, these counters are prefixed
+// with the name of the thread pool. For example, if the pool is named
+// 'apply', then counters such as 'apply.queue_time_us' will be
+// incremented.
+//
+// The TraceMetrics implementation relies on the number of distinct counter
+// names being small. Thus, if the thread pool name itself is dynamically
+// generated, the default behavior described above would result in an
+// unbounded number of distinct counter names. The 'trace_metric_prefix'
+// setting can be used to override the prefix used in generating the trace
+// metric names.
+//
+// For example, the Raft thread pools are named "<tablet id>-raft" which
+// has unbounded cardinality (a server may have thousands of different
+// tablet IDs over its lifetime). In that case, setting the prefix to
+// "raft" will avoid any issues.
+//
+// min_threads: Minimum number of threads we'll have at any time.
+// Default: 0.
+//
+// max_threads: Maximum number of threads we'll have at any time.
+// Default: Number of CPUs detected on the system.
+//
+// max_queue_size: Maximum number of items to enqueue before returning a
+// Status::ServiceUnavailable message from Submit().
+// Default: INT_MAX.
+//
+// idle_timeout: How long we'll keep around an idle thread before timing it out.
+// We always keep at least min_threads.
+// Default: 500 milliseconds.
+//
+// metrics: Histograms, counters, etc. to update on various threadpool events.
+// Default: not set.
+//
+class ThreadPoolBuilder {
+ public:
+ explicit ThreadPoolBuilder(std::string name);  // 'name' is the required pool name.
+
+ // Note: We violate the style guide by returning mutable references here
+ // in order to provide traditional Builder pattern conveniences.
+ ThreadPoolBuilder& set_trace_metric_prefix(const std::string& prefix);
+ ThreadPoolBuilder& set_min_threads(int min_threads);
+ ThreadPoolBuilder& set_max_threads(int max_threads);
+ ThreadPoolBuilder& set_max_queue_size(int max_queue_size);
+ ThreadPoolBuilder& set_idle_timeout(const MonoDelta& idle_timeout);
+ ThreadPoolBuilder& set_metrics(ThreadPoolMetrics metrics);
+
+ // Instantiates a new ThreadPool from the current builder settings, returned via 'pool'.
+ Status Build(gscoped_ptr<ThreadPool>* pool) const;
+
+ private:
+ friend class ThreadPool;  // ThreadPool's constructor takes a const ThreadPoolBuilder&.
+ const std::string name_;  // Fixed at construction; there is no setter for it.
+ std::string trace_metric_prefix_;
+ int min_threads_;
+ int max_threads_;
+ int max_queue_size_;
+ MonoDelta idle_timeout_;
+ ThreadPoolMetrics metrics_;
+
+ DISALLOW_COPY_AND_ASSIGN(ThreadPoolBuilder);
+};
+
+// Thread pool with a variable number of threads.
+//
+// Tasks submitted directly to the thread pool enter a FIFO queue and are
+// dispatched to a worker thread when one becomes free. Tasks may also be
+// submitted via ThreadPoolTokens. The token Wait() and Shutdown() functions
+// can then be used to block on logical groups of tasks.
+//
+// A token operates in one of two ExecutionModes, determined at token
+// construction time:
+// 1. SERIAL: submitted tasks are run one at a time.
+// 2. CONCURRENT: submitted tasks may be run in parallel. This isn't unlike
+// tasks submitted without a token, but the logical grouping that tokens
+// impart can be useful when a pool is shared by many contexts (e.g. to
+// safely shut down one context, to derive context-specific metrics, etc.).
+//
+// Tasks submitted without a token or via ExecutionMode::CONCURRENT tokens are
+// processed in FIFO order. On the other hand, ExecutionMode::SERIAL tokens are
+// processed in a round-robin fashion, one task at a time. This prevents them
+// from starving one another. However, tokenless (and CONCURRENT token-based)
+// tasks can starve SERIAL token-based tasks.
+//
+// Usage Example:
+// static void Func(int n) { ... }
+// class Task : public Runnable { ... }
+//
+// gscoped_ptr<ThreadPool> thread_pool;
+// CHECK_OK(
+// ThreadPoolBuilder("my_pool")
+// .set_min_threads(0)
+// .set_max_threads(5)
+// .set_max_queue_size(10)
+// .set_idle_timeout(MonoDelta::FromMilliseconds(2000))
+// .Build(&thread_pool));
+// CHECK_OK(thread_pool->Submit(shared_ptr<Runnable>(new Task())));
+// CHECK_OK(thread_pool->SubmitFunc(boost::bind(&Func, 10)));
+class ThreadPool {
+ public:
+ ~ThreadPool();
+
+ // Waits for the running tasks to complete and then shuts down the threads.
+ // All other pending tasks in the queue are removed without being run.
+ // NOTE: If the system needs to know that these tasks were not executed, or
+ // if a runnable requires an explicit "abort" notification to exit from its
+ // run loop, the user must implement external abort logic for the runnables
+ // and invoke it before calling Shutdown().
+ void Shutdown();
+
+ // Submits a function using the kudu Closure system.
+ Status SubmitClosure(Closure c) WARN_UNUSED_RESULT;
+
+ // Submits a function bound using boost::bind(&FuncName, args...).
+ Status SubmitFunc(boost::function<void()> f) WARN_UNUSED_RESULT;
+
+ // Submits a Runnable class.
+ Status Submit(std::shared_ptr<Runnable> r) WARN_UNUSED_RESULT;
+
+ // Waits until all the tasks are completed.
+ void Wait();
+
+ // Waits for the pool to reach the idle state, or until 'until' time is reached.
+ // Returns true if the pool reached the idle state, false otherwise.
+ bool WaitUntil(const MonoTime& until);
+
+ // Waits for the pool to reach the idle state, or until 'delta' time elapses.
+ // Returns true if the pool reached the idle state, false otherwise.
+ bool WaitFor(const MonoDelta& delta);
+
+ // Allocates a new token for use in token-based task submission. All tokens
+ // must be destroyed before their ThreadPool is destroyed.
+ //
+ // There is no limit on the number of tokens that may be allocated.
+ enum class ExecutionMode {
+ // Tasks submitted via this token will be executed serially.
+ SERIAL,
+
+ // Tasks submitted via this token may be executed concurrently.
+ CONCURRENT,
+ };
+ std::unique_ptr<ThreadPoolToken> NewToken(ExecutionMode mode);
+
+ // Like NewToken(), but lets the caller provide metrics for the token. These
+ // metrics are incremented/decremented in addition to the configured
+ // pool-wide metrics (if any).
+ std::unique_ptr<ThreadPoolToken> NewTokenWithMetrics(ExecutionMode mode,
+ ThreadPoolMetrics metrics);
+
+ // Return the number of threads currently running (or in the process of starting up)
+ // for this thread pool.
+ int num_threads() const {
+ MutexLock l(lock_);
+ return num_threads_ + num_threads_pending_start_;
+ }
+
+ private:
+ FRIEND_TEST(ThreadPoolTest, TestThreadPoolWithNoMinimum);
+ FRIEND_TEST(ThreadPoolTest, TestVariableSizeThreadPool);
+
+ friend class ThreadPoolBuilder;
+ friend class ThreadPoolToken;
+
+ // Client-provided task to be executed by this pool.
+ struct Task {
+ std::shared_ptr<Runnable> runnable;
+ Trace* trace;
+
+ // Time at which the entry was submitted to the pool.
+ MonoTime submit_time;
+ };
+
+ // Creates a new thread pool using a builder.
+ explicit ThreadPool(const ThreadPoolBuilder& builder);
+
+ // Initializes the thread pool by starting the minimum number of threads.
+ Status Init();
+
+ // Dispatcher responsible for dequeueing and executing the tasks.
+ void DispatchThread();
+
+ // Create new thread.
+ //
+ // REQUIRES: caller has incremented 'num_threads_pending_start_' ahead of this call.
+ // NOTE: For performance reasons, lock_ should not be held.
+ Status CreateThread();
+
+ // Aborts if the current thread is a member of this thread pool.
+ void CheckNotPoolThreadUnlocked();
+
+ // Submits a task to be run via token.
+ Status DoSubmit(std::shared_ptr<Runnable> r, ThreadPoolToken* token);
+
+ // Releases token 't' and invalidates it.
+ void ReleaseToken(ThreadPoolToken* t);
+
+ const std::string name_;
+ const int min_threads_;
+ const int max_threads_;
+ const int max_queue_size_;
+ const MonoDelta idle_timeout_;
+
+ // Overall status of the pool. Set to an error when the pool is shut down.
+ //
+ // Protected by 'lock_'.
+ Status pool_status_;
+
+ // Synchronizes many of the members of the pool and all of its
+ // condition variables.
+ mutable Mutex lock_;
+
+ // Condition variable for "pool is idling". Waiters wake up when
+ // active_threads_ reaches zero.
+ ConditionVariable idle_cond_;
+
+ // Condition variable for "pool has no threads". Waiters wake up when
+ // num_threads_ and num_threads_pending_start_ are both 0.
+ ConditionVariable no_threads_cond_;
+
+ // Number of threads currently running.
+ //
+ // Protected by lock_.
+ int num_threads_;
+
+ // Number of threads which are in the process of starting.
+ // When these threads start, they will decrement this counter and
+ // accordingly increment 'num_threads_'.
+ //
+ // Protected by lock_.
+ int num_threads_pending_start_;
+
+ // Number of threads currently running and executing client tasks.
+ //
+ // Protected by lock_.
+ int active_threads_;
+
+ // Total number of client tasks queued, either directly (queue_) or
+ // indirectly (tokens_).
+ //
+ // Protected by lock_.
+ int total_queued_tasks_;
+
+ // All allocated tokens.
+ //
+ // Protected by lock_.
+ std::unordered_set<ThreadPoolToken*> tokens_;
+
+ // FIFO of tokens from which tasks should be executed. Does not own the
+ // tokens; they are owned by clients and are removed from the FIFO on shutdown.
+ //
+ // Protected by lock_.
+ std::deque<ThreadPoolToken*> queue_;
+
+ // Pointers to all running threads. Raw pointers are safe because a Thread
+ // may only go out of scope after being removed from threads_.
+ //
+ // Protected by lock_.
+ std::unordered_set<Thread*> threads_;
+
+ // List of all threads currently waiting for work.
+ //
+ // A thread is added to the front of the list when it goes idle and is
+ // removed from the front and signaled when new work arrives. This produces a
+ // LIFO usage pattern that is more efficient than idling on a single
+ // ConditionVariable (which yields FIFO semantics).
+ //
+ // Protected by lock_.
+ struct IdleThread : public boost::intrusive::list_base_hook<> {
+ explicit IdleThread(Mutex* m)
+ : not_empty(m) {}
+
+ // Condition variable for "queue is not empty". Waiters wake up when a new
+ // task is queued.
+ ConditionVariable not_empty;
+
+ DISALLOW_COPY_AND_ASSIGN(IdleThread);
+ };
+ boost::intrusive::list<IdleThread> idle_threads_; // NOLINT(build/include_what_you_use)
+
+ // ExecutionMode::CONCURRENT token used by the pool for tokenless submission.
+ std::unique_ptr<ThreadPoolToken> tokenless_;
+
+ // Metrics for the entire thread pool.
+ const ThreadPoolMetrics metrics_;
+
+ const char* queue_time_trace_metric_name_;
+ const char* run_wall_time_trace_metric_name_;
+ const char* run_cpu_time_trace_metric_name_;
+
+ DISALLOW_COPY_AND_ASSIGN(ThreadPool);
+};
+
+// Entry point for token-based task submission and blocking for a particular
+// thread pool. Tokens can only be created via ThreadPool::NewToken().
+//
+// All functions are thread-safe. Mutable members are protected via the
+// ThreadPool's lock.
+class ThreadPoolToken {
+ public:
+ // Destroys the token.
+ //
+ // May be called on a token with outstanding tasks, as Shutdown() will be
+ // called first to take care of them.
+ ~ThreadPoolToken();
+
+ // Submits a function using the kudu Closure system.
+ Status SubmitClosure(Closure c) WARN_UNUSED_RESULT;
+
+ // Submits a function bound using boost::bind(&FuncName, args...).
+ Status SubmitFunc(boost::function<void()> f) WARN_UNUSED_RESULT;
+
+ // Submits a Runnable class.
+ Status Submit(std::shared_ptr<Runnable> r) WARN_UNUSED_RESULT;
+
+ // Marks the token as unusable for future submissions. Any queued tasks not
+ // yet running are destroyed. If tasks are in flight, Shutdown() will wait
+ // on their completion before returning.
+ void Shutdown();
+
+ // Waits until all the tasks submitted via this token are completed.
+ void Wait();
+
+ // Waits until all submissions using this token are complete, or until
+ // 'until' time is reached.
+ //
+ // Returns true if all submissions are complete, false otherwise.
+ bool WaitUntil(const MonoTime& until);
+
+ // Waits until all submissions using this token are complete, or until
+ // 'delta' time elapses.
+ //
+ // Returns true if all submissions are complete, false otherwise.
+ bool WaitFor(const MonoDelta& delta);
+
+ private:
+ // All possible token states. Legal state transitions:
+ // IDLE -> RUNNING: task is submitted via token
+ // IDLE -> QUIESCED: token or pool is shut down
+ // RUNNING -> IDLE: worker thread finishes executing a task and
+ // there are no more tasks queued to the token
+ // RUNNING -> QUIESCING: token or pool is shut down while worker thread
+ // is executing a task
+ // RUNNING -> QUIESCED: token or pool is shut down
+ // QUIESCING -> QUIESCED: worker thread finishes executing a task
+ // belonging to a shut down token or pool
+ enum class State {
+ // Token has no queued tasks.
+ IDLE,
+
+ // A worker thread is running one of the token's previously queued tasks.
+ RUNNING,
+
+ // No new tasks may be submitted to the token. A worker thread is still
+ // running a previously queued task.
+ QUIESCING,
+
+ // No new tasks may be submitted to the token. There are no active tasks
+ // either. At this state, the token may only be destroyed.
+ QUIESCED,
+ };
+
+ // Writes a textual representation of the token state in 's' to 'o'.
+ friend std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s);
+
+ friend class ThreadPool;
+
+ // Returns a textual representation of 's' suitable for debugging.
+ static const char* StateToString(State s);
+
+ // Constructs a new token.
+ //
+ // The token may not outlive its thread pool ('pool').
+ ThreadPoolToken(ThreadPool* pool,
+ ThreadPool::ExecutionMode mode,
+ ThreadPoolMetrics metrics);
+
+ // Changes this token's state to 'new_state' taking actions as needed.
+ void Transition(State new_state);
+
+ // Returns true if this token has a task queued and ready to run, or if a
+ // task belonging to this token is already running.
+ bool IsActive() const {
+ return state_ == State::RUNNING ||
+ state_ == State::QUIESCING;
+ }
+
+ // Returns true if new tasks may be submitted to this token.
+ bool MaySubmitNewTasks() const {
+ return state_ != State::QUIESCING &&
+ state_ != State::QUIESCED;
+ }
+
+ State state() const { return state_; }
+ ThreadPool::ExecutionMode mode() const { return mode_; }
+
+ // Token's configured execution mode.
+ const ThreadPool::ExecutionMode mode_;
+
+ // Metrics for just this token.
+ const ThreadPoolMetrics metrics_;
+
+ // Pointer to the token's thread pool.
+ ThreadPool* pool_;
+
+ // Token state machine.
+ State state_;
+
+ // Queued client tasks.
+ std::deque<ThreadPool::Task> entries_;
+
+ // Condition variable for "token is idle". Waiters wake up when the token
+ // transitions to IDLE or QUIESCED.
+ ConditionVariable not_running_cond_;
+
+ // Number of worker threads currently executing tasks belonging to this
+ // token.
+ int active_threads_;
+
+ DISALLOW_COPY_AND_ASSIGN(ThreadPoolToken);
+};
+
+} // namespace kudu
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/throttler-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/throttler-test.cc b/be/src/kudu/util/throttler-test.cc
new file mode 100644
index 0000000..ff97eb5
--- /dev/null
+++ b/be/src/kudu/util/throttler-test.cc
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/throttler.h"
+
+#include <gtest/gtest.h>
+
+#include "kudu/util/monotime.h"
+#include "kudu/util/test_util.h"
+
+namespace kudu {
+
+class ThrottlerTest : public KuduTest {  // Fixture for the Throttler tests; adds nothing to KuduTest.
+};
+
+TEST_F(ThrottlerTest, TestOpThrottle) {
+ // Check operation rate throttling: 1000 ops/s with a burst factor of 1.
+ MonoTime now = MonoTime::Now();
+ Throttler t0(now, 1000, 1000*1000, 1);
+ // Fill up bucket by letting 2 seconds pass before the first Take().
+ now += MonoDelta::FromMilliseconds(2000);
+ // Check throttle behavior for 1 second: each 100ms step admits exactly 100 ops.
+ for (int p = 0; p < 10; p++) {
+ for (int i = 0; i < 100; i++) {
+ ASSERT_TRUE(t0.Take(now, 1, 1));
+ }
+ ASSERT_FALSE(t0.Take(now, 1, 1));
+ now += MonoDelta::FromMilliseconds(100);
+ }
+}
+
+TEST_F(ThrottlerTest, TestIOThrottle) {
+ // Check IO byte rate throttling: 1000*1000 bytes/s with a burst factor of 1.
+ MonoTime now = MonoTime::Now();
+ Throttler t0(now, 50000, 1000*1000, 1);
+ // Fill up bucket by letting 2 seconds pass before the first Take().
+ now += MonoDelta::FromMilliseconds(2000);
+ // Check throttle behavior for 1 second: each 100ms step admits 100 takes of 1000 bytes.
+ for (int p = 0; p < 10; p++) {
+ for (int i = 0; i < 100; i++) {
+ ASSERT_TRUE(t0.Take(now, 1, 1000));
+ }
+ ASSERT_FALSE(t0.Take(now, 1, 1000));
+ now += MonoDelta::FromMilliseconds(100);
+ }
+}
+
+TEST_F(ThrottlerTest, TestBurst) {
+ // Check burst behavior: a burst factor of 5 lets a bucket hold 5 refill periods of tokens.
+ MonoTime now = MonoTime::Now();
+ Throttler t0(now, 2000, 1000*1000, 5);
+ // Fill up bucket
+ now += MonoDelta::FromMilliseconds(2000);
+ for (int i = 0; i < 100; i++) {
+ now += MonoDelta::FromMilliseconds(1);
+ ASSERT_TRUE(t0.Take(now, 1, 5000));
+ }
+ ASSERT_TRUE(t0.Take(now, 1, 100000));  // Takes the byte tokens still left after the loop.
+ ASSERT_FALSE(t0.Take(now, 1, 1));  // Byte tokens are exhausted at this point.
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/throttler.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/throttler.cc b/be/src/kudu/util/throttler.cc
new file mode 100644
index 0000000..69e0f99
--- /dev/null
+++ b/be/src/kudu/util/throttler.cc
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/throttler.h"
+
+#include <algorithm>
+#include <mutex>
+
+namespace kudu {
+
+// Builds a throttler whose buckets start empty and whose first refill is due
+// at 'now'.
+//
+// Each per-second rate is converted to a per-refill-period quantum by integer
+// division, so a rate of 0 (or any rate smaller than the number of refill
+// periods per second) yields a quantum of 0, which Take() treats as "this
+// dimension is not throttled". Each bucket's capacity is its quantum scaled
+// by 'burst_factor'.
+Throttler::Throttler(MonoTime now, uint64_t op_rate, uint64_t byte_rate, double burst_factor)
+    : next_refill_(now) {
+  const auto periods_per_second = MonoTime::kMicrosecondsPerSecond / kRefillPeriodMicros;
+
+  // Tokens credited per refill period.
+  op_refill_ = op_rate / periods_per_second;
+  byte_refill_ = byte_rate / periods_per_second;
+
+  // Upper bound on the tokens a bucket may accumulate.
+  op_token_max_ = static_cast<uint64_t>(op_refill_ * burst_factor);
+  byte_token_max_ = static_cast<uint64_t>(byte_refill_ * burst_factor);
+
+  // Nothing is available until the first Refill().
+  op_token_ = 0;
+  byte_token_ = 0;
+}
+
+// Tries to consume 'op' operation tokens and 'byte' byte tokens as of 'now'.
+//
+// Returns true, deducting from every throttled bucket, when each throttled
+// bucket holds enough tokens after refilling. Returns false without deducting
+// anything otherwise. A dimension whose refill quantum is 0 is not throttled;
+// when neither is throttled the call returns true without taking lock_.
+bool Throttler::Take(MonoTime now, uint64_t op, uint64_t byte) {
+  const bool limit_ops = op_refill_ != 0;
+  const bool limit_bytes = byte_refill_ != 0;
+  if (!limit_ops && !limit_bytes) {
+    return true;
+  }
+
+  std::lock_guard<simple_spinlock> lock(lock_);
+  Refill(now);
+
+  // Reject before touching either bucket so a failed Take() consumes nothing.
+  if (limit_ops && op > op_token_) {
+    return false;
+  }
+  if (limit_bytes && byte > byte_token_) {
+    return false;
+  }
+
+  if (limit_ops) {
+    op_token_ -= op;
+  }
+  if (limit_bytes) {
+    byte_token_ -= byte;
+  }
+  return true;
+}
+
+// Credits both buckets with the tokens earned as of 'now'.
+//
+// Does nothing while 'now' is still before next_refill_. Otherwise it credits
+// one refill quantum per whole kRefillPeriodMicros elapsed since next_refill_
+// plus one more, advances next_refill_ by that many periods, and caps each
+// bucket at its maximum.
+//
+// The credit is computed with saturating arithmetic: after a long idle gap or
+// with a very large configured rate, 'num_period * refill' can exceed the
+// range of uint64_t, and a wrapped sum would leave the bucket almost empty
+// instead of full.
+void Throttler::Refill(MonoTime now) {
+  const int64_t d = (now - next_refill_).ToMicroseconds();
+  if (d < 0) {
+    return;
+  }
+  const uint64_t num_period = d / kRefillPeriodMicros + 1;
+  next_refill_ += MonoDelta::FromMicroseconds(num_period * kRefillPeriodMicros);
+
+  // Returns min(tokens + num_period * per_period, cap) as if it were evaluated
+  // with unbounded integers.
+  const auto refilled = [num_period](uint64_t tokens, uint64_t per_period,
+                                     uint64_t cap) -> uint64_t {
+    if (tokens >= cap) {
+      return cap;
+    }
+    const uint64_t room = cap - tokens;
+    if (per_period != 0 && num_period > room / per_period) {
+      // The credit alone would overfill the bucket (and might not fit in 64 bits).
+      return cap;
+    }
+    // Here num_period * per_period <= room, so neither operation can wrap.
+    return tokens + num_period * per_period;
+  };
+  op_token_ = refilled(op_token_, op_refill_, op_token_max_);
+  byte_token_ = refilled(byte_token_, byte_refill_, byte_token_max_);
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/throttler.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/throttler.h b/be/src/kudu/util/throttler.h
new file mode 100644
index 0000000..5594091
--- /dev/null
+++ b/be/src/kudu/util/throttler.h
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_THROTTLER_H
+#define KUDU_UTIL_THROTTLER_H
+
+#include <cstdint>
+
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+
+namespace kudu {
+
+// A throttler that limits both operations per second and IO bytes per second.
+class Throttler {
+ public:
+ // Refill period is 100ms.
+ enum {
+ kRefillPeriodMicros = 100000
+ };
+
+ // Construct a throttler with max operations per second, max IO bytes per
+ // second and a burst factor: each bucket holds at most 'burst_factor' refill
+ // periods' worth of tokens, which bounds how much can be taken at once.
+ // Set op_per_sec to 0 to disable operation throttling.
+ // Set byte_per_sec to 0 to disable IO bytes throttling.
+ Throttler(MonoTime now, uint64_t op_per_sec, uint64_t byte_per_sec, double burst_factor);
+
+ // Throttle an "operation group" by taking 'op' operation tokens and 'byte' byte tokens.
+ // Return true if there are enough tokens, and operation is allowed.
+ // Return false if there are not enough tokens, and operation is throttled.
+ bool Take(MonoTime now, uint64_t op, uint64_t byte);
+
+ private:
+ void Refill(MonoTime now);  // Credits tokens for the refill periods elapsed as of 'now'.
+
+ MonoTime next_refill_;  // Refill() is a no-op while 'now' is before this time.
+ uint64_t op_refill_;  // Op tokens credited per refill period; 0 disables op throttling.
+ uint64_t op_token_;  // Op tokens currently available.
+ uint64_t op_token_max_;  // Cap applied to op_token_ on every refill.
+ uint64_t byte_refill_;  // Byte tokens credited per refill period; 0 disables byte throttling.
+ uint64_t byte_token_;  // Byte tokens currently available.
+ uint64_t byte_token_max_;  // Cap applied to byte_token_ on every refill.
+ simple_spinlock lock_;  // Held by Take() while it refills and consumes tokens.
+};
+
+} // namespace kudu
+
+#endif