You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2019/10/18 21:32:16 UTC
[kudu] 01/02: log: rename a couple of things for clarity
This is an automated email from the ASF dual-hosted git repository.
awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 524ed47e3f56fdb90f384fd8454efe179d5aa920
Author: Andrew Wong <aw...@apache.org>
AuthorDate: Tue Oct 15 22:13:24 2019 -0700
log: rename a couple of things for clarity
This patch:
- renames Log::DoAppend() to Log::WriteBatch(),
- renames AppendThread::HandleGroup() to AppendThread::HandleBatches(),
- renames AppendThread::WorkerState to AppendThread::ThreadState, to
homogenize the terminology used in that class,
- renames AppendThread::{WORKER_STOPPED,WORKER_ACTIVE} to IDLE and
ACTIVE respectively
I think these names make it slightly more obvious what the functions are
doing.
There are no functional changes here.
Change-Id: I9fcaf2656bc31e5015c285392029fa98f6137cef
Reviewed-on: http://gerrit.cloudera.org:8080/14477
Tested-by: Kudu Jenkins
Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
src/kudu/consensus/log.cc | 72 ++++++++++++++++++++++++-----------------------
src/kudu/consensus/log.h | 2 +-
2 files changed, 38 insertions(+), 36 deletions(-)
diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index 7a95afa..df3ac4b 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -193,16 +193,18 @@ using strings::Substitute;
//
// The design of submitting tasks to the threadpool is slightly tricky in order
// to achieve group commit and not have to submit one task per appended batch.
-// Instead, a generic 'DoWork()' task is used which loops collecting work until
-// it finds that it has been idle for a while, at which point the task finishes.
+// Instead, a generic 'ProcessQueue()' task is used which loops collecting
+// batches to write until it finds that the queue has been empty for a while,
+// at which point the task finishes.
//
// The trick, then, lies in two areas:
//
-// 1) after appending a batch, we need to ensure that a task is already running,
-// and if not, start one. This is done in Wake().
+// 1) After adding a batch to the queue, we need to ensure that a task is
+// already running, and if not, start one. This is done in Wake().
//
-// 2) when the task finds no more work to do and wants to go idle, it needs to
-// ensure that it doesn't miss a concurrent wake-up. This is done in GoIdle().
+// 2) When the task finds no more batches to write and wants to go idle, it
+// needs to ensure that it doesn't miss a concurrent additions to the queue.
+// This is done in GoIdle().
//
// See the implementation comments in Wake() and GoIdle() for details.
class Log::AppendThread {
@@ -224,15 +226,15 @@ class Log::AppendThread {
void Wake();
bool active() const {
- return base::subtle::NoBarrier_Load(&worker_state_) == WORKER_ACTIVE;
+ return base::subtle::NoBarrier_Load(&thread_state_) == ACTIVE;
}
private:
// The task submitted to the threadpool which collects batches from the queue
// and appends them, until it determines that the queue is idle.
- void DoWork();
+ void ProcessQueue();
- // Tries to transition back to WORKER_STOPPED state. If successful, returns true.
+ // Tries to transition back to IDLE state. If successful, returns true.
//
// Otherwise, returns false to indicate that the task should keep running because
// a new task was enqueued just as we were trying to go idle.
@@ -240,21 +242,21 @@ class Log::AppendThread {
// Handle the actual appending of a group of entries. Responsible for deleting the
// LogEntryBatch* pointers.
- void HandleGroup(vector<LogEntryBatch*> entry_batches);
+ void HandleBatches(vector<LogEntryBatch*> entry_batches);
string LogPrefix() const;
Log* const log_;
- // Atomic state machine for whether there is any worker task currently
- // queued or running on append_pool_. See Wake() and GoIdle() for more details.
- enum WorkerState {
- // No worker task is queued or running.
- WORKER_STOPPED,
- // A worker task is queued or running.
- WORKER_ACTIVE
+ // Atomic state machine for whether there is any task currently queued or
+ // running on append_pool_. See Wake() and GoIdle() for more details.
+ enum ThreadState {
+ // No task is queued or running.
+ IDLE,
+ // A task is queued or running.
+ ACTIVE
};
- Atomic32 worker_state_ = WORKER_STOPPED;
+ Atomic32 thread_state_ = IDLE;
// Pool with a single thread, which handles shutting down the thread
// when idle.
@@ -284,9 +286,9 @@ Status Log::AppendThread::Init() {
void Log::AppendThread::Wake() {
DCHECK(append_pool_);
auto old_status = base::subtle::NoBarrier_CompareAndSwap(
- &worker_state_, WORKER_STOPPED, WORKER_ACTIVE);
- if (old_status == WORKER_STOPPED) {
- CHECK_OK(append_pool_->SubmitClosure(Bind(&Log::AppendThread::DoWork, Unretained(this))));
+ &thread_state_, IDLE, ACTIVE);
+ if (old_status == IDLE) {
+ CHECK_OK(append_pool_->SubmitClosure(Bind(&Log::AppendThread::ProcessQueue, Unretained(this))));
}
}
@@ -306,19 +308,19 @@ bool Log::AppendThread::GoIdle() {
// So, we first transition back to STOPPED state, and then re-check to see
// if there has been something enqueued in the meantime.
- auto old_state = base::subtle::NoBarrier_AtomicExchange(&worker_state_, WORKER_STOPPED);
- DCHECK_EQ(old_state, WORKER_ACTIVE);
+ auto old_state = base::subtle::NoBarrier_AtomicExchange(&thread_state_, IDLE);
+ DCHECK_EQ(old_state, ACTIVE);
if (log_->entry_queue()->empty()) {
// Nothing got enqueued, which means there must not have been any missed wakeup.
- // We are now in WORKER_STOPPED state.
+ // We are now in IDLE state.
return true;
}
MAYBE_INJECT_RANDOM_LATENCY(FLAGS_log_inject_thread_lifecycle_latency_ms);
// Someone enqueued something. We don't know whether their wakeup was successful
// or not, but we can just try to transition back to ACTIVE mode here.
- if (base::subtle::NoBarrier_CompareAndSwap(&worker_state_, WORKER_STOPPED, WORKER_ACTIVE)
- == WORKER_STOPPED) {
+ if (base::subtle::NoBarrier_CompareAndSwap(&thread_state_, IDLE, ACTIVE)
+ == IDLE) {
// Their wake-up was lost, but we've now marked ourselves as running.
MAYBE_INJECT_RANDOM_LATENCY(FLAGS_log_inject_thread_lifecycle_latency_ms);
return false;
@@ -330,8 +332,8 @@ bool Log::AppendThread::GoIdle() {
return true;
}
-void Log::AppendThread::DoWork() {
- DCHECK_EQ(ANNOTATE_UNPROTECTED_READ(worker_state_), WORKER_ACTIVE);
+void Log::AppendThread::ProcessQueue() {
+ DCHECK_EQ(ANNOTATE_UNPROTECTED_READ(thread_state_), ACTIVE);
VLOG_WITH_PREFIX(2) << "WAL Appender going active";
while (true) {
MonoTime deadline = MonoTime::Now() +
@@ -344,12 +346,12 @@ void Log::AppendThread::DoWork() {
if (GoIdle()) break;
continue;
}
- HandleGroup(std::move(entry_batches));
+ HandleBatches(std::move(entry_batches));
}
VLOG_WITH_PREFIX(2) << "WAL Appender going idle";
}
-void Log::AppendThread::HandleGroup(vector<LogEntryBatch*> entry_batches) {
+void Log::AppendThread::HandleBatches(vector<LogEntryBatch*> entry_batches) {
if (log_->metrics_) {
log_->metrics_->entry_batches_per_group->Increment(entry_batches.size());
}
@@ -360,7 +362,7 @@ void Log::AppendThread::HandleGroup(vector<LogEntryBatch*> entry_batches) {
bool is_all_commits = true;
for (LogEntryBatch* entry_batch : entry_batches) {
TRACE_EVENT_FLOW_END0("log", "Batch", entry_batch);
- Status s = log_->DoAppend(entry_batch);
+ Status s = log_->WriteBatch(entry_batch);
if (PREDICT_FALSE(!s.ok())) {
LOG_WITH_PREFIX(ERROR) << "Error appending to the log: " << s.ToString();
// TODO(af): If a single transaction fails to append, should we
@@ -630,12 +632,12 @@ Status Log::AsyncAppendCommit(gscoped_ptr<consensus::CommitMsg> commit_msg,
return Status::OK();
}
-Status Log::DoAppend(LogEntryBatch* entry_batch) {
+Status Log::WriteBatch(LogEntryBatch* entry_batch) {
size_t num_entries = entry_batch->count();
- DCHECK_GT(num_entries, 0) << "Cannot call DoAppend() with zero entries reserved";
+ DCHECK_GT(num_entries, 0) << "Cannot call WriteBatch() with zero entries reserved";
MAYBE_RETURN_FAILURE(FLAGS_log_inject_io_error_on_append_fraction,
- Status::IOError("Injected IOError in Log::DoAppend()"));
+ Status::IOError("Injected IOError in Log::WriteBatch()"));
Slice entry_batch_data = entry_batch->data();
uint32_t entry_batch_bytes = entry_batch->total_size_bytes();
@@ -804,7 +806,7 @@ Status Log::Append(LogEntryPB* entry) {
entry_batch_pb->mutable_entry()->AddAllocated(entry);
LogEntryBatch entry_batch(entry->type(), std::move(entry_batch_pb), 1);
entry_batch.Serialize();
- Status s = DoAppend(&entry_batch);
+ Status s = WriteBatch(&entry_batch);
if (s.ok()) {
s = Sync();
}
diff --git a/src/kudu/consensus/log.h b/src/kudu/consensus/log.h
index 34ffe15..1df3a20 100644
--- a/src/kudu/consensus/log.h
+++ b/src/kudu/consensus/log.h
@@ -300,7 +300,7 @@ class Log : public RefCountedThreadSafe<Log> {
// Writes serialized contents of 'entry' to the log. Called inside
// AppenderThread.
- Status DoAppend(LogEntryBatch* entry_batch);
+ Status WriteBatch(LogEntryBatch* entry_batch);
// Update footer_builder_ to reflect the log indexes seen in 'batch'.
void UpdateFooterForBatch(LogEntryBatch* batch);