You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2019/10/18 21:32:16 UTC

[kudu] 01/02: log: rename a couple of things for clarity

This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 524ed47e3f56fdb90f384fd8454efe179d5aa920
Author: Andrew Wong <aw...@apache.org>
AuthorDate: Tue Oct 15 22:13:24 2019 -0700

    log: rename a couple of things for clarity
    
    This patch:
    - renames Log::DoAppend() to Log::WriteBatch(),
    - renames AppendThread::HandleGroup() to AppendThread::HandleBatches(),
    - renames AppendThread::WorkerState to AppendThread::ThreadState, to
      homogenize the terminology used in that class,
    - renames AppendThread::{WORKER_STOPPED,WORKER_ACTIVE} to IDLE and
      ACTIVE respectively,
    - renames AppendThread::DoWork() to AppendThread::ProcessQueue().
    
    I think these names make it slightly more obvious what the functions are
    doing.
    
    There are no functional changes here.
    
    Change-Id: I9fcaf2656bc31e5015c285392029fa98f6137cef
    Reviewed-on: http://gerrit.cloudera.org:8080/14477
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/consensus/log.cc | 72 ++++++++++++++++++++++++-----------------------
 src/kudu/consensus/log.h  |  2 +-
 2 files changed, 38 insertions(+), 36 deletions(-)

diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index 7a95afa..df3ac4b 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -193,16 +193,18 @@ using strings::Substitute;
 //
 // The design of submitting tasks to the threadpool is slightly tricky in order
 // to achieve group commit and not have to submit one task per appended batch.
-// Instead, a generic 'DoWork()' task is used which loops collecting work until
-// it finds that it has been idle for a while, at which point the task finishes.
+// Instead, a generic 'ProcessQueue()' task is used which loops collecting
+// batches to write until it finds that the queue has been empty for a while,
+// at which point the task finishes.
 //
 // The trick, then, lies in two areas:
 //
-// 1) after appending a batch, we need to ensure that a task is already running,
-//    and if not, start one. This is done in Wake().
+// 1) After adding a batch to the queue, we need to ensure that a task is
+//    already running, and if not, start one. This is done in Wake().
 //
-// 2) when the task finds no more work to do and wants to go idle, it needs to
-//    ensure that it doesn't miss a concurrent wake-up. This is done in GoIdle().
+// 2) When the task finds no more batches to write and wants to go idle, it
+//    needs to ensure that it doesn't miss a concurrent addition to the queue.
+//    This is done in GoIdle().
 //
 // See the implementation comments in Wake() and GoIdle() for details.
 class Log::AppendThread {
@@ -224,15 +226,15 @@ class Log::AppendThread {
   void Wake();
 
   bool active() const {
-    return base::subtle::NoBarrier_Load(&worker_state_) == WORKER_ACTIVE;
+    return base::subtle::NoBarrier_Load(&thread_state_) == ACTIVE;
   }
 
  private:
   // The task submitted to the threadpool which collects batches from the queue
   // and appends them, until it determines that the queue is idle.
-  void DoWork();
+  void ProcessQueue();
 
-  // Tries to transition back to WORKER_STOPPED state. If successful, returns true.
+  // Tries to transition back to IDLE state. If successful, returns true.
   //
   // Otherwise, returns false to indicate that the task should keep running because
   // a new task was enqueued just as we were trying to go idle.
@@ -240,21 +242,21 @@ class Log::AppendThread {
 
   // Handle the actual appending of a group of entries. Responsible for deleting the
   // LogEntryBatch* pointers.
-  void HandleGroup(vector<LogEntryBatch*> entry_batches);
+  void HandleBatches(vector<LogEntryBatch*> entry_batches);
 
   string LogPrefix() const;
 
   Log* const log_;
 
-  // Atomic state machine for whether there is any worker task currently
-  // queued or running on append_pool_. See Wake() and GoIdle() for more details.
-  enum WorkerState {
-    // No worker task is queued or running.
-    WORKER_STOPPED,
-    // A worker task is queued or running.
-    WORKER_ACTIVE
+  // Atomic state machine for whether there is any task currently queued or
+  // running on append_pool_. See Wake() and GoIdle() for more details.
+  enum ThreadState {
+    // No task is queued or running.
+    IDLE,
+    // A task is queued or running.
+    ACTIVE
   };
-  Atomic32 worker_state_ = WORKER_STOPPED;
+  Atomic32 thread_state_ = IDLE;
 
   // Pool with a single thread, which handles shutting down the thread
   // when idle.
@@ -284,9 +286,9 @@ Status Log::AppendThread::Init() {
 void Log::AppendThread::Wake() {
   DCHECK(append_pool_);
   auto old_status = base::subtle::NoBarrier_CompareAndSwap(
-      &worker_state_, WORKER_STOPPED, WORKER_ACTIVE);
-  if (old_status == WORKER_STOPPED) {
-    CHECK_OK(append_pool_->SubmitClosure(Bind(&Log::AppendThread::DoWork, Unretained(this))));
+      &thread_state_, IDLE, ACTIVE);
+  if (old_status == IDLE) {
+    CHECK_OK(append_pool_->SubmitClosure(Bind(&Log::AppendThread::ProcessQueue, Unretained(this))));
   }
 }
 
@@ -306,19 +308,19 @@ bool Log::AppendThread::GoIdle() {
 
   // So, we first transition back to STOPPED state, and then re-check to see
   // if there has been something enqueued in the meantime.
-  auto old_state = base::subtle::NoBarrier_AtomicExchange(&worker_state_, WORKER_STOPPED);
-  DCHECK_EQ(old_state, WORKER_ACTIVE);
+  auto old_state = base::subtle::NoBarrier_AtomicExchange(&thread_state_, IDLE);
+  DCHECK_EQ(old_state, ACTIVE);
   if (log_->entry_queue()->empty()) {
     // Nothing got enqueued, which means there must not have been any missed wakeup.
-    // We are now in WORKER_STOPPED state.
+    // We are now in IDLE state.
     return true;
   }
 
   MAYBE_INJECT_RANDOM_LATENCY(FLAGS_log_inject_thread_lifecycle_latency_ms);
   // Someone enqueued something. We don't know whether their wakeup was successful
   // or not, but we can just try to transition back to ACTIVE mode here.
-  if (base::subtle::NoBarrier_CompareAndSwap(&worker_state_, WORKER_STOPPED, WORKER_ACTIVE)
-      == WORKER_STOPPED) {
+  if (base::subtle::NoBarrier_CompareAndSwap(&thread_state_, IDLE, ACTIVE)
+      == IDLE) {
     // Their wake-up was lost, but we've now marked ourselves as running.
     MAYBE_INJECT_RANDOM_LATENCY(FLAGS_log_inject_thread_lifecycle_latency_ms);
     return false;
@@ -330,8 +332,8 @@ bool Log::AppendThread::GoIdle() {
   return true;
 }
 
-void Log::AppendThread::DoWork() {
-  DCHECK_EQ(ANNOTATE_UNPROTECTED_READ(worker_state_), WORKER_ACTIVE);
+void Log::AppendThread::ProcessQueue() {
+  DCHECK_EQ(ANNOTATE_UNPROTECTED_READ(thread_state_), ACTIVE);
   VLOG_WITH_PREFIX(2) << "WAL Appender going active";
   while (true) {
     MonoTime deadline = MonoTime::Now() +
@@ -344,12 +346,12 @@ void Log::AppendThread::DoWork() {
       if (GoIdle()) break;
       continue;
     }
-    HandleGroup(std::move(entry_batches));
+    HandleBatches(std::move(entry_batches));
   }
   VLOG_WITH_PREFIX(2) << "WAL Appender going idle";
 }
 
-void Log::AppendThread::HandleGroup(vector<LogEntryBatch*> entry_batches) {
+void Log::AppendThread::HandleBatches(vector<LogEntryBatch*> entry_batches) {
   if (log_->metrics_) {
     log_->metrics_->entry_batches_per_group->Increment(entry_batches.size());
   }
@@ -360,7 +362,7 @@ void Log::AppendThread::HandleGroup(vector<LogEntryBatch*> entry_batches) {
   bool is_all_commits = true;
   for (LogEntryBatch* entry_batch : entry_batches) {
     TRACE_EVENT_FLOW_END0("log", "Batch", entry_batch);
-    Status s = log_->DoAppend(entry_batch);
+    Status s = log_->WriteBatch(entry_batch);
     if (PREDICT_FALSE(!s.ok())) {
       LOG_WITH_PREFIX(ERROR) << "Error appending to the log: " << s.ToString();
       // TODO(af): If a single transaction fails to append, should we
@@ -630,12 +632,12 @@ Status Log::AsyncAppendCommit(gscoped_ptr<consensus::CommitMsg> commit_msg,
   return Status::OK();
 }
 
-Status Log::DoAppend(LogEntryBatch* entry_batch) {
+Status Log::WriteBatch(LogEntryBatch* entry_batch) {
   size_t num_entries = entry_batch->count();
-  DCHECK_GT(num_entries, 0) << "Cannot call DoAppend() with zero entries reserved";
+  DCHECK_GT(num_entries, 0) << "Cannot call WriteBatch() with zero entries reserved";
 
   MAYBE_RETURN_FAILURE(FLAGS_log_inject_io_error_on_append_fraction,
-                       Status::IOError("Injected IOError in Log::DoAppend()"));
+                       Status::IOError("Injected IOError in Log::WriteBatch()"));
 
   Slice entry_batch_data = entry_batch->data();
   uint32_t entry_batch_bytes = entry_batch->total_size_bytes();
@@ -804,7 +806,7 @@ Status Log::Append(LogEntryPB* entry) {
   entry_batch_pb->mutable_entry()->AddAllocated(entry);
   LogEntryBatch entry_batch(entry->type(), std::move(entry_batch_pb), 1);
   entry_batch.Serialize();
-  Status s = DoAppend(&entry_batch);
+  Status s = WriteBatch(&entry_batch);
   if (s.ok()) {
     s = Sync();
   }
diff --git a/src/kudu/consensus/log.h b/src/kudu/consensus/log.h
index 34ffe15..1df3a20 100644
--- a/src/kudu/consensus/log.h
+++ b/src/kudu/consensus/log.h
@@ -300,7 +300,7 @@ class Log : public RefCountedThreadSafe<Log> {
 
   // Writes serialized contents of 'entry' to the log. Called inside
   // AppenderThread.
-  Status DoAppend(LogEntryBatch* entry_batch);
+  Status WriteBatch(LogEntryBatch* entry_batch);
 
   // Update footer_builder_ to reflect the log indexes seen in 'batch'.
   void UpdateFooterForBatch(LogEntryBatch* batch);