You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2017/06/29 20:11:59 UTC

[1/2] kudu git commit: Make ConsensusMetadata thread-safe

Repository: kudu
Updated Branches:
  refs/heads/master 63ee6f5d9 -> bf8e1c7b8


Make ConsensusMetadata thread-safe

This patch simply adds locks around all accessors and makes
ConsensusMetadata refcounted. This enables ConsensusMetadata to be
safely used outside of the RaftConsensus class (with certain caveats).

This is needed to implement tombstoned voting in a follow-up patch.

Also, add some helper methods to ConsensusMetadata to avoid making
excessive copies of the RaftConfigPB in various cases:

  bool IsVoterInConfig(const std::string& uuid, RaftConfigState type);
  bool IsMemberInConfig(const std::string& uuid, RaftConfigState type);
  int CountVotersInConfig(RaftConfigState type);
  int64_t GetConfigOpIdIndex(RaftConfigState type);

Change-Id: Id2c70605c0593e55486184705ea448dcb4bef2d7
Reviewed-on: http://gerrit.cloudera.org:8080/6958
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Mike Percy <mp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/8d29412c
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/8d29412c
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/8d29412c

Branch: refs/heads/master
Commit: 8d29412c83be16dd30fd6d45eddd0c4f24b3824c
Parents: 63ee6f5
Author: Mike Percy <mp...@apache.org>
Authored: Mon May 22 04:45:40 2017 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Thu Jun 29 20:10:46 2017 +0000

----------------------------------------------------------------------
 src/kudu/consensus/consensus_meta-test.cc       |  63 ++++---
 src/kudu/consensus/consensus_meta.cc            | 170 +++++++++++++++----
 src/kudu/consensus/consensus_meta.h             |  70 ++++++--
 src/kudu/consensus/quorum_util.h                |   1 +
 src/kudu/consensus/raft_consensus.cc            |  90 +++++-----
 src/kudu/consensus/raft_consensus.h             |  24 ++-
 .../consensus/raft_consensus_quorum-test.cc     |  16 +-
 src/kudu/integration-tests/ts_recovery-itest.cc |   2 +-
 src/kudu/master/sys_catalog.cc                  |   4 +-
 src/kudu/tablet/tablet_bootstrap-test.cc        |   2 +-
 src/kudu/tablet/tablet_bootstrap.cc             |   4 +-
 src/kudu/tablet/tablet_replica-test.cc          |   2 +-
 src/kudu/tablet/tablet_replica.cc               |   4 +-
 src/kudu/tools/tool_action_local_replica.cc     |  10 +-
 src/kudu/tserver/tablet_copy_client.cc          |   4 +-
 src/kudu/tserver/tablet_copy_client.h           |   2 +-
 .../tserver/tablet_copy_source_session-test.cc  |   2 +-
 src/kudu/tserver/ts_tablet_manager.cc           |   2 +-
 18 files changed, 315 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/8d29412c/src/kudu/consensus/consensus_meta-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_meta-test.cc b/src/kudu/consensus/consensus_meta-test.cc
index 2faee64..ff0ece9 100644
--- a/src/kudu/consensus/consensus_meta-test.cc
+++ b/src/kudu/consensus/consensus_meta-test.cc
@@ -64,44 +64,44 @@ class ConsensusMetadataTest : public KuduTest {
 
  protected:
   // Assert that the given cmeta has a single configuration with the given metadata values.
-  void AssertValuesEqual(const ConsensusMetadata& cmeta,
+  void AssertValuesEqual(const scoped_refptr<ConsensusMetadata>& cmeta,
                          int64_t opid_index, const string& permanant_uuid, int64_t term);
 
   FsManager fs_manager_;
   RaftConfigPB config_;
 };
 
-void ConsensusMetadataTest::AssertValuesEqual(const ConsensusMetadata& cmeta,
+void ConsensusMetadataTest::AssertValuesEqual(const scoped_refptr<ConsensusMetadata>& cmeta,
                                               int64_t opid_index,
                                               const string& permanant_uuid,
                                               int64_t term) {
   // Sanity checks.
-  ASSERT_EQ(1, cmeta.committed_config().peers_size());
+  ASSERT_EQ(1, cmeta->CommittedConfig().peers_size());
 
   // Value checks.
-  ASSERT_EQ(opid_index, cmeta.committed_config().opid_index());
-  ASSERT_EQ(permanant_uuid, cmeta.committed_config().peers().begin()->permanent_uuid());
-  ASSERT_EQ(term, cmeta.current_term());
+  ASSERT_EQ(opid_index, cmeta->CommittedConfig().opid_index());
+  ASSERT_EQ(permanant_uuid, cmeta->CommittedConfig().peers().begin()->permanent_uuid());
+  ASSERT_EQ(term, cmeta->current_term());
 }
 
 // Test the basic "happy case" of creating and then loading a file.
 TEST_F(ConsensusMetadataTest, TestCreateLoad) {
   // Create the file.
   {
-    unique_ptr<ConsensusMetadata> cmeta;
+    scoped_refptr<ConsensusMetadata> cmeta;
     ASSERT_OK(ConsensusMetadata::Create(&fs_manager_, kTabletId, fs_manager_.uuid(),
                                         config_, kInitialTerm, &cmeta));
   }
 
   // Load the file.
-  unique_ptr<ConsensusMetadata> cmeta;
+  scoped_refptr<ConsensusMetadata> cmeta;
   ASSERT_OK(ConsensusMetadata::Load(&fs_manager_, kTabletId, fs_manager_.uuid(), &cmeta));
-  ASSERT_VALUES_EQUAL(*cmeta, kInvalidOpIdIndex, fs_manager_.uuid(), kInitialTerm);
+  ASSERT_VALUES_EQUAL(cmeta, kInvalidOpIdIndex, fs_manager_.uuid(), kInitialTerm);
 }
 
 // Ensure that Create() will not overwrite an existing file.
 TEST_F(ConsensusMetadataTest, TestCreateNoOverwrite) {
-  unique_ptr<ConsensusMetadata> cmeta;
+  scoped_refptr<ConsensusMetadata> cmeta;
   // Create the consensus metadata file.
   ASSERT_OK(ConsensusMetadata::Create(&fs_manager_, kTabletId, fs_manager_.uuid(),
                                       config_, kInitialTerm, &cmeta));
@@ -114,7 +114,7 @@ TEST_F(ConsensusMetadataTest, TestCreateNoOverwrite) {
 
 // Ensure that we get an error when loading a file that doesn't exist.
 TEST_F(ConsensusMetadataTest, TestFailedLoad) {
-  unique_ptr<ConsensusMetadata> cmeta;
+  scoped_refptr<ConsensusMetadata> cmeta;
   Status s = ConsensusMetadata::Load(&fs_manager_, kTabletId, fs_manager_.uuid(), &cmeta);
   ASSERT_TRUE(s.IsNotFound()) << "Unexpected status: " << s.ToString();
   LOG(INFO) << "Expected failure: " << s.ToString();
@@ -123,7 +123,7 @@ TEST_F(ConsensusMetadataTest, TestFailedLoad) {
 // Check that changes are not written to disk until Flush() is called.
 TEST_F(ConsensusMetadataTest, TestFlush) {
   const int64_t kNewTerm = 4;
-  unique_ptr<ConsensusMetadata> cmeta;
+  scoped_refptr<ConsensusMetadata> cmeta;
   ASSERT_OK(ConsensusMetadata::Create(&fs_manager_, kTabletId, fs_manager_.uuid(),
                                       config_, kInitialTerm, &cmeta));
   cmeta->set_current_term(kNewTerm);
@@ -132,17 +132,17 @@ TEST_F(ConsensusMetadataTest, TestFlush) {
   // objects in flight that point to the same file, but for a test this is fine
   // since it's read-only.
   {
-    unique_ptr<ConsensusMetadata> cmeta_read;
+    scoped_refptr<ConsensusMetadata> cmeta_read;
     ASSERT_OK(ConsensusMetadata::Load(&fs_manager_, kTabletId, fs_manager_.uuid(), &cmeta_read));
-    ASSERT_VALUES_EQUAL(*cmeta_read, kInvalidOpIdIndex, fs_manager_.uuid(), kInitialTerm);
+    ASSERT_VALUES_EQUAL(cmeta_read, kInvalidOpIdIndex, fs_manager_.uuid(), kInitialTerm);
   }
 
   ASSERT_OK(cmeta->Flush());
 
   {
-    unique_ptr<ConsensusMetadata> cmeta_read;
+    scoped_refptr<ConsensusMetadata> cmeta_read;
     ASSERT_OK(ConsensusMetadata::Load(&fs_manager_, kTabletId, fs_manager_.uuid(), &cmeta_read));
-    ASSERT_VALUES_EQUAL(*cmeta_read, kInvalidOpIdIndex, fs_manager_.uuid(), kNewTerm);
+    ASSERT_VALUES_EQUAL(cmeta_read, kInvalidOpIdIndex, fs_manager_.uuid(), kNewTerm);
   }
 }
 
@@ -163,27 +163,46 @@ TEST_F(ConsensusMetadataTest, TestActiveRole) {
   vector<string> uuids = { "a", "b", "c", "d" };
   string peer_uuid = "e";
   RaftConfigPB config1 = BuildConfig(uuids); // We aren't a member of this config...
-  config1.set_opid_index(1);
+  config1.set_opid_index(0);
 
-  unique_ptr<ConsensusMetadata> cmeta;
+  scoped_refptr<ConsensusMetadata> cmeta;
   ASSERT_OK(ConsensusMetadata::Create(&fs_manager_, kTabletId, peer_uuid,
                                       config1, kInitialTerm, &cmeta));
 
+  ASSERT_EQ(4, cmeta->CountVotersInConfig(COMMITTED_CONFIG));
+  ASSERT_EQ(0, cmeta->GetConfigOpIdIndex(COMMITTED_CONFIG));
+
   // Not a participant.
   ASSERT_EQ(RaftPeerPB::NON_PARTICIPANT, cmeta->active_role());
+  ASSERT_FALSE(cmeta->IsMemberInConfig(peer_uuid, COMMITTED_CONFIG));
+  ASSERT_FALSE(cmeta->IsVoterInConfig(peer_uuid, COMMITTED_CONFIG));
 
   // Follower.
   uuids.push_back(peer_uuid);
   RaftConfigPB config2 = BuildConfig(uuids); // But we are a member of this one.
   config2.set_opid_index(1);
   cmeta->set_committed_config(config2);
+
+  ASSERT_EQ(5, cmeta->CountVotersInConfig(COMMITTED_CONFIG));
+  ASSERT_EQ(1, cmeta->GetConfigOpIdIndex(COMMITTED_CONFIG));
+
   ASSERT_EQ(RaftPeerPB::FOLLOWER, cmeta->active_role());
+  ASSERT_TRUE(cmeta->IsVoterInConfig(peer_uuid, COMMITTED_CONFIG));
 
   // Pending should mask committed.
   cmeta->set_pending_config(config1);
   ASSERT_EQ(RaftPeerPB::NON_PARTICIPANT, cmeta->active_role());
+
+  ASSERT_TRUE(cmeta->IsMemberInConfig(peer_uuid, COMMITTED_CONFIG));
+  ASSERT_TRUE(cmeta->IsVoterInConfig(peer_uuid, COMMITTED_CONFIG));
+  for (auto config_state : {ACTIVE_CONFIG, PENDING_CONFIG}) {
+    ASSERT_FALSE(cmeta->IsMemberInConfig(peer_uuid, config_state));
+    ASSERT_FALSE(cmeta->IsVoterInConfig(peer_uuid, config_state));
+  }
   cmeta->clear_pending_config();
   ASSERT_EQ(RaftPeerPB::FOLLOWER, cmeta->active_role());
+  ASSERT_TRUE(cmeta->IsMemberInConfig(peer_uuid, ACTIVE_CONFIG));
+  ASSERT_TRUE(cmeta->IsVoterInConfig(peer_uuid, ACTIVE_CONFIG));
 
   // Leader.
   cmeta->set_leader_uuid(peer_uuid);
@@ -210,7 +229,7 @@ TEST_F(ConsensusMetadataTest, TestToConsensusStatePB) {
 
   RaftConfigPB committed_config = BuildConfig(uuids); // We aren't a member of this config...
   committed_config.set_opid_index(1);
-  unique_ptr<ConsensusMetadata> cmeta;
+  scoped_refptr<ConsensusMetadata> cmeta;
   ASSERT_OK(ConsensusMetadata::Create(&fs_manager_, kTabletId, peer_uuid,
                                       committed_config, kInitialTerm, &cmeta));
 
@@ -234,14 +253,14 @@ TEST_F(ConsensusMetadataTest, TestToConsensusStatePB) {
 }
 
 // Helper for TestMergeCommittedConsensusStatePB.
-static void AssertConsensusMergeExpected(const unique_ptr<ConsensusMetadata>& cmeta,
+static void AssertConsensusMergeExpected(const scoped_refptr<ConsensusMetadata>& cmeta,
                                          const ConsensusStatePB& cstate,
                                          int64_t expected_term,
                                          const string& expected_voted_for) {
   // See header docs for ConsensusMetadata::MergeCommittedConsensusStatePB() for
   // a "spec" of these assertions.
   ASSERT_TRUE(!cmeta->has_pending_config());
-  ASSERT_EQ(SecureShortDebugString(cmeta->committed_config()),
+  ASSERT_EQ(SecureShortDebugString(cmeta->CommittedConfig()),
             SecureShortDebugString(cstate.committed_config()));
   ASSERT_EQ("", cmeta->leader_uuid());
   ASSERT_EQ(expected_term, cmeta->current_term());
@@ -258,7 +277,7 @@ TEST_F(ConsensusMetadataTest, TestMergeCommittedConsensusStatePB) {
 
   RaftConfigPB committed_config = BuildConfig(uuids); // We aren't a member of this config...
   committed_config.set_opid_index(1);
-  unique_ptr<ConsensusMetadata> cmeta;
+  scoped_refptr<ConsensusMetadata> cmeta;
   ASSERT_OK(ConsensusMetadata::Create(&fs_manager_, kTabletId, "e",
                                       committed_config, 1, &cmeta));
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/8d29412c/src/kudu/consensus/consensus_meta.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_meta.cc b/src/kudu/consensus/consensus_meta.cc
index c2643d4..5782aab 100644
--- a/src/kudu/consensus/consensus_meta.cc
+++ b/src/kudu/consensus/consensus_meta.cc
@@ -38,8 +38,8 @@ TAG_FLAG(fault_crash_before_cmeta_flush, unsafe);
 namespace kudu {
 namespace consensus {
 
+using std::lock_guard;
 using std::string;
-using std::unique_ptr;
 using strings::Substitute;
 
 Status ConsensusMetadata::Create(FsManager* fs_manager,
@@ -47,8 +47,8 @@ Status ConsensusMetadata::Create(FsManager* fs_manager,
                                  const std::string& peer_uuid,
                                  const RaftConfigPB& config,
                                  int64_t current_term,
-                                 unique_ptr<ConsensusMetadata>* cmeta_out) {
-  unique_ptr<ConsensusMetadata> cmeta(new ConsensusMetadata(fs_manager, tablet_id, peer_uuid));
+                                 scoped_refptr<ConsensusMetadata>* cmeta_out) {
+  scoped_refptr<ConsensusMetadata> cmeta(new ConsensusMetadata(fs_manager, tablet_id, peer_uuid));
   cmeta->set_committed_config(config);
   cmeta->set_current_term(current_term);
   RETURN_NOT_OK(cmeta->Flush(NO_OVERWRITE)); // Create() should not clobber.
@@ -59,8 +59,8 @@ Status ConsensusMetadata::Create(FsManager* fs_manager,
 Status ConsensusMetadata::Load(FsManager* fs_manager,
                                const std::string& tablet_id,
                                const std::string& peer_uuid,
-                               unique_ptr<ConsensusMetadata>* cmeta_out) {
-  unique_ptr<ConsensusMetadata> cmeta(new ConsensusMetadata(fs_manager, tablet_id, peer_uuid));
+                               scoped_refptr<ConsensusMetadata>* cmeta_out) {
+  scoped_refptr<ConsensusMetadata> cmeta(new ConsensusMetadata(fs_manager, tablet_id, peer_uuid));
   RETURN_NOT_OK(pb_util::ReadPBContainerFromPath(fs_manager->env(),
                                                  fs_manager->GetConsensusMetadataPath(tablet_id),
                                                  &cmeta->pb_));
@@ -81,110 +81,214 @@ Status ConsensusMetadata::DeleteOnDiskData(FsManager* fs_manager, const string&
   return Status::OK();
 }
 
-const int64_t ConsensusMetadata::current_term() const {
+int64_t ConsensusMetadata::current_term() const {
+  lock_guard<Mutex> l(lock_);
+  return current_term_unlocked();
+}
+
+int64_t ConsensusMetadata::current_term_unlocked() const {
+  lock_.AssertAcquired();
   DCHECK(pb_.has_current_term());
   return pb_.current_term();
 }
 
 void ConsensusMetadata::set_current_term(int64_t term) {
+  lock_guard<Mutex> l(lock_);
+  set_current_term_unlocked(term);
+}
+
+void ConsensusMetadata::set_current_term_unlocked(int64_t term) {
+  lock_.AssertAcquired();
   DCHECK_GE(term, kMinimumTerm);
   pb_.set_current_term(term);
 }
 
 bool ConsensusMetadata::has_voted_for() const {
+  lock_guard<Mutex> l(lock_);
   return pb_.has_voted_for();
 }
 
-const string& ConsensusMetadata::voted_for() const {
+string ConsensusMetadata::voted_for() const {
+  lock_guard<Mutex> l(lock_);
   DCHECK(pb_.has_voted_for());
   return pb_.voted_for();
 }
 
 void ConsensusMetadata::clear_voted_for() {
+  lock_guard<Mutex> l(lock_);
+  clear_voted_for_unlocked();
+}
+
+void ConsensusMetadata::clear_voted_for_unlocked() {
+  lock_.AssertAcquired();
   pb_.clear_voted_for();
 }
 
 void ConsensusMetadata::set_voted_for(const string& uuid) {
+  lock_guard<Mutex> l(lock_);
   DCHECK(!uuid.empty());
   pb_.set_voted_for(uuid);
 }
 
-const RaftConfigPB& ConsensusMetadata::committed_config() const {
+bool ConsensusMetadata::IsVoterInConfig(const string& uuid,
+                                        RaftConfigState type) {
+  lock_guard<Mutex> l(lock_);
+  return IsRaftConfigVoter(uuid, config_unlocked(type));
+}
+
+bool ConsensusMetadata::IsMemberInConfig(const string& uuid,
+                                         RaftConfigState type) {
+  lock_guard<Mutex> l(lock_);
+  return IsRaftConfigMember(uuid, config_unlocked(type));
+}
+
+int ConsensusMetadata::CountVotersInConfig(RaftConfigState type) {
+  lock_guard<Mutex> l(lock_);
+  return CountVoters(config_unlocked(type));
+}
+
+int64_t ConsensusMetadata::GetConfigOpIdIndex(RaftConfigState type) {
+  lock_guard<Mutex> l(lock_);
+  return config_unlocked(type).opid_index();
+}
+
+RaftConfigPB ConsensusMetadata::CommittedConfig() const {
+  lock_guard<Mutex> l(lock_);
+  return committed_config_unlocked();
+}
+
+const RaftConfigPB& ConsensusMetadata::config_unlocked(RaftConfigState type) const {
+  switch (type) {
+    case ACTIVE_CONFIG: return active_config_unlocked();
+    case COMMITTED_CONFIG: return committed_config_unlocked();
+    case PENDING_CONFIG: return pending_config_unlocked();
+  }
+}
+
+const RaftConfigPB& ConsensusMetadata::committed_config_unlocked() const {
+  lock_.AssertAcquired();
   DCHECK(pb_.has_committed_config());
   return pb_.committed_config();
 }
 
 void ConsensusMetadata::set_committed_config(const RaftConfigPB& config) {
+  lock_guard<Mutex> l(lock_);
+  set_committed_config_unlocked(config);
+}
+
+void ConsensusMetadata::set_committed_config_unlocked(const RaftConfigPB& config) {
+  lock_.AssertAcquired();
   *pb_.mutable_committed_config() = config;
   if (!has_pending_config_) {
-    UpdateActiveRole();
+    UpdateActiveRoleUnlocked();
   }
 }
 
 bool ConsensusMetadata::has_pending_config() const {
+  lock_guard<Mutex> l(lock_);
+  return has_pending_config_unlocked();
+}
+
+bool ConsensusMetadata::has_pending_config_unlocked() const {
+  lock_.AssertAcquired();
   return has_pending_config_;
 }
 
-const RaftConfigPB& ConsensusMetadata::pending_config() const {
-  DCHECK(has_pending_config_);
+RaftConfigPB ConsensusMetadata::PendingConfig() const {
+  lock_guard<Mutex> l(lock_);
+  return pending_config_unlocked();
+}
+
+const RaftConfigPB& ConsensusMetadata::pending_config_unlocked() const {
+  lock_.AssertAcquired();
+  CHECK(has_pending_config_) << LogPrefix() << "There is no pending config";
   return pending_config_;
 }
 
 void ConsensusMetadata::clear_pending_config() {
+  lock_guard<Mutex> l(lock_);
+  clear_pending_config_unlocked();
+}
+
+void ConsensusMetadata::clear_pending_config_unlocked() {
+  lock_.AssertAcquired();
   has_pending_config_ = false;
   pending_config_.Clear();
-  UpdateActiveRole();
+  UpdateActiveRoleUnlocked();
 }
 
 void ConsensusMetadata::set_pending_config(const RaftConfigPB& config) {
+  lock_guard<Mutex> l(lock_);
   has_pending_config_ = true;
   pending_config_ = config;
-  UpdateActiveRole();
+  UpdateActiveRoleUnlocked();
 }
 
-const RaftConfigPB& ConsensusMetadata::active_config() const {
+RaftConfigPB ConsensusMetadata::ActiveConfig() const {
+  lock_guard<Mutex> l(lock_);
+  return active_config_unlocked();
+}
+
+const RaftConfigPB& ConsensusMetadata::active_config_unlocked() const {
+  lock_.AssertAcquired();
   if (has_pending_config_) {
-    return pending_config();
+    return pending_config_unlocked();
   }
-  return committed_config();
+  return committed_config_unlocked();
 }
 
-const string& ConsensusMetadata::leader_uuid() const {
+string ConsensusMetadata::leader_uuid() const {
+  lock_guard<Mutex> l(lock_);
   return leader_uuid_;
 }
 
 void ConsensusMetadata::set_leader_uuid(const string& uuid) {
+  lock_guard<Mutex> l(lock_);
+  set_leader_uuid_unlocked(uuid);
+}
+
+void ConsensusMetadata::set_leader_uuid_unlocked(const string& uuid) {
+  lock_.AssertAcquired();
   leader_uuid_ = uuid;
-  UpdateActiveRole();
+  UpdateActiveRoleUnlocked();
 }
 
 RaftPeerPB::Role ConsensusMetadata::active_role() const {
+  lock_guard<Mutex> l(lock_);
   return active_role_;
 }
 
 ConsensusStatePB ConsensusMetadata::ToConsensusStatePB() const {
+  lock_guard<Mutex> l(lock_);
+  return ToConsensusStatePBUnlocked();
+}
+
+ConsensusStatePB ConsensusMetadata::ToConsensusStatePBUnlocked() const {
+  lock_.AssertAcquired();
   ConsensusStatePB cstate;
   cstate.set_current_term(pb_.current_term());
   cstate.set_leader_uuid(leader_uuid_);
-  *cstate.mutable_committed_config() = committed_config();
-  if (has_pending_config()) {
-    *cstate.mutable_pending_config() = pending_config();
+  *cstate.mutable_committed_config() = committed_config_unlocked();
+  if (has_pending_config_unlocked()) {
+    *cstate.mutable_pending_config() = pending_config_unlocked();
   }
   return cstate;
 }
 
 void ConsensusMetadata::MergeCommittedConsensusStatePB(const ConsensusStatePB& cstate) {
-  if (cstate.current_term() > current_term()) {
-    set_current_term(cstate.current_term());
-    clear_voted_for();
+  lock_guard<Mutex> l(lock_);
+  if (cstate.current_term() > current_term_unlocked()) {
+    set_current_term_unlocked(cstate.current_term());
+    clear_voted_for_unlocked();
   }
 
-  set_leader_uuid("");
-  set_committed_config(cstate.committed_config());
-  clear_pending_config();
+  set_leader_uuid_unlocked("");
+  set_committed_config_unlocked(cstate.committed_config());
+  clear_pending_config_unlocked();
 }
 
 Status ConsensusMetadata::Flush(FlushMode mode) {
+  lock_guard<Mutex> l(lock_);
   MAYBE_FAULT(FLAGS_fault_crash_before_cmeta_flush);
   SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500, LogPrefix(), "flushing consensus metadata");
 
@@ -226,14 +330,22 @@ ConsensusMetadata::ConsensusMetadata(FsManager* fs_manager,
       tablet_id_(std::move(tablet_id)),
       peer_uuid_(std::move(peer_uuid)),
       has_pending_config_(false),
-      flush_count_for_tests_(0) {}
+      flush_count_for_tests_(0) {
+}
 
 std::string ConsensusMetadata::LogPrefix() const {
+  // No need to lock to read const members.
   return Substitute("T $0 P $1: ", tablet_id_, peer_uuid_);
 }
 
 void ConsensusMetadata::UpdateActiveRole() {
-  ConsensusStatePB cstate = ToConsensusStatePB();
+  lock_guard<Mutex> l(lock_);
+  UpdateActiveRoleUnlocked();
+}
+
+void ConsensusMetadata::UpdateActiveRoleUnlocked() {
+  lock_.AssertAcquired();
+  ConsensusStatePB cstate = ToConsensusStatePBUnlocked();
   active_role_ = GetConsensusRole(peer_uuid_, cstate);
   VLOG_WITH_PREFIX(1) << "Updating active role to " << RaftPeerPB::Role_Name(active_role_)
                       << ". Consensus state: " << SecureShortDebugString(cstate);

http://git-wip-us.apache.org/repos/asf/kudu/blob/8d29412c/src/kudu/consensus/consensus_meta.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_meta.h b/src/kudu/consensus/consensus_meta.h
index a903675..ac1ae8f 100644
--- a/src/kudu/consensus/consensus_meta.h
+++ b/src/kudu/consensus/consensus_meta.h
@@ -18,11 +18,13 @@
 #define KUDU_CONSENSUS_CONSENSUS_META_H_
 
 #include <cstdint>
-#include <memory>
 #include <string>
 
 #include "kudu/consensus/metadata.pb.h"
+#include "kudu/consensus/quorum_util.h"
 #include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/mutex.h"
 #include "kudu/util/status.h"
 
 namespace kudu {
@@ -53,8 +55,8 @@ namespace consensus {
 // the pending configuration if a pending configuration is set, otherwise the committed
 // configuration.
 //
-// This class is not thread-safe and requires external synchronization.
-class ConsensusMetadata {
+// This class is thread-safe.
+class ConsensusMetadata : public RefCountedThreadSafe<ConsensusMetadata> {
  public:
 
   // Specify whether we are allowed to overwrite an existing file when flushing.
@@ -70,7 +72,7 @@ class ConsensusMetadata {
                        const std::string& peer_uuid,
                        const RaftConfigPB& config,
                        int64_t current_term,
-                       std::unique_ptr<ConsensusMetadata>* cmeta_out);
+                       scoped_refptr<ConsensusMetadata>* cmeta_out);
 
   // Load a ConsensusMetadata object from disk.
   // Returns Status::NotFound if the file could not be found. May return other
@@ -78,31 +80,46 @@ class ConsensusMetadata {
   static Status Load(FsManager* fs_manager,
                      const std::string& tablet_id,
                      const std::string& peer_uuid,
-                     std::unique_ptr<ConsensusMetadata>* cmeta_out);
+                     scoped_refptr<ConsensusMetadata>* cmeta_out);
 
   // Delete the ConsensusMetadata file associated with the given tablet from
   // disk.
   static Status DeleteOnDiskData(FsManager* fs_manager, const std::string& tablet_id);
 
   // Accessors for current term.
-  const int64_t current_term() const;
+  int64_t current_term() const;
   void set_current_term(int64_t term);
 
   // Accessors for voted_for.
   bool has_voted_for() const;
-  const std::string& voted_for() const;
+  std::string voted_for() const;
   void clear_voted_for();
   void set_voted_for(const std::string& uuid);
 
+  // Returns true iff peer with specified uuid is a voter in the specified
+  // local Raft config.
+  bool IsVoterInConfig(const std::string& uuid, RaftConfigState type);
+
+  // Returns true iff peer with specified uuid is a member of the specified
+  // local Raft config.
+  bool IsMemberInConfig(const std::string& uuid, RaftConfigState type);
+
+  // Returns a count of the number of voters in the specified local Raft
+  // config.
+  int CountVotersInConfig(RaftConfigState type);
+
+  // Returns the opid_index of the specified local Raft config.
+  int64_t GetConfigOpIdIndex(RaftConfigState type);
+
   // Accessors for committed configuration.
-  const RaftConfigPB& committed_config() const;
+  RaftConfigPB CommittedConfig() const;
   void set_committed_config(const RaftConfigPB& config);
 
   // Returns whether a pending configuration is set.
   bool has_pending_config() const;
 
-  // Returns the pending configuration if one is set. Otherwise, fires a DCHECK.
+  // Returns the pending configuration if one is set. Otherwise, fires a CHECK.
-  const RaftConfigPB& pending_config() const;
+  RaftConfigPB PendingConfig() const;
 
   // Set & clear the pending configuration.
   void clear_pending_config();
@@ -110,10 +127,11 @@ class ConsensusMetadata {
 
   // If a pending configuration is set, return it.
   // Otherwise, return the committed configuration.
-  const RaftConfigPB& active_config() const;
+  RaftConfigPB ActiveConfig() const;
+  const RaftConfigPB& active_config_unlocked() const;
 
   // Accessors for setting the active leader.
-  const std::string& leader_uuid() const;
+  std::string leader_uuid() const;
   void set_leader_uuid(const std::string& uuid);
 
   // Returns the currently active role of the current node.
@@ -145,25 +163,45 @@ class ConsensusMetadata {
   // Persist current state of the protobuf to disk.
   Status Flush(FlushMode mode = OVERWRITE);
 
-  int flush_count_for_tests() const {
+  int64_t flush_count_for_tests() const {
     return flush_count_for_tests_;
   }
 
  private:
+  friend class RefCountedThreadSafe<ConsensusMetadata>;
+
   ConsensusMetadata(FsManager* fs_manager, std::string tablet_id,
                     std::string peer_uuid);
 
+  // Return the specified config.
+  const RaftConfigPB& config_unlocked(RaftConfigState type) const;
+
+  const RaftConfigPB& committed_config_unlocked() const;
+  void set_committed_config_unlocked(const RaftConfigPB& config);
+
+  int64_t current_term_unlocked() const;
+  void set_current_term_unlocked(int64_t term);
+  void clear_voted_for_unlocked();
+  const RaftConfigPB& pending_config_unlocked() const;
+  bool has_pending_config_unlocked() const;
+  void clear_pending_config_unlocked();
+  void set_leader_uuid_unlocked(const std::string& uuid);
+
+  ConsensusStatePB ToConsensusStatePBUnlocked() const;
+
   std::string LogPrefix() const;
 
   // Updates the cached active role.
   void UpdateActiveRole();
+  void UpdateActiveRoleUnlocked();
 
-  // Transient fields.
-  // Constants:
   FsManager* const fs_manager_;
   const std::string tablet_id_;
   const std::string peer_uuid_;
-  // Mutable:
+
+  // Protects all of the mutable fields below.
+  mutable Mutex lock_;
+
   std::string leader_uuid_; // Leader of the current term (term == pb_.current_term).
   bool has_pending_config_; // Indicates whether there is an as-yet uncommitted
                             // configuration change pending.
@@ -174,7 +212,7 @@ class ConsensusMetadata {
   RaftPeerPB::Role active_role_;
 
   // The number of times the metadata has been flushed to disk.
-  int flush_count_for_tests_;
+  int64_t flush_count_for_tests_;
 
   // Durable fields.
   ConsensusMetadataPB pb_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/8d29412c/src/kudu/consensus/quorum_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/quorum_util.h b/src/kudu/consensus/quorum_util.h
index 9c4796c..2a8aa61 100644
--- a/src/kudu/consensus/quorum_util.h
+++ b/src/kudu/consensus/quorum_util.h
@@ -30,6 +30,7 @@ namespace consensus {
 enum RaftConfigState {
   PENDING_CONFIG,
   COMMITTED_CONFIG,
+  ACTIVE_CONFIG,
 };
 
 bool IsRaftConfigMember(const std::string& uuid, const RaftConfigPB& config);

http://git-wip-us.apache.org/repos/asf/kudu/blob/8d29412c/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 2594c00..b77a0bd 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -161,7 +161,7 @@ static const char* const kTimerId = "election-timer";
 
 scoped_refptr<RaftConsensus> RaftConsensus::Create(
     ConsensusOptions options,
-    unique_ptr<ConsensusMetadata> cmeta,
+    scoped_refptr<ConsensusMetadata> cmeta,
     const RaftPeerPB& local_peer_pb,
     const scoped_refptr<MetricEntity>& metric_entity,
     scoped_refptr<TimeManager> time_manager,
@@ -228,7 +228,7 @@ scoped_refptr<RaftConsensus> RaftConsensus::Create(
 
 RaftConsensus::RaftConsensus(
     ConsensusOptions options,
-    unique_ptr<ConsensusMetadata> cmeta,
+    scoped_refptr<ConsensusMetadata> cmeta,
     gscoped_ptr<PeerProxyFactory> peer_proxy_factory,
     gscoped_ptr<PeerMessageQueue> queue,
     gscoped_ptr<PeerManager> peer_manager,
@@ -351,8 +351,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) {
     RETURN_NOT_OK(BecomeReplicaUnlocked());
   }
 
-  bool single_voter = false;
-  RETURN_NOT_OK(IsSingleVoterConfig(&single_voter));
+  bool single_voter = IsSingleVoterConfig();
   if (single_voter && FLAGS_enable_leader_failure_detection) {
     LOG_WITH_PREFIX(INFO) << "Only one voter in the Raft config. Triggering election immediately";
     RETURN_NOT_OK(StartElection(NORMAL_ELECTION, INITIAL_SINGLE_NODE_ELECTION));
@@ -461,7 +460,7 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
       RETURN_NOT_OK(SetVotedForCurrentTermUnlocked(peer_uuid_));
     }
 
-    const RaftConfigPB& active_config = GetActiveConfigUnlocked();
+    RaftConfigPB active_config = GetActiveConfigUnlocked();
     LOG_WITH_PREFIX_UNLOCKED(INFO) << "Starting " << mode_str << " with config: "
                                    << SecureShortDebugString(active_config);
 
@@ -672,8 +671,8 @@ Status RaftConsensus::AddPendingOperationUnlocked(const scoped_refptr<ConsensusR
     // Check if the pending Raft config has an OpId less than the committed
     // config. If so, this is a replay at startup in which the COMMIT
     // messages were delayed.
-    const RaftConfigPB& committed_config = GetCommittedConfigUnlocked();
-    if (round->replicate_msg()->id().index() > committed_config.opid_index()) {
+    int64_t committed_config_opid_index = cmeta_->GetConfigOpIdIndex(COMMITTED_CONFIG);
+    if (round->replicate_msg()->id().index() > committed_config_opid_index) {
       RETURN_NOT_OK(SetPendingConfigUnlocked(new_config));
       if (GetActiveRoleUnlocked() == RaftPeerPB::LEADER) {
         RETURN_NOT_OK(RefreshConsensusQueueAndPeersUnlocked());
@@ -682,7 +681,7 @@ Status RaftConsensus::AddPendingOperationUnlocked(const scoped_refptr<ConsensusR
       LOG_WITH_PREFIX_UNLOCKED(INFO)
           << "Ignoring setting pending config change with OpId "
           << round->replicate_msg()->id() << " because the committed config has OpId index "
-          << committed_config.opid_index() << ". The config change we are ignoring is: "
+          << committed_config_opid_index << ". The config change we are ignoring is: "
           << "Old config: { " << SecureShortDebugString(change_record->old_config()) << " }. "
           << "New config: { " << SecureShortDebugString(new_config) << " }";
     }
@@ -838,17 +837,15 @@ Status RaftConsensus::StartReplicaTransactionUnlocked(const ReplicateRefPtr& msg
   return AddPendingOperationUnlocked(round_ptr);
 }
 
-Status RaftConsensus::IsSingleVoterConfig(bool* single_voter) const {
+bool RaftConsensus::IsSingleVoterConfig() const {
   ThreadRestrictions::AssertWaitAllowed();
   LockGuard l(lock_);
-  const RaftConfigPB& config = GetCommittedConfigUnlocked();
   const string& uuid = peer_uuid_;
-  if (CountVoters(config) == 1 && IsRaftConfigVoter(uuid, config)) {
-    *single_voter = true;
-  } else {
-    *single_voter = false;
+  if (cmeta_->CountVotersInConfig(COMMITTED_CONFIG) == 1 &&
+      cmeta_->IsVoterInConfig(uuid, COMMITTED_CONFIG)) {
+    return true;
   }
-  return Status::OK();
+  return false;
 }
 
 std::string RaftConsensus::LeaderRequest::OpsRangeString() const {
@@ -1196,7 +1193,7 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
     ThreadRestrictions::AssertWaitAllowed();
     LockGuard l(lock_);
     RETURN_NOT_OK(CheckRunningUnlocked());
-    if (!IsRaftConfigVoter(peer_uuid_, cmeta_->active_config())) {
+    if (!cmeta_->IsMemberInConfig(peer_uuid_, ACTIVE_CONFIG)) {
       LOG_WITH_PREFIX_UNLOCKED(INFO) << "Allowing update even though not a member of the config";
     }
 
@@ -1462,7 +1459,7 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB*
 
   // If the node is not in the configuration, allow the vote (this is required by Raft)
   // but log an informational message anyway.
-  if (!IsRaftConfigMember(request->candidate_uuid(), GetActiveConfigUnlocked())) {
+  if (!cmeta_->IsMemberInConfig(request->candidate_uuid(), ACTIVE_CONFIG)) {
     LOG_WITH_PREFIX_UNLOCKED(INFO) << "Handling vote request from an unknown peer "
                                    << request->candidate_uuid();
   }
@@ -1571,7 +1568,7 @@ Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req,
       return Status::InvalidArgument("server must have permanent_uuid specified",
                                      SecureShortDebugString(req));
     }
-    const RaftConfigPB& committed_config = GetCommittedConfigUnlocked();
+    RaftConfigPB committed_config = GetCommittedConfigUnlocked();
 
     // Support atomic ChangeConfig requests.
     if (req.has_cas_config_opid_index()) {
@@ -1614,7 +1611,7 @@ Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req,
                          "Force another leader to be elected to remove this server. "
                          "Consensus state: $1",
                          server_uuid,
-                         SecureShortDebugString(ConsensusStateUnlocked())));
+                         SecureShortDebugString(cmeta_->ToConsensusStatePB())));
         }
         if (!RemoveFromRaftConfig(&new_config, server_uuid)) {
           return Status::NotFound(
@@ -2040,7 +2037,7 @@ Status RaftConsensus::ReplicateConfigChangeUnlocked(const RaftConfigPB& old_conf
 Status RaftConsensus::RefreshConsensusQueueAndPeersUnlocked() {
   DCHECK(lock_.is_locked());
   DCHECK_EQ(RaftPeerPB::LEADER, GetActiveRoleUnlocked());
-  const RaftConfigPB& active_config = GetActiveConfigUnlocked();
+  RaftConfigPB active_config = GetActiveConfigUnlocked();
 
   // Change the peers so that we're able to replicate messages remotely and
   // locally. The peer manager must be closed before updating the active config
@@ -2067,7 +2064,7 @@ const string& RaftConsensus::tablet_id() const {
 ConsensusStatePB RaftConsensus::ConsensusState() const {
   ThreadRestrictions::AssertWaitAllowed();
   LockGuard l(lock_);
-  return ConsensusStateUnlocked();
+  return cmeta_->ToConsensusStatePB();
 }
 
 RaftConfigPB RaftConsensus::CommittedConfig() const {
@@ -2179,13 +2176,13 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
     return;
   }
 
-  const RaftConfigPB& active_config = GetActiveConfigUnlocked();
-  if (!IsRaftConfigVoter(peer_uuid_, active_config)) {
+  if (!cmeta_->IsVoterInConfig(peer_uuid_, ACTIVE_CONFIG)) {
     LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Leader " << election_type
                                       << " decision while not in active config. "
                                       << "Result: Term " << election_term << ": "
                                       << (result.decision == VOTE_GRANTED ? "won" : "lost")
-                                      << ". RaftConfig: " << SecureShortDebugString(active_config);
+                                      << ". RaftConfig: "
+                                      << SecureShortDebugString(GetActiveConfigUnlocked());
     return;
   }
 
@@ -2308,7 +2305,7 @@ void RaftConsensus::CompleteConfigChangeRoundUnlocked(ConsensusRound* round, con
   if (!status.ok()) {
     // If the config change being aborted is the current pending one, abort it.
     if (IsConfigChangePendingUnlocked() &&
-        GetPendingConfigUnlocked().opid_index() == op_id.index()) {
+        cmeta_->GetConfigOpIdIndex(PENDING_CONFIG) == op_id.index()) {
       LOG_WITH_PREFIX_UNLOCKED(INFO) << "Aborting config change with OpId "
                                      << op_id << ": " << status.ToString();
       ClearPendingConfigUnlocked();
@@ -2334,15 +2331,15 @@ void RaftConsensus::CompleteConfigChangeRoundUnlocked(ConsensusRound* round, con
 
   DCHECK(round->replicate_msg()->change_config_record().has_old_config());
   DCHECK(round->replicate_msg()->change_config_record().has_new_config());
-  RaftConfigPB old_config = round->replicate_msg()->change_config_record().old_config();
-  RaftConfigPB new_config = round->replicate_msg()->change_config_record().new_config();
+  const RaftConfigPB& old_config = round->replicate_msg()->change_config_record().old_config();
+  const RaftConfigPB& new_config = round->replicate_msg()->change_config_record().new_config();
   DCHECK(old_config.has_opid_index());
   DCHECK(new_config.has_opid_index());
   // Check if the pending Raft config has an OpId less than the committed
   // config. If so, this is a replay at startup in which the COMMIT
   // messages were delayed.
-  const RaftConfigPB& committed_config = GetCommittedConfigUnlocked();
-  if (new_config.opid_index() > committed_config.opid_index()) {
+  int64_t committed_config_opid_index = cmeta_->GetConfigOpIdIndex(COMMITTED_CONFIG);
+  if (new_config.opid_index() > committed_config_opid_index) {
     LOG_WITH_PREFIX_UNLOCKED(INFO)
         << "Committing config change with OpId "
         << op_id << ": "
@@ -2353,7 +2350,7 @@ void RaftConsensus::CompleteConfigChangeRoundUnlocked(ConsensusRound* round, con
     LOG_WITH_PREFIX_UNLOCKED(INFO)
         << "Ignoring commit of config change with OpId "
         << op_id << " because the committed config has OpId index "
-        << committed_config.opid_index() << ". The config change we are ignoring is: "
+        << committed_config_opid_index << ". The config change we are ignoring is: "
         << "Old config: { " << SecureShortDebugString(old_config) << " }. "
         << "New config: { " << SecureShortDebugString(new_config) << " }";
   }
@@ -2486,20 +2483,14 @@ Status RaftConsensus::CheckActiveLeaderUnlocked() const {
     case RaftPeerPB::LEADER:
       return Status::OK();
     default:
-      ConsensusStatePB cstate = ConsensusStateUnlocked();
       return Status::IllegalState(Substitute("Replica $0 is not leader of this config. Role: $1. "
                                              "Consensus state: $2",
                                              peer_uuid_,
                                              RaftPeerPB::Role_Name(role),
-                                             SecureShortDebugString(cstate)));
+                                             SecureShortDebugString(cmeta_->ToConsensusStatePB())));
   }
 }
 
-ConsensusStatePB RaftConsensus::ConsensusStateUnlocked() const {
-  DCHECK(lock_.is_locked());
-  return cmeta_->ToConsensusStatePB();
-}
-
 RaftPeerPB::Role RaftConsensus::GetActiveRoleUnlocked() const {
   DCHECK(lock_.is_locked());
   return cmeta_->active_role();
@@ -2529,12 +2520,12 @@ Status RaftConsensus::SetPendingConfigUnlocked(const RaftConfigPB& new_config) {
   if (!new_config.unsafe_config_change()) {
     CHECK(!cmeta_->has_pending_config())
         << "Attempt to set pending config while another is already pending! "
-        << "Existing pending config: " << SecureShortDebugString(cmeta_->pending_config()) << "; "
+        << "Existing pending config: " << SecureShortDebugString(cmeta_->PendingConfig()) << "; "
         << "Attempted new pending config: " << SecureShortDebugString(new_config);
   } else if (cmeta_->has_pending_config()) {
     LOG_WITH_PREFIX_UNLOCKED(INFO)
         << "Allowing unsafe config change even though there is a pending config! "
-        << "Existing pending config: " << SecureShortDebugString(cmeta_->pending_config()) << "; "
+        << "Existing pending config: " << SecureShortDebugString(cmeta_->PendingConfig()) << "; "
         << "New pending config: " << SecureShortDebugString(new_config);
   }
   cmeta_->set_pending_config(new_config);
@@ -2546,10 +2537,9 @@ void RaftConsensus::ClearPendingConfigUnlocked() {
   cmeta_->clear_pending_config();
 }
 
-const RaftConfigPB& RaftConsensus::GetPendingConfigUnlocked() const {
+RaftConfigPB RaftConsensus::GetPendingConfigUnlocked() const {
   DCHECK(lock_.is_locked());
-  CHECK(IsConfigChangePendingUnlocked()) << "No pending config";
-  return cmeta_->pending_config();
+  return cmeta_->PendingConfig();
 }
 
 Status RaftConsensus::SetCommittedConfigUnlocked(const RaftConfigPB& config_to_commit) {
@@ -2566,10 +2556,10 @@ Status RaftConsensus::SetCommittedConfigUnlocked(const RaftConfigPB& config_to_c
   // Therefore we only need to validate that 'config_to_commit' matches the pending config
   // if the pending config does not have its 'unsafe_config_change' flag set.
   if (IsConfigChangePendingUnlocked()) {
-    const RaftConfigPB& pending_config = GetPendingConfigUnlocked();
+    RaftConfigPB pending_config = GetPendingConfigUnlocked();
     if (!pending_config.unsafe_config_change()) {
       // Quorums must be exactly equal, even w.r.t. peer ordering.
-      CHECK_EQ(GetPendingConfigUnlocked().SerializeAsString(),
+      CHECK_EQ(pending_config.SerializeAsString(),
                config_to_commit.SerializeAsString())
           << Substitute("New committed config must equal pending config, but does not. "
                         "Pending config: $0, committed config: $1",
@@ -2583,14 +2573,14 @@ Status RaftConsensus::SetCommittedConfigUnlocked(const RaftConfigPB& config_to_c
   return Status::OK();
 }
 
-const RaftConfigPB& RaftConsensus::GetCommittedConfigUnlocked() const {
+RaftConfigPB RaftConsensus::GetCommittedConfigUnlocked() const {
   DCHECK(lock_.is_locked());
-  return cmeta_->committed_config();
+  return cmeta_->CommittedConfig();
 }
 
-const RaftConfigPB& RaftConsensus::GetActiveConfigUnlocked() const {
+RaftConfigPB RaftConsensus::GetActiveConfigUnlocked() const {
   DCHECK(lock_.is_locked());
-  return cmeta_->active_config();
+  return cmeta_->ActiveConfig();
 }
 
 Status RaftConsensus::SetCurrentTermUnlocked(int64_t new_term,
@@ -2617,7 +2607,7 @@ const int64_t RaftConsensus::GetCurrentTermUnlocked() const {
   return cmeta_->current_term();
 }
 
-const string& RaftConsensus::GetLeaderUuidUnlocked() const {
+string RaftConsensus::GetLeaderUuidUnlocked() const {
   DCHECK(lock_.is_locked());
   return cmeta_->leader_uuid();
 }
@@ -2646,7 +2636,7 @@ Status RaftConsensus::SetVotedForCurrentTermUnlocked(const std::string& uuid) {
   return Status::OK();
 }
 
-const std::string& RaftConsensus::GetVotedForCurrentTermUnlocked() const {
+std::string RaftConsensus::GetVotedForCurrentTermUnlocked() const {
   DCHECK(lock_.is_locked());
   DCHECK(cmeta_->has_voted_for());
   return cmeta_->voted_for();

http://git-wip-us.apache.org/repos/asf/kudu/blob/8d29412c/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index 477814d..cf882f4 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -120,7 +120,7 @@ class RaftConsensus : public RefCountedThreadSafe<RaftConsensus>,
 
   static scoped_refptr<RaftConsensus> Create(
     ConsensusOptions options,
-    std::unique_ptr<ConsensusMetadata> cmeta,
+    scoped_refptr<ConsensusMetadata> cmeta,
     const RaftPeerPB& local_peer_pb,
     const scoped_refptr<MetricEntity>& metric_entity,
     scoped_refptr<TimeManager> time_manager,
@@ -132,7 +132,7 @@ class RaftConsensus : public RefCountedThreadSafe<RaftConsensus>,
     ThreadPool* raft_pool);
 
   RaftConsensus(ConsensusOptions options,
-                std::unique_ptr<ConsensusMetadata> cmeta,
+                scoped_refptr<ConsensusMetadata> cmeta,
                 gscoped_ptr<PeerProxyFactory> peer_proxy_factory,
                 gscoped_ptr<PeerMessageQueue> queue,
                 gscoped_ptr<PeerManager> peer_manager,
@@ -461,9 +461,8 @@ class RaftConsensus : public RefCountedThreadSafe<RaftConsensus>,
   // that uses transactions, delegates to StartConsensusOnlyRoundUnlocked().
   Status StartReplicaTransactionUnlocked(const ReplicateRefPtr& msg);
 
-  // Returns OK and sets 'single_voter' if this node is the only voter in the
-  // Raft configuration.
-  Status IsSingleVoterConfig(bool* single_voter) const;
+  // Returns true if this node is the only voter in the Raft configuration.
+  bool IsSingleVoterConfig() const;
 
   // Return header string for RequestVote log messages. 'lock_' must be held.
   std::string GetRequestVoteLogPrefixUnlocked(const VoteRequestPB& request) const;
@@ -634,9 +633,6 @@ class RaftConsensus : public RefCountedThreadSafe<RaftConsensus>,
   // Returns OK if leader, IllegalState otherwise.
   Status CheckActiveLeaderUnlocked() const WARN_UNUSED_RESULT;
 
-  // Return current consensus state summary.
-  ConsensusStatePB ConsensusStateUnlocked() const;
-
   // Returns the currently active Raft role.
   RaftPeerPB::Role GetActiveRoleUnlocked() const;
 
@@ -657,7 +653,7 @@ class RaftConsensus : public RefCountedThreadSafe<RaftConsensus>,
   void ClearPendingConfigUnlocked();
 
   // Return the pending configuration, or crash if one is not set.
-  const RaftConfigPB& GetPendingConfigUnlocked() const;
+  RaftConfigPB GetPendingConfigUnlocked() const;
 
   // Changes the committed config for this replica. Checks that there is a
   // pending configuration and that it is equal to this one. Persists changes to disk.
@@ -665,11 +661,11 @@ class RaftConsensus : public RefCountedThreadSafe<RaftConsensus>,
   Status SetCommittedConfigUnlocked(const RaftConfigPB& config_to_commit);
 
   // Return the persisted configuration.
-  const RaftConfigPB& GetCommittedConfigUnlocked() const;
+  RaftConfigPB GetCommittedConfigUnlocked() const;
 
   // Return the "active" configuration - if there is a pending configuration return it;
   // otherwise return the committed configuration.
-  const RaftConfigPB& GetActiveConfigUnlocked() const;
+  RaftConfigPB GetActiveConfigUnlocked() const;
 
   // Checks if the term change is legal. If so, sets 'current_term'
   // to 'new_term' and sets 'has voted' to no for the current term.
@@ -683,7 +679,7 @@ class RaftConsensus : public RefCountedThreadSafe<RaftConsensus>,
   const int64_t GetCurrentTermUnlocked() const;
 
   // Accessors for the leader of the current term.
-  const std::string& GetLeaderUuidUnlocked() const;
+  std::string GetLeaderUuidUnlocked() const;
   bool HasLeaderUnlocked() const;
   void ClearLeaderUnlocked();
 
@@ -696,7 +692,7 @@ class RaftConsensus : public RefCountedThreadSafe<RaftConsensus>,
 
   // Return replica's vote for the current term.
   // The vote must be set; use HasVotedCurrentTermUnlocked() to check.
-  const std::string& GetVotedForCurrentTermUnlocked() const;
+  std::string GetVotedForCurrentTermUnlocked() const;
 
   const ConsensusOptions& GetOptions() const;
 
@@ -731,7 +727,7 @@ class RaftConsensus : public RefCountedThreadSafe<RaftConsensus>,
   State state_;
 
   // Consensus metadata persistence object.
-  std::unique_ptr<ConsensusMetadata> cmeta_;
+  scoped_refptr<ConsensusMetadata> cmeta_;
 
   // Threadpool token for constructing requests to peers, handling RPC callbacks, etc.
   std::unique_ptr<ThreadPoolToken> raft_pool_token_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/8d29412c/src/kudu/consensus/raft_consensus_quorum-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index e868d5a..3cf1d2b 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -147,7 +147,7 @@ class RaftConsensusQuorumTest : public KuduTest {
 
       string peer_uuid = Substitute("peer-$0", i);
 
-      unique_ptr<ConsensusMetadata> cmeta;
+      scoped_refptr<ConsensusMetadata> cmeta;
       CHECK_OK(ConsensusMetadata::Create(fs_managers_[i], kTestTablet, peer_uuid, config_,
                                          kMinimumTerm, &cmeta));
 
@@ -522,23 +522,23 @@ class RaftConsensusQuorumTest : public KuduTest {
   }
 
   // Read the ConsensusMetadata for the given peer from disk.
-  unique_ptr<ConsensusMetadata> ReadConsensusMetadataFromDisk(int peer_index) {
+  scoped_refptr<ConsensusMetadata> ReadConsensusMetadataFromDisk(int peer_index) {
     string peer_uuid = Substitute("peer-$0", peer_index);
-    unique_ptr<ConsensusMetadata> cmeta;
+    scoped_refptr<ConsensusMetadata> cmeta;
     CHECK_OK(ConsensusMetadata::Load(fs_managers_[peer_index], kTestTablet, peer_uuid, &cmeta));
     return cmeta;
   }
 
   // Assert that the durable term == term and that the peer that got the vote == voted_for.
   void AssertDurableTermAndVote(int peer_index, int64_t term, const std::string& voted_for) {
-    unique_ptr<ConsensusMetadata> cmeta = ReadConsensusMetadataFromDisk(peer_index);
+    scoped_refptr<ConsensusMetadata> cmeta = ReadConsensusMetadataFromDisk(peer_index);
     ASSERT_EQ(term, cmeta->current_term());
     ASSERT_EQ(voted_for, cmeta->voted_for());
   }
 
   // Assert that the durable term == term and that the peer has not yet voted.
   void AssertDurableTermWithoutVote(int peer_index, int64_t term) {
-    unique_ptr<ConsensusMetadata> cmeta = ReadConsensusMetadataFromDisk(peer_index);
+    scoped_refptr<ConsensusMetadata> cmeta = ReadConsensusMetadataFromDisk(peer_index);
     ASSERT_EQ(term, cmeta->current_term());
     ASSERT_FALSE(cmeta->has_voted_for());
   }
@@ -892,13 +892,15 @@ TEST_F(RaftConsensusQuorumTest, TestLeaderElectionWithQuiescedQuorum) {
 
     // This will force an election in which we expect to make the last
     // non-shutdown peer in the list become leader.
-    int flush_count_before = new_leader->consensus_metadata_for_tests()->flush_count_for_tests();
+    int64_t flush_count_before =
+        new_leader->consensus_metadata_for_tests()->flush_count_for_tests();
     LOG(INFO) << "Running election for future leader with index " << (current_config_size - 1);
     ASSERT_OK(new_leader->StartElection(RaftConsensus::ELECT_EVEN_IF_LEADER_IS_ALIVE,
                                         RaftConsensus::EXTERNAL_REQUEST));
     WaitUntilLeaderForTests(new_leader.get());
     LOG(INFO) << "Election won";
-    int flush_count_after = new_leader->consensus_metadata_for_tests()->flush_count_for_tests();
+    int64_t flush_count_after =
+        new_leader->consensus_metadata_for_tests()->flush_count_for_tests();
     ASSERT_EQ(flush_count_after, flush_count_before + 1)
         << "Expected only one consensus metadata flush for a leader election";
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/8d29412c/src/kudu/integration-tests/ts_recovery-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/ts_recovery-itest.cc b/src/kudu/integration-tests/ts_recovery-itest.cc
index e697f2d..23843dd 100644
--- a/src/kudu/integration-tests/ts_recovery-itest.cc
+++ b/src/kudu/integration-tests/ts_recovery-itest.cc
@@ -379,7 +379,7 @@ TEST_F(TsRecoveryITestDeathTest, TestRecoverFromOpIdOverflow) {
 
     // We also need to update the ConsensusMetadata to match with the term we
     // want to end up with.
-    unique_ptr<ConsensusMetadata> cmeta;
+    scoped_refptr<ConsensusMetadata> cmeta;
     ConsensusMetadata::Load(fs_manager.get(), tablet_id, fs_manager->uuid(), &cmeta);
     cmeta->set_current_term(kDesiredIndexValue);
     ASSERT_OK(cmeta->Flush());

http://git-wip-us.apache.org/repos/asf/kudu/blob/8d29412c/src/kudu/master/sys_catalog.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index 81407b7..b5cba69 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -145,7 +145,7 @@ Status SysCatalogTable::Load(FsManager *fs_manager) {
   if (master_->opts().IsDistributed()) {
     LOG(INFO) << "Verifying existing consensus state";
     string tablet_id = metadata->tablet_id();
-    unique_ptr<ConsensusMetadata> cmeta;
+    scoped_refptr<ConsensusMetadata> cmeta;
     RETURN_NOT_OK_PREPEND(ConsensusMetadata::Load(fs_manager, tablet_id,
                                                   fs_manager->uuid(), &cmeta),
                           "Unable to load consensus metadata for tablet " + tablet_id);
@@ -217,7 +217,7 @@ Status SysCatalogTable::CreateNew(FsManager *fs_manager) {
   }
 
   string tablet_id = metadata->tablet_id();
-  unique_ptr<ConsensusMetadata> cmeta;
+  scoped_refptr<ConsensusMetadata> cmeta;
   RETURN_NOT_OK_PREPEND(ConsensusMetadata::Create(fs_manager, tablet_id, fs_manager->uuid(),
                                                   config, consensus::kMinimumTerm, &cmeta),
                         "Unable to persist consensus metadata for tablet " + tablet_id);

http://git-wip-us.apache.org/repos/asf/kudu/blob/8d29412c/src/kudu/tablet/tablet_bootstrap-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_bootstrap-test.cc b/src/kudu/tablet/tablet_bootstrap-test.cc
index ec89131..3bfbb19 100644
--- a/src/kudu/tablet/tablet_bootstrap-test.cc
+++ b/src/kudu/tablet/tablet_bootstrap-test.cc
@@ -92,7 +92,7 @@ class BootstrapTest : public LogTestBase {
     peer->set_permanent_uuid(meta->fs_manager()->uuid());
     peer->set_member_type(consensus::RaftPeerPB::VOTER);
 
-    unique_ptr<ConsensusMetadata> cmeta;
+    scoped_refptr<ConsensusMetadata> cmeta;
     RETURN_NOT_OK_PREPEND(ConsensusMetadata::Create(meta->fs_manager(), meta->tablet_id(),
                                                     meta->fs_manager()->uuid(),
                                                     config, kMinimumTerm, &cmeta),

http://git-wip-us.apache.org/repos/asf/kudu/blob/8d29412c/src/kudu/tablet/tablet_bootstrap.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc
index f182ed5..dc23373 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -346,7 +346,7 @@ class TabletBootstrap {
   scoped_refptr<log::Log> log_;
   std::shared_ptr<log::LogReader> log_reader_;
 
-  unique_ptr<ConsensusMetadata> cmeta_;
+  scoped_refptr<ConsensusMetadata> cmeta_;
 
   // Statistics on the replay of entries in the log.
   struct Stats {
@@ -1413,7 +1413,7 @@ Status TabletBootstrap::PlayChangeConfigRequest(ReplicateMsg* replicate_msg,
   ChangeConfigRecordPB* change_config = replicate_msg->mutable_change_config_record();
   RaftConfigPB config = change_config->new_config();
 
-  int64_t cmeta_opid_index =  cmeta_->committed_config().opid_index();
+  int64_t cmeta_opid_index =  cmeta_->GetConfigOpIdIndex(consensus::COMMITTED_CONFIG);
   if (replicate_msg->id().index() > cmeta_opid_index) {
     DCHECK(!config.has_opid_index());
     config.set_opid_index(replicate_msg->id().index());

http://git-wip-us.apache.org/repos/asf/kudu/blob/8d29412c/src/kudu/tablet/tablet_replica-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica-test.cc b/src/kudu/tablet/tablet_replica-test.cc
index f91293d..cacac8d 100644
--- a/src/kudu/tablet/tablet_replica-test.cc
+++ b/src/kudu/tablet/tablet_replica-test.cc
@@ -124,7 +124,7 @@ class TabletReplicaTest : public KuduTabletTest {
     config.add_peers()->CopyFrom(config_peer);
     config.set_opid_index(consensus::kInvalidOpIdIndex);
 
-    unique_ptr<ConsensusMetadata> cmeta;
+    scoped_refptr<ConsensusMetadata> cmeta;
     ASSERT_OK(ConsensusMetadata::Create(tablet()->metadata()->fs_manager(),
                                         tablet()->tablet_id(),
                                         tablet()->metadata()->fs_manager()->uuid(),

http://git-wip-us.apache.org/repos/asf/kudu/blob/8d29412c/src/kudu/tablet/tablet_replica.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc
index baf1f27..4ecd2e9 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -156,7 +156,7 @@ Status TabletReplica::Init(const shared_ptr<Tablet>& tablet,
 
     TRACE("Creating consensus instance");
 
-    unique_ptr<ConsensusMetadata> cmeta;
+    scoped_refptr<ConsensusMetadata> cmeta;
     RETURN_NOT_OK(ConsensusMetadata::Load(meta_->fs_manager(), tablet_id_,
                                           meta_->fs_manager()->uuid(), &cmeta));
 
@@ -164,7 +164,7 @@ Status TabletReplica::Init(const shared_ptr<Tablet>& tablet,
         clock, tablet_->mvcc_manager()->GetCleanTimestamp()));
 
     consensus_ = RaftConsensus::Create(options,
-                                       std::move(cmeta),
+                                       cmeta,
                                        local_peer_pb_,
                                        metric_entity,
                                        time_manager,

http://git-wip-us.apache.org/repos/asf/kudu/blob/8d29412c/src/kudu/tools/tool_action_local_replica.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_local_replica.cc b/src/kudu/tools/tool_action_local_replica.cc
index 0c92eeb..cb03282 100644
--- a/src/kudu/tools/tool_action_local_replica.cc
+++ b/src/kudu/tools/tool_action_local_replica.cc
@@ -222,10 +222,10 @@ Status PrintReplicaUuids(const RunnerContext& context) {
   const string& tablet_id = FindOrDie(context.required_args, kTabletIdArg);
 
   // Load the cmeta file and print all peer uuids.
-  unique_ptr<ConsensusMetadata> cmeta;
+  scoped_refptr<ConsensusMetadata> cmeta;
   RETURN_NOT_OK(ConsensusMetadata::Load(fs_manager.get(), tablet_id,
                                         fs_manager->uuid(), &cmeta));
-  cout << JoinMapped(cmeta->committed_config().peers(),
+  cout << JoinMapped(cmeta->CommittedConfig().peers(),
                      [](const RaftPeerPB& p){ return p.permanent_uuid(); },
                      " ") << endl;
   return Status::OK();
@@ -269,10 +269,10 @@ Status RewriteRaftConfig(const RunnerContext& context) {
   RETURN_NOT_OK(BackupConsensusMetadata(&fs_manager, tablet_id));
 
   // Load the cmeta file and rewrite the raft config.
-  unique_ptr<ConsensusMetadata> cmeta;
+  scoped_refptr<ConsensusMetadata> cmeta;
   RETURN_NOT_OK(ConsensusMetadata::Load(&fs_manager, tablet_id,
                                         fs_manager.uuid(), &cmeta));
-  RaftConfigPB current_config = cmeta->committed_config();
+  RaftConfigPB current_config = cmeta->CommittedConfig();
   RaftConfigPB new_config = current_config;
   new_config.clear_peers();
   for (const auto& p : peers) {
@@ -302,7 +302,7 @@ Status SetRaftTerm(const RunnerContext& context) {
   FsManager fs_manager(env, FsManagerOpts());
   RETURN_NOT_OK(fs_manager.Open());
   // Load the cmeta file and rewrite the raft config.
-  unique_ptr<ConsensusMetadata> cmeta;
+  scoped_refptr<ConsensusMetadata> cmeta;
   RETURN_NOT_OK(ConsensusMetadata::Load(&fs_manager, tablet_id,
                                         fs_manager.uuid(), &cmeta));
   if (new_term <= cmeta->current_term()) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/8d29412c/src/kudu/tserver/tablet_copy_client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_client.cc b/src/kudu/tserver/tablet_copy_client.cc
index 239c224..c296e90 100644
--- a/src/kudu/tserver/tablet_copy_client.cc
+++ b/src/kudu/tserver/tablet_copy_client.cc
@@ -136,7 +136,7 @@ Status TabletCopyClient::SetTabletToReplace(const scoped_refptr<TabletMetadata>&
   }
 
   // Load the old consensus metadata, if it exists.
-  unique_ptr<ConsensusMetadata> cmeta;
+  scoped_refptr<ConsensusMetadata> cmeta;
   Status s = ConsensusMetadata::Load(fs_manager_, tablet_id_,
                                      fs_manager_->uuid(), &cmeta);
   if (s.IsNotFound()) {
@@ -504,7 +504,7 @@ Status TabletCopyClient::DownloadWAL(uint64_t wal_segment_seqno) {
 Status TabletCopyClient::WriteConsensusMetadata() {
   // If we didn't find a previous consensus meta file, create one.
   if (!cmeta_) {
-    unique_ptr<ConsensusMetadata> cmeta;
+    scoped_refptr<ConsensusMetadata> cmeta;
     return ConsensusMetadata::Create(fs_manager_, tablet_id_, fs_manager_->uuid(),
                                      remote_cstate_->committed_config(),
                                      remote_cstate_->current_term(),

http://git-wip-us.apache.org/repos/asf/kudu/blob/8d29412c/src/kudu/tserver/tablet_copy_client.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_client.h b/src/kudu/tserver/tablet_copy_client.h
index 9a61985..5aa9326 100644
--- a/src/kudu/tserver/tablet_copy_client.h
+++ b/src/kudu/tserver/tablet_copy_client.h
@@ -205,7 +205,7 @@ class TabletCopyClient {
 
   // Local Consensus metadata file. This may initially be NULL if this is
   // bootstrapping a new replica (rather than replacing an old one).
-  std::unique_ptr<consensus::ConsensusMetadata> cmeta_;
+  scoped_refptr<consensus::ConsensusMetadata> cmeta_;
 
   scoped_refptr<tablet::TabletReplica> tablet_replica_;
   std::shared_ptr<TabletCopyServiceProxy> proxy_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/8d29412c/src/kudu/tserver/tablet_copy_source_session-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_source_session-test.cc b/src/kudu/tserver/tablet_copy_source_session-test.cc
index 7ded27d..f216074 100644
--- a/src/kudu/tserver/tablet_copy_source_session-test.cc
+++ b/src/kudu/tserver/tablet_copy_source_session-test.cc
@@ -121,7 +121,7 @@ class TabletCopyTest : public KuduTabletTest {
     config.add_peers()->CopyFrom(config_peer);
     config.set_opid_index(consensus::kInvalidOpIdIndex);
 
-    unique_ptr<ConsensusMetadata> cmeta;
+    scoped_refptr<ConsensusMetadata> cmeta;
     ASSERT_OK(ConsensusMetadata::Create(tablet()->metadata()->fs_manager(),
                                         tablet()->tablet_id(), fs_manager()->uuid(),
                                         config, consensus::kMinimumTerm, &cmeta));

http://git-wip-us.apache.org/repos/asf/kudu/blob/8d29412c/src/kudu/tserver/ts_tablet_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index 99e580e..576fcd3 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -271,7 +271,7 @@ Status TSTabletManager::CreateNewTablet(const string& table_id,
 
   // We must persist the consensus metadata to disk before starting a new
   // tablet's TabletReplica and RaftConsensus implementation.
-  unique_ptr<ConsensusMetadata> cmeta;
+  scoped_refptr<ConsensusMetadata> cmeta;
   RETURN_NOT_OK_PREPEND(ConsensusMetadata::Create(fs_manager_, tablet_id, fs_manager_->uuid(),
                                                   config, consensus::kMinimumTerm, &cmeta),
                         "Unable to create new ConsensusMeta for tablet " + tablet_id);


[2/2] kudu git commit: consensus: Get rid of a bunch of cmeta wrappers

Posted by mp...@apache.org.
consensus: Get rid of a bunch of cmeta wrappers

We can now directly access consensus metadata in RaftConsensus, so get
rid of some useless wrapper functions.

Change-Id: I90537d8ba945e45671415a1b5f899166343f28cb
Reviewed-on: http://gerrit.cloudera.org:8080/7323
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Mike Percy <mp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/bf8e1c7b
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/bf8e1c7b
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/bf8e1c7b

Branch: refs/heads/master
Commit: bf8e1c7b8f21a01148588951a53da66bda070113
Parents: 8d29412
Author: Mike Percy <mp...@apache.org>
Authored: Tue Jun 27 19:30:20 2017 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Thu Jun 29 20:11:15 2017 +0000

----------------------------------------------------------------------
 src/kudu/consensus/raft_consensus.cc | 92 +++++++++++--------------------
 src/kudu/consensus/raft_consensus.h  | 25 +--------
 2 files changed, 33 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/bf8e1c7b/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index b77a0bd..49afc3c 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -312,7 +312,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) {
     LOG_WITH_PREFIX_UNLOCKED(INFO) << "Replica starting. Triggering "
                                    << info.orphaned_replicates.size()
                                    << " pending transactions. Active config: "
-                                   << SecureShortDebugString(GetActiveConfigUnlocked());
+                                   << SecureShortDebugString(cmeta_->ActiveConfig());
     for (ReplicateMsg* replicate : info.orphaned_replicates) {
       ReplicateRefPtr replicate_ptr = make_scoped_refptr_replicate(new ReplicateMsg(*replicate));
       RETURN_NOT_OK(StartReplicaTransactionUnlocked(replicate_ptr));
@@ -426,7 +426,7 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
     LockGuard l(lock_);
     RETURN_NOT_OK(CheckRunningUnlocked());
 
-    RaftPeerPB::Role active_role = GetActiveRoleUnlocked();
+    RaftPeerPB::Role active_role = cmeta_->active_role();
     if (active_role == RaftPeerPB::LEADER) {
       LOG_WITH_PREFIX_UNLOCKED(INFO) << "Not starting " << mode << " -- already leader";
       return Status::OK();
@@ -435,7 +435,7 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
       RETURN_NOT_OK(SnoozeFailureDetectorUnlocked()); // Reduce election noise while in this state.
       return Status::IllegalState("Not starting election: Node is currently "
                                   "a non-participant in the raft config",
-                                  SecureShortDebugString(GetActiveConfigUnlocked()));
+                                  SecureShortDebugString(cmeta_->ActiveConfig()));
     }
     LOG_WITH_PREFIX_UNLOCKED(INFO)
         << "Starting " << mode_str
@@ -460,7 +460,7 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
       RETURN_NOT_OK(SetVotedForCurrentTermUnlocked(peer_uuid_));
     }
 
-    RaftConfigPB active_config = GetActiveConfigUnlocked();
+    RaftConfigPB active_config = cmeta_->ActiveConfig();
     LOG_WITH_PREFIX_UNLOCKED(INFO) << "Starting " << mode_str << " with config: "
                                    << SecureShortDebugString(active_config);
 
@@ -521,7 +521,7 @@ Status RaftConsensus::StepDown(LeaderStepDownResponsePB* resp) {
   ThreadRestrictions::AssertWaitAllowed();
   LockGuard l(lock_);
   RETURN_NOT_OK(CheckRunningUnlocked());
-  if (GetActiveRoleUnlocked() != RaftPeerPB::LEADER) {
+  if (cmeta_->active_role() != RaftPeerPB::LEADER) {
     resp->mutable_error()->set_code(TabletServerErrorPB::NOT_THE_LEADER);
     StatusToPB(Status::IllegalState("Not currently leader"),
                resp->mutable_error()->mutable_status());
@@ -674,7 +674,7 @@ Status RaftConsensus::AddPendingOperationUnlocked(const scoped_refptr<ConsensusR
     int64_t committed_config_opid_index = cmeta_->GetConfigOpIdIndex(COMMITTED_CONFIG);
     if (round->replicate_msg()->id().index() > committed_config_opid_index) {
       RETURN_NOT_OK(SetPendingConfigUnlocked(new_config));
-      if (GetActiveRoleUnlocked() == RaftPeerPB::LEADER) {
+      if (cmeta_->active_role() == RaftPeerPB::LEADER) {
         RETURN_NOT_OK(RefreshConsensusQueueAndPeersUnlocked());
       }
     } else {
@@ -709,7 +709,7 @@ void RaftConsensus::NotifyCommitIndex(int64_t commit_index) {
 
   pending_.AdvanceCommittedIndex(commit_index);
 
-  if (GetActiveRoleUnlocked() == RaftPeerPB::LEADER) {
+  if (cmeta_->active_role() == RaftPeerPB::LEADER) {
     peer_manager_->SignalRequest(false);
   }
 }
@@ -756,13 +756,13 @@ void RaftConsensus::NotifyFailedFollower(const string& uuid,
       return;
     }
 
-    if (IsConfigChangePendingUnlocked()) {
+    if (cmeta_->has_pending_config()) {
       LOG_WITH_PREFIX_UNLOCKED(INFO) << fail_msg << "There is already a config change operation "
                                      << "in progress. Unable to evict follower until it completes. "
                                      << "Doing nothing.";
       return;
     }
-    committed_config = GetCommittedConfigUnlocked();
+    committed_config = cmeta_->CommittedConfig();
   }
 
   // Run config change on thread pool after dropping lock.
@@ -1568,7 +1568,7 @@ Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req,
       return Status::InvalidArgument("server must have permanent_uuid specified",
                                      SecureShortDebugString(req));
     }
-    RaftConfigPB committed_config = GetCommittedConfigUnlocked();
+    RaftConfigPB committed_config = cmeta_->CommittedConfig();
 
     // Support atomic ChangeConfig requests.
     if (req.has_cas_config_opid_index()) {
@@ -1662,13 +1662,13 @@ Status RaftConsensus::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB& req,
     ThreadRestrictions::AssertWaitAllowed();
     LockGuard l(lock_);
     current_term = GetCurrentTermUnlocked();
-    committed_config = GetCommittedConfigUnlocked();
-    if (IsConfigChangePendingUnlocked()) {
+    committed_config = cmeta_->CommittedConfig();
+    if (cmeta_->has_pending_config()) {
       LOG_WITH_PREFIX_UNLOCKED(WARNING)
             << "Replica has a pending config, but the new config "
             << "will be unsafely changed anyway. "
             << "Currently pending config on the node: "
-            << SecureShortDebugString(GetPendingConfigUnlocked());
+            << SecureShortDebugString(cmeta_->PendingConfig());
     }
     all_replicated_index = queue_->GetAllReplicatedIndex();
     last_committed_index = queue_->GetCommittedIndex();
@@ -1985,7 +1985,7 @@ Status RaftConsensus::RequestVoteRespondVoteGranted(const VoteRequestPB* request
 RaftPeerPB::Role RaftConsensus::role() const {
   ThreadRestrictions::AssertWaitAllowed();
   LockGuard l(lock_);
-  return GetActiveRoleUnlocked();
+  return cmeta_->active_role();
 }
 
 const char* RaftConsensus::State_Name(State state) {
@@ -2036,8 +2036,8 @@ Status RaftConsensus::ReplicateConfigChangeUnlocked(const RaftConfigPB& old_conf
 
 Status RaftConsensus::RefreshConsensusQueueAndPeersUnlocked() {
   DCHECK(lock_.is_locked());
-  DCHECK_EQ(RaftPeerPB::LEADER, GetActiveRoleUnlocked());
-  RaftConfigPB active_config = GetActiveConfigUnlocked();
+  DCHECK_EQ(RaftPeerPB::LEADER, cmeta_->active_role());
+  RaftConfigPB active_config = cmeta_->ActiveConfig();
 
   // Change the peers so that we're able to replicate messages remotely and
   // locally. The peer manager must be closed before updating the active config
@@ -2070,7 +2070,7 @@ ConsensusStatePB RaftConsensus::ConsensusState() const {
 RaftConfigPB RaftConsensus::CommittedConfig() const {
   ThreadRestrictions::AssertWaitAllowed();
   LockGuard l(lock_);
-  return GetCommittedConfigUnlocked();
+  return cmeta_->CommittedConfig();
 }
 
 void RaftConsensus::DumpStatusHtml(std::ostream& out) const {
@@ -2086,7 +2086,7 @@ void RaftConsensus::DumpStatusHtml(std::ostream& out) const {
   {
     ThreadRestrictions::AssertWaitAllowed();
     LockGuard l(lock_);
-    role = GetActiveRoleUnlocked();
+    role = cmeta_->active_role();
   }
   if (role == RaftPeerPB::LEADER) {
     out << "<h2>Queue overview</h2>" << std::endl;
@@ -2182,11 +2182,11 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
                                       << "Result: Term " << election_term << ": "
                                       << (result.decision == VOTE_GRANTED ? "won" : "lost")
                                       << ". RaftConfig: "
-                                      << SecureShortDebugString(GetActiveConfigUnlocked());
+                                      << SecureShortDebugString(cmeta_->ActiveConfig());
     return;
   }
 
-  if (GetActiveRoleUnlocked() == RaftPeerPB::LEADER) {
+  if (cmeta_->active_role() == RaftPeerPB::LEADER) {
     // If this was a pre-election, it's possible to see the following interleaving:
     //
     //  1. Term N (follower): send a real election for term N
@@ -2304,11 +2304,11 @@ void RaftConsensus::CompleteConfigChangeRoundUnlocked(ConsensusRound* round, con
 
   if (!status.ok()) {
     // If the config change being aborted is the current pending one, abort it.
-    if (IsConfigChangePendingUnlocked() &&
+    if (cmeta_->has_pending_config() &&
         cmeta_->GetConfigOpIdIndex(PENDING_CONFIG) == op_id.index()) {
       LOG_WITH_PREFIX_UNLOCKED(INFO) << "Aborting config change with OpId "
                                      << op_id << ": " << status.ToString();
-      ClearPendingConfigUnlocked();
+      cmeta_->clear_pending_config();
     } else {
       LOG_WITH_PREFIX_UNLOCKED(INFO)
           << "Skipping abort of non-pending config change with OpId "
@@ -2447,7 +2447,7 @@ Status RaftConsensus::HandleTermAdvanceUnlocked(ConsensusTerm new_term,
     return Status::IllegalState(Substitute("Can't advance term to: $0 current term: $1 is higher.",
                                            new_term, GetCurrentTermUnlocked()));
   }
-  if (GetActiveRoleUnlocked() == RaftPeerPB::LEADER) {
+  if (cmeta_->active_role() == RaftPeerPB::LEADER) {
     LOG_WITH_PREFIX_UNLOCKED(INFO) << "Stepping down as leader of term "
                                    << GetCurrentTermUnlocked();
     RETURN_NOT_OK(BecomeReplicaUnlocked());
@@ -2478,7 +2478,7 @@ Status RaftConsensus::CheckRunningUnlocked() const {
 
 Status RaftConsensus::CheckActiveLeaderUnlocked() const {
   DCHECK(lock_.is_locked());
-  RaftPeerPB::Role role = GetActiveRoleUnlocked();
+  RaftPeerPB::Role role = cmeta_->active_role();
   switch (role) {
     case RaftPeerPB::LEADER:
       return Status::OK();
@@ -2491,24 +2491,14 @@ Status RaftConsensus::CheckActiveLeaderUnlocked() const {
   }
 }
 
-RaftPeerPB::Role RaftConsensus::GetActiveRoleUnlocked() const {
-  DCHECK(lock_.is_locked());
-  return cmeta_->active_role();
-}
-
-bool RaftConsensus::IsConfigChangePendingUnlocked() const {
-  DCHECK(lock_.is_locked());
-  return cmeta_->has_pending_config();
-}
-
 Status RaftConsensus::CheckNoConfigChangePendingUnlocked() const {
   DCHECK(lock_.is_locked());
-  if (IsConfigChangePendingUnlocked()) {
+  if (cmeta_->has_pending_config()) {
     return Status::IllegalState(
         Substitute("RaftConfig change currently pending. Only one is allowed at a time.\n"
                    "  Committed config: $0.\n  Pending config: $1",
-                   SecureShortDebugString(GetCommittedConfigUnlocked()),
-                   SecureShortDebugString(GetPendingConfigUnlocked())));
+                   SecureShortDebugString(cmeta_->CommittedConfig()),
+                   SecureShortDebugString(cmeta_->PendingConfig())));
   }
   return Status::OK();
 }
@@ -2532,16 +2522,6 @@ Status RaftConsensus::SetPendingConfigUnlocked(const RaftConfigPB& new_config) {
   return Status::OK();
 }
 
-void RaftConsensus::ClearPendingConfigUnlocked() {
-  DCHECK(lock_.is_locked());
-  cmeta_->clear_pending_config();
-}
-
-RaftConfigPB RaftConsensus::GetPendingConfigUnlocked() const {
-  DCHECK(lock_.is_locked());
-  return cmeta_->PendingConfig();
-}
-
 Status RaftConsensus::SetCommittedConfigUnlocked(const RaftConfigPB& config_to_commit) {
   TRACE_EVENT0("consensus", "RaftConsensus::SetCommittedConfigUnlocked");
   DCHECK(lock_.is_locked());
@@ -2555,8 +2535,8 @@ Status RaftConsensus::SetCommittedConfigUnlocked(const RaftConfigPB& config_to_c
   // because unsafe config change allows multiple pending configs to exist.
   // Therefore we only need to validate that 'config_to_commit' matches the pending config
   // if the pending config does not have its 'unsafe_config_change' flag set.
-  if (IsConfigChangePendingUnlocked()) {
-    RaftConfigPB pending_config = GetPendingConfigUnlocked();
+  if (cmeta_->has_pending_config()) {
+    RaftConfigPB pending_config = cmeta_->PendingConfig();
     if (!pending_config.unsafe_config_change()) {
       // Quorums must be exactly equal, even w.r.t. peer ordering.
       CHECK_EQ(pending_config.SerializeAsString(),
@@ -2573,16 +2553,6 @@ Status RaftConsensus::SetCommittedConfigUnlocked(const RaftConfigPB& config_to_c
   return Status::OK();
 }
 
-RaftConfigPB RaftConsensus::GetCommittedConfigUnlocked() const {
-  DCHECK(lock_.is_locked());
-  return cmeta_->CommittedConfig();
-}
-
-RaftConfigPB RaftConsensus::GetActiveConfigUnlocked() const {
-  DCHECK(lock_.is_locked());
-  return cmeta_->ActiveConfig();
-}
-
 Status RaftConsensus::SetCurrentTermUnlocked(int64_t new_term,
                                             FlushToDisk flush) {
   TRACE_EVENT1("consensus", "RaftConsensus::SetCurrentTermUnlocked",
@@ -2658,7 +2628,7 @@ string RaftConsensus::LogPrefixUnlocked() const {
                     options_.tablet_id,
                     peer_uuid_,
                     GetCurrentTermUnlocked(),
-                    RaftPeerPB::Role_Name(GetActiveRoleUnlocked()));
+                    RaftPeerPB::Role_Name(cmeta_->active_role()));
 }
 
 string RaftConsensus::LogPrefixThreadSafe() const {
@@ -2676,7 +2646,7 @@ string RaftConsensus::ToString() const {
 string RaftConsensus::ToStringUnlocked() const {
   DCHECK(lock_.is_locked());
   return Substitute("Replica: $0, State: $1, Role: $2",
-                    peer_uuid_, State_Name(state_), RaftPeerPB::Role_Name(GetActiveRoleUnlocked()));
+                    peer_uuid_, State_Name(state_), RaftPeerPB::Role_Name(cmeta_->active_role()));
 }
 
 ConsensusMetadata* RaftConsensus::consensus_metadata_for_tests() const {

http://git-wip-us.apache.org/repos/asf/kudu/blob/bf8e1c7b/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index cf882f4..2d3e725 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -633,40 +633,19 @@ class RaftConsensus : public RefCountedThreadSafe<RaftConsensus>,
   // Returns OK if leader, IllegalState otherwise.
   Status CheckActiveLeaderUnlocked() const WARN_UNUSED_RESULT;
 
-  // Returns the currently active Raft role.
-  RaftPeerPB::Role GetActiveRoleUnlocked() const;
-
-  // Returns true if there is a configuration change currently in-flight but not yet
-  // committed.
-  bool IsConfigChangePendingUnlocked() const;
-
-  // Inverse of IsConfigChangePendingUnlocked(): returns OK if there is
-  // currently *no* configuration change pending, and IllegalState is there *is* a
-  // configuration change pending.
+  // Returns OK if there is currently *no* configuration change pending, and
+  // IllegalState if there *is* a configuration change pending.
   Status CheckNoConfigChangePendingUnlocked() const WARN_UNUSED_RESULT;
 
   // Sets the given configuration as pending commit. Does not persist into the peers
   // metadata. In order to be persisted, SetCommittedConfigUnlocked() must be called.
   Status SetPendingConfigUnlocked(const RaftConfigPB& new_config) WARN_UNUSED_RESULT;
 
-  // Clear (cancel) the pending configuration.
-  void ClearPendingConfigUnlocked();
-
-  // Return the pending configuration, or crash if one is not set.
-  RaftConfigPB GetPendingConfigUnlocked() const;
-
   // Changes the committed config for this replica. Checks that there is a
   // pending configuration and that it is equal to this one. Persists changes to disk.
   // Resets the pending configuration to null.
   Status SetCommittedConfigUnlocked(const RaftConfigPB& config_to_commit);
 
-  // Return the persisted configuration.
-  RaftConfigPB GetCommittedConfigUnlocked() const;
-
-  // Return the "active" configuration - if there is a pending configuration return it;
-  // otherwise return the committed configuration.
-  RaftConfigPB GetActiveConfigUnlocked() const;
-
   // Checks if the term change is legal. If so, sets 'current_term'
   // to 'new_term' and sets 'has voted' to no for the current term.
   //