You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2017/05/17 00:10:23 UTC
[4/4] kudu git commit: log: shut down appender thread when idle
log: shut down appender thread when idle
This changes the log appender thread to be based on submitting tasks to a pool,
rather than a thread which is always running.
See the comments on Log::AppendThread in log.cc for notes on the design.
This also adds a new test case in mt-log-test which triggers the various
complexities of the appender lifecycle. I manually checked coverage using
LOG(WARNING) statements to make sure it does exercise the various cases.
Change-Id: Id40a4cfcec96198b537c2f50be7ff1204faf96d2
Reviewed-on: http://gerrit.cloudera.org:8080/6856
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Todd Lipcon <to...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/6311acb2
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/6311acb2
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/6311acb2
Branch: refs/heads/master
Commit: 6311acb2faf4f78964fe95cb07ab9331bc2657f7
Parents: c82c0f4
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Apr 12 00:26:38 2017 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Tue May 16 23:31:13 2017 +0000
----------------------------------------------------------------------
src/kudu/consensus/log-test-base.h | 7 +-
src/kudu/consensus/log-test.cc | 19 +++
src/kudu/consensus/log.cc | 263 ++++++++++++++++++++++--------
src/kudu/consensus/log.h | 3 +
src/kudu/consensus/mt-log-test.cc | 51 ++++--
src/kudu/util/blocking_queue-test.cc | 25 ++-
src/kudu/util/blocking_queue.h | 25 ++-
src/kudu/util/threadpool.h | 4 +-
8 files changed, 298 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/6311acb2/src/kudu/consensus/log-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log-test-base.h b/src/kudu/consensus/log-test-base.h
index e21623e..77a9bc7 100644
--- a/src/kudu/consensus/log-test-base.h
+++ b/src/kudu/consensus/log-test-base.h
@@ -153,7 +153,6 @@ inline Status CorruptLogFile(Env* env, const string& log_path,
class LogTestBase : public KuduTest {
public:
-
typedef pair<int, int> DeltaId;
LogTestBase()
@@ -163,7 +162,7 @@ class LogTestBase : public KuduTest {
virtual void SetUp() OVERRIDE {
KuduTest::SetUp();
- current_index_ = 1;
+ current_index_ = kStartIndex;
fs_manager_.reset(new FsManager(env_, GetTestPath("fs_root")));
metric_registry_.reset(new MetricRegistry());
metric_entity_ = METRIC_ENTITY_tablet.Instantiate(metric_registry_.get(), "log-test-base");
@@ -377,6 +376,10 @@ class LogTestBase : public KuduTest {
}
protected:
+ enum {
+ kStartIndex = 1
+ };
+
const Schema schema_;
gscoped_ptr<FsManager> fs_manager_;
gscoped_ptr<MetricRegistry> metric_registry_;
http://git-wip-us.apache.org/repos/asf/kudu/blob/6311acb2/src/kudu/consensus/log-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log-test.cc b/src/kudu/consensus/log-test.cc
index f37f60e..12394a9 100644
--- a/src/kudu/consensus/log-test.cc
+++ b/src/kudu/consensus/log-test.cc
@@ -1097,5 +1097,24 @@ TEST_F(LogTest, TestDiskSpaceCheck) {
// detect that we are past the preallocation limit.
}
+// Test that the append thread shuts itself down after it's idle.
+//
+// The appender task only stops after a BlockingDrainTo() call waits
+// --log_thread_idle_threshold_ms (1000ms by default) without new work and
+// GoIdle() succeeds, so the final ASSERT_EVENTUALLY presumably needs that
+// threshold to be well inside its retry window (NOTE(review): confirm).
+TEST_F(LogTest, TestAutoStopIdleAppendThread) {
+ ASSERT_OK(BuildLog());
+ OpId opid = MakeOpId(1, 1);
+
+ // Append something to the queue and ensure that the thread starts itself.
+ // We loop here in case for some reason this thread gets de-scheduled just
+ // after the append long enough for the append thread to shut itself down
+ // again.
+ ASSERT_EVENTUALLY([&]() {
+ AppendNoOpsToLogSync(clock_, log_.get(), &opid, 2);
+ ASSERT_TRUE(log_->append_thread_active_for_tests());
+ });
+ // After some time, the append thread should shut itself down.
+ // (Nothing is appended here, so the appender's drain deadline expires.)
+ ASSERT_EVENTUALLY([&]() {
+ ASSERT_FALSE(log_->append_thread_active_for_tests());
+ });
+}
+
} // namespace log
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/6311acb2/src/kudu/consensus/log.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index 50ce12b..8acc3cf 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -78,6 +78,12 @@ DEFINE_int32(group_commit_queue_size_bytes, 4 * 1024 * 1024,
TAG_FLAG(group_commit_queue_size_bytes, advanced);
+// Idle threshold for the WAL appender task. Log::AppendThread::DoWork() uses
+// it to compute the deadline of each BlockingDrainTo() call; when that
+// deadline passes with no work, the task calls GoIdle() to try to stop.
+DEFINE_int32(log_thread_idle_threshold_ms, 1000,
+ "Number of milliseconds after which the log append thread decides that a "
+ "log is idle, and considers shutting down. Used by tests.");
+TAG_FLAG(log_thread_idle_threshold_ms, experimental);
+TAG_FLAG(log_thread_idle_threshold_ms, hidden);
+
// Compression configuration.
// -----------------------------
DEFINE_string(log_compression_codec, "LZ4",
@@ -96,12 +102,16 @@ DEFINE_int32(log_inject_latency_ms_mean, 100,
DEFINE_int32(log_inject_latency_ms_stddev, 100,
"The standard deviation of latency to inject in the log. "
"Only takes effect if --log_inject_latency is true");
+// Passed to MAYBE_INJECT_RANDOM_LATENCY at the appender's start/stop
+// transition points in Log::AppendThread::GoIdle() (and by mt-log-test between
+// writes). The default of 0 presumably disables injection; confirm against
+// fault_injection.h.
+DEFINE_int32(log_inject_thread_lifecycle_latency_ms, 0,
+ "Injection point for random latency during key thread lifecycle transition "
+ "points.");
DEFINE_double(fault_crash_before_append_commit, 0.0,
"Fraction of the time when the server will crash just before appending a "
"COMMIT message to the log. (For testing only!)");
TAG_FLAG(log_inject_latency, unsafe);
TAG_FLAG(log_inject_latency_ms_mean, unsafe);
TAG_FLAG(log_inject_latency_ms_stddev, unsafe);
+TAG_FLAG(log_inject_thread_lifecycle_latency_ms, unsafe);
TAG_FLAG(fault_crash_before_append_commit, unsafe);
DEFINE_double(log_inject_io_error_on_append_fraction, 0.0,
@@ -152,13 +162,33 @@ using std::vector;
using std::unique_ptr;
using strings::Substitute;
-// This class is responsible for managing the thread that appends to
-// the log file.
+// Manages the thread which drains groups of batches from the log's queue and
+// appends them to the underlying log instance.
+//
+// Rather than being a long-running thread, this instead uses a threadpool with
+// size 1 to automatically start and stop a thread on demand. When the log
+// is idle for some amount of time, no task will be on the thread pool, and thus
+// the underlying thread may exit.
+//
+// The design of submitting tasks to the threadpool is slightly tricky in order
+// to achieve group commit and not have to submit one task per appended batch.
+// Instead, a generic 'DoWork()' task is used which loops collecting work until
+// it finds that it has been idle for a while, at which point the task finishes.
+//
+// The trick, then, lies in two areas:
+//
+// 1) after appending a batch, we need to ensure that a task is already running,
+// and if not, start one. This is done in Wake().
+//
+// 2) when the task finds no more work to do and wants to go idle, it needs to
+// ensure that it doesn't miss a concurrent wake-up. This is done in GoIdle().
+//
+// See the implementation comments in Wake() and GoIdle() for details.
class Log::AppendThread {
public:
explicit AppendThread(Log* log);
- // Initializes the objects and starts the thread.
+ // Initializes the objects and starts the thread pool.
Status Init();
// Waits until the last enqueued elements are processed, sets the
@@ -167,16 +197,47 @@ class Log::AppendThread {
// method.
void Shutdown();
+ // Wake up the appender task, if it is not already running.
+ // This should be called after each time that a new entry is
+ // appended to the log's queue.
+ void Wake();
+
+ bool active() const {
+ return base::subtle::NoBarrier_Load(&worker_state_) == WORKER_ACTIVE;
+ }
+
private:
- void RunThread();
+ // The task submitted to the threadpool which collects batches from the queue
+ // and appends them, until it determines that the queue is idle.
+ void DoWork();
+
+ // Tries to transition back to WORKER_STOPPED state. If successful, returns true.
+ //
+ // Otherwise, returns false to indicate that the task should keep running because
+ // a new task was enqueued just as we were trying to go idle.
+ bool GoIdle();
+
+ // Handle the actual appending of a group of entries. Responsible for deleting the
+ // LogEntryBatch* pointers.
+ void HandleGroup(vector<LogEntryBatch*> entry_batches);
string LogPrefix() const;
Log* const log_;
- // Lock to protect access to thread_ during shutdown.
- mutable std::mutex lock_;
- scoped_refptr<Thread> thread_;
+ // Atomic state machine for whether there is any worker task currently
+ // queued or running on append_pool_. See Wake() and GoIdle() for more details.
+ enum WorkerState {
+ // No worker task is queued or running.
+ WORKER_STOPPED,
+ // A worker task is queued or running.
+ WORKER_ACTIVE
+ };
+ Atomic32 worker_state_ = WORKER_STOPPED;
+
+ // Pool with a single thread, which handles shutting down the thread
+ // when idle.
+ gscoped_ptr<ThreadPool> append_pool_;
};
@@ -185,88 +246,149 @@ Log::AppendThread::AppendThread(Log *log)
}
+// Builds the single-thread "wal-append" pool. No appender task is submitted
+// here; Wake() submits one on demand when an entry is enqueued.
Status Log::AppendThread::Init() {
- DCHECK(!thread_) << "Already initialized";
+ DCHECK(!append_pool_) << "Already initialized";
VLOG_WITH_PREFIX(1) << "Starting log append thread";
- RETURN_NOT_OK(kudu::Thread::Create("log", "appender",
- &AppendThread::RunThread, this, &thread_));
+ RETURN_NOT_OK(ThreadPoolBuilder("wal-append")
+ // Keep no thread alive while no appender task is queued or running.
+ .set_min_threads(0)
+ // Only need one thread since we'll only schedule one
+ // task at a time.
+ // (GoIdle() may briefly leave a second task queued behind an exiting
+ // one; with one thread it simply runs after the first finishes.)
+ .set_max_threads(1)
+ // No need for keeping idle threads, since the task itself
+ // handles waiting for work while idle.
+ .set_idle_timeout(MonoDelta::FromSeconds(0))
+ .Build(&append_pool_));
return Status::OK();
}
-void Log::AppendThread::RunThread() {
+// Makes sure an appender task will process entries that were just enqueued.
+void Log::AppendThread::Wake() {
+ DCHECK(append_pool_);
+ // Only the caller whose CAS moves worker_state_ from WORKER_STOPPED to
+ // WORKER_ACTIVE submits a DoWork() task. If the state was already
+ // WORKER_ACTIVE, an existing task is expected to pick up the entry; GoIdle()
+ // re-checks the queue after stopping so this no-op cannot strand it.
+ // NOTE(review): if this runs after Shutdown() has shut down append_pool_
+ // (e.g. an entry was Put() just before the queue was shut down), the
+ // SubmitClosure() presumably fails and CHECK_OK aborts the process; confirm
+ // that Log's callers exclude that interleaving.
+ auto old_status = base::subtle::NoBarrier_CompareAndSwap(
+ &worker_state_, WORKER_STOPPED, WORKER_ACTIVE);
+ if (old_status == WORKER_STOPPED) {
+ CHECK_OK(append_pool_->SubmitClosure(Bind(&Log::AppendThread::DoWork, Unretained(this))));
+ }
+}
+
+// Called by DoWork() after a drain timed out. Returns true if the calling task
+// should exit, or false if it has re-marked itself WORKER_ACTIVE and must keep
+// draining the queue.
+bool Log::AppendThread::GoIdle() {
+ // Inject latency at key points in this function for the purposes of tests.
+ MAYBE_INJECT_RANDOM_LATENCY(FLAGS_log_inject_thread_lifecycle_latency_ms);
+
+ // Stopping is a bit tricky. We have to consider the following race:
+ //
+ // T1 AppendThread
+ // ------------ -------------
+ // - state is ACTIVE
+ // - BlockingDrainTo returns TimedOut()
+ // - queue.Put()
+ // - Wake() no-op because
+ // it's already ACTIVE
+ //
+ // If the task simply exited here, T1's entry would never be appended.
+ //
+ // So, we first transition back to STOPPED state, and then re-check to see
+ // if there has been something enqueued in the meantime.
+ // NOTE(review): NoBarrier_* operations impose no ordering by themselves; not
+ // missing a wake-up relies on empty() below and the Put() that precedes
+ // Wake() synchronizing through the queue's internal lock. Confirm that
+ // BlockingQueue::empty() takes that lock.
+ auto old_state = base::subtle::NoBarrier_AtomicExchange(&worker_state_, WORKER_STOPPED);
+ DCHECK_EQ(old_state, WORKER_ACTIVE);
+ if (log_->entry_queue()->empty()) {
+ // Nothing got enqueued, which means there must not have been any missed wakeup.
+ // We are now in WORKER_STOPPED state.
+ return true;
+ }
+
+ MAYBE_INJECT_RANDOM_LATENCY(FLAGS_log_inject_thread_lifecycle_latency_ms);
+ // Someone enqueued something. We don't know whether their wakeup was successful
+ // or not, but we can just try to transition back to ACTIVE mode here.
+ if (base::subtle::NoBarrier_CompareAndSwap(&worker_state_, WORKER_STOPPED, WORKER_ACTIVE)
+ == WORKER_STOPPED) {
+ // Their wake-up was either lost or has not happened yet (in which case it
+ // will now be a no-op). Either way, we've re-marked ourselves as running.
+ MAYBE_INJECT_RANDOM_LATENCY(FLAGS_log_inject_thread_lifecycle_latency_ms);
+ return false;
+ }
+
+ // Their wake-up was successful, meaning that there is another task on the
+ // queue behind us now, so we can exit this one.
+ MAYBE_INJECT_RANDOM_LATENCY(FLAGS_log_inject_thread_lifecycle_latency_ms);
+ return true;
+}
+
+// Task body run on append_pool_. Repeatedly drains the log's entry queue and
+// hands each drained group to HandleGroup(). Returns when the queue reports
+// Aborted (shut down and empty), or when no work arrives within
+// --log_thread_idle_threshold_ms and GoIdle() allows the task to stop.
+void Log::AppendThread::DoWork() {
+ // Wake() sets WORKER_ACTIVE before submitting this task.
+ DCHECK_EQ(ANNOTATE_UNPROTECTED_READ(worker_state_), WORKER_ACTIVE);
+ VLOG_WITH_PREFIX(2) << "WAL Appender going active";
while (true) {
+ MonoTime deadline = MonoTime::Now() +
+ MonoDelta::FromMilliseconds(FLAGS_log_thread_idle_threshold_ms);
vector<LogEntryBatch*> entry_batches;
- ElementDeleter d(&entry_batches);
-
- if (PREDICT_FALSE(!log_->entry_queue()->BlockingDrainTo(&entry_batches))) {
- CHECK(entry_batches.empty());
+ Status s = log_->entry_queue()->BlockingDrainTo(&entry_batches, deadline);
+ if (PREDICT_FALSE(s.IsAborted())) {
+ // Queue shut down and fully drained. worker_state_ is left WORKER_ACTIVE
+ // on this path, so any later Wake() call will not submit another task.
break;
+ } else if (PREDICT_FALSE(s.IsTimedOut())) {
+ // No work before the deadline: stop, unless GoIdle() finds that a
+ // concurrent enqueue requires this task to keep running.
+ if (GoIdle()) break;
+ continue;
}
+ // HandleGroup() takes ownership of (and deletes) the drained batches.
+ HandleGroup(std::move(entry_batches));
+ }
+ VLOG_WITH_PREFIX(2) << "WAL Appender going idle";
+}
- if (log_->metrics_) {
- log_->metrics_->entry_batches_per_group->Increment(entry_batches.size());
- }
- TRACE_EVENT1("log", "batch", "batch_size", entry_batches.size());
+// Appends each batch in 'entry_batches' via DoAppend(), calls Sync() unless
+// every batch is a COMMIT, runs each remaining callback with the resulting
+// status, and deletes every batch (this function owns the pointers).
+void Log::AppendThread::HandleGroup(vector<LogEntryBatch*> entry_batches) {
+ if (log_->metrics_) {
+ log_->metrics_->entry_batches_per_group->Increment(entry_batches.size());
+ }
+ TRACE_EVENT1("log", "batch", "batch_size", entry_batches.size());
- SCOPED_LATENCY_METRIC(log_->metrics_, group_commit_latency);
+ SCOPED_LATENCY_METRIC(log_->metrics_, group_commit_latency);
- bool is_all_commits = true;
- for (LogEntryBatch* entry_batch : entry_batches) {
- TRACE_EVENT_FLOW_END0("log", "Batch", entry_batch);
- Status s = log_->DoAppend(entry_batch);
- if (PREDICT_FALSE(!s.ok())) {
- LOG_WITH_PREFIX(ERROR) << "Error appending to the log: " << s.ToString();
- // TODO(af): If a single transaction fails to append, should we
- // abort all subsequent transactions in this batch or allow
- // them to be appended? What about transactions in future
- // batches?
- if (!entry_batch->callback().is_null()) {
- entry_batch->callback().Run(s);
- entry_batch->callback_.Reset();
- }
- }
- if (is_all_commits && entry_batch->type_ != COMMIT) {
- is_all_commits = false;
+ // Sync() is skipped below when every batch in the group is a COMMIT.
+ bool is_all_commits = true;
+ for (LogEntryBatch* entry_batch : entry_batches) {
+ TRACE_EVENT_FLOW_END0("log", "Batch", entry_batch);
+ Status s = log_->DoAppend(entry_batch);
+ if (PREDICT_FALSE(!s.ok())) {
+ LOG_WITH_PREFIX(ERROR) << "Error appending to the log: " << s.ToString();
+ // TODO(af): If a single transaction fails to append, should we
+ // abort all subsequent transactions in this batch or allow
+ // them to be appended? What about transactions in future
+ // batches?
+ // Report the append error now, then reset the callback so neither
+ // loop below runs it a second time.
+ if (!entry_batch->callback().is_null()) {
+ entry_batch->callback().Run(s);
+ entry_batch->callback_.Reset();
}
}
-
- Status s;
- if (!is_all_commits) {
- s = log_->Sync();
+ if (is_all_commits && entry_batch->type_ != COMMIT) {
+ is_all_commits = false;
}
- if (PREDICT_FALSE(!s.ok())) {
- LOG_WITH_PREFIX(ERROR) << "Error syncing log: " << s.ToString();
- for (LogEntryBatch* entry_batch : entry_batches) {
- if (!entry_batch->callback().is_null()) {
- entry_batch->callback().Run(s);
- }
+ }
+
+ Status s;
+ if (!is_all_commits) {
+ s = log_->Sync();
+ }
+ // On Sync() failure, every batch whose callback is still set receives the
+ // sync error; batches that already failed DoAppend() were notified above.
+ if (PREDICT_FALSE(!s.ok())) {
+ LOG_WITH_PREFIX(ERROR) << "Error syncing log: " << s.ToString();
+ for (LogEntryBatch* entry_batch : entry_batches) {
+ if (!entry_batch->callback().is_null()) {
+ entry_batch->callback().Run(s);
}
- } else {
- TRACE_EVENT0("log", "Callbacks");
- VLOG_WITH_PREFIX(2) << "Synchronized " << entry_batches.size() << " entry batches";
- SCOPED_WATCH_STACK(100);
- for (LogEntryBatch* entry_batch : entry_batches) {
- if (PREDICT_TRUE(!entry_batch->callback().is_null())) {
- entry_batch->callback().Run(Status::OK());
- }
- // It's important to delete each batch as we see it, because
- // deleting it may free up memory from memory trackers, and the
- // callback of a later batch may want to use that memory.
- delete entry_batch;
+ delete entry_batch;
+ }
+ } else {
+ TRACE_EVENT0("log", "Callbacks");
+ VLOG_WITH_PREFIX(2) << "Synchronized " << entry_batches.size() << " entry batches";
+ SCOPED_WATCH_STACK(100);
+ for (LogEntryBatch* entry_batch : entry_batches) {
+ if (PREDICT_TRUE(!entry_batch->callback().is_null())) {
+ entry_batch->callback().Run(Status::OK());
}
- entry_batches.clear();
+ // It's important to delete each batch as we see it, because
+ // deleting it may free up memory from memory trackers, and the
+ // callback of a later batch may want to use that memory.
+ delete entry_batch;
}
}
- VLOG_WITH_PREFIX(1) << "Exiting AppendThread";
}
+// Stops accepting entries and waits for the appender task to finish draining.
void Log::AppendThread::Shutdown() {
log_->entry_queue()->Shutdown();
- std::lock_guard<std::mutex> lock_guard(lock_);
- if (thread_) {
- VLOG_WITH_PREFIX(1) << "Shutting down log append thread";
- CHECK_OK(ThreadJoiner(thread_.get()).Join());
- VLOG_WITH_PREFIX(1) << "Log append thread is shut down";
- thread_.reset();
+ // Once the shut-down queue is drained, BlockingDrainTo() returns Aborted,
+ // so a running DoWork() task exits. Wait() presumably blocks until any
+ // queued or running task has finished; only then is the pool shut down.
+ if (append_pool_) {
+ append_pool_->Wait();
+ append_pool_->Shutdown();
}
}
@@ -274,6 +396,12 @@ string Log::AppendThread::LogPrefix() const {
return log_->LogPrefix();
}
+// Test-only: returns true if an appender task is currently queued or running
+// on the append pool (AppendThread::active(), i.e. worker_state_ is
+// WORKER_ACTIVE). The state is read with NoBarrier_Load, so the value may be
+// stale by the time the caller sees it; poll (e.g. with ASSERT_EVENTUALLY).
+bool Log::append_thread_active_for_tests() const {
+ return append_thread_->active();
+}
+
+
// This task is submitted to allocation_pool_ in order to
// asynchronously pre-allocate new log segments.
void Log::SegmentAllocationTask() {
@@ -448,6 +576,7 @@ Status Log::AsyncAppend(unique_ptr<LogEntryBatch> entry_batch, const StatusCallb
TRACE_EVENT_FLOW_END0("log", "Batch", entry_batch.get());
return kLogShutdownStatus;
}
+ append_thread_->Wake();
entry_batch.release();
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/6311acb2/src/kudu/consensus/log.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log.h b/src/kudu/consensus/log.h
index 9048b38..4027c8a 100644
--- a/src/kudu/consensus/log.h
+++ b/src/kudu/consensus/log.h
@@ -202,6 +202,9 @@ class Log : public RefCountedThreadSafe<Log> {
return active_segment_->path();
}
+ // Returns true if a WAL appender task is currently queued or running.
+ // Intended for tests only; the result is an unsynchronized snapshot.
+ bool append_thread_active_for_tests() const;
+
// Forces the Log to allocate a new segment and roll over.
// This can be used to make sure all entries appended up to this point are
// available in closed, readable segments.
http://git-wip-us.apache.org/repos/asf/kudu/blob/6311acb2/src/kudu/consensus/mt-log-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/mt-log-test.cc b/src/kudu/consensus/mt-log-test.cc
index 09cd819..d925fb4 100644
--- a/src/kudu/consensus/mt-log-test.cc
+++ b/src/kudu/consensus/mt-log-test.cc
@@ -27,6 +27,7 @@
#include "kudu/consensus/log_index.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/fault_injection.h"
#include "kudu/util/locks.h"
#include "kudu/util/random.h"
#include "kudu/util/thread.h"
@@ -37,6 +38,9 @@ DEFINE_int32(num_batches_per_thread, 2000, "Number of batches per thread");
DEFINE_int32(num_ops_per_batch_avg, 5, "Target average number of ops per batch");
DEFINE_bool(verify_log, true, "Whether to verify the log by reading it after the writes complete");
+DECLARE_int32(log_thread_idle_threshold_ms);
+DECLARE_int32(log_inject_thread_lifecycle_latency_ms);
+
namespace kudu {
namespace log {
@@ -124,6 +128,7 @@ class MultiThreadedLogTest : public LogTestBase {
AssignIndexes(&batch_replicates);
ASSERT_OK(log_->AsyncAppendReplicates(batch_replicates, cb->AsStatusCallback()));
}
+ MAYBE_INJECT_RANDOM_LATENCY(FLAGS_log_inject_thread_lifecycle_latency_ms);
}
latch.Wait();
for (const Status& status : errors) {
@@ -167,6 +172,23 @@ class MultiThreadedLogTest : public LogTestBase {
t.join();
}
}
+
+ // Reads back every segment of kTestTablet's log and checks that it holds
+ // exactly (current_index_ - kStartIndex) entries, with ids in sorted order.
+ // Uses ASSERT_* macros, so callers wrap it in ASSERT_NO_FATAL_FAILURE.
+ void VerifyLog() {
+ shared_ptr<LogReader> reader;
+ ASSERT_OK(LogReader::Open(fs_manager_.get(), nullptr, kTestTablet, nullptr, &reader));
+ SegmentSequence segments;
+ ASSERT_OK(reader->GetSegmentsSnapshot(&segments));
+
+ for (const SegmentSequence::value_type& entry : segments) {
+ ASSERT_OK(entry->ReadEntries(&entries_));
+ }
+ vector<uint32_t> ids;
+ EntriesToIdList(&ids);
+ DVLOG(1) << "Wrote total of " << current_index_ - kStartIndex << " ops";
+ ASSERT_EQ(current_index_ - kStartIndex, ids.size());
+ ASSERT_TRUE(std::is_sorted(ids.begin(), ids.end()));
+ }
+
private:
ThreadSafeRandom random_;
simple_spinlock lock_;
@@ -181,7 +203,6 @@ TEST_F(MultiThreadedLogTest, TestAppends) {
}
ASSERT_OK(BuildLog());
- int start_current_id = current_index_;
LOG_TIMING(INFO, strings::Substitute("inserting $0 batches($1 threads, $2 per-thread)",
FLAGS_num_writer_threads * FLAGS_num_batches_per_thread,
FLAGS_num_writer_threads,
@@ -190,21 +211,23 @@ TEST_F(MultiThreadedLogTest, TestAppends) {
}
ASSERT_OK(log_->Close());
if (FLAGS_verify_log) {
- shared_ptr<LogReader> reader;
- ASSERT_OK(LogReader::Open(fs_manager_.get(), nullptr, kTestTablet, nullptr, &reader));
- SegmentSequence segments;
- ASSERT_OK(reader->GetSegmentsSnapshot(&segments));
-
- for (const SegmentSequence::value_type& entry : segments) {
- ASSERT_OK(entry->ReadEntries(&entries_));
- }
- vector<uint32_t> ids;
- EntriesToIdList(&ids);
- DVLOG(1) << "Wrote total of " << current_index_ - start_current_id << " ops";
- ASSERT_EQ(current_index_ - start_current_id, ids.size());
- ASSERT_TRUE(std::is_sorted(ids.begin(), ids.end()));
+ ASSERT_NO_FATAL_FAILURE(VerifyLog());
}
}
+// The lifecycle of the appender task starting and stopping is a bit complicated
+// (see Log::AppendThread::GoIdle for details). This injects some latency in key
+// points of that lifecycle to make it likely that the different potential
+// interleavings are triggered (the test is probabilistic, not exhaustive). It
+// also injects latency into the writes done by the tests, so
+// that sometimes writes are spaced out enough to allow the thread to go idle.
+TEST_F(MultiThreadedLogTest, TestAppendThreadStartStopRaces) {
+ // A 1ms idle threshold makes the appender attempt to go idle very often,
+ // and the injected latency widens the windows between GoIdle()'s steps.
+ FLAGS_log_thread_idle_threshold_ms = 1;
+ FLAGS_log_inject_thread_lifecycle_latency_ms = 2;
+ ASSERT_OK(BuildLog());
+ // Runs the writer loop directly on the test thread.
+ LogWriterThread(1);
+ ASSERT_OK(log_->Close());
+ ASSERT_NO_FATAL_FAILURE(VerifyLog());
+}
+
} // namespace log
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/6311acb2/src/kudu/util/blocking_queue-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/blocking_queue-test.cc b/src/kudu/util/blocking_queue-test.cc
index 917ada5..00f558a 100644
--- a/src/kudu/util/blocking_queue-test.cc
+++ b/src/kudu/util/blocking_queue-test.cc
@@ -25,8 +25,9 @@
#include "kudu/util/countdown_latch.h"
#include "kudu/util/blocking_queue.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
-using std::shared_ptr;
using std::string;
using std::thread;
using std::vector;
@@ -59,14 +60,23 @@ TEST(BlockingQueueTest, TestBlockingDrainTo) {
ASSERT_EQ(test_queue.Put(2), QUEUE_SUCCESS);
ASSERT_EQ(test_queue.Put(3), QUEUE_SUCCESS);
vector<int32_t> out;
- ASSERT_TRUE(test_queue.BlockingDrainTo(&out));
+ ASSERT_OK(test_queue.BlockingDrainTo(&out, MonoTime::Now() + MonoDelta::FromSeconds(30)));
ASSERT_EQ(1, out[0]);
ASSERT_EQ(2, out[1]);
ASSERT_EQ(3, out[2]);
+
+ // Set a deadline in the past and ensure we time out.
+ Status s = test_queue.BlockingDrainTo(&out, MonoTime::Now() - MonoDelta::FromSeconds(1));
+ ASSERT_TRUE(s.IsTimedOut());
+
+ // Ensure that if the queue is shut down, we get Aborted status.
+ test_queue.Shutdown();
+ s = test_queue.BlockingDrainTo(&out, MonoTime::Now() - MonoDelta::FromSeconds(1));
+ ASSERT_TRUE(s.IsAborted());
}
// Test that, when the queue is shut down with elements still pending,
-// Drain still returns true until the elements are all gone.
+// Drain still returns OK until the elements are all gone.
TEST(BlockingQueueTest, TestGetAndDrainAfterShutdown) {
// Put some elements into the queue and then shut it down.
BlockingQueue<int32_t> q(3);
@@ -80,13 +90,14 @@ TEST(BlockingQueueTest, TestGetAndDrainAfterShutdown) {
ASSERT_TRUE(q.BlockingGet(&i));
ASSERT_EQ(1, i);
- // Drain should still return true, since it yielded elements.
+ // Drain should still return OK, since it yielded elements.
vector<int32_t> out;
- ASSERT_TRUE(q.BlockingDrainTo(&out));
+ ASSERT_OK(q.BlockingDrainTo(&out));
ASSERT_EQ(2, out[0]);
- // Now that it's empty, it should return false.
- ASSERT_FALSE(q.BlockingDrainTo(&out));
+ // Now that it's empty, it should return Aborted.
+ Status s = q.BlockingDrainTo(&out);
+ ASSERT_TRUE(s.IsAborted()) << s.ToString();
ASSERT_FALSE(q.BlockingGet(&i));
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/6311acb2/src/kudu/util/blocking_queue.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/blocking_queue.h b/src/kudu/util/blocking_queue.h
index 86a4c48..2907030 100644
--- a/src/kudu/util/blocking_queue.h
+++ b/src/kudu/util/blocking_queue.h
@@ -27,6 +27,7 @@
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/util/condition_variable.h"
#include "kudu/util/mutex.h"
+#include "kudu/util/status.h"
namespace kudu {
@@ -101,12 +102,18 @@ class BlockingQueue {
// Get all elements from the queue and append them to a vector.
//
+ // If 'deadline' passes and no elements have been returned from the
+ // queue, returns Status::TimedOut(). If 'deadline' is uninitialized,
+ // no deadline is used.
+ //
// If the queue has been shut down, but there are still elements waiting,
// then it returns those elements as if the queue were not yet shut down.
//
- // Returns false if the queue has been shut down and has no more remaining
- // elements.
- bool BlockingDrainTo(std::vector<T>* out) {
+ // Returns:
+ // - OK if successful
+ // - TimedOut if the deadline passed
+ // - Aborted if the queue shut down
+ Status BlockingDrainTo(std::vector<T>* out, MonoTime deadline = MonoTime()) {
MutexLock l(lock_);
while (true) {
if (!list_.empty()) {
@@ -117,12 +124,16 @@ class BlockingQueue {
}
list_.clear();
not_full_.Signal();
- return true;
+ return Status::OK();
}
- if (shutdown_) {
- return false;
+ if (PREDICT_FALSE(shutdown_)) {
+ return Status::Aborted("");
+ }
+ if (!deadline.Initialized()) {
+ not_empty_.Wait();
+ } else if (PREDICT_FALSE(!not_empty_.TimedWait(deadline - MonoTime::Now()))) {
+ return Status::TimedOut("");
}
- not_empty_.Wait();
}
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/6311acb2/src/kudu/util/threadpool.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/threadpool.h b/src/kudu/util/threadpool.h
index 7dc0187..e985002 100644
--- a/src/kudu/util/threadpool.h
+++ b/src/kudu/util/threadpool.h
@@ -86,7 +86,7 @@ class Runnable {
// Status::ServiceUnavailable message from Submit().
// Default: INT_MAX.
//
-// timeout: How long we'll keep around an idle thread before timing it out.
+// idle_timeout: How long we'll keep around an idle thread before timing it out.
// We always keep at least min_threads.
// Default: 500 milliseconds.
//
@@ -137,7 +137,7 @@ class ThreadPoolBuilder {
// .set_min_threads(0)
// .set_max_threads(5)
// .set_max_queue_size(10)
-// .set_timeout(MonoDelta::FromMilliseconds(2000))
+// .set_idle_timeout(MonoDelta::FromMilliseconds(2000))
// .Build(&thread_pool));
// thread_pool->Submit(shared_ptr<Runnable>(new Task()));
// thread_pool->Submit(boost::bind(&Func, 10));