You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2017/05/17 00:10:23 UTC

[4/4] kudu git commit: log: shut down appender thread when idle

log: shut down appender thread when idle

This changes the log appender thread to be based on submitting tasks to a pool,
rather than a thread which is always running.

See the comments on Log::AppendThread in log.cc for notes on the design.

This also adds a new test case in mt-log-test which triggers the various
complexities of the appender lifecycle. I manually checked coverage using
LOG(WARNING) statements to make sure it does exercise the various cases.

Change-Id: Id40a4cfcec96198b537c2f50be7ff1204faf96d2
Reviewed-on: http://gerrit.cloudera.org:8080/6856
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/6311acb2
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/6311acb2
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/6311acb2

Branch: refs/heads/master
Commit: 6311acb2faf4f78964fe95cb07ab9331bc2657f7
Parents: c82c0f4
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Apr 12 00:26:38 2017 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue May 16 23:31:13 2017 +0000

----------------------------------------------------------------------
 src/kudu/consensus/log-test-base.h   |   7 +-
 src/kudu/consensus/log-test.cc       |  19 +++
 src/kudu/consensus/log.cc            | 263 ++++++++++++++++++++++--------
 src/kudu/consensus/log.h             |   3 +
 src/kudu/consensus/mt-log-test.cc    |  51 ++++--
 src/kudu/util/blocking_queue-test.cc |  25 ++-
 src/kudu/util/blocking_queue.h       |  25 ++-
 src/kudu/util/threadpool.h           |   4 +-
 8 files changed, 298 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/6311acb2/src/kudu/consensus/log-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log-test-base.h b/src/kudu/consensus/log-test-base.h
index e21623e..77a9bc7 100644
--- a/src/kudu/consensus/log-test-base.h
+++ b/src/kudu/consensus/log-test-base.h
@@ -153,7 +153,6 @@ inline Status CorruptLogFile(Env* env, const string& log_path,
 
 class LogTestBase : public KuduTest {
  public:
-
   typedef pair<int, int> DeltaId;
 
   LogTestBase()
@@ -163,7 +162,7 @@ class LogTestBase : public KuduTest {
 
   virtual void SetUp() OVERRIDE {
     KuduTest::SetUp();
-    current_index_ = 1;
+    current_index_ = kStartIndex;
     fs_manager_.reset(new FsManager(env_, GetTestPath("fs_root")));
     metric_registry_.reset(new MetricRegistry());
     metric_entity_ = METRIC_ENTITY_tablet.Instantiate(metric_registry_.get(), "log-test-base");
@@ -377,6 +376,10 @@ class LogTestBase : public KuduTest {
   }
 
  protected:
+  enum {
+    kStartIndex = 1
+  };
+
   const Schema schema_;
   gscoped_ptr<FsManager> fs_manager_;
   gscoped_ptr<MetricRegistry> metric_registry_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/6311acb2/src/kudu/consensus/log-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log-test.cc b/src/kudu/consensus/log-test.cc
index f37f60e..12394a9 100644
--- a/src/kudu/consensus/log-test.cc
+++ b/src/kudu/consensus/log-test.cc
@@ -1097,5 +1097,24 @@ TEST_F(LogTest, TestDiskSpaceCheck) {
   // detect that we are past the preallocation limit.
 }
 
+// Test that the append thread shuts itself down after it's idle.
+TEST_F(LogTest, TestAutoStopIdleAppendThread) {
+  ASSERT_OK(BuildLog());
+  OpId opid = MakeOpId(1, 1);
+
+  // Append something to the queue and ensure that the thread starts itself.
+  // We loop here in case for some reason this thread gets de-scheduled just
+  // after the append long enough for the append thread to shut itself down
+  // again.
+  ASSERT_EVENTUALLY([&]() {
+      AppendNoOpsToLogSync(clock_, log_.get(), &opid, 2);
+      ASSERT_TRUE(log_->append_thread_active_for_tests());
+    });
+  // After some time, the append thread should shut itself down.
+  ASSERT_EVENTUALLY([&]() {
+      ASSERT_FALSE(log_->append_thread_active_for_tests());
+    });
+}
+
 } // namespace log
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/6311acb2/src/kudu/consensus/log.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index 50ce12b..8acc3cf 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -78,6 +78,12 @@ DEFINE_int32(group_commit_queue_size_bytes, 4 * 1024 * 1024,
 TAG_FLAG(group_commit_queue_size_bytes, advanced);
 
 
+DEFINE_int32(log_thread_idle_threshold_ms, 1000,
+             "Number of milliseconds after which the log append thread decides that a "
+             "log is idle, and considers shutting down. Used by tests.");
+TAG_FLAG(log_thread_idle_threshold_ms, experimental);
+TAG_FLAG(log_thread_idle_threshold_ms, hidden);
+
 // Compression configuration.
 // -----------------------------
 DEFINE_string(log_compression_codec, "LZ4",
@@ -96,12 +102,16 @@ DEFINE_int32(log_inject_latency_ms_mean, 100,
 DEFINE_int32(log_inject_latency_ms_stddev, 100,
              "The standard deviation of latency to inject in the log. "
              "Only takes effect if --log_inject_latency is true");
+DEFINE_int32(log_inject_thread_lifecycle_latency_ms, 0,
+             "Injection point for random latency during key thread lifecycle transition "
+             "points.");
 DEFINE_double(fault_crash_before_append_commit, 0.0,
               "Fraction of the time when the server will crash just before appending a "
               "COMMIT message to the log. (For testing only!)");
 TAG_FLAG(log_inject_latency, unsafe);
 TAG_FLAG(log_inject_latency_ms_mean, unsafe);
 TAG_FLAG(log_inject_latency_ms_stddev, unsafe);
+TAG_FLAG(log_inject_thread_lifecycle_latency_ms, unsafe);
 TAG_FLAG(fault_crash_before_append_commit, unsafe);
 
 DEFINE_double(log_inject_io_error_on_append_fraction, 0.0,
@@ -152,13 +162,33 @@ using std::vector;
 using std::unique_ptr;
 using strings::Substitute;
 
-// This class is responsible for managing the thread that appends to
-// the log file.
+// Manages the thread which drains groups of batches from the log's queue and
+// appends them to the underlying log instance.
+//
+// Rather than being a long-running thread, this instead uses a threadpool with
+// size 1 to automatically start and stop a thread on demand. When the log
+// is idle for some amount of time, no task will be on the thread pool, and thus
+// the underlying thread may exit.
+//
+// The design of submitting tasks to the threadpool is slightly tricky in order
+// to achieve group commit and not have to submit one task per appended batch.
+// Instead, a generic 'DoWork()' task is used which loops collecting work until
+// it finds that it has been idle for a while, at which point the task finishes.
+//
+// The trick, then, lies in two areas:
+//
+// 1) after appending a batch, we need to ensure that a task is already running,
+//    and if not, start one. This is done in Wake().
+//
+// 2) when the task finds no more work to do and wants to go idle, it needs to
+//    ensure that it doesn't miss a concurrent wake-up. This is done in GoIdle().
+//
+// See the implementation comments in Wake() and GoIdle() for details.
 class Log::AppendThread {
  public:
   explicit AppendThread(Log* log);
 
-  // Initializes the objects and starts the thread.
+  // Initializes the objects and starts the thread pool.
   Status Init();
 
   // Waits until the last enqueued elements are processed, sets the
@@ -167,16 +197,47 @@ class Log::AppendThread {
   // method.
   void Shutdown();
 
+  // Wake up the appender task, if it is not already running.
+  // This should be called after each time that a new entry is
+  // appended to the log's queue.
+  void Wake();
+
+  bool active() const {
+    return base::subtle::NoBarrier_Load(&worker_state_) == WORKER_ACTIVE;
+  }
+
  private:
-  void RunThread();
+  // The task submitted to the threadpool which collects batches from the queue
+  // and appends them, until it determines that the queue is idle.
+  void DoWork();
+
+  // Tries to transition back to WORKER_STOPPED state. If successful, returns true.
+  //
+  // Otherwise, returns false to indicate that the task should keep running because
+  // a new entry was added to the log's queue just as we were trying to go idle.
+  bool GoIdle();
+
+  // Handle the actual appending of a group of entries. Responsible for deleting the
+  // LogEntryBatch* pointers.
+  void HandleGroup(vector<LogEntryBatch*> entry_batches);
 
   string LogPrefix() const;
 
   Log* const log_;
 
-  // Lock to protect access to thread_ during shutdown.
-  mutable std::mutex lock_;
-  scoped_refptr<Thread> thread_;
+  // Atomic state machine for whether there is any worker task currently
+  // queued or running on append_pool_. See Wake() and GoIdle() for more details.
+  enum WorkerState {
+    // No worker task is queued or running.
+    WORKER_STOPPED,
+    // A worker task is queued or running.
+    WORKER_ACTIVE
+  };
+  Atomic32 worker_state_ = WORKER_STOPPED;
+
+  // Pool with a single thread, which handles shutting down the thread
+  // when idle.
+  gscoped_ptr<ThreadPool> append_pool_;
 };
 
 
@@ -185,88 +246,149 @@ Log::AppendThread::AppendThread(Log *log)
 }
 
 Status Log::AppendThread::Init() {
-  DCHECK(!thread_) << "Already initialized";
+  DCHECK(!append_pool_) << "Already initialized";
   VLOG_WITH_PREFIX(1) << "Starting log append thread";
-  RETURN_NOT_OK(kudu::Thread::Create("log", "appender",
-      &AppendThread::RunThread, this, &thread_));
+  RETURN_NOT_OK(ThreadPoolBuilder("wal-append")
+                .set_min_threads(0)
+                // Only need one thread since we'll only schedule one
+                // task at a time.
+                .set_max_threads(1)
+                // No need for keeping idle threads, since the task itself
+                // handles waiting for work while idle.
+                .set_idle_timeout(MonoDelta::FromSeconds(0))
+                .Build(&append_pool_));
   return Status::OK();
 }
 
-void Log::AppendThread::RunThread() {
+void Log::AppendThread::Wake() {
+  DCHECK(append_pool_);
+  auto old_status = base::subtle::NoBarrier_CompareAndSwap(
+      &worker_state_, WORKER_STOPPED, WORKER_ACTIVE);
+  if (old_status == WORKER_STOPPED) {
+    CHECK_OK(append_pool_->SubmitClosure(Bind(&Log::AppendThread::DoWork, Unretained(this))));
+  }
+}
+
+bool Log::AppendThread::GoIdle() {
+  // Inject latency at key points in this function for the purposes of tests.
+  MAYBE_INJECT_RANDOM_LATENCY(FLAGS_log_inject_thread_lifecycle_latency_ms);
+
+  // Stopping is a bit tricky. We have to consider the following race:
+  //
+  // T1                         AppendThread
+  // ------------               -------------
+  //                            - state is WORKER_ACTIVE
+  //                            - BlockingDrainTo returns TimedOut()
+  // - queue.Put()
+  // - Wake() no-op because
+  //   state is already WORKER_ACTIVE
+
+  // So, we first transition back to WORKER_STOPPED state, and then re-check to
+  // see if there has been something enqueued in the meantime.
+  auto old_state = base::subtle::NoBarrier_AtomicExchange(&worker_state_, WORKER_STOPPED);
+  DCHECK_EQ(old_state, WORKER_ACTIVE);
+  if (log_->entry_queue()->empty()) {
+    // Nothing got enqueued, which means there must not have been any missed wakeup.
+    // We are now in WORKER_STOPPED state.
+    return true;
+  }
+
+  MAYBE_INJECT_RANDOM_LATENCY(FLAGS_log_inject_thread_lifecycle_latency_ms);
+  // Someone enqueued something. We don't know whether their wakeup was successful
+  // or not, but we can just try to transition back to WORKER_ACTIVE state here.
+  if (base::subtle::NoBarrier_CompareAndSwap(&worker_state_, WORKER_STOPPED, WORKER_ACTIVE)
+      == WORKER_STOPPED) {
+    // Their wake-up was lost, but we've now marked ourselves as running.
+    MAYBE_INJECT_RANDOM_LATENCY(FLAGS_log_inject_thread_lifecycle_latency_ms);
+    return false;
+  }
+
+  // Their wake-up was successful, meaning that another DoWork() task has been
+  // submitted to append_pool_ behind us, so we can exit this one.
+  MAYBE_INJECT_RANDOM_LATENCY(FLAGS_log_inject_thread_lifecycle_latency_ms);
+  return true;
+}
+
+void Log::AppendThread::DoWork() {
+  DCHECK_EQ(ANNOTATE_UNPROTECTED_READ(worker_state_), WORKER_ACTIVE);
+  VLOG_WITH_PREFIX(2) << "WAL Appender going active";
   while (true) {
+    MonoTime deadline = MonoTime::Now() +
+        MonoDelta::FromMilliseconds(FLAGS_log_thread_idle_threshold_ms);
     vector<LogEntryBatch*> entry_batches;
-    ElementDeleter d(&entry_batches);
-
-    if (PREDICT_FALSE(!log_->entry_queue()->BlockingDrainTo(&entry_batches))) {
-      CHECK(entry_batches.empty());
+    Status s = log_->entry_queue()->BlockingDrainTo(&entry_batches, deadline);
+    if (PREDICT_FALSE(s.IsAborted())) {
       break;
+    } else if (PREDICT_FALSE(s.IsTimedOut())) {
+      if (GoIdle()) break;
+      continue;
     }
+    HandleGroup(std::move(entry_batches));
+  }
+  VLOG_WITH_PREFIX(2) << "WAL Appender going idle";
+}
 
-    if (log_->metrics_) {
-      log_->metrics_->entry_batches_per_group->Increment(entry_batches.size());
-    }
-    TRACE_EVENT1("log", "batch", "batch_size", entry_batches.size());
+void Log::AppendThread::HandleGroup(vector<LogEntryBatch*> entry_batches) {
+  if (log_->metrics_) {
+    log_->metrics_->entry_batches_per_group->Increment(entry_batches.size());
+  }
+  TRACE_EVENT1("log", "batch", "batch_size", entry_batches.size());
 
-    SCOPED_LATENCY_METRIC(log_->metrics_, group_commit_latency);
+  SCOPED_LATENCY_METRIC(log_->metrics_, group_commit_latency);
 
-    bool is_all_commits = true;
-    for (LogEntryBatch* entry_batch : entry_batches) {
-      TRACE_EVENT_FLOW_END0("log", "Batch", entry_batch);
-      Status s = log_->DoAppend(entry_batch);
-      if (PREDICT_FALSE(!s.ok())) {
-        LOG_WITH_PREFIX(ERROR) << "Error appending to the log: " << s.ToString();
-        // TODO(af): If a single transaction fails to append, should we
-        // abort all subsequent transactions in this batch or allow
-        // them to be appended? What about transactions in future
-        // batches?
-        if (!entry_batch->callback().is_null()) {
-          entry_batch->callback().Run(s);
-          entry_batch->callback_.Reset();
-        }
-      }
-      if (is_all_commits && entry_batch->type_ != COMMIT) {
-        is_all_commits = false;
+  bool is_all_commits = true;
+  for (LogEntryBatch* entry_batch : entry_batches) {
+    TRACE_EVENT_FLOW_END0("log", "Batch", entry_batch);
+    Status s = log_->DoAppend(entry_batch);
+    if (PREDICT_FALSE(!s.ok())) {
+      LOG_WITH_PREFIX(ERROR) << "Error appending to the log: " << s.ToString();
+      // TODO(af): If a single transaction fails to append, should we
+      // abort all subsequent transactions in this batch or allow
+      // them to be appended? What about transactions in future
+      // batches?
+      if (!entry_batch->callback().is_null()) {
+        entry_batch->callback().Run(s);
+        entry_batch->callback_.Reset();
       }
     }
-
-    Status s;
-    if (!is_all_commits) {
-      s = log_->Sync();
+    if (is_all_commits && entry_batch->type_ != COMMIT) {
+      is_all_commits = false;
     }
-    if (PREDICT_FALSE(!s.ok())) {
-      LOG_WITH_PREFIX(ERROR) << "Error syncing log: " << s.ToString();
-      for (LogEntryBatch* entry_batch : entry_batches) {
-        if (!entry_batch->callback().is_null()) {
-          entry_batch->callback().Run(s);
-        }
+  }
+
+  Status s;
+  if (!is_all_commits) {
+    s = log_->Sync();
+  }
+  if (PREDICT_FALSE(!s.ok())) {
+    LOG_WITH_PREFIX(ERROR) << "Error syncing log: " << s.ToString();
+    for (LogEntryBatch* entry_batch : entry_batches) {
+      if (!entry_batch->callback().is_null()) {
+        entry_batch->callback().Run(s);
       }
-    } else {
-      TRACE_EVENT0("log", "Callbacks");
-      VLOG_WITH_PREFIX(2) << "Synchronized " << entry_batches.size() << " entry batches";
-      SCOPED_WATCH_STACK(100);
-      for (LogEntryBatch* entry_batch : entry_batches) {
-        if (PREDICT_TRUE(!entry_batch->callback().is_null())) {
-          entry_batch->callback().Run(Status::OK());
-        }
-        // It's important to delete each batch as we see it, because
-        // deleting it may free up memory from memory trackers, and the
-        // callback of a later batch may want to use that memory.
-        delete entry_batch;
+      delete entry_batch;
+    }
+  } else {
+    TRACE_EVENT0("log", "Callbacks");
+    VLOG_WITH_PREFIX(2) << "Synchronized " << entry_batches.size() << " entry batches";
+    SCOPED_WATCH_STACK(100);
+    for (LogEntryBatch* entry_batch : entry_batches) {
+      if (PREDICT_TRUE(!entry_batch->callback().is_null())) {
+        entry_batch->callback().Run(Status::OK());
       }
-      entry_batches.clear();
+      // It's important to delete each batch as we see it, because
+      // deleting it may free up memory from memory trackers, and the
+      // callback of a later batch may want to use that memory.
+      delete entry_batch;
     }
   }
-  VLOG_WITH_PREFIX(1) << "Exiting AppendThread";
 }
 
 void Log::AppendThread::Shutdown() {
   log_->entry_queue()->Shutdown();
-  std::lock_guard<std::mutex> lock_guard(lock_);
-  if (thread_) {
-    VLOG_WITH_PREFIX(1) << "Shutting down log append thread";
-    CHECK_OK(ThreadJoiner(thread_.get()).Join());
-    VLOG_WITH_PREFIX(1) << "Log append thread is shut down";
-    thread_.reset();
+  if (append_pool_) {
+    append_pool_->Wait();
+    append_pool_->Shutdown();
   }
 }
 
@@ -274,6 +396,12 @@ string Log::AppendThread::LogPrefix() const {
   return log_->LogPrefix();
 }
 
+// Return true if the append thread is currently active.
+bool Log::append_thread_active_for_tests() const {
+  return append_thread_->active();
+}
+
+
 // This task is submitted to allocation_pool_ in order to
 // asynchronously pre-allocate new log segments.
 void Log::SegmentAllocationTask() {
@@ -448,6 +576,7 @@ Status Log::AsyncAppend(unique_ptr<LogEntryBatch> entry_batch, const StatusCallb
     TRACE_EVENT_FLOW_END0("log", "Batch", entry_batch.get());
     return kLogShutdownStatus;
   }
+  append_thread_->Wake();
   entry_batch.release();
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/6311acb2/src/kudu/consensus/log.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log.h b/src/kudu/consensus/log.h
index 9048b38..4027c8a 100644
--- a/src/kudu/consensus/log.h
+++ b/src/kudu/consensus/log.h
@@ -202,6 +202,9 @@ class Log : public RefCountedThreadSafe<Log> {
     return active_segment_->path();
   }
 
+  // Return true if the append thread is currently active.
+  bool append_thread_active_for_tests() const;
+
   // Forces the Log to allocate a new segment and roll over.
   // This can be used to make sure all entries appended up to this point are
   // available in closed, readable segments.

http://git-wip-us.apache.org/repos/asf/kudu/blob/6311acb2/src/kudu/consensus/mt-log-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/mt-log-test.cc b/src/kudu/consensus/mt-log-test.cc
index 09cd819..d925fb4 100644
--- a/src/kudu/consensus/mt-log-test.cc
+++ b/src/kudu/consensus/mt-log-test.cc
@@ -27,6 +27,7 @@
 #include "kudu/consensus/log_index.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/fault_injection.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/random.h"
 #include "kudu/util/thread.h"
@@ -37,6 +38,9 @@ DEFINE_int32(num_batches_per_thread, 2000, "Number of batches per thread");
 DEFINE_int32(num_ops_per_batch_avg, 5, "Target average number of ops per batch");
 DEFINE_bool(verify_log, true, "Whether to verify the log by reading it after the writes complete");
 
+DECLARE_int32(log_thread_idle_threshold_ms);
+DECLARE_int32(log_inject_thread_lifecycle_latency_ms);
+
 namespace kudu {
 namespace log {
 
@@ -124,6 +128,7 @@ class MultiThreadedLogTest : public LogTestBase {
         AssignIndexes(&batch_replicates);
         ASSERT_OK(log_->AsyncAppendReplicates(batch_replicates, cb->AsStatusCallback()));
       }
+      MAYBE_INJECT_RANDOM_LATENCY(FLAGS_log_inject_thread_lifecycle_latency_ms);
     }
     latch.Wait();
     for (const Status& status : errors) {
@@ -167,6 +172,23 @@ class MultiThreadedLogTest : public LogTestBase {
       t.join();
     }
   }
+
+  void VerifyLog() {
+    shared_ptr<LogReader> reader;
+    ASSERT_OK(LogReader::Open(fs_manager_.get(), nullptr, kTestTablet, nullptr, &reader));
+    SegmentSequence segments;
+    ASSERT_OK(reader->GetSegmentsSnapshot(&segments));
+
+    for (const SegmentSequence::value_type& entry : segments) {
+      ASSERT_OK(entry->ReadEntries(&entries_));
+    }
+    vector<uint32_t> ids;
+    EntriesToIdList(&ids);
+    DVLOG(1) << "Wrote total of " << current_index_ - kStartIndex << " ops";
+    ASSERT_EQ(current_index_ - kStartIndex, ids.size());
+    ASSERT_TRUE(std::is_sorted(ids.begin(), ids.end()));
+  }
+
  private:
   ThreadSafeRandom random_;
   simple_spinlock lock_;
@@ -181,7 +203,6 @@ TEST_F(MultiThreadedLogTest, TestAppends) {
   }
 
   ASSERT_OK(BuildLog());
-  int start_current_id = current_index_;
   LOG_TIMING(INFO, strings::Substitute("inserting $0 batches($1 threads, $2 per-thread)",
                                        FLAGS_num_writer_threads * FLAGS_num_batches_per_thread,
                                        FLAGS_num_writer_threads,
@@ -190,21 +211,23 @@ TEST_F(MultiThreadedLogTest, TestAppends) {
   }
   ASSERT_OK(log_->Close());
   if (FLAGS_verify_log) {
-    shared_ptr<LogReader> reader;
-    ASSERT_OK(LogReader::Open(fs_manager_.get(), nullptr, kTestTablet, nullptr, &reader));
-    SegmentSequence segments;
-    ASSERT_OK(reader->GetSegmentsSnapshot(&segments));
-
-    for (const SegmentSequence::value_type& entry : segments) {
-      ASSERT_OK(entry->ReadEntries(&entries_));
-    }
-    vector<uint32_t> ids;
-    EntriesToIdList(&ids);
-    DVLOG(1) << "Wrote total of " << current_index_ - start_current_id << " ops";
-    ASSERT_EQ(current_index_ - start_current_id, ids.size());
-    ASSERT_TRUE(std::is_sorted(ids.begin(), ids.end()));
+    ASSERT_NO_FATAL_FAILURE(VerifyLog());
   }
 }
 
+// The lifecycle of the appender task starting and stopping is a bit complicated
+// (see Log::AppendThread::GoIdle for details). This injects some latency in key
+// points of that lifecycle to ensure that the different potential interleavings
+// are triggered. It also injects latency into the writes done by the tests, so
+// that sometimes writes are spaced out enough to allow the thread to go idle.
+TEST_F(MultiThreadedLogTest, TestAppendThreadStartStopRaces) {
+  FLAGS_log_thread_idle_threshold_ms = 1;
+  FLAGS_log_inject_thread_lifecycle_latency_ms = 2;
+  ASSERT_OK(BuildLog());
+  LogWriterThread(1);
+  ASSERT_OK(log_->Close());
+  ASSERT_NO_FATAL_FAILURE(VerifyLog());
+}
+
 } // namespace log
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/6311acb2/src/kudu/util/blocking_queue-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/blocking_queue-test.cc b/src/kudu/util/blocking_queue-test.cc
index 917ada5..00f558a 100644
--- a/src/kudu/util/blocking_queue-test.cc
+++ b/src/kudu/util/blocking_queue-test.cc
@@ -25,8 +25,9 @@
 
 #include "kudu/util/countdown_latch.h"
 #include "kudu/util/blocking_queue.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
 
-using std::shared_ptr;
 using std::string;
 using std::thread;
 using std::vector;
@@ -59,14 +60,23 @@ TEST(BlockingQueueTest, TestBlockingDrainTo) {
   ASSERT_EQ(test_queue.Put(2), QUEUE_SUCCESS);
   ASSERT_EQ(test_queue.Put(3), QUEUE_SUCCESS);
   vector<int32_t> out;
-  ASSERT_TRUE(test_queue.BlockingDrainTo(&out));
+  ASSERT_OK(test_queue.BlockingDrainTo(&out, MonoTime::Now() + MonoDelta::FromSeconds(30)));
   ASSERT_EQ(1, out[0]);
   ASSERT_EQ(2, out[1]);
   ASSERT_EQ(3, out[2]);
+
+  // Set a deadline in the past and ensure we time out.
+  Status s = test_queue.BlockingDrainTo(&out, MonoTime::Now() - MonoDelta::FromSeconds(1));
+  ASSERT_TRUE(s.IsTimedOut());
+
+  // Ensure that if the queue is shut down, we get Aborted status.
+  test_queue.Shutdown();
+  s = test_queue.BlockingDrainTo(&out, MonoTime::Now() - MonoDelta::FromSeconds(1));
+  ASSERT_TRUE(s.IsAborted());
 }
 
 // Test that, when the queue is shut down with elements still pending,
-// Drain still returns true until the elements are all gone.
+// Drain still returns OK until the elements are all gone.
 TEST(BlockingQueueTest, TestGetAndDrainAfterShutdown) {
   // Put some elements into the queue and then shut it down.
   BlockingQueue<int32_t> q(3);
@@ -80,13 +90,14 @@ TEST(BlockingQueueTest, TestGetAndDrainAfterShutdown) {
   ASSERT_TRUE(q.BlockingGet(&i));
   ASSERT_EQ(1, i);
 
-  // Drain should still return true, since it yielded elements.
+  // Drain should still return OK, since it yielded elements.
   vector<int32_t> out;
-  ASSERT_TRUE(q.BlockingDrainTo(&out));
+  ASSERT_OK(q.BlockingDrainTo(&out));
   ASSERT_EQ(2, out[0]);
 
-  // Now that it's empty, it should return false.
-  ASSERT_FALSE(q.BlockingDrainTo(&out));
+  // Now that it's empty, it should return Aborted.
+  Status s = q.BlockingDrainTo(&out);
+  ASSERT_TRUE(s.IsAborted()) << s.ToString();
   ASSERT_FALSE(q.BlockingGet(&i));
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/6311acb2/src/kudu/util/blocking_queue.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/blocking_queue.h b/src/kudu/util/blocking_queue.h
index 86a4c48..2907030 100644
--- a/src/kudu/util/blocking_queue.h
+++ b/src/kudu/util/blocking_queue.h
@@ -27,6 +27,7 @@
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/util/condition_variable.h"
 #include "kudu/util/mutex.h"
+#include "kudu/util/status.h"
 
 namespace kudu {
 
@@ -101,12 +102,18 @@ class BlockingQueue {
 
   // Get all elements from the queue and append them to a vector.
   //
+  // If 'deadline' passes and no elements have been returned from the
+  // queue, returns Status::TimedOut(). If 'deadline' is uninitialized,
+  // no deadline is used.
+  //
   // If the queue has been shut down, but there are still elements waiting,
   // then it returns those elements as if the queue were not yet shut down.
   //
-  // Returns false if the queue has been shut down and has no more remaining
-  // elements.
-  bool BlockingDrainTo(std::vector<T>* out) {
+  // Returns:
+  // - OK if successful
+  // - TimedOut if the deadline passed
+  // - Aborted if the queue has been shut down and has no remaining elements
+  Status BlockingDrainTo(std::vector<T>* out, MonoTime deadline = MonoTime()) {
     MutexLock l(lock_);
     while (true) {
       if (!list_.empty()) {
@@ -117,12 +124,16 @@ class BlockingQueue {
         }
         list_.clear();
         not_full_.Signal();
-        return true;
+        return Status::OK();
       }
-      if (shutdown_) {
-        return false;
+      if (PREDICT_FALSE(shutdown_)) {
+        return Status::Aborted("");
+      }
+      if (!deadline.Initialized()) {
+        not_empty_.Wait();
+      } else if (PREDICT_FALSE(!not_empty_.TimedWait(deadline - MonoTime::Now()))) {
+        return Status::TimedOut("");
       }
-      not_empty_.Wait();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/6311acb2/src/kudu/util/threadpool.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/threadpool.h b/src/kudu/util/threadpool.h
index 7dc0187..e985002 100644
--- a/src/kudu/util/threadpool.h
+++ b/src/kudu/util/threadpool.h
@@ -86,7 +86,7 @@ class Runnable {
 //    Status::ServiceUnavailable message from Submit().
 //    Default: INT_MAX.
 //
-// timeout: How long we'll keep around an idle thread before timing it out.
+// idle_timeout: How long we'll keep around an idle thread before timing it out.
 //    We always keep at least min_threads.
 //    Default: 500 milliseconds.
 //
@@ -137,7 +137,7 @@ class ThreadPoolBuilder {
 //            .set_min_threads(0)
 //            .set_max_threads(5)
 //            .set_max_queue_size(10)
-//            .set_timeout(MonoDelta::FromMilliseconds(2000))
+//            .set_idle_timeout(MonoDelta::FromMilliseconds(2000))
 //            .Build(&thread_pool));
 //    thread_pool->Submit(shared_ptr<Runnable>(new Task()));
 //    thread_pool->Submit(boost::bind(&Func, 10));