You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2020/06/22 19:03:17 UTC

[kudu] branch master updated: [consensus] KUDU-2727 lock-free CheckLeadershipAndBindTerm()

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new da1c66b  [consensus] KUDU-2727 lock-free CheckLeadershipAndBindTerm()
da1c66b is described below

commit da1c66b61c4924a4726aaeb171e79cc0b03ca65c
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Fri Jun 5 17:01:53 2020 -0700

    [consensus] KUDU-2727 lock-free CheckLeadershipAndBindTerm()
    
    This addresses the following lock contention issue outlined in KUDU-2727:
    
      tids=[93293,93284,93285,93286,93287,93288,93289,93290,93291,93292,93304,93294,93295,93296,93297,93298,93299,93300,93301,93302,93303,93313,93322,93321,93320,93319,93318,93317,93316,93315,93314,93283,93312,93311,93310,93309,93308,93307,93306,93305]
          0x7f61b79fc5e0 <unknown>
               0x1ec35f4 base::internal::SpinLockDelay()
               0x1ec347c base::SpinLock::SlowLock()
                0xb7deb8 kudu::consensus::RaftConsensus::CheckLeadershipAndBindTerm()
                0xaab010 kudu::tablet::TransactionDriver::ExecuteAsync()
                0xaa344c kudu::tablet::TabletReplica::SubmitWrite()
                0x928fb0 kudu::tserver::TabletServiceImpl::Write()
               0x1d2e8d9 kudu::rpc::GeneratedServiceIf::Handle()
               0x1d2efd9 kudu::rpc::ServicePool::RunThread()
               0x1ea4a84 kudu::Thread::SuperviseThread()
          0x7f61b79f4e25 start_thread
          0x7f61b5cd234d __clone
      tids=[93383]
          0x7f61b79fc5e0 <unknown>
          0x7f61b79f8cf2 __pthread_cond_timedwait
               0x1dfcfa9 kudu::ConditionVariable::WaitUntil()
                0xb73bc7 kudu::consensus::RaftConsensus::UpdateReplica()
                0xb75128 kudu::consensus::RaftConsensus::Update()
                0x92c5d1 kudu::tserver::ConsensusServiceImpl::UpdateConsensus()
               0x1d2e8d9 kudu::rpc::GeneratedServiceIf::Handle()
               0x1d2efd9 kudu::rpc::ServicePool::RunThread()
               0x1ea4a84 kudu::Thread::SuperviseThread()
          0x7f61b79f4e25 start_thread
          0x7f61b5cd234d __clone
    
    In addition, with this patch RaftConsensus::DumpStatusHtml() no longer
    blocks Raft consensus activity and isn't blocked by it either.
    
    The inspiration for this update came from Yugabyte-DB:
      https://github.com/yugabyte/yugabyte-db/commit/7b3271e4a0f34ff3c3f6e8961288db9cee335461
    
    Change-Id: I934ae3035d893fd850afe27d96f8dd6612c9ffbd
    Reviewed-on: http://gerrit.cloudera.org:8080/16034
    Tested-by: Alexey Serbin <as...@cloudera.com>
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/consensus/consensus_meta.cc               |  91 +++++++++++++++--
 src/kudu/consensus/consensus_meta.h                |  23 ++++-
 src/kudu/consensus/raft_consensus.cc               | 112 +++++++++++++++------
 src/kudu/consensus/raft_consensus.h                |  11 +-
 src/kudu/integration-tests/raft_consensus-itest.cc |   3 +-
 src/kudu/tserver/tablet_service.cc                 |   7 +-
 6 files changed, 202 insertions(+), 45 deletions(-)

diff --git a/src/kudu/consensus/consensus_meta.cc b/src/kudu/consensus/consensus_meta.cc
index 98b4d34..18a8f47 100644
--- a/src/kudu/consensus/consensus_meta.cc
+++ b/src/kudu/consensus/consensus_meta.cc
@@ -16,7 +16,8 @@
 // under the License.
 #include "kudu/consensus/consensus_meta.h"
 
-#include <mutex>
+#include <cstddef>
+#include <limits>
 #include <ostream>
 #include <utility>
 
@@ -49,12 +50,75 @@ DEFINE_bool(cmeta_force_fsync, false,
             "Whether fsync() should be called when consensus metadata files are updated");
 TAG_FLAG(cmeta_force_fsync, advanced);
 
+using std::string;
+using strings::Substitute;
+
 namespace kudu {
 namespace consensus {
 
-using std::lock_guard;
-using std::string;
-using strings::Substitute;
+namespace {
+
+constexpr size_t kPackedRoleBits = 3;
+constexpr size_t kPackedTermBits = 8 * sizeof(uint64_t) - kPackedRoleBits;
+constexpr uint64_t kUnknownRolePacked = (1ULL << kPackedRoleBits) - 1;
+constexpr uint64_t kRoleMask = kUnknownRolePacked << kPackedTermBits;
+constexpr uint64_t kTermMask = ~kRoleMask;
+
+static_assert((kRoleMask | kTermMask) == std::numeric_limits<uint64_t>::max(),
+              "term and role should fit into uint64_t");
+static_assert((kTermMask & kRoleMask) == 0,
+              "term and role masks must not intersect");
+//
+// Packing role and term into uint64_t:
+//
+//  * * * * * ... * * * *
+//  ^     ^             ^
+// 63    60             0
+//
+// Bits 0..60 inclusive contain term.  Bits 61..63 contain role.
+
+uint64_t PackRoleAndTerm(RaftPeerPB::Role role, int64_t term) {
+  // Ensure the term is not wider than kPackedTermBits: maximum possible is
+  // 2305843009213693951. Here it might be something like
+  //
+  //   CHECK_EQ(0, kRoleMask & term) << "term is too big: " << term;
+  //
+  // However, sometimes the data read from disk is corrupted, and we don't want
+  // to crash just because of that. The corruption is detected and handled
+  // gracefully at a higher level (e.g., the server marks corresponding replica
+  // as failed). With current approach, the maximum acceptable term is
+  // 2305843009213693950; 2305843009213693951 (kTermMask) is a special value.
+  if (PREDICT_FALSE((term & kRoleMask) != 0)) {
+    // A special value to signal that a 'non-packable' term was supplied.
+    term = kTermMask;
+  }
+
+  // The allocated bit space for role is just 3 bits, but it's necessary
+  // to handle the constant 999 defined in the proto file for UNKNOWN_ROLE.
+  // Changing the constant behind UNKNOWN_ROLE is not an option
+  // due to compatibility issues.
+  uint64_t r = (role == RaftPeerPB::UNKNOWN_ROLE) ? kUnknownRolePacked
+                                                  : static_cast<uint64_t>(role);
+  return (r << kPackedTermBits) | term;
+}
+
+RaftPeerPB::Role UnpackRole(uint64_t role_and_term_packed) {
+  const auto role = role_and_term_packed >> kPackedTermBits;
+  if (PREDICT_FALSE(role == kUnknownRolePacked)) {
+    return RaftPeerPB::UNKNOWN_ROLE;
+  }
+  return static_cast<RaftPeerPB::Role>(role);
+}
+
+int64_t UnpackTerm(uint64_t role_and_term_packed) {
+  const auto t = role_and_term_packed & kTermMask;
+  if (PREDICT_FALSE(t == kTermMask)) {
+    LOG(FATAL) << "packed term is invalid: " << t;
+  }
+  return static_cast<int64_t>(t);
+}
+
+} // anonymous namespace
 
 int64_t ConsensusMetadata::current_term() const {
   DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_);
@@ -66,6 +130,7 @@ void ConsensusMetadata::set_current_term(int64_t term) {
   DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_);
   DCHECK_GE(term, kMinimumTerm);
   pb_.set_current_term(term);
+  UpdateRoleAndTermCache();
 }
 
 bool ConsensusMetadata::has_voted_for() const {
@@ -253,8 +318,13 @@ Status ConsensusMetadata::Flush(FlushMode flush_mode) {
       FLAGS_log_force_fsync_all || FLAGS_cmeta_force_fsync ? pb_util::SYNC : pb_util::NO_SYNC),
           Substitute("Unable to write consensus meta file for tablet $0 to path $1",
                      tablet_id_, meta_file_path));
-  RETURN_NOT_OK(UpdateOnDiskSize());
-  return Status::OK();
+  return UpdateOnDiskSize();
+}
+
+ConsensusMetadata::RoleAndTerm ConsensusMetadata::GetRoleAndTerm() const {
+  // Read the cached role and term atomically to unpack them consistently.
+  const uint64_t val = role_and_term_cache_;
+  return std::make_pair(UnpackRole(val), UnpackTerm(val));
 }
 
 ConsensusMetadata::ConsensusMetadata(FsManager* fs_manager,
@@ -265,7 +335,9 @@ ConsensusMetadata::ConsensusMetadata(FsManager* fs_manager,
       peer_uuid_(std::move(peer_uuid)),
       has_pending_config_(false),
       flush_count_for_tests_(0),
+      active_role_(RaftPeerPB::UNKNOWN_ROLE),
       on_disk_size_(0) {
+  UpdateRoleAndTermCache();
 }
 
 Status ConsensusMetadata::Create(FsManager* fs_manager,
@@ -324,11 +396,18 @@ std::string ConsensusMetadata::LogPrefix() const {
 void ConsensusMetadata::UpdateActiveRole() {
   DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_);
   active_role_ = GetConsensusRole(peer_uuid_, leader_uuid_, ActiveConfig());
+  UpdateRoleAndTermCache();
   VLOG_WITH_PREFIX(1) << "Updating active role to " << RaftPeerPB::Role_Name(active_role_)
                       << ". Consensus state: "
                       << pb_util::SecureShortDebugString(ToConsensusStatePB());
 }
 
+void ConsensusMetadata::UpdateRoleAndTermCache() {
+  DFAKE_SCOPED_RECURSIVE_LOCK(fake_lock_);
+  role_and_term_cache_ = PackRoleAndTerm(
+      active_role_, pb_.has_current_term() ? current_term() : -1);
+}
+
 Status ConsensusMetadata::UpdateOnDiskSize() {
   string path = fs_manager_->GetConsensusMetadataPath(tablet_id_);
   uint64_t on_disk_size;
diff --git a/src/kudu/consensus/consensus_meta.h b/src/kudu/consensus/consensus_meta.h
index df24fd2..44dd601 100644
--- a/src/kudu/consensus/consensus_meta.h
+++ b/src/kudu/consensus/consensus_meta.h
@@ -19,6 +19,7 @@
 #include <atomic>
 #include <cstdint>
 #include <string>
+#include <utility>
 
 #include <gtest/gtest_prod.h>
 
@@ -75,6 +76,8 @@ class ConsensusMetadata : public RefCountedThreadSafe<ConsensusMetadata> {
     NO_OVERWRITE
   };
 
+  typedef std::pair<RaftPeerPB::Role, int64_t> RoleAndTerm;
+
   // Accessors for current term.
   int64_t current_term() const;
   void set_current_term(int64_t term);
@@ -161,6 +164,9 @@ class ConsensusMetadata : public RefCountedThreadSafe<ConsensusMetadata> {
     return on_disk_size_.load(std::memory_order_relaxed);
   }
 
+  // Return cached peer role and term, lock-free.
+  RoleAndTerm GetRoleAndTerm() const;
+
  private:
   friend class RaftConsensusQuorumTest;
   friend class RefCountedThreadSafe<ConsensusMetadata>;
@@ -212,6 +218,9 @@ class ConsensusMetadata : public RefCountedThreadSafe<ConsensusMetadata> {
   // Updates the cached active role.
   void UpdateActiveRole();
 
+  // Update the cached active role and term.
+  void UpdateRoleAndTermCache();
+
   // Updates the cached on-disk size of the consensus metadata.
   Status UpdateOnDiskSize();
 
@@ -229,15 +238,23 @@ class ConsensusMetadata : public RefCountedThreadSafe<ConsensusMetadata> {
   // RaftConfig used by the peers when there is a pending config change operation.
   RaftConfigPB pending_config_;
 
-  // Cached role of the peer_uuid_ within the active configuration.
-  RaftPeerPB::Role active_role_;
-
   // The number of times the metadata has been flushed to disk.
   int64_t flush_count_for_tests_;
 
   // Durable fields.
   ConsensusMetadataPB pb_;
 
+  // Role of the peer_uuid_ within the active configuration.
+  RaftPeerPB::Role active_role_;
+
+  // Cached term and role of the peer_uuid_ within the active configuration,
+  // packed into 64-bit integer. These are used to get a consistent snapshot of
+  // the replica role and term in a lock-free manner. The cached information
+  // might become stale right after it retrieved by GetRoleAndTerm() method
+  // (i.e. active_role_ and term in pending_config_ updated right after that),
+  // but it's totally fine for the intended use.
+  std::atomic<uint64_t> role_and_term_cache_;
+
   // The on-disk size of the consensus metadata, as of the last call to
   // Load() or Flush().
   // The type is int64_t for consistency with other on-disk size metrics,
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index cefa1bf..d37e6c9 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -187,6 +187,7 @@ RaftConsensus::RaftConsensus(
       server_ctx_(std::move(server_ctx)),
       state_(kNew),
       rng_(GetRandomSeed32()),
+      leader_is_ready_(false),
       leader_transfer_in_progress_(false),
       withhold_votes_until_(MonoTime::Min()),
       last_received_cur_leader_(MinimumOpId()),
@@ -197,6 +198,7 @@ RaftConsensus::RaftConsensus(
 }
 
 Status RaftConsensus::Init() {
+  LockGuard l(lock_);
   DCHECK_EQ(kNew, state_) << State_Name(state_);
   RETURN_NOT_OK(cmeta_manager_->Load(options_.tablet_id, &cmeta_));
   SetStateUnlocked(kInitialized);
@@ -391,8 +393,6 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
 }
 
 bool RaftConsensus::IsRunning() const {
-  ThreadRestrictions::AssertWaitAllowed();
-  LockGuard l(lock_);
   return state_ == kRunning;
 }
 
@@ -693,7 +693,9 @@ Status RaftConsensus::BecomeLeaderUnlocked() {
   queue_->RegisterObserver(this);
   bool was_leader = queue_->IsInLeaderMode();
   RETURN_NOT_OK(RefreshConsensusQueueAndPeersUnlocked());
-  if (!was_leader && server_ctx_.num_leaders) server_ctx_.num_leaders->Increment();
+  if (!was_leader && server_ctx_.num_leaders) {
+    server_ctx_.num_leaders->Increment();
+  }
 
   // Initiate a NO_OP op that is sent at the beginning of every term
   // change in raft.
@@ -711,8 +713,10 @@ Status RaftConsensus::BecomeLeaderUnlocked() {
       });
 
   last_leader_communication_time_micros_ = 0;
+  RETURN_NOT_OK(AppendNewRoundToQueueUnlocked(round));
+  leader_is_ready_ = true;
 
-  return AppendNewRoundToQueueUnlocked(round);
+  return Status::OK();
 }
 
 Status RaftConsensus::BecomeReplicaUnlocked(boost::optional<MonoDelta> fd_delta) {
@@ -757,10 +761,57 @@ Status RaftConsensus::Replicate(const scoped_refptr<ConsensusRound>& round) {
 }
 
 Status RaftConsensus::CheckLeadershipAndBindTerm(const scoped_refptr<ConsensusRound>& round) {
-  ThreadRestrictions::AssertWaitAllowed();
-  LockGuard l(lock_);
-  RETURN_NOT_OK(CheckSafeToReplicateUnlocked(*round->replicate_msg()));
-  round->BindToTerm(CurrentTermUnlocked());
+#ifndef NDEBUG
+  const auto& msg = *round->replicate_msg();
+  DCHECK(!msg.has_id()) << "should not have an ID yet: "
+                        << SecureShortDebugString(msg);
+#endif
+  // Get a snapshot of an atomic member for consistency between the condition
+  // and the error message, if any.
+  const State state = state_;
+  if (PREDICT_FALSE(state != kRunning)) {
+    return Status::IllegalState("RaftConsensus is not running",
+                                Substitute("state $0", State_Name(state)));
+  }
+
+  // The order of reading role_and_term and leader_is_ready is essential to
+  // deal with situations when leader_is_ready and role_and_term are read
+  // in different Raft terms.
+  //  * It's safe to bind to a stale term even if we've asserted leadership
+  //    in a newer term: the stale op will be rejected on followers anyway
+  //    because Raft guarantees that a majority of replicas have accepted
+  //    the new term.
+  //  * It's safe for the term to be stale if we haven't asserted leadership
+  //    for a newer term because we'll exit out below.
+  //  * It's unsafe to bind to a newer term if we've asserted leadership
+  //    for a stale term, hence this ordering.
+  const auto role_and_term = cmeta_->GetRoleAndTerm();
+  const bool leader_is_ready = leader_is_ready_;
+
+  const auto role = role_and_term.first;
+  switch (role) {
+    case RaftPeerPB::LEADER:
+      if (leader_transfer_in_progress_) {
+        return Status::ServiceUnavailable("leader transfer in progress");
+      }
+      if (!leader_is_ready) {
+        // Leader replica should not accept write operations before scheduling
+        // the replication of a NO_OP to assert its leadership in the current
+        // term. Otherwise there might be a race, so the very first accepted
+        // write operation might have timestamp lower than the timestamp
+        // of the NO_OP. That would trigger an assertion in MVCC.
+        return Status::ServiceUnavailable("leader is not yet ready");
+      }
+      break;
+
+    default:
+      return Status::IllegalState(
+          Substitute("replica $0 is not leader of this config: current role $1",
+                     peer_uuid(), RaftPeerPB::Role_Name(role)));
+  }
+  const auto term = role_and_term.second;
+  round->BindToTerm(term);
+
   return Status::OK();
 }
 
@@ -836,10 +887,11 @@ void RaftConsensus::NotifyCommitIndex(int64_t commit_index) {
   // We will process commit notifications while shutting down because a replica
   // which has initiated a Prepare() / Replicate() may eventually commit even if
   // its state has changed after the initial Append() / Update().
-  if (PREDICT_FALSE(state_ != kRunning && state_ != kStopping)) {
+  const State state = state_;
+  if (PREDICT_FALSE(state != kRunning && state != kStopping)) {
     LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Unable to update committed index: "
                                       << "Replica not in running state: "
-                                      << State_Name(state_);
+                                      << State_Name(state);
     return;
   }
 
@@ -1233,8 +1285,8 @@ Status RaftConsensus::CheckLeaderRequestUnlocked(const ConsensusRequestPB* reque
                                                  LeaderRequest* deduped_req) {
   DCHECK(lock_.is_locked());
 
-  if (request->has_deprecated_committed_index() ||
-      !request->has_all_replicated_index()) {
+  if (PREDICT_FALSE(request->has_deprecated_committed_index() ||
+      !request->has_all_replicated_index())) {
     return Status::InvalidArgument("Leader appears to be running an earlier version "
                                    "of Kudu. Please shut down and upgrade all servers "
                                    "before restarting.");
@@ -1720,7 +1772,8 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request,
   // If RaftConsensus is running, we use the latest OpId from the WAL to vote.
   // Otherwise, we must be voting while tombstoned.
   OpId local_last_logged_opid;
-  switch (state_) {
+  const State state = state_;
+  switch (state) {
     case kShutdown:
       return Status::IllegalState("cannot vote while shut down");
     case kRunning:
@@ -2174,11 +2227,13 @@ void RaftConsensus::Stop() {
   TRACE_EVENT2("consensus", "RaftConsensus::Shutdown",
                "peer", peer_uuid(),
                "tablet", options_.tablet_id);
-
   {
     ThreadRestrictions::AssertWaitAllowed();
     LockGuard l(lock_);
-    if (state_ == kStopping || state_ == kStopped || state_ == kShutdown) return;
+    const State state = state_;
+    if (state == kStopping || state == kStopped || state == kShutdown) {
+      return;
+    }
     // Transition to kStopping state.
     SetStateUnlocked(kStopping);
     LOG_WITH_PREFIX_UNLOCKED(INFO) << "Raft consensus shutting down.";
@@ -2199,7 +2254,9 @@ void RaftConsensus::Stop() {
   {
     ThreadRestrictions::AssertWaitAllowed();
     LockGuard l(lock_);
-    if (pending_) CHECK_OK(pending_->CancelPendingOps());
+    if (pending_) {
+      CHECK_OK(pending_->CancelPendingOps());
+    }
     SetStateUnlocked(kStopped);
 
     // Clear leader status on Stop(), in case this replica was the leader. If
@@ -2423,6 +2480,7 @@ int64_t RaftConsensus::CurrentTerm() const {
 }
 
 void RaftConsensus::SetStateUnlocked(State new_state) {
+  DCHECK(lock_.is_locked());
   switch (new_state) {
     case kInitialized:
       CHECK_EQ(kNew, state_);
@@ -2518,8 +2576,7 @@ Status RaftConsensus::RefreshConsensusQueueAndPeersUnlocked() {
   queue_->SetLeaderMode(pending_->GetCommittedIndex(),
                         CurrentTermUnlocked(),
                         active_config);
-  RETURN_NOT_OK(peer_manager_->UpdateRaftConfig(active_config));
-  return Status::OK();
+  return peer_manager_->UpdateRaftConfig(active_config);
 }
 
 const string& RaftConsensus::peer_uuid() const {
@@ -2569,16 +2626,12 @@ RaftConfigPB RaftConsensus::CommittedConfig() const {
 }
 
 void RaftConsensus::DumpStatusHtml(std::ostream& out) const {
-  RaftPeerPB::Role role;
-  {
-    ThreadRestrictions::AssertWaitAllowed();
-    LockGuard l(lock_);
-    if (state_ != kRunning) {
-      out << "Tablet " << EscapeForHtmlToString(tablet_id()) << " not running" << std::endl;
-      return;
-    }
-    role = cmeta_->active_role();
+  if (state_ != kRunning) {
+    out << "Tablet " << EscapeForHtmlToString(tablet_id())
+        << " not running" << std::endl;
+    return;
   }
+  const RaftPeerPB::Role role = cmeta_->GetRoleAndTerm().first;
 
   out << "<h1>Raft Consensus State</h1>" << std::endl;
 
@@ -2974,7 +3027,9 @@ Status RaftConsensus::HandleTermAdvanceUnlocked(ConsensusTerm new_term,
 
   LOG_WITH_PREFIX_UNLOCKED(INFO) << "Advancing to term " << new_term;
   RETURN_NOT_OK(SetCurrentTermUnlocked(new_term, flush));
-  if (term_metric_) term_metric_->set_value(new_term);
+  if (term_metric_) {
+    term_metric_->set_value(new_term);
+  }
   last_received_cur_leader_ = MinimumOpId();
   return Status::OK();
 }
@@ -3121,6 +3176,7 @@ bool RaftConsensus::HasLeaderUnlocked() const {
 
 void RaftConsensus::ClearLeaderUnlocked() {
   DCHECK(lock_.is_locked());
+  leader_is_ready_ = false;
   cmeta_->set_leader_uuid("");
 }
 
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index 33ec160..bc22c18 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -856,7 +856,7 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
   // Coarse-grained lock that protects all mutable data members.
   mutable simple_spinlock lock_;
 
-  State state_;
+  std::atomic<State> state_;
 
   // Consensus metadata persistence object.
   scoped_refptr<ConsensusMetadata> cmeta_;
@@ -887,6 +887,12 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
 
   std::shared_ptr<rpc::PeriodicTimer> failure_detector_;
 
+  // Whether a replica, switched into the leader mode, has successfully
+  // scheduled a NO_OP Raft message to replicate, asserting its leadership in
+  // the term where it has just become a leader.
+  std::atomic<bool> leader_is_ready_;
+
+  // A few fields used for the leadership transfer process.
   std::atomic<bool> leader_transfer_in_progress_;
   boost::optional<std::string> designated_successor_uuid_;
   std::shared_ptr<rpc::PeriodicTimer> transfer_period_timer_;
@@ -1036,8 +1042,7 @@ class ConsensusRound : public RefCountedThreadSafe<ConsensusRound> {
   void NotifyReplicationFinished(const Status& status);
 
   // Binds this round such that it may not be eventually executed in any term
-  // other than 'term'.
-  // See CheckBoundTerm().
+  // other than 'term'. See CheckBoundTerm().
   void BindToTerm(int64_t term) {
     DCHECK_EQ(bound_term_, -1);
     bound_term_ = term;
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index 22151c7..1f6a74c 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -783,7 +783,8 @@ TEST_F(RaftConsensusITest, TestInsertOnNonLeader) {
   ASSERT_TRUE(resp.has_error());
   Status s = StatusFromPB(resp.error().status());
   EXPECT_TRUE(s.IsIllegalState());
-  ASSERT_STR_CONTAINS(s.ToString(), "is not leader of this config. Role: FOLLOWER");
+  ASSERT_STR_CONTAINS(s.ToString(),
+                      "is not leader of this config: current role FOLLOWER");
   // TODO(unknown): need to change the error code to be something like REPLICA_NOT_LEADER
   // so that the client can properly handle this case! plumbing this is a little difficult
   // so not addressing at the moment.
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 4a5c040..6a87a65 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -654,8 +654,8 @@ class RpcOpCompletionCallback : public OpCompletionCallback {
  public:
   RpcOpCompletionCallback(rpc::RpcContext* context,
                           Response* response)
- : context_(context),
-   response_(response) {}
+      : context_(context),
+        response_(response) {}
 
   virtual void OpCompleted() OVERRIDE {
     if (!status_.ok()) {
@@ -665,10 +665,9 @@ class RpcOpCompletionCallback : public OpCompletionCallback {
     } else {
       context_->RespondSuccess();
     }
-  };
+  }
 
  private:
-
   TabletServerErrorPB* get_error() {
     return response_->mutable_error();
   }