You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2017/07/13 06:33:10 UTC

[1/3] kudu git commit: consensus: Get rid of RaftConsensus::Create()

Repository: kudu
Updated Branches:
  refs/heads/master fa5d56cb6 -> b62608a8e


consensus: Get rid of RaftConsensus::Create()

It is no longer useful to have a factory method for RaftConsensus,
particularly since we need to be able to separate creation from
initialization in a follow-up patch. This is a minor refactor that
removes Create() and simplifies a couple of things.

Also adds an additional lifecycle state to RaftConsensus (kNew) for
validation purposes.

Also removes the unused MemTracker instance from RaftConsensus.

Change-Id: Ic28fb8fe64cd62d290cea1de22c4ba9dd1743a4e
Reviewed-on: http://gerrit.cloudera.org:8080/7192
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/4f16320a
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/4f16320a
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/4f16320a

Branch: refs/heads/master
Commit: 4f16320abf59a8d07ffbe0fdb94a9887c3b510d2
Parents: fa5d56c
Author: Mike Percy <mp...@apache.org>
Authored: Mon Jun 12 18:06:52 2017 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Thu Jul 13 06:32:22 2017 +0000

----------------------------------------------------------------------
 src/kudu/consensus/raft_consensus.cc            | 276 +++++++++----------
 src/kudu/consensus/raft_consensus.h             |  69 +++--
 .../consensus/raft_consensus_quorum-test.cc     |  74 ++---
 src/kudu/tablet/tablet_replica.cc               |  52 ++--
 4 files changed, 212 insertions(+), 259 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/4f16320a/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index f5208d2..2216366 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -18,14 +18,16 @@
 #include "kudu/consensus/raft_consensus.h"
 
 #include <algorithm>
-#include <boost/optional.hpp>
-#include <gflags/gflags.h>
-#include <iostream>
 #include <memory>
 #include <mutex>
+#include <ostream>
+
+#include <boost/optional.hpp>
+#include <gflags/gflags.h>
 
 #include "kudu/common/wire_protocol.h"
 #include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/consensus_meta_manager.h"
 #include "kudu/consensus/consensus_peers.h"
 #include "kudu/consensus/leader_election.h"
 #include "kudu/consensus/log.h"
@@ -39,7 +41,6 @@
 #include "kudu/util/debug/trace_event.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
-#include "kudu/util/mem_tracker.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/process_memory.h"
@@ -151,27 +152,61 @@ int GetFailureMonitorCheckStddevMs() {
 namespace kudu {
 namespace consensus {
 
-using std::shared_ptr;
-using std::unique_ptr;
 using strings::Substitute;
 using tserver::TabletServerErrorPB;
 
 // Special string that represents any known leader to the failure detector.
 static const char* const kTimerId = "election-timer";
 
-scoped_refptr<RaftConsensus> RaftConsensus::Create(
+RaftConsensus::RaftConsensus(
     ConsensusOptions options,
-    scoped_refptr<ConsensusMetadata> cmeta,
-    const RaftPeerPB& local_peer_pb,
-    const scoped_refptr<MetricEntity>& metric_entity,
-    scoped_refptr<TimeManager> time_manager,
-    ReplicaTransactionFactory* txn_factory,
-    const shared_ptr<rpc::Messenger>& messenger,
-    const scoped_refptr<log::Log>& log,
-    const shared_ptr<MemTracker>& parent_mem_tracker,
-    const Callback<void(const std::string& reason)>& mark_dirty_clbk,
-    ThreadPool* raft_pool) {
-  gscoped_ptr<PeerProxyFactory> rpc_factory(new RpcPeerProxyFactory(messenger));
+    RaftPeerPB local_peer_pb,
+    scoped_refptr<ConsensusMetadataManager> cmeta_manager,
+    ThreadPool* raft_pool)
+    : options_(std::move(options)),
+      local_peer_pb_(std::move(local_peer_pb)),
+      cmeta_manager_(DCHECK_NOTNULL(std::move(cmeta_manager))),
+      raft_pool_(raft_pool),
+      state_(kNew),
+      rng_(GetRandomSeed32()),
+      withhold_votes_until_(MonoTime::Min()),
+      last_received_cur_leader_(MinimumOpId()),
+      failed_elections_since_stable_leader_(0),
+      shutdown_(false),
+      update_calls_for_tests_(0) {
+  DCHECK(local_peer_pb_.has_permanent_uuid());
+}
+
+Status RaftConsensus::Init() {
+  DCHECK_EQ(kNew, state_) << State_Name(state_);
+
+  RETURN_NOT_OK(cmeta_manager_->Load(options_.tablet_id, &cmeta_));
+
+  state_ = kInitialized;
+  return Status::OK();
+}
+
+RaftConsensus::~RaftConsensus() {
+  Shutdown();
+}
+
+Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
+                            gscoped_ptr<PeerProxyFactory> peer_proxy_factory,
+                            scoped_refptr<log::Log> log,
+                            scoped_refptr<TimeManager> time_manager,
+                            ReplicaTransactionFactory* txn_factory,
+                            scoped_refptr<MetricEntity> metric_entity,
+                            Callback<void(const std::string& reason)> mark_dirty_clbk) {
+  peer_proxy_factory_ = DCHECK_NOTNULL(std::move(peer_proxy_factory));
+  log_ = DCHECK_NOTNULL(std::move(log));
+  time_manager_ = DCHECK_NOTNULL(std::move(time_manager));
+  txn_factory_ = DCHECK_NOTNULL(txn_factory);
+  mark_dirty_clbk_ = std::move(mark_dirty_clbk);
+  DCHECK(metric_entity);
+
+  term_metric_ = metric_entity->FindOrCreateGauge(&METRIC_raft_term, cmeta_->current_term());
+  follower_memory_pressure_rejections_ =
+      metric_entity->FindOrCreateCounter(&METRIC_follower_memory_pressure_rejections);
 
   // The message queue that keeps track of which operations need to be replicated
   // where.
@@ -182,110 +217,47 @@ scoped_refptr<RaftConsensus> RaftConsensus::Create(
   //
   // TODO(adar): the token is SERIAL to match the previous single-thread
   // observer pool behavior, but CONCURRENT may be safe here.
-  gscoped_ptr<PeerMessageQueue> queue(
-      new PeerMessageQueue(metric_entity,
-                           log,
-                           time_manager,
-                           local_peer_pb,
-                           options.tablet_id,
-                           raft_pool->NewToken(ThreadPool::ExecutionMode::SERIAL)));
-
-  DCHECK(local_peer_pb.has_permanent_uuid());
-  const string& peer_uuid = local_peer_pb.permanent_uuid();
+  queue_.reset(new PeerMessageQueue(std::move(metric_entity),
+                                    log_,
+                                    time_manager_,
+                                    local_peer_pb_,
+                                    options_.tablet_id,
+                                    raft_pool_->NewToken(ThreadPool::ExecutionMode::SERIAL)));
 
   // A single Raft thread pool token is shared between RaftConsensus and
   // PeerManager. Because PeerManager is owned by RaftConsensus, it receives a
   // raw pointer to the token, to emphasize that RaftConsensus is responsible
   // for destroying the token.
-  unique_ptr<ThreadPoolToken> raft_pool_token(raft_pool->NewToken(
-      ThreadPool::ExecutionMode::CONCURRENT));
+  raft_pool_token_ = raft_pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT);
 
   // A manager for the set of peers that actually send the operations both remotely
   // and to the local wal.
-  gscoped_ptr<PeerManager> peer_manager(
-    new PeerManager(options.tablet_id,
-                    peer_uuid,
-                    rpc_factory.get(),
-                    queue.get(),
-                    raft_pool_token.get(),
-                    log));
-
-  return make_scoped_refptr(new RaftConsensus(
-                              std::move(options),
-                              std::move(cmeta),
-                              std::move(rpc_factory),
-                              std::move(queue),
-                              std::move(peer_manager),
-                              std::move(raft_pool_token),
-                              metric_entity,
-                              peer_uuid,
-                              std::move(time_manager),
-                              txn_factory,
-                              log,
-                              parent_mem_tracker,
-                              mark_dirty_clbk));
-}
-
-RaftConsensus::RaftConsensus(
-    ConsensusOptions options,
-    scoped_refptr<ConsensusMetadata> cmeta,
-    gscoped_ptr<PeerProxyFactory> peer_proxy_factory,
-    gscoped_ptr<PeerMessageQueue> queue,
-    gscoped_ptr<PeerManager> peer_manager,
-    unique_ptr<ThreadPoolToken> raft_pool_token,
-    const scoped_refptr<MetricEntity>& metric_entity,
-    std::string peer_uuid,
-    scoped_refptr<TimeManager> time_manager,
-    ReplicaTransactionFactory* txn_factory,
-    const scoped_refptr<log::Log>& log,
-    shared_ptr<MemTracker> parent_mem_tracker,
-    Callback<void(const std::string& reason)> mark_dirty_clbk)
-    : options_(std::move(options)),
-      peer_uuid_(std::move(peer_uuid)),
-      state_(kInitialized),
-      cmeta_(DCHECK_NOTNULL(std::move(cmeta))),
-      raft_pool_token_(std::move(raft_pool_token)),
-      log_(DCHECK_NOTNULL(log)),
-      time_manager_(DCHECK_NOTNULL(std::move(time_manager))),
-      peer_proxy_factory_(DCHECK_NOTNULL(std::move(peer_proxy_factory))),
-      txn_factory_(DCHECK_NOTNULL(txn_factory)),
-      peer_manager_(DCHECK_NOTNULL(std::move(peer_manager))),
-      queue_(DCHECK_NOTNULL(std::move(queue))),
-      pending_(Substitute("T $0 P $1: ", options_.tablet_id, peer_uuid_), time_manager_),
-      rng_(GetRandomSeed32()),
-      failure_monitor_(GetRandomSeed32(), GetFailureMonitorCheckMeanMs(),
-                       GetFailureMonitorCheckStddevMs()),
-      failure_detector_(new TimedFailureDetector(MonoDelta::FromMilliseconds(
+  peer_manager_.reset(new PeerManager(options_.tablet_id,
+                                      peer_uuid(),
+                                      peer_proxy_factory_.get(),
+                                      queue_.get(),
+                                      raft_pool_token_.get(),
+                                      log_));
+
+  pending_.reset(new PendingRounds(Substitute("T $0 P $1: ", options_.tablet_id, peer_uuid()),
+                                   time_manager_));
+  failure_monitor_.reset(new RandomizedFailureMonitor(GetRandomSeed32(),
+                                                      GetFailureMonitorCheckMeanMs(),
+                                                      GetFailureMonitorCheckStddevMs()));
+  failure_detector_.reset(new TimedFailureDetector(MonoDelta::FromMilliseconds(
           FLAGS_raft_heartbeat_interval_ms *
-          FLAGS_leader_failure_max_missed_heartbeat_periods))),
-      withhold_votes_until_(MonoTime::Min()),
-      last_received_cur_leader_(MinimumOpId()),
-      failed_elections_since_stable_leader_(0),
-      mark_dirty_clbk_(std::move(mark_dirty_clbk)),
-      shutdown_(false),
-      update_calls_for_tests_(0),
-      follower_memory_pressure_rejections_(metric_entity->FindOrCreateCounter(
-          &METRIC_follower_memory_pressure_rejections)),
-      term_metric_(metric_entity->FindOrCreateGauge(&METRIC_raft_term,
-                                                    cmeta_->current_term())),
-      parent_mem_tracker_(std::move(parent_mem_tracker)) {
-}
+          FLAGS_leader_failure_max_missed_heartbeat_periods)));
 
-RaftConsensus::~RaftConsensus() {
-  Shutdown();
-}
-
-Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) {
   // This just starts the monitor thread -- no failure detector is registered yet.
   if (FLAGS_enable_leader_failure_detection) {
-    RETURN_NOT_OK(failure_monitor_.Start());
+    RETURN_NOT_OK(failure_monitor_->Start());
   }
 
   // Register the failure detector instance with the monitor.
   // We still have not enabled failure detection for the leader election timer.
   // That happens separately via the helper functions
   // EnsureFailureDetector(Enabled/Disabled)Unlocked();
-  RETURN_NOT_OK(failure_monitor_.MonitorFailureDetector(options_.tablet_id,
+  RETURN_NOT_OK(failure_monitor_->MonitorFailureDetector(options_.tablet_id,
                                                         failure_detector_));
 
   {
@@ -318,7 +290,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info) {
       RETURN_NOT_OK(StartReplicaTransactionUnlocked(replicate_ptr));
     }
 
-    pending_.SetInitialCommittedOpId(info.last_committed_id);
+    pending_->SetInitialCommittedOpId(info.last_committed_id);
 
     queue_->Init(info.last_id, info.last_committed_id);
   }
@@ -370,7 +342,7 @@ bool RaftConsensus::IsRunning() const {
 
 Status RaftConsensus::EmulateElection() {
   TRACE_EVENT2("consensus", "RaftConsensus::EmulateElection",
-               "peer", peer_uuid_,
+               "peer", peer_uuid(),
                "tablet", options_.tablet_id);
 
   ThreadRestrictions::AssertWaitAllowed();
@@ -381,7 +353,7 @@ Status RaftConsensus::EmulateElection() {
 
   // Assume leadership of new term.
   RETURN_NOT_OK(HandleTermAdvanceUnlocked(GetCurrentTermUnlocked() + 1));
-  SetLeaderUuidUnlocked(peer_uuid_);
+  SetLeaderUuidUnlocked(peer_uuid());
   return BecomeLeaderUnlocked();
 }
 
@@ -456,7 +428,7 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
       // flushes to disk, and the double fsync doesn't buy us anything.
       RETURN_NOT_OK(HandleTermAdvanceUnlocked(GetCurrentTermUnlocked() + 1,
                                               SKIP_FLUSH_TO_DISK));
-      RETURN_NOT_OK(SetVotedForCurrentTermUnlocked(peer_uuid_));
+      RETURN_NOT_OK(SetVotedForCurrentTermUnlocked(peer_uuid()));
     }
 
     RaftConfigPB active_config = cmeta_->ActiveConfig();
@@ -470,14 +442,14 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
 
     // Vote for ourselves.
     bool duplicate;
-    RETURN_NOT_OK(counter->RegisterVote(peer_uuid_, VOTE_GRANTED, &duplicate));
+    RETURN_NOT_OK(counter->RegisterVote(peer_uuid(), VOTE_GRANTED, &duplicate));
     CHECK(!duplicate) << LogPrefixUnlocked()
                       << "Inexplicable duplicate self-vote for term "
                       << GetCurrentTermUnlocked();
 
     VoteRequestPB request;
     request.set_ignore_live_leader(mode == ELECT_EVEN_IF_LEADER_IS_ALIVE);
-    request.set_candidate_uuid(peer_uuid_);
+    request.set_candidate_uuid(peer_uuid());
     if (mode == PRE_ELECTION) {
       // In a pre-election, we haven't bumped our own term yet, so we need to be
       // asking for votes for the next term.
@@ -507,7 +479,7 @@ Status RaftConsensus::WaitUntilLeaderForTests(const MonoDelta& timeout) {
   while (role() != consensus::RaftPeerPB::LEADER) {
     if (MonoTime::Now() >= deadline) {
       return Status::TimedOut(Substitute("Peer $0 is not leader of tablet $1 after $2. Role: $3",
-                                         peer_uuid_, options_.tablet_id, timeout.ToString(),
+                                         peer_uuid(), options_.tablet_id, timeout.ToString(),
                                          role()));
     }
     SleepFor(MonoDelta::FromMilliseconds(10));
@@ -551,7 +523,7 @@ Status RaftConsensus::BecomeLeaderUnlocked() {
   DCHECK(lock_.is_locked());
 
   TRACE_EVENT2("consensus", "RaftConsensus::BecomeLeaderUnlocked",
-               "peer", peer_uuid_,
+               "peer", peer_uuid(),
                "tablet", options_.tablet_id);
   LOG_WITH_PREFIX_UNLOCKED(INFO) << "Becoming Leader. State: " << ToStringUnlocked();
 
@@ -686,7 +658,7 @@ Status RaftConsensus::AddPendingOperationUnlocked(const scoped_refptr<ConsensusR
     }
   }
 
-  return pending_.AddPendingOperation(round);
+  return pending_->AddPendingOperation(round);
 }
 
 void RaftConsensus::NotifyCommitIndex(int64_t commit_index) {
@@ -706,7 +678,7 @@ void RaftConsensus::NotifyCommitIndex(int64_t commit_index) {
     return;
   }
 
-  pending_.AdvanceCommittedIndex(commit_index);
+  pending_->AdvanceCommittedIndex(commit_index);
 
   if (cmeta_->active_role() == RaftPeerPB::LEADER) {
     peer_manager_->SignalRequest(false);
@@ -794,7 +766,7 @@ Status RaftConsensus::Update(const ConsensusRequestPB* request,
                                 "is set to true.");
   }
 
-  response->set_responder_uuid(peer_uuid_);
+  response->set_responder_uuid(peer_uuid());
 
   VLOG_WITH_PREFIX(2) << "Replica received request: " << SecureShortDebugString(*request);
 
@@ -840,7 +812,7 @@ bool RaftConsensus::IsSingleVoterConfig() const {
   ThreadRestrictions::AssertWaitAllowed();
   LockGuard l(lock_);
   return cmeta_->CountVotersInConfig(COMMITTED_CONFIG) == 1 &&
-         cmeta_->IsVoterInConfig(peer_uuid_, COMMITTED_CONFIG);
+         cmeta_->IsVoterInConfig(peer_uuid(), COMMITTED_CONFIG);
 }
 
 std::string RaftConsensus::LeaderRequest::OpsRangeString() const {
@@ -863,7 +835,7 @@ void RaftConsensus::DeduplicateLeaderRequestUnlocked(ConsensusRequestPB* rpc_req
   DCHECK(lock_.is_locked());
 
   // TODO(todd): use queue committed index?
-  int64_t last_committed_index = pending_.GetCommittedIndex();
+  int64_t last_committed_index = pending_->GetCommittedIndex();
 
   // The leader's preceding id.
   deduplicated_req->preceding_opid = &rpc_req->preceding_id();
@@ -888,7 +860,7 @@ void RaftConsensus::DeduplicateLeaderRequestUnlocked(ConsensusRequestPB* rpc_req
       // If the index is uncommitted and below our match index, then it must be in the
       // pendings set.
       scoped_refptr<ConsensusRound> round =
-          pending_.GetPendingOpByIndexOrNull(leader_msg->id().index());
+          pending_->GetPendingOpByIndexOrNull(leader_msg->id().index());
       DCHECK(round) << "Could not find op with index " << leader_msg->id().index()
                     << " in pending set. committed= " << last_committed_index
                     << " dedup=" << dedup_up_to_index;
@@ -953,7 +925,7 @@ Status RaftConsensus::EnforceLogMatchingPropertyMatchesUnlocked(const LeaderRequ
   DCHECK(lock_.is_locked());
 
   bool term_mismatch;
-  if (pending_.IsOpCommittedOrPending(*req.preceding_opid, &term_mismatch)) {
+  if (pending_->IsOpCommittedOrPending(*req.preceding_opid, &term_mismatch)) {
     return Status::OK();
   }
 
@@ -991,7 +963,7 @@ Status RaftConsensus::EnforceLogMatchingPropertyMatchesUnlocked(const LeaderRequ
 
 void RaftConsensus::TruncateAndAbortOpsAfterUnlocked(int64_t truncate_after_index) {
   DCHECK(lock_.is_locked());
-  pending_.AbortOpsAfter(truncate_after_index);
+  pending_->AbortOpsAfter(truncate_after_index);
   queue_->TruncateOpsAfter(truncate_after_index);
 }
 
@@ -1059,7 +1031,7 @@ Status RaftConsensus::CheckLeaderRequestUnlocked(const ConsensusRequestPB* reque
   if (!deduped_req->messages.empty()) {
 
     bool term_mismatch;
-    CHECK(!pending_.IsOpCommittedOrPending(deduped_req->messages[0]->get()->id(), &term_mismatch));
+    CHECK(!pending_->IsOpCommittedOrPending(deduped_req->messages[0]->get()->id(), &term_mismatch));
 
     // If the index is in our log but the terms are not the same abort down to the leader's
     // preceding id.
@@ -1088,7 +1060,7 @@ Status RaftConsensus::CheckLeaderRequestUnlocked(const ConsensusRequestPB* reque
 Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
                                     ConsensusResponsePB* response) {
   TRACE_EVENT2("consensus", "RaftConsensus::UpdateReplica",
-               "peer", peer_uuid_,
+               "peer", peer_uuid(),
                "tablet", options_.tablet_id);
   Synchronizer log_synchronizer;
   StatusCallback sync_status_cb = log_synchronizer.AsStatusCallback();
@@ -1188,7 +1160,7 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
     ThreadRestrictions::AssertWaitAllowed();
     LockGuard l(lock_);
     RETURN_NOT_OK(CheckRunningUnlocked());
-    if (!cmeta_->IsMemberInConfig(peer_uuid_, ACTIVE_CONFIG)) {
+    if (!cmeta_->IsMemberInConfig(peer_uuid(), ACTIVE_CONFIG)) {
       LOG_WITH_PREFIX_UNLOCKED(INFO) << "Allowing update even though not a member of the config";
     }
 
@@ -1221,18 +1193,18 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
     // 2. ...if we commit beyond the preceding index, we'd regress KUDU-639, and...
     // 3. ...the leader's committed index is always our upper bound.
     int64_t early_apply_up_to = std::min<int64_t>({
-        pending_.GetLastPendingTransactionOpId().index(),
+        pending_->GetLastPendingTransactionOpId().index(),
         deduped_req.preceding_opid->index(),
         request->committed_index()});
 
     VLOG_WITH_PREFIX_UNLOCKED(1) << "Early marking committed up to " << early_apply_up_to
                                  << ", Last pending opid index: "
-                                 << pending_.GetLastPendingTransactionOpId().index()
+                                 << pending_->GetLastPendingTransactionOpId().index()
                                  << ", preceding opid index: "
                                  << deduped_req.preceding_opid->index()
                                  << ", requested index: " << request->committed_index();
     TRACE("Early marking committed up to index $0", early_apply_up_to);
-    CHECK_OK(pending_.AdvanceCommittedIndex(early_apply_up_to));
+    CHECK_OK(pending_->AdvanceCommittedIndex(early_apply_up_to));
 
     // 2 - Enqueue the prepares
 
@@ -1247,7 +1219,7 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
       // our memory pressure.
       double capacity_pct;
       if (process_memory::SoftLimitExceeded(&capacity_pct)) {
-        follower_memory_pressure_rejections_->Increment();
+        if (follower_memory_pressure_rejections_) follower_memory_pressure_rejections_->Increment();
         string msg = StringPrintf(
             "Soft memory limit exceeded (at %.2f%% of capacity)",
             capacity_pct);
@@ -1349,7 +1321,7 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
 
     VLOG_WITH_PREFIX_UNLOCKED(1) << "Marking committed up to " << apply_up_to;
     TRACE("Marking committed up to $0", apply_up_to);
-    CHECK_OK(pending_.AdvanceCommittedIndex(apply_up_to));
+    CHECK_OK(pending_->AdvanceCommittedIndex(apply_up_to));
     queue_->UpdateFollowerWatermarks(apply_up_to, request->all_replicated_index());
 
     // If any messages failed to be started locally, then we already have removed them
@@ -1417,9 +1389,9 @@ void RaftConsensus::FillConsensusResponseError(ConsensusResponsePB* response,
 
 Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB* response) {
   TRACE_EVENT2("consensus", "RaftConsensus::RequestVote",
-               "peer", peer_uuid_,
+               "peer", peer_uuid(),
                "tablet", options_.tablet_id);
-  response->set_responder_uuid(peer_uuid_);
+  response->set_responder_uuid(peer_uuid());
 
   // We must acquire the update lock in order to ensure that this vote action
   // takes place between requests.
@@ -1529,7 +1501,7 @@ Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req,
                                    const StatusCallback& client_cb,
                                    boost::optional<TabletServerErrorPB::Code>* error_code) {
   TRACE_EVENT2("consensus", "RaftConsensus::ChangeConfig",
-               "peer", peer_uuid_,
+               "peer", peer_uuid(),
                "tablet", options_.tablet_id);
 
   if (PREDICT_FALSE(!req.has_type())) {
@@ -1598,7 +1570,7 @@ Status RaftConsensus::ChangeConfig(const ChangeConfigRequestPB& req,
         break;
 
       case REMOVE_SERVER:
-        if (server_uuid == peer_uuid_) {
+        if (server_uuid == peer_uuid()) {
           return Status::InvalidArgument(
               Substitute("Cannot remove peer $0 from the config because it is the leader. "
                          "Force another leader to be elected to remove this server. "
@@ -1702,13 +1674,13 @@ Status RaftConsensus::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB& req,
   // Although it is valid for a local replica to not have itself
   // in the committed config, it is rare and a replica without itself
   // in the latest config is definitely not caught up with the latest leader's log.
-  if (!IsRaftConfigVoter(peer_uuid_, new_config)) {
+  if (!IsRaftConfigVoter(peer_uuid(), new_config)) {
     return Status::InvalidArgument(Substitute("Local replica uuid $0 is not "
                                               "a VOTER in the new config, "
                                               "rejecting the unsafe config "
                                               "change request for tablet $1. "
                                               "Rejected config: $2" ,
-                                              peer_uuid_, req.tablet_id(),
+                                              peer_uuid(), req.tablet_id(),
                                               SecureShortDebugString(new_config)));
   }
   new_config.set_unsafe_config_change(true);
@@ -1773,7 +1745,7 @@ Status RaftConsensus::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB& req,
 
 void RaftConsensus::Shutdown() {
   TRACE_EVENT2("consensus", "RaftConsensus::Shutdown",
-               "peer", peer_uuid_,
+               "peer", peer_uuid(),
                "tablet", options_.tablet_id);
 
   // Avoid taking locks if already shut down so we don't violate
@@ -1792,23 +1764,23 @@ void RaftConsensus::Shutdown() {
   }
 
   // Close the peer manager.
-  peer_manager_->Close();
+  if (peer_manager_) peer_manager_->Close();
 
   // We must close the queue after we close the peers.
-  queue_->Close();
+  if (queue_) queue_->Close();
 
   {
     ThreadRestrictions::AssertWaitAllowed();
     LockGuard l(lock_);
-    CHECK_OK(pending_.CancelPendingTransactions());
+    if (pending_) CHECK_OK(pending_->CancelPendingTransactions());
     CHECK_EQ(kShuttingDown, state_) << State_Name(state_);
     state_ = kShutDown;
     LOG_WITH_PREFIX_UNLOCKED(INFO) << "Raft consensus is shut down!";
   }
 
   // Shut down things that might acquire locks during destruction.
-  raft_pool_token_->Shutdown();
-  failure_monitor_.Shutdown();
+  if (raft_pool_token_) raft_pool_token_->Shutdown();
+  if (failure_monitor_) failure_monitor_->Shutdown();
 
   shutdown_.Store(true, kMemOrderRelease);
 }
@@ -1983,6 +1955,8 @@ RaftPeerPB::Role RaftConsensus::role() const {
 
 const char* RaftConsensus::State_Name(State state) {
   switch (state) {
+    case kNew:
+      return "New";
     case kInitialized:
       return "Initialized";
     case kRunning:
@@ -2039,7 +2013,7 @@ Status RaftConsensus::RefreshConsensusQueueAndPeersUnlocked() {
   peer_manager_->Close();
   // TODO(todd): should use queue committed index here? in that case do
   // we need to pass it in at all?
-  queue_->SetLeaderMode(pending_.GetCommittedIndex(),
+  queue_->SetLeaderMode(pending_->GetCommittedIndex(),
                         GetCurrentTermUnlocked(),
                         active_config);
   RETURN_NOT_OK(peer_manager_->UpdateRaftConfig(active_config));
@@ -2047,7 +2021,7 @@ Status RaftConsensus::RefreshConsensusQueueAndPeersUnlocked() {
 }
 
 const string& RaftConsensus::peer_uuid() const {
-  return peer_uuid_;
+  return local_peer_pb_.permanent_uuid();
 }
 
 const string& RaftConsensus::tablet_id() const {
@@ -2169,7 +2143,7 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
     return;
   }
 
-  if (!cmeta_->IsVoterInConfig(peer_uuid_, ACTIVE_CONFIG)) {
+  if (!cmeta_->IsVoterInConfig(peer_uuid(), ACTIVE_CONFIG)) {
     LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Leader " << election_type
                                       << " decision while not in active config. "
                                       << "Result: Term " << election_term << ": "
@@ -2208,7 +2182,7 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
                 "Couldn't start leader election after successful pre-election");
   } else {
     // We won a real election. Convert role to LEADER.
-    SetLeaderUuidUnlocked(peer_uuid_);
+    SetLeaderUuidUnlocked(peer_uuid());
 
     // TODO(todd): BecomeLeaderUnlocked() can fail due to state checks during shutdown.
     // It races with the above state check.
@@ -2223,8 +2197,8 @@ Status RaftConsensus::GetLastOpId(OpIdType type, OpId* id) {
   if (type == RECEIVED_OPID) {
     *DCHECK_NOTNULL(id) = queue_->GetLastOpIdInLog();
   } else if (type == COMMITTED_OPID) {
-    id->set_term(pending_.GetTermWithLastCommittedOp());
-    id->set_index(pending_.GetCommittedIndex());
+    id->set_term(pending_->GetTermWithLastCommittedOp());
+    id->set_index(pending_->GetCommittedIndex());
   } else {
     return Status::InvalidArgument("Unsupported OpIdType", OpIdType_Name(type));
   }
@@ -2444,7 +2418,7 @@ Status RaftConsensus::HandleTermAdvanceUnlocked(ConsensusTerm new_term,
 
   LOG_WITH_PREFIX_UNLOCKED(INFO) << "Advancing to term " << new_term;
   RETURN_NOT_OK(SetCurrentTermUnlocked(new_term, flush));
-  term_metric_->set_value(new_term);
+  if (term_metric_) term_metric_->set_value(new_term);
   last_received_cur_leader_ = MinimumOpId();
   return Status::OK();
 }
@@ -2474,7 +2448,7 @@ Status RaftConsensus::CheckActiveLeaderUnlocked() const {
     default:
       return Status::IllegalState(Substitute("Replica $0 is not leader of this config. Role: $1. "
                                              "Consensus state: $2",
-                                             peer_uuid_,
+                                             peer_uuid(),
                                              RaftPeerPB::Role_Name(role),
                                              SecureShortDebugString(cmeta_->ToConsensusStatePB())));
   }
@@ -2615,7 +2589,7 @@ string RaftConsensus::LogPrefixUnlocked() const {
   DCHECK(lock_.is_locked());
   return Substitute("T $0 P $1 [term $2 $3]: ",
                     options_.tablet_id,
-                    peer_uuid_,
+                    peer_uuid(),
                     GetCurrentTermUnlocked(),
                     RaftPeerPB::Role_Name(cmeta_->active_role()));
 }
@@ -2623,7 +2597,7 @@ string RaftConsensus::LogPrefixUnlocked() const {
 string RaftConsensus::LogPrefixThreadSafe() const {
   return Substitute("T $0 P $1: ",
                     options_.tablet_id,
-                    peer_uuid_);
+                    peer_uuid());
 }
 
 string RaftConsensus::ToString() const {
@@ -2635,7 +2609,7 @@ string RaftConsensus::ToString() const {
 string RaftConsensus::ToStringUnlocked() const {
   DCHECK(lock_.is_locked());
   return Substitute("Replica: $0, State: $1, Role: $2",
-                    peer_uuid_, State_Name(state_), RaftPeerPB::Role_Name(cmeta_->active_role()));
+                    peer_uuid(), State_Name(state_), RaftPeerPB::Role_Name(cmeta_->active_role()));
 }
 
 ConsensusMetadata* RaftConsensus::consensus_metadata_for_tests() const {

http://git-wip-us.apache.org/repos/asf/kudu/blob/4f16320a/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index 9fe1042..7a66045 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -17,13 +17,15 @@
 
 #pragma once
 
-#include <boost/optional/optional_fwd.hpp>
+#include <iosfwd>
 #include <memory>
 #include <mutex>
 #include <string>
 #include <utility>
 #include <vector>
 
+#include <boost/optional/optional_fwd.hpp>
+
 #include "kudu/consensus/consensus.pb.h"
 #include "kudu/consensus/consensus_meta.h"
 #include "kudu/consensus/consensus_queue.h"
@@ -118,35 +120,26 @@ class RaftConsensus : public RefCountedThreadSafe<RaftConsensus>,
     EXTERNAL_REQUEST
   };
 
-  static scoped_refptr<RaftConsensus> Create(
-    ConsensusOptions options,
-    scoped_refptr<ConsensusMetadata> cmeta,
-    const RaftPeerPB& local_peer_pb,
-    const scoped_refptr<MetricEntity>& metric_entity,
-    scoped_refptr<TimeManager> time_manager,
-    ReplicaTransactionFactory* txn_factory,
-    const std::shared_ptr<rpc::Messenger>& messenger,
-    const scoped_refptr<log::Log>& log,
-    const std::shared_ptr<MemTracker>& parent_mem_tracker,
-    const Callback<void(const std::string& reason)>& mark_dirty_clbk,
-    ThreadPool* raft_pool);
-
   RaftConsensus(ConsensusOptions options,
-                scoped_refptr<ConsensusMetadata> cmeta,
-                gscoped_ptr<PeerProxyFactory> peer_proxy_factory,
-                gscoped_ptr<PeerMessageQueue> queue,
-                gscoped_ptr<PeerManager> peer_manager,
-                std::unique_ptr<ThreadPoolToken> raft_pool_token,
-                const scoped_refptr<MetricEntity>& metric_entity,
-                std::string peer_uuid,
-                scoped_refptr<TimeManager> time_manager,
-                ReplicaTransactionFactory* txn_factory,
-                const scoped_refptr<log::Log>& log,
-                std::shared_ptr<MemTracker> parent_mem_tracker,
-                Callback<void(const std::string& reason)> mark_dirty_clbk);
+                RaftPeerPB local_peer_pb,
+                scoped_refptr<ConsensusMetadataManager> cmeta_manager,
+                ThreadPool* raft_pool);
+
+  // Initializes the RaftConsensus object. This should be called before
+  // publishing this object to any thread other than the thread that invoked
+  // the constructor.
+  Status Init();
 
   // Starts running the Raft consensus algorithm.
-  Status Start(const ConsensusBootstrapInfo& info);
+  // Start() is not thread-safe. Calls to Start() should be externally
+  // synchronized with calls accessing non-const members of this class.
+  Status Start(const ConsensusBootstrapInfo& info,
+               gscoped_ptr<PeerProxyFactory> peer_proxy_factory,
+               scoped_refptr<log::Log> log,
+               scoped_refptr<TimeManager> time_manager,
+               ReplicaTransactionFactory* txn_factory,
+               scoped_refptr<MetricEntity> metric_entity,
+               Callback<void(const std::string& reason)> mark_dirty_clbk);
 
   // Returns true if RaftConsensus is running.
   bool IsRunning() const;
@@ -332,7 +325,10 @@ class RaftConsensus : public RefCountedThreadSafe<RaftConsensus>,
   // NOTE: When adding / changing values in this enum, add the corresponding
   // values to State_Name().
   enum State {
-    // State after the replica is built.
+    // RaftConsensus has been freshly constructed.
+    kNew,
+
+    // RaftConsensus has been initialized.
     kInitialized,
 
     // State signaling the replica accepts requests (from clients
@@ -690,8 +686,13 @@ class RaftConsensus : public RefCountedThreadSafe<RaftConsensus>,
 
   const ConsensusOptions options_;
 
-  // The UUID of the local peer.
-  const std::string peer_uuid_;
+  // Information about the local peer, including the local UUID.
+  const RaftPeerPB local_peer_pb_;
+
+  // Consensus metadata service.
+  const scoped_refptr<ConsensusMetadataManager> cmeta_manager_;
+
+  ThreadPool* const raft_pool_;
 
   // TODO(dralves) hack to serialize updates due to repeated/out-of-order messages
   // should probably be refactored out.
@@ -727,12 +728,12 @@ class RaftConsensus : public RefCountedThreadSafe<RaftConsensus>,
   // The currently pending rounds that have not yet been committed by
   // consensus. Protected by 'lock_'.
   // TODO(todd) these locks will become more fine-grained.
-  PendingRounds pending_;
+  std::unique_ptr<PendingRounds> pending_;
 
   Random rng_;
 
   // TODO(mpercy): Plumb this from ServerBase.
-  RandomizedFailureMonitor failure_monitor_;
+  std::unique_ptr<RandomizedFailureMonitor> failure_monitor_;
 
   scoped_refptr<FailureDetector> failure_detector_;
 
@@ -751,7 +752,7 @@ class RaftConsensus : public RefCountedThreadSafe<RaftConsensus>,
   // This is used to calculate back-off of the election timeout.
   int failed_elections_since_stable_leader_;
 
-  const Callback<void(const std::string& reason)> mark_dirty_clbk_;
+  Callback<void(const std::string& reason)> mark_dirty_clbk_;
 
   AtomicBool shutdown_;
 
@@ -761,8 +762,6 @@ class RaftConsensus : public RefCountedThreadSafe<RaftConsensus>,
   scoped_refptr<Counter> follower_memory_pressure_rejections_;
   scoped_refptr<AtomicGauge<int64_t> > term_metric_;
 
-  std::shared_ptr<MemTracker> parent_mem_tracker_;
-
   DISALLOW_COPY_AND_ASSIGN(RaftConsensus);
 };
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/4f16320a/src/kudu/consensus/raft_consensus_quorum-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index 62ecdfa..f9d2533 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -157,66 +157,50 @@ class RaftConsensusQuorumTest : public KuduTest {
     return raft_config;
   }
 
-  void BuildPeers() {
+  Status BuildPeers() {
+    CHECK_EQ(config_.peers_size(), cmeta_managers_.size());
+    CHECK_EQ(config_.peers_size(), fs_managers_.size());
     for (int i = 0; i < config_.peers_size(); i++) {
-      auto proxy_factory = new LocalTestPeerProxyFactory(peers_.get());
-
-      auto txn_factory = new TestTransactionFactory(logs_[i].get());
-
       scoped_refptr<ConsensusMetadata> cmeta;
-      CHECK_OK(cmeta_managers_[i]->Create(kTestTablet, config_,
-                                          kMinimumTerm, &cmeta));
+      RETURN_NOT_OK(cmeta_managers_[i]->Create(kTestTablet, config_, kMinimumTerm, &cmeta));
 
       RaftPeerPB local_peer_pb;
-      CHECK_OK(GetRaftConfigMember(config_, fs_managers_[i]->uuid(), &local_peer_pb));
-
-      scoped_refptr<TimeManager> time_manager(new TimeManager(clock_, Timestamp::kMin));
-      gscoped_ptr<PeerMessageQueue> queue(
-          new PeerMessageQueue(metric_entity_,
-                               logs_[i],
-                               time_manager,
-                               local_peer_pb,
-                               kTestTablet,
-                               raft_pool_->NewToken(ThreadPool::ExecutionMode::SERIAL)));
-
-      unique_ptr<ThreadPoolToken> pool_token(
-          raft_pool_->NewToken(ThreadPool::ExecutionMode::CONCURRENT));
-
-      gscoped_ptr<PeerManager> peer_manager(
-          new PeerManager(options_.tablet_id,
-                          config_.peers(i).permanent_uuid(),
-                          proxy_factory,
-                          queue.get(),
-                          pool_token.get(),
-                          logs_[i]));
+      RETURN_NOT_OK(GetRaftConfigMember(config_, fs_managers_[i]->uuid(), &local_peer_pb));
 
       scoped_refptr<RaftConsensus> peer(
           new RaftConsensus(options_,
-                            std::move(cmeta),
-                            gscoped_ptr<PeerProxyFactory>(proxy_factory),
-                            std::move(queue),
-                            std::move(peer_manager),
-                            std::move(pool_token),
-                            metric_entity_,
-                            config_.peers(i).permanent_uuid(),
-                            time_manager,
-                            txn_factory,
-                            logs_[i],
-                            parent_mem_trackers_[i],
-                            Bind(&DoNothing)));
+                            config_.peers(i),
+                            cmeta_managers_[i],
+                            raft_pool_.get()));
+      RETURN_NOT_OK(peer->Init());
 
-      txn_factory->SetConsensus(peer.get());
-      txn_factories_.push_back(txn_factory);
       peers_->AddPeer(config_.peers(i).permanent_uuid(), peer);
     }
+    return Status::OK();
   }
 
   Status StartPeers() {
     ConsensusBootstrapInfo boot_info;
 
     TestPeerMap all_peers = peers_->GetPeerMapCopy();
-    for (const TestPeerMap::value_type& entry : all_peers) {
-      RETURN_NOT_OK(entry.second->Start(boot_info));
+    for (int i = 0; i < config_.peers_size(); i++) {
+      scoped_refptr<RaftConsensus> peer;
+      RETURN_NOT_OK(peers_->GetPeerByIdx(i, &peer));
+
+      gscoped_ptr<PeerProxyFactory> proxy_factory(new LocalTestPeerProxyFactory(peers_.get()));
+      scoped_refptr<TimeManager> time_manager(new TimeManager(clock_, Timestamp::kMin));
+      auto txn_factory = new TestTransactionFactory(logs_[i].get());
+      txn_factory->SetConsensus(peer.get());
+      txn_factories_.push_back(txn_factory);
+
+      RETURN_NOT_OK(peer->Start(
+          boot_info,
+          std::move(proxy_factory),
+          logs_[i],
+          time_manager,
+          txn_factory,
+          metric_entity_,
+          Bind(&DoNothing)));
     }
     return Status::OK();
   }
@@ -224,7 +208,7 @@ class RaftConsensusQuorumTest : public KuduTest {
   Status BuildConfig(int num) {
     RETURN_NOT_OK(BuildFsManagersAndLogs(num));
     BuildInitialRaftConfigPB(num);
-    BuildPeers();
+    RETURN_NOT_OK(BuildPeers());
     return Status::OK();
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/4f16320a/src/kudu/tablet/tablet_replica.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc
index b9e1516..8c1be7e 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -18,15 +18,16 @@
 #include "kudu/tablet/tablet_replica.h"
 
 #include <algorithm>
-#include <gflags/gflags.h>
 #include <memory>
 #include <mutex>
 #include <string>
 #include <utility>
 #include <vector>
 
-#include "kudu/consensus/consensus_meta.h"
+#include <gflags/gflags.h>
+
 #include "kudu/consensus/consensus_meta_manager.h"
+#include "kudu/consensus/consensus_peers.h"
 #include "kudu/consensus/log.h"
 #include "kudu/consensus/log_anchor_registry.h"
 #include "kudu/consensus/log_util.h"
@@ -55,10 +56,6 @@
 #include "kudu/util/threadpool.h"
 #include "kudu/util/trace.h"
 
-using std::map;
-using std::shared_ptr;
-using std::unique_ptr;
-
 namespace kudu {
 namespace tablet {
 
@@ -85,13 +82,14 @@ METRIC_DEFINE_histogram(tablet, op_prepare_run_time, "Operation Prepare Run Time
                         10000000, 2);
 
 using consensus::ConsensusBootstrapInfo;
-using consensus::ConsensusMetadata;
 using consensus::ConsensusOptions;
 using consensus::ConsensusRound;
 using consensus::OpId;
+using consensus::PeerProxyFactory;
 using consensus::RaftConfigPB;
 using consensus::RaftPeerPB;
 using consensus::RaftConsensus;
+using consensus::RpcPeerProxyFactory;
 using consensus::TimeManager;
 using consensus::ALTER_SCHEMA_OP;
 using consensus::WRITE_OP;
@@ -99,6 +97,9 @@ using log::Log;
 using log::LogAnchorRegistry;
 using rpc::Messenger;
 using rpc::ResultTracker;
+using std::map;
+using std::shared_ptr;
+using std::unique_ptr;
 using strings::Substitute;
 
 TabletReplica::TabletReplica(
@@ -154,28 +155,11 @@ Status TabletReplica::Init(const shared_ptr<Tablet>& tablet,
     log_ = log;
     result_tracker_ = result_tracker;
 
+    TRACE("Creating consensus instance");
     ConsensusOptions options;
     options.tablet_id = meta_->tablet_id();
-
-    TRACE("Creating consensus instance");
-
-    scoped_refptr<ConsensusMetadata> cmeta;
-    RETURN_NOT_OK(cmeta_manager_->Load(tablet_id_, &cmeta));
-
-    scoped_refptr<TimeManager> time_manager(new TimeManager(
-        clock, tablet_->mvcc_manager()->GetCleanTimestamp()));
-
-    consensus_ = RaftConsensus::Create(options,
-                                       cmeta,
-                                       local_peer_pb_,
-                                       metric_entity,
-                                       time_manager,
-                                       this,
-                                       messenger_,
-                                       log_.get(),
-                                       tablet_->mem_tracker(),
-                                       mark_dirty_clbk_,
-                                       raft_pool);
+    consensus_.reset(new RaftConsensus(options, local_peer_pb_, cmeta_manager_, raft_pool));
+    RETURN_NOT_OK(consensus_->Init());
   }
 
   if (tablet_->metrics() != nullptr) {
@@ -197,7 +181,19 @@ Status TabletReplica::Start(const ConsensusBootstrapInfo& bootstrap_info) {
 
   VLOG(2) << "RaftConfig before starting: " << SecureDebugString(consensus_->CommittedConfig());
 
-  RETURN_NOT_OK(consensus_->Start(bootstrap_info));
+  gscoped_ptr<PeerProxyFactory> peer_proxy_factory(new RpcPeerProxyFactory(messenger_));
+  scoped_refptr<TimeManager> time_manager(new TimeManager(
+      clock_, tablet_->mvcc_manager()->GetCleanTimestamp()));
+
+  RETURN_NOT_OK(consensus_->Start(
+      bootstrap_info,
+      std::move(peer_proxy_factory),
+      log_,
+      std::move(time_manager),
+      this,
+      tablet_->GetMetricEntity(),
+      mark_dirty_clbk_));
+
   {
     std::lock_guard<simple_spinlock> lock(lock_);
     CHECK_EQ(state_, BOOTSTRAPPING);


[2/3] kudu git commit: TabletReplica: Move Init() logic to Start()

Posted by mp...@apache.org.
TabletReplica: Move Init() logic to Start()

This is a cleanup / refactor that will make it easier to implement
tombstoned voting. In this patch we merge Init() and Start() since they
aren't really logically different right now.

An unrelated change in this patch is a minor API cleanup in
TSTabletManager, where we now pass TabletReplica directly to
TSTabletManager::OpenTablet() instead of registering it in a map first
and then looking it up again later.

Change-Id: Ib762db8cfaac628325feee445490713b5c555c5a
Reviewed-on: http://gerrit.cloudera.org:8080/7194
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <as...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/bbea22e1
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/bbea22e1
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/bbea22e1

Branch: refs/heads/master
Commit: bbea22e1950868de1c3cf3e9dab0eef03979b53a
Parents: 4f16320
Author: Mike Percy <mp...@apache.org>
Authored: Mon Jun 12 20:07:22 2017 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Thu Jul 13 06:32:32 2017 +0000

----------------------------------------------------------------------
 src/kudu/master/sys_catalog.cc                  |  29 ++--
 src/kudu/tablet/tablet.h                        |   1 +
 src/kudu/tablet/tablet_replica-test.cc          |  42 +++---
 src/kudu/tablet/tablet_replica.cc               | 145 ++++++++++---------
 src/kudu/tablet/tablet_replica.h                |  50 +++----
 .../tserver/tablet_copy_source_session-test.cc  |  18 +--
 src/kudu/tserver/ts_tablet_manager.cc           |  52 +++----
 src/kudu/tserver/ts_tablet_manager.h            |   4 +-
 8 files changed, 166 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/bbea22e1/src/kudu/master/sys_catalog.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index d9c37e2..5816484 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -303,8 +303,8 @@ Status SysCatalogTable::SetupTablet(const scoped_refptr<tablet::TabletMetadata>&
 
   InitLocalRaftPeerPB();
 
-  // TODO: handle crash mid-creation of tablet? do we ever end up with a
-  // partially created tablet here?
+  // TODO(matteo): handle crash mid-creation of tablet? do we ever end up with
+  // a partially created tablet here?
   tablet_replica_.reset(new TabletReplica(
       metadata,
       cmeta_manager_,
@@ -326,20 +326,17 @@ Status SysCatalogTable::SetupTablet(const scoped_refptr<tablet::TabletMetadata>&
                                 tablet_replica_->log_anchor_registry(),
                                 &consensus_info));
 
-  // TODO: Do we have a setSplittable(false) or something from the outside is
-  // handling split in the TS?
-
-  RETURN_NOT_OK_PREPEND(tablet_replica_->Init(tablet,
-                                              scoped_refptr<server::Clock>(master_->clock()),
-                                              master_->messenger(),
-                                              scoped_refptr<rpc::ResultTracker>(),
-                                              log,
-                                              tablet->GetMetricEntity(),
-                                              master_->raft_pool(),
-                                              master_->tablet_prepare_pool()),
-                        "Failed to Init() TabletReplica");
-
-  RETURN_NOT_OK_PREPEND(tablet_replica_->Start(consensus_info),
+  // TODO(matteo): Do we have a setSplittable(false) or something from the
+  // outside is handling split in the TS?
+
+  RETURN_NOT_OK_PREPEND(tablet_replica_->Start(consensus_info,
+                                               tablet,
+                                               scoped_refptr<server::Clock>(master_->clock()),
+                                               master_->messenger(),
+                                               scoped_refptr<rpc::ResultTracker>(),
+                                               log,
+                                               master_->raft_pool(),
+                                               master_->tablet_prepare_pool()),
                         "Failed to Start() TabletReplica");
 
   tablet_replica_->RegisterMaintenanceOps(master_->maintenance_manager());

http://git-wip-us.apache.org/repos/asf/kudu/blob/bbea22e1/src/kudu/tablet/tablet.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index 67e854e..0c18bdf 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -331,6 +331,7 @@ class Tablet {
 
   const TabletMetadata *metadata() const { return metadata_.get(); }
   TabletMetadata *metadata() { return metadata_.get(); }
+  scoped_refptr<TabletMetadata> shared_metadata() const { return metadata_; }
 
   void SetCompactionHooksForTests(const std::shared_ptr<CompactionFaultHooks> &hooks);
   void SetFlushHooksForTests(const std::shared_ptr<FlushFaultHooks> &hooks);

http://git-wip-us.apache.org/repos/asf/kudu/blob/bbea22e1/src/kudu/tablet/tablet_replica-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica-test.cc b/src/kudu/tablet/tablet_replica-test.cc
index 27bb052..55f71dc 100644
--- a/src/kudu/tablet/tablet_replica-test.cc
+++ b/src/kudu/tablet/tablet_replica-test.cc
@@ -104,7 +104,7 @@ class TabletReplicaTest : public KuduTabletTest {
 
     // "Bootstrap" and start the TabletReplica.
     tablet_replica_.reset(
-      new TabletReplica(make_scoped_refptr(tablet()->metadata()),
+      new TabletReplica(tablet()->shared_metadata(),
                         cmeta_manager,
                         config_peer,
                         apply_pool_.get(),
@@ -125,28 +125,28 @@ class TabletReplicaTest : public KuduTabletTest {
     scoped_refptr<ConsensusMetadata> cmeta;
     ASSERT_OK(cmeta_manager->Create(tablet()->tablet_id(), config, consensus::kMinimumTerm,
                                     &cmeta));
+  }
 
+  Status StartReplica(const ConsensusBootstrapInfo& info) {
     scoped_refptr<Log> log;
-    ASSERT_OK(Log::Open(LogOptions(), fs_manager(), tablet()->tablet_id(),
-                               *tablet()->schema(), tablet()->metadata()->schema_version(),
-                               metric_entity_.get(), &log));
-
+    RETURN_NOT_OK(Log::Open(LogOptions(), fs_manager(), tablet()->tablet_id(),
+                            *tablet()->schema(), tablet()->metadata()->schema_version(),
+                            metric_entity_.get(), &log));
     tablet_replica_->SetBootstrapping();
-    ASSERT_OK(tablet_replica_->Init(tablet(),
-                                    clock(),
-                                    messenger_,
-                                    scoped_refptr<rpc::ResultTracker>(),
-                                    log,
-                                    metric_entity_,
-                                    raft_pool_.get(),
-                                    prepare_pool_.get()));
+    return tablet_replica_->Start(info,
+                                  tablet(),
+                                  clock(),
+                                  messenger_,
+                                  scoped_refptr<rpc::ResultTracker>(),
+                                  log,
+                                  raft_pool_.get(),
+                                  prepare_pool_.get());
   }
 
-  Status StartPeer(const ConsensusBootstrapInfo& info) {
+  Status StartReplicaAndWaitUntilLeader(const ConsensusBootstrapInfo& info) {
+    RETURN_NOT_OK(StartReplica(info));
     const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
-    RETURN_NOT_OK(tablet_replica_->Start(info));
-    RETURN_NOT_OK(tablet_replica_->consensus()->WaitUntilLeaderForTests(kTimeout));
-    return Status::OK();
+    return tablet_replica_->consensus()->WaitUntilLeaderForTests(kTimeout);
   }
 
   void TabletReplicaStateChangedCallback(const string& tablet_id, const string& reason) {
@@ -299,7 +299,7 @@ class DelayedApplyTransaction : public WriteTransaction {
 // Ensure that Log::GC() doesn't delete logs when the MRS has an anchor.
 TEST_F(TabletReplicaTest, TestMRSAnchorPreventsLogGC) {
   ConsensusBootstrapInfo info;
-  ASSERT_OK(StartPeer(info));
+  ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
 
   Log* log = tablet_replica_->log_.get();
   int32_t num_gced;
@@ -339,7 +339,7 @@ TEST_F(TabletReplicaTest, TestMRSAnchorPreventsLogGC) {
 // Ensure that Log::GC() doesn't delete logs when the DMS has an anchor.
 TEST_F(TabletReplicaTest, TestDMSAnchorPreventsLogGC) {
   ConsensusBootstrapInfo info;
-  ASSERT_OK(StartPeer(info));
+  ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
 
   Log* log = tablet_replica_->log_.get();
   int32_t num_gced;
@@ -419,7 +419,7 @@ TEST_F(TabletReplicaTest, TestDMSAnchorPreventsLogGC) {
 // Ensure that Log::GC() doesn't compact logs with OpIds of active transactions.
 TEST_F(TabletReplicaTest, TestActiveTransactionPreventsLogGC) {
   ConsensusBootstrapInfo info;
-  ASSERT_OK(StartPeer(info));
+  ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
 
   Log* log = tablet_replica_->log_.get();
   int32_t num_gced;
@@ -529,7 +529,7 @@ TEST_F(TabletReplicaTest, TestActiveTransactionPreventsLogGC) {
 
 TEST_F(TabletReplicaTest, TestGCEmptyLog) {
   ConsensusBootstrapInfo info;
-  tablet_replica_->Start(info);
+  ASSERT_OK(StartReplica(info));
   // We don't wait on consensus on purpose.
   ASSERT_OK(tablet_replica_->RunLogGC());
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/bbea22e1/src/kudu/tablet/tablet_replica.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc
index 8c1be7e..07679d1 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -103,23 +103,23 @@ using std::unique_ptr;
 using strings::Substitute;
 
 TabletReplica::TabletReplica(
-    const scoped_refptr<TabletMetadata>& meta,
-    const scoped_refptr<consensus::ConsensusMetadataManager>& cmeta_manager,
+    scoped_refptr<TabletMetadata> meta,
+    scoped_refptr<consensus::ConsensusMetadataManager> cmeta_manager,
     consensus::RaftPeerPB local_peer_pb,
     ThreadPool* apply_pool,
     Callback<void(const std::string& reason)> mark_dirty_clbk)
-    : meta_(meta),
-      cmeta_manager_(cmeta_manager),
-      tablet_id_(meta->tablet_id()),
+    : meta_(DCHECK_NOTNULL(std::move(meta))),
+      cmeta_manager_(DCHECK_NOTNULL(std::move(cmeta_manager))),
+      tablet_id_(meta_->tablet_id()),
       local_peer_pb_(std::move(local_peer_pb)),
-      state_(NOT_STARTED),
-      last_status_("Tablet initializing..."),
-      apply_pool_(apply_pool),
       log_anchor_registry_(new LogAnchorRegistry()),
-      mark_dirty_clbk_(std::move(mark_dirty_clbk)) {}
+      apply_pool_(apply_pool),
+      mark_dirty_clbk_(std::move(mark_dirty_clbk)),
+      state_(NOT_STARTED),
+      last_status_("Tablet initializing...") {
+}
 
 TabletReplica::~TabletReplica() {
-  std::lock_guard<simple_spinlock> lock(lock_);
   // We should either have called Shutdown(), or we should have never called
   // Init().
   CHECK(!tablet_)
@@ -127,76 +127,85 @@ TabletReplica::~TabletReplica() {
       << TabletStatePB_Name(state_);
 }
 
-Status TabletReplica::Init(const shared_ptr<Tablet>& tablet,
-                           const scoped_refptr<server::Clock>& clock,
-                           const shared_ptr<Messenger>& messenger,
-                           const scoped_refptr<ResultTracker>& result_tracker,
-                           const scoped_refptr<Log>& log,
-                           const scoped_refptr<MetricEntity>& metric_entity,
-                           ThreadPool* raft_pool,
-                           ThreadPool* prepare_pool) {
-
+Status TabletReplica::Start(const ConsensusBootstrapInfo& bootstrap_info,
+                            shared_ptr<Tablet> tablet,
+                            scoped_refptr<server::Clock> clock,
+                            shared_ptr<Messenger> messenger,
+                            scoped_refptr<ResultTracker> result_tracker,
+                            scoped_refptr<Log> log,
+                            ThreadPool* raft_pool,
+                            ThreadPool* prepare_pool) {
   DCHECK(tablet) << "A TabletReplica must be provided with a Tablet";
   DCHECK(log) << "A TabletReplica must be provided with a Log";
 
-  prepare_pool_token_ = prepare_pool->NewTokenWithMetrics(
-      ThreadPool::ExecutionMode::SERIAL,
-      {
-          METRIC_op_prepare_queue_length.Instantiate(metric_entity),
-          METRIC_op_prepare_queue_time.Instantiate(metric_entity),
-          METRIC_op_prepare_run_time.Instantiate(metric_entity)
-      });
   {
-    std::lock_guard<simple_spinlock> lock(lock_);
-    CHECK_EQ(BOOTSTRAPPING, state_);
-    tablet_ = tablet;
-    clock_ = clock;
-    messenger_ = messenger;
-    log_ = log;
-    result_tracker_ = result_tracker;
+    std::lock_guard<simple_spinlock> state_change_guard(state_change_lock_);
 
-    TRACE("Creating consensus instance");
-    ConsensusOptions options;
-    options.tablet_id = meta_->tablet_id();
-    consensus_.reset(new RaftConsensus(options, local_peer_pb_, cmeta_manager_, raft_pool));
-    RETURN_NOT_OK(consensus_->Init());
-  }
-
-  if (tablet_->metrics() != nullptr) {
-    TRACE("Starting instrumentation");
-    txn_tracker_.StartInstrumentation(tablet_->GetMetricEntity());
-  }
-  txn_tracker_.StartMemoryTracking(tablet_->mem_tracker());
+    {
+      std::lock_guard<simple_spinlock> l(lock_);
 
-  TRACE("TabletReplica::Init() finished");
-  VLOG(2) << "T " << tablet_id() << " P " << consensus_->peer_uuid() << ": Peer Initted";
-  return Status::OK();
-}
+      CHECK_EQ(BOOTSTRAPPING, state_);
 
-Status TabletReplica::Start(const ConsensusBootstrapInfo& bootstrap_info) {
-  std::lock_guard<simple_spinlock> l(state_change_lock_);
-  TRACE("Starting consensus");
+      tablet_ = DCHECK_NOTNULL(std::move(tablet));
+      clock_ = DCHECK_NOTNULL(std::move(clock));
+      messenger_ = DCHECK_NOTNULL(std::move(messenger));
+      result_tracker_ = std::move(result_tracker); // Passed null in tablet_replica-test
+      log_ = DCHECK_NOTNULL(log); // Not moved because it's passed to RaftConsensus::Start() below.
+    }
 
-  VLOG(2) << "T " << tablet_id() << " P " << consensus_->peer_uuid() << ": Peer starting";
+    // Unlock while we initialize RaftConsensus, which involves I/O.
+    TRACE("Creating consensus instance");
+    ConsensusOptions options;
+    options.tablet_id = meta_->tablet_id();
+    scoped_refptr<RaftConsensus> consensus(new RaftConsensus(std::move(options), local_peer_pb_,
+                                                             cmeta_manager_, raft_pool));
+    RETURN_NOT_OK(consensus->Init());
 
-  VLOG(2) << "RaftConfig before starting: " << SecureDebugString(consensus_->CommittedConfig());
+    scoped_refptr<MetricEntity> metric_entity;
+    gscoped_ptr<PeerProxyFactory> peer_proxy_factory;
+    scoped_refptr<TimeManager> time_manager;
+    {
+      std::lock_guard<simple_spinlock> l(lock_);
+      consensus_ = consensus;
+      metric_entity = tablet_->GetMetricEntity();
+      prepare_pool_token_ = prepare_pool->NewTokenWithMetrics(
+          ThreadPool::ExecutionMode::SERIAL,
+          {
+              METRIC_op_prepare_queue_length.Instantiate(metric_entity),
+              METRIC_op_prepare_queue_time.Instantiate(metric_entity),
+              METRIC_op_prepare_run_time.Instantiate(metric_entity)
+          });
+
+      if (tablet_->metrics()) {
+        TRACE("Starting instrumentation");
+        txn_tracker_.StartInstrumentation(tablet_->GetMetricEntity());
+      }
+      txn_tracker_.StartMemoryTracking(tablet_->mem_tracker());
 
-  gscoped_ptr<PeerProxyFactory> peer_proxy_factory(new RpcPeerProxyFactory(messenger_));
-  scoped_refptr<TimeManager> time_manager(new TimeManager(
-      clock_, tablet_->mvcc_manager()->GetCleanTimestamp()));
+      TRACE("Starting consensus");
+      VLOG(2) << "T " << tablet_id() << " P " << consensus_->peer_uuid() << ": Peer starting";
+      VLOG(2) << "RaftConfig before starting: " << SecureDebugString(consensus_->CommittedConfig());
 
-  RETURN_NOT_OK(consensus_->Start(
-      bootstrap_info,
-      std::move(peer_proxy_factory),
-      log_,
-      std::move(time_manager),
-      this,
-      tablet_->GetMetricEntity(),
-      mark_dirty_clbk_));
+      peer_proxy_factory.reset(new RpcPeerProxyFactory(messenger_));
+      time_manager.reset(new TimeManager(clock_, tablet_->mvcc_manager()->GetCleanTimestamp()));
+    }
 
-  {
-    std::lock_guard<simple_spinlock> lock(lock_);
-    CHECK_EQ(state_, BOOTSTRAPPING);
+    // We cannot hold 'lock_' while we call RaftConsensus::Start() because it
+    // may invoke TabletReplica::StartReplicaTransaction() during startup, causing
+    // a self-deadlock. We take a ref to members protected by 'lock_' before
+    // unlocking.
+    RETURN_NOT_OK(consensus->Start(
+        bootstrap_info,
+        std::move(peer_proxy_factory),
+        log,
+        std::move(time_manager),
+        this,
+        metric_entity,
+        mark_dirty_clbk_));
+
+    // Re-acquire 'lock_' to update our state variable.
+    std::lock_guard<simple_spinlock> l(lock_);
+    CHECK_EQ(BOOTSTRAPPING, state_); // We are still protected by 'state_change_lock_'.
     state_ = RUNNING;
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/bbea22e1/src/kudu/tablet/tablet_replica.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
index a969ba4..e813ba4 100644
--- a/src/kudu/tablet/tablet_replica.h
+++ b/src/kudu/tablet/tablet_replica.h
@@ -72,27 +72,23 @@ class TransactionDriver;
 class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
                       public consensus::ReplicaTransactionFactory {
  public:
-  TabletReplica(const scoped_refptr<TabletMetadata>& meta,
-                const scoped_refptr<consensus::ConsensusMetadataManager>& cmeta_manager,
+  TabletReplica(scoped_refptr<TabletMetadata> meta,
+                scoped_refptr<consensus::ConsensusMetadataManager> cmeta_manager,
                 consensus::RaftPeerPB local_peer_pb,
                 ThreadPool* apply_pool,
                 Callback<void(const std::string& reason)> mark_dirty_clbk);
 
-  // Initializes the TabletReplica, namely creating the Log and initializing
-  // RaftConsensus.
-  Status Init(const std::shared_ptr<tablet::Tablet>& tablet,
-              const scoped_refptr<server::Clock>& clock,
-              const std::shared_ptr<rpc::Messenger>& messenger,
-              const scoped_refptr<rpc::ResultTracker>& result_tracker,
-              const scoped_refptr<log::Log>& log,
-              const scoped_refptr<MetricEntity>& metric_entity,
-              ThreadPool* raft_pool,
-              ThreadPool* prepare_pool);
-
   // Starts the TabletReplica, making it available for Write()s. If this
   // TabletReplica is part of a consensus configuration this will connect it to other replicas
   // in the consensus configuration.
-  Status Start(const consensus::ConsensusBootstrapInfo& info);
+  Status Start(const consensus::ConsensusBootstrapInfo& bootstrap_info,
+               std::shared_ptr<tablet::Tablet> tablet,
+               scoped_refptr<server::Clock> clock,
+               std::shared_ptr<rpc::Messenger> messenger,
+               scoped_refptr<rpc::ResultTracker> result_tracker,
+               scoped_refptr<log::Log> log,
+               ThreadPool* raft_pool,
+               ThreadPool* prepare_pool);
 
   // Shutdown this tablet replica.
   // If a shutdown is already in progress, blocks until that shutdown is complete.
@@ -287,8 +283,19 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
   const scoped_refptr<consensus::ConsensusMetadataManager> cmeta_manager_;
 
   const std::string tablet_id_;
-
   const consensus::RaftPeerPB local_peer_pb_;
+  scoped_refptr<log::LogAnchorRegistry> log_anchor_registry_; // Assigned in tablet_replica-test
+
+  // Pool that executes apply tasks for transactions. This is a multi-threaded
+  // pool, constructor-injected by either the Master (for system tables) or
+  // the Tablet server.
+  ThreadPool* const apply_pool_;
+
+  // Function to mark this TabletReplica's tablet as dirty in the TSTabletManager.
+  //
+  // Must be called whenever cluster membership or leadership changes, or when
+  // the tablet's schema changes.
+  const Callback<void(const std::string& reason)> mark_dirty_clbk_;
 
   TabletStatePB state_;
   Status error_;
@@ -319,21 +326,8 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
   // Token for serial task submission to the server-wide transaction prepare pool.
   std::unique_ptr<ThreadPoolToken> prepare_pool_token_;
 
-  // Pool that executes apply tasks for transactions. This is a multi-threaded
-  // pool, constructor-injected by either the Master (for system tables) or
-  // the Tablet server.
-  ThreadPool* apply_pool_;
-
   scoped_refptr<server::Clock> clock_;
 
-  scoped_refptr<log::LogAnchorRegistry> log_anchor_registry_;
-
-  // Function to mark this TabletReplica's tablet as dirty in the TSTabletManager.
-  //
-  // Must be called whenever cluster membership or leadership changes, or when
-  // the tablet's schema changes.
-  Callback<void(const std::string& reason)> mark_dirty_clbk_;
-
   // List of maintenance operations for the tablet that need information that only the peer
   // can provide.
   std::vector<MaintenanceOp*> maintenance_ops_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/bbea22e1/src/kudu/tserver/tablet_copy_source_session-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_source_session-test.cc b/src/kudu/tserver/tablet_copy_source_session-test.cc
index 4ff1200..e8ff1c6 100644
--- a/src/kudu/tserver/tablet_copy_source_session-test.cc
+++ b/src/kudu/tserver/tablet_copy_source_session-test.cc
@@ -121,7 +121,6 @@ class TabletCopyTest : public KuduTabletTest {
                           Bind(&TabletCopyTest::TabletReplicaStateChangedCallback,
                                Unretained(this),
                                tablet()->tablet_id())));
-
     // TODO(dralves) similar to code in tablet_replica-test, consider refactor.
     RaftConfigPB config;
     config.add_peers()->CopyFrom(config_peer);
@@ -137,16 +136,15 @@ class TabletCopyTest : public KuduTabletTest {
 
     log_anchor_registry_.reset(new LogAnchorRegistry());
     tablet_replica_->SetBootstrapping();
-    ASSERT_OK(tablet_replica_->Init(tablet(),
-                                    clock(),
-                                    messenger,
-                                    scoped_refptr<rpc::ResultTracker>(),
-                                    log,
-                                    metric_entity,
-                                    raft_pool_.get(),
-                                    prepare_pool_.get()));
     consensus::ConsensusBootstrapInfo boot_info;
-    ASSERT_OK(tablet_replica_->Start(boot_info));
+    ASSERT_OK(tablet_replica_->Start(boot_info,
+                                     tablet(),
+                                     clock(),
+                                     messenger,
+                                     scoped_refptr<rpc::ResultTracker>(),
+                                     log,
+                                     raft_pool_.get(),
+                                     prepare_pool_.get()));
     ASSERT_OK(tablet_replica_->WaitUntilConsensusRunning(MonoDelta::FromSeconds(10)));
     ASSERT_OK(tablet_replica_->consensus()->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10)));
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/bbea22e1/src/kudu/tserver/ts_tablet_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index 979e23f..b564de3 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -200,7 +200,7 @@ Status TSTabletManager::Init() {
 
     scoped_refptr<TabletReplica> replica = CreateAndRegisterTabletReplica(meta, NEW_REPLICA);
     RETURN_NOT_OK(open_tablet_pool_->SubmitFunc(boost::bind(&TSTabletManager::OpenTablet,
-                                                this, meta, deleter)));
+                                                            this, replica, deleter)));
   }
 
   {
@@ -281,7 +281,7 @@ Status TSTabletManager::CreateNewTablet(const string& table_id,
 
   // We can run this synchronously since there is nothing to bootstrap.
   RETURN_NOT_OK(open_tablet_pool_->SubmitFunc(boost::bind(&TSTabletManager::OpenTablet,
-                                              this, meta, deleter)));
+                                                          this, new_replica, deleter)));
 
   if (replica) {
     *replica = new_replica;
@@ -554,21 +554,23 @@ void TSTabletManager::RunTabletCopy(
   }
 
   // startup it's still in a valid, fully-copied state.
-  OpenTablet(meta, deleter);
+  OpenTablet(replica, deleter);
 }
 
 // Create and register a new TabletReplica, given tablet metadata.
 scoped_refptr<TabletReplica> TSTabletManager::CreateAndRegisterTabletReplica(
-    const scoped_refptr<TabletMetadata>& meta, RegisterTabletReplicaMode mode) {
+    scoped_refptr<TabletMetadata> meta,
+    RegisterTabletReplicaMode mode) {
+  const string& tablet_id = meta->tablet_id();
   scoped_refptr<TabletReplica> replica(
-      new TabletReplica(meta,
+      new TabletReplica(std::move(meta),
                         cmeta_manager_,
                         local_peer_pb_,
                         server_->tablet_apply_pool(),
                         Bind(&TSTabletManager::MarkTabletDirty,
                              Unretained(this),
-                             meta->tablet_id())));
-  RegisterTablet(meta->tablet_id(), replica, mode);
+                             tablet_id)));
+  RegisterTablet(tablet_id, replica, mode);
   return replica;
 }
 
@@ -724,16 +726,15 @@ Status TSTabletManager::OpenTabletMeta(const string& tablet_id,
   return Status::OK();
 }
 
-void TSTabletManager::OpenTablet(const scoped_refptr<TabletMetadata>& meta,
-                                 const scoped_refptr<TransitionInProgressDeleter>& deleter) {
-  string tablet_id = meta->tablet_id();
+// Note: 'deleter' is not used in the body of OpenTablet(), but is required
+// anyway because its destructor performs cleanup that should only happen when
+// OpenTablet() completes.
+void TSTabletManager::OpenTablet(const scoped_refptr<TabletReplica>& replica,
+                                 const scoped_refptr<TransitionInProgressDeleter>& /*deleter*/) {
+  const string& tablet_id = replica->tablet_id();
   TRACE_EVENT1("tserver", "TSTabletManager::OpenTablet",
                "tablet_id", tablet_id);
 
-  scoped_refptr<TabletReplica> replica;
-  CHECK(LookupTablet(tablet_id, &replica))
-      << "Tablet not registered prior to OpenTabletAsync call: " << tablet_id;
-
   shared_ptr<Tablet> tablet;
   scoped_refptr<Log> log;
 
@@ -747,10 +748,11 @@ void TSTabletManager::OpenTablet(const scoped_refptr<TabletMetadata>& meta,
     // potentially millions of transaction traces being attached to the
     // TabletCopy trace.
     ADOPT_TRACE(nullptr);
-    // TODO: handle crash mid-creation of tablet? do we ever end up with a
-    // partially created tablet here?
+
+    // TODO(mpercy): Handle crash mid-creation of tablet? Do we ever end up
+    // with a partially created tablet here?
     replica->SetBootstrapping();
-    s = BootstrapTablet(meta,
+    s = BootstrapTablet(replica->tablet_metadata(),
                         cmeta_manager_,
                         scoped_refptr<server::Clock>(server_->clock()),
                         server_->mem_tracker(),
@@ -771,25 +773,15 @@ void TSTabletManager::OpenTablet(const scoped_refptr<TabletMetadata>& meta,
 
   MonoTime start(MonoTime::Now());
   LOG_TIMING_PREFIX(INFO, LogPrefix(tablet_id), "starting tablet") {
-    TRACE("Initializing tablet replica");
-    s =  replica->Init(tablet,
+    TRACE("Starting tablet replica");
+    s = replica->Start(bootstrap_info,
+                       tablet,
                        scoped_refptr<server::Clock>(server_->clock()),
                        server_->messenger(),
                        server_->result_tracker(),
                        log,
-                       tablet->GetMetricEntity(),
                        server_->raft_pool(),
                        server_->tablet_prepare_pool());
-
-    if (!s.ok()) {
-      LOG(ERROR) << LogPrefix(tablet_id) << "Tablet failed to init: "
-                 << s.ToString();
-      replica->SetFailed(s);
-      return;
-    }
-
-    TRACE("Starting tablet replica");
-    s = replica->Start(bootstrap_info);
     if (!s.ok()) {
       LOG(ERROR) << LogPrefix(tablet_id) << "Tablet failed to start: "
                  << s.ToString();

http://git-wip-us.apache.org/repos/asf/kudu/blob/bbea22e1/src/kudu/tserver/ts_tablet_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/ts_tablet_manager.h b/src/kudu/tserver/ts_tablet_manager.h
index 53dbe3c..3b2ce45 100644
--- a/src/kudu/tserver/ts_tablet_manager.h
+++ b/src/kudu/tserver/ts_tablet_manager.h
@@ -229,7 +229,7 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
   // method. A TransitionInProgressDeleter must be passed as 'deleter' into
   // this method in order to remove that transition-in-progress entry when
   // opening the tablet is complete (in either a success or a failure case).
-  void OpenTablet(const scoped_refptr<tablet::TabletMetadata>& meta,
+  void OpenTablet(const scoped_refptr<tablet::TabletReplica>& replica,
                   const scoped_refptr<TransitionInProgressDeleter>& deleter);
 
   // Open a tablet whose metadata has already been loaded.
@@ -252,7 +252,7 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
   // the TablerPeer object. See RegisterTablet() for details about the
   // semantics of 'mode' and the locking requirements.
   scoped_refptr<tablet::TabletReplica> CreateAndRegisterTabletReplica(
-      const scoped_refptr<tablet::TabletMetadata>& meta,
+      scoped_refptr<tablet::TabletMetadata> meta,
       RegisterTabletReplicaMode mode);
 
   // Helper to generate the report for a single tablet.


[3/3] kudu git commit: mini cluster: Test infrastructure improvements

Posted by mp...@apache.org.
mini cluster: Test infrastructure improvements

This patch adds a couple of small improvements to the MiniCluster infra.

* Add base class helpers to MiniClusterITestBase to avoid code
  duplication, including StopCluster() and ts_map_ for convenience and
  consistency with ExternalMiniClusterITestBase.
* Add ListTablets() helper function to MiniTabletServer.

Change-Id: I6bc6a04b113e59243fb788fec15b9935c3dcf069
Reviewed-on: http://gerrit.cloudera.org:8080/6959
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <as...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/b62608a8
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/b62608a8
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/b62608a8

Branch: refs/heads/master
Commit: b62608a8e1fa1406f7ffcd7a35cd70c177c67b2c
Parents: bbea22e
Author: Mike Percy <mp...@apache.org>
Authored: Sat May 20 22:04:46 2017 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Thu Jul 13 06:32:50 2017 +0000

----------------------------------------------------------------------
 src/kudu/integration-tests/CMakeLists.txt       |  1 +
 .../integration-tests/delete_tablet-itest.cc    |  7 +--
 .../internal_mini_cluster-itest-base.cc         | 52 ++++++++++++++++++++
 .../internal_mini_cluster-itest-base.h          | 27 +++++-----
 src/kudu/tserver/mini_tablet_server.cc          | 13 ++++-
 src/kudu/tserver/mini_tablet_server.h           |  7 ++-
 6 files changed, 82 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/b62608a8/src/kudu/integration-tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index bbe56f1..d13d396 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -26,6 +26,7 @@ set(INTEGRATION_TESTS_SRCS
   external_mini_cluster-itest-base.cc
   external_mini_cluster.cc
   external_mini_cluster_fs_inspector.cc
+  internal_mini_cluster-itest-base.cc
   internal_mini_cluster.cc
   log_verifier.cc
   mini_cluster.cc

http://git-wip-us.apache.org/repos/asf/kudu/blob/b62608a8/src/kudu/integration-tests/delete_tablet-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/delete_tablet-itest.cc b/src/kudu/integration-tests/delete_tablet-itest.cc
index b7b2c85..c0ea6c3 100644
--- a/src/kudu/integration-tests/delete_tablet-itest.cc
+++ b/src/kudu/integration-tests/delete_tablet-itest.cc
@@ -45,13 +45,8 @@ TEST_F(DeleteTabletITest, TestDeleteFailedReplica) {
   workload.set_num_replicas(1);
   workload.Setup();
 
-  std::unordered_map<std::string, itest::TServerDetails*> ts_map;
-  ValueDeleter del(&ts_map);
-  ASSERT_OK(itest::CreateTabletServerMap(cluster_->master_proxy(),
-                                         cluster_->messenger(),
-                                         &ts_map));
   auto* mts = cluster_->mini_tablet_server(0);
-  auto* ts = ts_map[mts->uuid()];
+  auto* ts = ts_map_[mts->uuid()];
 
   scoped_refptr<TabletReplica> tablet_replica;
   ASSERT_EVENTUALLY([&] {

http://git-wip-us.apache.org/repos/asf/kudu/blob/b62608a8/src/kudu/integration-tests/internal_mini_cluster-itest-base.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/internal_mini_cluster-itest-base.cc b/src/kudu/integration-tests/internal_mini_cluster-itest-base.cc
new file mode 100644
index 0000000..c8775b4
--- /dev/null
+++ b/src/kudu/integration-tests/internal_mini_cluster-itest-base.cc
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/integration-tests/internal_mini_cluster-itest-base.h"
+
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/stl_util.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
+
+namespace kudu {
+
+void MiniClusterITestBase::TearDown() {
+  StopCluster();
+  KuduTest::TearDown();
+}
+
+void MiniClusterITestBase::StartCluster(int num_tablet_servers) {
+  InternalMiniClusterOptions opts;
+  opts.num_tablet_servers = num_tablet_servers;
+  cluster_.reset(new InternalMiniCluster(env_, opts));
+  ASSERT_OK(cluster_->Start());
+  ASSERT_OK(cluster_->CreateClient(nullptr, &client_));
+  ASSERT_OK(itest::CreateTabletServerMap(cluster_->master_proxy(),
+                                         cluster_->messenger(),
+                                         &ts_map_));
+}
+
+void MiniClusterITestBase::StopCluster() {
+  if (!cluster_) return;
+
+  cluster_->Shutdown();
+  cluster_.reset();
+  client_.reset();
+  STLDeleteValues(&ts_map_);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/b62608a8/src/kudu/integration-tests/internal_mini_cluster-itest-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/internal_mini_cluster-itest-base.h b/src/kudu/integration-tests/internal_mini_cluster-itest-base.h
index 5931cfd..0a69fb1 100644
--- a/src/kudu/integration-tests/internal_mini_cluster-itest-base.h
+++ b/src/kudu/integration-tests/internal_mini_cluster-itest-base.h
@@ -17,9 +17,9 @@
 
 #pragma once
 
+#include <memory>
 #include <string>
 #include <unordered_map>
-#include <vector>
 
 #include <gtest/gtest.h>
 
@@ -29,28 +29,23 @@
 
 namespace kudu {
 
+namespace itest {
+struct TServerDetails;
+}
+
 // Simple base utility class to provide a mini cluster with common setup
 // routines useful for integration tests.
 class MiniClusterITestBase : public KuduTest {
  public:
-  virtual void TearDown() OVERRIDE {
-    if (cluster_) {
-      cluster_->Shutdown();
-    }
-    KuduTest::TearDown();
-  }
+  void TearDown() override;
 
  protected:
-  void StartCluster(int num_tablet_servers = 3) {
-    InternalMiniClusterOptions opts;
-    opts.num_tablet_servers = num_tablet_servers;
-    cluster_.reset(new InternalMiniCluster(env_, opts));
-    ASSERT_OK(cluster_->Start());
-    ASSERT_OK(cluster_->CreateClient(nullptr, &client_));
-  }
-
-  gscoped_ptr<InternalMiniCluster> cluster_;
+  void StartCluster(int num_tablet_servers = 3);
+  void StopCluster();
+
+  std::unique_ptr<InternalMiniCluster> cluster_;
   client::sp::shared_ptr<client::KuduClient> client_;
+  std::unordered_map<std::string, itest::TServerDetails*> ts_map_;
 };
 
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/b62608a8/src/kudu/tserver/mini_tablet_server.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/mini_tablet_server.cc b/src/kudu/tserver/mini_tablet_server.cc
index 1c77082..452281a 100644
--- a/src/kudu/tserver/mini_tablet_server.cc
+++ b/src/kudu/tserver/mini_tablet_server.cc
@@ -17,7 +17,6 @@
 
 #include "kudu/tserver/mini_tablet_server.h"
 
-#include <string>
 #include <utility>
 #include <vector>
 
@@ -26,6 +25,7 @@
 #include "kudu/consensus/metadata.pb.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/tablet-test-util.h"
+#include "kudu/tablet/tablet_replica.h"
 #include "kudu/tserver/tablet_server.h"
 #include "kudu/tserver/ts_tablet_manager.h"
 #include "kudu/util/env_util.h"
@@ -38,6 +38,7 @@ DECLARE_bool(rpc_server_allow_ephemeral_ports);
 
 using kudu::consensus::RaftConfigPB;
 using kudu::consensus::RaftPeerPB;
+using kudu::tablet::TabletReplica;
 using std::pair;
 using std::string;
 using std::unique_ptr;
@@ -137,6 +138,16 @@ Status MiniTabletServer::AddTestTablet(const std::string& table_id,
       schema_with_ids, partition.first, config, nullptr);
 }
 
+vector<string> MiniTabletServer::ListTablets() const {
+  vector<string> tablet_ids;
+  vector<scoped_refptr<TabletReplica>> replicas;
+  server_->tablet_manager()->GetTabletReplicas(&replicas);
+  for (const auto& replica : replicas) {
+    tablet_ids.push_back(replica->tablet_id());
+  }
+  return tablet_ids;
+}
+
 void MiniTabletServer::FailHeartbeats() {
   server_->set_fail_heartbeats_for_tests(true);
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/b62608a8/src/kudu/tserver/mini_tablet_server.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/mini_tablet_server.h b/src/kudu/tserver/mini_tablet_server.h
index f2f0231..b60d7e8 100644
--- a/src/kudu/tserver/mini_tablet_server.h
+++ b/src/kudu/tserver/mini_tablet_server.h
@@ -19,16 +19,16 @@
 
 #include <memory>
 #include <string>
+#include <vector>
 
 #include "kudu/common/schema.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/tserver/tablet_server_options.h"
 #include "kudu/util/net/sockaddr.h"
-#include "kudu/util/status.h"
 
 namespace kudu {
-
 class FsManager;
+class Status;
 
 namespace consensus {
 class RaftConfigPB;
@@ -81,6 +81,9 @@ class MiniTabletServer {
                        const Schema& schema,
                        const consensus::RaftConfigPB& config);
 
+  // Return the ids of all non-tombstoned tablets on this server.
+  std::vector<std::string> ListTablets() const;
+
   // Create a RaftConfigPB which should be used to create a local-only
   // tablet on the given tablet server.
   consensus::RaftConfigPB CreateLocalConfig() const;