Posted to commits@kudu.apache.org by mp...@apache.org on 2017/05/31 21:28:01 UTC

[2/2] kudu git commit: consensus: Get rid of ReplicaState class

consensus: Get rid of ReplicaState class

Merges the logic from ReplicaState into the RaftConsensus class.
ReplicaState adds complexity but no longer serves a purpose of its own.

There are no functional changes in this patch.
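
The change is mostly mechanical: the lock, the replica lifecycle enum
(kInitialized/kRunning/...), and the *Unlocked() helpers move from
ReplicaState onto RaftConsensus, so the delegation layer disappears at
every call site. A simplified before/after sketch, using role() as it
appears in the diff below (bodies abridged):

  // Before: RaftConsensus delegated state access and locking.
  RaftPeerPB::Role RaftConsensus::role() const {
    ReplicaState::UniqueLock lock;
    CHECK_OK(state_->LockForRead(&lock));
    return state_->GetActiveRoleUnlocked();
  }

  // After: RaftConsensus owns lock_ and the helpers directly
  // ('state_' now names the lifecycle enum value, not a class).
  RaftPeerPB::Role RaftConsensus::role() const {
    UniqueLock lock;
    CHECK_OK(LockForRead(&lock));
    return GetActiveRoleUnlocked();
  }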

Change-Id: Ie1e62eff37d3f8655100b364939375608063aa80
Reviewed-on: http://gerrit.cloudera.org:8080/7007
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Mike Percy <mp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/9e40867c
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/9e40867c
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/9e40867c

Branch: refs/heads/master
Commit: 9e40867cc9a0d9a7cdc912140a6554eaec5a8d1e
Parents: ca2704c
Author: Mike Percy <mp...@apache.org>
Authored: Fri May 26 19:09:33 2017 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Wed May 31 21:27:42 2017 +0000

----------------------------------------------------------------------
 src/kudu/consensus/CMakeLists.txt               |   1 -
 src/kudu/consensus/consensus.h                  |   4 +-
 src/kudu/consensus/raft_consensus.cc            | 733 +++++++++++++------
 src/kudu/consensus/raft_consensus.h             | 221 +++++-
 .../consensus/raft_consensus_quorum-test.cc     |  25 +-
 src/kudu/consensus/raft_consensus_state.cc      | 360 ---------
 src/kudu/consensus/raft_consensus_state.h       | 232 ------
 7 files changed, 700 insertions(+), 876 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/9e40867c/src/kudu/consensus/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/CMakeLists.txt b/src/kudu/consensus/CMakeLists.txt
index 929fb16..e93ea94 100644
--- a/src/kudu/consensus/CMakeLists.txt
+++ b/src/kudu/consensus/CMakeLists.txt
@@ -106,7 +106,6 @@ set(CONSENSUS_SRCS
   pending_rounds.cc
   quorum_util.cc
   raft_consensus.cc
-  raft_consensus_state.cc
   time_manager.cc
 )
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/9e40867c/src/kudu/consensus/consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus.h b/src/kudu/consensus/consensus.h
index 7084aff..59910be 100644
--- a/src/kudu/consensus/consensus.h
+++ b/src/kudu/consensus/consensus.h
@@ -258,10 +258,10 @@ class Consensus : public RefCountedThreadSafe<Consensus> {
   virtual RaftPeerPB::Role role() const = 0;
 
   // Returns the uuid of this peer.
-  virtual std::string peer_uuid() const = 0;
+  virtual const std::string& peer_uuid() const = 0;
 
   // Returns the id of the tablet whose updates this consensus instance helps coordinate.
-  virtual std::string tablet_id() const = 0;
+  virtual const std::string& tablet_id() const = 0;
 
   virtual scoped_refptr<TimeManager> time_manager() const = 0;
 

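Returning const references from peer_uuid() and tablet_id() saves a
string copy on every call. This is safe because both values are fixed
once the consensus instance is constructed. The matching implementations
in raft_consensus.cc, taken from the diff further down (abridged):

  const string& RaftConsensus::peer_uuid() const {
    return peer_uuid_;            // set once in the constructor
  }

  const string& RaftConsensus::tablet_id() const {
    return options_.tablet_id;    // options_ is never reassigned
  }
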
http://git-wip-us.apache.org/repos/asf/kudu/blob/9e40867c/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 363478b..c5370c7 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -32,7 +32,6 @@
 #include "kudu/consensus/metadata.pb.h"
 #include "kudu/consensus/peer_manager.h"
 #include "kudu/consensus/quorum_util.h"
-#include "kudu/consensus/raft_consensus_state.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/stringprintf.h"
@@ -161,7 +160,7 @@ using tserver::TabletServerErrorPB;
 static const char* const kTimerId = "election-timer";
 
 scoped_refptr<RaftConsensus> RaftConsensus::Create(
-    const ConsensusOptions& options,
+    ConsensusOptions options,
     unique_ptr<ConsensusMetadata> cmeta,
     const RaftPeerPB& local_peer_pb,
     const scoped_refptr<MetricEntity>& metric_entity,
@@ -201,7 +200,7 @@ scoped_refptr<RaftConsensus> RaftConsensus::Create(
                     log));
 
   return make_scoped_refptr(new RaftConsensus(
-                              options,
+                              std::move(options),
                               std::move(cmeta),
                               std::move(rpc_factory),
                               std::move(queue),
@@ -217,27 +216,31 @@ scoped_refptr<RaftConsensus> RaftConsensus::Create(
 }
 
 RaftConsensus::RaftConsensus(
-    const ConsensusOptions& options,
+    ConsensusOptions options,
     unique_ptr<ConsensusMetadata> cmeta,
     gscoped_ptr<PeerProxyFactory> peer_proxy_factory,
     gscoped_ptr<PeerMessageQueue> queue,
     gscoped_ptr<PeerManager> peer_manager,
     gscoped_ptr<ThreadPool> thread_pool,
     const scoped_refptr<MetricEntity>& metric_entity,
-    const std::string& peer_uuid,
+    std::string peer_uuid,
     scoped_refptr<TimeManager> time_manager,
     ReplicaTransactionFactory* txn_factory,
     const scoped_refptr<log::Log>& log,
     shared_ptr<MemTracker> parent_mem_tracker,
     Callback<void(const std::string& reason)> mark_dirty_clbk)
-    : thread_pool_(std::move(thread_pool)),
-      log_(log),
-      time_manager_(std::move(time_manager)),
-      peer_proxy_factory_(std::move(peer_proxy_factory)),
-      txn_factory_(txn_factory),
-      peer_manager_(std::move(peer_manager)),
-      queue_(std::move(queue)),
-      pending_(Substitute("T $0 P $1: ", options.tablet_id, peer_uuid), time_manager_),
+    : options_(std::move(options)),
+      peer_uuid_(std::move(peer_uuid)),
+      state_(kInitialized),
+      cmeta_(DCHECK_NOTNULL(std::move(cmeta))),
+      thread_pool_(DCHECK_NOTNULL(std::move(thread_pool))),
+      log_(DCHECK_NOTNULL(log)),
+      time_manager_(DCHECK_NOTNULL(std::move(time_manager))),
+      peer_proxy_factory_(DCHECK_NOTNULL(std::move(peer_proxy_factory))),
+      txn_factory_(DCHECK_NOTNULL(txn_factory)),
+      peer_manager_(DCHECK_NOTNULL(std::move(peer_manager))),
+      queue_(DCHECK_NOTNULL(std::move(queue))),
+      pending_(Substitute("T $0 P $1: ", options_.tablet_id, peer_uuid_), time_manager_),
       rng_(GetRandomSeed32()),
       failure_monitor_(GetRandomSeed32(), GetFailureMonitorCheckMeanMs(),
                        GetFailureMonitorCheckStddevMs()),
@@ -253,12 +256,8 @@ RaftConsensus::RaftConsensus(
       follower_memory_pressure_rejections_(metric_entity->FindOrCreateCounter(
           &METRIC_follower_memory_pressure_rejections)),
       term_metric_(metric_entity->FindOrCreateGauge(&METRIC_raft_term,
-                                                    cmeta->current_term())),
+                                                    cmeta_->current_term())),
       parent_mem_tracker_(std::move(parent_mem_tracker)) {
-  DCHECK(log_);
-  state_.reset(new ReplicaState(options,
-                                peer_uuid,
-                                std::move(cmeta)));
 }
 
 RaftConsensus::~RaftConsensus() {
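
A note on the constructor above: ConsensusOptions and the peer UUID are
now taken by value and moved into place, and every injected dependency
is wrapped in DCHECK_NOTNULL(). DCHECK_NOTNULL() returns its argument,
which is what lets it wrap a std::move() inside a member initializer
list. A minimal sketch of the idiom (the Holder class is hypothetical):

  class Holder {
   public:
    explicit Holder(std::unique_ptr<log::Log> log)
        // DCHECK_NOTNULL() passes the value through after the
        // (debug-only) null check, so the move still lands in log_.
        : log_(DCHECK_NOTNULL(std::move(log))) {}
   private:
    std::unique_ptr<log::Log> log_;
  };
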
@@ -275,21 +274,31 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) {
   // We still have not enabled failure detection for the leader election timer.
   // That happens separately via the helper functions
   // EnsureFailureDetector(Enabled/Disabled)Unlocked();
-  RETURN_NOT_OK(failure_monitor_.MonitorFailureDetector(state_->GetOptions().tablet_id,
+  RETURN_NOT_OK(failure_monitor_.MonitorFailureDetector(options_.tablet_id,
                                                         failure_detector_));
 
   {
-    ReplicaState::UniqueLock lock;
-    RETURN_NOT_OK(state_->LockForStart(&lock));
-    state_->ClearLeaderUnlocked();
+    UniqueLock lock;
+    RETURN_NOT_OK(LockForStart(&lock));
+    ClearLeaderUnlocked();
+
+    // Our last persisted term can be higher than the term of the last persisted
+    // operation (i.e. if we called an election), but the reverse should never happen.
+    if (info.last_id.term() > GetCurrentTermUnlocked()) {
+      return Status::Corruption(Substitute("Unable to start RaftConsensus: "
+          "The last op in the WAL with id $0 has a term ($1) that is greater "
+          "than the latest recorded term, which is $2",
+          OpIdToString(info.last_id),
+          info.last_id.term(),
+          GetCurrentTermUnlocked()));
+    }
 
-    RETURN_NOT_OK_PREPEND(state_->StartUnlocked(info.last_id),
-                          "Unable to start Raft ReplicaState");
+    state_ = kRunning;
 
     LOG_WITH_PREFIX_UNLOCKED(INFO) << "Replica starting. Triggering "
                                    << info.orphaned_replicates.size()
                                    << " pending transactions. Active config: "
-                                   << SecureShortDebugString(state_->GetActiveConfigUnlocked());
+                                   << SecureShortDebugString(GetActiveConfigUnlocked());
     for (ReplicateMsg* replicate : info.orphaned_replicates) {
       ReplicateRefPtr replicate_ptr = make_scoped_refptr_replicate(new ReplicateMsg(*replicate));
       RETURN_NOT_OK(StartReplicaTransactionUnlocked(replicate_ptr));
@@ -301,14 +310,14 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) {
   }
 
   {
-    ReplicaState::UniqueLock lock;
-    RETURN_NOT_OK(state_->LockForConfigChange(&lock));
+    UniqueLock lock;
+    RETURN_NOT_OK(LockForConfigChange(&lock));
 
     RETURN_NOT_OK(EnsureFailureDetectorEnabledUnlocked());
 
     // If this is the first term, expire the FD immediately so that we have a fast
     // first election; otherwise we just let the timer expire normally.
-    if (state_->GetCurrentTermUnlocked() == 0) {
+    if (GetCurrentTermUnlocked() == 0) {
       // Initialize the failure detector timeout to some time in the past so that
       // the next time the failure detector monitor runs it triggers an election
       // (unless someone else requested a vote from us first, which resets the
@@ -341,21 +350,21 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) {
 }
 
 bool RaftConsensus::IsRunning() const {
-  ReplicaState::UniqueLock lock;
-  Status s = state_->LockForRead(&lock);
+  UniqueLock lock;
+  Status s = LockForRead(&lock);
   if (PREDICT_FALSE(!s.ok())) return false;
-  return state_->state() == ReplicaState::kRunning;
+  return state_ == kRunning;
 }
 
 Status RaftConsensus::EmulateElection() {
-  ReplicaState::UniqueLock lock;
-  RETURN_NOT_OK(state_->LockForConfigChange(&lock));
+  UniqueLock lock;
+  RETURN_NOT_OK(LockForConfigChange(&lock));
 
   LOG_WITH_PREFIX_UNLOCKED(INFO) << "Emulating election...";
 
   // Assume leadership of new term.
-  RETURN_NOT_OK(HandleTermAdvanceUnlocked(state_->GetCurrentTermUnlocked() + 1));
-  SetLeaderUuidUnlocked(state_->GetPeerUuid());
+  RETURN_NOT_OK(HandleTermAdvanceUnlocked(GetCurrentTermUnlocked() + 1));
+  SetLeaderUuidUnlocked(peer_uuid_);
   return BecomeLeaderUnlocked();
 }
 
@@ -391,14 +400,14 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
   const char* mode_str = ModeString(mode);
 
   TRACE_EVENT2("consensus", "RaftConsensus::StartElection",
-               "peer", state_->LogPrefixThreadSafe(),
+               "peer", LogPrefixThreadSafe(),
                "mode", mode_str);
   scoped_refptr<LeaderElection> election;
   {
-    ReplicaState::UniqueLock lock;
-    RETURN_NOT_OK(state_->LockForConfigChange(&lock));
+    UniqueLock lock;
+    RETURN_NOT_OK(LockForConfigChange(&lock));
 
-    RaftPeerPB::Role active_role = state_->GetActiveRoleUnlocked();
+    RaftPeerPB::Role active_role = GetActiveRoleUnlocked();
     if (active_role == RaftPeerPB::LEADER) {
       LOG_WITH_PREFIX_UNLOCKED(INFO) << "Not starting " << mode << " -- already leader";
       return Status::OK();
@@ -407,11 +416,11 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
       SnoozeFailureDetectorUnlocked(); // Avoid excessive election noise while in this state.
       return Status::IllegalState("Not starting election: Node is currently "
                                   "a non-participant in the raft config",
-                                  SecureShortDebugString(state_->GetActiveConfigUnlocked()));
+                                  SecureShortDebugString(GetActiveConfigUnlocked()));
     }
     LOG_WITH_PREFIX_UNLOCKED(INFO)
         << "Starting " << mode_str
-        << " (" << ReasonString(reason, state_->GetLeaderUuidUnlocked()) << ")";
+        << " (" << ReasonString(reason, GetLeaderUuidUnlocked()) << ")";
 
     // Snooze to avoid the election timer firing again as much as possible.
     // We do not disable the election timer while running an election, so that
@@ -427,12 +436,12 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
 
       // We skip flushing the term to disk because setting the vote just below also
       // flushes to disk, and the double fsync doesn't buy us anything.
-      RETURN_NOT_OK(HandleTermAdvanceUnlocked(state_->GetCurrentTermUnlocked() + 1,
-                                              ReplicaState::SKIP_FLUSH_TO_DISK));
-      RETURN_NOT_OK(state_->SetVotedForCurrentTermUnlocked(state_->GetPeerUuid()));
+      RETURN_NOT_OK(HandleTermAdvanceUnlocked(GetCurrentTermUnlocked() + 1,
+                                              SKIP_FLUSH_TO_DISK));
+      RETURN_NOT_OK(SetVotedForCurrentTermUnlocked(peer_uuid_));
     }
 
-    const RaftConfigPB& active_config = state_->GetActiveConfigUnlocked();
+    const RaftConfigPB& active_config = GetActiveConfigUnlocked();
     LOG_WITH_PREFIX_UNLOCKED(INFO) << "Starting " << mode_str << " with config: "
                                    << SecureShortDebugString(active_config);
 
@@ -443,23 +452,23 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
 
     // Vote for ourselves.
     bool duplicate;
-    RETURN_NOT_OK(counter->RegisterVote(state_->GetPeerUuid(), VOTE_GRANTED, &duplicate));
-    CHECK(!duplicate) << state_->LogPrefixUnlocked()
+    RETURN_NOT_OK(counter->RegisterVote(peer_uuid_, VOTE_GRANTED, &duplicate));
+    CHECK(!duplicate) << LogPrefixUnlocked()
                       << "Inexplicable duplicate self-vote for term "
-                      << state_->GetCurrentTermUnlocked();
+                      << GetCurrentTermUnlocked();
 
     VoteRequestPB request;
     request.set_ignore_live_leader(mode == ELECT_EVEN_IF_LEADER_IS_ALIVE);
-    request.set_candidate_uuid(state_->GetPeerUuid());
+    request.set_candidate_uuid(peer_uuid_);
     if (mode == PRE_ELECTION) {
       // In a pre-election, we haven't bumped our own term yet, so we need to be
       // asking for votes for the next term.
       request.set_is_pre_election(true);
-      request.set_candidate_term(state_->GetCurrentTermUnlocked() + 1);
+      request.set_candidate_term(GetCurrentTermUnlocked() + 1);
     } else {
-      request.set_candidate_term(state_->GetCurrentTermUnlocked());
+      request.set_candidate_term(GetCurrentTermUnlocked());
     }
-    request.set_tablet_id(state_->GetOptions().tablet_id);
+    request.set_tablet_id(options_.tablet_id);
     *request.mutable_candidate_status()->mutable_last_received() =
         queue_->GetLastOpIdInLog();
 
@@ -480,7 +489,8 @@ Status RaftConsensus::WaitUntilLeaderForTests(const MonoDelta& timeout) {
   while (role() != consensus::RaftPeerPB::LEADER) {
     if (MonoTime::Now() >= deadline) {
       return Status::TimedOut(Substitute("Peer $0 is not leader of tablet $1 after $2. Role: $3",
-                                         peer_uuid(), tablet_id(), timeout.ToString(), role()));
+                                         peer_uuid_, options_.tablet_id, timeout.ToString(),
+                                         role()));
     }
     SleepFor(MonoDelta::FromMilliseconds(10));
   }
@@ -489,9 +499,9 @@ Status RaftConsensus::WaitUntilLeaderForTests(const MonoDelta& timeout) {
 
 Status RaftConsensus::StepDown(LeaderStepDownResponsePB* resp) {
   TRACE_EVENT0("consensus", "RaftConsensus::StepDown");
-  ReplicaState::UniqueLock lock;
-  RETURN_NOT_OK(state_->LockForConfigChange(&lock));
-  if (state_->GetActiveRoleUnlocked() != RaftPeerPB::LEADER) {
+  UniqueLock lock;
+  RETURN_NOT_OK(LockForConfigChange(&lock));
+  if (GetActiveRoleUnlocked() != RaftPeerPB::LEADER) {
     resp->mutable_error()->set_code(TabletServerErrorPB::NOT_THE_LEADER);
     StatusToPB(Status::IllegalState("Not currently leader"),
                resp->mutable_error()->mutable_status());
@@ -514,9 +524,9 @@ void RaftConsensus::ReportFailureDetected(const std::string& name, const Status&
 
 Status RaftConsensus::BecomeLeaderUnlocked() {
   TRACE_EVENT2("consensus", "RaftConsensus::BecomeLeaderUnlocked",
-               "peer", peer_uuid(),
-               "tablet", tablet_id());
-  LOG_WITH_PREFIX_UNLOCKED(INFO) << "Becoming Leader. State: " << state_->ToStringUnlocked();
+               "peer", peer_uuid_,
+               "tablet", options_.tablet_id);
+  LOG_WITH_PREFIX_UNLOCKED(INFO) << "Becoming Leader. State: " << ToStringUnlocked();
 
   // Disable FD while we are leader.
   RETURN_NOT_OK(EnsureFailureDetectorDisabledUnlocked());
@@ -547,9 +557,9 @@ Status RaftConsensus::BecomeLeaderUnlocked() {
 
 Status RaftConsensus::BecomeReplicaUnlocked() {
   LOG_WITH_PREFIX_UNLOCKED(INFO) << "Becoming Follower/Learner. State: "
-                                 << state_->ToStringUnlocked();
+                                 << ToStringUnlocked();
 
-  state_->ClearLeaderUnlocked();
+  ClearLeaderUnlocked();
 
   // FD should be running while we are a follower.
   RETURN_NOT_OK(EnsureFailureDetectorEnabledUnlocked());
@@ -570,9 +580,9 @@ Status RaftConsensus::Replicate(const scoped_refptr<ConsensusRound>& round) {
 
   std::lock_guard<simple_spinlock> lock(update_lock_);
   {
-    ReplicaState::UniqueLock lock;
-    RETURN_NOT_OK(state_->LockForReplicate(&lock, *round->replicate_msg()));
-    RETURN_NOT_OK(round->CheckBoundTerm(state_->GetCurrentTermUnlocked()));
+    UniqueLock lock;
+    RETURN_NOT_OK(LockForReplicate(&lock, *round->replicate_msg()));
+    RETURN_NOT_OK(round->CheckBoundTerm(GetCurrentTermUnlocked()));
     RETURN_NOT_OK(AppendNewRoundToQueueUnlocked(round));
   }
 
@@ -581,9 +591,9 @@ Status RaftConsensus::Replicate(const scoped_refptr<ConsensusRound>& round) {
 }
 
 Status RaftConsensus::CheckLeadershipAndBindTerm(const scoped_refptr<ConsensusRound>& round) {
-  ReplicaState::UniqueLock lock;
-  RETURN_NOT_OK(state_->LockForReplicate(&lock, *round->replicate_msg()));
-  round->BindToTerm(state_->GetCurrentTermUnlocked());
+  UniqueLock lock;
+  RETURN_NOT_OK(LockForReplicate(&lock, *round->replicate_msg()));
+  round->BindToTerm(GetCurrentTermUnlocked());
   return Status::OK();
 }
 
@@ -615,7 +625,7 @@ Status RaftConsensus::AddPendingOperationUnlocked(const scoped_refptr<ConsensusR
     const RaftConfigPB& new_config = change_record->new_config();
 
     if (!new_config.unsafe_config_change()) {
-      Status s = state_->CheckNoConfigChangePendingUnlocked();
+      Status s = CheckNoConfigChangePendingUnlocked();
       if (PREDICT_FALSE(!s.ok())) {
         s = s.CloneAndAppend(Substitute("\n  New config: $0", SecureShortDebugString(new_config)));
         LOG_WITH_PREFIX_UNLOCKED(INFO) << s.ToString();
@@ -625,10 +635,10 @@ Status RaftConsensus::AddPendingOperationUnlocked(const scoped_refptr<ConsensusR
     // Check if the pending Raft config has an OpId less than the committed
     // config. If so, this is a replay at startup in which the COMMIT
     // messages were delayed.
-    const RaftConfigPB& committed_config = state_->GetCommittedConfigUnlocked();
+    const RaftConfigPB& committed_config = GetCommittedConfigUnlocked();
     if (round->replicate_msg()->id().index() > committed_config.opid_index()) {
-      RETURN_NOT_OK(state_->SetPendingConfigUnlocked(new_config));
-      if (state_->GetActiveRoleUnlocked() == RaftPeerPB::LEADER) {
+      RETURN_NOT_OK(SetPendingConfigUnlocked(new_config));
+      if (GetActiveRoleUnlocked() == RaftPeerPB::LEADER) {
         RETURN_NOT_OK(RefreshConsensusQueueAndPeersUnlocked());
       }
     } else {
@@ -645,10 +655,10 @@ Status RaftConsensus::AddPendingOperationUnlocked(const scoped_refptr<ConsensusR
 }
 
 void RaftConsensus::NotifyCommitIndex(int64_t commit_index) {
-  ReplicaState::UniqueLock lock;
-  Status s = state_->LockForCommit(&lock);
+  UniqueLock lock;
+  Status s = LockForCommit(&lock);
   if (PREDICT_FALSE(!s.ok())) {
-    LOG(WARNING) << state_->LogPrefixThreadSafe()
+    LOG(WARNING) << LogPrefixThreadSafe()
         << "Unable to take state lock to update committed index: "
         << s.ToString();
     return;
@@ -656,16 +666,16 @@ void RaftConsensus::NotifyCommitIndex(int64_t commit_index) {
 
   pending_.AdvanceCommittedIndex(commit_index);
 
-  if (state_->GetActiveRoleUnlocked() == RaftPeerPB::LEADER) {
+  if (GetActiveRoleUnlocked() == RaftPeerPB::LEADER) {
     peer_manager_->SignalRequest(false);
   }
 }
 
 void RaftConsensus::NotifyTermChange(int64_t term) {
-  ReplicaState::UniqueLock lock;
-  Status s = state_->LockForConfigChange(&lock);
+  UniqueLock lock;
+  Status s = LockForConfigChange(&lock);
   if (PREDICT_FALSE(!s.ok())) {
-    LOG(WARNING) << state_->LogPrefixThreadSafe() << "Unable to lock ReplicaState for term change"
+    LOG(WARNING) << LogPrefixThreadSafe() << "Unable to lock consensus for term change"
                  << " when notified of new term " << term << ": " << s.ToString();
     return;
   }
@@ -680,22 +690,22 @@ void RaftConsensus::NotifyFailedFollower(const string& uuid,
                                uuid, term, reason);
 
   if (!FLAGS_evict_failed_followers) {
-    LOG(INFO) << state_->LogPrefixThreadSafe() << fail_msg
+    LOG(INFO) << LogPrefixThreadSafe() << fail_msg
               << "Eviction of failed followers is disabled. Doing nothing.";
     return;
   }
 
   RaftConfigPB committed_config;
   {
-    ReplicaState::UniqueLock lock;
-    Status s = state_->LockForRead(&lock);
+    UniqueLock lock;
+    Status s = LockForRead(&lock);
     if (PREDICT_FALSE(!s.ok())) {
-      LOG(WARNING) << state_->LogPrefixThreadSafe() << fail_msg
-                   << "Unable to lock ReplicaState for read: " << s.ToString();
+      LOG(WARNING) << LogPrefixThreadSafe() << fail_msg
+                   << "Unable to lock consensus for read: " << s.ToString();
       return;
     }
 
-    int64_t current_term = state_->GetCurrentTermUnlocked();
+    int64_t current_term = GetCurrentTermUnlocked();
     if (current_term != term) {
       LOG_WITH_PREFIX_UNLOCKED(INFO) << fail_msg << "Notified about a follower failure in "
                                      << "previous term " << term << ", but a leader election "
@@ -704,34 +714,34 @@ void RaftConsensus::NotifyFailedFollower(const string& uuid,
       return;
     }
 
-    if (state_->IsConfigChangePendingUnlocked()) {
+    if (IsConfigChangePendingUnlocked()) {
       LOG_WITH_PREFIX_UNLOCKED(INFO) << fail_msg << "There is already a config change operation "
                                      << "in progress. Unable to evict follower until it completes. "
                                      << "Doing nothing.";
       return;
     }
-    committed_config = state_->GetCommittedConfigUnlocked();
+    committed_config = GetCommittedConfigUnlocked();
   }
 
-  // Run config change on thread pool after dropping ReplicaState lock.
+  // Run config change on thread pool after dropping lock.
   WARN_NOT_OK(thread_pool_->SubmitClosure(Bind(&RaftConsensus::TryRemoveFollowerTask,
                                                this, uuid, committed_config, reason)),
-              state_->LogPrefixThreadSafe() + "Unable to start RemoveFollowerTask");
+              LogPrefixThreadSafe() + "Unable to start RemoveFollowerTask");
 }
 
 void RaftConsensus::TryRemoveFollowerTask(const string& uuid,
                                           const RaftConfigPB& committed_config,
                                           const std::string& reason) {
   ChangeConfigRequestPB req;
-  req.set_tablet_id(tablet_id());
+  req.set_tablet_id(options_.tablet_id);
   req.mutable_server()->set_permanent_uuid(uuid);
   req.set_type(REMOVE_SERVER);
   req.set_cas_config_opid_index(committed_config.opid_index());
-  LOG(INFO) << state_->LogPrefixThreadSafe() << "Attempting to remove follower "
+  LOG(INFO) << LogPrefixThreadSafe() << "Attempting to remove follower "
             << uuid << " from the Raft config. Reason: " << reason;
   boost::optional<TabletServerErrorPB::Code> error_code;
   WARN_NOT_OK(ChangeConfig(req, Bind(&DoNothingStatusCB), &error_code),
-              state_->LogPrefixThreadSafe() + "Unable to remove follower " + uuid);
+              LogPrefixThreadSafe() + "Unable to remove follower " + uuid);
 }
 
 Status RaftConsensus::Update(const ConsensusRequestPB* request,
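
The follower-eviction path above deliberately works in two steps: it
snapshots the committed config while holding the lock, then submits
TryRemoveFollowerTask() to the thread pool, so that ChangeConfig(),
which acquires the lock itself, only runs after the lock has been
dropped. The shape of the pattern, condensed from the diff above:

  RaftConfigPB committed_config;
  {
    UniqueLock lock;                  // hold lock_ just long enough
    ...                               // to copy the state we need
    committed_config = GetCommittedConfigUnlocked();
  }
  // Lock released; the task can safely re-acquire it in ChangeConfig().
  thread_pool_->SubmitClosure(
      Bind(&RaftConsensus::TryRemoveFollowerTask,
           this, uuid, committed_config, reason));
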
@@ -743,7 +753,7 @@ Status RaftConsensus::Update(const ConsensusRequestPB* request,
                                 "is set to true.");
   }
 
-  response->set_responder_uuid(state_->GetPeerUuid());
+  response->set_responder_uuid(peer_uuid_);
 
   VLOG_WITH_PREFIX(2) << "Replica received request: " << SecureShortDebugString(*request);
 
@@ -753,7 +763,7 @@ Status RaftConsensus::Update(const ConsensusRequestPB* request,
   if (PREDICT_FALSE(VLOG_IS_ON(1))) {
     if (request->ops_size() == 0) {
       VLOG_WITH_PREFIX(1) << "Replica replied to status only request. Replica: "
-                          << state_->ToString() << ". Response: "
+                          << ToString() << ". Response: "
                           << SecureShortDebugString(*response);
     }
   }
@@ -784,10 +794,10 @@ Status RaftConsensus::StartReplicaTransactionUnlocked(const ReplicateRefPtr& msg
 }
 
 Status RaftConsensus::IsSingleVoterConfig(bool* single_voter) const {
-  ReplicaState::UniqueLock lock;
-  RETURN_NOT_OK(state_->LockForRead(&lock));
-  const RaftConfigPB& config = state_->GetCommittedConfigUnlocked();
-  const string& uuid = state_->GetPeerUuid();
+  UniqueLock lock;
+  RETURN_NOT_OK(LockForRead(&lock));
+  const RaftConfigPB& config = GetCommittedConfigUnlocked();
+  const string& uuid = peer_uuid_;
   if (CountVoters(config) == 1 && IsRaftConfigVoter(uuid, config)) {
     *single_voter = true;
   } else {
@@ -876,16 +886,16 @@ void RaftConsensus::DeduplicateLeaderRequestUnlocked(ConsensusRequestPB* rpc_req
 Status RaftConsensus::HandleLeaderRequestTermUnlocked(const ConsensusRequestPB* request,
                                                       ConsensusResponsePB* response) {
   // Do term checks first:
-  if (PREDICT_FALSE(request->caller_term() != state_->GetCurrentTermUnlocked())) {
+  if (PREDICT_FALSE(request->caller_term() != GetCurrentTermUnlocked())) {
 
     // If less, reject.
-    if (request->caller_term() < state_->GetCurrentTermUnlocked()) {
+    if (request->caller_term() < GetCurrentTermUnlocked()) {
       string msg = Substitute("Rejecting Update request from peer $0 for earlier term $1. "
                               "Current term is $2. Ops: $3",
 
                               request->caller_uuid(),
                               request->caller_term(),
-                              state_->GetCurrentTermUnlocked(),
+                              GetCurrentTermUnlocked(),
                               OpsRangeString(*request));
       LOG_WITH_PREFIX_UNLOCKED(INFO) << msg;
       FillConsensusResponseError(response,
@@ -1019,13 +1029,13 @@ Status RaftConsensus::CheckLeaderRequestUnlocked(const ConsensusRequestPB* reque
   // the effective leader of the configuration. If they are not currently marked as
   // the leader locally, mark them as leader now.
   const string& caller_uuid = request->caller_uuid();
-  if (PREDICT_FALSE(state_->HasLeaderUnlocked() &&
-                    state_->GetLeaderUuidUnlocked() != caller_uuid)) {
+  if (PREDICT_FALSE(HasLeaderUnlocked() &&
+                    GetLeaderUuidUnlocked() != caller_uuid)) {
     LOG_WITH_PREFIX_UNLOCKED(FATAL) << "Unexpected new leader in same term! "
-        << "Existing leader UUID: " << state_->GetLeaderUuidUnlocked() << ", "
+        << "Existing leader UUID: " << GetLeaderUuidUnlocked() << ", "
         << "new leader UUID: " << caller_uuid;
   }
-  if (PREDICT_FALSE(!state_->HasLeaderUnlocked())) {
+  if (PREDICT_FALSE(!HasLeaderUnlocked())) {
     SetLeaderUuidUnlocked(request->caller_uuid());
   }
 
@@ -1035,8 +1045,8 @@ Status RaftConsensus::CheckLeaderRequestUnlocked(const ConsensusRequestPB* reque
 Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
                                     ConsensusResponsePB* response) {
   TRACE_EVENT2("consensus", "RaftConsensus::UpdateReplica",
-               "peer", peer_uuid(),
-               "tablet", tablet_id());
+               "peer", peer_uuid_,
+               "tablet", options_.tablet_id);
   Synchronizer log_synchronizer;
   StatusCallback sync_status_cb = log_synchronizer.AsStatusCallback();
 
@@ -1132,8 +1142,8 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
   LeaderRequest deduped_req;
 
   {
-    ReplicaState::UniqueLock lock;
-    RETURN_NOT_OK(state_->LockForUpdate(&lock));
+    UniqueLock lock;
+    RETURN_NOT_OK(LockForUpdate(&lock));
 
     deduped_req.leader_uuid = request->caller_uuid();
 
@@ -1323,8 +1333,8 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
       // If we're just waiting for our log append to finish, let's snooze the timer.
       // We don't want to fire leader election because we're waiting on our own log.
       if (s.IsTimedOut()) {
-        ReplicaState::UniqueLock lock;
-        RETURN_NOT_OK(state_->LockForRead(&lock));
+        UniqueLock lock;
+        RETURN_NOT_OK(LockForRead(&lock));
         SnoozeFailureDetectorUnlocked();
       }
     } while (s.IsTimedOut());
@@ -1333,7 +1343,7 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
     TRACE("finished");
   }
 
-  VLOG_WITH_PREFIX(2) << "Replica updated. " << state_->ToString()
+  VLOG_WITH_PREFIX(2) << "Replica updated. " << ToString()
                       << ". Request: " << SecureShortDebugString(*request);
 
   TRACE("UpdateReplicas() finished");
@@ -1342,7 +1352,7 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
 
 void RaftConsensus::FillConsensusResponseOKUnlocked(ConsensusResponsePB* response) {
   TRACE("Filling consensus response to leader.");
-  response->set_responder_term(state_->GetCurrentTermUnlocked());
+  response->set_responder_term(GetCurrentTermUnlocked());
   response->mutable_status()->mutable_last_received()->CopyFrom(
       queue_->GetLastOpIdInLog());
   response->mutable_status()->mutable_last_received_current_leader()->CopyFrom(
@@ -1361,13 +1371,13 @@ void RaftConsensus::FillConsensusResponseError(ConsensusResponsePB* response,
 
 Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB* response) {
   TRACE_EVENT2("consensus", "RaftConsensus::RequestVote",
-               "peer", peer_uuid(),
-               "tablet", tablet_id());
-  response->set_responder_uuid(state_->GetPeerUuid());
+               "peer", peer_uuid_,
+               "tablet", options_.tablet_id);
+  response->set_responder_uuid(peer_uuid_);
 
   // We must acquire the update lock in order to ensure that this vote action
   // takes place between requests.
-  // Lock ordering: The update lock must be acquired before the ReplicaState lock.
+  // Lock ordering: update_lock_ must be acquired before lock_.
   std::unique_lock<simple_spinlock> update_guard(update_lock_, std::defer_lock);
   if (FLAGS_enable_leader_failure_detection) {
     update_guard.try_lock();
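
The vote path above takes update_lock_ without blocking when leader
failure detection is enabled: if an Update() from a live leader is in
flight, the replica answers with a quick "busy" rejection (see
RequestVoteRespondIsBusy() in the next hunk) instead of stalling the
vote RPC. The idiom in isolation (a sketch; the blocking else-branch is
an assumption, as it falls outside the quoted hunks):

  std::unique_lock<simple_spinlock> update_guard(update_lock_,
                                                 std::defer_lock);
  if (FLAGS_enable_leader_failure_detection) {
    update_guard.try_lock();      // don't wait behind a long Update()
  } else {
    update_guard.lock();          // assumed: block when FD is disabled
  }
  if (!update_guard.owns_lock()) {
    // Still take lock_ briefly so the response carries term info.
    return RequestVoteRespondIsBusy(request, response);
  }
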
@@ -1383,18 +1393,18 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB*
     // timeouts, just vote a quick NO.
     //
     // We still need to take the state lock in order to respond with term info, etc.
-    ReplicaState::UniqueLock state_guard;
-    RETURN_NOT_OK(state_->LockForConfigChange(&state_guard));
+    UniqueLock state_guard;
+    RETURN_NOT_OK(LockForConfigChange(&state_guard));
     return RequestVoteRespondIsBusy(request, response);
   }
 
   // Acquire the replica state lock so we can read / modify the consensus state.
-  ReplicaState::UniqueLock state_guard;
-  RETURN_NOT_OK(state_->LockForConfigChange(&state_guard));
+  UniqueLock state_guard;
+  RETURN_NOT_OK(LockForConfigChange(&state_guard));
 
   // If the node is not in the configuration, allow the vote (this is required by Raft)
   // but log an informational message anyway.
-  if (!IsRaftConfigMember(request->candidate_uuid(), state_->GetActiveConfigUnlocked())) {
+  if (!IsRaftConfigMember(request->candidate_uuid(), GetActiveConfigUnlocked())) {
     LOG_WITH_PREFIX_UNLOCKED(INFO) << "Handling vote request from an unknown peer "
                                    << request->candidate_uuid();
   }
@@ -1421,16 +1431,16 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB*
   }
 
   // Candidate is running behind.
-  if (request->candidate_term() < state_->GetCurrentTermUnlocked()) {
+  if (request->candidate_term() < GetCurrentTermUnlocked()) {
     return RequestVoteRespondInvalidTerm(request, response);
   }
 
   // We already voted this term.
-  if (request->candidate_term() == state_->GetCurrentTermUnlocked() &&
-      state_->HasVotedCurrentTermUnlocked()) {
+  if (request->candidate_term() == GetCurrentTermUnlocked() &&
+      HasVotedCurrentTermUnlocked()) {
 
     // Already voted for the same candidate in the current term.
-    if (state_->GetVotedForCurrentTermUnlocked() == request->candidate_uuid()) {
+    if (GetVotedForCurrentTermUnlocked() == request->candidate_uuid()) {
       return RequestVoteRespondVoteAlreadyGranted(request, response);
     }
 
@@ -1449,14 +1459,14 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB*
   // has actually now successfully become leader of the prior term, in which case
   // bumping our term here would disrupt it.
   if (!request->is_pre_election() &&
-      request->candidate_term() > state_->GetCurrentTermUnlocked()) {
+      request->candidate_term() > GetCurrentTermUnlocked()) {
     // If we are going to vote for this peer, then we will flush the consensus metadata
     // to disk below when we record the vote, and we can skip flushing the term advancement
     // to disk here.
-    auto flush = vote_yes ? ReplicaState::SKIP_FLUSH_TO_DISK : ReplicaState::FLUSH_TO_DISK;
+    auto flush = vote_yes ? SKIP_FLUSH_TO_DISK : FLUSH_TO_DISK;
     RETURN_NOT_OK_PREPEND(HandleTermAdvanceUnlocked(request->candidate_term(), flush),
         Substitute("Could not step down in RequestVote. Current term: $0, candidate term: $1",
-                   state_->GetCurrentTermUnlocked(), request->candidate_term()));
+                   GetCurrentTermUnlocked(), request->candidate_term()));
   }
 
   if (!vote_yes) {
@@ -1482,10 +1492,10 @@ Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req,
   ChangeConfigType type = req.type();
   const RaftPeerPB& server = req.server();
   {
-    ReplicaState::UniqueLock lock;
-    RETURN_NOT_OK(state_->LockForConfigChange(&lock));
-    RETURN_NOT_OK(state_->CheckActiveLeaderUnlocked());
-    RETURN_NOT_OK(state_->CheckNoConfigChangePendingUnlocked());
+    UniqueLock lock;
+    RETURN_NOT_OK(LockForConfigChange(&lock));
+    RETURN_NOT_OK(CheckActiveLeaderUnlocked());
+    RETURN_NOT_OK(CheckNoConfigChangePendingUnlocked());
 
     // We are required by Raft to reject config change operations until we have
     // committed at least one operation in our current term as leader.
@@ -1498,7 +1508,7 @@ Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req,
       return Status::InvalidArgument("server must have permanent_uuid specified",
                                      SecureShortDebugString(req));
     }
-    const RaftConfigPB& committed_config = state_->GetCommittedConfigUnlocked();
+    const RaftConfigPB& committed_config = GetCommittedConfigUnlocked();
 
     // Support atomic ChangeConfig requests.
     if (req.has_cas_config_opid_index()) {
@@ -1535,13 +1545,13 @@ Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req,
         break;
 
       case REMOVE_SERVER:
-        if (server_uuid == peer_uuid()) {
+        if (server_uuid == peer_uuid_) {
           return Status::InvalidArgument(
               Substitute("Cannot remove peer $0 from the config because it is the leader. "
                          "Force another leader to be elected to remove this server. "
                          "Consensus state: $1",
                          server_uuid,
-                         SecureShortDebugString(state_->ConsensusStateUnlocked())));
+                         SecureShortDebugString(ConsensusStateUnlocked())));
         }
         if (!RemoveFromRaftConfig(&new_config, server_uuid)) {
           return Status::NotFound(
@@ -1586,21 +1596,19 @@ Status RaftConsensus::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB& req,
   int64 last_committed_index;
   OpId preceding_opid;
   uint64 msg_timestamp;
-  string local_peer_uuid;
   {
     // Take the snapshot of the replica state and queue state so that
     // we can stick them in the consensus update request later.
-    ReplicaState::UniqueLock lock;
-    RETURN_NOT_OK(state_->LockForRead(&lock));
-    local_peer_uuid = state_->GetPeerUuid();
-    current_term = state_->GetCurrentTermUnlocked();
-    committed_config = state_->GetCommittedConfigUnlocked();
-    if (state_->IsConfigChangePendingUnlocked()) {
+    UniqueLock lock;
+    RETURN_NOT_OK(LockForRead(&lock));
+    current_term = GetCurrentTermUnlocked();
+    committed_config = GetCommittedConfigUnlocked();
+    if (IsConfigChangePendingUnlocked()) {
       LOG_WITH_PREFIX_UNLOCKED(WARNING)
             << "Replica has a pending config, but the new config "
             << "will be unsafely changed anyway. "
             << "Currently pending config on the node: "
-            << SecureShortDebugString(state_->GetPendingConfigUnlocked());
+            << SecureShortDebugString(GetPendingConfigUnlocked());
     }
     all_replicated_index = queue_->GetAllReplicatedIndex();
     last_committed_index = queue_->GetCommittedIndex();
@@ -1641,13 +1649,13 @@ Status RaftConsensus::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB& req,
   // Although it is valid for a local replica to not have itself
   // in the committed config, it is rare and a replica without itself
   // in the latest config is definitely not caught up with the latest leader's log.
-  if (!IsRaftConfigVoter(local_peer_uuid, new_config)) {
+  if (!IsRaftConfigVoter(peer_uuid_, new_config)) {
     return Status::InvalidArgument(Substitute("Local replica uuid $0 is not "
                                               "a VOTER in the new config, "
                                               "rejecting the unsafe config "
                                               "change request for tablet $1. "
                                               "Rejected config: $2" ,
-                                              local_peer_uuid, req.tablet_id(),
+                                              peer_uuid_, req.tablet_id(),
                                               SecureShortDebugString(new_config)));
   }
   new_config.set_unsafe_config_change(true);
@@ -1718,9 +1726,9 @@ void RaftConsensus::Shutdown() {
   if (shutdown_.Load(kMemOrderAcquire)) return;
 
   {
-    ReplicaState::UniqueLock lock;
+    UniqueLock lock;
     // Transition to kShuttingDown state.
-    CHECK_OK(state_->LockForShutdown(&lock));
+    CHECK_OK(LockForShutdown(&lock));
     LOG_WITH_PREFIX_UNLOCKED(INFO) << "Raft consensus shutting down.";
   }
 
@@ -1730,13 +1738,12 @@ void RaftConsensus::Shutdown() {
   // We must close the queue after we close the peers.
   queue_->Close();
 
-
   {
-    ReplicaState::UniqueLock lock;
-    CHECK_OK(state_->LockForShutdown(&lock));
-    CHECK_EQ(ReplicaState::kShuttingDown, state_->state());
+    UniqueLock lock;
+    CHECK_OK(LockForShutdown(&lock));
+    CHECK_EQ(kShuttingDown, state_);
     CHECK_OK(pending_.CancelPendingTransactions());
-    CHECK_OK(state_->ShutdownUnlocked());
+    state_ = kShutDown;
     LOG_WITH_PREFIX_UNLOCKED(INFO) << "Raft consensus is shut down!";
   }
 
@@ -1772,25 +1779,25 @@ Status RaftConsensus::StartConsensusOnlyRoundUnlocked(const ReplicateRefPtr& msg
 }
 
 Status RaftConsensus::AdvanceTermForTests(int64_t new_term) {
-  ReplicaState::UniqueLock lock;
-  CHECK_OK(state_->LockForConfigChange(&lock));
+  UniqueLock lock;
+  CHECK_OK(LockForConfigChange(&lock));
   return HandleTermAdvanceUnlocked(new_term);
 }
 
 std::string RaftConsensus::GetRequestVoteLogPrefixUnlocked(const VoteRequestPB& request) const {
   return Substitute("$0Leader $1election vote request",
-                    state_->LogPrefixUnlocked(),
+                    LogPrefixUnlocked(),
                     request.is_pre_election() ? "pre-" : "");
 }
 
 void RaftConsensus::FillVoteResponseVoteGranted(VoteResponsePB* response) {
-  response->set_responder_term(state_->GetCurrentTermUnlocked());
+  response->set_responder_term(GetCurrentTermUnlocked());
   response->set_vote_granted(true);
 }
 
 void RaftConsensus::FillVoteResponseVoteDenied(ConsensusErrorPB::Code error_code,
                                                VoteResponsePB* response) {
-  response->set_responder_term(state_->GetCurrentTermUnlocked());
+  response->set_responder_term(GetCurrentTermUnlocked());
   response->set_vote_granted(false);
   response->mutable_consensus_error()->set_code(error_code);
 }
@@ -1803,7 +1810,7 @@ Status RaftConsensus::RequestVoteRespondInvalidTerm(const VoteRequestPB* request
                           GetRequestVoteLogPrefixUnlocked(*request),
                           request->candidate_uuid(),
                           request->candidate_term(),
-                          state_->GetCurrentTermUnlocked());
+                          GetCurrentTermUnlocked());
   LOG(INFO) << msg;
   StatusToPB(Status::InvalidArgument(msg), response->mutable_consensus_error()->mutable_status());
   return Status::OK();
@@ -1827,8 +1834,8 @@ Status RaftConsensus::RequestVoteRespondAlreadyVotedForOther(const VoteRequestPB
                           "Already voted for candidate $3 in this term.",
                           GetRequestVoteLogPrefixUnlocked(*request),
                           request->candidate_uuid(),
-                          state_->GetCurrentTermUnlocked(),
-                          state_->GetVotedForCurrentTermUnlocked());
+                          GetCurrentTermUnlocked(),
+                          GetVotedForCurrentTermUnlocked());
   LOG(INFO) << msg;
   StatusToPB(Status::InvalidArgument(msg), response->mutable_consensus_error()->mutable_status());
   return Status::OK();
@@ -1890,7 +1897,7 @@ Status RaftConsensus::RequestVoteRespondVoteGranted(const VoteRequestPB* request
 
   if (!request->is_pre_election()) {
     // Persist our vote to disk.
-    RETURN_NOT_OK(state_->SetVotedForCurrentTermUnlocked(request->candidate_uuid()));
+    RETURN_NOT_OK(SetVotedForCurrentTermUnlocked(request->candidate_uuid()));
   }
 
   FillVoteResponseVoteGranted(response);
@@ -1902,38 +1909,30 @@ Status RaftConsensus::RequestVoteRespondVoteGranted(const VoteRequestPB* request
   LOG(INFO) << Substitute("$0: Granting yes vote for candidate $1 in term $2.",
                           GetRequestVoteLogPrefixUnlocked(*request),
                           request->candidate_uuid(),
-                          state_->GetCurrentTermUnlocked());
+                          GetCurrentTermUnlocked());
   return Status::OK();
 }
 
 RaftPeerPB::Role RaftConsensus::role() const {
-  ReplicaState::UniqueLock lock;
-  CHECK_OK(state_->LockForRead(&lock));
-  return state_->GetActiveRoleUnlocked();
-}
-
-std::string RaftConsensus::LogPrefixUnlocked() {
-  return state_->LogPrefixUnlocked();
-}
-
-std::string RaftConsensus::LogPrefix() {
-  return state_->LogPrefix();
+  UniqueLock lock;
+  CHECK_OK(LockForRead(&lock));
+  return GetActiveRoleUnlocked();
 }
 
 void RaftConsensus::SetLeaderUuidUnlocked(const string& uuid) {
+  DCHECK(lock_.is_locked());
   failed_elections_since_stable_leader_ = 0;
-  state_->SetLeaderUuidUnlocked(uuid);
-  MarkDirty("New leader " + uuid);
+  cmeta_->set_leader_uuid(uuid);
+  MarkDirty(Substitute("New leader $0", uuid));
 }
 
-
 Status RaftConsensus::ReplicateConfigChangeUnlocked(const RaftConfigPB& old_config,
                                                     const RaftConfigPB& new_config,
                                                     const StatusCallback& client_cb) {
   auto cc_replicate = new ReplicateMsg();
   cc_replicate->set_op_type(CHANGE_CONFIG_OP);
   ChangeConfigRecordPB* cc_req = cc_replicate->mutable_change_config_record();
-  cc_req->set_tablet_id(tablet_id());
+  cc_req->set_tablet_id(options_.tablet_id);
   *cc_req->mutable_old_config() = old_config;
   *cc_req->mutable_new_config() = new_config;
   CHECK_OK(time_manager_->AssignTimestamp(cc_replicate));
@@ -1950,8 +1949,8 @@ Status RaftConsensus::ReplicateConfigChangeUnlocked(const RaftConfigPB& old_conf
 }
 
 Status RaftConsensus::RefreshConsensusQueueAndPeersUnlocked() {
-  DCHECK_EQ(RaftPeerPB::LEADER, state_->GetActiveRoleUnlocked());
-  const RaftConfigPB& active_config = state_->GetActiveConfigUnlocked();
+  DCHECK_EQ(RaftPeerPB::LEADER, GetActiveRoleUnlocked());
+  const RaftConfigPB& active_config = GetActiveConfigUnlocked();
 
   // Change the peers so that we're able to replicate messages remotely and
   // locally. The peer manager must be closed before updating the active config
@@ -1961,46 +1960,46 @@ Status RaftConsensus::RefreshConsensusQueueAndPeersUnlocked() {
   // TODO(todd): should use queue committed index here? in that case do
   // we need to pass it in at all?
   queue_->SetLeaderMode(pending_.GetCommittedIndex(),
-                        state_->GetCurrentTermUnlocked(),
+                        GetCurrentTermUnlocked(),
                         active_config);
   RETURN_NOT_OK(peer_manager_->UpdateRaftConfig(active_config));
   return Status::OK();
 }
 
-string RaftConsensus::peer_uuid() const {
-  return state_->GetPeerUuid();
+const string& RaftConsensus::peer_uuid() const {
+  return peer_uuid_;
 }
 
-string RaftConsensus::tablet_id() const {
-  return state_->GetOptions().tablet_id;
+const string& RaftConsensus::tablet_id() const {
+  return options_.tablet_id;
 }
 
 ConsensusStatePB RaftConsensus::ConsensusState() const {
-  ReplicaState::UniqueLock lock;
-  CHECK_OK(state_->LockForRead(&lock));
-  return state_->ConsensusStateUnlocked();
+  UniqueLock lock;
+  CHECK_OK(LockForRead(&lock));
+  return ConsensusStateUnlocked();
 }
 
 RaftConfigPB RaftConsensus::CommittedConfig() const {
-  ReplicaState::UniqueLock lock;
-  CHECK_OK(state_->LockForRead(&lock));
-  return state_->GetCommittedConfigUnlocked();
+  UniqueLock lock;
+  CHECK_OK(LockForRead(&lock));
+  return GetCommittedConfigUnlocked();
 }
 
 void RaftConsensus::DumpStatusHtml(std::ostream& out) const {
   out << "<h1>Raft Consensus State</h1>" << std::endl;
 
   out << "<h2>State</h2>" << std::endl;
-  out << "<pre>" << EscapeForHtmlToString(state_->ToString()) << "</pre>" << std::endl;
+  out << "<pre>" << EscapeForHtmlToString(ToString()) << "</pre>" << std::endl;
   out << "<h2>Queue</h2>" << std::endl;
   out << "<pre>" << EscapeForHtmlToString(queue_->ToString()) << "</pre>" << std::endl;
 
   // Dump the queues on a leader.
   RaftPeerPB::Role role;
   {
-    ReplicaState::UniqueLock lock;
-    CHECK_OK(state_->LockForRead(&lock));
-    role = state_->GetActiveRoleUnlocked();
+    UniqueLock lock;
+    CHECK_OK(LockForRead(&lock));
+    role = GetActiveRoleUnlocked();
   }
   if (role == RaftPeerPB::LEADER) {
     out << "<h2>Queue overview</h2>" << std::endl;
@@ -2011,17 +2010,13 @@ void RaftConsensus::DumpStatusHtml(std::ostream& out) const {
   }
 }
 
-ReplicaState* RaftConsensus::GetReplicaStateForTests() {
-  return state_.get();
-}
-
 void RaftConsensus::ElectionCallback(ElectionReason reason, const ElectionResult& result) {
   // The election callback runs on a reactor thread, so we need to defer to our
   // threadpool. If the threadpool is already shut down for some reason, it's OK --
   // we're OK with the callback never running.
   WARN_NOT_OK(thread_pool_->SubmitClosure(Bind(&RaftConsensus::DoElectionCallback,
                                                this, reason, result)),
-              state_->LogPrefixThreadSafe() + "Unable to run election callback");
+              LogPrefixThreadSafe() + "Unable to run election callback");
 }
 
 void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResult& result) {
@@ -2031,8 +2026,8 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
 
   // Snooze to avoid the election timer firing again as much as possible.
   {
-    ReplicaState::UniqueLock lock;
-    CHECK_OK(state_->LockForRead(&lock));
+    UniqueLock lock;
+    CHECK_OK(LockForRead(&lock));
     // We need to snooze when we win and when we lose:
     // - When we win because we're about to disable the timer and become leader.
     // - When we lose or otherwise we can fall into a cycle, where everyone keeps
@@ -2056,7 +2051,7 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
       // because it already voted in term 2. The check below ensures that peer B
       // will bump to term 2 when it gets the vote rejection, such that its
       // next pre-election (for term 3) would succeed.
-      if (result.highest_voter_term > state_->GetCurrentTermUnlocked()) {
+      if (result.highest_voter_term > GetCurrentTermUnlocked()) {
         HandleTermAdvanceUnlocked(result.highest_voter_term);
       }
 
@@ -2069,8 +2064,8 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
   }
 
   // The vote was granted, become leader.
-  ReplicaState::UniqueLock lock;
-  Status s = state_->LockForConfigChange(&lock);
+  UniqueLock lock;
+  Status s = LockForConfigChange(&lock);
   if (PREDICT_FALSE(!s.ok())) {
     LOG_WITH_PREFIX(INFO) << "Received " << election_type << " callback for term "
                           << election_term << " while not running: "
@@ -2085,7 +2080,7 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
     election_started_in_term--;
   }
 
-  if (election_started_in_term != state_->GetCurrentTermUnlocked()) {
+  if (election_started_in_term != GetCurrentTermUnlocked()) {
     LOG_WITH_PREFIX_UNLOCKED(INFO)
         << "Leader " << election_type << " decision vote started in "
         << "defunct term " << election_started_in_term << ": "
@@ -2093,8 +2088,8 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
     return;
   }
 
-  const RaftConfigPB& active_config = state_->GetActiveConfigUnlocked();
-  if (!IsRaftConfigVoter(state_->GetPeerUuid(), active_config)) {
+  const RaftConfigPB& active_config = GetActiveConfigUnlocked();
+  if (!IsRaftConfigVoter(peer_uuid_, active_config)) {
     LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Leader " << election_type
                                       << " decision while not in active config. "
                                       << "Result: Term " << election_term << ": "
@@ -2103,7 +2098,7 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
     return;
   }
 
-  if (state_->GetActiveRoleUnlocked() == RaftPeerPB::LEADER) {
+  if (GetActiveRoleUnlocked() == RaftPeerPB::LEADER) {
     // If this was a pre-election, it's possible to see the following interleaving:
     //
     //  1. Term N (follower): send a real election for term N
@@ -2132,7 +2127,7 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
                 "Couldn't start leader election after successful pre-election");
   } else {
     // We won a real election. Convert role to LEADER.
-    SetLeaderUuidUnlocked(state_->GetPeerUuid());
+    SetLeaderUuidUnlocked(peer_uuid_);
 
     // TODO(todd): BecomeLeaderUnlocked() can fail due to state checks during shutdown.
     // It races with the above state check.
@@ -2142,8 +2137,8 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
 }
 
 Status RaftConsensus::GetLastOpId(OpIdType type, OpId* id) {
-  ReplicaState::UniqueLock lock;
-  RETURN_NOT_OK(state_->LockForRead(&lock));
+  UniqueLock lock;
+  RETURN_NOT_OK(LockForRead(&lock));
   if (type == RECEIVED_OPID) {
     *DCHECK_NOTNULL(id) = queue_->GetLastOpIdInLog();
   } else if (type == COMMITTED_OPID) {
@@ -2166,7 +2161,7 @@ log::RetentionIndexes RaftConsensus::GetRetentionIndexes() {
 
 void RaftConsensus::MarkDirty(const std::string& reason) {
   WARN_NOT_OK(thread_pool_->SubmitClosure(Bind(mark_dirty_clbk_, reason)),
-              state_->LogPrefixThreadSafe() + "Unable to run MarkDirty callback");
+              LogPrefixThreadSafe() + "Unable to run MarkDirty callback");
 }
 
 void RaftConsensus::MarkDirtyOnSuccess(const string& reason,
@@ -2181,8 +2176,9 @@ void RaftConsensus::MarkDirtyOnSuccess(const string& reason,
 void RaftConsensus::NonTxRoundReplicationFinished(ConsensusRound* round,
                                                   const StatusCallback& client_cb,
                                                   const Status& status) {
-  // NOTE: the ReplicaState lock is held here because this is triggered by
-  // ReplicaState's abort or commit paths.
+  // NOTE: lock_ is held here because this is triggered by
+  // PendingRounds::AbortOpsAfter() and AdvanceCommittedIndex().
+  DCHECK(lock_.is_locked());
   OperationType op_type = round->replicate_msg()->op_type();
   const string& op_type_str = OperationType_Name(op_type);
   CHECK(IsConsensusOnlyOperation(op_type)) << "Unexpected op type: " << op_type_str;
@@ -2192,13 +2188,16 @@ void RaftConsensus::NonTxRoundReplicationFinished(ConsensusRound* round,
     // Fall through to the generic handling.
   }
 
+  // TODO(mpercy): May need some refactoring to unlock 'lock_' before invoking
+  // the client callback.
+
   if (!status.ok()) {
-    LOG(INFO) << state_->LogPrefixThreadSafe() << op_type_str << " replication failed: "
+    LOG(INFO) << LogPrefixThreadSafe() << op_type_str << " replication failed: "
               << status.ToString();
     client_cb.Run(status);
     return;
   }
-  VLOG(1) << state_->LogPrefixThreadSafe() << "Committing " << op_type_str << " with op id "
+  VLOG(1) << LogPrefixThreadSafe() << "Committing " << op_type_str << " with op id "
           << round->id();
   gscoped_ptr<CommitMsg> commit_msg(new CommitMsg);
   commit_msg->set_op_type(round->replicate_msg()->op_type());
@@ -2216,11 +2215,11 @@ void RaftConsensus::CompleteConfigChangeRoundUnlocked(ConsensusRound* round, con
 
   if (!status.ok()) {
     // If the config change being aborted is the current pending one, abort it.
-    if (state_->IsConfigChangePendingUnlocked() &&
-        state_->GetPendingConfigUnlocked().opid_index() == op_id.index()) {
+    if (IsConfigChangePendingUnlocked() &&
+        GetPendingConfigUnlocked().opid_index() == op_id.index()) {
       LOG_WITH_PREFIX_UNLOCKED(INFO) << "Aborting config change with OpId "
                                      << op_id << ": " << status.ToString();
-      state_->ClearPendingConfigUnlocked();
+      ClearPendingConfigUnlocked();
     } else {
       LOG_WITH_PREFIX_UNLOCKED(INFO)
           << "Skipping abort of non-pending config change with OpId "
@@ -2250,14 +2249,14 @@ void RaftConsensus::CompleteConfigChangeRoundUnlocked(ConsensusRound* round, con
   // Check if the pending Raft config has an OpId less than the committed
   // config. If so, this is a replay at startup in which the COMMIT
   // messages were delayed.
-  const RaftConfigPB& committed_config = state_->GetCommittedConfigUnlocked();
+  const RaftConfigPB& committed_config = GetCommittedConfigUnlocked();
   if (new_config.opid_index() > committed_config.opid_index()) {
     LOG_WITH_PREFIX_UNLOCKED(INFO)
         << "Committing config change with OpId "
         << op_id << ": "
         << DiffRaftConfigs(old_config, new_config)
         << ". New config: { " << SecureShortDebugString(new_config) << " }";
-    CHECK_OK(state_->SetCommittedConfigUnlocked(new_config));
+    CHECK_OK(SetCommittedConfigUnlocked(new_config));
   } else {
     LOG_WITH_PREFIX_UNLOCKED(INFO)
         << "Ignoring commit of config change with OpId "
@@ -2347,23 +2346,303 @@ MonoDelta RaftConsensus::LeaderElectionExpBackoffDeltaUnlocked() {
 }
 
 Status RaftConsensus::HandleTermAdvanceUnlocked(ConsensusTerm new_term,
-                                                ReplicaState::FlushToDisk flush) {
-  if (new_term <= state_->GetCurrentTermUnlocked()) {
+                                                FlushToDisk flush) {
+  if (new_term <= GetCurrentTermUnlocked()) {
     return Status::IllegalState(Substitute("Can't advance term to: $0 current term: $1 is higher.",
-                                           new_term, state_->GetCurrentTermUnlocked()));
+                                           new_term, GetCurrentTermUnlocked()));
   }
-  if (state_->GetActiveRoleUnlocked() == RaftPeerPB::LEADER) {
+  if (GetActiveRoleUnlocked() == RaftPeerPB::LEADER) {
     LOG_WITH_PREFIX_UNLOCKED(INFO) << "Stepping down as leader of term "
-                                   << state_->GetCurrentTermUnlocked();
+                                   << GetCurrentTermUnlocked();
     RETURN_NOT_OK(BecomeReplicaUnlocked());
   }
 
   LOG_WITH_PREFIX_UNLOCKED(INFO) << "Advancing to term " << new_term;
-  RETURN_NOT_OK(state_->SetCurrentTermUnlocked(new_term, flush));
+  RETURN_NOT_OK(SetCurrentTermUnlocked(new_term, flush));
   term_metric_->set_value(new_term);
   last_received_cur_leader_ = MinimumOpId();
   return Status::OK();
 }
 
+Status RaftConsensus::LockForStart(UniqueLock* lock) const {
+  ThreadRestrictions::AssertWaitAllowed();
+  UniqueLock l(lock_);
+  CHECK_EQ(state_, kInitialized) << "Illegal state for Start()."
+      << " Replica is not in kInitialized state";
+  lock->swap(l);
+  return Status::OK();
+}
+
+Status RaftConsensus::LockForRead(UniqueLock* lock) const {
+  ThreadRestrictions::AssertWaitAllowed();
+  UniqueLock l(lock_);
+  lock->swap(l);
+  return Status::OK();
+}
+
+Status RaftConsensus::LockForReplicate(UniqueLock* lock, const ReplicateMsg& msg) const {
+  ThreadRestrictions::AssertWaitAllowed();
+  DCHECK(!msg.has_id()) << "Should not have an ID yet: " << SecureShortDebugString(msg);
+  UniqueLock l(lock_);
+  if (PREDICT_FALSE(state_ != kRunning)) {
+    return Status::IllegalState("Replica not in running state");
+  }
+
+  RETURN_NOT_OK(CheckActiveLeaderUnlocked());
+  lock->swap(l);
+  return Status::OK();
+}
+
+Status RaftConsensus::LockForCommit(UniqueLock* lock) const {
+  TRACE_EVENT0("consensus", "RaftConsensus::LockForCommit");
+  ThreadRestrictions::AssertWaitAllowed();
+  UniqueLock l(lock_);
+  if (PREDICT_FALSE(state_ != kRunning && state_ != kShuttingDown)) {
+    return Status::IllegalState("Replica not in running state");
+  }
+  lock->swap(l);
+  return Status::OK();
+}
+
+Status RaftConsensus::LockForUpdate(UniqueLock* lock) const {
+  TRACE_EVENT0("consensus", "RaftConsensus::LockForUpdate");
+  ThreadRestrictions::AssertWaitAllowed();
+  UniqueLock l(lock_);
+  if (PREDICT_FALSE(state_ != kRunning)) {
+    return Status::IllegalState("Replica not in running state");
+  }
+  if (!IsRaftConfigVoter(peer_uuid_, cmeta_->active_config())) {
+    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Allowing update even though not a member of the config";
+  }
+  lock->swap(l);
+  return Status::OK();
+}
+
+Status RaftConsensus::LockForConfigChange(UniqueLock* lock) const {
+  TRACE_EVENT0("consensus", "RaftConsensus::LockForConfigChange");
+
+  ThreadRestrictions::AssertWaitAllowed();
+  UniqueLock l(lock_);
+  // Can only change the config on running replicas.
+  if (PREDICT_FALSE(state_ != kRunning)) {
+    return Status::IllegalState("Unable to lock ReplicaState for config change",
+                                Substitute("State = $0", state_));
+  }
+  lock->swap(l);
+  return Status::OK();
+}
+
+Status RaftConsensus::LockForShutdown(UniqueLock* lock) {
+  TRACE_EVENT0("consensus", "RaftConsensus::LockForShutdown");
+  ThreadRestrictions::AssertWaitAllowed();
+  UniqueLock l(lock_);
+  if (state_ != kShuttingDown && state_ != kShutDown) {
+    state_ = kShuttingDown;
+  }
+  lock->swap(l);
+  return Status::OK();
+}
+
+Status RaftConsensus::CheckActiveLeaderUnlocked() const {
+  RaftPeerPB::Role role = GetActiveRoleUnlocked();
+  switch (role) {
+    case RaftPeerPB::LEADER:
+      return Status::OK();
+    default:
+      ConsensusStatePB cstate = ConsensusStateUnlocked();
+      return Status::IllegalState(Substitute("Replica $0 is not leader of this config. Role: $1. "
+                                             "Consensus state: $2",
+                                             peer_uuid_,
+                                             RaftPeerPB::Role_Name(role),
+                                             SecureShortDebugString(cstate)));
+  }
+}
+
+ConsensusStatePB RaftConsensus::ConsensusStateUnlocked() const {
+  return cmeta_->ToConsensusStatePB();
+}
+
+RaftPeerPB::Role RaftConsensus::GetActiveRoleUnlocked() const {
+  DCHECK(lock_.is_locked());
+  return cmeta_->active_role();
+}
+
+bool RaftConsensus::IsConfigChangePendingUnlocked() const {
+  DCHECK(lock_.is_locked());
+  return cmeta_->has_pending_config();
+}
+
+Status RaftConsensus::CheckNoConfigChangePendingUnlocked() const {
+  DCHECK(lock_.is_locked());
+  if (IsConfigChangePendingUnlocked()) {
+    return Status::IllegalState(
+        Substitute("RaftConfig change currently pending. Only one is allowed at a time.\n"
+                   "  Committed config: $0.\n  Pending config: $1",
+                   SecureShortDebugString(GetCommittedConfigUnlocked()),
+                   SecureShortDebugString(GetPendingConfigUnlocked())));
+  }
+  return Status::OK();
+}
+
+Status RaftConsensus::SetPendingConfigUnlocked(const RaftConfigPB& new_config) {
+  DCHECK(lock_.is_locked());
+  RETURN_NOT_OK_PREPEND(VerifyRaftConfig(new_config, PENDING_CONFIG),
+                        "Invalid config to set as pending");
+  if (!new_config.unsafe_config_change()) {
+    CHECK(!cmeta_->has_pending_config())
+        << "Attempt to set pending config while another is already pending! "
+        << "Existing pending config: " << SecureShortDebugString(cmeta_->pending_config()) << "; "
+        << "Attempted new pending config: " << SecureShortDebugString(new_config);
+  } else if (cmeta_->has_pending_config()) {
+    LOG_WITH_PREFIX_UNLOCKED(INFO)
+        << "Allowing unsafe config change even though there is a pending config! "
+        << "Existing pending config: " << SecureShortDebugString(cmeta_->pending_config()) << "; "
+        << "New pending config: " << SecureShortDebugString(new_config);
+  }
+  cmeta_->set_pending_config(new_config);
+  return Status::OK();
+}
+
+void RaftConsensus::ClearPendingConfigUnlocked() {
+  cmeta_->clear_pending_config();
+}
+
+const RaftConfigPB& RaftConsensus::GetPendingConfigUnlocked() const {
+  DCHECK(lock_.is_locked());
+  CHECK(IsConfigChangePendingUnlocked()) << "No pending config";
+  return cmeta_->pending_config();
+}
+
+Status RaftConsensus::SetCommittedConfigUnlocked(const RaftConfigPB& config_to_commit) {
+  TRACE_EVENT0("consensus", "RaftConsensus::SetCommittedConfigUnlocked");
+  DCHECK(lock_.is_locked());
+  DCHECK(config_to_commit.IsInitialized());
+  RETURN_NOT_OK_PREPEND(VerifyRaftConfig(config_to_commit, COMMITTED_CONFIG),
+                        "Invalid config to set as committed");
+
+  // Compare committed with pending configuration, ensure that they are the same.
+  // In the event of an unsafe config change triggered by an administrator,
+  // it is possible that the config being committed may not match the pending config
+  // because unsafe config change allows multiple pending configs to exist.
+  // Therefore we only need to validate that 'config_to_commit' matches the pending config
+  // if the pending config does not have its 'unsafe_config_change' flag set.
+  if (IsConfigChangePendingUnlocked()) {
+    const RaftConfigPB& pending_config = GetPendingConfigUnlocked();
+    if (!pending_config.unsafe_config_change()) {
+      // Quorums must be exactly equal, even w.r.t. peer ordering.
+      CHECK_EQ(GetPendingConfigUnlocked().SerializeAsString(),
+               config_to_commit.SerializeAsString())
+          << Substitute("New committed config must equal pending config, but does not. "
+                        "Pending config: $0, committed config: $1",
+                        SecureShortDebugString(pending_config),
+                        SecureShortDebugString(config_to_commit));
+    }
+  }
+  cmeta_->set_committed_config(config_to_commit);
+  cmeta_->clear_pending_config();
+  CHECK_OK(cmeta_->Flush());
+  return Status::OK();
+}
+
+const RaftConfigPB& RaftConsensus::GetCommittedConfigUnlocked() const {
+  DCHECK(lock_.is_locked());
+  return cmeta_->committed_config();
+}
+
+const RaftConfigPB& RaftConsensus::GetActiveConfigUnlocked() const {
+  DCHECK(lock_.is_locked());
+  return cmeta_->active_config();
+}
+
+Status RaftConsensus::SetCurrentTermUnlocked(int64_t new_term,
+                                            FlushToDisk flush) {
+  TRACE_EVENT1("consensus", "RaftConsensus::SetCurrentTermUnlocked",
+               "term", new_term);
+  DCHECK(lock_.is_locked());
+  if (PREDICT_FALSE(new_term <= GetCurrentTermUnlocked())) {
+    return Status::IllegalState(
+        Substitute("Cannot change term to a term that is lower than or equal to the current one. "
+                   "Current: $0, Proposed: $1", GetCurrentTermUnlocked(), new_term));
+  }
+  cmeta_->set_current_term(new_term);
+  cmeta_->clear_voted_for();
+  if (flush == FLUSH_TO_DISK) {
+    CHECK_OK(cmeta_->Flush());
+  }
+  ClearLeaderUnlocked();
+  return Status::OK();
+}
+
+int64_t RaftConsensus::GetCurrentTermUnlocked() const {
+  DCHECK(lock_.is_locked());
+  return cmeta_->current_term();
+}
+
+const string& RaftConsensus::GetLeaderUuidUnlocked() const {
+  DCHECK(lock_.is_locked());
+  return cmeta_->leader_uuid();
+}
+
+bool RaftConsensus::HasVotedCurrentTermUnlocked() const {
+  DCHECK(lock_.is_locked());
+  return cmeta_->has_voted_for();
+}
+
+Status RaftConsensus::SetVotedForCurrentTermUnlocked(const std::string& uuid) {
+  TRACE_EVENT1("consensus", "RaftConsensus::SetVotedForCurrentTermUnlocked",
+               "uuid", uuid);
+  DCHECK(lock_.is_locked());
+  cmeta_->set_voted_for(uuid);
+  CHECK_OK(cmeta_->Flush());
+  return Status::OK();
+}
+
+const std::string& RaftConsensus::GetVotedForCurrentTermUnlocked() const {
+  DCHECK(lock_.is_locked());
+  DCHECK(cmeta_->has_voted_for());
+  return cmeta_->voted_for();
+}
+
+const ConsensusOptions& RaftConsensus::GetOptions() const {
+  return options_;
+}
+
+string RaftConsensus::LogPrefix() const {
+  UniqueLock lock;
+  CHECK_OK(LockForRead(&lock));
+  return LogPrefixUnlocked();
+}
+
+string RaftConsensus::LogPrefixUnlocked() const {
+  DCHECK(lock_.is_locked());
+  return Substitute("T $0 P $1 [term $2 $3]: ",
+                    options_.tablet_id,
+                    peer_uuid_,
+                    GetCurrentTermUnlocked(),
+                    RaftPeerPB::Role_Name(GetActiveRoleUnlocked()));
+}
+
+string RaftConsensus::LogPrefixThreadSafe() const {
+  return Substitute("T $0 P $1: ",
+                    options_.tablet_id,
+                    peer_uuid_);
+}
+
+string RaftConsensus::ToString() const {
+  ThreadRestrictions::AssertWaitAllowed();
+  UniqueLock lock(lock_);
+  return ToStringUnlocked();
+}
+
+string RaftConsensus::ToStringUnlocked() const {
+  DCHECK(lock_.is_locked());
+  return Substitute("Replica: $0, State: $1, Role: $2",
+                    peer_uuid_, state_, RaftPeerPB::Role_Name(GetActiveRoleUnlocked()));
+}
+
+ConsensusMetadata* RaftConsensus::consensus_metadata_for_tests() const {
+  return cmeta_.get();
+}
+
 }  // namespace consensus
 }  // namespace kudu
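
The LockFor* methods above all follow the same acquire-validate-swap idiom:
the lock is taken into a local guard, the replica state is checked, and
ownership is handed to the caller via swap() so the lock stays held only when
the check passes. Below is a minimal standalone sketch of that idiom, using
std::mutex in place of Kudu's simple_spinlock and a simplified Status type;
the names are illustrative, not code from this patch.

    #include <iostream>
    #include <mutex>
    #include <string>
    #include <utility>

    // Simplified stand-ins for Kudu's Status and simple_spinlock,
    // for illustration only; not part of the patch.
    struct Status {
      bool ok;
      std::string msg;
      static Status OK() { return Status{true, ""}; }
      static Status IllegalState(std::string m) { return Status{false, std::move(m)}; }
    };

    class Replica {
     public:
      typedef std::unique_lock<std::mutex> UniqueLock;

      // Acquire-validate-swap: take the lock into a local guard, check the
      // replica state, and transfer ownership to the caller only on success.
      Status LockForReplicate(UniqueLock* lock) const {
        UniqueLock l(lock_);
        if (state_ != kRunning) {
          return Status::IllegalState("Replica not in running state");
        }
        lock->swap(l);  // Caller now owns the lock; 'l' is left empty.
        return Status::OK();
      }

      void SetRunning() {
        UniqueLock l(lock_);
        state_ = kRunning;
      }

     private:
      enum State { kInitialized, kRunning, kShuttingDown, kShutDown };
      mutable std::mutex lock_;
      State state_ = kInitialized;
    };

    int main() {
      Replica r;
      {
        Replica::UniqueLock lock;
        Status s = r.LockForReplicate(&lock);
        std::cout << (s.ok ? "locked" : s.msg) << std::endl;  // Prints the error.
      }
      r.SetRunning();
      {
        Replica::UniqueLock lock;
        Status s = r.LockForReplicate(&lock);
        std::cout << (s.ok ? "locked" : s.msg) << std::endl;  // Prints "locked".
      }  // 'lock' releases the mutex here.
      return 0;
    }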

http://git-wip-us.apache.org/repos/asf/kudu/blob/9e40867c/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index b2186db..b1badde 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -15,8 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef KUDU_CONSENSUS_RAFT_CONSENSUS_H_
-#define KUDU_CONSENSUS_RAFT_CONSENSUS_H_
+#pragma once
 
 #include <boost/optional/optional_fwd.hpp>
 #include <memory>
@@ -30,7 +29,6 @@
 #include "kudu/consensus/consensus_meta.h"
 #include "kudu/consensus/consensus_queue.h"
 #include "kudu/consensus/pending_rounds.h"
-#include "kudu/consensus/raft_consensus_state.h"
 #include "kudu/consensus/time_manager.h"
 #include "kudu/util/atomic.h"
 #include "kudu/util/failure_detector.h"
@@ -64,8 +62,10 @@ struct ElectionResult;
 class RaftConsensus : public Consensus,
                       public PeerMessageQueueObserver {
  public:
+  typedef std::unique_lock<simple_spinlock> UniqueLock;
+
   static scoped_refptr<RaftConsensus> Create(
-    const ConsensusOptions& options,
+    ConsensusOptions options,
     std::unique_ptr<ConsensusMetadata> cmeta,
     const RaftPeerPB& local_peer_pb,
     const scoped_refptr<MetricEntity>& metric_entity,
@@ -76,14 +76,14 @@ class RaftConsensus : public Consensus,
     const std::shared_ptr<MemTracker>& parent_mem_tracker,
     const Callback<void(const std::string& reason)>& mark_dirty_clbk);
 
-  RaftConsensus(const ConsensusOptions& options,
+  RaftConsensus(ConsensusOptions options,
                 std::unique_ptr<ConsensusMetadata> cmeta,
                 gscoped_ptr<PeerProxyFactory> peer_proxy_factory,
                 gscoped_ptr<PeerMessageQueue> queue,
                 gscoped_ptr<PeerManager> peer_manager,
                 gscoped_ptr<ThreadPool> thread_pool,
                 const scoped_refptr<MetricEntity>& metric_entity,
-                const std::string& peer_uuid,
+                std::string peer_uuid,
                 scoped_refptr<TimeManager> time_manager,
                 ReplicaTransactionFactory* txn_factory,
                 const scoped_refptr<log::Log>& log,
@@ -133,9 +133,11 @@ class RaftConsensus : public Consensus,
 
   RaftPeerPB::Role role() const override;
 
-  std::string peer_uuid() const override;
+  // Thread-safe.
+  const std::string& peer_uuid() const override;
 
-  std::string tablet_id() const override;
+  // Thread-safe.
+  const std::string& tablet_id() const override;
 
   scoped_refptr<TimeManager> time_manager() const override { return time_manager_; }
 
@@ -150,11 +152,6 @@ class RaftConsensus : public Consensus,
   // Makes this peer advance its term (and step down if leader), for tests.
   Status AdvanceTermForTests(int64_t new_term);
 
-  // Returns the replica state for tests. This should never be used outside of
-  // tests, in particular calling the LockFor* methods on the returned object
-  // can cause consensus to deadlock.
-  ReplicaState* GetReplicaStateForTests();
-
   int update_calls_for_tests() const {
     return update_calls_for_tests_.Load();
   }
@@ -177,9 +174,29 @@ class RaftConsensus : public Consensus,
   log::RetentionIndexes GetRetentionIndexes() override;
 
  private:
-  friend class ReplicaState;
   friend class RaftConsensusQuorumTest;
+  FRIEND_TEST(RaftConsensusQuorumTest, TestConsensusContinuesIfAMinorityFallsBehind);
+  FRIEND_TEST(RaftConsensusQuorumTest, TestConsensusStopsIfAMajorityFallsBehind);
+  FRIEND_TEST(RaftConsensusQuorumTest, TestLeaderElectionWithQuiescedQuorum);
   FRIEND_TEST(RaftConsensusQuorumTest, TestReplicasEnforceTheLogMatchingProperty);
+  FRIEND_TEST(RaftConsensusQuorumTest, TestRequestVote);
+
+  enum State {
+    // State after the replica is built.
+    kInitialized,
+
+    // State signaling the replica accepts requests (from clients
+    // if leader, from leader if follower).
+    kRunning,
+
+    // State signaling that the replica is shutting down and no longer accepting
+    // new transactions or commits.
+    kShuttingDown,
+
+    // State signaling the replica is shut down and does not accept
+    // any more requests.
+    kShutDown,
+  };
 
   // Control whether printing of log messages should be done for a particular
   // function call.
@@ -188,6 +205,12 @@ class RaftConsensus : public Consensus,
     ALLOW_LOGGING = 1,
   };
 
+  // Enum for the 'flush' argument to SetCurrentTermUnlocked() below.
+  enum FlushToDisk {
+    SKIP_FLUSH_TO_DISK,
+    FLUSH_TO_DISK,
+  };
+
   // Helper struct that contains the messages from the leader that we need to
   // append to our log, after they've been deduplicated.
   struct LeaderRequest {
@@ -201,10 +224,6 @@ class RaftConsensus : public Consensus,
     std::string OpsRangeString() const;
   };
 
-  std::string LogPrefixUnlocked();
-
-  std::string LogPrefix();
-
   // Set the leader UUID of the configuration and mark the tablet config dirty for
   // reporting to the master.
   void SetLeaderUuidUnlocked(const std::string& uuid);
@@ -224,12 +243,12 @@ class RaftConsensus : public Consensus,
   // Returns OK once the change config transaction that has this peer as leader
   // has been enqueued, the transaction will complete asynchronously.
   //
-  // The ReplicaState must be locked for configuration change before calling.
+  // 'lock_' must be held for configuration change before calling.
   Status BecomeLeaderUnlocked();
 
   // Makes the peer become a replica, i.e. a FOLLOWER or a LEARNER.
   //
-  // The ReplicaState must be locked for configuration change before calling.
+  // 'lock_' must be held for configuration change before calling.
   Status BecomeReplicaUnlocked();
 
   // Updates the state in a replica by storing the received operations in the log
@@ -286,7 +305,7 @@ class RaftConsensus : public Consensus,
   // Raft configuration.
   Status IsSingleVoterConfig(bool* single_voter) const;
 
-  // Return header string for RequestVote log messages. The ReplicaState lock must be held.
+  // Return header string for RequestVote log messages. 'lock_' must be held.
   std::string GetRequestVoteLogPrefixUnlocked(const VoteRequestPB& request) const;
 
   // Fills the response with the current status, if an update was successful.
@@ -386,7 +405,7 @@ class RaftConsensus : public Consensus,
   //
   // 'flush' may be used to control whether the term change is flushed to disk.
   Status HandleTermAdvanceUnlocked(ConsensusTerm new_term,
-                                   ReplicaState::FlushToDisk flush = ReplicaState::FLUSH_TO_DISK);
+                                   FlushToDisk flush = FLUSH_TO_DISK);
 
   // Asynchronously (on thread_pool_) notify the TabletReplica that the consensus configuration
   // has changed, thus reporting it back to the master.
@@ -426,25 +445,162 @@ class RaftConsensus : public Consensus,
   // type of message it is.
   // The 'client_cb' will be invoked at the end of this execution.
   //
-  // NOTE: this must be called with the ReplicaState lock held.
+  // NOTE: Must be called while holding 'lock_'.
   void NonTxRoundReplicationFinished(ConsensusRound* round,
                                      const StatusCallback& client_cb,
                                      const Status& status);
 
   // As a leader, append a new ConsensusRound to the queue.
-  // Only virtual and protected for mocking purposes.
   Status AppendNewRoundToQueueUnlocked(const scoped_refptr<ConsensusRound>& round);
 
   // As a follower, start a consensus round not associated with a Transaction.
-  // Only virtual and protected for mocking purposes.
   Status StartConsensusOnlyRoundUnlocked(const ReplicateRefPtr& msg);
 
-  // Add a new pending operation to the ReplicaState, including the special handling
+  // Add a new pending operation to PendingRounds, including the special handling
   // necessary if this round contains a configuration change. These rounds must
   // take effect as soon as they are received, rather than waiting for commitment
   // (see Diego Ongaro's thesis section 4.1).
   Status AddPendingOperationUnlocked(const scoped_refptr<ConsensusRound>& round);
 
+  // Locks a replica in preparation for StartUnlocked(). Makes
+  // sure the replica is in kInitialized state.
+  Status LockForStart(UniqueLock* lock) const WARN_UNUSED_RESULT;
+
+  // Obtains the lock for a state read; does not check the replica state.
+  Status LockForRead(UniqueLock* lock) const WARN_UNUSED_RESULT;
+
+  // Locks a replica down until the critical section of an append completes,
+  // i.e. until the replicate message has been assigned an id and placed in
+  // the log queue.
+  // This also checks that the replica is in the appropriate
+  // state (role) to replicate the provided operation, that the operation
+  // contains a replicate message and is of the appropriate type, and returns
+  // Status::IllegalState if that is not the case.
+  Status LockForReplicate(UniqueLock* lock, const ReplicateMsg& msg) const WARN_UNUSED_RESULT;
+
+  // Locks a replica down until the critical section of a commit completes.
+  // This succeeds for all states since a replica which has initiated
+  // a Prepare()/Replicate() must eventually commit even if its state
+  // has changed after the initial Append()/Update().
+  Status LockForCommit(UniqueLock* lock) const WARN_UNUSED_RESULT;
+
+  // Locks a replica down until the critical section of an update completes.
+  // Further updates from the same or some other leader will be blocked until
+  // this completes. This also checks that the replica is in the appropriate
+  // state (role) to be updated and returns Status::IllegalState if that
+  // is not the case.
+  Status LockForUpdate(UniqueLock* lock) const WARN_UNUSED_RESULT;
+
+  Status LockForConfigChange(UniqueLock* lock) const WARN_UNUSED_RESULT;
+
+  // Changes the role to non-participant and returns a lock that can be
+  // used to make sure no state updates come in until Shutdown() is
+  // completed.
+  Status LockForShutdown(UniqueLock* lock) WARN_UNUSED_RESULT;
+
+  // Ensure the local peer is the active leader.
+  // Returns OK if leader, IllegalState otherwise.
+  Status CheckActiveLeaderUnlocked() const;
+
+  // Return current consensus state summary.
+  ConsensusStatePB ConsensusStateUnlocked() const;
+
+  // Returns the currently active Raft role.
+  RaftPeerPB::Role GetActiveRoleUnlocked() const;
+
+  // Returns true if there is a configuration change currently in-flight but not yet
+  // committed.
+  bool IsConfigChangePendingUnlocked() const;
+
+  // Inverse of IsConfigChangePendingUnlocked(): returns OK if there is
+  // currently *no* configuration change pending, and IllegalState if there *is* a
+  // configuration change pending.
+  Status CheckNoConfigChangePendingUnlocked() const;
+
+  // Sets the given configuration as pending commit. Does not persist into the peer's
+  // metadata. In order to be persisted, SetCommittedConfigUnlocked() must be called.
+  Status SetPendingConfigUnlocked(const RaftConfigPB& new_config) WARN_UNUSED_RESULT;
+
+  // Clear (cancel) the pending configuration.
+  void ClearPendingConfigUnlocked();
+
+  // Return the pending configuration, or crash if one is not set.
+  const RaftConfigPB& GetPendingConfigUnlocked() const;
+
+  // Changes the committed config for this replica. Checks that there is a
+  // pending configuration and that it is equal to this one. Persists changes to disk.
+  // Resets the pending configuration to null.
+  Status SetCommittedConfigUnlocked(const RaftConfigPB& config_to_commit);
+
+  // Return the persisted configuration.
+  const RaftConfigPB& GetCommittedConfigUnlocked() const;
+
+  // Return the "active" configuration - if there is a pending configuration return it;
+  // otherwise return the committed configuration.
+  const RaftConfigPB& GetActiveConfigUnlocked() const;
+
+  // Checks that the term change is legal. If so, sets 'current_term'
+  // to 'new_term' and clears the record of having voted in the current term.
+  //
+  // If the caller knows that it will call another method soon after
+  // to flush the change to disk, it may set 'flush' to 'SKIP_FLUSH_TO_DISK'.
+  Status SetCurrentTermUnlocked(int64_t new_term,
+                                FlushToDisk flush) WARN_UNUSED_RESULT;
+
+  // Returns the current Raft term.
+  int64_t GetCurrentTermUnlocked() const;
+
+  // Accessors for the leader of the current term.
+  const std::string& GetLeaderUuidUnlocked() const;
+  bool HasLeaderUnlocked() const { return !GetLeaderUuidUnlocked().empty(); }
+  void ClearLeaderUnlocked() { SetLeaderUuidUnlocked(""); }
+
+  // Return whether this peer has voted in the current term.
+  bool HasVotedCurrentTermUnlocked() const;
+
+  // Record replica's vote for the current term, then flush the consensus
+  // metadata to disk.
+  Status SetVotedForCurrentTermUnlocked(const std::string& uuid) WARN_UNUSED_RESULT;
+
+  // Return replica's vote for the current term.
+  // The vote must be set; use HasVotedCurrentTermUnlocked() to check.
+  const std::string& GetVotedForCurrentTermUnlocked() const;
+
+  const ConsensusOptions& GetOptions() const;
+
+  std::string LogPrefix() const;
+  std::string LogPrefixUnlocked() const;
+
+  // A variant of LogPrefix() that does not take the lock. It produces a
+  // slightly less thorough prefix that includes only immutable (and thus
+  // thread-safe) information.
+  std::string LogPrefixThreadSafe() const;
+
+  std::string ToString() const;
+  std::string ToStringUnlocked() const;
+
+  ConsensusMetadata* consensus_metadata_for_tests() const;
+
+  const ConsensusOptions options_;
+
+  // The UUID of the local peer.
+  const std::string peer_uuid_;
+
+  // TODO(dralves) hack to serialize updates due to repeated/out-of-order messages;
+  // should probably be refactored out.
+  //
+  // Lock ordering note: If both 'update_lock_' and 'lock_' are to be taken,
+  // 'update_lock_' lock must be taken first.
+  mutable simple_spinlock update_lock_;
+
+  // Coarse-grained lock that protects all mutable data members.
+  mutable simple_spinlock lock_;
+
+  State state_;
+
+  // Consensus metadata persistence object.
+  std::unique_ptr<ConsensusMetadata> cmeta_;
+
   // Threadpool for constructing requests to peers, handling RPC callbacks,
   // etc.
   gscoped_ptr<ThreadPool> thread_pool_;
@@ -462,10 +618,8 @@ class RaftConsensus : public Consensus,
   // The queue of messages that must be sent to peers.
   gscoped_ptr<PeerMessageQueue> queue_;
 
-  gscoped_ptr<ReplicaState> state_;
-
   // The currently pending rounds that have not yet been committed by
-  // consensus. Protected by the locks inside state_.
+  // consensus. Protected by 'lock_'.
   // TODO(todd) these locks will become more fine-grained.
   PendingRounds pending_;
 
@@ -493,13 +647,6 @@ class RaftConsensus : public Consensus,
 
   const Callback<void(const std::string& reason)> mark_dirty_clbk_;
 
-  // TODO(dralves) hack to serialize updates due to repeated/out-of-order messages
-  // should probably be refactored out.
-  //
-  // Lock ordering note: If both this lock and the ReplicaState lock are to be
-  // taken, this lock must be taken first.
-  mutable simple_spinlock update_lock_;
-
   AtomicBool shutdown_;
 
   // The number of times Update() has been called, used for some test assertions.
@@ -515,5 +662,3 @@ class RaftConsensus : public Consensus,
 
 }  // namespace consensus
 }  // namespace kudu
-
-#endif /* KUDU_CONSENSUS_RAFT_CONSENSUS_H_ */
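
The lock-ordering note in the header above ('update_lock_' must be taken
before 'lock_' whenever both are needed) is the standard discipline for
avoiding deadlock between two coarse locks. A small standalone sketch of that
discipline follows, with hypothetical method names; it is not code from this
patch.

    #include <mutex>

    // Illustrative only: two coarse locks with a fixed acquisition order,
    // mirroring the update_lock_ -> lock_ rule described above.
    class Consensus {
     public:
      // Update paths take 'update_lock_' first to serialize whole requests,
      // then 'lock_' to mutate protected state.
      void Update() {
        std::lock_guard<std::mutex> update_guard(update_lock_);
        // ... deduplicate / order the incoming request here ...
        std::lock_guard<std::mutex> state_guard(lock_);
        // ... mutate state protected by 'lock_' ...
      }

      // Paths that need only the state lock may skip 'update_lock_', but must
      // never acquire 'update_lock_' while already holding 'lock_', or two
      // threads taking the locks in opposite orders could deadlock.
      void ReadState() {
        std::lock_guard<std::mutex> state_guard(lock_);
        // ... read state protected by 'lock_' ...
      }

     private:
      std::mutex update_lock_;  // Taken first whenever both locks are needed.
      std::mutex lock_;         // Protects all mutable data members.
    };

    int main() {
      Consensus c;
      c.Update();
      c.ReadState();
      return 0;
    }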

http://git-wip-us.apache.org/repos/asf/kudu/blob/9e40867c/src/kudu/consensus/raft_consensus_quorum-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index a6db53b..bb1575e 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -31,7 +31,6 @@
 #include "kudu/consensus/peer_manager.h"
 #include "kudu/consensus/quorum_util.h"
 #include "kudu/consensus/raft_consensus.h"
-#include "kudu/consensus/raft_consensus_state.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/strcat.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -661,9 +660,8 @@ TEST_F(RaftConsensusQuorumTest, TestConsensusContinuesIfAMinorityFallsBehind) {
     scoped_refptr<RaftConsensus> follower0;
     CHECK_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0));
 
-    ReplicaState* follower0_rs = follower0->GetReplicaStateForTests();
-    ReplicaState::UniqueLock lock;
-    ASSERT_OK(follower0_rs->LockForRead(&lock));
+    RaftConsensus::UniqueLock lock;
+    ASSERT_OK(follower0->LockForRead(&lock));
 
     // If the locked replica would stop consensus we would hang here
     // as we wait for operations to be replicated to a majority.
@@ -705,15 +703,13 @@ TEST_F(RaftConsensusQuorumTest, TestConsensusStopsIfAMajorityFallsBehind) {
     // and never letting them go.
     scoped_refptr<RaftConsensus> follower0;
     CHECK_OK(peers_->GetPeerByIdx(kFollower0Idx, &follower0));
-    ReplicaState* follower0_rs = follower0->GetReplicaStateForTests();
-    ReplicaState::UniqueLock lock0;
-    ASSERT_OK(follower0_rs->LockForRead(&lock0));
+    RaftConsensus::UniqueLock lock0;
+    ASSERT_OK(follower0->LockForRead(&lock0));
 
     scoped_refptr<RaftConsensus> follower1;
     CHECK_OK(peers_->GetPeerByIdx(kFollower1Idx, &follower1));
-    ReplicaState* follower1_rs = follower1->GetReplicaStateForTests();
-    ReplicaState::UniqueLock lock1;
-    ASSERT_OK(follower1_rs->LockForRead(&lock1));
+    RaftConsensus::UniqueLock lock1;
+    ASSERT_OK(follower1->LockForRead(&lock1));
 
     // Append a single message to the queue
     ASSERT_OK(AppendDummyMessage(kLeaderIdx, &round));
@@ -895,15 +891,13 @@ TEST_F(RaftConsensusQuorumTest, TestLeaderElectionWithQuiescedQuorum) {
 
     // This will force an election in which we expect to make the last
     // non-shutdown peer in the list become leader.
-    int flush_count_before = new_leader->GetReplicaStateForTests()
-        ->consensus_metadata_for_tests()->flush_count_for_tests();
+    int flush_count_before = new_leader->consensus_metadata_for_tests()->flush_count_for_tests();
     LOG(INFO) << "Running election for future leader with index " << (current_config_size - 1);
     ASSERT_OK(new_leader->StartElection(Consensus::ELECT_EVEN_IF_LEADER_IS_ALIVE,
                                         Consensus::EXTERNAL_REQUEST));
     WaitUntilLeaderForTests(new_leader.get());
     LOG(INFO) << "Election won";
-    int flush_count_after = new_leader->GetReplicaStateForTests()
-        ->consensus_metadata_for_tests()->flush_count_for_tests();
+    int flush_count_after = new_leader->consensus_metadata_for_tests()->flush_count_for_tests();
     ASSERT_EQ(flush_count_after, flush_count_before + 1)
         << "Expected only one consensus metadata flush for a leader election";
 
@@ -1021,8 +1015,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
   scoped_refptr<RaftConsensus> peer;
   CHECK_OK(peers_->GetPeerByIdx(kPeerIndex, &peer));
   auto flush_count = [&]() {
-    return peer->GetReplicaStateForTests()
-      ->consensus_metadata_for_tests()->flush_count_for_tests();
+    return peer->consensus_metadata_for_tests()->flush_count_for_tests();
   };
 
   VoteRequestPB request;