You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:15 UTC

[03/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from kudu@334ecafd

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/threadpool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/threadpool-test.cc b/be/src/kudu/util/threadpool-test.cc
new file mode 100644
index 0000000..23fc45c
--- /dev/null
+++ b/be/src/kudu/util/threadpool-test.cc
@@ -0,0 +1,941 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <unistd.h>
+
+#include <atomic>
+#include <cstdint>
+#include <iterator>
+#include <limits>
+#include <memory>
+#include <mutex>
+#include <ostream>
+#include <string>
+#include <thread>
+#include <utility>
+#include <vector>
+
+#include <boost/bind.hpp> // IWYU pragma: keep
+#include <boost/smart_ptr/shared_ptr.hpp>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/gutil/bind.h"
+#include "kudu/gutil/bind_helpers.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/sysinfo.h"
+#include "kudu/util/barrier.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/promise.h"
+#include "kudu/util/random.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/threadpool.h"
+#include "kudu/util/trace.h"
+
+using std::atomic;
+using std::shared_ptr;
+using std::string;
+using std::thread;
+using std::unique_ptr;
+using std::vector;
+
+using strings::Substitute;
+
+DECLARE_int32(thread_inject_start_latency_ms);
+
+namespace kudu {
+
+// Name given to every pool that the fixture builds. Both the characters and
+// the pointer are const, so no test can repoint the shared default name.
+static const char* const kDefaultPoolName = "test";
+
+// Fixture owning the ThreadPool under test ('pool_'). SetUp() gives every
+// test a pool built from ThreadPoolBuilder's defaults; the Rebuild* helpers
+// replace it with a differently configured pool.
+class ThreadPoolTest : public KuduTest {
+ public:
+  void SetUp() override {
+    KuduTest::SetUp();
+    ASSERT_OK(ThreadPoolBuilder(kDefaultPoolName).Build(&pool_));
+  }
+
+  // Replaces 'pool_' with the pool produced by 'builder'.
+  Status RebuildPoolWithBuilder(const ThreadPoolBuilder& builder) {
+    return builder.Build(&pool_);
+  }
+
+  // Replaces 'pool_' with a default-named pool configured with the given
+  // minimum and maximum thread counts.
+  Status RebuildPoolWithMinMax(int min_threads, int max_threads) {
+    return RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+                                  .set_min_threads(min_threads)
+                                  .set_max_threads(max_threads));
+  }
+
+ protected:
+  gscoped_ptr<ThreadPool> pool_;
+};
+
+// A pool with a fixed set of four threads can be built and shut down without
+// ever being given any work.
+TEST_F(ThreadPoolTest, TestNoTaskOpenClose) {
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+                                   .set_min_threads(4)
+                                   .set_max_threads(4)));
+  pool_->Shutdown();
+}
+
+// Increments '*counter' 'n' times, yielding the processor after each
+// increment so that concurrently running tasks interleave. A non-positive
+// 'n' does nothing.
+//
+// Uses std::this_thread::yield() rather than boost::detail::yield(): the
+// latter lives in Boost's internal 'detail' namespace, is not public API, and
+// its header is never included by this file directly.
+static void SimpleTaskMethod(int n, Atomic32 *counter) {
+  for (int i = 0; i < n; i++) {
+    base::subtle::NoBarrier_AtomicIncrement(counter, 1);
+    std::this_thread::yield();
+  }
+}
+
+// Runnable wrapper around SimpleTaskMethod(): every Run() adds 'n' to the
+// counter supplied at construction. The counter is not owned.
+class SimpleTask : public Runnable {
+ public:
+  SimpleTask(int n, Atomic32 *counter) : n_(n), counter_(counter) {}
+
+  void Run() override { SimpleTaskMethod(n_, counter_); }
+
+ private:
+  int n_;
+  Atomic32 *counter_;
+};
+
+// Tasks submitted through SubmitFunc(), Submit() and SubmitClosure() all run,
+// including a single Runnable instance that is submitted twice.
+TEST_F(ThreadPoolTest, TestSimpleTasks) {
+  ASSERT_OK(RebuildPoolWithMinMax(4, 4));
+
+  Atomic32 counter(0);
+  std::shared_ptr<Runnable> shared_task = std::make_shared<SimpleTask>(15, &counter);
+
+  ASSERT_OK(pool_->SubmitFunc(boost::bind(&SimpleTaskMethod, 10, &counter)));
+  ASSERT_OK(pool_->Submit(shared_task));
+  ASSERT_OK(pool_->SubmitFunc(boost::bind(&SimpleTaskMethod, 20, &counter)));
+  ASSERT_OK(pool_->Submit(shared_task));
+  ASSERT_OK(pool_->SubmitClosure(Bind(&SimpleTaskMethod, 123, &counter)));
+  pool_->Wait();
+
+  // One term per submission above, in submission order.
+  const Atomic32 kExpectedTotal = 10 + 15 + 20 + 15 + 123;
+  ASSERT_EQ(kExpectedTotal, base::subtle::NoBarrier_Load(&counter));
+  pool_->Shutdown();
+}
+
+// Task body that emits a TRACE message; TestTracePropagation looks for this
+// exact text in the trace adopted by the submitting thread.
+static void IssueTraceStatement() { TRACE("hello from task"); }
+
+// Verifies that the trace adopted by the submitting thread is propagated to
+// tasks it submits to the threadpool.
+TEST_F(ThreadPoolTest, TestTracePropagation) {
+  ASSERT_OK(RebuildPoolWithMinMax(1, 1));
+
+  scoped_refptr<Trace> trace(new Trace);
+  {
+    // The trace is adopted only for this scope, i.e. only while submitting.
+    ADOPT_TRACE(trace.get());
+    ASSERT_OK(pool_->SubmitFunc(&IssueTraceStatement));
+  }
+  pool_->Wait();
+
+  const string dump = trace->DumpToString();
+  ASSERT_STR_CONTAINS(dump, "hello from task");
+}
+
+// Submitting to a pool that has already been shut down returns a
+// "Service unavailable" status.
+TEST_F(ThreadPoolTest, TestSubmitAfterShutdown) {
+  ASSERT_OK(RebuildPoolWithMinMax(1, 1));
+  pool_->Shutdown();
+
+  const Status submit_status = pool_->SubmitFunc(&IssueTraceStatement);
+  ASSERT_EQ("Service unavailable: The pool has been shut down.",
+            submit_status.ToString());
+}
+
+// Runnable whose Run() blocks until the given latch is counted down, letting
+// a test keep pool threads busy for as long as it needs. The latch is not
+// owned and must outlive every Run() call.
+class SlowTask : public Runnable {
+ public:
+  explicit SlowTask(CountDownLatch* latch) : latch_(latch) {}
+
+  // Factory returning the task as a shared_ptr<Runnable>, ready for Submit().
+  static shared_ptr<Runnable> NewSlowTask(CountDownLatch* latch) {
+    return std::make_shared<SlowTask>(latch);
+  }
+
+  void Run() override { latch_->Wait(); }
+
+ private:
+  CountDownLatch* latch_;
+};
+
+// A pool with min_threads == 0 starts with no threads, creates them on demand
+// up to max_threads, queues work beyond that, and ends with zero threads after
+// Shutdown().
+TEST_F(ThreadPoolTest, TestThreadPoolWithNoMinimum) {
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+                                   .set_min_threads(0)
+                                   .set_max_threads(3)
+                                   .set_idle_timeout(MonoDelta::FromMilliseconds(1))));
+
+  // There are no threads to start with. ASSERT_EQ (rather than ASSERT_TRUE on
+  // a comparison) reports the actual thread count if this fails.
+  ASSERT_EQ(0, pool_->num_threads());
+  // We get up to 3 threads when submitting work.
+  CountDownLatch latch(1);
+  // Release the blocked tasks even if an assertion below returns early.
+  SCOPED_CLEANUP({
+    latch.CountDown();
+  });
+  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  ASSERT_EQ(2, pool_->num_threads());
+  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  ASSERT_EQ(3, pool_->num_threads());
+  // The 4th piece of work gets queued.
+  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  ASSERT_EQ(3, pool_->num_threads());
+  // Finish all work
+  latch.CountDown();
+  pool_->Wait();
+  ASSERT_EQ(0, pool_->active_threads_);
+  pool_->Shutdown();
+  ASSERT_EQ(0, pool_->num_threads());
+}
+
+// With max_threads effectively unlimited, the pool keeps adding threads past
+// the CPU count: one per blocked tokenless task, and one per token in use.
+TEST_F(ThreadPoolTest, TestThreadPoolWithNoMaxThreads) {
+  // By default a threadpool's max_threads is set to the number of CPUs, so
+  // this test submits more tasks than that to ensure that the number of CPUs
+  // isn't some kind of upper bound.
+  const int kNumCPUs = base::NumCPUs();
+  const int kFirstTokenlessBatch = kNumCPUs * 2;
+  const int kTokenTasks = kNumCPUs * 2;
+  const int kSecondTokenlessBatch = kNumCPUs;
+
+  // Build a threadpool with no limit on the maximum number of threads.
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+                                   .set_max_threads(std::numeric_limits<int>::max())));
+  CountDownLatch latch(1);
+  // Unblock every SlowTask when the test exits.
+  auto release_tasks = MakeScopedCleanup([&]() {
+    latch.CountDown();
+  });
+
+  // Submit tokenless tasks. Each should create a new thread.
+  for (int task = 0; task < kFirstTokenlessBatch; task++) {
+    ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  }
+  ASSERT_EQ(kFirstTokenlessBatch, pool_->num_threads());
+
+  // Submit tasks alternately on two tokens. Only two threads should be created.
+  unique_ptr<ThreadPoolToken> t1 = pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
+  unique_ptr<ThreadPoolToken> t2 = pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
+  ThreadPoolToken* const alternating_tokens[] = { t1.get(), t2.get() };
+  for (int task = 0; task < kTokenTasks; task++) {
+    ASSERT_OK(alternating_tokens[task % 2]->Submit(SlowTask::NewSlowTask(&latch)));
+  }
+  ASSERT_EQ(kFirstTokenlessBatch + 2, pool_->num_threads());
+
+  // Submit more tokenless tasks. Each should create a new thread.
+  for (int task = 0; task < kSecondTokenlessBatch; task++) {
+    ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  }
+  ASSERT_EQ(kFirstTokenlessBatch + 2 + kSecondTokenlessBatch, pool_->num_threads());
+
+  latch.CountDown();
+  pool_->Wait();
+  pool_->Shutdown();
+}
+
+// Regression test for a bug where a task is submitted exactly
+// as a thread is about to exit. Previously this could hang forever.
+TEST_F(ThreadPoolTest, TestRace) {
+  // A regression shows up as a hang, so bound the test's runtime.
+  alarm(60);
+  auto disarm_alarm = MakeScopedCleanup([]() {
+    alarm(0); // Disable alarm on test exit.
+  });
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+                                   .set_min_threads(0)
+                                   .set_max_threads(1)
+                                   .set_idle_timeout(MonoDelta::FromMicroseconds(1))));
+
+  for (int iteration = 0; iteration < 500; iteration++) {
+    CountDownLatch task_ran(1);
+    ASSERT_OK(pool_->SubmitFunc(boost::bind(&CountDownLatch::CountDown, &task_ran)));
+    task_ran.Wait();
+    // Sleeping a different amount in each iteration makes it more likely to hit
+    // the bug.
+    SleepFor(MonoDelta::FromMicroseconds(iteration));
+  }
+}
+
+// A pool with min_threads == 1 starts with one thread, grows on demand up to
+// max_threads, queues work beyond that, and ends with zero threads after
+// Shutdown().
+TEST_F(ThreadPoolTest, TestVariableSizeThreadPool) {
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+                                   .set_min_threads(1)
+                                   .set_max_threads(4)
+                                   .set_idle_timeout(MonoDelta::FromMilliseconds(1))));
+
+  // There is 1 thread to start with.
+  ASSERT_EQ(1, pool_->num_threads());
+  // We get up to 4 threads when submitting work.
+  CountDownLatch latch(1);
+  // Release the blocked tasks even if an assertion below returns early;
+  // otherwise they could remain blocked on 'latch' after it goes out of scope.
+  // Same pattern as TestThreadPoolWithNoMinimum.
+  SCOPED_CLEANUP({
+    latch.CountDown();
+  });
+  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  ASSERT_EQ(1, pool_->num_threads());
+  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  ASSERT_EQ(2, pool_->num_threads());
+  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  ASSERT_EQ(3, pool_->num_threads());
+  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  ASSERT_EQ(4, pool_->num_threads());
+  // The 5th piece of work gets queued.
+  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  ASSERT_EQ(4, pool_->num_threads());
+  // Finish all work
+  latch.CountDown();
+  pool_->Wait();
+  ASSERT_EQ(0, pool_->active_threads_);
+  pool_->Shutdown();
+  ASSERT_EQ(0, pool_->num_threads());
+}
+
+// With one thread and a queue of size one, the pool accepts two blocking
+// tasks and rejects the third with ServiceUnavailable.
+TEST_F(ThreadPoolTest, TestMaxQueueSize) {
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+                                   .set_min_threads(1)
+                                   .set_max_threads(1)
+                                   .set_max_queue_size(1)));
+
+  CountDownLatch latch(1);
+  // Release the blocked tasks even if an assertion below returns early, so
+  // they do not remain blocked on 'latch' after it goes out of scope.
+  SCOPED_CLEANUP({
+    latch.CountDown();
+  });
+  // We will be able to submit two tasks: one for max_threads == 1 and one for
+  // max_queue_size == 1.
+  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  Status s = pool_->Submit(SlowTask::NewSlowTask(&latch));
+  // ASSERT_TRUE rather than CHECK: a failed CHECK aborts the entire test
+  // binary instead of failing only this test.
+  ASSERT_TRUE(s.IsServiceUnavailable())
+      << "Expected failure due to queue blowout:" << s.ToString();
+  latch.CountDown();
+  pool_->Wait();
+  pool_->Shutdown();
+}
+
+// Test that when we specify a zero-sized queue, the maximum number of threads
+// running is used for enforcement.
+TEST_F(ThreadPoolTest, TestZeroQueueSize) {
+  const int kMaxThreads = 4;
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+                                   .set_max_queue_size(0)
+                                   .set_max_threads(kMaxThreads)));
+
+  CountDownLatch latch(1);
+  // Release the blocked tasks even if an assertion below returns early, so
+  // they do not remain blocked on 'latch' after it goes out of scope.
+  SCOPED_CLEANUP({
+    latch.CountDown();
+  });
+  for (int i = 0; i < kMaxThreads; i++) {
+    ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  }
+  Status s = pool_->Submit(SlowTask::NewSlowTask(&latch));
+  ASSERT_TRUE(s.IsServiceUnavailable()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "Thread pool is at capacity");
+  latch.CountDown();
+  pool_->Wait();
+  pool_->Shutdown();
+}
+
+// Regression test for KUDU-2187:
+//
+// If a threadpool thread is slow to start up, it shouldn't block progress of
+// other tasks on the same pool.
+//
+// The check is timing-based: it sums, over all tasks, the time each spent
+// between submission and the start of its execution, and bounds that sum.
+TEST_F(ThreadPoolTest, TestSlowThreadStart) {
+  // Start a pool of threads from which we'll submit tasks.
+  gscoped_ptr<ThreadPool> submitter_pool;
+  ASSERT_OK(ThreadPoolBuilder("submitter")
+            .set_min_threads(5)
+            .set_max_threads(5)
+            .Build(&submitter_pool));
+
+  // Start the actual test pool, which starts with one thread
+  // but will start a second one on-demand.
+  ASSERT_OK(RebuildPoolWithMinMax(1, 2));
+  // Ensure that the second thread will take a long time to start.
+  // NOTE(review): this flag is not reset here; presumably KuduTest restores
+  // flags between tests — confirm.
+  FLAGS_thread_inject_start_latency_ms = 3000;
+
+  // Now submit 10 tasks to the 'submitter' pool, each of which
+  // submits a single task to 'pool_'. The 'pool_' task sleeps
+  // for 10ms.
+  //
+  // Because the 'submitter' tasks submit faster than they can be
+  // processed on a single thread (due to the sleep), we expect that
+  // this will trigger 'pool_' to start up its second worker thread.
+  // The thread startup will have some latency injected.
+  //
+  // We expect that the thread startup will block only one of the
+  // tasks in the 'submitter' pool after it submits its task. Other
+  // tasks will continue to be processed by the other (already-running)
+  // thread on 'pool_'.
+  //
+  // The by-reference captures are safe because both pools are drained with
+  // Wait() below before these locals go out of scope. 'submit_time' is
+  // captured by value so that each inner task measures its own queue time.
+  std::atomic<int32_t> total_queue_time_ms(0);
+  for (int i = 0; i < 10; i++) {
+    ASSERT_OK(submitter_pool->SubmitFunc([&]() {
+          auto submit_time = MonoTime::Now();
+          CHECK_OK(pool_->SubmitFunc([&,submit_time]() {
+                auto queue_time = MonoTime::Now() - submit_time;
+                total_queue_time_ms += queue_time.ToMilliseconds();
+                SleepFor(MonoDelta::FromMilliseconds(10));
+              }));
+        }));
+  }
+  submitter_pool->Wait();
+  pool_->Wait();
+
+  // Since the total amount of work submitted was only 100ms, we expect
+  // that the performance would be equivalent to a single-threaded
+  // threadpool. So, we expect the total queue time to be approximately
+  // 0 + 10 + 20 ... + 80 + 90 = 450ms.
+  //
+  // If, instead, throughput had been blocked while starting threads,
+  // we'd get something closer to 15000ms (3000ms delay * 5 submitter threads).
+  // The 10000ms upper bound below separates the two outcomes.
+  ASSERT_GE(total_queue_time_ms, 400);
+  ASSERT_LE(total_queue_time_ms, 10000);
+}
+
+// A Promise set by a task on a pool thread delivers its value to the test
+// thread blocked in Get().
+TEST_F(ThreadPoolTest, TestPromises) {
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+                                   .set_min_threads(1)
+                                   .set_max_threads(1)
+                                   .set_max_queue_size(1)));
+
+  Promise<int> promise;
+  ASSERT_OK(pool_->SubmitClosure(Bind(&Promise<int>::Set, Unretained(&promise), 5)));
+  ASSERT_EQ(5, promise.Get());
+  pool_->Shutdown();
+}
+
+// Metric prototypes used by TestMetrics: an entity type plus the three
+// histograms that are instantiated together into a ThreadPoolMetrics
+// (queue length, queue time, run time).
+// NOTE(review): the trailing numeric arguments appear to be each histogram's
+// maximum trackable value and its number of significant digits — confirm
+// against METRIC_DEFINE_histogram in kudu/util/metrics.h.
+METRIC_DEFINE_entity(test_entity);
+METRIC_DEFINE_histogram(test_entity, queue_length, "queue length",
+                        MetricUnit::kTasks, "queue length", 1000, 1);
+
+METRIC_DEFINE_histogram(test_entity, queue_time, "queue time",
+                        MetricUnit::kMicroseconds, "queue time", 1000000, 1);
+
+METRIC_DEFINE_histogram(test_entity, run_time, "run time",
+                        MetricUnit::kMicroseconds, "run time", 1000, 1);
+
+// Per-token metrics record only that token's tasks, while the pool-wide
+// metrics record every task submitted to the pool.
+TEST_F(ThreadPoolTest, TestMetrics) {
+  // Three independent metric sets: [0] for the whole pool, [1] for token t1
+  // and [2] for token t2.
+  MetricRegistry registry;
+  vector<ThreadPoolMetrics> all_metrics;
+  for (int i = 0; i < 3; i++) {
+    scoped_refptr<MetricEntity> entity = METRIC_ENTITY_test_entity.Instantiate(
+        &registry, Substitute("test $0", i));
+    all_metrics.emplace_back(ThreadPoolMetrics{
+      METRIC_queue_length.Instantiate(entity),
+      METRIC_queue_time.Instantiate(entity),
+      METRIC_run_time.Instantiate(entity)
+    });
+  }
+
+  // Enable metrics for the thread pool.
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+                                   .set_min_threads(1)
+                                   .set_max_threads(1)
+                                   .set_metrics(all_metrics[0])));
+
+  unique_ptr<ThreadPoolToken> t1 = pool_->NewTokenWithMetrics(
+      ThreadPool::ExecutionMode::SERIAL, all_metrics[1]);
+  unique_ptr<ThreadPoolToken> t2 = pool_->NewTokenWithMetrics(
+      ThreadPool::ExecutionMode::SERIAL, all_metrics[2]);
+
+  // Submit once to t1, twice to t2, and three times without a token.
+  const auto noop = [](){};
+  ASSERT_OK(t1->SubmitFunc(noop));
+  for (int i = 0; i < 2; i++) {
+    ASSERT_OK(t2->SubmitFunc(noop));
+  }
+  for (int i = 0; i < 3; i++) {
+    ASSERT_OK(pool_->SubmitFunc(noop));
+  }
+  pool_->Wait();
+
+  // Every histogram of a metric set holds one sample per task that set saw:
+  // the token sets (checked first) see only their own submissions, and the
+  // pool-wide set sees all six.
+  struct ExpectedCount {
+    int metrics_idx;
+    int count;
+  };
+  const ExpectedCount kExpectedCounts[] = { {1, 1}, {2, 2}, {0, 6} };
+  for (const ExpectedCount& expected : kExpectedCounts) {
+    const ThreadPoolMetrics& metrics = all_metrics[expected.metrics_idx];
+    ASSERT_EQ(expected.count, metrics.queue_length_histogram->TotalCount());
+    ASSERT_EQ(expected.count, metrics.queue_time_us_histogram->TotalCount());
+    ASSERT_EQ(expected.count, metrics.run_time_us_histogram->TotalCount());
+  }
+}
+
+// Test that a thread pool will crash if asked to run its own blocking
+// functions in a pool thread.
+//
+// In a multi-threaded application, TSAN is unsafe to use following a fork().
+// After a fork(), TSAN will:
+// 1. Disable verification, expecting an exec() soon anyway, and
+// 2. Die on future thread creation.
+// For some reason, this test triggers behavior #2. We could disable it with
+// the TSAN option die_after_fork=0, but this can (supposedly) lead to
+// deadlocks, so we'll disable the entire test instead.
+#ifndef THREAD_SANITIZER
+TEST_F(ThreadPoolTest, TestDeadlocks) {
+  // Expected fragment of the crash message in both death tests below.
+  const char* death_msg = "called pool function that would result in deadlock";
+  // Case 1: a task running on the pool calls Shutdown() on that same pool.
+  ASSERT_DEATH({
+    ASSERT_OK(RebuildPoolWithMinMax(1, 1));
+    ASSERT_OK(pool_->SubmitClosure(
+        Bind(&ThreadPool::Shutdown, Unretained(pool_.get()))));
+    pool_->Wait();
+  }, death_msg);
+
+  // Case 2: a task running on the pool calls Wait() on that same pool.
+  ASSERT_DEATH({
+    ASSERT_OK(RebuildPoolWithMinMax(1, 1));
+    ASSERT_OK(pool_->SubmitClosure(
+        Bind(&ThreadPool::Wait, Unretained(pool_.get()))));
+    pool_->Wait();
+  }, death_msg);
+}
+#endif
+
+// Runnable that does nothing when run but whose destructor sleeps for 100ms.
+// TestSlowDestructor uses it to check that destroying finished tasks does not
+// serialize the queue.
+class SlowDestructorRunnable : public Runnable {
+ public:
+  virtual ~SlowDestructorRunnable() {
+    SleepFor(MonoDelta::FromMilliseconds(100));
+  }
+
+  void Run() override {}
+};
+
+// Test that if a task's destructor is slow, it doesn't cause serialization of
+// the tasks in the queue.
+TEST_F(ThreadPoolTest, TestSlowDestructor) {
+  ASSERT_OK(RebuildPoolWithMinMax(1, 20));
+  const MonoTime start = MonoTime::Now();
+  for (int i = 0; i < 100; i++) {
+    ASSERT_OK(pool_->Submit(shared_ptr<Runnable>(new SlowDestructorRunnable())));
+  }
+  pool_->Wait();
+
+  // 100 tasks with 100ms destructors would need about 10s if serialized.
+  const auto elapsed = MonoTime::Now() - start;
+  ASSERT_LT(elapsed.ToSeconds(), 5);
+}
+
+// Parameterized fixture for test cases that should run with both kinds of
+// tokens; GetParam() yields the ThreadPool::ExecutionMode under test.
+class ThreadPoolTestTokenTypes : public ThreadPoolTest,
+                                 public testing::WithParamInterface<ThreadPool::ExecutionMode> {};
+
+// Runs every TEST_P of ThreadPoolTestTokenTypes once with SERIAL tokens and
+// once with CONCURRENT tokens.
+INSTANTIATE_TEST_CASE_P(Tokens, ThreadPoolTestTokenTypes,
+                        ::testing::Values(ThreadPool::ExecutionMode::SERIAL,
+                                          ThreadPool::ExecutionMode::CONCURRENT));
+
+
+// A token's Wait() returns only after the task submitted through it has run.
+TEST_P(ThreadPoolTestTokenTypes, TestTokenSubmitAndWait) {
+  unique_ptr<ThreadPoolToken> token = pool_->NewToken(GetParam());
+  int runs = 0;
+  auto slow_increment = [&]() {
+    SleepFor(MonoDelta::FromMilliseconds(1));
+    runs++;
+  };
+  ASSERT_OK(token->SubmitFunc(slow_increment));
+  token->Wait();
+  ASSERT_EQ(1, runs);
+}
+
+// Tasks submitted to a SERIAL token run one at a time, in submission order.
+TEST_F(ThreadPoolTest, TestTokenSubmitsProcessedSerially) {
+  unique_ptr<ThreadPoolToken> token = pool_->NewToken(ThreadPool::ExecutionMode::SERIAL);
+  Random rng(SeedRandom());
+  string result;
+  for (char letter = 'a'; letter <= 'e'; letter++) {
+    // Sleep a little first so that there's a higher chance of out-of-order
+    // appends if the submissions did execute in parallel.
+    const int sleep_ms = rng.Next() % 5;
+    auto append_letter = [&result, letter, sleep_ms]() {
+      SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
+      result += letter;
+    };
+    ASSERT_OK(token->SubmitFunc(append_letter));
+  }
+  token->Wait();
+  ASSERT_EQ("abcde", result);
+}
+
+// Tasks on distinct tokens run concurrently whatever each token's execution
+// mode: all kNumTokens tasks must sit in the barrier at the same time.
+TEST_P(ThreadPoolTestTokenTypes, TestTokenSubmitsProcessedConcurrently) {
+  const int kNumTokens = 5;
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+                                   .set_max_threads(kNumTokens)));
+  vector<unique_ptr<ThreadPoolToken>> tokens;
+
+  // A violation of the tested invariant would yield a deadlock, so let's set
+  // up an alarm to bail us out.
+  alarm(60);
+  SCOPED_CLEANUP({
+      alarm(0); // Disable alarm on test exit.
+  });
+  // One barrier slot per task plus one for this test thread.
+  auto barrier = std::make_shared<Barrier>(kNumTokens + 1);
+  for (int i = 0; i < kNumTokens; i++) {
+    tokens.emplace_back(pool_->NewToken(GetParam()));
+    ASSERT_OK(tokens.back()->SubmitFunc([barrier]() { barrier->Wait(); }));
+  }
+
+  // This will deadlock if the above tasks weren't all running concurrently.
+  barrier->Wait();
+}
+
+// Multiple tasks submitted to a single CONCURRENT token may all run at once.
+TEST_F(ThreadPoolTest, TestTokenSubmitsNonSequential) {
+  const int kNumSubmissions = 5;
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+                                   .set_max_threads(kNumSubmissions)));
+
+  // A violation of the tested invariant would yield a deadlock, so let's set
+  // up an alarm to bail us out.
+  alarm(60);
+  SCOPED_CLEANUP({
+      alarm(0); // Disable alarm on test exit.
+  });
+  // One barrier slot per task plus one for this test thread.
+  auto barrier = std::make_shared<Barrier>(kNumSubmissions + 1);
+  unique_ptr<ThreadPoolToken> token = pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT);
+  for (int i = 0; i < kNumSubmissions; i++) {
+    ASSERT_OK(token->SubmitFunc([barrier]() { barrier->Wait(); }));
+  }
+
+  // This will deadlock if the above tasks weren't all running concurrently.
+  barrier->Wait();
+}
+
+// Shutting down one token waits only for that token's tasks; other tokens
+// keep accepting work, while the shut-down token rejects new submissions.
+TEST_P(ThreadPoolTestTokenTypes, TestTokenShutdown) {
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+                                   .set_max_threads(4)));
+
+  unique_ptr<ThreadPoolToken> t1(pool_->NewToken(GetParam()));
+  unique_ptr<ThreadPoolToken> t2(pool_->NewToken(GetParam()));
+  CountDownLatch t1_gate(1);
+  CountDownLatch t2_gate(1);
+
+  // A violation of the tested invariant would yield a deadlock, so let's set
+  // up an alarm to bail us out.
+  alarm(60);
+  SCOPED_CLEANUP({
+      alarm(0); // Disable alarm on test exit.
+  });
+
+  // Three tasks per token, each blocked on its own token's latch.
+  for (int i = 0; i < 3; i++) {
+    ASSERT_OK(t1->SubmitFunc([&]() { t1_gate.Wait(); }));
+  }
+  for (int i = 0; i < 3; i++) {
+    ASSERT_OK(t2->SubmitFunc([&]() { t2_gate.Wait(); }));
+  }
+
+  // Unblock all of t1's tasks, but not t2's tasks.
+  t1_gate.CountDown();
+
+  // If this also waited for t2's tasks, it would deadlock.
+  t1->Shutdown();
+
+  // We can no longer submit to t1 but we can still submit to t2.
+  const auto noop = [](){};
+  ASSERT_TRUE(t1->SubmitFunc(noop).IsServiceUnavailable());
+  ASSERT_OK(t2->SubmitFunc(noop));
+
+  // Unblock t2's tasks.
+  t2_gate.CountDown();
+  t2->Shutdown();
+}
+
+// The pool-wide Wait() covers both tokenless tasks and tasks submitted through
+// tokens.
+TEST_P(ThreadPoolTestTokenTypes, TestTokenWaitForAll) {
+  const int kNumTokens = 3;
+  const int kNumSubmissions = 20;
+  Random r(SeedRandom());
+  vector<unique_ptr<ThreadPoolToken>> tokens;
+  tokens.reserve(kNumTokens);
+  for (int i = 0; i < kNumTokens; i++) {
+    tokens.emplace_back(pool_->NewToken(GetParam()));
+  }
+
+  atomic<int32_t> v(0);
+  for (int i = 0; i < kNumSubmissions; i++) {
+    // Sleep a little first to raise the likelihood of the test thread
+    // reaching Wait() before the submissions finish.
+    int sleep_ms = r.Next() % 5;
+
+    auto task = [&v, sleep_ms]() {
+      SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
+      v++;
+    };
+
+    // Half of the submissions will be token-less, and half will use a token.
+    if (i % 2 == 0) {
+      ASSERT_OK(pool_->SubmitFunc(task));
+    } else {
+      int token_idx = r.Next() % tokens.size();
+      ASSERT_OK(tokens[token_idx]->SubmitFunc(task));
+    }
+  }
+  pool_->Wait();
+  // Read the atomic explicitly. Passing the std::atomic itself relies on an
+  // implicit conversion, and gtest has no printer for it in failure messages.
+  ASSERT_EQ(kNumSubmissions, v.load());
+}
+
+// Randomized test: performs kNumOperations random operations on the default
+// pool and a changing set of tokens. It passes if no assertion fails and
+// nothing crashes or hangs.
+//
+// The sequence of operations is fully determined by the order of r.Next()
+// calls below; keep that order intact so that a failing seed stays
+// reproducible.
+TEST_F(ThreadPoolTest, TestFuzz) {
+  const int kNumOperations = 1000;
+  Random r(SeedRandom());
+  vector<unique_ptr<ThreadPoolToken>> tokens;
+
+  for (int i = 0; i < kNumOperations; i++) {
+    // Operation distribution:
+    //
+    // - Submit without a token: 40%
+    // - Submit with a randomly selected token: 35%
+    // - Allocate a new token: 10%
+    // - Wait on a randomly selected token: 7%
+    // - Shutdown a randomly selected token: 4%
+    // - Deallocate a randomly selected token: 2%
+    // - Wait for all submissions: 2%
+    //
+    // Token operations drawn while no token exists are skipped ('continue'),
+    // but they still consume one of the kNumOperations iterations.
+    int op = r.Next() % 100;
+    if (op < 40) {
+      // Submit without a token.
+      int sleep_ms = r.Next() % 5;
+      ASSERT_OK(pool_->SubmitFunc([sleep_ms]() {
+        // Sleep a little first to increase task overlap.
+        SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
+      }));
+    } else if (op < 75) {
+      // Submit with a randomly selected token.
+      if (tokens.empty()) {
+        continue;
+      }
+      int sleep_ms = r.Next() % 5;
+      int token_idx = r.Next() % tokens.size();
+      Status s = tokens[token_idx]->SubmitFunc([sleep_ms]() {
+        // Sleep a little first to increase task overlap.
+        SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
+      });
+      // The chosen token may already have been shut down, in which case the
+      // submission is rejected with ServiceUnavailable.
+      ASSERT_TRUE(s.ok() || s.IsServiceUnavailable());
+    } else if (op < 85) {
+      // Allocate a token with a randomly selected policy.
+      ThreadPool::ExecutionMode mode = r.Next() % 2 ?
+          ThreadPool::ExecutionMode::SERIAL :
+          ThreadPool::ExecutionMode::CONCURRENT;
+      tokens.emplace_back(pool_->NewToken(mode));
+    } else if (op < 92) {
+      // Wait on a randomly selected token.
+      if (tokens.empty()) {
+        continue;
+      }
+      int token_idx = r.Next() % tokens.size();
+      tokens[token_idx]->Wait();
+    } else if (op < 96) {
+      // Shutdown a randomly selected token.
+      if (tokens.empty()) {
+        continue;
+      }
+      int token_idx = r.Next() % tokens.size();
+      tokens[token_idx]->Shutdown();
+    } else if (op < 98) {
+      // Deallocate a randomly selected token.
+      if (tokens.empty()) {
+        continue;
+      }
+      auto it = tokens.begin();
+      int token_idx = r.Next() % tokens.size();
+      std::advance(it, token_idx);
+      tokens.erase(it);
+    } else {
+      // Wait on everything.
+      ASSERT_LT(op, 100);
+      ASSERT_GE(op, 98);
+      pool_->Wait();
+    }
+  }
+
+  // Some test runs will shut down the pool before the tokens, and some won't.
+  // Either way should be safe.
+  if (r.Next() % 2 == 0) {
+    pool_->Shutdown();
+  }
+}
+
+// Submissions through a token count against the pool's max_queue_size: with
+// one thread and a queue of one, the third blocking task is rejected.
+TEST_P(ThreadPoolTestTokenTypes, TestTokenSubmissionsAdhereToMaxQueueSize) {
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+                                   .set_min_threads(1)
+                                   .set_max_threads(1)
+                                   .set_max_queue_size(1)));
+
+  CountDownLatch latch(1);
+  unique_ptr<ThreadPoolToken> token = pool_->NewToken(GetParam());
+  SCOPED_CLEANUP({
+    latch.CountDown();
+  });
+  // We will be able to submit two tasks: one for max_threads == 1 and one for
+  // max_queue_size == 1.
+  for (int i = 0; i < 2; i++) {
+    ASSERT_OK(token->Submit(SlowTask::NewSlowTask(&latch)));
+  }
+  const Status third_submission = token->Submit(SlowTask::NewSlowTask(&latch));
+  ASSERT_TRUE(third_submission.IsServiceUnavailable());
+}
+
+// Stress test: for kTestRuntimeSecs, groups of threads concurrently replace,
+// shut down, wait on and submit to a shared set of tokens. The test passes if
+// nothing crashes or fails a CHECK; the per-operation totals are only logged.
+TEST_F(ThreadPoolTest, TestTokenConcurrency) {
+  const int kNumTokens = 20;
+  const int kTestRuntimeSecs = 1;
+  const int kCycleThreads = 2;
+  const int kShutdownThreads = 2;
+  const int kWaitThreads = 2;
+  const int kSubmitThreads = 8;
+
+  vector<shared_ptr<ThreadPoolToken>> tokens;
+  tokens.reserve(kNumTokens);
+  Random rng(SeedRandom());
+
+  // Protects 'tokens' and 'rng'.
+  simple_spinlock lock;
+
+  // Fetch a token from 'tokens' at random. The caller receives its own
+  // reference, so the token stays alive even if it is replaced meanwhile.
+  auto GetRandomToken = [&]() -> shared_ptr<ThreadPoolToken> {
+    std::lock_guard<simple_spinlock> l(lock);
+    int idx = rng.Uniform(kNumTokens);
+    return tokens[idx];
+  };
+
+  // Preallocate all of the tokens. shared_ptr takes ownership directly from
+  // the unique_ptr returned by NewToken(), with no release() to a raw pointer.
+  for (int i = 0; i < kNumTokens; i++) {
+    ThreadPool::ExecutionMode mode;
+    {
+      std::lock_guard<simple_spinlock> l(lock);
+      mode = rng.Next() % 2 ?
+          ThreadPool::ExecutionMode::SERIAL :
+          ThreadPool::ExecutionMode::CONCURRENT;
+    }
+    tokens.emplace_back(pool_->NewToken(mode));
+  }
+
+  atomic<int64_t> total_num_tokens_cycled(0);
+  atomic<int64_t> total_num_tokens_shutdown(0);
+  atomic<int64_t> total_num_tokens_waited(0);
+  atomic<int64_t> total_num_tokens_submitted(0);
+
+  CountDownLatch latch(1);
+  vector<thread> threads;
+
+  for (int i = 0; i < kCycleThreads; i++) {
+    // Pick a token at random and replace it.
+    //
+    // The replaced token is only destroyed when the last ref is dropped,
+    // possibly by another thread.
+    threads.emplace_back([&]() {
+      int num_tokens_cycled = 0;
+      while (latch.count()) {
+        // Keeps this thread's reference to the replaced token until 'lock' is
+        // released. If it is the last reference, the token is destroyed
+        // outside the critical section that every other thread spins on
+        // (token destruction presumably waits for its outstanding tasks).
+        shared_ptr<ThreadPoolToken> replaced;
+        {
+          std::lock_guard<simple_spinlock> l(lock);
+          int idx = rng.Uniform(kNumTokens);
+          ThreadPool::ExecutionMode mode = rng.Next() % 2 ?
+              ThreadPool::ExecutionMode::SERIAL :
+              ThreadPool::ExecutionMode::CONCURRENT;
+          replaced = std::move(tokens[idx]);
+          tokens[idx] = pool_->NewToken(mode);
+        }
+        replaced.reset();
+        num_tokens_cycled++;
+
+        // Sleep a bit, otherwise this thread outpaces the other threads and
+        // nothing interesting happens to most tokens.
+        SleepFor(MonoDelta::FromMicroseconds(10));
+      }
+      total_num_tokens_cycled += num_tokens_cycled;
+    });
+  }
+
+  for (int i = 0; i < kShutdownThreads; i++) {
+    // Pick a token at random and shut it down. Submitting a task to a shut
+    // down token will return a ServiceUnavailable error.
+    threads.emplace_back([&]() {
+      int num_tokens_shutdown = 0;
+      while (latch.count()) {
+        GetRandomToken()->Shutdown();
+        num_tokens_shutdown++;
+      }
+      total_num_tokens_shutdown += num_tokens_shutdown;
+    });
+  }
+
+  for (int i = 0; i < kWaitThreads; i++) {
+    // Pick a token at random and wait for any outstanding tasks.
+    threads.emplace_back([&]() {
+      int num_tokens_waited  = 0;
+      while (latch.count()) {
+        GetRandomToken()->Wait();
+        num_tokens_waited++;
+      }
+      total_num_tokens_waited += num_tokens_waited;
+    });
+  }
+
+  for (int i = 0; i < kSubmitThreads; i++) {
+    // Pick a token at random and submit a task to it.
+    threads.emplace_back([&]() {
+      int num_tokens_submitted = 0;
+      // Thread-private generator (the shared 'rng' requires 'lock').
+      Random submitter_rng(SeedRandom());
+      while (latch.count()) {
+        int sleep_ms = submitter_rng.Next() % 5;
+        Status s = GetRandomToken()->SubmitFunc([sleep_ms]() {
+          // Sleep a little first so that tasks are running during other events.
+          SleepFor(MonoDelta::FromMilliseconds(sleep_ms));
+        });
+        CHECK(s.ok() || s.IsServiceUnavailable());
+        num_tokens_submitted++;
+      }
+      total_num_tokens_submitted += num_tokens_submitted;
+    });
+  }
+
+  SleepFor(MonoDelta::FromSeconds(kTestRuntimeSecs));
+  latch.CountDown();
+  for (auto& t : threads) {
+    t.join();
+  }
+
+  LOG(INFO) << Substitute("Tokens cycled ($0 threads): $1",
+                          kCycleThreads, total_num_tokens_cycled.load());
+  LOG(INFO) << Substitute("Tokens shutdown ($0 threads): $1",
+                          kShutdownThreads, total_num_tokens_shutdown.load());
+  LOG(INFO) << Substitute("Tokens waited ($0 threads): $1",
+                          kWaitThreads, total_num_tokens_waited.load());
+  LOG(INFO) << Substitute("Tokens submitted ($0 threads): $1",
+                          kSubmitThreads, total_num_tokens_submitted.load());
+}
+
+// Idle worker threads are reused in LIFO order, so a slow trickle of tasks
+// keeps reusing one thread while the rest reach their idle timeout and exit.
+TEST_F(ThreadPoolTest, TestLIFOThreadWakeUps) {
+  const int kNumThreads = 10;
+
+  // Test with a pool that allows for kNumThreads concurrent threads.
+  ASSERT_OK(RebuildPoolWithBuilder(ThreadPoolBuilder(kDefaultPoolName)
+                                   .set_max_threads(kNumThreads)));
+
+  // Submit kNumThreads slow tasks and unblock them, in order to produce
+  // kNumThreads worker threads.
+  CountDownLatch latch(1);
+  SCOPED_CLEANUP({
+    latch.CountDown();
+  });
+  for (int i = 0; i < kNumThreads; i++) {
+    ASSERT_OK(pool_->Submit(SlowTask::NewSlowTask(&latch)));
+  }
+  ASSERT_EQ(kNumThreads, pool_->num_threads());
+  latch.CountDown();
+  pool_->Wait();
+
+  // All kNumThreads threads are now idle and waiting for the idle timeout.
+  //
+  // Submit a slow trickle of lightning fast tasks.
+  //
+  // If the threads are woken up in FIFO order, this trickle is enough to
+  // prevent all of them from idling and the AssertEventually will time out.
+  //
+  // If LIFO order is used, the same thread will be reused for each task and
+  // the other threads will eventually time out.
+  auto submit_noop_then_check = [&]() {
+    ASSERT_OK(pool_->SubmitFunc([](){}));
+    SleepFor(MonoDelta::FromMilliseconds(10));
+    ASSERT_EQ(1, pool_->num_threads());
+  };
+  AssertEventually(submit_noop_then_check, MonoDelta::FromSeconds(10),
+                   AssertBackoff::NONE);
+  NO_PENDING_FATALS();
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/threadpool.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/threadpool.cc b/be/src/kudu/util/threadpool.cc
new file mode 100644
index 0000000..23dda3d
--- /dev/null
+++ b/be/src/kudu/util/threadpool.cc
@@ -0,0 +1,766 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/threadpool.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <deque>
+#include <limits>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <utility>
+
+#include <boost/function.hpp> // IWYU pragma: keep
+#include <glog/logging.h>
+
+#include "kudu/gutil/callback.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/sysinfo.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/thread.h"
+#include "kudu/util/trace.h"
+#include "kudu/util/trace_metrics.h"
+
+namespace kudu {
+
+using std::deque;
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+using strings::Substitute;
+
+////////////////////////////////////////////////////////
+// FunctionRunnable
+////////////////////////////////////////////////////////
+
+// Adapts a boost::function<void()> to the Runnable interface so it can be
+// queued and executed like any other task.
+class FunctionRunnable : public Runnable {
+ public:
+  explicit FunctionRunnable(boost::function<void()> fn)
+      : func_(std::move(fn)) {
+  }
+
+  // Invokes the wrapped function.
+  void Run() OVERRIDE { func_(); }
+
+ private:
+  // The callable executed by Run().
+  boost::function<void()> func_;
+};
+
+////////////////////////////////////////////////////////
+// ClosureRunnable
+////////////////////////////////////////////////////////
+
+// Adapts a kudu Closure to the Runnable interface so it can be queued and
+// executed like any other task.
+class ClosureRunnable : public Runnable {
+ public:
+  explicit ClosureRunnable(Closure closure)
+      : cl_(std::move(closure)) {
+  }
+
+  // Runs the wrapped closure.
+  void Run() OVERRIDE { cl_.Run(); }
+
+ private:
+  // The closure executed by Run().
+  Closure cl_;
+};
+
+////////////////////////////////////////////////////////
+// ThreadPoolBuilder
+////////////////////////////////////////////////////////
+
+// Defaults: no permanent threads, one worker per CPU, an effectively
+// unbounded queue, and a 500ms idle timeout for non-permanent workers.
+ThreadPoolBuilder::ThreadPoolBuilder(string name)
+    : name_(std::move(name)),
+      min_threads_(0),
+      max_threads_(base::NumCPUs()),
+      max_queue_size_(std::numeric_limits<int>::max()),
+      idle_timeout_(MonoDelta::FromMilliseconds(500)) {
+}
+
+// Prefix used when naming the pool's trace metrics. If left empty, the
+// pool name is used instead.
+ThreadPoolBuilder& ThreadPoolBuilder::set_trace_metric_prefix(const string& prefix) {
+  trace_metric_prefix_ = prefix;
+  return *this;
+}
+
+// Number of worker threads created by Init() that never idle out.
+// Must be non-negative (CHECK-enforced).
+ThreadPoolBuilder& ThreadPoolBuilder::set_min_threads(int min_threads) {
+  CHECK_GE(min_threads, 0);
+  min_threads_ = min_threads;
+  return *this;
+}
+
+// Upper bound on the number of worker threads. Must be positive
+// (CHECK-enforced).
+ThreadPoolBuilder& ThreadPoolBuilder::set_max_threads(int max_threads) {
+  CHECK_GT(max_threads, 0);
+  max_threads_ = max_threads;
+  return *this;
+}
+
+// Maximum number of tasks that may wait in the queue. It is not validated here.
+ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int max_queue_size) {
+  max_queue_size_ = max_queue_size;
+  return *this;
+}
+
+// How long a non-permanent worker stays idle before it exits.
+ThreadPoolBuilder& ThreadPoolBuilder::set_idle_timeout(const MonoDelta& idle_timeout) {
+  idle_timeout_ = idle_timeout;
+  return *this;
+}
+
+// Pool-wide metrics, updated on every submission and execution.
+ThreadPoolBuilder& ThreadPoolBuilder::set_metrics(ThreadPoolMetrics metrics) {
+  metrics_ = std::move(metrics);
+  return *this;
+}
+
+// Constructs a pool from this builder's settings into '*pool' and
+// initializes it. On failure '*pool' still holds the (shut down) pool and
+// the Init() error is returned.
+Status ThreadPoolBuilder::Build(gscoped_ptr<ThreadPool>* pool) const {
+  pool->reset(new ThreadPool(*this));
+  return (*pool)->Init();
+}
+
+////////////////////////////////////////////////////////
+// ThreadPoolToken
+////////////////////////////////////////////////////////
+
+// A token starts IDLE with no active threads. Its not-running condition
+// variable shares the owning pool's lock.
+ThreadPoolToken::ThreadPoolToken(ThreadPool* pool,
+                                 ThreadPool::ExecutionMode mode,
+                                 ThreadPoolMetrics metrics)
+    : mode_(mode),
+      metrics_(std::move(metrics)),
+      pool_(pool),
+      state_(State::IDLE),
+      not_running_cond_(&pool->lock_),
+      active_threads_(0) {}
+
+// Shuts the token down, waiting for any of its running tasks, then
+// unregisters it from the owning pool.
+ThreadPoolToken::~ThreadPoolToken() {
+  Shutdown();
+  pool_->ReleaseToken(this);
+}
+
+// Wraps 'c' in a Runnable and submits it through this token.
+Status ThreadPoolToken::SubmitClosure(Closure c) {
+  shared_ptr<Runnable> task = std::make_shared<ClosureRunnable>(std::move(c));
+  return Submit(std::move(task));
+}
+
+// Wraps 'f' in a Runnable and submits it through this token.
+Status ThreadPoolToken::SubmitFunc(boost::function<void()> f) {
+  shared_ptr<Runnable> task = std::make_shared<FunctionRunnable>(std::move(f));
+  return Submit(std::move(task));
+}
+
+// Hands 'r' to the owning pool, tagged with this token.
+Status ThreadPoolToken::Submit(shared_ptr<Runnable> r) {
+  ThreadPool* const owner = pool_;
+  return owner->DoSubmit(std::move(r), this);
+}
+
+// Shuts down the token. It drops the token's queued tasks and removes its
+// entries from the pool queue. If any of its tasks are still running, it waits
+// for them to finish. On return the token is in the terminal QUIESCED state
+// (unless it was already QUIESCED). Must not be called from a pool thread.
+void ThreadPoolToken::Shutdown() {
+  MutexLock unique_lock(pool_->lock_);
+  pool_->CheckNotPoolThreadUnlocked();
+
+  // Clear the queue under the lock, but defer the releasing of the tasks
+  // outside the lock, in case there are concurrent threads wanting to access
+  // the ThreadPool. The task's destructors may acquire locks, etc, so this
+  // also prevents lock inversions.
+  //
+  // swap() is used rather than move-construction because it guarantees that
+  // entries_ is empty afterwards. A moved-from std::deque is only "valid but
+  // unspecified", and Transition() CHECKs that entries_ is empty.
+  std::deque<ThreadPool::Task> to_release;
+  to_release.swap(entries_);
+  pool_->total_queued_tasks_ -= to_release.size();
+
+  switch (state()) {
+    case State::IDLE:
+      // There were no tasks outstanding; we can quiesce the token immediately.
+      Transition(State::QUIESCED);
+      break;
+    case State::RUNNING:
+      // There were outstanding tasks. If any are still running, switch to
+      // QUIESCING and wait for them to finish (the worker thread executing
+      // the token's last task will switch the token to QUIESCED). Otherwise,
+      // we can quiesce the token immediately.
+
+      // Drop every pool queue entry that refers to this token. The
+      // erase-remove idiom does this in one O(n) pass. Erasing from the middle
+      // of the queue one element at a time costs O(n) per removal. Doing it
+      // here (rather than switching to QUIESCING and waiting for a worker
+      // thread to process the queue entry) also keeps state transitions
+      // symmetric with ThreadPool::Shutdown.
+      pool_->queue_.erase(
+          std::remove(pool_->queue_.begin(), pool_->queue_.end(), this),
+          pool_->queue_.end());
+
+      if (active_threads_ == 0) {
+        Transition(State::QUIESCED);
+        break;
+      }
+      Transition(State::QUIESCING);
+      FALLTHROUGH_INTENDED;
+    case State::QUIESCING:
+      // The token is already quiescing. Just wait for a worker thread to
+      // switch it to QUIESCED.
+      while (state() != State::QUIESCED) {
+        not_running_cond_.Wait();
+      }
+      break;
+    default:
+      break;
+  }
+
+  // Finally release the queued tasks, outside the lock.
+  unique_lock.Unlock();
+  for (auto& t : to_release) {
+    if (t.trace) {
+      t.trace->Release();
+    }
+  }
+}
+
+// Blocks while IsActive() is true, i.e. presumably while the token still has
+// queued or running tasks. Must not be called from a pool thread.
+void ThreadPoolToken::Wait() {
+  MutexLock unique_lock(pool_->lock_);
+  pool_->CheckNotPoolThreadUnlocked();
+  while (IsActive()) {
+    not_running_cond_.Wait();
+  }
+}
+
+// Like Wait(), but gives up at 'until'. Returns true if the token became
+// inactive and false if the deadline passed first.
+bool ThreadPoolToken::WaitUntil(const MonoTime& until) {
+  MutexLock unique_lock(pool_->lock_);
+  pool_->CheckNotPoolThreadUnlocked();
+  while (IsActive()) {
+    const bool woken = not_running_cond_.WaitUntil(until);
+    if (!woken) {
+      return false;
+    }
+  }
+  return true;
+}
+
+// Like WaitUntil(), with a deadline relative to now.
+bool ThreadPoolToken::WaitFor(const MonoDelta& delta) {
+  const MonoTime deadline = MonoTime::Now() + delta;
+  return WaitUntil(deadline);
+}
+
+// Moves the token to 'new_state'. Every caller visible in this file holds the
+// pool lock when calling this.
+//
+// In debug builds (NDEBUG undefined) the transition is validated against
+// the token state machine:
+//   IDLE      -> RUNNING (tasks queued) | QUIESCED (nothing queued or running)
+//   RUNNING   -> IDLE | QUIESCING (threads still active) | QUIESCED
+//   QUIESCING -> QUIESCED (once no threads are active)
+//   QUIESCED  is terminal.
+// In release builds no validation is performed.
+//
+// Entering IDLE or QUIESCED wakes every waiter on not_running_cond_.
+void ThreadPoolToken::Transition(State new_state) {
+#ifndef NDEBUG
+  CHECK_NE(state_, new_state);
+
+  switch (state_) {
+    case State::IDLE:
+      CHECK(new_state == State::RUNNING ||
+            new_state == State::QUIESCED);
+      if (new_state == State::RUNNING) {
+        // Only a submission moves an IDLE token to RUNNING, so a task must
+        // already be queued.
+        CHECK(!entries_.empty());
+      } else {
+        CHECK(entries_.empty());
+        CHECK_EQ(active_threads_, 0);
+      }
+      break;
+    case State::RUNNING:
+      CHECK(new_state == State::IDLE ||
+            new_state == State::QUIESCING ||
+            new_state == State::QUIESCED);
+      // Leaving RUNNING requires the token's task queue to have been drained
+      // (or cleared by a shutdown).
+      CHECK(entries_.empty());
+      if (new_state == State::QUIESCING) {
+        // QUIESCING only makes sense while some task is still executing.
+        CHECK_GT(active_threads_, 0);
+      }
+      break;
+    case State::QUIESCING:
+      CHECK(new_state == State::QUIESCED);
+      CHECK_EQ(active_threads_, 0);
+      break;
+    case State::QUIESCED:
+      CHECK(false); // QUIESCED is a terminal state
+      break;
+    default:
+      LOG(FATAL) << "Unknown token state: " << state_;
+  }
+#endif
+
+  // Take actions based on the state we're entering.
+  switch (new_state) {
+    case State::IDLE:
+    case State::QUIESCED:
+      // Wake threads blocked in Wait()/WaitUntil()/Shutdown(), which wait on
+      // not_running_cond_.
+      not_running_cond_.Broadcast();
+      break;
+    default:
+      break;
+  }
+
+  state_ = new_state;
+}
+
+// Returns a static, human-readable name for 's'.
+const char* ThreadPoolToken::StateToString(State s) {
+  switch (s) {
+    case State::IDLE:
+      return "IDLE";
+    case State::RUNNING:
+      return "RUNNING";
+    case State::QUIESCING:
+      return "QUIESCING";
+    case State::QUIESCED:
+      return "QUIESCED";
+  }
+  // Only reachable if 's' holds a value outside the enumeration.
+  return "<cannot reach here>";
+}
+
+////////////////////////////////////////////////////////
+// ThreadPool
+////////////////////////////////////////////////////////
+
+// Copies the builder's configuration. The pool starts in the Uninitialized
+// state until Init() is called. The internal token used for tokenless
+// submissions is created here, and the per-pool trace metric names are
+// interned.
+ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
+  : name_(builder.name_),
+    min_threads_(builder.min_threads_),
+    max_threads_(builder.max_threads_),
+    max_queue_size_(builder.max_queue_size_),
+    idle_timeout_(builder.idle_timeout_),
+    pool_status_(Status::Uninitialized("The pool was not initialized.")),
+    idle_cond_(&lock_),
+    no_threads_cond_(&lock_),
+    num_threads_(0),
+    num_threads_pending_start_(0),
+    active_threads_(0),
+    total_queued_tasks_(0),
+    tokenless_(NewToken(ExecutionMode::CONCURRENT)),
+    metrics_(builder.metrics_) {
+  // Trace metric names fall back to the pool name when no prefix was given.
+  const string& prefix = builder.trace_metric_prefix_.empty()
+      ? builder.name_
+      : builder.trace_metric_prefix_;
+
+  queue_time_trace_metric_name_ =
+      TraceMetrics::InternName(prefix + ".queue_time_us");
+  run_wall_time_trace_metric_name_ =
+      TraceMetrics::InternName(prefix + ".run_wall_time_us");
+  run_cpu_time_trace_metric_name_ =
+      TraceMetrics::InternName(prefix + ".run_cpu_time_us");
+}
+
+// Shuts the pool down. Every externally created token must already have been
+// destroyed; only the internal tokenless-submission token may remain.
+ThreadPool::~ThreadPool() {
+  const auto live_tokens = tokens_.size();
+  CHECK_EQ(1, live_tokens) << Substitute(
+      "Threadpool $0 destroyed with $1 allocated tokens",
+      name_, live_tokens);
+  Shutdown();
+}
+
+// Transitions the pool from Uninitialized to OK and starts the 'min_threads_'
+// permanent worker threads. Returns NotSupported if called more than once.
+// If a worker thread cannot be created, the pool is shut down and the
+// creation error is returned.
+Status ThreadPool::Init() {
+  if (!pool_status_.IsUninitialized()) {
+    return Status::NotSupported("The thread pool is already initialized");
+  }
+  pool_status_ = Status::OK();
+  num_threads_pending_start_ = min_threads_;
+  for (int i = 0; i < min_threads_; i++) {
+    Status status = CreateThread();
+    if (!status.ok()) {
+      // Threads [i, min_threads_) were counted as pending but will never
+      // start, so nothing will ever decrement num_threads_pending_start_ for
+      // them. Remove them from the count; otherwise Shutdown() would wait
+      // forever for num_threads_ + num_threads_pending_start_ to reach zero.
+      {
+        MutexLock unique_lock(lock_);
+        num_threads_pending_start_ -= min_threads_ - i;
+      }
+      Shutdown();
+      return status;
+    }
+  }
+  return Status::OK();
+}
+
+// Shuts the pool down:
+//  - subsequent submissions fail with ServiceUnavailable;
+//  - every queued task is discarded;
+//  - tokens are moved towards QUIESCED;
+//  - the call blocks until every worker thread (including ones still pending
+//    start) has exited.
+// Must not be called from a pool thread.
+void ThreadPool::Shutdown() {
+  MutexLock unique_lock(lock_);
+  CheckNotPoolThreadUnlocked();
+
+  // Note: this is the same error seen at submission if the pool is at
+  // capacity, so clients can't tell them apart. This isn't really a practical
+  // concern though because shutting down a pool typically requires clients to
+  // be quiesced first, so there's no danger of a client getting confused.
+  pool_status_ = Status::ServiceUnavailable("The pool has been shut down.");
+
+  // Clear the various queues under the lock, but defer the releasing
+  // of the tasks outside the lock, in case there are concurrent threads
+  // wanting to access the ThreadPool. The task's destructors may acquire
+  // locks, etc, so this also prevents lock inversions.
+  queue_.clear();
+  std::deque<std::deque<Task>> to_release;
+  for (auto* t : tokens_) {
+    if (!t->entries_.empty()) {
+      // swap() rather than move-construction: it guarantees t->entries_ is
+      // left empty, which Transition() below CHECKs in debug builds. A
+      // moved-from std::deque is only "valid but unspecified".
+      to_release.emplace_back();
+      to_release.back().swap(t->entries_);
+    }
+    switch (t->state()) {
+      case ThreadPoolToken::State::IDLE:
+        // The token is idle; we can quiesce it immediately.
+        t->Transition(ThreadPoolToken::State::QUIESCED);
+        break;
+      case ThreadPoolToken::State::RUNNING:
+        // The token has tasks associated with it. If they're merely queued
+        // (i.e. there are no active threads), the tasks will have been removed
+        // above and we can quiesce immediately. Otherwise, we need to wait for
+        // the threads to finish.
+        t->Transition(t->active_threads_ > 0 ?
+            ThreadPoolToken::State::QUIESCING :
+            ThreadPoolToken::State::QUIESCED);
+        break;
+      default:
+        break;
+    }
+  }
+
+  // The queues are empty. Wake any sleeping worker threads and wait for all
+  // of them to exit. Some worker threads will exit immediately upon waking,
+  // while others will exit after they finish executing an outstanding task.
+  total_queued_tasks_ = 0;
+  while (!idle_threads_.empty()) {
+    idle_threads_.front().not_empty.Signal();
+    idle_threads_.pop_front();
+  }
+  while (num_threads_ + num_threads_pending_start_ > 0) {
+    no_threads_cond_.Wait();
+  }
+
+  // All the threads have exited. Check the state of each token.
+  for (auto* t : tokens_) {
+    DCHECK(t->state() == ThreadPoolToken::State::IDLE ||
+           t->state() == ThreadPoolToken::State::QUIESCED);
+  }
+
+  // Finally release the queued tasks, outside the lock.
+  unique_lock.Unlock();
+  for (auto& token : to_release) {
+    for (auto& t : token) {
+      if (t.trace) {
+        t.trace->Release();
+      }
+    }
+  }
+}
+
+// Creates a token with the given execution mode and no per-token metrics.
+unique_ptr<ThreadPoolToken> ThreadPool::NewToken(ExecutionMode mode) {
+  ThreadPoolMetrics no_metrics = {};
+  return NewTokenWithMetrics(mode, std::move(no_metrics));
+}
+
+// Creates a token with the given execution mode and per-token metrics, and
+// registers it with the pool. The pool's destructor CHECKs that every such
+// token has been destroyed first.
+unique_ptr<ThreadPoolToken> ThreadPool::NewTokenWithMetrics(
+    ExecutionMode mode, ThreadPoolMetrics metrics) {
+  MutexLock guard(lock_);
+  unique_ptr<ThreadPoolToken> token(
+      new ThreadPoolToken(this, mode, std::move(metrics)));
+  InsertOrDie(&tokens_, token.get());
+  return token;
+}
+
+// Unregisters 't' from the pool. Called from the token's destructor. The
+// token must no longer be active.
+void ThreadPool::ReleaseToken(ThreadPoolToken* t) {
+  MutexLock guard(lock_);
+  CHECK(!t->IsActive()) << Substitute("Token with state $0 may not be released",
+                                      ThreadPoolToken::StateToString(t->state()));
+  const auto erased = tokens_.erase(t);
+  CHECK_EQ(1, erased);
+}
+
+// Wraps 'c' in a Runnable and submits it without an explicit token.
+Status ThreadPool::SubmitClosure(Closure c) {
+  shared_ptr<Runnable> task = std::make_shared<ClosureRunnable>(std::move(c));
+  return Submit(std::move(task));
+}
+
+// Wraps 'f' in a Runnable and submits it without an explicit token.
+Status ThreadPool::SubmitFunc(boost::function<void()> f) {
+  shared_ptr<Runnable> task = std::make_shared<FunctionRunnable>(std::move(f));
+  return Submit(std::move(task));
+}
+
+// Submits 'r' through the pool's internal CONCURRENT token.
+Status ThreadPool::Submit(shared_ptr<Runnable> r) {
+  ThreadPoolToken* const tokenless = tokenless_.get();
+  return DoSubmit(std::move(r), tokenless);
+}
+
+// Enqueues 'r' on behalf of 'token' and, if needed, spawns another worker
+// thread. For tokenless submissions, 'token' is the pool's internal token.
+//
+// Returns:
+//  - the pool status, if the pool is not OK (e.g. it was shut down);
+//  - ServiceUnavailable, if the token may not accept new tasks or the pool
+//    is at capacity;
+//  - the thread-creation error, if a new worker was needed, could not be
+//    created, and the pool has no other threads.
+Status ThreadPool::DoSubmit(shared_ptr<Runnable> r, ThreadPoolToken* token) {
+  DCHECK(token);
+  MonoTime submit_time = MonoTime::Now();
+
+  MutexLock guard(lock_);
+  if (PREDICT_FALSE(!pool_status_.ok())) {
+    return pool_status_;
+  }
+
+  if (PREDICT_FALSE(!token->MaySubmitNewTasks())) {
+    return Status::ServiceUnavailable("Thread pool token was shut down");
+  }
+
+  // Size limit check. Remaining capacity is the number of worker slots not
+  // currently running a task plus the free space in the queue.
+  int64_t capacity_remaining = static_cast<int64_t>(max_threads_) - active_threads_ +
+                               static_cast<int64_t>(max_queue_size_) - total_queued_tasks_;
+  if (capacity_remaining < 1) {
+    // Report active_threads_ (tasks actually running), which is what the
+    // capacity computation above uses. The number of live or starting threads
+    // can differ from it.
+    return Status::ServiceUnavailable(
+        Substitute("Thread pool is at capacity ($0/$1 tasks running, $2/$3 tasks queued)",
+                   active_threads_, max_threads_,
+                   total_queued_tasks_, max_queue_size_));
+  }
+
+  // Should we create another thread?
+
+  // We assume that each current inactive thread will grab one item from the
+  // queue.  If it seems like we'll need another thread, we create one.
+  //
+  // Rather than creating the thread here, while holding the lock, we defer
+  // it to down below. This is because thread creation can be rather slow
+  // (hundreds of milliseconds in some cases) and we'd like to allow the
+  // existing threads to continue to process tasks while we do so.
+  //
+  // In theory, a currently active thread could finish immediately after this
+  // calculation but before our new worker starts running. This would mean we
+  // created a thread we didn't really need. However, this race is unavoidable
+  // and harmless.
+  //
+  // Of course, we never create more than max_threads_ threads no matter what.
+  int threads_from_this_submit =
+      token->IsActive() && token->mode() == ExecutionMode::SERIAL ? 0 : 1;
+  int inactive_threads = num_threads_ + num_threads_pending_start_ - active_threads_;
+  int additional_threads = static_cast<int>(queue_.size())
+                         + threads_from_this_submit
+                         - inactive_threads;
+  bool need_a_thread = false;
+  if (additional_threads > 0 && num_threads_ + num_threads_pending_start_ < max_threads_) {
+    need_a_thread = true;
+    num_threads_pending_start_++;
+  }
+
+  Task task;
+  task.runnable = std::move(r);
+  task.trace = Trace::CurrentTrace();
+  // Need to AddRef, since the thread which submitted the task may go away,
+  // and we don't want the trace to be destructed while waiting in the queue.
+  if (task.trace) {
+    task.trace->AddRef();
+  }
+  task.submit_time = submit_time;
+
+  // Add the task to the token's queue.
+  ThreadPoolToken::State state = token->state();
+  DCHECK(state == ThreadPoolToken::State::IDLE ||
+         state == ThreadPoolToken::State::RUNNING);
+  token->entries_.emplace_back(std::move(task));
+  if (state == ThreadPoolToken::State::IDLE ||
+      token->mode() == ExecutionMode::CONCURRENT) {
+    queue_.emplace_back(token);
+    if (state == ThreadPoolToken::State::IDLE) {
+      token->Transition(ThreadPoolToken::State::RUNNING);
+    }
+  }
+  int length_at_submit = total_queued_tasks_++;
+
+  // Wake up an idle thread for this task. Choosing the thread at the front of
+  // the list ensures LIFO semantics as idling threads are also added to the front.
+  //
+  // If there are no idle threads, the new task remains on the queue and is
+  // processed by an active thread (or a thread we're about to create) at some
+  // point in the future.
+  if (!idle_threads_.empty()) {
+    idle_threads_.front().not_empty.Signal();
+    idle_threads_.pop_front();
+  }
+  guard.Unlock();
+
+  if (metrics_.queue_length_histogram) {
+    metrics_.queue_length_histogram->Increment(length_at_submit);
+  }
+  if (token->metrics_.queue_length_histogram) {
+    token->metrics_.queue_length_histogram->Increment(length_at_submit);
+  }
+
+  if (need_a_thread) {
+    Status status = CreateThread();
+    if (!status.ok()) {
+      guard.Lock();
+      num_threads_pending_start_--;
+      if (num_threads_ + num_threads_pending_start_ == 0) {
+        // Shutdown() may already be blocked waiting for the thread count
+        // (which included this never-started thread) to reach zero. Wake it,
+        // because no exiting worker will do so now.
+        no_threads_cond_.Broadcast();
+        // If we have no threads, we can't do any work.
+        // NOTE(review): the task enqueued above remains queued even though
+        // an error is returned to the caller; confirm this is intended.
+        return status;
+      }
+      // If we failed to create a thread, but there are still some other
+      // worker threads, log an error message and continue.
+      LOG(ERROR) << "Thread pool failed to create thread: "
+                 << status.ToString();
+    }
+  }
+
+  return Status::OK();
+}
+
+// Blocks until no tasks are queued and none are running, across all tokens.
+// Must not be called from a pool thread.
+void ThreadPool::Wait() {
+  MutexLock unique_lock(lock_);
+  CheckNotPoolThreadUnlocked();
+  while (active_threads_ > 0 || total_queued_tasks_ > 0) {
+    idle_cond_.Wait();
+  }
+}
+
+// Like Wait(), but gives up at 'until'. Returns true if the pool became idle
+// and false if the deadline passed first.
+bool ThreadPool::WaitUntil(const MonoTime& until) {
+  MutexLock unique_lock(lock_);
+  CheckNotPoolThreadUnlocked();
+  while (active_threads_ > 0 || total_queued_tasks_ > 0) {
+    const bool woken = idle_cond_.WaitUntil(until);
+    if (!woken) {
+      return false;
+    }
+  }
+  return true;
+}
+
+// Like WaitUntil(), with a deadline relative to now.
+bool ThreadPool::WaitFor(const MonoDelta& delta) {
+  const MonoTime deadline = MonoTime::Now() + delta;
+  return WaitUntil(deadline);
+}
+
+// Body of every worker thread. The thread registers itself with the pool and
+// then loops:
+//  - while the queue has work, it runs one task at a time (one token entry
+//    per iteration);
+//  - when the queue is empty, it idles on its own condition variable.
+// It exits when the pool status is no longer OK (shutdown), or, for
+// non-permanent threads, after idle_timeout_ passes with no work.
+void ThreadPool::DispatchThread() {
+  MutexLock unique_lock(lock_);
+  InsertOrDie(&threads_, Thread::current_thread());
+  DCHECK_GT(num_threads_pending_start_, 0);
+  num_threads_++;
+  num_threads_pending_start_--;
+  // If we are one of the first 'min_threads_' to start, we must be
+  // a "permanent" thread.
+  bool permanent = num_threads_ <= min_threads_;
+
+  // Owned by this worker thread and added/removed from idle_threads_ as needed.
+  IdleThread me(&lock_);
+
+  while (true) {
+    // Note: Shutdown() sets pool_status_ to Status::ServiceUnavailable(); any
+    // non-OK status makes the worker exit.
+    if (!pool_status_.ok()) {
+      VLOG(2) << "DispatchThread exiting: " << pool_status_.ToString();
+      break;
+    }
+
+    if (queue_.empty()) {
+      // There's no work to do, let's go idle.
+      //
+      // Note: if FIFO behavior is desired, it's as simple as changing this to push_back().
+      idle_threads_.push_front(me);
+      SCOPED_CLEANUP({
+        // For some wake ups (i.e. Shutdown or DoSubmit) this thread is
+        // guaranteed to be unlinked after being awakened. In others (i.e.
+        // spurious wake-up or Wait timeout), it'll still be linked.
+        if (me.is_linked()) {
+          idle_threads_.erase(idle_threads_.iterator_to(me));
+        }
+      });
+      if (permanent) {
+        me.not_empty.Wait();
+      } else {
+        if (!me.not_empty.WaitFor(idle_timeout_)) {
+          // After much investigation, it appears that pthread condition variables have
+          // a weird behavior in which they can return ETIMEDOUT from timed_wait even if
+          // another thread did in fact signal. Apparently after a timeout there is some
+          // brief period during which another thread may actually grab the internal mutex
+          // protecting the state, signal, and release again before we get the mutex. So,
+          // we'll recheck the empty queue case regardless.
+          if (queue_.empty()) {
+            VLOG(3) << "Releasing worker thread from pool " << name_ << " after "
+                    << idle_timeout_.ToMilliseconds() << "ms of idle time.";
+            break;
+          }
+        }
+      }
+      continue;
+    }
+
+    // Get the next token and task to execute.
+    ThreadPoolToken* token = queue_.front();
+    queue_.pop_front();
+    DCHECK_EQ(ThreadPoolToken::State::RUNNING, token->state());
+    DCHECK(!token->entries_.empty());
+    Task task = std::move(token->entries_.front());
+    token->entries_.pop_front();
+    token->active_threads_++;
+    --total_queued_tasks_;
+    ++active_threads_;
+
+    unique_lock.Unlock();
+
+    // Release the reference which was held by the queued item.
+    ADOPT_TRACE(task.trace);
+    if (task.trace) {
+      task.trace->Release();
+    }
+
+    // Update metrics
+    MonoTime now(MonoTime::Now());
+    int64_t queue_time_us = (now - task.submit_time).ToMicroseconds();
+    TRACE_COUNTER_INCREMENT(queue_time_trace_metric_name_, queue_time_us);
+    if (metrics_.queue_time_us_histogram) {
+      metrics_.queue_time_us_histogram->Increment(queue_time_us);
+    }
+    if (token->metrics_.queue_time_us_histogram) {
+      token->metrics_.queue_time_us_histogram->Increment(queue_time_us);
+    }
+
+    // Execute the task
+    {
+      MicrosecondsInt64 start_wall_us = GetMonoTimeMicros();
+      MicrosecondsInt64 start_cpu_us = GetThreadCpuTimeMicros();
+
+      task.runnable->Run();
+
+      int64_t wall_us = GetMonoTimeMicros() - start_wall_us;
+      int64_t cpu_us = GetThreadCpuTimeMicros() - start_cpu_us;
+
+      if (metrics_.run_time_us_histogram) {
+        metrics_.run_time_us_histogram->Increment(wall_us);
+      }
+      if (token->metrics_.run_time_us_histogram) {
+        token->metrics_.run_time_us_histogram->Increment(wall_us);
+      }
+      TRACE_COUNTER_INCREMENT(run_wall_time_trace_metric_name_, wall_us);
+      TRACE_COUNTER_INCREMENT(run_cpu_time_trace_metric_name_, cpu_us);
+    }
+    // Destruct the task while we do not hold the lock.
+    //
+    // The task's destructor may be expensive if it has a lot of bound
+    // objects, and we don't want to block submission of the threadpool.
+    // In the worst case, the destructor might even try to do something
+    // with this threadpool, and produce a deadlock.
+    task.runnable.reset();
+    unique_lock.Lock();
+
+    // Possible states once the token has no more active threads:
+    // 1. The token was shut down while we ran its task (QUIESCING).
+    //    Transition to QUIESCED.
+    // 2. The token has no more queued tasks. Transition back to IDLE.
+    // 3. The token has more tasks. A SERIAL token is requeued and stays
+    //    RUNNING. A CONCURRENT token's remaining tasks already have their own
+    //    queue_ entries (DoSubmit adds one per task), so nothing is needed.
+    ThreadPoolToken::State state = token->state();
+    DCHECK(state == ThreadPoolToken::State::RUNNING ||
+           state == ThreadPoolToken::State::QUIESCING);
+    if (--token->active_threads_ == 0) {
+      if (state == ThreadPoolToken::State::QUIESCING) {
+        DCHECK(token->entries_.empty());
+        token->Transition(ThreadPoolToken::State::QUIESCED);
+      } else if (token->entries_.empty()) {
+        token->Transition(ThreadPoolToken::State::IDLE);
+      } else if (token->mode() == ExecutionMode::SERIAL) {
+        queue_.emplace_back(token);
+      }
+    }
+    // Wake Wait()/WaitUntil() callers once nothing is executing.
+    if (--active_threads_ == 0) {
+      idle_cond_.Broadcast();
+    }
+  }
+
+  // It's important that we hold the lock between exiting the loop and dropping
+  // num_threads_. Otherwise it's possible someone else could come along here
+  // and add a new task just as the last running thread is about to exit.
+  CHECK(unique_lock.OwnsLock());
+
+  CHECK_EQ(threads_.erase(Thread::current_thread()), 1);
+  num_threads_--;
+  if (num_threads_ + num_threads_pending_start_ == 0) {
+    // Wakes Shutdown(), which waits for the thread count to reach zero.
+    no_threads_cond_.Broadcast();
+
+    // Sanity check: if we're the last thread exiting, the queue ought to be
+    // empty. Otherwise it will never get processed.
+    CHECK(queue_.empty());
+    DCHECK_EQ(0, total_queued_tasks_);
+  }
+}
+
+// Starts a new worker thread named "<pool name> [worker]" that runs
+// DispatchThread().
+Status ThreadPool::CreateThread() {
+  const string thread_name = Substitute("$0 [worker]", name_);
+  return kudu::Thread::Create("thread pool", thread_name,
+                              &ThreadPool::DispatchThread, this, nullptr);
+}
+
+// Crashes (LOG(FATAL)) if the calling thread is one of this pool's workers.
+// Such a call would deadlock, because the worker would wait on itself.
+void ThreadPool::CheckNotPoolThreadUnlocked() {
+  Thread* const self = Thread::current_thread();
+  if (!ContainsKey(threads_, self)) {
+    return;
+  }
+  LOG(FATAL) << Substitute("Thread belonging to thread pool '$0' with "
+      "name '$1' called pool function that would result in deadlock",
+      name_, self->name());
+}
+
+// Streams the human-readable name of a token state.
+std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s) {
+  const char* const name = ThreadPoolToken::StateToString(s);
+  return o << name;
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/threadpool.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/threadpool.h b/be/src/kudu/util/threadpool.h
new file mode 100644
index 0000000..1557486
--- /dev/null
+++ b/be/src/kudu/util/threadpool.h
@@ -0,0 +1,505 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_THREAD_POOL_H
+#define KUDU_UTIL_THREAD_POOL_H
+
+#include <deque>
+#include <iosfwd>
+#include <memory>
+#include <string>
+#include <unordered_set>
+
+#include <boost/intrusive/list.hpp>
+#include <boost/intrusive/list_hook.hpp>
+#include <gtest/gtest_prod.h>
+
+#include "kudu/gutil/callback.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/condition_variable.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/status.h"
+
+namespace boost {
+template <typename Signature>
+class function;
+} // namespace boost
+
+namespace kudu {
+
+class Thread;
+class ThreadPool;
+class ThreadPoolToken;
+class Trace;
+
+// Interface for a unit of work that can be handed to a ThreadPool or a
+// ThreadPoolToken via Submit().
+class Runnable {
+ public:
+  // Executes the unit of work.
+  virtual void Run() = 0;
+
+  // Virtual so that implementations can be destroyed through a Runnable
+  // pointer (e.g. the std::shared_ptr<Runnable> accepted by Submit()).
+  virtual ~Runnable() = default;
+};
+
+// Histograms describing thread pool activity. Can be applied to the entire
+// pool (see ThreadPoolBuilder::set_metrics()) or to individual tokens (see
+// ThreadPool::NewTokenWithMetrics()). Both of those take this struct by value.
+struct ThreadPoolMetrics {
+  // Measures the queue length seen by tasks when they enter the queue.
+  scoped_refptr<Histogram> queue_length_histogram;
+
+  // Measures the amount of time that tasks spend waiting in a queue.
+  // NOTE(review): the '_us' suffix suggests microseconds — confirm.
+  scoped_refptr<Histogram> queue_time_us_histogram;
+
+  // Measures the amount of time that tasks spend running.
+  // NOTE(review): the '_us' suffix suggests microseconds — confirm.
+  scoped_refptr<Histogram> run_time_us_histogram;
+};
+
+// ThreadPool takes a lot of arguments. We provide sane defaults with a builder.
+//
+// name: Used for debugging output and default names of the worker threads.
+//    Since thread names are limited to 16 characters on Linux, it's good to
+//    choose a short name here.
+//    Required.
+//
+// trace_metric_prefix: used to prefix the names of TraceMetric counters.
+//    When a task on a thread pool has an associated trace, the thread pool
+//    implementation will increment TraceMetric counters to indicate the
+//    amount of time spent waiting in the queue as well as the amount of wall
+//    and CPU time spent executing. By default, these counters are prefixed
+//    with the name of the thread pool. For example, if the pool is named
+//    'apply', then counters such as 'apply.queue_time_us' will be
+//    incremented.
+//
+//    The TraceMetrics implementation relies on the number of distinct counter
+//    names being small. Thus, if the thread pool name itself is dynamically
+//    generated, the default behavior described above would result in an
+//    unbounded number of distinct counter names. The 'trace_metric_prefix'
+//    setting can be used to override the prefix used in generating the trace
+//    metric names.
+//
+//    For example, the Raft thread pools are named "<tablet id>-raft" which
+//    has unbounded cardinality (a server may have thousands of different
+//    tablet IDs over its lifetime). In that case, setting the prefix to
+//    "raft" will avoid any issues.
+//
+// min_threads: Minimum number of threads we'll have at any time.
+//    Default: 0.
+//
+// max_threads: Maximum number of threads we'll have at any time.
+//    Default: Number of CPUs detected on the system.
+//
+// max_queue_size: Maximum number of items to enqueue before Submit() starts
+//    returning a Status::ServiceUnavailable error.
+//    Default: INT_MAX.
+//
+// idle_timeout: How long we'll keep around an idle thread before timing it out.
+//    We always keep at least min_threads.
+//    Default: 500 milliseconds.
+//
+// metrics: Histograms, counters, etc. to update on various threadpool events.
+//    Default: not set.
+//
+class ThreadPoolBuilder {
+ public:
+  // 'name' is the only required setting; see the class comment above.
+  explicit ThreadPoolBuilder(std::string name);
+
+  // Note: We violate the style guide by returning mutable references here
+  // in order to provide traditional Builder pattern conveniences.
+  ThreadPoolBuilder& set_trace_metric_prefix(const std::string& prefix);
+  ThreadPoolBuilder& set_min_threads(int min_threads);
+  ThreadPoolBuilder& set_max_threads(int max_threads);
+  ThreadPoolBuilder& set_max_queue_size(int max_queue_size);
+  ThreadPoolBuilder& set_idle_timeout(const MonoDelta& idle_timeout);
+  ThreadPoolBuilder& set_metrics(ThreadPoolMetrics metrics);
+
+  // Instantiates a new ThreadPool with the existing builder arguments; the
+  // pool is returned through 'pool'.
+  Status Build(gscoped_ptr<ThreadPool>* pool) const;
+
+ private:
+  // Builder state. ThreadPool is a friend so that its constructor (which
+  // takes a 'const ThreadPoolBuilder&') can read these fields directly.
+  friend class ThreadPool;
+  const std::string name_;
+  std::string trace_metric_prefix_;
+  int min_threads_;
+  int max_threads_;
+  int max_queue_size_;
+  MonoDelta idle_timeout_;
+  ThreadPoolMetrics metrics_;
+
+  DISALLOW_COPY_AND_ASSIGN(ThreadPoolBuilder);
+};
+
+// Thread pool with a variable number of threads.
+//
+// Tasks submitted directly to the thread pool enter a FIFO queue and are
+// dispatched to a worker thread when one becomes free. Tasks may also be
+// submitted via ThreadPoolTokens. The token Wait() and Shutdown() functions
+// can then be used to block on logical groups of tasks.
+//
+// A token operates in one of two ExecutionModes, determined at token
+// construction time:
+// 1. SERIAL: submitted tasks are run one at a time.
+// 2. CONCURRENT: submitted tasks may be run in parallel. This is similar to
+//    tasks submitted without a token, but the logical grouping that tokens
+//    impart can be useful when a pool is shared by many contexts (e.g. to
+//    safely shut down one context, to derive context-specific metrics, etc.).
+//
+// Tasks submitted without a token or via ExecutionMode::CONCURRENT tokens are
+// processed in FIFO order. On the other hand, ExecutionMode::SERIAL tokens are
+// processed in a round-robin fashion, one task at a time. This prevents them
+// from starving one another. However, tokenless (and CONCURRENT token-based)
+// tasks can starve SERIAL token-based tasks.
+//
+// Usage Example:
+//    static void Func(int n) { ... }
+//    class Task : public Runnable { ... };
+//
+//    gscoped_ptr<ThreadPool> thread_pool;
+//    CHECK_OK(
+//        ThreadPoolBuilder("my_pool")
+//            .set_min_threads(0)
+//            .set_max_threads(5)
+//            .set_max_queue_size(10)
+//            .set_idle_timeout(MonoDelta::FromMilliseconds(2000))
+//            .Build(&thread_pool));
+//    // The Submit*() functions are WARN_UNUSED_RESULT; check their Status.
+//    CHECK_OK(thread_pool->Submit(shared_ptr<Runnable>(new Task())));
+//    CHECK_OK(thread_pool->SubmitFunc(boost::bind(&Func, 10)));
+class ThreadPool {
+ public:
+  ~ThreadPool();
+
+  // Waits for the running tasks to complete and then shuts down the threads.
+  // All other pending tasks in the queue are removed without being run.
+  // NOTE: The user may implement external abort logic for the runnables. That
+  //       logic must be invoked before Shutdown() if the system needs to know
+  //       that these tasks were never executed, or if a runnable requires an
+  //       explicit "abort" notification to exit from its run loop.
+  void Shutdown();
+
+  // Submits a function using the kudu Closure system.
+  Status SubmitClosure(Closure c) WARN_UNUSED_RESULT;
+
+  // Submits a function bound using boost::bind(&FuncName, args...).
+  Status SubmitFunc(boost::function<void()> f) WARN_UNUSED_RESULT;
+
+  // Submits a Runnable class.
+  Status Submit(std::shared_ptr<Runnable> r) WARN_UNUSED_RESULT;
+
+  // Waits until all the tasks are completed.
+  void Wait();
+
+  // Waits for the pool to reach the idle state, or until 'until' time is reached.
+  // Returns true if the pool reached the idle state, false otherwise.
+  bool WaitUntil(const MonoTime& until);
+
+  // Waits for the pool to reach the idle state, or until 'delta' time elapses.
+  // Returns true if the pool reached the idle state, false otherwise.
+  bool WaitFor(const MonoDelta& delta);
+
+  // How tasks submitted through a token are scheduled; see the class comment
+  // above for the semantics of each mode.
+  enum class ExecutionMode {
+    // Tasks submitted via this token will be executed serially.
+    SERIAL,
+
+    // Tasks submitted via this token may be executed concurrently.
+    CONCURRENT,
+  };
+  // Allocates a new token for use in token-based task submission. All tokens
+  // must be destroyed before their ThreadPool is destroyed.
+  //
+  // There is no limit on the number of tokens that may be allocated.
+  std::unique_ptr<ThreadPoolToken> NewToken(ExecutionMode mode);
+
+  // Like NewToken(), but lets the caller provide metrics for the token. These
+  // metrics are incremented/decremented in addition to the configured
+  // pool-wide metrics (if any).
+  std::unique_ptr<ThreadPoolToken> NewTokenWithMetrics(ExecutionMode mode,
+                                                       ThreadPoolMetrics metrics);
+
+  // Returns the number of threads currently running (or in the process of
+  // starting up) for this thread pool. Acquires 'lock_'.
+  int num_threads() const {
+    MutexLock l(lock_);
+    return num_threads_ + num_threads_pending_start_;
+  }
+
+ private:
+  FRIEND_TEST(ThreadPoolTest, TestThreadPoolWithNoMinimum);
+  FRIEND_TEST(ThreadPoolTest, TestVariableSizeThreadPool);
+
+  friend class ThreadPoolBuilder;
+  friend class ThreadPoolToken;
+
+  // Client-provided task to be executed by this pool.
+  struct Task {
+    std::shared_ptr<Runnable> runnable;
+    // Trace associated with the task, used for TraceMetric accounting.
+    // NOTE(review): presumably the submitter's trace and possibly null — confirm.
+    Trace* trace;
+
+    // Time at which the entry was submitted to the pool.
+    MonoTime submit_time;
+  };
+
+  // Creates a new thread pool using a builder.
+  explicit ThreadPool(const ThreadPoolBuilder& builder);
+
+  // Initializes the thread pool by starting the minimum number of threads.
+  Status Init();
+
+  // Dispatcher responsible for dequeueing and executing the tasks
+  void DispatchThread();
+
+  // Create new thread.
+  //
+  // REQUIRES: caller has incremented 'num_threads_pending_start_' ahead of this call.
+  // NOTE: For performance reasons, lock_ should not be held.
+  Status CreateThread();
+
+  // Aborts if the current thread is a member of this thread pool.
+  // Reads 'threads_' (protected by 'lock_') without acquiring 'lock_' itself;
+  // NOTE(review): presumably the caller must already hold 'lock_' — confirm.
+  void CheckNotPoolThreadUnlocked();
+
+  // Submits a task to be run via token.
+  Status DoSubmit(std::shared_ptr<Runnable> r, ThreadPoolToken* token);
+
+  // Releases token 't' and invalidates it.
+  void ReleaseToken(ThreadPoolToken* t);
+
+  const std::string name_;
+  const int min_threads_;
+  const int max_threads_;
+  const int max_queue_size_;
+  const MonoDelta idle_timeout_;
+
+  // Overall status of the pool. Set to an error when the pool is shut down.
+  //
+  // Protected by 'lock_'.
+  Status pool_status_;
+
+  // Synchronizes many of the members of the pool and all of its
+  // condition variables.
+  mutable Mutex lock_;
+
+  // Condition variable for "pool is idling". Waiters wake up when
+  // active_threads_ reaches zero.
+  ConditionVariable idle_cond_;
+
+  // Condition variable for "pool has no threads". Waiters wake up when
+  // num_threads_ and num_threads_pending_start_ are both 0.
+  ConditionVariable no_threads_cond_;
+
+  // Number of threads currently running.
+  //
+  // Protected by lock_.
+  int num_threads_;
+
+  // Number of threads which are in the process of starting.
+  // When these threads start, they will decrement this counter and
+  // accordingly increment 'num_threads_'.
+  //
+  // Protected by lock_.
+  int num_threads_pending_start_;
+
+  // Number of threads currently running and executing client tasks.
+  //
+  // Protected by lock_.
+  int active_threads_;
+
+  // Total number of client tasks queued, either directly (queue_) or
+  // indirectly (tokens_).
+  //
+  // Protected by lock_.
+  int total_queued_tasks_;
+
+  // All allocated tokens.
+  //
+  // Protected by lock_.
+  std::unordered_set<ThreadPoolToken*> tokens_;
+
+  // FIFO of tokens from which tasks should be executed. Does not own the
+  // tokens; they are owned by clients and are removed from the FIFO on shutdown.
+  //
+  // Protected by lock_.
+  std::deque<ThreadPoolToken*> queue_;
+
+  // Pointers to all running threads. Raw pointers are safe because a Thread
+  // may only go out of scope after being removed from threads_.
+  //
+  // Protected by lock_.
+  std::unordered_set<Thread*> threads_;
+
+  // List of all threads currently waiting for work.
+  //
+  // A thread is added to the front of the list when it goes idle and is
+  // removed from the front and signaled when new work arrives. This produces a
+  // LIFO usage pattern that is more efficient than idling on a single
+  // ConditionVariable (which yields FIFO semantics).
+  //
+  // Protected by lock_.
+  struct IdleThread : public boost::intrusive::list_base_hook<> {
+    explicit IdleThread(Mutex* m)
+        : not_empty(m) {}
+
+    // Condition variable for "queue is not empty". Waiters wake up when a new
+    // task is queued.
+    ConditionVariable not_empty;
+
+    DISALLOW_COPY_AND_ASSIGN(IdleThread);
+  };
+  boost::intrusive::list<IdleThread> idle_threads_; // NOLINT(build/include_what_you_use)
+
+  // ExecutionMode::CONCURRENT token used by the pool for tokenless submission.
+  std::unique_ptr<ThreadPoolToken> tokenless_;
+
+  // Metrics for the entire thread pool.
+  const ThreadPoolMetrics metrics_;
+
+  // Names of the TraceMetric counters updated for traced tasks (see the
+  // 'trace_metric_prefix' builder option).
+  const char* queue_time_trace_metric_name_;
+  const char* run_wall_time_trace_metric_name_;
+  const char* run_cpu_time_trace_metric_name_;
+
+  DISALLOW_COPY_AND_ASSIGN(ThreadPool);
+};
+
+// Entry point for token-based task submission and blocking for a particular
+// thread pool. Tokens can only be created via ThreadPool::NewToken() or
+// ThreadPool::NewTokenWithMetrics().
+//
+// All functions are thread-safe. Mutable members are protected via the
+// ThreadPool's lock.
+class ThreadPoolToken {
+ public:
+  // Destroys the token.
+  //
+  // May be called on a token with outstanding tasks, as Shutdown() will be
+  // called first to take care of them.
+  ~ThreadPoolToken();
+
+  // Submits a function using the kudu Closure system.
+  Status SubmitClosure(Closure c) WARN_UNUSED_RESULT;
+
+  // Submits a function bound using boost::bind(&FuncName, args...).
+  Status SubmitFunc(boost::function<void()> f) WARN_UNUSED_RESULT;
+
+  // Submits a Runnable class.
+  Status Submit(std::shared_ptr<Runnable> r) WARN_UNUSED_RESULT;
+
+  // Marks the token as unusable for future submissions. Any queued tasks not
+  // yet running are destroyed. If tasks are in flight, Shutdown() will wait
+  // on their completion before returning.
+  void Shutdown();
+
+  // Waits until all the tasks submitted via this token are completed.
+  void Wait();
+
+  // Waits until all submissions using this token are complete, or until
+  // 'until' time is reached.
+  //
+  // Returns true if all submissions are complete, false otherwise.
+  bool WaitUntil(const MonoTime& until);
+
+  // Waits until all submissions using this token are complete, or until
+  // 'delta' time elapses.
+  //
+  // Returns true if all submissions are complete, false otherwise.
+  bool WaitFor(const MonoDelta& delta);
+
+ private:
+  // All possible token states. Legal state transitions:
+  //   IDLE      -> RUNNING: task is submitted via token
+  //   IDLE      -> QUIESCED: token or pool is shut down
+  //   RUNNING   -> IDLE: worker thread finishes executing a task and
+  //                      there are no more tasks queued to the token
+  //   RUNNING   -> QUIESCING: token or pool is shut down while worker thread
+  //                           is executing a task
+  //   RUNNING   -> QUIESCED: token or pool is shut down
+  //   QUIESCING -> QUIESCED:  worker thread finishes executing a task
+  //                           belonging to a shut down token or pool
+  enum class State {
+    // Token has no queued tasks.
+    IDLE,
+
+    // A worker thread is running one of the token's previously queued tasks.
+    RUNNING,
+
+    // No new tasks may be submitted to the token. A worker thread is still
+    // running a previously queued task.
+    QUIESCING,
+
+    // No new tasks may be submitted to the token. There are no active tasks
+    // either. At this state, the token may only be destroyed.
+    QUIESCED,
+  };
+
+  // Writes a textual representation of the token state in 's' to 'o'.
+  friend std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s);
+
+  friend class ThreadPool;
+
+  // Returns a textual representation of 's' suitable for debugging.
+  static const char* StateToString(State s);
+
+  // Constructs a new token.
+  //
+  // The token may not outlive its thread pool ('pool').
+  ThreadPoolToken(ThreadPool* pool,
+                  ThreadPool::ExecutionMode mode,
+                  ThreadPoolMetrics metrics);
+
+  // Changes this token's state to 'new_state' taking actions as needed.
+  void Transition(State new_state);
+
+  // Returns true if this token has a task queued and ready to run, or if a
+  // task belonging to this token is already running.
+  bool IsActive() const {
+    return state_ == State::RUNNING ||
+           state_ == State::QUIESCING;
+  }
+
+  // Returns true if new tasks may be submitted to this token.
+  bool MaySubmitNewTasks() const {
+    return state_ != State::QUIESCING &&
+           state_ != State::QUIESCED;
+  }
+
+  State state() const { return state_; }
+  ThreadPool::ExecutionMode mode() const { return mode_; }
+
+  // Token's configured execution mode.
+  const ThreadPool::ExecutionMode mode_;
+
+  // Metrics for just this token.
+  const ThreadPoolMetrics metrics_;
+
+  // Pointer to the token's thread pool.
+  ThreadPool* pool_;
+
+  // Token state machine.
+  State state_;
+
+  // Queued client tasks.
+  std::deque<ThreadPool::Task> entries_;
+
+  // Condition variable for "token is idle". Waiters wake up when the token
+  // transitions to IDLE or QUIESCED.
+  ConditionVariable not_running_cond_;
+
+  // Number of worker threads currently executing tasks belonging to this
+  // token.
+  int active_threads_;
+
+  DISALLOW_COPY_AND_ASSIGN(ThreadPoolToken);
+};
+
+} // namespace kudu
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/throttler-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/throttler-test.cc b/be/src/kudu/util/throttler-test.cc
new file mode 100644
index 0000000..ff97eb5
--- /dev/null
+++ b/be/src/kudu/util/throttler-test.cc
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/throttler.h"
+
+#include <gtest/gtest.h>
+
+#include "kudu/util/monotime.h"
+#include "kudu/util/test_util.h"
+
+namespace kudu {
+
+// Test fixture for Throttler; it adds nothing on top of KuduTest.
+class ThrottlerTest : public KuduTest {};
+
+TEST_F(ThrottlerTest, TestOpThrottle) {
+  // Operation-rate throttling: 1000 ops/s is 100 op tokens per 100ms period.
+  MonoTime now = MonoTime::Now();
+  Throttler throttler(now, 1000, 1000*1000, 1);
+  // Advance far enough that the first Take() refills the bucket to its cap.
+  now += MonoDelta::FromMilliseconds(2000);
+  // Ten 100ms refill periods, i.e. one second of simulated time.
+  for (int period = 0; period < 10; ++period) {
+    // Exactly one period's worth of single-op requests is admitted...
+    for (int op = 0; op < 100; ++op) {
+      ASSERT_TRUE(throttler.Take(now, 1, 1));
+    }
+    // ...and the next one is throttled until the following refill.
+    ASSERT_FALSE(throttler.Take(now, 1, 1));
+    now += MonoDelta::FromMilliseconds(100);
+  }
+}
+
+TEST_F(ThrottlerTest, TestIOThrottle) {
+  // Check IO byte rate throttling: 1,000,000 bytes/s is 100,000 byte tokens
+  // per 100ms refill period, so exactly 100 requests of 1000 bytes fit in one
+  // period. The op rate (50000/s, i.e. 5000 ops per period) is high enough
+  // that it never limits this test.
+  MonoTime now = MonoTime::Now();
+  Throttler t0(now, 50000, 1000*1000, 1);
+  // Fill up bucket
+  now += MonoDelta::FromMilliseconds(2000);
+  // Check throttle behavior for 1 second.
+  for (int p = 0; p < 10; p++) {
+    for (int i = 0; i < 100; i++) {
+      ASSERT_TRUE(t0.Take(now, 1, 1000));
+    }
+    ASSERT_FALSE(t0.Take(now, 1, 1000));
+    now += MonoDelta::FromMilliseconds(100);
+  }
+}
+
+TEST_F(ThrottlerTest, TestBurst) {
+  // Check burst behavior: with a burst factor of 5 the byte bucket holds up to
+  // 5 refill periods' worth of tokens (5 * 100,000 = 500,000 bytes). That lets
+  // a short burst well above the configured 1,000,000 bytes/s rate through.
+  MonoTime now = MonoTime::Now();
+  Throttler t0(now, 2000, 1000*1000, 5);
+  // Fill up bucket
+  now += MonoDelta::FromMilliseconds(2000);
+  // 100 requests of 5000 bytes over 100ms: 500,000 bytes, ~5x the rate.
+  for (int i = 0; i < 100; i++) {
+    now += MonoDelta::FromMilliseconds(1);
+    ASSERT_TRUE(t0.Take(now, 1, 5000));
+  }
+  // Apart from the initial fill, exactly one refill (100,000 bytes) happened
+  // during the loop. That is exactly what remains in the bucket: it can be
+  // taken in one go, after which nothing is left.
+  ASSERT_TRUE(t0.Take(now, 1, 100000));
+  ASSERT_FALSE(t0.Take(now, 1, 1));
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/throttler.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/throttler.cc b/be/src/kudu/util/throttler.cc
new file mode 100644
index 0000000..69e0f99
--- /dev/null
+++ b/be/src/kudu/util/throttler.cc
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/throttler.h"
+
+#include <algorithm>
+#include <mutex>
+
+namespace kudu {
+
+Throttler::Throttler(MonoTime now, uint64_t op_rate, uint64_t byte_rate, double burst_factor) :
+    next_refill_(now) {
+  // Tokens are handed out once per refill period, so per-second rates are
+  // converted into a per-period token count.
+  const uint64_t periods_per_sec = MonoTime::kMicrosecondsPerSecond / kRefillPeriodMicros;
+
+  // A rate of 0 disables throttling for that dimension (Take() skips it).
+  // Plain integer division would also map any non-zero rate below
+  // 'periods_per_sec' to 0 and so silently disable throttling. Non-zero rates
+  // therefore get at least one token per period, which means such low rates
+  // are enforced at a granularity of one token per refill period.
+  auto per_period = [periods_per_sec](uint64_t rate) -> uint64_t {
+    return rate == 0 ? 0 : std::max<uint64_t>(1, rate / periods_per_sec);
+  };
+
+  // Bucket capacity is 'burst_factor' periods' worth of tokens. The result is
+  // clamped to [0, UINT64_MAX]: converting a negative, NaN or too-large
+  // double to uint64_t is undefined behavior.
+  auto capacity = [burst_factor](uint64_t refill) -> uint64_t {
+    const double cap = static_cast<double>(refill) * burst_factor;
+    if (!(cap > 0)) {
+      return 0;  // Non-positive or NaN.
+    }
+    if (cap >= 18446744073709551616.0) {  // 2^64
+      return ~uint64_t{0};
+    }
+    return static_cast<uint64_t>(cap);
+  };
+
+  op_refill_ = per_period(op_rate);
+  op_token_ = 0;
+  op_token_max_ = capacity(op_refill_);
+  byte_refill_ = per_period(byte_rate);
+  byte_token_ = 0;
+  byte_token_max_ = capacity(byte_refill_);
+}
+
+bool Throttler::Take(MonoTime now, uint64_t op, uint64_t byte) {
+  // A zero refill amount means that dimension is not throttled at all.
+  const bool limit_ops = op_refill_ != 0;
+  const bool limit_bytes = byte_refill_ != 0;
+  if (!limit_ops && !limit_bytes) {
+    return true;  // Fully disabled: no need to touch the lock.
+  }
+
+  std::lock_guard<simple_spinlock> guard(lock_);
+  Refill(now);
+
+  // All-or-nothing: only consume tokens if every throttled dimension has
+  // enough of them.
+  const bool ops_ok = !limit_ops || op <= op_token_;
+  const bool bytes_ok = !limit_bytes || byte <= byte_token_;
+  if (!(ops_ok && bytes_ok)) {
+    return false;
+  }
+  if (limit_ops) {
+    op_token_ -= op;
+  }
+  if (limit_bytes) {
+    byte_token_ -= byte;
+  }
+  return true;
+}
+
+namespace {
+
+// Returns 'tokens' plus 'periods' refills of 'per_period' tokens each, capped
+// at 'max'. Equivalent to min(tokens + periods * per_period, max), but it
+// cannot overflow uint64_t. The naive form can overflow for very large rates
+// or long gaps between calls.
+uint64_t AddTokensSaturating(uint64_t tokens, uint64_t periods,
+                             uint64_t per_period, uint64_t max) {
+  if (tokens >= max) {
+    return max;
+  }
+  const uint64_t room = max - tokens;
+  // periods * per_period > room  <=>  periods > room / per_period (integers).
+  if (per_period != 0 && periods > room / per_period) {
+    return max;
+  }
+  return tokens + periods * per_period;
+}
+
+} // anonymous namespace
+
+void Throttler::Refill(MonoTime now) {
+  // Nothing to do until the next scheduled refill time has been reached.
+  int64_t d = (now - next_refill_).ToMicroseconds();
+  if (d < 0) {
+    return;
+  }
+  // Credit every period boundary crossed since the last refill, including the
+  // one at 'next_refill_' itself, and schedule the following refill.
+  uint64_t num_period = d / kRefillPeriodMicros + 1;
+  next_refill_ += MonoDelta::FromMicroseconds(num_period * kRefillPeriodMicros);
+  op_token_ = AddTokensSaturating(op_token_, num_period, op_refill_, op_token_max_);
+  byte_token_ = AddTokensSaturating(byte_token_, num_period, byte_refill_, byte_token_max_);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/throttler.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/throttler.h b/be/src/kudu/util/throttler.h
new file mode 100644
index 0000000..5594091
--- /dev/null
+++ b/be/src/kudu/util/throttler.h
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_THROTTLER_H
+#define KUDU_UTIL_THROTTLER_H
+
+#include <cstdint>
+
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+
+namespace kudu {
+
+// A token-bucket throttler that limits both operations per second and IO
+// bytes per second.
+//
+// Tokens are added once per refill period (kRefillPeriodMicros). Per-second
+// rates are converted into an integral number of tokens per period, so rates
+// are only enforced at the granularity of whole tokens per 100ms period.
+//
+// Take() serializes its token bookkeeping with an internal spinlock.
+class Throttler {
+ public:
+  // Refill period is 100ms.
+  enum {
+    kRefillPeriodMicros = 100000
+  };
+
+  // Constructs a throttler from a max operation rate per second, a max IO
+  // byte rate per second, and a burst factor (burst_rate = rate * burst).
+  // Each bucket holds at most 'burst_factor' refill periods' worth of tokens,
+  // which bounds how much can be taken at once after an idle spell.
+  // The buckets start empty; the first refill happens at 'now'.
+  // Set op_per_sec to 0 to disable operation throttling.
+  // Set byte_per_sec to 0 to disable IO bytes throttling.
+  Throttler(MonoTime now, uint64_t op_per_sec, uint64_t byte_per_sec, double burst_factor);
+
+  // Throttle an "operation group" by taking 'op' operation tokens and 'byte' byte tokens.
+  // Return true if there are enough tokens, and operation is allowed.
+  // Return false if there are not enough tokens, and operation is throttled.
+  // Tokens are taken all-or-nothing: on false nothing is consumed. For a
+  // throttled dimension, a request larger than that bucket's capacity can
+  // never succeed.
+  bool Take(MonoTime now, uint64_t op, uint64_t byte);
+
+ private:
+  // Adds the tokens accrued for every refill period elapsed up to 'now'.
+  // Called from Take() with 'lock_' held.
+  void Refill(MonoTime now);
+
+  // Time at which the next refill is due.
+  MonoTime next_refill_;
+  // Operation tokens added per refill period (0 disables op throttling).
+  uint64_t op_refill_;
+  // Operation tokens currently available.
+  uint64_t op_token_;
+  // Maximum number of operation tokens the bucket can hold.
+  uint64_t op_token_max_;
+  // Same three fields as above, for IO bytes.
+  uint64_t byte_refill_;
+  uint64_t byte_token_;
+  uint64_t byte_token_max_;
+  // Guards the token counts and 'next_refill_' inside Take().
+  simple_spinlock lock_;
+};
+
+} // namespace kudu
+
+#endif