You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2017/07/13 04:43:27 UTC

[2/2] kudu git commit: Create ConsensusMetadataManager

Create ConsensusMetadataManager

This patch creates an API for a class that manages consensus
metadata. This abstracts out the management of ConsensusMetadata files,
providing the following benefits:

1) An instance of this class can be plumbed throughout the various
classes that deal with reading, creating, and modifying consensus
metadata so that we don't have to pass the individual cmeta instances
around.

2) It provides additional abstraction and flexibility that will make it
easier to change the underlying implementation of ConsensusMetadata in
the future as needed, for example if we wanted to make the cmeta files
log-structured and group-committed across all replicas on a tablet server.

This patch also changes the contract of
ConsensusMetadata::DeleteOnDiskData() to return NotFound if a
nonexistent cmeta is deleted so that ConsensusMetadataManager::Delete()
can provide that same contract, and modifies the single callsite (in
TSTabletManager) to account for that change.

Change-Id: Ia30c05dd0feec2b7530205f4d17dfc079a1c3451
Reviewed-on: http://gerrit.cloudera.org:8080/7191
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <as...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/fa5d56cb
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/fa5d56cb
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/fa5d56cb

Branch: refs/heads/master
Commit: fa5d56cb65c490be7ef1fa714c38bda3b8e18a56
Parents: 603c157
Author: Mike Percy <mp...@apache.org>
Authored: Thu Jun 8 16:19:21 2017 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Thu Jul 13 04:43:06 2017 +0000

----------------------------------------------------------------------
 src/kudu/consensus/CMakeLists.txt               |   3 +
 src/kudu/consensus/consensus_meta.cc            |  75 ++++---
 src/kudu/consensus/consensus_meta.h             |  64 +++---
 .../consensus_meta_manager-stress-test.cc       | 194 +++++++++++++++++++
 .../consensus/consensus_meta_manager-test.cc    | 110 +++++++++++
 src/kudu/consensus/consensus_meta_manager.cc    |  88 +++++++++
 src/kudu/consensus/consensus_meta_manager.h     |  83 ++++++++
 src/kudu/consensus/raft_consensus.cc            |  11 +-
 .../consensus/raft_consensus_quorum-test.cc     |  88 +++++----
 src/kudu/integration-tests/tablet_copy-itest.cc |   7 +-
 src/kudu/integration-tests/ts_recovery-itest.cc |  10 +-
 src/kudu/master/sys_catalog.cc                  |  11 +-
 src/kudu/master/sys_catalog.h                   |   2 +
 src/kudu/tablet/tablet_bootstrap-test.cc        |  13 +-
 src/kudu/tablet/tablet_bootstrap.cc             |  64 +++---
 src/kudu/tablet/tablet_bootstrap.h              |   4 +-
 src/kudu/tablet/tablet_replica-test.cc          |  29 ++-
 src/kudu/tablet/tablet_replica.cc               |  16 +-
 src/kudu/tablet/tablet_replica.h                |   6 +
 src/kudu/tools/tool_action_local_replica.cc     |  24 ++-
 src/kudu/tserver/tablet_copy_client-test.cc     |   6 +
 src/kudu/tserver/tablet_copy_client.cc          |  24 ++-
 src/kudu/tserver/tablet_copy_client.h           |   5 +-
 .../tserver/tablet_copy_source_session-test.cc  |  11 +-
 src/kudu/tserver/ts_tablet_manager.cc           |  37 ++--
 src/kudu/tserver/ts_tablet_manager.h            |  11 +-
 26 files changed, 786 insertions(+), 210 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/fa5d56cb/src/kudu/consensus/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/CMakeLists.txt b/src/kudu/consensus/CMakeLists.txt
index ec53919..00df31b 100644
--- a/src/kudu/consensus/CMakeLists.txt
+++ b/src/kudu/consensus/CMakeLists.txt
@@ -97,6 +97,7 @@ target_link_libraries(log
 
 set(CONSENSUS_SRCS
   consensus_meta.cc
+  consensus_meta_manager.cc
   consensus_peers.cc
   consensus_queue.cc
   leader_election.cc
@@ -126,6 +127,8 @@ set(KUDU_TEST_LINK_LIBS
 )
 
 ADD_KUDU_TEST(consensus_meta-test)
+ADD_KUDU_TEST(consensus_meta_manager-stress-test RUN_SERIAL true)
+ADD_KUDU_TEST(consensus_meta_manager-test)
 ADD_KUDU_TEST(consensus_peers-test)
 ADD_KUDU_TEST(consensus_queue-test)
 ADD_KUDU_TEST(leader_election-test)

http://git-wip-us.apache.org/repos/asf/kudu/blob/fa5d56cb/src/kudu/consensus/consensus_meta.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_meta.cc b/src/kudu/consensus/consensus_meta.cc
index 5782aab..79b198c 100644
--- a/src/kudu/consensus/consensus_meta.cc
+++ b/src/kudu/consensus/consensus_meta.cc
@@ -28,6 +28,7 @@
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/pb_util.h"
+#include "kudu/util/status.h"
 #include "kudu/util/stopwatch.h"
 
 DEFINE_double(fault_crash_before_cmeta_flush, 0.0,
@@ -42,45 +43,6 @@ using std::lock_guard;
 using std::string;
 using strings::Substitute;
 
-Status ConsensusMetadata::Create(FsManager* fs_manager,
-                                 const string& tablet_id,
-                                 const std::string& peer_uuid,
-                                 const RaftConfigPB& config,
-                                 int64_t current_term,
-                                 scoped_refptr<ConsensusMetadata>* cmeta_out) {
-  scoped_refptr<ConsensusMetadata> cmeta(new ConsensusMetadata(fs_manager, tablet_id, peer_uuid));
-  cmeta->set_committed_config(config);
-  cmeta->set_current_term(current_term);
-  RETURN_NOT_OK(cmeta->Flush(NO_OVERWRITE)); // Create() should not clobber.
-  cmeta_out->swap(cmeta);
-  return Status::OK();
-}
-
-Status ConsensusMetadata::Load(FsManager* fs_manager,
-                               const std::string& tablet_id,
-                               const std::string& peer_uuid,
-                               scoped_refptr<ConsensusMetadata>* cmeta_out) {
-  scoped_refptr<ConsensusMetadata> cmeta(new ConsensusMetadata(fs_manager, tablet_id, peer_uuid));
-  RETURN_NOT_OK(pb_util::ReadPBContainerFromPath(fs_manager->env(),
-                                                 fs_manager->GetConsensusMetadataPath(tablet_id),
-                                                 &cmeta->pb_));
-  cmeta->UpdateActiveRole(); // Needs to happen here as we sidestep the accessor APIs.
-  cmeta_out->swap(cmeta);
-  return Status::OK();
-}
-
-Status ConsensusMetadata::DeleteOnDiskData(FsManager* fs_manager, const string& tablet_id) {
-  string cmeta_path = fs_manager->GetConsensusMetadataPath(tablet_id);
-  Env* env = fs_manager->env();
-  if (!env->FileExists(cmeta_path)) {
-    return Status::OK();
-  }
-  LOG(INFO) << "T " << tablet_id << " Deleting consensus metadata";
-  RETURN_NOT_OK_PREPEND(env->DeleteFile(cmeta_path),
-                        "Unable to delete consensus metadata file for tablet " + tablet_id);
-  return Status::OK();
-}
-
 int64_t ConsensusMetadata::current_term() const {
   lock_guard<Mutex> l(lock_);
   return current_term_unlocked();
@@ -333,6 +295,41 @@ ConsensusMetadata::ConsensusMetadata(FsManager* fs_manager,
       flush_count_for_tests_(0) {
 }
 
+Status ConsensusMetadata::Create(FsManager* fs_manager,
+                                 const string& tablet_id,
+                                 const std::string& peer_uuid,
+                                 const RaftConfigPB& config,
+                                 int64_t current_term,
+                                 scoped_refptr<ConsensusMetadata>* cmeta_out) {
+  scoped_refptr<ConsensusMetadata> cmeta(new ConsensusMetadata(fs_manager, tablet_id, peer_uuid));
+  cmeta->set_committed_config(config);
+  cmeta->set_current_term(current_term);
+  RETURN_NOT_OK(cmeta->Flush(NO_OVERWRITE)); // Create() should not clobber.
+  cmeta_out->swap(cmeta);
+  return Status::OK();
+}
+
+Status ConsensusMetadata::Load(FsManager* fs_manager,
+                               const std::string& tablet_id,
+                               const std::string& peer_uuid,
+                               scoped_refptr<ConsensusMetadata>* cmeta_out) {
+  scoped_refptr<ConsensusMetadata> cmeta(new ConsensusMetadata(fs_manager, tablet_id, peer_uuid));
+  RETURN_NOT_OK(pb_util::ReadPBContainerFromPath(fs_manager->env(),
+                                                 fs_manager->GetConsensusMetadataPath(tablet_id),
+                                                 &cmeta->pb_));
+  cmeta->UpdateActiveRole(); // Needs to happen here as we sidestep the accessor APIs.
+  cmeta_out->swap(cmeta);
+  return Status::OK();
+}
+
+Status ConsensusMetadata::DeleteOnDiskData(FsManager* fs_manager, const string& tablet_id) {
+  string cmeta_path = fs_manager->GetConsensusMetadataPath(tablet_id);
+  RETURN_NOT_OK_PREPEND(fs_manager->env()->DeleteFile(cmeta_path),
+                        Substitute("Unable to delete consensus metadata file for tablet $0",
+                                   tablet_id));
+  return Status::OK();
+}
+
 std::string ConsensusMetadata::LogPrefix() const {
   // No need to lock to read const members.
   return Substitute("T $0 P $1: ", tablet_id_, peer_uuid_);

http://git-wip-us.apache.org/repos/asf/kudu/blob/fa5d56cb/src/kudu/consensus/consensus_meta.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_meta.h b/src/kudu/consensus/consensus_meta.h
index ac1ae8f..9fdfed1 100644
--- a/src/kudu/consensus/consensus_meta.h
+++ b/src/kudu/consensus/consensus_meta.h
@@ -14,25 +14,28 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#ifndef KUDU_CONSENSUS_CONSENSUS_META_H_
-#define KUDU_CONSENSUS_CONSENSUS_META_H_
+#pragma once
 
-#include <cstdint>
 #include <string>
 
+#include <gtest/gtest_prod.h>
+
 #include "kudu/consensus/metadata.pb.h"
 #include "kudu/consensus/quorum_util.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/util/mutex.h"
-#include "kudu/util/status.h"
 
 namespace kudu {
 
 class FsManager;
+class Status;
 
 namespace consensus {
 
+class ConsensusMetadataManager;
+class ConsensusMetadataTest;
+
 // Provides methods to read, write, and persist consensus-related metadata.
 // This partly corresponds to Raft Figure 2's "Persistent state on all servers".
 //
@@ -65,27 +68,6 @@ class ConsensusMetadata : public RefCountedThreadSafe<ConsensusMetadata> {
     NO_OVERWRITE
   };
 
-  // Create a ConsensusMetadata object with provided initial state.
-  // Encoded PB is flushed to disk before returning.
-  static Status Create(FsManager* fs_manager,
-                       const std::string& tablet_id,
-                       const std::string& peer_uuid,
-                       const RaftConfigPB& config,
-                       int64_t current_term,
-                       scoped_refptr<ConsensusMetadata>* cmeta_out);
-
-  // Load a ConsensusMetadata object from disk.
-  // Returns Status::NotFound if the file could not be found. May return other
-  // Status codes if unable to read the file.
-  static Status Load(FsManager* fs_manager,
-                     const std::string& tablet_id,
-                     const std::string& peer_uuid,
-                     scoped_refptr<ConsensusMetadata>* cmeta_out);
-
-  // Delete the ConsensusMetadata file associated with the given tablet from
-  // disk.
-  static Status DeleteOnDiskData(FsManager* fs_manager, const std::string& tablet_id);
-
   // Accessors for current term.
   int64_t current_term() const;
   void set_current_term(int64_t term);
@@ -169,10 +151,40 @@ class ConsensusMetadata : public RefCountedThreadSafe<ConsensusMetadata> {
 
  private:
   friend class RefCountedThreadSafe<ConsensusMetadata>;
+  friend class ConsensusMetadataManager;
+
+  FRIEND_TEST(ConsensusMetadataTest, TestCreateLoad);
+  FRIEND_TEST(ConsensusMetadataTest, TestCreateNoOverwrite);
+  FRIEND_TEST(ConsensusMetadataTest, TestFailedLoad);
+  FRIEND_TEST(ConsensusMetadataTest, TestFlush);
+  FRIEND_TEST(ConsensusMetadataTest, TestActiveRole);
+  FRIEND_TEST(ConsensusMetadataTest, TestToConsensusStatePB);
+  FRIEND_TEST(ConsensusMetadataTest, TestMergeCommittedConsensusStatePB);
 
   ConsensusMetadata(FsManager* fs_manager, std::string tablet_id,
                     std::string peer_uuid);
 
+  // Create a ConsensusMetadata object with provided initial state.
+  // Encoded PB is flushed to disk before returning.
+  static Status Create(FsManager* fs_manager,
+                       const std::string& tablet_id,
+                       const std::string& peer_uuid,
+                       const RaftConfigPB& config,
+                       int64_t current_term,
+                       scoped_refptr<ConsensusMetadata>* cmeta_out);
+
+  // Load a ConsensusMetadata object from disk.
+  // Returns Status::NotFound if the file could not be found. May return other
+  // Status codes if unable to read the file.
+  static Status Load(FsManager* fs_manager,
+                     const std::string& tablet_id,
+                     const std::string& peer_uuid,
+                     scoped_refptr<ConsensusMetadata>* cmeta_out);
+
+  // Delete the ConsensusMetadata file associated with the given tablet from
+  // disk. Returns Status::NotFound if the on-disk data is not found.
+  static Status DeleteOnDiskData(FsManager* fs_manager, const std::string& tablet_id);
+
   // Return the specified config.
   const RaftConfigPB& config_unlocked(RaftConfigState type) const;
 
@@ -222,5 +234,3 @@ class ConsensusMetadata : public RefCountedThreadSafe<ConsensusMetadata> {
 
 } // namespace consensus
 } // namespace kudu
-
-#endif // KUDU_CONSENSUS_CONSENSUS_META_H_

http://git-wip-us.apache.org/repos/asf/kudu/blob/fa5d56cb/src/kudu/consensus/consensus_meta_manager-stress-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_meta_manager-stress-test.cc b/src/kudu/consensus/consensus_meta_manager-stress-test.cc
new file mode 100644
index 0000000..cc1293a
--- /dev/null
+++ b/src/kudu/consensus/consensus_meta_manager-stress-test.cc
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <atomic>
+#include <mutex>
+#include <string>
+#include <thread>
+#include <unordered_map>
+#include <vector>
+
+#include <glog/stl_logging.h>
+
+#include "kudu/consensus/consensus_meta.h"
+#include "kudu/consensus/consensus_meta_manager.h"
+#include "kudu/consensus/metadata.pb.h"
+#include "kudu/consensus/opid_util.h"
+#include "kudu/consensus/quorum_util.h"
+#include "kudu/fs/fs_manager.h"
+#include "kudu/util/barrier.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/random.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/test_util.h"
+
+using std::atomic;
+using std::lock_guard;
+using std::string;
+using std::thread;
+using std::unordered_map;
+using std::vector;
+
+namespace kudu {
+namespace consensus {
+
+static constexpr const int64_t kInitialTerm = 1;
+
+using LockTable = unordered_map<string, string>;
+
+// Multithreaded stress tests for the cmeta manager.
+class ConsensusMetadataManagerStressTest : public KuduTest {
+ public:
+  ConsensusMetadataManagerStressTest()
+      : rng_(SeedRandom()),
+        fs_manager_(env_, GetTestPath("fs_root")),
+        cmeta_manager_(new ConsensusMetadataManager(&fs_manager_)) {
+  }
+
+  void SetUp() override {
+    KuduTest::SetUp();
+    ASSERT_OK(fs_manager_.CreateInitialFileSystemLayout());
+    ASSERT_OK(fs_manager_.Open());
+
+    // Initialize test configuration.
+    config_.set_opid_index(kInvalidOpIdIndex);
+    RaftPeerPB* peer = config_.add_peers();
+    peer->set_permanent_uuid(fs_manager_.uuid());
+    peer->set_member_type(RaftPeerPB::VOTER);
+  }
+
+ protected:
+  enum OpType {
+    kCreate,
+    kLoad,
+    kDelete,
+    kNumOpTypes, // Must come last.
+  };
+
+  ThreadSafeRandom rng_;
+  FsManager fs_manager_;
+  scoped_refptr<ConsensusMetadataManager> cmeta_manager_;
+  RaftConfigPB config_;
+
+  // Lock used by tests.
+  simple_spinlock lock_;
+};
+
+// Concurrency test to check whether TSAN will flag unsafe concurrent
+// operations for simultaneous access to the cmeta manager by different threads
+// on different tablet ids. For a given tablet id, a lock table is used as
+// external synchronization to ensure exclusive access by a single thread.
+TEST_F(ConsensusMetadataManagerStressTest, CreateLoadDeleteTSANTest) {
+  static const int kNumTablets = 26;
+  static const int kNumThreads = 8;
+  static const int kNumOpsPerThread = 1000;
+
+  // Set of tablets we are operating on.
+  vector<string> tablet_ids;
+
+  // Map of tablet_id -> cmeta existence.
+  unordered_map<string, bool> tablet_cmeta_exists;
+
+  // Each entry in 'lock_table' guards the corresponding
+  // 'tablet_cmeta_exists[tablet_id]' value. We never resize 'tablet_cmeta_exists'.
+  LockTable lock_table;
+
+  for (int i = 0; i < kNumTablets; i++) {
+    string tablet_id = string(1, 'a' + i);
+    // None of the cmetas have been created yet.
+    InsertOrDie(&tablet_cmeta_exists, tablet_id, false);
+    tablet_ids.push_back(std::move(tablet_id));
+  }
+
+  // Eventually exit if the test hangs.
+  alarm(60);
+  auto c = MakeScopedCleanup([&] { alarm(0); });
+
+  atomic<int64_t> ops_performed(0);
+  Barrier barrier(kNumThreads);
+  vector<thread> threads;
+  for (int thread_num = 0; thread_num < kNumThreads; thread_num++) {
+    threads.emplace_back([&] {
+      barrier.Wait();
+      for (int op_num = 0; op_num < kNumOpsPerThread; op_num++) {
+        const string& tablet_id = tablet_ids[rng_.Uniform(kNumTablets)];
+        auto unlocker = MakeScopedCleanup([&] {
+          lock_guard<simple_spinlock> l(lock_);
+          CHECK(lock_table.erase(tablet_id));
+        });
+        // Acquire lock in lock table or bail.
+        {
+          // 'lock_' protects 'lock_table'.
+          lock_guard<simple_spinlock> l(lock_);
+          if (ContainsKey(lock_table, tablet_id)) {
+            // Another thread has access to this tablet id. Bail.
+            unlocker.cancel(); // Don't unlock what we didn't lock.
+            continue;
+          }
+          InsertOrDie(&lock_table, tablet_id, "lock for test");
+        }
+        OpType type = static_cast<OpType>(rng_.Uniform(kNumOpTypes));
+        switch (type) {
+          case kCreate: {
+            Status s = cmeta_manager_->Create(tablet_id, config_, kInitialTerm);
+            if (tablet_cmeta_exists[tablet_id]) {
+              CHECK(s.IsAlreadyPresent()) << s.ToString();
+            } else {
+              CHECK(s.ok()) << s.ToString();
+              ops_performed.fetch_add(1, std::memory_order_relaxed);
+            }
+            tablet_cmeta_exists[tablet_id] = true;
+            break;
+          }
+          case kLoad: {
+            scoped_refptr<ConsensusMetadata> cmeta;
+            Status s = cmeta_manager_->Load(tablet_id, &cmeta);
+            if (tablet_cmeta_exists[tablet_id]) {
+              CHECK(s.ok()) << s.ToString();
+              ops_performed.fetch_add(1, std::memory_order_relaxed);
+            } else {
+              CHECK(s.IsNotFound()) << tablet_id << ": " << s.ToString();
+            }
+            // Load() does not change 'tablet_cmeta_exists' status.
+            break;
+          }
+          case kDelete: {
+            Status s = cmeta_manager_->Delete(tablet_id);
+            if (tablet_cmeta_exists[tablet_id]) {
+              CHECK(s.ok()) << s.ToString();
+              ops_performed.fetch_add(1, std::memory_order_relaxed);
+            } else {
+              CHECK(s.IsNotFound()) << s.ToString();
+            }
+            tablet_cmeta_exists[tablet_id] = false;
+            break;
+          }
+          default: LOG(FATAL) << type; break;
+        }
+      }
+    });
+  }
+
+  for (int thread_num = 0; thread_num < kNumThreads; thread_num++) {
+    threads[thread_num].join();
+  }
+
+  LOG(INFO) << "Ops performed: " << ops_performed.load(std::memory_order_relaxed);
+}
+
+} // namespace consensus
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/fa5d56cb/src/kudu/consensus/consensus_meta_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_meta_manager-test.cc b/src/kudu/consensus/consensus_meta_manager-test.cc
new file mode 100644
index 0000000..bcd9ccb
--- /dev/null
+++ b/src/kudu/consensus/consensus_meta_manager-test.cc
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include <google/protobuf/util/message_differencer.h>
+
+#include "kudu/consensus/consensus_meta.h"
+#include "kudu/consensus/consensus_meta_manager.h"
+#include "kudu/consensus/metadata.pb.h"
+#include "kudu/consensus/opid_util.h"
+#include "kudu/consensus/quorum_util.h"
+#include "kudu/fs/fs_manager.h"
+#include "kudu/util/test_util.h"
+
+using google::protobuf::util::MessageDifferencer;
+
+namespace kudu {
+namespace consensus {
+
+static constexpr const char* kTabletId = "cmeta-mgr-test";
+static const int64_t kInitialTerm = 1;
+
+// Functional tests for the cmeta manager.
+class ConsensusMetadataManagerTest : public KuduTest {
+ public:
+  ConsensusMetadataManagerTest()
+      : fs_manager_(env_, GetTestPath("fs_root")),
+        cmeta_manager_(new ConsensusMetadataManager(&fs_manager_)) {
+  }
+
+  void SetUp() override {
+    KuduTest::SetUp();
+    ASSERT_OK(fs_manager_.CreateInitialFileSystemLayout());
+    ASSERT_OK(fs_manager_.Open());
+
+    // Initialize test configuration.
+    config_.set_opid_index(kInvalidOpIdIndex);
+    RaftPeerPB* peer = config_.add_peers();
+    peer->set_permanent_uuid(fs_manager_.uuid());
+    peer->set_member_type(RaftPeerPB::VOTER);
+  }
+
+ protected:
+  FsManager fs_manager_;
+  scoped_refptr<ConsensusMetadataManager> cmeta_manager_;
+  RaftConfigPB config_;
+};
+
+// Test the basic "happy case" of creating and then loading a file.
+TEST_F(ConsensusMetadataManagerTest, TestCreateLoad) {
+  // Try to load a nonexistent instance.
+  scoped_refptr<ConsensusMetadata> cmeta;
+  Status s = cmeta_manager_->Load(kTabletId, &cmeta);
+  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+
+  // Create a new ConsensusMetadata instance.
+  ASSERT_OK(cmeta_manager_->Create(kTabletId, config_, kInitialTerm, &cmeta));
+
+  // Load it back.
+  ASSERT_OK(cmeta_manager_->Load(kTabletId, &cmeta));
+
+  // Ensure we got what we expected.
+  ASSERT_EQ(kInitialTerm, cmeta->current_term());
+  ASSERT_TRUE(MessageDifferencer::Equals(config_, cmeta->CommittedConfig()))
+      << DiffRaftConfigs(config_, cmeta->CommittedConfig());
+}
+
+// Test Delete.
+TEST_F(ConsensusMetadataManagerTest, TestDelete) {
+  // Create a ConsensusMetadata instance.
+  scoped_refptr<ConsensusMetadata> cmeta;
+  ASSERT_OK(cmeta_manager_->Create(kTabletId, config_, kInitialTerm, &cmeta));
+
+  // Now delete it.
+  ASSERT_OK(cmeta_manager_->Delete(kTabletId));
+
+  // Can't load it because it's gone.
+  Status s = cmeta_manager_->Load(kTabletId, &cmeta);
+  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+}
+
+// Test that we can't clobber (overwrite) an existing cmeta.
+TEST_F(ConsensusMetadataManagerTest, TestNoClobber) {
+  // Create a ConsensusMetadata instance.
+  {
+    scoped_refptr<ConsensusMetadata> cmeta;
+    ASSERT_OK(cmeta_manager_->Create(kTabletId, config_, kInitialTerm, &cmeta));
+  }
+
+  // Creating it again should fail.
+  scoped_refptr<ConsensusMetadata> cmeta;
+  Status s = cmeta_manager_->Create(kTabletId, config_, kInitialTerm, &cmeta);
+  ASSERT_TRUE(s.IsAlreadyPresent()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "already exists");
+}
+
+} // namespace consensus
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/fa5d56cb/src/kudu/consensus/consensus_meta_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_meta_manager.cc b/src/kudu/consensus/consensus_meta_manager.cc
new file mode 100644
index 0000000..a43f5ce
--- /dev/null
+++ b/src/kudu/consensus/consensus_meta_manager.cc
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "kudu/consensus/consensus_meta_manager.h"
+
+#include <mutex>
+
+#include "kudu/consensus/consensus_meta.h"
+#include "kudu/fs/fs_manager.h"
+#include "kudu/gutil/map-util.h"
+
+namespace kudu {
+namespace consensus {
+
+using std::lock_guard;
+using std::string;
+
+ConsensusMetadataManager::ConsensusMetadataManager(FsManager* fs_manager)
+    : fs_manager_(DCHECK_NOTNULL(fs_manager)) {
+}
+
+Status ConsensusMetadataManager::Create(const string& tablet_id,
+                                        const RaftConfigPB& config,
+                                        int64_t current_term,
+                                        scoped_refptr<ConsensusMetadata>* cmeta_out) {
+  scoped_refptr<ConsensusMetadata> cmeta;
+  RETURN_NOT_OK(ConsensusMetadata::Create(fs_manager_, tablet_id, fs_manager_->uuid(),
+                                          config, current_term, &cmeta));
+  lock_guard<Mutex> l(lock_);
+  InsertOrDie(&cmeta_cache_, tablet_id, cmeta);
+  if (cmeta_out) *cmeta_out = std::move(cmeta);
+  return Status::OK();
+}
+
+Status ConsensusMetadataManager::Load(const string& tablet_id,
+                                      scoped_refptr<ConsensusMetadata>* cmeta_out) {
+  DCHECK(cmeta_out);
+
+  {
+    lock_guard<Mutex> l(lock_);
+
+    // Try to get the cmeta instance from cache first.
+    scoped_refptr<ConsensusMetadata>* cached_cmeta = FindOrNull(cmeta_cache_, tablet_id);
+    if (cached_cmeta) {
+      *cmeta_out = *cached_cmeta;
+      return Status::OK();
+    }
+  }
+
+  // If it's not yet cached, drop the lock before we load it.
+  scoped_refptr<ConsensusMetadata> cmeta;
+  RETURN_NOT_OK(ConsensusMetadata::Load(fs_manager_, tablet_id, fs_manager_->uuid(), &cmeta));
+
+  // Cache and return the loaded ConsensusMetadata.
+  {
+    lock_guard<Mutex> l(lock_);
+    // Due to our thread-safety contract, no other caller may have interleaved
+    // with us for this tablet id, so we use InsertOrDie().
+    InsertOrDie(&cmeta_cache_, tablet_id, cmeta);
+  }
+
+  *cmeta_out = std::move(cmeta);
+  return Status::OK();
+}
+
+Status ConsensusMetadataManager::Delete(const string& tablet_id) {
+  {
+    lock_guard<Mutex> l(lock_);
+    cmeta_cache_.erase(tablet_id); // OK to delete an uncached cmeta; ignore the return value.
+  }
+  return ConsensusMetadata::DeleteOnDiskData(fs_manager_, tablet_id);
+}
+
+} // namespace consensus
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/fa5d56cb/src/kudu/consensus/consensus_meta_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_meta_manager.h b/src/kudu/consensus/consensus_meta_manager.h
new file mode 100644
index 0000000..31e997b
--- /dev/null
+++ b/src/kudu/consensus/consensus_meta_manager.h
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <string>
+#include <unordered_map>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/mutex.h"
+
+namespace kudu {
+class FsManager;
+class Status;
+
+namespace consensus {
+class ConsensusMetadata;
+class RaftConfigPB;
+
+// API and implementation for a consensus metadata "manager" that controls
+// access to consensus metadata across a server instance. This abstracts
+// the handling of consensus metadata persistence.
+//
+// A single manager instance can be plumbed throughout the various classes that
+// deal with reading, creating, and modifying consensus metadata so that we
+// don't have to pass individual consensus metadata instances around. It also
+// provides flexibility to change the underlying implementation of
+// ConsensusMetadata in the future.
+//
+// This class is ONLY thread-safe across different tablets. Concurrent access
+// to Create(), Load(), or Delete() for the same tablet id is thread-hostile
+// and must be externally synchronized. Failure to do so may result in a crash.
+class ConsensusMetadataManager : public RefCountedThreadSafe<ConsensusMetadataManager> {
+ public:
+  explicit ConsensusMetadataManager(FsManager* fs_manager);
+
+  // Create a ConsensusMetadata instance keyed by 'tablet_id'.
+  // Returns an error if a ConsensusMetadata instance with that key already exists.
+  Status Create(const std::string& tablet_id,
+                const RaftConfigPB& config,
+                int64_t current_term,
+                scoped_refptr<ConsensusMetadata>* cmeta_out = nullptr);
+
+  // Load the ConsensusMetadata instance keyed by 'tablet_id'.
+  // Returns an error if it cannot be found.
+  Status Load(const std::string& tablet_id,
+              scoped_refptr<ConsensusMetadata>* cmeta_out);
+
+  // Permanently delete the ConsensusMetadata instance keyed by 'tablet_id'.
+  // Returns Status::NotFound if the instance cannot be found.
+  // Returns another error if the cmeta instance exists but cannot be deleted
+  // for some reason, perhaps due to a permissions or I/O-related issue.
+  Status Delete(const std::string& tablet_id);
+
+ private:
+  friend class RefCountedThreadSafe<ConsensusMetadataManager>;
+
+  FsManager* const fs_manager_;
+
+  // Lock protecting the map below.
+  Mutex lock_;
+
+  // Cache for ConsensusMetadata objects (tablet_id => cmeta).
+  std::unordered_map<std::string, scoped_refptr<ConsensusMetadata>> cmeta_cache_;
+
+  DISALLOW_COPY_AND_ASSIGN(ConsensusMetadataManager);
+};
+
+} // namespace consensus
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/fa5d56cb/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index c636ccf..f5208d2 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -351,8 +351,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) {
     RETURN_NOT_OK(BecomeReplicaUnlocked());
   }
 
-  bool single_voter = IsSingleVoterConfig();
-  if (single_voter && FLAGS_enable_leader_failure_detection) {
+  if (IsSingleVoterConfig() && FLAGS_enable_leader_failure_detection) {
     LOG_WITH_PREFIX(INFO) << "Only one voter in the Raft config. Triggering election immediately";
     RETURN_NOT_OK(StartElection(NORMAL_ELECTION, INITIAL_SINGLE_NODE_ELECTION));
   }
@@ -840,12 +839,8 @@ Status RaftConsensus::StartReplicaTransactionUnlocked(const ReplicateRefPtr& msg
 bool RaftConsensus::IsSingleVoterConfig() const {
   ThreadRestrictions::AssertWaitAllowed();
   LockGuard l(lock_);
-  const string& uuid = peer_uuid_;
-  if (cmeta_->CountVotersInConfig(COMMITTED_CONFIG) == 1 &&
-      cmeta_->IsVoterInConfig(uuid, COMMITTED_CONFIG)) {
-    return true;
-  }
-  return false;
+  return cmeta_->CountVotersInConfig(COMMITTED_CONFIG) == 1 &&
+         cmeta_->IsVoterInConfig(peer_uuid_, COMMITTED_CONFIG);
 }
 
 std::string RaftConsensus::LeaderRequest::OpsRangeString() const {

http://git-wip-us.apache.org/repos/asf/kudu/blob/fa5d56cb/src/kudu/consensus/raft_consensus_quorum-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index f87de28..62ecdfa 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -20,11 +20,13 @@
 
 #include "kudu/common/schema.h"
 #include "kudu/common/wire_protocol-test-util.h"
+#include "kudu/consensus/consensus-test-util.h"
 #include "kudu/consensus/consensus.pb.h"
 #include "kudu/consensus/consensus.proxy.h"
-#include "kudu/consensus/consensus-test-util.h"
+#include "kudu/consensus/consensus_meta_manager.h"
 #include "kudu/consensus/log.h"
 #include "kudu/consensus/log_index.h"
+#include "kudu/consensus/log_reader.h"
 #include "kudu/consensus/log_util.h"
 #include "kudu/consensus/metadata.pb.h"
 #include "kudu/consensus/opid_util.h"
@@ -34,7 +36,6 @@
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/strcat.h"
 #include "kudu/gutil/strings/substitute.h"
-#include "kudu/consensus/log_reader.h"
 #include "kudu/rpc/rpc_context.h"
 #include "kudu/server/logical_clock.h"
 #include "kudu/util/auto_release_pool.h"
@@ -104,14 +105,14 @@ class RaftConsensusQuorumTest : public KuduTest {
   // Builds an initial configuration of 'num' elements.
   // All of the peers start as followers.
   void BuildInitialRaftConfigPB(int num) {
-    config_ = BuildRaftConfigPBForTests(num);
+    config_ = BuildRaftConfigPB(num);
     config_.set_opid_index(kInvalidOpIdIndex);
     peers_.reset(new TestPeerMapManager(config_));
   }
 
-  Status BuildFsManagersAndLogs() {
+  Status BuildFsManagersAndLogs(int num) {
     // Build the fsmanagers and logs
-    for (int i = 0; i < config_.peers_size(); i++) {
+    for (int i = 0; i < num; i++) {
       shared_ptr<MemTracker> parent_mem_tracker =
           MemTracker::CreateTracker(-1, Substitute("peer-$0", i));
       parent_mem_trackers_.push_back(parent_mem_tracker);
@@ -124,6 +125,10 @@ class RaftConsensusQuorumTest : public KuduTest {
       RETURN_NOT_OK(fs_manager->CreateInitialFileSystemLayout());
       RETURN_NOT_OK(fs_manager->Open());
 
+      scoped_refptr<ConsensusMetadataManager> cmeta_manager(
+          new ConsensusMetadataManager(fs_manager.get()));
+      cmeta_managers_.push_back(cmeta_manager);
+
       scoped_refptr<Log> log;
       RETURN_NOT_OK(Log::Open(LogOptions(),
                               fs_manager.get(),
@@ -138,20 +143,32 @@ class RaftConsensusQuorumTest : public KuduTest {
     return Status::OK();
   }
 
+  // Builds a config of 'num' voters; fs_managers_ must already hold 'num' entries.
+  RaftConfigPB BuildRaftConfigPB(int num) {
+    RaftConfigPB raft_config;
+    for (int i = 0; i < num; i++) {
+      RaftPeerPB* peer_pb = raft_config.add_peers();
+      peer_pb->set_member_type(RaftPeerPB::VOTER);
+      peer_pb->set_permanent_uuid(fs_managers_[i]->uuid());
+      HostPortPB* hp = peer_pb->mutable_last_known_addr();
+      hp->set_host(Substitute("peer-$0.fake-domain-for-tests", i));
+      hp->set_port(0);
+    }
+    return raft_config;
+  }
+
   void BuildPeers() {
     for (int i = 0; i < config_.peers_size(); i++) {
       auto proxy_factory = new LocalTestPeerProxyFactory(peers_.get());
 
       auto txn_factory = new TestTransactionFactory(logs_[i].get());
 
-      string peer_uuid = Substitute("peer-$0", i);
-
       scoped_refptr<ConsensusMetadata> cmeta;
-      CHECK_OK(ConsensusMetadata::Create(fs_managers_[i], kTestTablet, peer_uuid, config_,
-                                         kMinimumTerm, &cmeta));
+      CHECK_OK(cmeta_managers_[i]->Create(kTestTablet, config_,
+                                          kMinimumTerm, &cmeta));
 
       RaftPeerPB local_peer_pb;
-      CHECK_OK(GetRaftConfigMember(config_, peer_uuid, &local_peer_pb));
+      CHECK_OK(GetRaftConfigMember(config_, fs_managers_[i]->uuid(), &local_peer_pb));
 
       scoped_refptr<TimeManager> time_manager(new TimeManager(clock_, Timestamp::kMin));
       gscoped_ptr<PeerMessageQueue> queue(
@@ -205,8 +222,8 @@ class RaftConsensusQuorumTest : public KuduTest {
   }
 
   Status BuildConfig(int num) {
+    RETURN_NOT_OK(BuildFsManagersAndLogs(num));
     BuildInitialRaftConfigPB(num);
-    RETURN_NOT_OK(BuildFsManagersAndLogs());
     BuildPeers();
     return Status::OK();
   }
@@ -370,7 +387,7 @@ class RaftConsensusQuorumTest : public KuduTest {
                                    ReplicateWaitMode wait_mode,
                                    CommitMode commit_mode,
                                    OpId* last_op_id,
-                                   vector<scoped_refptr<ConsensusRound> >* rounds,
+                                   vector<scoped_refptr<ConsensusRound>>* rounds,
                                    shared_ptr<Synchronizer>* commit_sync = nullptr) {
     for (int i = 0; i < seq_size; i++) {
       scoped_refptr<ConsensusRound> round;
@@ -522,9 +539,8 @@ class RaftConsensusQuorumTest : public KuduTest {
 
   // Read the ConsensusMetadata for the given peer from disk.
   scoped_refptr<ConsensusMetadata> ReadConsensusMetadataFromDisk(int peer_index) {
-    string peer_uuid = Substitute("peer-$0", peer_index);
     scoped_refptr<ConsensusMetadata> cmeta;
-    CHECK_OK(ConsensusMetadata::Load(fs_managers_[peer_index], kTestTablet, peer_uuid, &cmeta));
+    CHECK_OK(cmeta_managers_[peer_index]->Load(kTestTablet, &cmeta));
     return cmeta;
   }
 
@@ -556,10 +572,11 @@ class RaftConsensusQuorumTest : public KuduTest {
   ConsensusOptions options_;
   RaftConfigPB config_;
   OpId initial_id_;
-  vector<shared_ptr<MemTracker> > parent_mem_trackers_;
+  vector<shared_ptr<MemTracker>> parent_mem_trackers_;
   vector<FsManager*> fs_managers_;
   vector<scoped_refptr<Log> > logs_;
   gscoped_ptr<ThreadPool> raft_pool_;
+  vector<scoped_refptr<ConsensusMetadataManager>> cmeta_managers_;
   gscoped_ptr<TestPeerMapManager> peers_;
   vector<TestTransactionFactory*> txn_factories_;
   scoped_refptr<server::Clock> clock_;
@@ -580,7 +597,7 @@ TEST_F(RaftConsensusQuorumTest, TestFollowersReplicateAndCommitMessage) {
   ASSERT_OK(BuildAndStartConfig(3));
 
   OpId last_op_id;
-  vector<scoped_refptr<ConsensusRound> > rounds;
+  vector<scoped_refptr<ConsensusRound>> rounds;
   shared_ptr<Synchronizer> commit_sync;
   REPLICATE_SEQUENCE_OF_MESSAGES(1,
                                  kLeaderIdx,
@@ -622,7 +639,7 @@ TEST_F(RaftConsensusQuorumTest, TestFollowersReplicateAndCommitSequence) {
   ASSERT_OK(BuildAndStartConfig(3));
 
   OpId last_op_id;
-  vector<scoped_refptr<ConsensusRound> > rounds;
+  vector<scoped_refptr<ConsensusRound>> rounds;
   shared_ptr<Synchronizer> commit_sync;
 
   REPLICATE_SEQUENCE_OF_MESSAGES(seq_size,
@@ -656,7 +673,7 @@ TEST_F(RaftConsensusQuorumTest, TestConsensusContinuesIfAMinorityFallsBehind) {
   ASSERT_OK(BuildAndStartConfig(3));
 
   OpId last_replicate;
-  vector<scoped_refptr<ConsensusRound> > rounds;
+  vector<scoped_refptr<ConsensusRound>> rounds;
   {
     // lock one of the replicas down by obtaining the state lock
     // and never letting it go.
@@ -766,7 +783,7 @@ TEST_F(RaftConsensusQuorumTest, TestReplicasHandleCommunicationErrors) {
 
   // Append a sequence of messages, and keep injecting errors into the
   // replica proxies.
-  vector<scoped_refptr<ConsensusRound> > rounds;
+  vector<scoped_refptr<ConsensusRound>> rounds;
   shared_ptr<Synchronizer> commit_sync;
   for (int i = 0; i < 100; i++) {
     scoped_refptr<ConsensusRound> round;
@@ -861,7 +878,7 @@ TEST_F(RaftConsensusQuorumTest, TestLeaderElectionWithQuiescedQuorum) {
 
   OpId last_op_id;
   shared_ptr<Synchronizer> last_commit_sync;
-  vector<scoped_refptr<ConsensusRound> > rounds;
+  vector<scoped_refptr<ConsensusRound>> rounds;
 
   // Loop twice, successively shutting down the previous leader.
   for (int current_config_size = kInitialNumPeers;
@@ -931,7 +948,7 @@ TEST_F(RaftConsensusQuorumTest, TestReplicasEnforceTheLogMatchingProperty) {
 
   OpId last_op_id;
   shared_ptr<Synchronizer> last_commit_sync;
-  vector<scoped_refptr<ConsensusRound> > rounds;
+  vector<scoped_refptr<ConsensusRound>> rounds;
   REPLICATE_SEQUENCE_OF_MESSAGES(10,
                                  2, // The index of the initial leader.
                                  WAIT_FOR_ALL_REPLICAS,
@@ -999,7 +1016,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
 
   OpId last_op_id;
   shared_ptr<Synchronizer> last_commit_sync;
-  vector<scoped_refptr<ConsensusRound> > rounds;
+  vector<scoped_refptr<ConsensusRound>> rounds;
   REPLICATE_SEQUENCE_OF_MESSAGES(10,
                                  2, // The index of the initial leader.
                                  WAIT_FOR_ALL_REPLICAS,
@@ -1031,7 +1048,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
   // a valid leader.
   int flush_count_before = flush_count();
   VoteResponsePB response;
-  request.set_candidate_uuid("peer-0");
+  request.set_candidate_uuid(fs_managers_[0]->uuid());
   request.set_candidate_term(last_op_id.term() + 1);
   ASSERT_OK(peer->RequestVote(&request, &response));
   ASSERT_FALSE(response.vote_granted());
@@ -1048,7 +1065,8 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
   ASSERT_OK(peer->RequestVote(&request, &response));
   ASSERT_TRUE(response.vote_granted());
   ASSERT_EQ(last_op_id.term() + 1, response.responder_term());
-  ASSERT_NO_FATAL_FAILURE(AssertDurableTermAndVote(kPeerIndex, last_op_id.term() + 1, "peer-0"));
+  ASSERT_NO_FATAL_FAILURE(AssertDurableTermAndVote(kPeerIndex, last_op_id.term() + 1,
+                                                   fs_managers_[0]->uuid()));
   ASSERT_EQ(1, flush_count() - flush_count_before)
       << "A granted vote should flush only once";
 
@@ -1063,13 +1081,14 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
   // Ensure we get a "no" for a different candidate UUID for that term.
   flush_count_before = flush_count();
   response.Clear();
-  request.set_candidate_uuid("peer-2");
+  request.set_candidate_uuid(fs_managers_[2]->uuid());
   ASSERT_OK(peer->RequestVote(&request, &response));
   ASSERT_FALSE(response.vote_granted());
   ASSERT_TRUE(response.has_consensus_error());
   ASSERT_EQ(ConsensusErrorPB::ALREADY_VOTED, response.consensus_error().code());
   ASSERT_EQ(last_op_id.term() + 1, response.responder_term());
-  ASSERT_NO_FATAL_FAILURE(AssertDurableTermAndVote(kPeerIndex, last_op_id.term() + 1, "peer-0"));
+  ASSERT_NO_FATAL_FAILURE(AssertDurableTermAndVote(kPeerIndex, last_op_id.term() + 1,
+                                                   fs_managers_[0]->uuid()));
   ASSERT_EQ(0, flush_count() - flush_count_before)
       << "Rejected votes for same term should not flush";
 
@@ -1080,18 +1099,19 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
   // Increase the term of our candidate, which will cause the voter replica to
   // increase its own term to match.
   flush_count_before = flush_count();
-  request.set_candidate_uuid("peer-0");
+  request.set_candidate_uuid(fs_managers_[0]->uuid());
   request.set_candidate_term(last_op_id.term() + 2);
   response.Clear();
   ASSERT_OK(peer->RequestVote(&request, &response));
   ASSERT_TRUE(response.vote_granted());
   ASSERT_EQ(last_op_id.term() + 2, response.responder_term());
-  ASSERT_NO_FATAL_FAILURE(AssertDurableTermAndVote(kPeerIndex, last_op_id.term() + 2, "peer-0"));
+  ASSERT_NO_FATAL_FAILURE(AssertDurableTermAndVote(kPeerIndex, last_op_id.term() + 2,
+                                                   fs_managers_[0]->uuid()));
   ASSERT_EQ(1, flush_count() - flush_count_before)
       << "Accepted votes with increased term should flush once";
 
   // Now try the old term.
-  // Note: Use the peer who "won" the election on the previous term (peer-0),
+  // Note: Use the peer who "won" the election on the previous term (peer 0),
   // although in practice the impl does not store historical vote data.
   flush_count_before = flush_count();
   request.set_candidate_term(last_op_id.term() + 1);
@@ -1101,7 +1121,8 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
   ASSERT_TRUE(response.has_consensus_error());
   ASSERT_EQ(ConsensusErrorPB::INVALID_TERM, response.consensus_error().code());
   ASSERT_EQ(last_op_id.term() + 2, response.responder_term());
-  ASSERT_NO_FATAL_FAILURE(AssertDurableTermAndVote(kPeerIndex, last_op_id.term() + 2, "peer-0"));
+  ASSERT_NO_FATAL_FAILURE(AssertDurableTermAndVote(kPeerIndex, last_op_id.term() + 2,
+                                                   fs_managers_[0]->uuid()));
   ASSERT_EQ(0, flush_count() - flush_count_before)
       << "Rejected votes for old terms should not flush";
 
@@ -1115,7 +1136,8 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
   ASSERT_TRUE(response.vote_granted());
   ASSERT_FALSE(response.has_consensus_error());
   ASSERT_EQ(last_op_id.term() + 2, response.responder_term());
-  ASSERT_NO_FATAL_FAILURE(AssertDurableTermAndVote(kPeerIndex, last_op_id.term() + 2, "peer-0"));
+  ASSERT_NO_FATAL_FAILURE(AssertDurableTermAndVote(kPeerIndex, last_op_id.term() + 2,
+                                                   fs_managers_[0]->uuid()));
   ASSERT_EQ(0, flush_count() - flush_count_before)
       << "Pre-elections should not flush";
   request.set_is_pre_election(false);
@@ -1125,7 +1147,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
   //
 
   flush_count_before = flush_count();
-  request.set_candidate_uuid("peer-0");
+  request.set_candidate_uuid(fs_managers_[0]->uuid());
   request.set_candidate_term(last_op_id.term() + 3);
   request.mutable_candidate_status()->mutable_last_received()->CopyFrom(MinimumOpId());
   response.Clear();
@@ -1142,7 +1164,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
   // Send a "heartbeat" to the peer. It should be rejected.
   ConsensusRequestPB req;
   req.set_caller_term(last_op_id.term());
-  req.set_caller_uuid("peer-0");
+  req.set_caller_uuid(fs_managers_[0]->uuid());
   req.set_committed_index(last_op_id.index());
   req.set_all_replicated_index(0);
   ConsensusResponsePB res;

http://git-wip-us.apache.org/repos/asf/kudu/blob/fa5d56cb/src/kudu/integration-tests/tablet_copy-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/tablet_copy-itest.cc b/src/kudu/integration-tests/tablet_copy-itest.cc
index 7b64dd6..ad10e5c 100644
--- a/src/kudu/integration-tests/tablet_copy-itest.cc
+++ b/src/kudu/integration-tests/tablet_copy-itest.cc
@@ -28,6 +28,7 @@
 #include "kudu/client/client-test-util.h"
 #include "kudu/client/client.h"
 #include "kudu/common/wire_protocol-test-util.h"
+#include "kudu/consensus/consensus_meta_manager.h"
 #include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -55,6 +56,7 @@ DEFINE_int32(test_delete_leader_num_writer_threads, 1,
 using kudu::client::KuduSchema;
 using kudu::client::KuduSchemaFromSchema;
 using kudu::client::KuduTableCreator;
+using kudu::consensus::ConsensusMetadataManager;
 using kudu::itest::TServerDetails;
 using kudu::itest::WaitForNumTabletServers;
 using kudu::tablet::TABLET_DATA_DELETED;
@@ -336,11 +338,12 @@ TEST_F(TabletCopyITest, TestDeleteTabletDuringTabletCopy) {
   gscoped_ptr<FsManager> fs_manager(new FsManager(env_, opts));
   ASSERT_OK(fs_manager->CreateInitialFileSystemLayout());
   ASSERT_OK(fs_manager->Open());
+  scoped_refptr<ConsensusMetadataManager> cmeta_manager(
+      new ConsensusMetadataManager(fs_manager.get()));
 
   {
     // Start up a TabletCopyClient and open a tablet copy session.
-    TabletCopyClient tc_client(tablet_id, fs_manager.get(),
-                                    cluster_->messenger());
+    TabletCopyClient tc_client(tablet_id, fs_manager.get(), cmeta_manager, cluster_->messenger());
     scoped_refptr<tablet::TabletMetadata> meta;
     ASSERT_OK(tc_client.Start(cluster_->tablet_server(kTsIndex)->bound_rpc_hostport(),
                               &meta));

http://git-wip-us.apache.org/repos/asf/kudu/blob/fa5d56cb/src/kudu/integration-tests/ts_recovery-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/ts_recovery-itest.cc b/src/kudu/integration-tests/ts_recovery-itest.cc
index ffe4bf6..c00967d 100644
--- a/src/kudu/integration-tests/ts_recovery-itest.cc
+++ b/src/kudu/integration-tests/ts_recovery-itest.cc
@@ -22,10 +22,11 @@
 #include <glog/stl_logging.h>
 
 #include "kudu/client/client.h"
-#include "kudu/consensus/log-test-base.h"
+#include "kudu/consensus/consensus-test-util.h"
 #include "kudu/consensus/consensus.pb.h"
 #include "kudu/consensus/consensus_meta.h"
-#include "kudu/consensus/consensus-test-util.h"
+#include "kudu/consensus/consensus_meta_manager.h"
+#include "kudu/consensus/log-test-base.h"
 #include "kudu/consensus/opid.pb.h"
 #include "kudu/fs/fs_manager.h"
 #include "kudu/integration-tests/cluster_itest_util.h"
@@ -52,6 +53,7 @@ using client::KuduTable;
 using client::KuduUpdate;
 using client::sp::shared_ptr;
 using consensus::ConsensusMetadata;
+using consensus::ConsensusMetadataManager;
 using consensus::OpId;
 using consensus::RECEIVED_OPID;
 using log::AppendNoOpsToLogSync;
@@ -352,6 +354,8 @@ TEST_P(TsRecoveryITestDeathTest, TestRecoverFromOpIdOverflow) {
     opts.data_paths = ets->data_dirs();
     gscoped_ptr<FsManager> fs_manager(new FsManager(env_, opts));
     ASSERT_OK(fs_manager->Open());
+    scoped_refptr<ConsensusMetadataManager> cmeta_manager(
+        new ConsensusMetadataManager(fs_manager.get()));
     scoped_refptr<Clock> clock(new HybridClock());
     ASSERT_OK(clock->Init());
 
@@ -404,7 +408,7 @@ TEST_P(TsRecoveryITestDeathTest, TestRecoverFromOpIdOverflow) {
     // We also need to update the ConsensusMetadata to match with the term we
     // want to end up with.
     scoped_refptr<ConsensusMetadata> cmeta;
-    ConsensusMetadata::Load(fs_manager.get(), tablet_id, fs_manager->uuid(), &cmeta);
+    ASSERT_OK(cmeta_manager->Load(tablet_id, &cmeta));
     cmeta->set_current_term(kDesiredIndexValue);
     ASSERT_OK(cmeta->Flush());
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/fa5d56cb/src/kudu/master/sys_catalog.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index b5cba69..d9c37e2 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -37,6 +37,7 @@
 #include "kudu/common/schema.h"
 #include "kudu/common/wire_protocol.h"
 #include "kudu/consensus/consensus_meta.h"
+#include "kudu/consensus/consensus_meta_manager.h"
 #include "kudu/consensus/consensus_peers.h"
 #include "kudu/consensus/opid_util.h"
 #include "kudu/consensus/quorum_util.h"
@@ -66,6 +67,7 @@ TAG_FLAG(sys_catalog_fail_during_write, hidden);
 
 using kudu::consensus::COMMITTED_CONFIG;
 using kudu::consensus::ConsensusMetadata;
+using kudu::consensus::ConsensusMetadataManager;
 using kudu::consensus::ConsensusStatePB;
 using kudu::consensus::RaftConfigPB;
 using kudu::consensus::RaftPeerPB;
@@ -119,6 +121,7 @@ SysCatalogTable::SysCatalogTable(Master* master,
                                  ElectedLeaderCallback leader_cb)
     : metric_registry_(master->metric_registry()),
       master_(master),
+      cmeta_manager_(new ConsensusMetadataManager(master_->fs_manager())),
       leader_cb_(std::move(leader_cb)) {
 }
 
@@ -146,8 +149,7 @@ Status SysCatalogTable::Load(FsManager *fs_manager) {
     LOG(INFO) << "Verifying existing consensus state";
     string tablet_id = metadata->tablet_id();
     scoped_refptr<ConsensusMetadata> cmeta;
-    RETURN_NOT_OK_PREPEND(ConsensusMetadata::Load(fs_manager, tablet_id,
-                                                  fs_manager->uuid(), &cmeta),
+    RETURN_NOT_OK_PREPEND(cmeta_manager_->Load(tablet_id, &cmeta),
                           "Unable to load consensus metadata for tablet " + tablet_id);
     ConsensusStatePB cstate = cmeta->ToConsensusStatePB();
     RETURN_NOT_OK(consensus::VerifyRaftConfig(cstate.committed_config(), COMMITTED_CONFIG));
@@ -218,8 +220,7 @@ Status SysCatalogTable::CreateNew(FsManager *fs_manager) {
 
   string tablet_id = metadata->tablet_id();
   scoped_refptr<ConsensusMetadata> cmeta;
-  RETURN_NOT_OK_PREPEND(ConsensusMetadata::Create(fs_manager, tablet_id, fs_manager->uuid(),
-                                                  config, consensus::kMinimumTerm, &cmeta),
+  RETURN_NOT_OK_PREPEND(cmeta_manager_->Create(tablet_id, config, consensus::kMinimumTerm, &cmeta),
                         "Unable to persist consensus metadata for tablet " + tablet_id);
 
   return SetupTablet(metadata);
@@ -306,6 +307,7 @@ Status SysCatalogTable::SetupTablet(const scoped_refptr<tablet::TabletMetadata>&
   // partially created tablet here?
   tablet_replica_.reset(new TabletReplica(
       metadata,
+      cmeta_manager_,
       local_peer_pb_,
       master_->tablet_apply_pool(),
       Bind(&SysCatalogTable::SysCatalogStateChanged, Unretained(this), metadata->tablet_id())));
@@ -313,6 +315,7 @@ Status SysCatalogTable::SetupTablet(const scoped_refptr<tablet::TabletMetadata>&
   consensus::ConsensusBootstrapInfo consensus_info;
   tablet_replica_->SetBootstrapping();
   RETURN_NOT_OK(BootstrapTablet(metadata,
+                                cmeta_manager_,
                                 scoped_refptr<server::Clock>(master_->clock()),
                                 master_->mem_tracker(),
                                 scoped_refptr<rpc::ResultTracker>(),

http://git-wip-us.apache.org/repos/asf/kudu/blob/fa5d56cb/src/kudu/master/sys_catalog.h
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.h b/src/kudu/master/sys_catalog.h
index 36456f3..25e320d 100644
--- a/src/kudu/master/sys_catalog.h
+++ b/src/kudu/master/sys_catalog.h
@@ -251,6 +251,8 @@ class SysCatalogTable {
 
   Master* master_;
 
+  const scoped_refptr<consensus::ConsensusMetadataManager> cmeta_manager_;
+
   ElectedLeaderCallback leader_cb_;
 
   consensus::RaftPeerPB local_peer_pb_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/fa5d56cb/src/kudu/tablet/tablet_bootstrap-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_bootstrap-test.cc b/src/kudu/tablet/tablet_bootstrap-test.cc
index 3bfbb19..f0e30eb 100644
--- a/src/kudu/tablet/tablet_bootstrap-test.cc
+++ b/src/kudu/tablet/tablet_bootstrap-test.cc
@@ -21,8 +21,9 @@
 #include <vector>
 
 #include "kudu/common/iterator.h"
-#include "kudu/consensus/consensus_meta.h"
 #include "kudu/consensus/consensus-test-util.h"
+#include "kudu/consensus/consensus_meta.h"
+#include "kudu/consensus/consensus_meta_manager.h"
 #include "kudu/consensus/log_anchor_registry.h"
 #include "kudu/consensus/log_util.h"
 #include "kudu/consensus/metadata.pb.h"
@@ -44,6 +45,7 @@ namespace tablet {
 
 using consensus::ConsensusBootstrapInfo;
 using consensus::ConsensusMetadata;
+using consensus::ConsensusMetadataManager;
 using consensus::kMinimumTerm;
 using consensus::MakeOpId;
 using consensus::OpId;
@@ -63,6 +65,7 @@ class BootstrapTest : public LogTestBase {
 
   void SetUp() OVERRIDE {
     LogTestBase::SetUp();
+    cmeta_manager_.reset(new ConsensusMetadataManager(fs_manager_.get()));
   }
 
   Status LoadTestTabletMetadata(int mrs_id, int delta_id, scoped_refptr<TabletMetadata>* meta) {
@@ -93,10 +96,9 @@ class BootstrapTest : public LogTestBase {
     peer->set_member_type(consensus::RaftPeerPB::VOTER);
 
     scoped_refptr<ConsensusMetadata> cmeta;
-    RETURN_NOT_OK_PREPEND(ConsensusMetadata::Create(meta->fs_manager(), meta->tablet_id(),
-                                                    meta->fs_manager()->uuid(),
-                                                    config, kMinimumTerm, &cmeta),
+    RETURN_NOT_OK_PREPEND(cmeta_manager_->Create(meta->tablet_id(), config, kMinimumTerm, &cmeta),
                           "Unable to create consensus metadata");
+
     return Status::OK();
   }
 
@@ -115,6 +117,7 @@ class BootstrapTest : public LogTestBase {
     // Now attempt to recover the log
     RETURN_NOT_OK(BootstrapTablet(
         meta,
+        cmeta_manager_,
         scoped_refptr<Clock>(LogicalClock::CreateStartingAt(Timestamp::kInitialTimestamp)),
         shared_ptr<MemTracker>(),
         scoped_refptr<rpc::ResultTracker>(),
@@ -158,6 +161,8 @@ class BootstrapTest : public LogTestBase {
       VLOG(1) << result;
     }
   }
+
+  scoped_refptr<ConsensusMetadataManager> cmeta_manager_;
 };
 
 // Tests a normal bootstrap scenario

http://git-wip-us.apache.org/repos/asf/kudu/blob/fa5d56cb/src/kudu/tablet/tablet_bootstrap.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc
index dc23373..2bd6364 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -28,6 +28,7 @@
 #include "kudu/common/row_operations.h"
 #include "kudu/common/wire_protocol.h"
 #include "kudu/consensus/consensus_meta.h"
+#include "kudu/consensus/consensus_meta_manager.h"
 #include "kudu/consensus/log.h"
 #include "kudu/consensus/log_anchor_registry.h"
 #include "kudu/consensus/log_reader.h"
@@ -82,7 +83,7 @@ using consensus::ChangeConfigRecordPB;
 using consensus::CommitMsg;
 using consensus::ConsensusBootstrapInfo;
 using consensus::ConsensusMetadata;
-using consensus::ConsensusRound;
+using consensus::ConsensusMetadataManager;
 using consensus::MinimumOpId;
 using consensus::NO_OP;
 using consensus::OperationType;
@@ -129,7 +130,7 @@ struct ReplayState;
 class FlushedStoresSnapshot {
  public:
   FlushedStoresSnapshot() {}
-  Status InitFrom(const TabletMetadata& meta);
+  Status InitFrom(const TabletMetadata& tablet_meta);
 
   // Return true if the given memory store is still active (i.e. edits that were
   // originally written to this memory store should be replayed during the bootstrap
@@ -164,7 +165,8 @@ class FlushedStoresSnapshot {
 // we need to set it before replay or we won't be able to re-rebuild.
 class TabletBootstrap {
  public:
-  TabletBootstrap(const scoped_refptr<TabletMetadata>& meta,
+  TabletBootstrap(const scoped_refptr<TabletMetadata>& tablet_meta,
+                  const scoped_refptr<ConsensusMetadataManager>& cmeta_manager,
                   const scoped_refptr<Clock>& clock,
                   shared_ptr<MemTracker> mem_tracker,
                   const scoped_refptr<ResultTracker>& result_tracker,
@@ -335,7 +337,8 @@ class TabletBootstrap {
   // Log a status message and set the TabletReplica's status as well.
   void SetStatusMessage(const string& status);
 
-  scoped_refptr<TabletMetadata> meta_;
+  scoped_refptr<TabletMetadata> tablet_meta_;
+  const scoped_refptr<ConsensusMetadataManager> cmeta_manager_;
   scoped_refptr<Clock> clock_;
   shared_ptr<MemTracker> mem_tracker_;
   scoped_refptr<rpc::ResultTracker> result_tracker_;
@@ -400,12 +403,14 @@ class TabletBootstrap {
 };
 
 void TabletBootstrap::SetStatusMessage(const string& status) {
-  LOG(INFO) << "T " << meta_->tablet_id() << " P " << meta_->fs_manager()->uuid() << ": "
+  LOG(INFO) << "T " << tablet_meta_->tablet_id()
+            << " P " << tablet_meta_->fs_manager()->uuid() << ": "
             << status;
   if (tablet_replica_) tablet_replica_->SetStatusMessage(status);
 }
 
-Status BootstrapTablet(const scoped_refptr<TabletMetadata>& meta,
+Status BootstrapTablet(const scoped_refptr<TabletMetadata>& tablet_meta,
+                       const scoped_refptr<ConsensusMetadataManager>& cmeta_manager,
                        const scoped_refptr<Clock>& clock,
                        const shared_ptr<MemTracker>& mem_tracker,
                        const scoped_refptr<ResultTracker>& result_tracker,
@@ -416,8 +421,8 @@ Status BootstrapTablet(const scoped_refptr<TabletMetadata>& meta,
                        const scoped_refptr<log::LogAnchorRegistry>& log_anchor_registry,
                        ConsensusBootstrapInfo* consensus_info) {
   TRACE_EVENT1("tablet", "BootstrapTablet",
-               "tablet_id", meta->tablet_id());
-  TabletBootstrap bootstrap(meta, clock, mem_tracker, result_tracker,
+               "tablet_id", tablet_meta->tablet_id());
+  TabletBootstrap bootstrap(tablet_meta, cmeta_manager, clock, mem_tracker, result_tracker,
                             metric_registry, tablet_replica, log_anchor_registry);
   RETURN_NOT_OK(bootstrap.Bootstrap(rebuilt_tablet, rebuilt_log, consensus_info));
   // This is necessary since OpenNewLog() initially disables sync.
@@ -444,13 +449,15 @@ static string DebugInfo(const string& tablet_id,
 }
 
 TabletBootstrap::TabletBootstrap(
-    const scoped_refptr<TabletMetadata>& meta,
+    const scoped_refptr<TabletMetadata>& tablet_meta,
+    const scoped_refptr<ConsensusMetadataManager>& cmeta_manager,
     const scoped_refptr<Clock>& clock, shared_ptr<MemTracker> mem_tracker,
     const scoped_refptr<ResultTracker>& result_tracker,
     MetricRegistry* metric_registry,
     const scoped_refptr<TabletReplica>& tablet_replica,
     const scoped_refptr<LogAnchorRegistry>& log_anchor_registry)
-    : meta_(meta),
+    : tablet_meta_(tablet_meta),
+      cmeta_manager_(cmeta_manager),
       clock_(clock),
       mem_tracker_(std::move(mem_tracker)),
       result_tracker_(result_tracker),
@@ -463,7 +470,7 @@ Status TabletBootstrap::Bootstrap(shared_ptr<Tablet>* rebuilt_tablet,
                                   ConsensusBootstrapInfo* consensus_info) {
   // We pin (prevent) metadata flush at the beginning of the bootstrap process
   // and always unpin it at the end.
-  meta_->PinFlush();
+  tablet_meta_->PinFlush();
 
   // Now run the actual bootstrap process.
   Status bootstrap_status = RunBootstrap(rebuilt_tablet, rebuilt_log, consensus_info);
@@ -476,14 +483,14 @@ Status TabletBootstrap::Bootstrap(shared_ptr<Tablet>* rebuilt_tablet,
   CHECK((*rebuilt_tablet && *rebuilt_log) || !bootstrap_status.ok())
       << "Tablet and Log not initialized";
   if (bootstrap_status.ok()) {
-    meta_->SetPreFlushCallback(
+    tablet_meta_->SetPreFlushCallback(
         Bind(&FlushInflightsToLogCallback::WaitForInflightsAndFlushLog,
              make_scoped_refptr(new FlushInflightsToLogCallback(
                  rebuilt_tablet->get(), *rebuilt_log))));
   }
 
   // This will cause any pending TabletMetadata flush to be executed.
-  Status unpin_status = meta_->UnPinFlush();
+  Status unpin_status = tablet_meta_->UnPinFlush();
 
   constexpr char kFailedUnpinMsg[] = "Failed to flush after unpinning";
   if (PREDICT_FALSE(!bootstrap_status.ok() && !unpin_status.ok())) {
@@ -498,19 +505,18 @@ Status TabletBootstrap::Bootstrap(shared_ptr<Tablet>* rebuilt_tablet,
 Status TabletBootstrap::RunBootstrap(shared_ptr<Tablet>* rebuilt_tablet,
                                      scoped_refptr<Log>* rebuilt_log,
                                      ConsensusBootstrapInfo* consensus_info) {
-  string tablet_id = meta_->tablet_id();
+  string tablet_id = tablet_meta_->tablet_id();
 
   // Replay requires a valid Consensus metadata file to exist in order to
   // compare the committed consensus configuration seqno with the log entries and also to persist
   // committed but unpersisted changes.
-  RETURN_NOT_OK_PREPEND(ConsensusMetadata::Load(meta_->fs_manager(), tablet_id,
-                                                meta_->fs_manager()->uuid(), &cmeta_),
+  RETURN_NOT_OK_PREPEND(cmeta_manager_->Load(tablet_id, &cmeta_),
                         "Unable to load Consensus metadata");
 
   // Make sure we don't try to locally bootstrap a tablet that was in the middle
   // of a tablet copy. It's likely that not all files were copied over
   // successfully.
-  TabletDataState tablet_data_state = meta_->tablet_data_state();
+  TabletDataState tablet_data_state = tablet_meta_->tablet_data_state();
   if (tablet_data_state != TABLET_DATA_READY) {
     return Status::Corruption("Unable to locally bootstrap tablet " + tablet_id + ": " +
                               "TabletMetadata bootstrap state is " +
@@ -521,11 +527,11 @@ Status TabletBootstrap::RunBootstrap(shared_ptr<Tablet>* rebuilt_tablet,
 
   if (VLOG_IS_ON(1)) {
     TabletSuperBlockPB super_block;
-    RETURN_NOT_OK(meta_->ToSuperBlock(&super_block));
+    RETURN_NOT_OK(tablet_meta_->ToSuperBlock(&super_block));
     VLOG_WITH_PREFIX(1) << "Tablet Metadata: " << SecureDebugString(super_block);
   }
 
-  RETURN_NOT_OK(flushed_stores_.InitFrom(*meta_.get()));
+  RETURN_NOT_OK(flushed_stores_.InitFrom(*tablet_meta_.get()));
 
   bool has_blocks;
   RETURN_NOT_OK(OpenTablet(&has_blocks));
@@ -576,7 +582,7 @@ void TabletBootstrap::FinishBootstrap(const string& message,
 }
 
 Status TabletBootstrap::OpenTablet(bool* has_blocks) {
-  gscoped_ptr<Tablet> tablet(new Tablet(meta_,
+  gscoped_ptr<Tablet> tablet(new Tablet(tablet_meta_,
                                         clock_,
                                         mem_tracker_,
                                         metric_registry_,
@@ -660,7 +666,7 @@ Status TabletBootstrap::PrepareRecoveryDir(bool* needs_recovery) {
 
 Status TabletBootstrap::OpenLogReaderInRecoveryDir() {
   VLOG_WITH_PREFIX(1) << "Opening log reader in log recovery dir "
-                      << meta_->fs_manager()->GetTabletWalRecoveryDir(tablet_->tablet_id());
+                      << tablet_meta_->fs_manager()->GetTabletWalRecoveryDir(tablet_->tablet_id());
   // Open the reader.
   RETURN_NOT_OK_PREPEND(LogReader::OpenFromRecoveryDir(tablet_->metadata()->fs_manager(),
                                                        tablet_->metadata()->tablet_id(),
@@ -970,9 +976,11 @@ TabletBootstrap::ActiveStores TabletBootstrap::AnalyzeActiveStores(const CommitM
 Status TabletBootstrap::CheckOrphanedCommitDoesntNeedReplay(const CommitMsg& commit) {
   if (AnalyzeActiveStores(commit) == SOME_STORES_ACTIVE) {
     TabletSuperBlockPB super;
-    WARN_NOT_OK(meta_->ToSuperBlock(&super), LogPrefix() + "Couldn't build TabletSuperBlockPB");
+    WARN_NOT_OK(tablet_meta_->ToSuperBlock(&super),
+                Substitute("$0$1", LogPrefix(), "Couldn't build TabletSuperBlockPB"));
     return Status::Corruption(Substitute("CommitMsg was orphaned but it referred to "
-        "stores which need replay. Commit: $0. TabletMetadata: $1", SecureShortDebugString(commit),
+        "stores which need replay. Commit: $0. TabletMetadata: $1",
+        SecureShortDebugString(commit),
         SecureShortDebugString(super)));
   }
 
@@ -1188,7 +1196,7 @@ Status TabletBootstrap::PlaySegments(ConsensusBootstrapInfo* consensus_info) {
           AnalyzeActiveStores(entry.second->commit()) == NO_STORES_ACTIVE) {
         DumpReplayStateToLog(state);
         TabletSuperBlockPB super;
-        WARN_NOT_OK(meta_->ToSuperBlock(&super), "Couldn't build TabletSuperBlockPB.");
+        WARN_NOT_OK(tablet_meta_->ToSuperBlock(&super), "Couldn't build TabletSuperBlockPB.");
         return Status::Corruption(Substitute("CommitMsg was pending but it did not refer "
             "to any active memory stores. Commit: $0. TabletMetadata: $1",
             SecureShortDebugString(entry.second->commit()), SecureShortDebugString(super)));
@@ -1584,13 +1592,13 @@ Status TabletBootstrap::UpdateClock(uint64_t timestamp) {
 }
 
 string TabletBootstrap::LogPrefix() const {
-  return Substitute("T $0 P $1: ", meta_->tablet_id(), meta_->fs_manager()->uuid());
+  return Substitute("T $0 P $1: ", tablet_meta_->tablet_id(), tablet_meta_->fs_manager()->uuid());
 }
 
-Status FlushedStoresSnapshot::InitFrom(const TabletMetadata& meta) {
+Status FlushedStoresSnapshot::InitFrom(const TabletMetadata& tablet_meta) {
   CHECK(flushed_dms_by_drs_id_.empty()) << "already initted";
-  last_durable_mrs_id_ = meta.last_durable_mrs_id();
-  for (const shared_ptr<RowSetMetadata>& rsmd : meta.rowsets()) {
+  last_durable_mrs_id_ = tablet_meta.last_durable_mrs_id();
+  for (const shared_ptr<RowSetMetadata>& rsmd : tablet_meta.rowsets()) {
     if (!InsertIfNotPresent(&flushed_dms_by_drs_id_, rsmd->id(),
                             rsmd->last_durable_redo_dms_id())) {
       return Status::Corruption(Substitute(

http://git-wip-us.apache.org/repos/asf/kudu/blob/fa5d56cb/src/kudu/tablet/tablet_bootstrap.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_bootstrap.h b/src/kudu/tablet/tablet_bootstrap.h
index 905989f..d8091d8 100644
--- a/src/kudu/tablet/tablet_bootstrap.h
+++ b/src/kudu/tablet/tablet_bootstrap.h
@@ -42,6 +42,7 @@ class LogAnchorRegistry;
 
 namespace consensus {
 struct ConsensusBootstrapInfo;
+class ConsensusMetadataManager;
 } // namespace consensus
 
 namespace rpc {
@@ -64,7 +65,8 @@ extern const char* kLogRecoveryDir;
 //
 // This is a synchronous method, but is typically called within a thread pool by
 // TSTabletManager.
-Status BootstrapTablet(const scoped_refptr<TabletMetadata>& meta,
+Status BootstrapTablet(const scoped_refptr<TabletMetadata>& tablet_meta,
+                       const scoped_refptr<consensus::ConsensusMetadataManager>& cmeta_manager,
                        const scoped_refptr<server::Clock>& clock,
                        const std::shared_ptr<MemTracker>& mem_tracker,
                        const scoped_refptr<rpc::ResultTracker>& result_tracker,

http://git-wip-us.apache.org/repos/asf/kudu/blob/fa5d56cb/src/kudu/tablet/tablet_replica-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica-test.cc b/src/kudu/tablet/tablet_replica-test.cc
index cacac8d..27bb052 100644
--- a/src/kudu/tablet/tablet_replica-test.cc
+++ b/src/kudu/tablet/tablet_replica-test.cc
@@ -20,9 +20,10 @@
 
 #include "kudu/common/partial_row.h"
 #include "kudu/common/timestamp.h"
-#include "kudu/common/wire_protocol.h"
 #include "kudu/common/wire_protocol-test-util.h"
+#include "kudu/common/wire_protocol.h"
 #include "kudu/consensus/consensus_meta.h"
+#include "kudu/consensus/consensus_meta_manager.h"
 #include "kudu/consensus/log.h"
 #include "kudu/consensus/log_reader.h"
 #include "kudu/consensus/log_util.h"
@@ -54,25 +55,17 @@ namespace kudu {
 namespace tablet {
 
 using consensus::CommitMsg;
-using consensus::Consensus;
 using consensus::ConsensusBootstrapInfo;
 using consensus::ConsensusMetadata;
-using consensus::MakeOpId;
-using consensus::MinimumOpId;
+using consensus::ConsensusMetadataManager;
 using consensus::OpId;
-using consensus::OpIdEquals;
 using consensus::RaftPeerPB;
-using consensus::WRITE_OP;
 using log::Log;
-using log::LogAnchorRegistry;
 using log::LogOptions;
 using rpc::Messenger;
-using server::Clock;
-using server::LogicalClock;
 using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
-using strings::Substitute;
 using tserver::WriteRequestPB;
 using tserver::WriteResponsePB;
 
@@ -106,9 +99,13 @@ class TabletReplicaTest : public KuduTabletTest {
     config_peer.mutable_last_known_addr()->set_port(0);
     config_peer.set_member_type(RaftPeerPB::VOTER);
 
+    scoped_refptr<ConsensusMetadataManager> cmeta_manager(
+        new ConsensusMetadataManager(tablet()->metadata()->fs_manager()));
+
     // "Bootstrap" and start the TabletReplica.
     tablet_replica_.reset(
       new TabletReplica(make_scoped_refptr(tablet()->metadata()),
+                        cmeta_manager,
                         config_peer,
                         apply_pool_.get(),
                         Bind(&TabletReplicaTest::TabletReplicaStateChangedCallback,
@@ -116,8 +113,9 @@ class TabletReplicaTest : public KuduTabletTest {
                              tablet()->tablet_id())));
 
     // Make TabletReplica use the same LogAnchorRegistry as the Tablet created by the harness.
-    // TODO: Refactor TabletHarness to allow taking a LogAnchorRegistry, while also providing
-    // TabletMetadata for consumption by TabletReplica before Tablet is instantiated.
+    // TODO(mpercy): Refactor TabletHarness to allow taking a
+    // LogAnchorRegistry, while also providing TabletMetadata for consumption
+    // by TabletReplica before Tablet is instantiated.
     tablet_replica_->log_anchor_registry_ = tablet()->log_anchor_registry_;
 
     RaftConfigPB config;
@@ -125,11 +123,8 @@ class TabletReplicaTest : public KuduTabletTest {
     config.set_opid_index(consensus::kInvalidOpIdIndex);
 
     scoped_refptr<ConsensusMetadata> cmeta;
-    ASSERT_OK(ConsensusMetadata::Create(tablet()->metadata()->fs_manager(),
-                                        tablet()->tablet_id(),
-                                        tablet()->metadata()->fs_manager()->uuid(),
-                                        config,
-                                        consensus::kMinimumTerm, &cmeta));
+    ASSERT_OK(cmeta_manager->Create(tablet()->tablet_id(), config, consensus::kMinimumTerm,
+                                    &cmeta));
 
     scoped_refptr<Log> log;
     ASSERT_OK(Log::Open(LogOptions(), fs_manager(), tablet()->tablet_id(),

http://git-wip-us.apache.org/repos/asf/kudu/blob/fa5d56cb/src/kudu/tablet/tablet_replica.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc
index 4ecd2e9..b9e1516 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -26,6 +26,7 @@
 #include <vector>
 
 #include "kudu/consensus/consensus_meta.h"
+#include "kudu/consensus/consensus_meta_manager.h"
 #include "kudu/consensus/log.h"
 #include "kudu/consensus/log_anchor_registry.h"
 #include "kudu/consensus/log_util.h"
@@ -99,13 +100,15 @@ using log::LogAnchorRegistry;
 using rpc::Messenger;
 using rpc::ResultTracker;
 using strings::Substitute;
-using tserver::TabletServerErrorPB;
 
-TabletReplica::TabletReplica(const scoped_refptr<TabletMetadata>& meta,
-                             consensus::RaftPeerPB local_peer_pb,
-                             ThreadPool* apply_pool,
-                             Callback<void(const std::string& reason)> mark_dirty_clbk)
+TabletReplica::TabletReplica(
+    const scoped_refptr<TabletMetadata>& meta,
+    const scoped_refptr<consensus::ConsensusMetadataManager>& cmeta_manager,
+    consensus::RaftPeerPB local_peer_pb,
+    ThreadPool* apply_pool,
+    Callback<void(const std::string& reason)> mark_dirty_clbk)
     : meta_(meta),
+      cmeta_manager_(cmeta_manager),
       tablet_id_(meta->tablet_id()),
       local_peer_pb_(std::move(local_peer_pb)),
       state_(NOT_STARTED),
@@ -157,8 +160,7 @@ Status TabletReplica::Init(const shared_ptr<Tablet>& tablet,
     TRACE("Creating consensus instance");
 
     scoped_refptr<ConsensusMetadata> cmeta;
-    RETURN_NOT_OK(ConsensusMetadata::Load(meta_->fs_manager(), tablet_id_,
-                                          meta_->fs_manager()->uuid(), &cmeta));
+    RETURN_NOT_OK(cmeta_manager_->Load(tablet_id_, &cmeta));
 
     scoped_refptr<TimeManager> time_manager(new TimeManager(
         clock, tablet_->mvcc_manager()->GetCleanTimestamp()));

http://git-wip-us.apache.org/repos/asf/kudu/blob/fa5d56cb/src/kudu/tablet/tablet_replica.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
index f54b115..a969ba4 100644
--- a/src/kudu/tablet/tablet_replica.h
+++ b/src/kudu/tablet/tablet_replica.h
@@ -40,6 +40,10 @@ class MaintenanceManager;
 class MaintenanceOp;
 class ThreadPool;
 
+namespace consensus {
+class ConsensusMetadataManager;
+}
+
 namespace log {
 class LogAnchorRegistry;
 } // namespace log
@@ -69,6 +73,7 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
                       public consensus::ReplicaTransactionFactory {
  public:
   TabletReplica(const scoped_refptr<TabletMetadata>& meta,
+                const scoped_refptr<consensus::ConsensusMetadataManager>& cmeta_manager,
                 consensus::RaftPeerPB local_peer_pb,
                 ThreadPool* apply_pool,
                 Callback<void(const std::string& reason)> mark_dirty_clbk);
@@ -279,6 +284,7 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
                                   const consensus::ConsensusBootstrapInfo& bootstrap_info);
 
   const scoped_refptr<TabletMetadata> meta_;
+  const scoped_refptr<consensus::ConsensusMetadataManager> cmeta_manager_;
 
   const std::string tablet_id_;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/fa5d56cb/src/kudu/tools/tool_action_local_replica.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_local_replica.cc b/src/kudu/tools/tool_action_local_replica.cc
index cb03282..82b6e5e 100644
--- a/src/kudu/tools/tool_action_local_replica.cc
+++ b/src/kudu/tools/tool_action_local_replica.cc
@@ -31,8 +31,9 @@
 #include "kudu/common/rowblock.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/wire_protocol.h"
-#include "kudu/consensus/consensus_meta.h"
 #include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/consensus_meta.h"
+#include "kudu/consensus/consensus_meta_manager.h"
 #include "kudu/consensus/log_index.h"
 #include "kudu/consensus/log_reader.h"
 #include "kudu/consensus/log_util.h"
@@ -91,6 +92,7 @@ using cfile::CFileReader;
 using cfile::DumpIterator;
 using cfile::ReaderOptions;
 using consensus::ConsensusMetadata;
+using consensus::ConsensusMetadataManager;
 using consensus::OpId;
 using consensus::RaftConfigPB;
 using consensus::RaftPeerPB;
@@ -219,12 +221,14 @@ Status ParsePeerString(const string& peer_str,
 Status PrintReplicaUuids(const RunnerContext& context) {
   unique_ptr<FsManager> fs_manager;
   RETURN_NOT_OK(FsInit(&fs_manager));
+  scoped_refptr<ConsensusMetadataManager> cmeta_manager(
+      new ConsensusMetadataManager(fs_manager.get()));
+
   const string& tablet_id = FindOrDie(context.required_args, kTabletIdArg);
 
   // Load the cmeta file and print all peer uuids.
   scoped_refptr<ConsensusMetadata> cmeta;
-  RETURN_NOT_OK(ConsensusMetadata::Load(fs_manager.get(), tablet_id,
-                                        fs_manager->uuid(), &cmeta));
+  RETURN_NOT_OK(cmeta_manager->Load(tablet_id, &cmeta));
   cout << JoinMapped(cmeta->CommittedConfig().peers(),
                      [](const RaftPeerPB& p){ return p.permanent_uuid(); },
                      " ") << endl;
@@ -269,9 +273,9 @@ Status RewriteRaftConfig(const RunnerContext& context) {
   RETURN_NOT_OK(BackupConsensusMetadata(&fs_manager, tablet_id));
 
   // Load the cmeta file and rewrite the raft config.
+  scoped_refptr<ConsensusMetadataManager> cmeta_manager(new ConsensusMetadataManager(&fs_manager));
   scoped_refptr<ConsensusMetadata> cmeta;
-  RETURN_NOT_OK(ConsensusMetadata::Load(&fs_manager, tablet_id,
-                                        fs_manager.uuid(), &cmeta));
+  RETURN_NOT_OK(cmeta_manager->Load(tablet_id, &cmeta));
   RaftConfigPB current_config = cmeta->CommittedConfig();
   RaftConfigPB new_config = current_config;
   new_config.clear_peers();
@@ -302,9 +306,9 @@ Status SetRaftTerm(const RunnerContext& context) {
   FsManager fs_manager(env, FsManagerOpts());
   RETURN_NOT_OK(fs_manager.Open());
   // Load the cmeta file and rewrite the raft config.
+  scoped_refptr<ConsensusMetadataManager> cmeta_manager(new ConsensusMetadataManager(&fs_manager));
   scoped_refptr<ConsensusMetadata> cmeta;
-  RETURN_NOT_OK(ConsensusMetadata::Load(&fs_manager, tablet_id,
-                                        fs_manager.uuid(), &cmeta));
+  RETURN_NOT_OK(cmeta_manager->Load(tablet_id, &cmeta));
   if (new_term <= cmeta->current_term()) {
     return Status::InvalidArgument(Substitute(
         "specified term $0 must be higher than current term $1",
@@ -334,10 +338,11 @@ Status CopyFromRemote(const RunnerContext& context) {
   // Copy the tablet over.
   FsManager fs_manager(Env::Default(), FsManagerOpts());
   RETURN_NOT_OK(fs_manager.Open());
+  scoped_refptr<ConsensusMetadataManager> cmeta_manager(new ConsensusMetadataManager(&fs_manager));
   MessengerBuilder builder("tablet_copy_client");
   shared_ptr<Messenger> messenger;
   builder.Build(&messenger);
-  TabletCopyClient client(tablet_id, &fs_manager, messenger);
+  TabletCopyClient client(tablet_id, &fs_manager, cmeta_manager, messenger);
   RETURN_NOT_OK(client.Start(hp, nullptr));
   RETURN_NOT_OK(client.FetchAll(nullptr));
   return client.Finish();
@@ -347,6 +352,7 @@ Status DeleteLocalReplica(const RunnerContext& context) {
   const string& tablet_id = FindOrDie(context.required_args, kTabletIdArg);
   FsManager fs_manager(Env::Default(), FsManagerOpts());
   RETURN_NOT_OK(fs_manager.Open());
+  scoped_refptr<ConsensusMetadataManager> cmeta_manager(new ConsensusMetadataManager(&fs_manager));
   boost::optional<OpId> last_logged_opid = boost::none;
   TabletDataState state = TabletDataState::TABLET_DATA_DELETED;
   if (!FLAGS_clean_unsafe) {
@@ -372,7 +378,7 @@ Status DeleteLocalReplica(const RunnerContext& context) {
   // Force the specified tablet on this node to be in 'state'.
   scoped_refptr<TabletMetadata> meta;
   RETURN_NOT_OK(TabletMetadata::Load(&fs_manager, tablet_id, &meta));
-  RETURN_NOT_OK(TSTabletManager::DeleteTabletData(meta, state, last_logged_opid));
+  RETURN_NOT_OK(TSTabletManager::DeleteTabletData(meta, cmeta_manager, state, last_logged_opid));
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/fa5d56cb/src/kudu/tserver/tablet_copy_client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_client-test.cc b/src/kudu/tserver/tablet_copy_client-test.cc
index 2497644..8e70a79 100644
--- a/src/kudu/tserver/tablet_copy_client-test.cc
+++ b/src/kudu/tserver/tablet_copy_client-test.cc
@@ -20,6 +20,7 @@
 
 #include <glog/stl_logging.h>
 
+#include "kudu/consensus/consensus_meta_manager.h"
 #include "kudu/consensus/quorum_util.h"
 #include "kudu/fs/block_manager.h"
 #include "kudu/gutil/strings/fastmem.h"
@@ -32,6 +33,7 @@ using std::shared_ptr;
 namespace kudu {
 namespace tserver {
 
+using consensus::ConsensusMetadataManager;
 using consensus::GetRaftConfigLeader;
 using consensus::RaftPeerPB;
 using std::tuple;
@@ -47,10 +49,14 @@ class TabletCopyClientTest : public TabletCopyTest {
     ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout());
     ASSERT_OK(fs_manager_->Open());
 
+    scoped_refptr<ConsensusMetadataManager> cmeta_manager(
+        new ConsensusMetadataManager(fs_manager_.get()));
+
     tablet_replica_->WaitUntilConsensusRunning(MonoDelta::FromSeconds(10.0));
     rpc::MessengerBuilder(CURRENT_TEST_NAME()).Build(&messenger_);
     client_.reset(new TabletCopyClient(GetTabletId(),
                                        fs_manager_.get(),
+                                       cmeta_manager,
                                        messenger_));
     ASSERT_OK(GetRaftConfigLeader(tablet_replica_->consensus()->ConsensusState(), &leader_));
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/fa5d56cb/src/kudu/tserver/tablet_copy_client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_client.cc b/src/kudu/tserver/tablet_copy_client.cc
index c296e90..029a426 100644
--- a/src/kudu/tserver/tablet_copy_client.cc
+++ b/src/kudu/tserver/tablet_copy_client.cc
@@ -23,6 +23,7 @@
 
 #include "kudu/common/wire_protocol.h"
 #include "kudu/consensus/consensus_meta.h"
+#include "kudu/consensus/consensus_meta_manager.h"
 #include "kudu/consensus/metadata.pb.h"
 #include "kudu/fs/block_id.h"
 #include "kudu/fs/block_manager.h"
@@ -75,6 +76,7 @@ namespace kudu {
 namespace tserver {
 
 using consensus::ConsensusMetadata;
+using consensus::ConsensusMetadataManager;
 using env_util::CopyFile;
 using fs::CreateBlockOptions;
 using fs::WritableBlock;
@@ -95,15 +97,18 @@ using tablet::TabletSuperBlockPB;
 
 TabletCopyClient::TabletCopyClient(std::string tablet_id,
                                    FsManager* fs_manager,
+                                   scoped_refptr<ConsensusMetadataManager> cmeta_manager,
                                    shared_ptr<Messenger> messenger)
     : tablet_id_(std::move(tablet_id)),
       fs_manager_(fs_manager),
+      cmeta_manager_(std::move(cmeta_manager)),
       messenger_(std::move(messenger)),
       state_(kInitialized),
       replace_tombstoned_tablet_(false),
       tablet_replica_(nullptr),
       session_idle_timeout_millis_(0),
-      start_time_micros_(0) {}
+      start_time_micros_(0) {
+}
 
 TabletCopyClient::~TabletCopyClient() {
   // Note: Ending the tablet copy session releases anchors on the remote.
@@ -137,8 +142,7 @@ Status TabletCopyClient::SetTabletToReplace(const scoped_refptr<TabletMetadata>&
 
   // Load the old consensus metadata, if it exists.
   scoped_refptr<ConsensusMetadata> cmeta;
-  Status s = ConsensusMetadata::Load(fs_manager_, tablet_id_,
-                                     fs_manager_->uuid(), &cmeta);
+  Status s = cmeta_manager_->Load(tablet_id_, &cmeta);
   if (s.IsNotFound()) {
     // The consensus metadata was not written to disk, possibly due to a failed
     // tablet copy.
@@ -242,7 +246,8 @@ Status TabletCopyClient::Start(const HostPort& copy_source_addr,
     // Remove any existing orphaned blocks and WALs from the tablet, and
     // set the data state to 'COPYING'.
     RETURN_NOT_OK_PREPEND(
-        TSTabletManager::DeleteTabletData(meta_, tablet::TABLET_DATA_COPYING, boost::none),
+        TSTabletManager::DeleteTabletData(meta_, cmeta_manager_,
+                                          tablet::TABLET_DATA_COPYING, boost::none),
         "Could not replace superblock with COPYING data state");
     CHECK_OK(fs_manager_->dd_manager()->CreateDataDirGroup(tablet_id_));
   } else {
@@ -324,7 +329,8 @@ Status TabletCopyClient::Abort() {
 
   // Delete all of the tablet data, including blocks and WALs.
   RETURN_NOT_OK_PREPEND(
-      TSTabletManager::DeleteTabletData(meta_, tablet::TABLET_DATA_TOMBSTONED, boost::none),
+      TSTabletManager::DeleteTabletData(meta_, cmeta_manager_,
+                                        tablet::TABLET_DATA_TOMBSTONED, boost::none),
       LogPrefix() + "Failed to tombstone tablet after aborting tablet copy");
 
   SetStatusMessage(Substitute("Tombstoned tablet $0: Tablet copy aborted", tablet_id_));
@@ -505,10 +511,10 @@ Status TabletCopyClient::WriteConsensusMetadata() {
   // If we didn't find a previous consensus meta file, create one.
   if (!cmeta_) {
     scoped_refptr<ConsensusMetadata> cmeta;
-    return ConsensusMetadata::Create(fs_manager_, tablet_id_, fs_manager_->uuid(),
-                                     remote_cstate_->committed_config(),
-                                     remote_cstate_->current_term(),
-                                     &cmeta);
+    return cmeta_manager_->Create(tablet_id_,
+                                  remote_cstate_->committed_config(),
+                                  remote_cstate_->current_term(),
+                                  &cmeta);
   }
 
   // Otherwise, update the consensus metadata to reflect the config and term