You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2017/08/24 04:43:58 UTC

[2/2] kudu git commit: KUDU-871. Support tombstoned voting

KUDU-871. Support tombstoned voting

This patch makes it possible for tombstoned tablet replicas to vote in
Raft elections.

Changes:

* Add Stop() method to TabletReplica + Consensus lifecycle.
  * Includes new STOPPED state.
  * Tombstoning a replica should call Stop().
  * Deleting a replica should call Shutdown().
* Persist ConsensusMetadata before returning from
  TabletCopyClient::Start() because we need cmeta to Init()
  RaftConsensus, which happens when registering the replica in
  TSTabletManager.
* TSTabletManager::DeleteTablet() should not consider FAILED == deleted,
  since we no longer destroy RaftConsensus when tombstoning a replica.
* Add positive and negative tests for tombstoned voting.
* Add a stress test that induces lots of tombstoned voting
  while running TabletCopy, TabletBootstrap, and DeleteTablet.
* Fix DeleteTableITest.TestMergeConsensusMetadata, which assumed that
  tombstoned tablets would not vote; that assumption no longer holds.
* Fix several tests that expected tombstoned tablets to be SHUTDOWN when
  now they are STOPPED.

Change-Id: Ia19d75b185299443b27f41e468bbae20065e7570
Reviewed-on: http://gerrit.cloudera.org:8080/6960
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/5bca7d8b
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/5bca7d8b
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/5bca7d8b

Branch: refs/heads/master
Commit: 5bca7d8ba185d62952fb3e3163cbe88d20453da0
Parents: b37bde7
Author: Mike Percy <mp...@apache.org>
Authored: Wed Aug 23 00:01:04 2017 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Thu Aug 24 04:43:25 2017 +0000

----------------------------------------------------------------------
 src/kudu/consensus/consensus-test-util.h        |   2 +-
 src/kudu/consensus/raft_consensus.cc            | 244 +++++++---
 src/kudu/consensus/raft_consensus.h             |  88 +++-
 .../consensus/raft_consensus_quorum-test.cc     |  29 +-
 src/kudu/integration-tests/CMakeLists.txt       |   2 +
 .../integration-tests/cluster_itest_util.cc     |  29 ++
 src/kudu/integration-tests/cluster_itest_util.h |  12 +
 .../integration-tests/delete_table-itest.cc     |  19 +-
 .../external_mini_cluster-itest-base.cc         |   6 +-
 .../tombstoned_voting-itest.cc                  | 461 +++++++++++++++++++
 .../tombstoned_voting-stress-test.cc            | 313 +++++++++++++
 src/kudu/master/sys_catalog.cc                  |   7 +-
 src/kudu/tablet/metadata.proto                  |  31 +-
 src/kudu/tablet/tablet_metadata.h               |   7 +-
 src/kudu/tablet/tablet_replica-test.cc          |  30 +-
 src/kudu/tablet/tablet_replica.cc               | 127 +++--
 src/kudu/tablet/tablet_replica.h                |  45 +-
 src/kudu/tools/kudu-admin-test.cc               |   4 +-
 src/kudu/tools/kudu-ts-cli-test.cc              |   2 +-
 src/kudu/tserver/tablet_copy_client.cc          |   6 +-
 src/kudu/tserver/tablet_copy_client.h           |   3 +-
 .../tserver/tablet_copy_source_session-test.cc  |  37 +-
 src/kudu/tserver/tablet_service.cc              | 156 +++++--
 src/kudu/tserver/ts_tablet_manager.cc           |  62 +--
 src/kudu/tserver/ts_tablet_manager.h            |   6 +-
 src/kudu/tserver/tserver-path-handlers.cc       |   2 +-
 src/kudu/util/make_shared.h                     |   5 +-
 27 files changed, 1439 insertions(+), 296 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/consensus/consensus-test-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus-test-util.h b/src/kudu/consensus/consensus-test-util.h
index ac4408b..c73e50b 100644
--- a/src/kudu/consensus/consensus-test-util.h
+++ b/src/kudu/consensus/consensus-test-util.h
@@ -534,7 +534,7 @@ class LocalTestPeerProxy : public TestPeerProxy {
     Status s = peers_->GetPeerByUuid(peer_uuid_, &peer);
 
     if (s.ok()) {
-      s = peer->RequestVote(&other_peer_req, &other_peer_resp);
+      s = peer->RequestVote(&other_peer_req, boost::none, &other_peer_resp);
     }
     if (!s.ok()) {
       LOG(WARNING) << "Could not RequestVote from replica with request: "

http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 042c5d1..11d1e40 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -118,6 +118,11 @@ DEFINE_bool(raft_enable_pre_election, true,
 TAG_FLAG(raft_enable_pre_election, experimental);
 TAG_FLAG(raft_enable_pre_election, runtime);
 
+DEFINE_bool(raft_enable_tombstoned_voting, true,
+            "When enabled, tombstoned tablets may vote in elections.");
+TAG_FLAG(raft_enable_tombstoned_voting, experimental);
+TAG_FLAG(raft_enable_tombstoned_voting, runtime);
+
 DECLARE_int32(memory_limit_warn_threshold_percentage);
 
 // Metrics
@@ -133,9 +138,11 @@ METRIC_DEFINE_gauge_int64(tablet, raft_term,
                           "Current Term of the Raft Consensus algorithm. This number increments "
                           "each time a leader election is started.");
 
+using boost::optional;
 using kudu::pb_util::SecureShortDebugString;
 using kudu::rpc::PeriodicTimer;
 using kudu::tserver::TabletServerErrorPB;
+using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
 using std::weak_ptr;
@@ -165,10 +172,8 @@ RaftConsensus::RaftConsensus(
 
 Status RaftConsensus::Init() {
   DCHECK_EQ(kNew, state_) << State_Name(state_);
-
   RETURN_NOT_OK(cmeta_manager_->Load(options_.tablet_id, &cmeta_));
-
-  state_ = kInitialized;
+  SetStateUnlocked(kInitialized);
   return Status::OK();
 }
 
@@ -176,6 +181,20 @@ RaftConsensus::~RaftConsensus() {
   Shutdown();
 }
 
+Status RaftConsensus::Create(ConsensusOptions options,
+                             RaftPeerPB local_peer_pb,
+                             scoped_refptr<ConsensusMetadataManager> cmeta_manager,
+                             ThreadPool* raft_pool,
+                             shared_ptr<RaftConsensus>* consensus_out) {
+  shared_ptr<RaftConsensus> consensus(std::make_shared<RaftConsensus>(std::move(options),
+                                                                      std::move(local_peer_pb),
+                                                                      std::move(cmeta_manager),
+                                                                      raft_pool));
+  RETURN_NOT_OK_PREPEND(consensus->Init(), "Unable to initialize Raft consensus");
+  *consensus_out = std::move(consensus);
+  return Status::OK();
+}
+
 Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
                             gscoped_ptr<PeerProxyFactory> peer_proxy_factory,
                             scoped_refptr<log::Log> log,
@@ -257,13 +276,13 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
 
     // Our last persisted term can be higher than the last persisted operation
     // (i.e. if we called an election) but reverse should never happen.
-    if (info.last_id.term() > GetCurrentTermUnlocked()) {
+    if (info.last_id.term() > CurrentTermUnlocked()) {
       return Status::Corruption(Substitute("Unable to start RaftConsensus: "
           "The last op in the WAL with id $0 has a term ($1) that is greater "
           "than the latest recorded term, which is $2",
           OpIdToString(info.last_id),
           info.last_id.term(),
-          GetCurrentTermUnlocked()));
+          CurrentTermUnlocked()));
     }
 
     // Append any uncommitted replicate messages found during log replay to the queue.
@@ -283,7 +302,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
     // If this is the first term expire the FD immediately so that we have a
     // fast first election, otherwise we just let the timer expire normally.
     boost::optional<MonoDelta> initial_delta;
-    if (GetCurrentTermUnlocked() == 0) {
+    if (CurrentTermUnlocked() == 0) {
       // The failure detector is initialized to a low value to trigger an early
       // election (unless someone else requested a vote from us first, which
       // resets the election timer).
@@ -304,7 +323,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
     // Now assume "follower" duties.
     RETURN_NOT_OK(BecomeReplicaUnlocked());
 
-    state_ = kRunning;
+    SetStateUnlocked(kRunning);
   }
 
   if (IsSingleVoterConfig() && FLAGS_enable_leader_failure_detection) {
@@ -336,7 +355,7 @@ Status RaftConsensus::EmulateElection() {
   LOG_WITH_PREFIX_UNLOCKED(INFO) << "Emulating election...";
 
   // Assume leadership of new term.
-  RETURN_NOT_OK(HandleTermAdvanceUnlocked(GetCurrentTermUnlocked() + 1));
+  RETURN_NOT_OK(HandleTermAdvanceUnlocked(CurrentTermUnlocked() + 1));
   SetLeaderUuidUnlocked(peer_uuid());
   return BecomeLeaderUnlocked();
 }
@@ -408,7 +427,7 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
 
       // We skip flushing the term to disk because setting the vote just below also
       // flushes to disk, and the double fsync doesn't buy us anything.
-      RETURN_NOT_OK(HandleTermAdvanceUnlocked(GetCurrentTermUnlocked() + 1,
+      RETURN_NOT_OK(HandleTermAdvanceUnlocked(CurrentTermUnlocked() + 1,
                                               SKIP_FLUSH_TO_DISK));
       RETURN_NOT_OK(SetVotedForCurrentTermUnlocked(peer_uuid()));
     }
@@ -427,7 +446,7 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
     RETURN_NOT_OK(counter->RegisterVote(peer_uuid(), VOTE_GRANTED, &duplicate));
     CHECK(!duplicate) << LogPrefixUnlocked()
                       << "Inexplicable duplicate self-vote for term "
-                      << GetCurrentTermUnlocked();
+                      << CurrentTermUnlocked();
 
     VoteRequestPB request;
     request.set_ignore_live_leader(mode == ELECT_EVEN_IF_LEADER_IS_ALIVE);
@@ -436,9 +455,9 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
       // In a pre-election, we haven't bumped our own term yet, so we need to be
       // asking for votes for the next term.
       request.set_is_pre_election(true);
-      request.set_candidate_term(GetCurrentTermUnlocked() + 1);
+      request.set_candidate_term(CurrentTermUnlocked() + 1);
     } else {
-      request.set_candidate_term(GetCurrentTermUnlocked());
+      request.set_candidate_term(CurrentTermUnlocked());
     }
     request.set_tablet_id(options_.tablet_id);
     *request.mutable_candidate_status()->mutable_last_received() =
@@ -581,7 +600,7 @@ Status RaftConsensus::Replicate(const scoped_refptr<ConsensusRound>& round) {
     ThreadRestrictions::AssertWaitAllowed();
     LockGuard l(lock_);
     RETURN_NOT_OK(CheckSafeToReplicateUnlocked(*round->replicate_msg()));
-    RETURN_NOT_OK(round->CheckBoundTerm(GetCurrentTermUnlocked()));
+    RETURN_NOT_OK(round->CheckBoundTerm(CurrentTermUnlocked()));
     RETURN_NOT_OK(AppendNewRoundToQueueUnlocked(round));
   }
 
@@ -593,7 +612,7 @@ Status RaftConsensus::CheckLeadershipAndBindTerm(const scoped_refptr<ConsensusRo
   ThreadRestrictions::AssertWaitAllowed();
   LockGuard l(lock_);
   RETURN_NOT_OK(CheckSafeToReplicateUnlocked(*round->replicate_msg()));
-  round->BindToTerm(GetCurrentTermUnlocked());
+  round->BindToTerm(CurrentTermUnlocked());
   return Status::OK();
 }
 
@@ -669,7 +688,7 @@ void RaftConsensus::NotifyCommitIndex(int64_t commit_index) {
   // We will process commit notifications while shutting down because a replica
   // which has initiated a Prepare() / Replicate() may eventually commit even if
   // its state has changed after the initial Append() / Update().
-  if (PREDICT_FALSE(state_ != kRunning && state_ != kShuttingDown)) {
+  if (PREDICT_FALSE(state_ != kRunning && state_ != kStopping)) {
     LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Unable to update committed index: "
                                       << "Replica not in running state: "
                                       << State_Name(state_);
@@ -716,7 +735,7 @@ void RaftConsensus::NotifyFailedFollower(const string& uuid,
   {
     ThreadRestrictions::AssertWaitAllowed();
     LockGuard l(lock_);
-    int64_t current_term = GetCurrentTermUnlocked();
+    int64_t current_term = CurrentTermUnlocked();
     if (current_term != term) {
       LOG_WITH_PREFIX_UNLOCKED(INFO) << fail_msg << "Notified about a follower failure in "
                                      << "previous term " << term << ", but a leader election "
@@ -899,16 +918,16 @@ Status RaftConsensus::HandleLeaderRequestTermUnlocked(const ConsensusRequestPB*
                                                       ConsensusResponsePB* response) {
   DCHECK(lock_.is_locked());
   // Do term checks first:
-  if (PREDICT_FALSE(request->caller_term() != GetCurrentTermUnlocked())) {
+  if (PREDICT_FALSE(request->caller_term() != CurrentTermUnlocked())) {
 
     // If less, reject.
-    if (request->caller_term() < GetCurrentTermUnlocked()) {
+    if (request->caller_term() < CurrentTermUnlocked()) {
       string msg = Substitute("Rejecting Update request from peer $0 for earlier term $1. "
                               "Current term is $2. Ops: $3",
 
                               request->caller_uuid(),
                               request->caller_term(),
-                              GetCurrentTermUnlocked(),
+                              CurrentTermUnlocked(),
                               OpsRangeString(*request));
       LOG_WITH_PREFIX_UNLOCKED(INFO) << msg;
       FillConsensusResponseError(response,
@@ -1371,7 +1390,7 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
 void RaftConsensus::FillConsensusResponseOKUnlocked(ConsensusResponsePB* response) {
   DCHECK(lock_.is_locked());
   TRACE("Filling consensus response to leader.");
-  response->set_responder_term(GetCurrentTermUnlocked());
+  response->set_responder_term(CurrentTermUnlocked());
   response->mutable_status()->mutable_last_received()->CopyFrom(
       queue_->GetLastOpIdInLog());
   response->mutable_status()->mutable_last_received_current_leader()->CopyFrom(
@@ -1388,7 +1407,9 @@ void RaftConsensus::FillConsensusResponseError(ConsensusResponsePB* response,
   StatusToPB(status, error->mutable_status());
 }
 
-Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB* response) {
+Status RaftConsensus::RequestVote(const VoteRequestPB* request,
+                                  optional<OpId> tombstone_last_logged_opid,
+                                  VoteResponsePB* response) {
   TRACE_EVENT2("consensus", "RaftConsensus::RequestVote",
                "peer", peer_uuid(),
                "tablet", options_.tablet_id);
@@ -1414,14 +1435,41 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB*
     // We still need to take the state lock in order to respond with term info, etc.
     ThreadRestrictions::AssertWaitAllowed();
     LockGuard l(lock_);
-    RETURN_NOT_OK(CheckRunningUnlocked());
     return RequestVoteRespondIsBusy(request, response);
   }
 
   // Acquire the replica state lock so we can read / modify the consensus state.
   ThreadRestrictions::AssertWaitAllowed();
   LockGuard l(lock_);
-  RETURN_NOT_OK(CheckRunningUnlocked());
+
+  // Ensure our lifecycle state is compatible with voting.
+  // If RaftConsensus is running, we use the latest OpId from the WAL to vote.
+  // Otherwise, we must be voting while tombstoned.
+  OpId local_last_logged_opid;
+  switch (state_) {
+    case kShutdown:
+      return Status::IllegalState("cannot vote while shut down");
+    case kRunning:
+      // Note: it is (theoretically) possible for 'tombstone_last_logged_opid'
+      // to be passed in and by the time we reach here the state is kRunning.
+      // That may occur when a vote request comes in at the end of a tablet
+      // copy and then tablet bootstrap completes quickly. In that case, we
+      // ignore the passed-in value and use the latest OpId from our queue.
+      local_last_logged_opid = queue_->GetLastOpIdInLog();
+      break;
+    default:
+      if (!tombstone_last_logged_opid) {
+        return Status::IllegalState("must be running to vote when last-logged opid is not known");
+      }
+      if (!FLAGS_raft_enable_tombstoned_voting) {
+        return Status::IllegalState("must be running to vote when tombstoned voting is disabled");
+      }
+      local_last_logged_opid = *tombstone_last_logged_opid;
+      LOG_WITH_PREFIX_UNLOCKED(INFO) << "voting while tombstoned based on last-logged opid "
+                                     << local_last_logged_opid;
+      break;
+  }
+  DCHECK(local_last_logged_opid.IsInitialized());
 
   // If the node is not in the configuration, allow the vote (this is required by Raft)
   // but log an informational message anyway.
@@ -1452,12 +1500,12 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB*
   }
 
   // Candidate is running behind.
-  if (request->candidate_term() < GetCurrentTermUnlocked()) {
+  if (request->candidate_term() < CurrentTermUnlocked()) {
     return RequestVoteRespondInvalidTerm(request, response);
   }
 
   // We already voted this term.
-  if (request->candidate_term() == GetCurrentTermUnlocked() &&
+  if (request->candidate_term() == CurrentTermUnlocked() &&
       HasVotedCurrentTermUnlocked()) {
 
     // Already voted for the same candidate in the current term.
@@ -1471,7 +1519,6 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB*
 
   // Candidate must have last-logged OpId at least as large as our own to get
   // our vote.
-  OpId local_last_logged_opid = queue_->GetLastOpIdInLog();
   bool vote_yes = !OpIdLessThan(request->candidate_status().last_received(),
                                 local_last_logged_opid);
 
@@ -1480,14 +1527,14 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB*
   // has actually now successfully become leader of the prior term, in which case
   // bumping our term here would disrupt it.
   if (!request->is_pre_election() &&
-      request->candidate_term() > GetCurrentTermUnlocked()) {
+      request->candidate_term() > CurrentTermUnlocked()) {
     // If we are going to vote for this peer, then we will flush the consensus metadata
     // to disk below when we record the vote, and we can skip flushing the term advancement
     // to disk here.
     auto flush = vote_yes ? SKIP_FLUSH_TO_DISK : FLUSH_TO_DISK;
     RETURN_NOT_OK_PREPEND(HandleTermAdvanceUnlocked(request->candidate_term(), flush),
         Substitute("Could not step down in RequestVote. Current term: $0, candidate term: $1",
-                   GetCurrentTermUnlocked(), request->candidate_term()));
+                   CurrentTermUnlocked(), request->candidate_term()));
   }
 
   if (!vote_yes) {
@@ -1629,7 +1676,7 @@ Status RaftConsensus::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB& req,
     // we can stick them in the consensus update request later.
     ThreadRestrictions::AssertWaitAllowed();
     LockGuard l(lock_);
-    current_term = GetCurrentTermUnlocked();
+    current_term = CurrentTermUnlocked();
     committed_config = cmeta_->CommittedConfig();
     if (cmeta_->has_pending_config()) {
       LOG_WITH_PREFIX_UNLOCKED(WARNING)
@@ -1746,23 +1793,17 @@ Status RaftConsensus::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB& req,
   return s;
 }
 
-void RaftConsensus::Shutdown() {
+void RaftConsensus::Stop() {
   TRACE_EVENT2("consensus", "RaftConsensus::Shutdown",
                "peer", peer_uuid(),
                "tablet", options_.tablet_id);
 
-  // Avoid taking locks if already shut down so we don't violate
-  // ThreadRestrictions assertions in the case where the RaftConsensus
-  // destructor runs on the reactor thread due to an election callback being
-  // the last outstanding reference.
-  if (shutdown_.Load(kMemOrderAcquire)) return;
-
   {
     ThreadRestrictions::AssertWaitAllowed();
     LockGuard l(lock_);
-    // Transition to kShuttingDown state.
-    CHECK_NE(kShutDown, state_) << State_Name(state_); // We are protected here by 'shutdown_'.
-    state_ = kShuttingDown;
+    if (state_ == kStopping || state_ == kStopped || state_ == kShutdown) return;
+    // Transition to kStopping state.
+    SetStateUnlocked(kStopping);
     LOG_WITH_PREFIX_UNLOCKED(INFO) << "Raft consensus shutting down.";
   }
 
@@ -1776,15 +1817,40 @@ void RaftConsensus::Shutdown() {
     ThreadRestrictions::AssertWaitAllowed();
     LockGuard l(lock_);
     if (pending_) CHECK_OK(pending_->CancelPendingTransactions());
-    CHECK_EQ(kShuttingDown, state_) << State_Name(state_);
-    state_ = kShutDown;
+    SetStateUnlocked(kStopped);
+
+    // Clear leader status on Stop(), in case this replica was the leader. If
+    // we don't do this, the log messages still show this node as the leader.
+    // No need to sync it since it's not persistent state.
+    if (cmeta_) {
+      ClearLeaderUnlocked();
+    }
+
+    // If we were the leader, stop withholding votes.
+    if (withhold_votes_until_ == MonoTime::Max()) {
+      withhold_votes_until_ = MonoTime::Min();
+    }
+
     LOG_WITH_PREFIX_UNLOCKED(INFO) << "Raft consensus is shut down!";
   }
 
   // Shut down things that might acquire locks during destruction.
   if (raft_pool_token_) raft_pool_token_->Shutdown();
   if (failure_detector_) DisableFailureDetector();
+}
+
+void RaftConsensus::Shutdown() {
+  // Avoid taking locks if already shut down so we don't violate
+  // ThreadRestrictions assertions in the case where the RaftConsensus
+  // destructor runs on the reactor thread due to an election callback being
+  // the last outstanding reference.
+  if (shutdown_.Load(kMemOrderAcquire)) return;
 
+  Stop();
+  {
+    LockGuard l(lock_);
+    SetStateUnlocked(kShutdown);
+  }
   shutdown_.Store(true, kMemOrderRelease);
 }
 
@@ -1826,13 +1892,13 @@ std::string RaftConsensus::GetRequestVoteLogPrefixUnlocked(const VoteRequestPB&
 }
 
 void RaftConsensus::FillVoteResponseVoteGranted(VoteResponsePB* response) {
-  response->set_responder_term(GetCurrentTermUnlocked());
+  response->set_responder_term(CurrentTermUnlocked());
   response->set_vote_granted(true);
 }
 
 void RaftConsensus::FillVoteResponseVoteDenied(ConsensusErrorPB::Code error_code,
                                                VoteResponsePB* response) {
-  response->set_responder_term(GetCurrentTermUnlocked());
+  response->set_responder_term(CurrentTermUnlocked());
   response->set_vote_granted(false);
   response->mutable_consensus_error()->set_code(error_code);
 }
@@ -1845,7 +1911,7 @@ Status RaftConsensus::RequestVoteRespondInvalidTerm(const VoteRequestPB* request
                           GetRequestVoteLogPrefixUnlocked(*request),
                           request->candidate_uuid(),
                           request->candidate_term(),
-                          GetCurrentTermUnlocked());
+                          CurrentTermUnlocked());
   LOG(INFO) << msg;
   StatusToPB(Status::InvalidArgument(msg), response->mutable_consensus_error()->mutable_status());
   return Status::OK();
@@ -1869,7 +1935,7 @@ Status RaftConsensus::RequestVoteRespondAlreadyVotedForOther(const VoteRequestPB
                           "Already voted for candidate $3 in this term.",
                           GetRequestVoteLogPrefixUnlocked(*request),
                           request->candidate_uuid(),
-                          GetCurrentTermUnlocked(),
+                          CurrentTermUnlocked(),
                           GetVotedForCurrentTermUnlocked());
   LOG(INFO) << msg;
   StatusToPB(Status::InvalidArgument(msg), response->mutable_consensus_error()->mutable_status());
@@ -1944,7 +2010,7 @@ Status RaftConsensus::RequestVoteRespondVoteGranted(const VoteRequestPB* request
   LOG(INFO) << Substitute("$0: Granting yes vote for candidate $1 in term $2.",
                           GetRequestVoteLogPrefixUnlocked(*request),
                           request->candidate_uuid(),
-                          GetCurrentTermUnlocked());
+                          CurrentTermUnlocked());
   return Status::OK();
 }
 
@@ -1954,6 +2020,35 @@ RaftPeerPB::Role RaftConsensus::role() const {
   return cmeta_->active_role();
 }
 
+int64_t RaftConsensus::CurrentTerm() const {
+  LockGuard l(lock_);
+  return CurrentTermUnlocked();
+}
+
+void RaftConsensus::SetStateUnlocked(State new_state) {
+  switch (new_state) {
+    case kInitialized:
+      CHECK_EQ(kNew, state_);
+      break;
+    case kRunning:
+      CHECK_EQ(kInitialized, state_);
+      break;
+    case kStopping:
+      CHECK(state_ != kStopped && state_ != kShutdown) << "State = " << State_Name(state_);
+      break;
+    case kStopped:
+      CHECK_EQ(kStopping, state_);
+      break;
+    case kShutdown:
+      CHECK(state_ == kStopped || state_ == kShutdown) << "State = " << State_Name(state_);
+      break;
+    default:
+      LOG(FATAL) << "Disallowed transition to state = " << State_Name(new_state);
+      break;
+  }
+  state_ = new_state;
+}
+
 const char* RaftConsensus::State_Name(State state) {
   switch (state) {
     case kNew:
@@ -1962,9 +2057,11 @@ const char* RaftConsensus::State_Name(State state) {
       return "Initialized";
     case kRunning:
       return "Running";
-    case kShuttingDown:
-      return "Shutting down";
-    case kShutDown:
+    case kStopping:
+      return "Stopping";
+    case kStopped:
+      return "Stopped";
+    case kShutdown:
       return "Shut down";
     default:
       LOG(DFATAL) << "Unknown State value: " << state;
@@ -2018,7 +2115,7 @@ Status RaftConsensus::RefreshConsensusQueueAndPeersUnlocked() {
   // TODO(todd): should use queue committed index here? in that case do
   // we need to pass it in at all?
   queue_->SetLeaderMode(pending_->GetCommittedIndex(),
-                        GetCurrentTermUnlocked(),
+                        CurrentTermUnlocked(),
                         active_config);
   RETURN_NOT_OK(peer_manager_->UpdateRaftConfig(active_config));
   return Status::OK();
@@ -2045,6 +2142,16 @@ RaftConfigPB RaftConsensus::CommittedConfig() const {
 }
 
 void RaftConsensus::DumpStatusHtml(std::ostream& out) const {
+  RaftPeerPB::Role role;
+  {
+    LockGuard l(lock_);
+    if (state_ != kRunning) {
+      out << "Tablet " << EscapeForHtmlToString(tablet_id()) << " not running" << std::endl;
+      return;
+    }
+    role = cmeta_->active_role();
+  }
+
   out << "<h1>Raft Consensus State</h1>" << std::endl;
 
   out << "<h2>State</h2>" << std::endl;
@@ -2053,12 +2160,6 @@ void RaftConsensus::DumpStatusHtml(std::ostream& out) const {
   out << "<pre>" << EscapeForHtmlToString(queue_->ToString()) << "</pre>" << std::endl;
 
   // Dump the queues on a leader.
-  RaftPeerPB::Role role;
-  {
-    ThreadRestrictions::AssertWaitAllowed();
-    LockGuard l(lock_);
-    role = cmeta_->active_role();
-  }
   if (role == RaftPeerPB::LEADER) {
     out << "<h2>Queue overview</h2>" << std::endl;
     out << "<pre>" << EscapeForHtmlToString(queue_->ToString()) << "</pre>" << std::endl;
@@ -2108,7 +2209,7 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
       // because it already voted in term 2. The check below ensures that peer B
       // will bump to term 2 when it gets the vote rejection, such that its
       // next pre-election (for term 3) would succeed.
-      if (result.highest_voter_term > GetCurrentTermUnlocked()) {
+      if (result.highest_voter_term > CurrentTermUnlocked()) {
         HandleTermAdvanceUnlocked(result.highest_voter_term);
       }
 
@@ -2138,7 +2239,7 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
     election_started_in_term--;
   }
 
-  if (election_started_in_term != GetCurrentTermUnlocked()) {
+  if (election_started_in_term != CurrentTermUnlocked()) {
     LOG_WITH_PREFIX_UNLOCKED(INFO)
         << "Leader " << election_type << " decision vote started in "
         << "defunct term " << election_started_in_term << ": "
@@ -2350,7 +2451,7 @@ void RaftConsensus::DisableFailureDetector() {
 
 void RaftConsensus::SnoozeFailureDetector(AllowLogging allow_logging,
                                           boost::optional<MonoDelta> delta) {
-  if (PREDICT_TRUE(FLAGS_enable_leader_failure_detection)) {
+  if (PREDICT_TRUE(failure_detector_ && FLAGS_enable_leader_failure_detection)) {
     if (allow_logging == ALLOW_LOGGING) {
       LOG(INFO) << LogPrefixThreadSafe()
                 << Substitute("Snoozing failure detection for $0",
@@ -2393,13 +2494,13 @@ MonoDelta RaftConsensus::LeaderElectionExpBackoffDeltaUnlocked() {
 Status RaftConsensus::HandleTermAdvanceUnlocked(ConsensusTerm new_term,
                                                 FlushToDisk flush) {
   DCHECK(lock_.is_locked());
-  if (new_term <= GetCurrentTermUnlocked()) {
+  if (new_term <= CurrentTermUnlocked()) {
     return Status::IllegalState(Substitute("Can't advance term to: $0 current term: $1 is higher.",
-                                           new_term, GetCurrentTermUnlocked()));
+                                           new_term, CurrentTermUnlocked()));
   }
   if (cmeta_->active_role() == RaftPeerPB::LEADER) {
     LOG_WITH_PREFIX_UNLOCKED(INFO) << "Stepping down as leader of term "
-                                   << GetCurrentTermUnlocked();
+                                   << CurrentTermUnlocked();
     RETURN_NOT_OK(BecomeReplicaUnlocked());
   }
 
@@ -2508,10 +2609,10 @@ Status RaftConsensus::SetCurrentTermUnlocked(int64_t new_term,
   TRACE_EVENT1("consensus", "RaftConsensus::SetCurrentTermUnlocked",
                "term", new_term);
   DCHECK(lock_.is_locked());
-  if (PREDICT_FALSE(new_term <= GetCurrentTermUnlocked())) {
+  if (PREDICT_FALSE(new_term <= CurrentTermUnlocked())) {
     return Status::IllegalState(
         Substitute("Cannot change term to a term that is lower than or equal to the current one. "
-                   "Current: $0, Proposed: $1", GetCurrentTermUnlocked(), new_term));
+                   "Current: $0, Proposed: $1", CurrentTermUnlocked(), new_term));
   }
   cmeta_->set_current_term(new_term);
   cmeta_->clear_voted_for();
@@ -2522,7 +2623,7 @@ Status RaftConsensus::SetCurrentTermUnlocked(int64_t new_term,
   return Status::OK();
 }
 
-const int64_t RaftConsensus::GetCurrentTermUnlocked() const {
+const int64_t RaftConsensus::CurrentTermUnlocked() const {
   DCHECK(lock_.is_locked());
   return cmeta_->current_term();
 }
@@ -2574,11 +2675,14 @@ string RaftConsensus::LogPrefix() const {
 
 string RaftConsensus::LogPrefixUnlocked() const {
   DCHECK(lock_.is_locked());
-  return Substitute("T $0 P $1 [term $2 $3]: ",
-                    options_.tablet_id,
-                    peer_uuid(),
-                    GetCurrentTermUnlocked(),
-                    RaftPeerPB::Role_Name(cmeta_->active_role()));
+  // 'cmeta_' may not be set if initialization failed.
+  string cmeta_info;
+  if (cmeta_) {
+    cmeta_info = Substitute(" [term $0 $1]",
+                            cmeta_->current_term(),
+                            RaftPeerPB::Role_Name(cmeta_->active_role()));
+  }
+  return Substitute("T $0 P $1$2: ", options_.tablet_id, peer_uuid(), cmeta_info);
 }
 
 string RaftConsensus::LogPrefixThreadSafe() const {

http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index efcc06e..ee61fff 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -44,6 +44,7 @@
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/atomic.h"
 #include "kudu/util/locks.h"
+#include "kudu/util/make_shared.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/random.h"
@@ -126,16 +127,14 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
     EXTERNAL_REQUEST
   };
 
-  RaftConsensus(ConsensusOptions options,
-                RaftPeerPB local_peer_pb,
-                scoped_refptr<ConsensusMetadataManager> cmeta_manager,
-                ThreadPool* raft_pool);
   ~RaftConsensus();
 
-  // Initializes the RaftConsensus object. This should be called before
-  // publishing this object to any thread other than the thread that invoked
-  // the constructor.
-  Status Init();
+  // Factory method to construct and initialize a RaftConsensus instance.
+  static Status Create(ConsensusOptions options,
+                       RaftPeerPB local_peer_pb,
+                       scoped_refptr<ConsensusMetadataManager> cmeta_manager,
+                       ThreadPool* raft_pool,
+                       std::shared_ptr<RaftConsensus>* consensus_out);
 
   // Starts running the Raft consensus algorithm.
   // Start() is not thread-safe. Calls to Start() should be externally
@@ -245,7 +244,12 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
 
   // Messages sent from CANDIDATEs to voting peers to request their vote
   // in leader election.
+  //
+  // If 'tombstone_last_logged_opid' is set, this replica will attempt to vote
+  // in kInitialized and kStopped states, instead of just in the kRunning
+  // state.
   Status RequestVote(const VoteRequestPB* request,
+                     boost::optional<OpId> tombstone_last_logged_opid,
                      VoteResponsePB* response);
 
   // Implement a ChangeConfig() request.
@@ -265,6 +269,9 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
   // Returns the current Raft role of this instance.
   RaftPeerPB::Role role() const;
 
+  // Returns the current term.
+  int64_t CurrentTerm() const;
+
   // Returns the uuid of this peer.
   // Thread-safe.
   const std::string& peer_uuid() const;
@@ -283,7 +290,14 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
 
   void DumpStatusHtml(std::ostream& out) const;
 
-  // Stop running the Raft consensus algorithm.
+  // Transition to kStopped state. See State enum definition for details.
+  // This is a no-op if the tablet is already in kStopped or kShutdown state;
+  // otherwise, Raft will pass through the kStopping state on the way to
+  // kStopped.
+  void Stop();
+
+  // Transition to kShutdown state. See State enum definition for details.
+  // It is legal to call this method while in any lifecycle state.
   void Shutdown();
 
   // Makes this peer advance it's term (and step down if leader), for tests.
@@ -317,7 +331,7 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
   log::RetentionIndexes GetRetentionIndexes();
 
  private:
-  friend class RefCountedThreadSafe<RaftConsensus>;
+  ALLOW_MAKE_SHARED(RaftConsensus);
   friend class RaftConsensusQuorumTest;
   FRIEND_TEST(RaftConsensusQuorumTest, TestConsensusContinuesIfAMinorityFallsBehind);
   FRIEND_TEST(RaftConsensusQuorumTest, TestConsensusStopsIfAMajorityFallsBehind);
@@ -325,26 +339,40 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
   FRIEND_TEST(RaftConsensusQuorumTest, TestReplicasEnforceTheLogMatchingProperty);
   FRIEND_TEST(RaftConsensusQuorumTest, TestRequestVote);
 
+  // RaftConsensus lifecycle states.
+  //
+  // Legal state transitions:
+  //
+  //   kNew -> kInitialized -+-> kRunning -> kStopping -> kStopped -> kShutdown
+  //                          `----------------^
+  //
   // NOTE: When adding / changing values in this enum, add the corresponding
-  // values to State_Name().
+  // values to State_Name() as well.
+  //
   enum State {
-    // RaftConsensus has been freshly constructed.
+    // The RaftConsensus object has been freshly constructed and is not yet
+    // initialized. A RaftConsensus object will never be made externally
+    // visible in this state.
     kNew,
 
-    // RaftConsensus has been initialized.
+    // Raft has been initialized. It cannot accept writes, but it may be able
+    // to vote. See RequestVote() for details.
     kInitialized,
 
-    // State signaling the replica accepts requests (from clients
-    // if leader, from leader if follower)
+    // Raft is running normally and will accept write requests and vote
+    // requests.
     kRunning,
 
-    // State signaling that the replica is shutting down and no longer accepting
-    // new transactions or commits.
-    kShuttingDown,
+    // Raft is in the process of stopping and will not accept writes. Voting
+    // may still be allowed. See RequestVote() for details.
+    kStopping,
+
+    // Raft is stopped and no longer accepting writes. However, voting may
+    // still be allowed. See RequestVote() for details.
+    kStopped,
 
-    // State signaling the replica is shut down and does not accept
-    // any more requests.
-    kShutDown,
+    // Raft is fully shut down and cannot accept writes or vote requests.
+    kShutdown,
   };
 
   // Control whether printing of log messages should be done for a particular
@@ -376,6 +404,19 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
   using LockGuard = std::lock_guard<simple_spinlock>;
   using UniqueLock = std::unique_lock<simple_spinlock>;
 
+  RaftConsensus(ConsensusOptions options,
+                RaftPeerPB local_peer_pb,
+                scoped_refptr<ConsensusMetadataManager> cmeta_manager,
+                ThreadPool* raft_pool);
+
+  // Initializes the RaftConsensus object, including loading the consensus
+  // metadata.
+  Status Init();
+
+  // Change the lifecycle state of RaftConsensus. The definition of the State
+  // enum documents legal state transitions.
+  void SetStateUnlocked(State new_state);
+
   // Returns string description for State enum value.
   static const char* State_Name(State state);
 
@@ -658,7 +699,7 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
                                 FlushToDisk flush) WARN_UNUSED_RESULT;
 
   // Returns the term set in the last config change round.
-  const int64_t GetCurrentTermUnlocked() const;
+  const int64_t CurrentTermUnlocked() const;
 
   // Accessors for the leader of the current term.
   std::string GetLeaderUuidUnlocked() const;
@@ -761,6 +802,9 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
 
   Callback<void(const std::string& reason)> mark_dirty_clbk_;
 
+  // A flag to help us avoid taking a lock on the reactor thread if the object
+  // is already in kShutdown state.
+  // TODO(mpercy): Try to get rid of this extra flag.
   AtomicBool shutdown_;
 
   // The number of times Update() has been called, used for some test assertions.

http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/consensus/raft_consensus_quorum-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index 45d5cec..a6ecf6f 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -194,13 +194,12 @@ class RaftConsensusQuorumTest : public KuduTest {
       RaftPeerPB local_peer_pb;
       RETURN_NOT_OK(GetRaftConfigMember(config_, fs_managers_[i]->uuid(), &local_peer_pb));
 
-      shared_ptr<RaftConsensus> peer(
-          new RaftConsensus(options_,
-                            config_.peers(i),
-                            cmeta_managers_[i],
-                            raft_pool_.get()));
-      RETURN_NOT_OK(peer->Init());
-
+      shared_ptr<RaftConsensus> peer;
+      RETURN_NOT_OK(RaftConsensus::Create(options_,
+                                          config_.peers(i),
+                                          cmeta_managers_[i],
+                                          raft_pool_.get(),
+                                          &peer));
       peers_->AddPeer(config_.peers(i).permanent_uuid(), peer);
     }
     return Status::OK();
@@ -1062,7 +1061,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
   VoteResponsePB response;
   request.set_candidate_uuid(fs_managers_[0]->uuid());
   request.set_candidate_term(last_op_id.term() + 1);
-  ASSERT_OK(peer->RequestVote(&request, &response));
+  ASSERT_OK(peer->RequestVote(&request, boost::none, &response));
   ASSERT_FALSE(response.vote_granted());
   ASSERT_EQ(ConsensusErrorPB::LEADER_IS_ALIVE, response.consensus_error().code());
   ASSERT_EQ(0, flush_count() - flush_count_before)
@@ -1074,7 +1073,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
   // This will allow the rest of the requests in the test to go through.
   flush_count_before = flush_count();
   request.set_ignore_live_leader(true);
-  ASSERT_OK(peer->RequestVote(&request, &response));
+  ASSERT_OK(peer->RequestVote(&request, boost::none, &response));
   ASSERT_TRUE(response.vote_granted());
   ASSERT_EQ(last_op_id.term() + 1, response.responder_term());
   ASSERT_NO_FATAL_FAILURE(AssertDurableTermAndVote(kPeerIndex, last_op_id.term() + 1,
@@ -1085,7 +1084,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
   // Ensure we get same response for same term and same UUID.
   response.Clear();
   flush_count_before = flush_count();
-  ASSERT_OK(peer->RequestVote(&request, &response));
+  ASSERT_OK(peer->RequestVote(&request, boost::none, &response));
   ASSERT_TRUE(response.vote_granted());
   ASSERT_EQ(0, flush_count() - flush_count_before)
       << "Confirming a previous vote should not flush";
@@ -1094,7 +1093,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
   flush_count_before = flush_count();
   response.Clear();
   request.set_candidate_uuid(fs_managers_[2]->uuid());
-  ASSERT_OK(peer->RequestVote(&request, &response));
+  ASSERT_OK(peer->RequestVote(&request, boost::none, &response));
   ASSERT_FALSE(response.vote_granted());
   ASSERT_TRUE(response.has_consensus_error());
   ASSERT_EQ(ConsensusErrorPB::ALREADY_VOTED, response.consensus_error().code());
@@ -1114,7 +1113,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
   request.set_candidate_uuid(fs_managers_[0]->uuid());
   request.set_candidate_term(last_op_id.term() + 2);
   response.Clear();
-  ASSERT_OK(peer->RequestVote(&request, &response));
+  ASSERT_OK(peer->RequestVote(&request, boost::none, &response));
   ASSERT_TRUE(response.vote_granted());
   ASSERT_EQ(last_op_id.term() + 2, response.responder_term());
   ASSERT_NO_FATAL_FAILURE(AssertDurableTermAndVote(kPeerIndex, last_op_id.term() + 2,
@@ -1128,7 +1127,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
   flush_count_before = flush_count();
   request.set_candidate_term(last_op_id.term() + 1);
   response.Clear();
-  ASSERT_OK(peer->RequestVote(&request, &response));
+  ASSERT_OK(peer->RequestVote(&request, boost::none, &response));
   ASSERT_FALSE(response.vote_granted());
   ASSERT_TRUE(response.has_consensus_error());
   ASSERT_EQ(ConsensusErrorPB::INVALID_TERM, response.consensus_error().code());
@@ -1144,7 +1143,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
   request.set_candidate_term(last_op_id.term() + 3);
   request.set_is_pre_election(true);
   response.Clear();
-  ASSERT_OK(peer->RequestVote(&request, &response));
+  ASSERT_OK(peer->RequestVote(&request, boost::none, &response));
   ASSERT_TRUE(response.vote_granted());
   ASSERT_FALSE(response.has_consensus_error());
   ASSERT_EQ(last_op_id.term() + 2, response.responder_term());
@@ -1163,7 +1162,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
   request.set_candidate_term(last_op_id.term() + 3);
   request.mutable_candidate_status()->mutable_last_received()->CopyFrom(MinimumOpId());
   response.Clear();
-  ASSERT_OK(peer->RequestVote(&request, &response));
+  ASSERT_OK(peer->RequestVote(&request, boost::none, &response));
   ASSERT_FALSE(response.vote_granted());
   ASSERT_TRUE(response.has_consensus_error());
   ASSERT_EQ(ConsensusErrorPB::LAST_OPID_TOO_OLD, response.consensus_error().code());

http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/integration-tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index d13d396..4920e88 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -95,6 +95,8 @@ ADD_KUDU_TEST(tablet_copy-itest)
 ADD_KUDU_TEST(tablet_copy_client_session-itest)
 ADD_KUDU_TEST(tablet_history_gc-itest)
 ADD_KUDU_TEST(tablet_replacement-itest)
+ADD_KUDU_TEST(tombstoned_voting-itest)
+ADD_KUDU_TEST(tombstoned_voting-stress-test RUN_SERIAL true)
 ADD_KUDU_TEST(token_signer-itest RESOURCE_LOCK "master-rpc-ports")
 ADD_KUDU_TEST(ts_recovery-itest)
 ADD_KUDU_TEST(ts_tablet_manager-itest)

http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/integration-tests/cluster_itest_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_itest_util.cc b/src/kudu/integration-tests/cluster_itest_util.cc
index 051c8b5..7d6f55a 100644
--- a/src/kudu/integration-tests/cluster_itest_util.cc
+++ b/src/kudu/integration-tests/cluster_itest_util.cc
@@ -73,6 +73,8 @@ using consensus::OpIdType;
 using consensus::RaftPeerPB;
 using consensus::RunLeaderElectionResponsePB;
 using consensus::RunLeaderElectionRequestPB;
+using consensus::VoteRequestPB;
+using consensus::VoteResponsePB;
 using consensus::kInvalidOpIdIndex;
 using master::ListTabletServersResponsePB;
 using master::ListTabletServersResponsePB_Entry;
@@ -601,6 +603,33 @@ Status StartElection(const TServerDetails* replica,
   return Status::OK();
 }
 
+Status RequestVote(const TServerDetails* replica,  // Returns OK only if the vote is granted.
+                   const std::string& tablet_id,
+                   const std::string& candidate_uuid,
+                   int64_t candidate_term,
+                   const consensus::OpId& last_logged_opid,
+                   boost::optional<bool> ignore_live_leader,
+                   boost::optional<bool> is_pre_election,
+                   const MonoDelta& timeout) {
+  DCHECK(last_logged_opid.IsInitialized());  // Debug-only: required OpId fields must be set.
+  VoteRequestPB req;
+  req.set_dest_uuid(replica->uuid());
+  req.set_tablet_id(tablet_id);
+  req.set_candidate_uuid(candidate_uuid);
+  req.set_candidate_term(candidate_term);
+  *req.mutable_candidate_status()->mutable_last_received() = last_logged_opid;
+  if (ignore_live_leader) req.set_ignore_live_leader(*ignore_live_leader);  // Optional.
+  if (is_pre_election) req.set_is_pre_election(*is_pre_election);  // Optional.
+  VoteResponsePB resp;
+  RpcController rpc;
+  rpc.set_timeout(timeout);
+  RETURN_NOT_OK(replica->consensus_proxy->RequestConsensusVote(req, &resp, &rpc));  // RPC failed.
+  if (resp.has_vote_granted() && resp.vote_granted()) return Status::OK();  // Vote granted.
+  if (resp.has_error()) return StatusFromPB(resp.error().status());  // Error in 'error' field.
+  if (resp.has_consensus_error()) return StatusFromPB(resp.consensus_error().status());
+  return Status::IllegalState("Unknown error");  // No vote and no error details in the response.
+}
+
 Status LeaderStepDown(const TServerDetails* replica,
                       const string& tablet_id,
                       const MonoDelta& timeout,

http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/integration-tests/cluster_itest_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_itest_util.h b/src/kudu/integration-tests/cluster_itest_util.h
index e8e49b5..49bf0ff 100644
--- a/src/kudu/integration-tests/cluster_itest_util.h
+++ b/src/kudu/integration-tests/cluster_itest_util.h
@@ -233,6 +233,18 @@ Status StartElection(const TServerDetails* replica,
                      const std::string& tablet_id,
                      const MonoDelta& timeout);
 
+// Request the given replica to vote. This is a thin wrapper around
+// RequestConsensusVote(); see VoteRequestPB in consensus.proto for parameter
+// details. Returns OK only if the vote was granted, otherwise an error.
+Status RequestVote(const TServerDetails* replica,
+                   const std::string& tablet_id,
+                   const std::string& candidate_uuid,
+                   int64_t candidate_term,
+                   const consensus::OpId& last_logged_opid,
+                   boost::optional<bool> ignore_live_leader,
+                   boost::optional<bool> is_pre_election,
+                   const MonoDelta& timeout);
+
 // Cause a leader to step down on the specified server.
 // 'timeout' refers to the RPC timeout waiting synchronously for stepdown to
 // complete on the leader side. Since that does not require communication with

http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/integration-tests/delete_table-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/delete_table-itest.cc b/src/kudu/integration-tests/delete_table-itest.cc
index 5fdcc9d..105fd77 100644
--- a/src/kudu/integration-tests/delete_table-itest.cc
+++ b/src/kudu/integration-tests/delete_table-itest.cc
@@ -817,14 +817,22 @@ TEST_F(DeleteTableITest, TestMergeConsensusMetadata) {
                                            TABLET_DATA_TOMBSTONED, boost::none, timeout));
   NO_FATALS(WaitForTabletTombstonedOnTS(kTsIndex, tablet_id, CMETA_EXPECTED));
 
+  // Shut down the tablet server so it won't vote while tombstoned.
+  cluster_->tablet_server(kTsIndex)->Shutdown();
+
   ASSERT_OK(cluster_->tablet_server(1)->Restart());
   ASSERT_OK(cluster_->tablet_server(2)->Restart());
   NO_FATALS(WaitUntilTabletRunning(1, tablet_id));
   NO_FATALS(WaitUntilTabletRunning(2, tablet_id));
   ASSERT_OK(itest::StartElection(leader, tablet_id, timeout));
+  ASSERT_OK(itest::WaitUntilLeader(leader, tablet_id, timeout));
+
+  // Now restart the replica. It will get tablet copied by the leader.
+  ASSERT_OK(cluster_->tablet_server(kTsIndex)->Restart());
   ASSERT_OK(inspect_->WaitForTabletDataStateOnTS(kTsIndex, tablet_id, { TABLET_DATA_READY }));
 
-  // The election history should have been wiped out.
+  // The election history should have been wiped out for the new term, since
+  // this node did not participate.
   ASSERT_OK(inspect_->ReadConsensusMetadataOnTS(kTsIndex, tablet_id, &cmeta_pb));
   ASSERT_EQ(3, cmeta_pb.current_term());
   ASSERT_TRUE(!cmeta_pb.has_voted_for()) << SecureShortDebugString(cmeta_pb);
@@ -1066,7 +1074,8 @@ TEST_F(DeleteTableITest, TestWebPageForTombstonedTablet) {
         cluster_->tablet_server(0)->bound_http_hostport().ToString(),
         page,
         tablet_id), &buf));
-    ASSERT_STR_CONTAINS(buf.ToString(), tablet_id);
+    ASSERT_STR_CONTAINS(buf.ToString(), tablet_id)
+        << "Page: " << page << "; tablet_id: " << tablet_id;
   }
 }
 
@@ -1292,7 +1301,7 @@ TEST_P(DeleteTableTombstonedParamTest, TestTabletTombstone) {
   ASSERT_OK(itest::WaitForNumTabletsOnTS(ts, 2, timeout, &tablets));
   for (const ListTabletsResponsePB::StatusAndSchemaPB& t : tablets) {
     if (t.tablet_status().tablet_id() == tablet_id) {
-      ASSERT_EQ(tablet::SHUTDOWN, t.tablet_status().state());
+      ASSERT_EQ(tablet::STOPPED, t.tablet_status().state());
       ASSERT_EQ(TABLET_DATA_TOMBSTONED, t.tablet_status().tablet_data_state())
           << t.tablet_status().tablet_id() << " not tombstoned";
     }
@@ -1313,10 +1322,10 @@ TEST_P(DeleteTableTombstonedParamTest, TestTabletTombstone) {
   NO_FATALS(WaitForTabletTombstonedOnTS(kTsIndex, tablet_id, CMETA_EXPECTED));
   // The tombstoned tablets will still show up in ListTablets(),
   // just with their data state set as TOMBSTONED. They should also be listed
-  // as NOT_STARTED because we restarted the server.
+  // as INITIALIZED because we restarted the server.
   ASSERT_OK(itest::WaitForNumTabletsOnTS(ts, 2, timeout, &tablets));
   for (const ListTabletsResponsePB::StatusAndSchemaPB& t : tablets) {
-    ASSERT_EQ(tablet::NOT_STARTED, t.tablet_status().state());
+    ASSERT_EQ(tablet::INITIALIZED, t.tablet_status().state());
     ASSERT_EQ(TABLET_DATA_TOMBSTONED, t.tablet_status().tablet_data_state())
         << t.tablet_status().tablet_id() << " not tombstoned";
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/integration-tests/external_mini_cluster-itest-base.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/external_mini_cluster-itest-base.cc b/src/kudu/integration-tests/external_mini_cluster-itest-base.cc
index 71d7573..94c313e 100644
--- a/src/kudu/integration-tests/external_mini_cluster-itest-base.cc
+++ b/src/kudu/integration-tests/external_mini_cluster-itest-base.cc
@@ -22,6 +22,7 @@
 #include <string>
 #include <vector>
 
+#include <gflags/gflags.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
@@ -33,6 +34,9 @@
 #include "kudu/util/pstack_watcher.h"
 #include "kudu/util/test_macros.h"
 
+DEFINE_bool(test_dump_stacks_on_failure, true,
+            "Whether to dump ExternalMiniCluster process stacks on test failure");
+
 namespace kudu {
 
 void ExternalMiniClusterITestBase::TearDown() {
@@ -69,7 +73,7 @@ void ExternalMiniClusterITestBase::StopCluster() {
     return;
   }
 
-  if (HasFatalFailure()) {
+  if (HasFatalFailure() && FLAGS_test_dump_stacks_on_failure) {
     LOG(INFO) << "Found fatal failure";
     for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
       if (!cluster_->tablet_server(i)->IsProcessAlive()) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/integration-tests/tombstoned_voting-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/tombstoned_voting-itest.cc b/src/kudu/integration-tests/tombstoned_voting-itest.cc
new file mode 100644
index 0000000..b723738
--- /dev/null
+++ b/src/kudu/integration-tests/tombstoned_voting-itest.cc
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/metadata.pb.h"
+#include "kudu/consensus/opid.pb.h"
+#include "kudu/consensus/opid_util.h"
+#include "kudu/consensus/raft_consensus.h"
+#include "kudu/fs/fs_manager.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
+#include "kudu/integration-tests/internal_mini_cluster-itest-base.h"
+#include "kudu/integration-tests/internal_mini_cluster.h"
+#include "kudu/integration-tests/test_workload.h"
+#include "kudu/tablet/metadata.pb.h"
+#include "kudu/tablet/tablet_replica.h"
+#include "kudu/tserver/mini_tablet_server.h"
+#include "kudu/tserver/tablet_server.h"
+#include "kudu/tserver/ts_tablet_manager.h"
+#include "kudu/tserver/tserver.pb.h"
+#include "kudu/util/env.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+DECLARE_bool(allow_unsafe_replication_factor);
+DECLARE_bool(enable_tablet_copy);
+DECLARE_bool(raft_enable_tombstoned_voting);
+
+using kudu::consensus::MakeOpId;
+using kudu::consensus::LeaderStepDownResponsePB;
+using kudu::consensus::OpId;
+using kudu::consensus::RECEIVED_OPID;
+using kudu::consensus::RaftConsensus;
+using kudu::consensus::RaftPeerPB;
+using kudu::itest::DeleteTablet;
+using kudu::itest::TServerDetails;
+using kudu::itest::WaitForServersToAgree;
+using kudu::tablet::TABLET_DATA_TOMBSTONED;
+using kudu::tablet::TabletReplica;
+using kudu::tablet::TabletStatePB;
+using kudu::tserver::TabletServerErrorPB;
+using kudu::tserver::TSTabletManager;
+using std::string;
+using std::vector;
+
+namespace kudu {
+
+class TombstonedVotingITest : public MiniClusterITestBase {
+};  // Empty fixture: adds nothing on top of MiniClusterITestBase.
+
+// Ensure that a tombstoned replica can vote, but no longer votes after we call Shutdown() on it.
+TEST_F(TombstonedVotingITest, TestNoVoteAfterShutdown) {
+  // This test waits for several seconds, so only run it in slow mode.
+  if (!AllowSlowTests()) return;
+
+  FLAGS_allow_unsafe_replication_factor = true; // Allow an even replication factor.
+  FLAGS_enable_tablet_copy = false; // Tablet copy would interfere with this test.
+
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+
+  NO_FATALS(StartCluster(/*num_tablet_servers=*/ 2));
+  TestWorkload workload(cluster_.get());
+  workload.set_num_replicas(2); // Two servers and replicas makes the test easy to debug.
+  workload.Setup();
+  workload.Start();
+  while (workload.rows_inserted() < 50) {
+    SleepFor(MonoDelta::FromMilliseconds(50));
+  }
+  workload.StopAndJoin();
+
+  // Figure out the tablet id to mess with.
+  vector<string> tablet_ids = cluster_->mini_tablet_server(0)->ListTablets();
+  ASSERT_EQ(1, tablet_ids.size());
+  const string& tablet_id = tablet_ids[0];
+
+  // Ensure all servers are up to date.
+  ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id, workload.batches_completed()));
+
+  // Manually tombstone the replica on TS1, start an election on TS0, and wait
+  // until TS0 gets elected. With only two replicas, TS0 needs TS1's vote to
+  // reach a majority, so if TS0 gets elected then TS1 voted while tombstoned.
+  TSTabletManager* ts_tablet_manager = cluster_->mini_tablet_server(1)->server()->tablet_manager();
+  scoped_refptr<TabletReplica> ts1_replica;
+  ASSERT_OK(ts_tablet_manager->GetTabletReplica(tablet_id, &ts1_replica));
+
+  // Tombstone TS1's replica. Tombstoning is expected to leave it in the STOPPED state.
+  LOG(INFO) << "Tombstoning ts1...";
+  boost::optional<TabletServerErrorPB::Code> error_code;
+  ASSERT_OK(ts_tablet_manager->DeleteTablet(tablet_id, TABLET_DATA_TOMBSTONED, boost::none,
+                                            &error_code));
+  ASSERT_EQ(TabletStatePB::STOPPED, ts1_replica->state());
+
+  scoped_refptr<TabletReplica> ts0_replica;
+  ASSERT_OK(cluster_->mini_tablet_server(0)->server()->tablet_manager()->GetTabletReplica(
+      tablet_id, &ts0_replica));
+  LeaderStepDownResponsePB resp;
+  ts0_replica->consensus()->StepDown(&resp); // Ignore result, in case TS1 was the leader.
+  ASSERT_EQ(RaftPeerPB::FOLLOWER, ts0_replica->consensus()->role());
+  ASSERT_OK(ts0_replica->consensus()->StartElection(
+      RaftConsensus::ELECT_EVEN_IF_LEADER_IS_ALIVE, RaftConsensus::EXTERNAL_REQUEST));
+
+  // Wait until TS0 is leader.
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_EQ(RaftPeerPB::LEADER, ts0_replica->consensus()->role());
+  });
+
+  // Now shut down TS1's replica. This will ensure that TS0 cannot get re-elected.
+  LOG(INFO) << "Shutting down ts1...";
+  ts1_replica->Shutdown();
+
+  // Step TS0 down and start another election to see if it can get elected again.
+  ASSERT_OK(ts0_replica->consensus()->StepDown(&resp));
+  ASSERT_OK(ts0_replica->consensus()->StartElection(
+      RaftConsensus::ELECT_EVEN_IF_LEADER_IS_ALIVE, RaftConsensus::EXTERNAL_REQUEST));
+
+  // Poll for 5 seconds to ensure TS0 stays a follower, i.e. cannot get elected.
+  MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(5);
+  while (MonoTime::Now() < deadline) {
+    ASSERT_EQ(RaftPeerPB::FOLLOWER, ts0_replica->consensus()->role());
+    SleepFor(MonoDelta::FromMilliseconds(50));
+  }
+}
+
+// Test that a tombstoned replica will vote correctly.
+// This is implemented by directly exercising the RPC API with different vote request parameters.
+TEST_F(TombstonedVotingITest, TestVotingLogic) {
+  // This test waits for several seconds, so only run it in slow mode.
+  if (!AllowSlowTests()) return;
+
+  FLAGS_allow_unsafe_replication_factor = true; // Allow an even replication factor.
+  FLAGS_enable_tablet_copy = false; // Tablet copy would interfere with this test.
+
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+
+  NO_FATALS(StartCluster(/*num_tablet_servers=*/ 2));
+  TestWorkload workload(cluster_.get());
+  workload.set_num_replicas(2); // Two servers and replicas makes the test easy to debug.
+  workload.Setup();
+  workload.Start();
+  while (workload.rows_inserted() < 50) {
+    SleepFor(MonoDelta::FromMilliseconds(50));
+  }
+  workload.StopAndJoin();
+
+  // Figure out the tablet id to mess with.
+  vector<string> tablet_ids = cluster_->mini_tablet_server(0)->ListTablets();
+  ASSERT_EQ(1, tablet_ids.size());
+  const string& tablet_id = tablet_ids[0];
+
+  // Ensure all servers are up to date.
+  ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id, workload.batches_completed()));
+
+  // Shut down TS0 so it doesn't interfere with our testing.
+  cluster_->mini_tablet_server(0)->Shutdown();
+
+  // Figure out the last logged opid of TS1 (captured before tombstoning it).
+  OpId last_logged_opid;
+  ASSERT_OK(itest::GetLastOpIdForReplica(tablet_id,
+                                         ts_map_[cluster_->mini_tablet_server(1)->uuid()],
+                                         RECEIVED_OPID,
+                                         kTimeout,
+                                         &last_logged_opid));
+
+  // Tombstone TS1 (actually, the tablet replica hosted on TS1).
+  ASSERT_OK(itest::DeleteTablet(ts_map_[cluster_->mini_tablet_server(1)->uuid()], tablet_id,
+                                TABLET_DATA_TOMBSTONED, boost::none, kTimeout));
+
+  // Loop this series of tests twice: the first time without restarting the TS,
+  // the 2nd time after a restart.
+  for (int i = 0; i < 2; i++) {
+    if (i == 1) {
+      // Restart tablet server #1 on the 2nd loop.
+      LOG(INFO) << "Restarting TS1...";
+      cluster_->mini_tablet_server(1)->Shutdown();
+      ASSERT_OK(cluster_->mini_tablet_server(1)->Restart());
+      ASSERT_OK(cluster_->mini_tablet_server(1)->WaitStarted());
+    }
+
+    scoped_refptr<TabletReplica> replica;
+    ASSERT_OK(cluster_->mini_tablet_server(1)->server()->tablet_manager()->GetTabletReplica(
+        tablet_id, &replica));
+    ASSERT_EQ(i == 0 ? tablet::STOPPED : tablet::INITIALIZED, replica->state());
+
+    int64_t current_term = replica->consensus()->CurrentTerm();
+    current_term++;  // Ask for votes in a term newer than the replica's current one.
+
+    // Ask TS1 for a vote that should be granted (new term, acceptable opid).
+    // Note: peers are required to vote regardless of whether they recognize the
+    // candidate's UUID or not, so the ID used here ("A") is not important.
+    TServerDetails* ts1_ets = ts_map_[cluster_->mini_tablet_server(1)->uuid()];
+    ASSERT_OK(itest::RequestVote(ts1_ets, tablet_id, "A", current_term, last_logged_opid,
+                                /*ignore_live_leader=*/ true, /*is_pre_election=*/ false, kTimeout));
+
+    // Ask TS1 for a vote that should be denied (different candidate, same term).
+    Status s = itest::RequestVote(ts1_ets, tablet_id, "B", current_term, last_logged_opid,
+                                  /*ignore_live_leader=*/ true, /*is_pre_election=*/ false,
+                                  kTimeout);
+    ASSERT_TRUE(s.IsInvalidArgument());
+    ASSERT_STR_CONTAINS(s.ToString(), "Already voted for candidate A in this term");
+
+    // Ask TS1 for a vote that should be denied (old term).
+    s = itest::RequestVote(ts1_ets, tablet_id, "B", current_term - 1, last_logged_opid,
+                          /*ignore_live_leader=*/ true, /*is_pre_election=*/ false, kTimeout);
+    ASSERT_TRUE(s.IsInvalidArgument());
+    ASSERT_STR_MATCHES(s.ToString(), "Denying vote to candidate B for earlier term");
+
+    // Increment term and build an opid one index behind TS1's last-logged opid.
+    current_term++;
+    OpId old_opid = MakeOpId(last_logged_opid.term(), last_logged_opid.index() - 1);
+
+    // Ask TS1 for a vote that should be denied (old last-logged opid).
+    s = itest::RequestVote(ts1_ets, tablet_id, "B", current_term, old_opid,
+                          /*ignore_live_leader=*/ true, /*is_pre_election=*/ false, kTimeout);
+    ASSERT_TRUE(s.IsInvalidArgument());
+    ASSERT_STR_MATCHES(s.ToString(),
+                      "Denying vote to candidate B.*greater than that of the candidate");
+
+    // Ask for a successful vote for candidate B.
+    ASSERT_OK(itest::RequestVote(ts1_ets, tablet_id, "B", current_term, last_logged_opid,
+                                /*ignore_live_leader=*/ true, /*is_pre_election=*/ false, kTimeout));
+  }
+}
+
+// With tombstoned voting turned off, an election that depends on a vote from a
+// tombstoned replica must not succeed.
+TEST_F(TombstonedVotingITest, TestNoVoteIfTombstonedVotingDisabled) {
+  // The test idles for several seconds by design, so it only runs in slow mode.
+  if (!AllowSlowTests()) {
+    return;
+  }
+
+  FLAGS_enable_tablet_copy = false; // Tablet copy would interfere with this test.
+  FLAGS_allow_unsafe_replication_factor = true; // Permit an even replication factor.
+  FLAGS_raft_enable_tombstoned_voting = false; // The behavior under test.
+
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+  const MonoDelta kPollInterval = MonoDelta::FromMilliseconds(50);
+
+  NO_FATALS(StartCluster(/*num_tablet_servers=*/ 2));
+
+  // Load a little data; two servers and two replicas keep the test easy to debug.
+  TestWorkload workload(cluster_.get());
+  workload.set_num_replicas(2);
+  workload.Setup();
+  workload.Start();
+  while (workload.rows_inserted() < 50) {
+    SleepFor(kPollInterval);
+  }
+  workload.StopAndJoin();
+
+  // Exactly one tablet is expected on TS0; that is the one to work with.
+  vector<string> ts0_tablets = cluster_->mini_tablet_server(0)->ListTablets();
+  ASSERT_EQ(1, ts0_tablets.size());
+  const string& tablet_id = ts0_tablets[0];
+
+  // Make sure both replicas have caught up before disturbing them.
+  ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id, workload.batches_completed()));
+
+  // Tombstone the replica hosted on TS1.
+  ASSERT_OK(DeleteTablet(ts_map_[cluster_->mini_tablet_server(1)->uuid()], tablet_id,
+                         TABLET_DATA_TOMBSTONED, boost::none, kTimeout));
+
+  // Have TS0 run for election.
+  scoped_refptr<TabletReplica> candidate;
+  ASSERT_OK(cluster_->mini_tablet_server(0)->server()->tablet_manager()->GetTabletReplica(
+      tablet_id, &candidate));
+
+  // The step-down result is deliberately ignored, in case TS1 was the leader.
+  LeaderStepDownResponsePB step_down_resp;
+  candidate->consensus()->StepDown(&step_down_resp);
+  ASSERT_OK(candidate->consensus()->StartElection(
+      RaftConsensus::ELECT_EVEN_IF_LEADER_IS_ALIVE, RaftConsensus::EXTERNAL_REQUEST));
+
+  // Watch TS0 for five seconds; it must remain a follower the whole time.
+  const MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(5);
+  while (MonoTime::Now() < deadline) {
+    ASSERT_EQ(RaftPeerPB::FOLLOWER, candidate->consensus()->role());
+    SleepFor(kPollInterval);
+  }
+}
+
+// Test that a replica will not vote while tombstoned if it was deleted while
+// the last-logged opid was unknown. This may occur if a tablet is tombstoned
+// while in a FAILED state.
+//
+// Outline: overwrite TS0's consensus metadata file while the cluster is down so
+// that its replica is FAILED after restart, tombstone that replica, and then
+// check that TS1 remains a follower for several seconds.
+TEST_F(TombstonedVotingITest, TestNoVoteIfNoLastLoggedOpId) {
+  if (!AllowSlowTests()) return; // This test waits for several seconds.
+
+  FLAGS_allow_unsafe_replication_factor = true; // Allow an even replication factor.
+
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+  const MonoDelta kPollInterval = MonoDelta::FromMilliseconds(10);
+
+  NO_FATALS(StartCluster(/*num_tablet_servers=*/ 2));
+  TestWorkload workload(cluster_.get());
+  workload.set_num_replicas(2); // Two servers and replicas makes the test easy to debug.
+  workload.Setup();
+  workload.Start();
+  while (workload.rows_inserted() < 50) {
+    SleepFor(kPollInterval);
+  }
+  workload.StopAndJoin();
+
+  tserver::MiniTabletServer* ts0 = cluster_->mini_tablet_server(0);
+  const string ts0_uuid = ts0->uuid();
+  tserver::MiniTabletServer* ts1 = cluster_->mini_tablet_server(1);
+
+  // Determine the tablet id.
+  vector<string> tablet_ids = ts0->ListTablets();
+  ASSERT_EQ(1, tablet_ids.size());
+  const string& tablet_id = tablet_ids[0];
+
+  // Ensure all servers are in sync.
+  ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id, workload.batches_completed()));
+
+  // Shut down each TS, then corrupt the TS0 cmeta. The path is resolved before
+  // the shutdown, while TS0's server object is still available.
+  string ts0_cmeta_path = ts0->server()->fs_manager()->GetConsensusMetadataPath(tablet_id);
+  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+    cluster_->mini_tablet_server(i)->Shutdown();
+  }
+
+  // NOTE(review): "\0" is a C-string literal of strlen() zero, so if Append()
+  // derives its length from the terminator this writes no bytes; presumably
+  // re-creating the file is already enough to invalidate it -- confirm.
+  std::unique_ptr<WritableFile> file;
+  ASSERT_OK(env_->NewWritableFile(ts0_cmeta_path, &file));
+  ASSERT_OK(file->Append("\0"));
+  ASSERT_OK(file->Close());
+
+  // Restart each TS so it comes back up on the same ports.
+  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+    ASSERT_OK(cluster_->mini_tablet_server(i)->Restart());
+  }
+
+  // Wait until the tablet is in FAILED state.
+  ASSERT_OK(itest::WaitUntilTabletInState(ts_map_[ts0_uuid], tablet_id, TabletStatePB::FAILED,
+                                          kTimeout));
+  scoped_refptr<TabletReplica> ts0_replica;
+  ASSERT_TRUE(ts0->server() != nullptr);
+  ASSERT_TRUE(ts0->server()->tablet_manager() != nullptr);
+  ASSERT_TRUE(ts0->server()->tablet_manager()->LookupTablet(tablet_id, &ts0_replica));
+  ASSERT_EQ(tablet::FAILED, ts0_replica->state());
+
+  // Now tombstone the failed replica on TS0.
+  ASSERT_OK(itest::DeleteTablet(ts_map_[ts0_uuid], tablet_id,
+                                TABLET_DATA_TOMBSTONED, boost::none, kTimeout));
+
+  // Wait until TS1 is running.
+  ASSERT_EVENTUALLY([&] {
+    scoped_refptr<TabletReplica> ts1_replica;
+    TSTabletManager* tablet_manager = ts1->server()->tablet_manager();
+    ASSERT_TRUE(tablet_manager->LookupTablet(tablet_id, &ts1_replica));
+    ASSERT_EQ(tablet::RUNNING, ts1_replica->state());
+  });
+
+  // Ensure that TS1 cannot become leader because TS0 will not vote.
+  MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(5);
+  while (MonoTime::Now() < deadline) {
+    scoped_refptr<TabletReplica> ts1_replica;
+    TSTabletManager* tablet_manager = ts1->server()->tablet_manager();
+    ASSERT_TRUE(tablet_manager != nullptr);
+    ASSERT_TRUE(tablet_manager->LookupTablet(tablet_id, &ts1_replica));
+    // Only check the role when a consensus instance is available.
+    std::shared_ptr<RaftConsensus> consensus = ts1_replica->shared_consensus();
+    if (consensus) {
+      ASSERT_EQ(RaftPeerPB::FOLLOWER, consensus->role());
+    }
+    // Poll instead of spinning so the check does not monopolize a CPU.
+    SleepFor(kPollInterval);
+  }
+}
+
+// Test parameter selecting whether the tablet server hosting the tombstoned
+// replica is restarted after the replica has been tombstoned.
+enum RestartAfterTombstone {
+  kNoRestart,
+  kRestart,
+};
+
+// Mini-cluster fixture parameterized on RestartAfterTombstone.
+class TsRecoveryTombstonedITest : public MiniClusterITestBase,
+                                  public ::testing::WithParamInterface<RestartAfterTombstone> {
+};
+
+// Run every TsRecoveryTombstonedITest test once per parameter value.
+INSTANTIATE_TEST_CASE_P(Restart, TsRecoveryTombstonedITest,
+                        ::testing::Values(kNoRestart, kRestart));
+
+// Basic tombstoned voting test.
+//
+// TS 0 is shut down, the replica on TS 1 is tombstoned (and TS 1 is optionally
+// restarted, depending on the test parameter), then TS 0 is brought back. The
+// test then expects the workload to be able to write more rows.
+TEST_P(TsRecoveryTombstonedITest, TestTombstonedVoter) {
+  const RestartAfterTombstone to_restart = GetParam();
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+
+  FLAGS_allow_unsafe_replication_factor = true; // Allow an even replication factor.
+  NO_FATALS(StartCluster(/*num_tablet_servers=*/ 2));
+  TestWorkload workload(cluster_.get());
+  workload.set_num_replicas(2); // Two servers and replicas makes the test easy to debug.
+  workload.Setup();
+  workload.Start();
+  while (workload.rows_inserted() < 50) {
+    SleepFor(MonoDelta::FromMilliseconds(50));
+  }
+  workload.StopAndJoin();
+
+  // Figure out the id of the tablet that will be tombstoned and copied.
+  vector<string> tablet_ids = cluster_->mini_tablet_server(0)->ListTablets();
+  ASSERT_EQ(1, tablet_ids.size());
+  const string& tablet_id = tablet_ids[0];
+
+  // Ensure all servers are up to date.
+  ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id, workload.batches_completed()));
+
+  // NOTE(review): live_ts_map excludes TS 1, so the agreement checks further
+  // down only consult TS 0 -- confirm that this is intended.
+  auto live_ts_map = ts_map_;
+  ASSERT_EQ(1, live_ts_map.erase(cluster_->mini_tablet_server(1)->uuid()));
+
+  // Shut down TS 0 then tombstone TS 1. Restart TS 0.
+  // TS 0 should get a vote from TS 1 and then make a copy on TS 1, bringing
+  // the cluster back up to full strength.
+  LOG(INFO) << "shutting down TS " << cluster_->mini_tablet_server(0)->uuid();
+  cluster_->mini_tablet_server(0)->Shutdown();
+
+  LOG(INFO) << "tombstoning replica on TS " << cluster_->mini_tablet_server(1)->uuid();
+  TServerDetails* ts1 = ts_map_[cluster_->mini_tablet_server(1)->uuid()];
+  ASSERT_OK(DeleteTablet(ts1, tablet_id, TABLET_DATA_TOMBSTONED, boost::none, kTimeout));
+
+  if (to_restart == kRestart) {
+    LOG(INFO) << "restarting tombstoned TS " << cluster_->mini_tablet_server(1)->uuid();
+    cluster_->mini_tablet_server(1)->Shutdown();
+    ASSERT_OK(cluster_->mini_tablet_server(1)->Restart());
+  }
+
+  // Log the UUID of the server that is actually being restarted (TS 0).
+  LOG(INFO) << "restarting TS " << cluster_->mini_tablet_server(0)->uuid();
+  ASSERT_OK(cluster_->mini_tablet_server(0)->Restart());
+
+  // Wait for the tablet copy to complete.
+  LOG(INFO) << "waiting for leader election and tablet copy to complete...";
+  ASSERT_OK(WaitForServersToAgree(kTimeout, live_ts_map, tablet_id, workload.batches_completed()));
+
+  LOG(INFO) << "attempting to write a few more rows...";
+
+  // Write a little bit more.
+  int target_rows = workload.rows_inserted() + 100;
+  workload.Start();
+  while (workload.rows_inserted() < target_rows) {
+    SleepFor(MonoDelta::FromMilliseconds(50));
+  }
+  workload.StopAndJoin();
+
+  // Do a final verification that the servers match.
+  LOG(INFO) << "waiting for final agreement...";
+  ASSERT_OK(WaitForServersToAgree(kTimeout, live_ts_map, tablet_id, workload.batches_completed()));
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/integration-tests/tombstoned_voting-stress-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/tombstoned_voting-stress-test.cc b/src/kudu/integration-tests/tombstoned_voting-stress-test.cc
new file mode 100644
index 0000000..2c1cecb
--- /dev/null
+++ b/src/kudu/integration-tests/tombstoned_voting-stress-test.cc
@@ -0,0 +1,313 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <atomic>
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <ostream>
+#include <string>
+#include <thread>
+#include <unordered_map>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/common/wire_protocol.h"
+#include "kudu/common/wire_protocol.pb.h"
+#include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/opid.pb.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
+#include "kudu/integration-tests/external_mini_cluster-itest-base.h"
+#include "kudu/integration-tests/external_mini_cluster.h"
+#include "kudu/integration-tests/external_mini_cluster_fs_inspector.h"
+#include "kudu/integration-tests/test_workload.h"
+#include "kudu/tablet/metadata.pb.h"
+#include "kudu/util/condition_variable.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+DEFINE_int32(test_num_iterations, 5,
+             "Number of tombstoned voting stress test iterations");
+
+using kudu::consensus::COMMITTED_OPID;
+using kudu::consensus::OpId;
+using kudu::itest::DeleteTablet;
+using kudu::itest::TServerDetails;
+using kudu::itest::WaitForServersToAgree;
+using kudu::tablet::TABLET_DATA_TOMBSTONED;
+using std::atomic;
+using std::string;
+using std::thread;
+using std::unique_lock;
+using std::vector;
+
+namespace kudu {
+
+static const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+
+// Fixture for the tombstoned voting stress test. It holds a small state
+// machine (see State) shared between the test body, which changes the state
+// via SetState(), and worker threads, which observe it via GetState().
+class TombstonedVotingStressTest : public ExternalMiniClusterITestBase {
+ public:
+  // Both condition variables are bound to 'lock_'; the fixture is configured
+  // for a single worker and the term counter starts at 1.
+  TombstonedVotingStressTest()
+      : num_workers_(1),
+        cond_all_workers_blocked_(&lock_),
+        cond_workers_unblocked_(&lock_),
+        current_term_(1) {
+  }
+
+ protected:
+  enum State {
+    kRunning,       // The tablet is running normally.
+    kTombstoning,   // We are tombstoning the tablet.
+    kTombstoned,    // The tombstoning is complete.
+    kCopying,       // We are copying the tablet.
+    kTestComplete,  // The test is complete and about to exit.
+  };
+
+  // Returns the name of the given enumerator as a string.
+  string State_Name(State state);
+
+  // 1. Check if workers should block, block if required.
+  // 2. Return current state.
+  State GetState();
+
+  // 1. Block worker threads.
+  // 2. Wait for all workers to be blocked.
+  // 3. Change state.
+  // 4. Unblock workers.
+  void SetState(State state);
+
+  // Thread that loops and requests votes from TS1.
+  void RunVoteRequestLoop();
+
+  // Set-once shared state.
+  string tablet_id_;
+  OpId last_logged_opid_;
+
+  // GetState() and SetState() hold this lock while touching the worker
+  // bookkeeping fields and 'state_' below.
+  Mutex lock_;
+  const int num_workers_;        // Number of workers SetState() waits for.
+  int num_workers_blocked_ = 0;  // Workers currently parked inside GetState().
+  bool block_workers_ = false;   // True while SetState() wants workers parked.
+  ConditionVariable cond_all_workers_blocked_;  // Triggers once all worker threads are blocked.
+  ConditionVariable cond_workers_unblocked_;    // Triggers when the workers become unblocked.
+
+  // Protected by lock_.
+  State state_ = kRunning;
+
+  // State for the voter thread.
+  atomic<int64_t> current_term_;
+};
+
+// Maps a State enumerator to its spelled-out name. An unrecognized value is a
+// fatal error.
+string TombstonedVotingStressTest::State_Name(State state) {
+  if (state == kRunning) return "kRunning";
+  if (state == kTombstoning) return "kTombstoning";
+  if (state == kTombstoned) return "kTombstoned";
+  if (state == kCopying) return "kCopying";
+  if (state == kTestComplete) return "kTestComplete";
+
+  // None of the known enumerators matched.
+  LOG(FATAL) << "Unknown state: " << state;
+  __builtin_unreachable();
+}
+
+// Called by worker threads. If SetState() has requested that workers block,
+// this registers the caller as blocked (signalling SetState() once all
+// 'num_workers_' workers are blocked) and waits until the request is lifted.
+// Returns 'state_' as read while 'lock_' is still held.
+TombstonedVotingStressTest::State TombstonedVotingStressTest::GetState() {
+  unique_lock<Mutex> l(lock_);
+  // Remember whether this call was counted as blocked so that the count is
+  // only decremented by calls that incremented it.
+  bool blocked = false;
+  if (block_workers_) {
+    num_workers_blocked_++;
+    blocked = true;
+    if (num_workers_blocked_ == num_workers_) {
+      // Last worker to arrive: let SetState() proceed.
+      cond_all_workers_blocked_.Signal();
+    }
+  }
+  // Re-check the flag after every wakeup.
+  while (block_workers_) {
+    cond_workers_unblocked_.Wait();
+  }
+  if (blocked) num_workers_blocked_--;
+  return state_;
+}
+
+// Changes the shared test state to 'state' while every worker is parked in
+// GetState(), so that no worker observes the change in the middle of an
+// iteration.
+void TombstonedVotingStressTest::SetState(State state) {
+  // 1. Block worker threads.
+  // 2. Wait for all workers to be blocked.
+  // 3. Change state.
+  // 4. Unblock workers.
+  LOG(INFO) << "setting state to " << State_Name(state);
+  unique_lock<Mutex> l(lock_);
+  block_workers_ = true;
+  // Workers register themselves in GetState(); wait until all have done so.
+  while (num_workers_blocked_ != num_workers_) {
+    cond_all_workers_blocked_.Wait();
+  }
+  state_ = state;
+  block_workers_ = false;
+  // Wake every waiter on the "unblocked" condition.
+  cond_workers_unblocked_.Broadcast();
+}
+
+// Body of the voter thread: repeatedly asks TS1 for a vote on behalf of
+// candidate "A", bumping the term before every request, until the test state
+// becomes kTestComplete. A failed vote is fatal in the kRunning and
+// kTombstoned states and only logged as a warning in kTombstoning and kCopying.
+void TombstonedVotingStressTest::RunVoteRequestLoop() {
+  TServerDetails* ts1_ets = ts_map_[cluster_->tablet_server(1)->uuid()];
+  while (true) {
+    // GetState() may park this thread while the test body changes the state.
+    const State state = GetState();
+    if (state == kTestComplete) break;
+    // Capture the incremented term once so the request uses exactly that value.
+    const int64_t term = ++current_term_;
+    Status s = itest::RequestVote(ts1_ets, tablet_id_, "A", term, last_logged_opid_,
+                                  /*ignore_live_leader=*/ true, /*is_pre_election=*/ false,
+                                  kTimeout);
+    switch (state) {
+      case kRunning: FALLTHROUGH_INTENDED;
+      case kTombstoned:
+        // We should always be able to vote in this case.
+        if (s.ok()) {
+          LOG(INFO) << "Vote OK: state = " << State_Name(state);
+        } else {
+          LOG(FATAL) << s.ToString() << ": tablet = " << tablet_id_
+                     << ": state = " << State_Name(state);
+        }
+        break;
+
+      // The vote can fail while in the process of tombstoning a replica
+      // because there is a small window of time where we have stopped
+      // RaftConsensus but we haven't yet recorded the last-logged opid in the
+      // tablet metadata.
+      case kTombstoning: FALLTHROUGH_INTENDED;
+      case kCopying:
+        if (s.ok()) {
+          LOG(INFO) << "Vote OK: state = " << State_Name(state);
+        } else {
+          LOG(WARNING) << "Got bad vote while copying or tombstoning: " << s.ToString()
+                       << ": state = " << State_Name(state);
+        }
+        break;
+
+      default:
+        // We're shutting down.
+        continue;
+    }
+    SleepFor(MonoDelta::FromMilliseconds(1)); // Don't run too hot.
+  }
+}
+
+// Stress test for tombstoned voting, including tombstoning, deleting, and
+// copying replicas.
+//
+// A background thread continuously requests votes from TS1 while the test body
+// cycles the TS1 replica through tombstone -> tablet copy -> running for
+// FLAGS_test_num_iterations iterations.
+TEST_F(TombstonedVotingStressTest, TestTombstonedVotingUnderStress) {
+  // This test waits for several seconds, so only run it in slow mode.
+  if (!AllowSlowTests()) return;
+
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+
+  // We want to control leader election manually and we only want 2 replicas.
+  NO_FATALS(StartCluster({ "--enable_leader_failure_detection=false" },
+                         { "--catalog_manager_wait_for_new_tablets_to_elect_leader=false",
+                           "--allow_unsafe_replication_factor=true" },
+                         /*num_tablet_servers=*/ 2));
+  TestWorkload workload(cluster_.get());
+  workload.set_num_replicas(2); // Two servers and replicas makes the test easy to debug.
+  workload.Setup();
+  ASSERT_OK(inspect_->WaitForReplicaCount(2));
+
+  // Figure out the tablet id.
+  vector<string> tablets = inspect_->ListTabletsOnTS(1);
+  ASSERT_EQ(1, tablets.size());
+  tablet_id_ = tablets[0];
+
+  // NOTE(review): this loop starts at index 1, so TS0 is not waited on here;
+  // the election retry below appears to cover it -- confirm.
+  for (int i = 1; i < cluster_->num_tablet_servers(); i++) {
+    ASSERT_OK(itest::WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(i)->uuid()],
+                                            tablet_id_, kTimeout));
+    LOG(INFO) << "TabletReplica is RUNNING: T " << tablet_id_
+              << " P " << cluster_->tablet_server(i)->uuid();
+  }
+
+  // Elect a leader and run some data through the cluster.
+  LOG(INFO) << "electing a leader...";
+  TServerDetails* ts0_ets = ts_map_[cluster_->tablet_server(0)->uuid()];
+  TServerDetails* ts1_ets = ts_map_[cluster_->tablet_server(1)->uuid()];
+  ASSERT_EVENTUALLY([&] {
+    // The tablet can report that it's running but still be bootstrapping, so
+    // retry until the election starts.
+    ASSERT_OK(itest::StartElection(ts0_ets, tablet_id_, kTimeout));
+  });
+
+  LOG(INFO) << "loading data...";
+  workload.Start();
+  while (workload.rows_inserted() < 100) {
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+  workload.StopAndJoin();
+  ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id_, workload.batches_completed()));
+  ASSERT_OK(itest::GetLastOpIdForReplica(tablet_id_, ts0_ets, COMMITTED_OPID, kTimeout,
+                                         &last_logged_opid_));
+
+  // Have the leader step down so we can test voting on the other replica.
+  // We don't shut this node down because it will serve as the tablet copy
+  // "source" during the test.
+  LOG(INFO) << "forcing leader to step down...";
+  ASSERT_OK(itest::LeaderStepDown(ts0_ets, tablet_id_, kTimeout));
+
+  // Now we are done with setup. Start the "stress" part of the test.
+  // Startup the voting thread.
+  LOG(INFO) << "starting stress thread...";
+  thread voter_thread([this] { RunVoteRequestLoop(); });
+  // Runs on every exit path (including a failed ASSERT) so that the voter
+  // thread is told to stop and is joined before 'voter_thread' is destroyed.
+  auto cleanup = MakeScopedCleanup([&] {
+    SetState(kTestComplete);
+    voter_thread.join();
+  });
+
+  // 'iter' is 1-based so that the log line reads "iteration 1 of N" .. "N of N".
+  for (int iter = 1; iter <= FLAGS_test_num_iterations; iter++) {
+    LOG(INFO) << "iteration " << iter << " of " << FLAGS_test_num_iterations;
+    // Loop on voting for a while in running state. We want to give an
+    // opportunity for many votes during this time, and since voting involves
+    // fsyncing to disk, we wait for plenty of time here (and below).
+    SleepFor(MonoDelta::FromMilliseconds(500));
+
+    // 1. Tombstone tablet.
+    LOG(INFO) << "tombstoning tablet...";
+    SetState(kTombstoning);
+    ASSERT_OK(itest::DeleteTablet(ts1_ets, tablet_id_, TABLET_DATA_TOMBSTONED, boost::none,
+                                  kTimeout));
+    SetState(kTombstoned);
+
+    // Loop on voting for a while in tombstoned state.
+    SleepFor(MonoDelta::FromMilliseconds(500));
+
+    // 2. Copy tablet.
+    LOG(INFO) << "copying tablet...";
+    HostPort source_hp;
+    ASSERT_OK(HostPortFromPB(ts0_ets->registration.rpc_addresses(0), &source_hp));
+    SetState(kCopying);
+    ASSERT_OK(itest::StartTabletCopy(ts1_ets, tablet_id_, ts0_ets->uuid(), source_hp, current_term_,
+                                     kTimeout));
+    LOG(INFO) << "waiting for servers to agree...";
+    ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id_, workload.batches_completed()));
+
+    SetState(kRunning);
+  }
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/master/sys_catalog.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index f6d7a19..0e3a9f1 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -344,6 +344,12 @@ Status SysCatalogTable::SetupTablet(const scoped_refptr<tablet::TabletMetadata>&
       local_peer_pb_,
       master_->tablet_apply_pool(),
       Bind(&SysCatalogTable::SysCatalogStateChanged, Unretained(this), metadata->tablet_id())));
+  Status s = tablet_replica_->Init(master_->raft_pool());
+  if (!s.ok()) {
+    tablet_replica_->SetError(s);
+    tablet_replica_->Shutdown();
+    return s;
+  }
 
   scoped_refptr<ConsensusMetadata> cmeta;
   RETURN_NOT_OK(cmeta_manager_->Load(metadata->tablet_id(), &cmeta));
@@ -371,7 +377,6 @@ Status SysCatalogTable::SetupTablet(const scoped_refptr<tablet::TabletMetadata>&
                                                master_->messenger(),
                                                scoped_refptr<rpc::ResultTracker>(),
                                                log,
-                                               master_->raft_pool(),
                                                master_->tablet_prepare_pool()),
                         "Failed to Start() TabletReplica");
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/tablet/metadata.proto
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/metadata.proto b/src/kudu/tablet/metadata.proto
index 83bffbc..70d8c1f 100644
--- a/src/kudu/tablet/metadata.proto
+++ b/src/kudu/tablet/metadata.proto
@@ -131,13 +131,29 @@ message TabletSuperBlockPB {
   optional DataDirGroupPB data_dir_group = 15;
 }
 
-// The enum of tablet states.
-// Tablet states are sent in TabletReports and kept in TabletReplica.
+// Tablet states represent stages of a TabletReplica's object lifecycle and are
+// reported to the master via tablet reports.
+//
+// Legal state transitions for a single TabletReplica object:
+//
+// NOT_INITIALIZED -> INITIALIZED -> BOOTSTRAPPING -> RUNNING -> STOPPING -> STOPPED -> SHUTDOWN
+//             |              |                |                  ^ ^ ^
+//             |              |                |                  | | |
+//             |              |                +------------------+ | |
+//             |              +-------------------------------------+ |
+//             +------------------------------------------------------+
+//
+// Since a TabletReplica instance is replaced when a Tablet Copy operation
+// occurs, from a remote perspective it is possible for a tablet replica to
+// appear to transition from SHUTDOWN back to NOT_INITIALIZED.
 enum TabletStatePB {
   UNKNOWN = 999;
 
-  // Tablet has not yet started.
-  NOT_STARTED = 5;
+  // Tablet has not yet been initialized.
+  NOT_INITIALIZED = 6;
+
+  // Tablet has been initialized but not yet started.
+  INITIALIZED = 5;
 
   // Indicates the Tablet is bootstrapping, i.e. that the Tablet is not
   // available for RPC.
@@ -152,8 +168,11 @@ enum TabletStatePB {
   FAILED = 2;
 
   // The Tablet is shutting down, and will not accept further requests.
-  QUIESCING = 3;
+  STOPPING = 3;
+
+  // The tablet has been stopped, possibly because it has been tombstoned.
+  STOPPED = 7;
 
-  // The Tablet has been stopped.
+  // The Tablet has been completely shut down.
   SHUTDOWN = 4;
 }