You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2017/08/24 04:43:57 UTC

[1/2] kudu git commit: KUDU-871. Support tombstoned voting

Repository: kudu
Updated Branches:
  refs/heads/master b37bde72c -> 5bca7d8ba


http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/tablet/tablet_metadata.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_metadata.h b/src/kudu/tablet/tablet_metadata.h
index 650ac15..9c34638 100644
--- a/src/kudu/tablet/tablet_metadata.h
+++ b/src/kudu/tablet/tablet_metadata.h
@@ -250,6 +250,9 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> {
 
   RowSetMetadata *GetRowSetForTests(int64_t id);
 
+  // Return standard "T xxx P yyy" log prefix.
+  std::string LogPrefix() const;
+
  private:
   friend class RefCountedThreadSafe<TabletMetadata>;
   friend class MetadataTest;
@@ -303,9 +306,6 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> {
   // Failures are logged, but are not fatal.
   void DeleteOrphanedBlocks(const std::vector<BlockId>& blocks);
 
-  // Return standard "T xxx P yyy" log prefix.
-  std::string LogPrefix() const;
-
   enum State {
     kNotLoadedYet,
     kNotWrittenYet,
@@ -355,6 +355,7 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> {
 
   // Record of the last opid logged by the tablet before it was last
   // tombstoned. Has no meaning for non-tombstoned tablets.
+  // Protected by 'data_lock_'.
   boost::optional<consensus::OpId> tombstone_last_logged_opid_;
 
   // If this counter is > 0 then Flush() will not write any data to

http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/tablet/tablet_replica-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica-test.cc b/src/kudu/tablet/tablet_replica-test.cc
index b358b59..df07384 100644
--- a/src/kudu/tablet/tablet_replica-test.cc
+++ b/src/kudu/tablet/tablet_replica-test.cc
@@ -125,38 +125,37 @@ class TabletReplicaTest : public KuduTabletTest {
 
     metric_entity_ = METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "test-tablet");
 
-    RaftPeerPB config_peer;
-    config_peer.set_permanent_uuid(tablet()->metadata()->fs_manager()->uuid());
-    config_peer.mutable_last_known_addr()->set_host("0.0.0.0");
-    config_peer.mutable_last_known_addr()->set_port(0);
-    config_peer.set_member_type(RaftPeerPB::VOTER);
+    RaftConfigPB config;
+    config.set_opid_index(consensus::kInvalidOpIdIndex);
+
+    RaftPeerPB* config_peer = config.add_peers();
+    config_peer->set_permanent_uuid(tablet()->metadata()->fs_manager()->uuid());
+    config_peer->mutable_last_known_addr()->set_host("0.0.0.0");
+    config_peer->mutable_last_known_addr()->set_port(0);
+    config_peer->set_member_type(RaftPeerPB::VOTER);
 
     scoped_refptr<ConsensusMetadataManager> cmeta_manager(
         new ConsensusMetadataManager(tablet()->metadata()->fs_manager()));
 
+    scoped_refptr<ConsensusMetadata> cmeta;
+    ASSERT_OK(cmeta_manager->Create(tablet()->tablet_id(), config, consensus::kMinimumTerm,
+                                    &cmeta));
+
     // "Bootstrap" and start the TabletReplica.
     tablet_replica_.reset(
       new TabletReplica(tablet()->shared_metadata(),
                         cmeta_manager,
-                        config_peer,
+                        *config_peer,
                         apply_pool_.get(),
                         Bind(&TabletReplicaTest::TabletReplicaStateChangedCallback,
                              Unretained(this),
                              tablet()->tablet_id())));
-
+    ASSERT_OK(tablet_replica_->Init(raft_pool_.get()));
     // Make TabletReplica use the same LogAnchorRegistry as the Tablet created by the harness.
     // TODO(mpercy): Refactor TabletHarness to allow taking a
     // LogAnchorRegistry, while also providing TabletMetadata for consumption
     // by TabletReplica before Tablet is instantiated.
     tablet_replica_->log_anchor_registry_ = tablet()->log_anchor_registry_;
-
-    RaftConfigPB config;
-    config.add_peers()->CopyFrom(config_peer);
-    config.set_opid_index(consensus::kInvalidOpIdIndex);
-
-    scoped_refptr<ConsensusMetadata> cmeta;
-    ASSERT_OK(cmeta_manager->Create(tablet()->tablet_id(), config, consensus::kMinimumTerm,
-                                    &cmeta));
   }
 
   Status StartReplica(const ConsensusBootstrapInfo& info) {
@@ -171,7 +170,6 @@ class TabletReplicaTest : public KuduTabletTest {
                                   messenger_,
                                   scoped_refptr<rpc::ResultTracker>(),
                                   log,
-                                  raft_pool_.get(),
                                   prepare_pool_.get());
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/tablet/tablet_replica.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc
index c1a0477..7f5a72e 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -115,7 +115,7 @@ TabletReplica::TabletReplica(
       log_anchor_registry_(new LogAnchorRegistry()),
       apply_pool_(apply_pool),
       mark_dirty_clbk_(std::move(mark_dirty_clbk)),
-      state_(NOT_STARTED),
+      state_(NOT_INITIALIZED),
       last_status_("Tablet initializing...") {
 }
 
@@ -127,13 +127,28 @@ TabletReplica::~TabletReplica() {
       << TabletStatePB_Name(state_);
 }
 
+Status TabletReplica::Init(ThreadPool* raft_pool) {
+  CHECK_EQ(NOT_INITIALIZED, state_);
+  TRACE("Creating consensus instance");
+  ConsensusOptions options;
+  options.tablet_id = meta_->tablet_id();
+  shared_ptr<RaftConsensus> consensus;
+  RETURN_NOT_OK(RaftConsensus::Create(std::move(options),
+                                      local_peer_pb_,
+                                      cmeta_manager_,
+                                      raft_pool,
+                                      &consensus));
+  consensus_ = std::move(consensus);
+  set_state(INITIALIZED);
+  return Status::OK();
+}
+
 Status TabletReplica::Start(const ConsensusBootstrapInfo& bootstrap_info,
                             shared_ptr<Tablet> tablet,
                             scoped_refptr<clock::Clock> clock,
                             shared_ptr<Messenger> messenger,
                             scoped_refptr<ResultTracker> result_tracker,
                             scoped_refptr<Log> log,
-                            ThreadPool* raft_pool,
                             ThreadPool* prepare_pool) {
   DCHECK(tablet) << "A TabletReplica must be provided with a Tablet";
   DCHECK(log) << "A TabletReplica must be provided with a Log";
@@ -141,9 +156,11 @@ Status TabletReplica::Start(const ConsensusBootstrapInfo& bootstrap_info,
   {
     std::lock_guard<simple_spinlock> state_change_guard(state_change_lock_);
 
+    scoped_refptr<MetricEntity> metric_entity;
+    gscoped_ptr<PeerProxyFactory> peer_proxy_factory;
+    scoped_refptr<TimeManager> time_manager;
     {
       std::lock_guard<simple_spinlock> l(lock_);
-
       CHECK_EQ(BOOTSTRAPPING, state_);
 
       tablet_ = DCHECK_NOTNULL(std::move(tablet));
@@ -151,24 +168,7 @@ Status TabletReplica::Start(const ConsensusBootstrapInfo& bootstrap_info,
       messenger_ = DCHECK_NOTNULL(std::move(messenger));
       result_tracker_ = std::move(result_tracker); // Passed null in tablet_replica-test
       log_ = DCHECK_NOTNULL(log); // Not moved because it's passed to RaftConsensus::Start() below.
-    }
-
-    // Unlock while we initialize RaftConsensus, which involves I/O.
-    TRACE("Creating consensus instance");
-    ConsensusOptions options;
-    options.tablet_id = meta_->tablet_id();
-    shared_ptr<RaftConsensus> consensus(std::make_shared<RaftConsensus>(std::move(options),
-                                                                        local_peer_pb_,
-                                                                        cmeta_manager_,
-                                                                        raft_pool));
-    RETURN_NOT_OK(consensus->Init());
 
-    scoped_refptr<MetricEntity> metric_entity;
-    gscoped_ptr<PeerProxyFactory> peer_proxy_factory;
-    scoped_refptr<TimeManager> time_manager;
-    {
-      std::lock_guard<simple_spinlock> l(lock_);
-      consensus_ = consensus;
       metric_entity = tablet_->GetMetricEntity();
       prepare_pool_token_ = prepare_pool->NewTokenWithMetrics(
           ThreadPool::ExecutionMode::SERIAL,
@@ -196,7 +196,7 @@ Status TabletReplica::Start(const ConsensusBootstrapInfo& bootstrap_info,
     // may invoke TabletReplica::StartReplicaTransaction() during startup, causing
     // a self-deadlock. We take a ref to members protected by 'lock_' before
     // unlocking.
-    RETURN_NOT_OK(consensus->Start(
+    RETURN_NOT_OK(consensus_->Start(
         bootstrap_info,
         std::move(peer_proxy_factory),
         log,
@@ -208,7 +208,7 @@ Status TabletReplica::Start(const ConsensusBootstrapInfo& bootstrap_info,
     // Re-acquire 'lock_' to update our state variable.
     std::lock_guard<simple_spinlock> l(lock_);
     CHECK_EQ(BOOTSTRAPPING, state_); // We are still protected by 'state_change_lock_'.
-    state_ = RUNNING;
+    set_state(RUNNING);
   }
 
   // Because we changed the tablet state, we need to re-report the tablet to the master.
@@ -222,22 +222,17 @@ const consensus::RaftConfigPB TabletReplica::RaftConfig() const {
   return consensus_->CommittedConfig();
 }
 
-void TabletReplica::Shutdown() {
-
-  LOG(INFO) << "Initiating TabletReplica shutdown for tablet: " << tablet_id_;
-
-  TabletStatePB shutdown_type = SHUTDOWN;
+void TabletReplica::Stop() {
   {
     std::unique_lock<simple_spinlock> lock(lock_);
-    if (state_ == QUIESCING || state_ == SHUTDOWN || state_ == FAILED) {
+    if (state_ == STOPPING || state_ == STOPPED ||
+        state_ == SHUTDOWN || state_ == FAILED) {
       lock.unlock();
-      WaitUntilShutdown();
+      WaitUntilStopped();
       return;
     }
-    if (!error_.ok()) {
-      shutdown_type = FAILED;
-    }
-    state_ = QUIESCING;
+    LOG_WITH_PREFIX(INFO) << "stopping tablet replica";
+    set_state(STOPPING);
   }
 
   std::lock_guard<simple_spinlock> l(state_change_lock_);
@@ -248,7 +243,7 @@ void TabletReplica::Shutdown() {
   if (tablet_) tablet_->UnregisterMaintenanceOps();
   UnregisterMaintenanceOps();
 
-  if (consensus_) consensus_->Shutdown();
+  if (consensus_) consensus_->Stop();
 
   // TODO(KUDU-183): Keep track of the pending tasks and send an "abort" message.
   LOG_SLOW_EXECUTION(WARNING, 1000,
@@ -272,21 +267,31 @@ void TabletReplica::Shutdown() {
     tablet_->Shutdown();
   }
 
-  // Only update the replica state when all other components have shut down.
+  // Only mark the peer as STOPPED when all other components have shut down.
   {
     std::lock_guard<simple_spinlock> lock(lock_);
-    // Release mem tracker resources.
-    consensus_.reset();
     tablet_.reset();
-    state_ = shutdown_type;
+    set_state(STOPPED);
+  }
+}
+
+void TabletReplica::Shutdown() {
+  Stop();
+  if (consensus_) consensus_->Shutdown();
+  std::lock_guard<simple_spinlock> lock(lock_);
+  if (state_ == SHUTDOWN || state_ == FAILED) return;
+  if (!error_.ok()) {
+    set_state(FAILED);
+    return;
   }
+  set_state(SHUTDOWN);
 }
 
-void TabletReplica::WaitUntilShutdown() {
+void TabletReplica::WaitUntilStopped() {
   while (true) {
     {
       std::lock_guard<simple_spinlock> lock(lock_);
-      if (state_ == SHUTDOWN || state_ == FAILED) {
+      if (state_ == STOPPED || state_ == SHUTDOWN || state_ == FAILED) {
         return;
       }
     }
@@ -294,6 +299,41 @@ void TabletReplica::WaitUntilShutdown() {
   }
 }
 
+string TabletReplica::LogPrefix() const {
+  return meta_->LogPrefix();
+}
+
+void TabletReplica::set_state(TabletStatePB new_state) {
+  switch (new_state) {
+    case NOT_INITIALIZED:
+      LOG(FATAL) << "Cannot transition to NOT_INITIALIZED state";
+      return;
+    case INITIALIZED:
+      CHECK_EQ(NOT_INITIALIZED, state_);
+      break;
+    case BOOTSTRAPPING:
+      CHECK_EQ(INITIALIZED, state_);
+      break;
+    case RUNNING:
+      CHECK_EQ(BOOTSTRAPPING, state_);
+      break;
+    case STOPPING:
+      CHECK_NE(STOPPED, state_);
+      CHECK_NE(SHUTDOWN, state_);
+      break;
+    case STOPPED:
+      CHECK_EQ(STOPPING, state_);
+      break;
+    case SHUTDOWN: FALLTHROUGH_INTENDED;
+    case FAILED:
+      CHECK_EQ(STOPPED, state_) << TabletStatePB_Name(state_);
+      break;
+    default:
+      break;
+  }
+  state_ = new_state;
+}
+
 Status TabletReplica::CheckRunning() const {
   {
     std::lock_guard<simple_spinlock> lock(lock_);
@@ -320,7 +360,7 @@ Status TabletReplica::WaitUntilConsensusRunning(const MonoDelta& timeout) {
         has_consensus = true; // consensus_ is a set-once object.
       }
     }
-    if (cached_state == QUIESCING || cached_state == SHUTDOWN) {
+    if (cached_state == STOPPING || cached_state == STOPPED) {
       return Status::IllegalState(
           Substitute("The tablet is already shutting down or shutdown. State: $0",
                      TabletStatePB_Name(cached_state)));
@@ -390,6 +430,11 @@ Status TabletReplica::RunLogGC() {
   return Status::OK();
 }
 
+void TabletReplica::SetBootstrapping() {
+  std::lock_guard<simple_spinlock> lock(lock_);
+  set_state(BOOTSTRAPPING);
+}
+
 void TabletReplica::SetStatusMessage(const std::string& status) {
   std::lock_guard<simple_spinlock> lock(lock_);
   last_status_ = status;

http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/tablet/tablet_replica.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_replica.h b/src/kudu/tablet/tablet_replica.h
index fd272ad..8b85889 100644
--- a/src/kudu/tablet/tablet_replica.h
+++ b/src/kudu/tablet/tablet_replica.h
@@ -93,6 +93,12 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
                 ThreadPool* apply_pool,
                 Callback<void(const std::string& reason)> mark_dirty_clbk);
 
+  // Initializes RaftConsensus.
+  // This must be called before publishing the instance to other threads.
+  // If this fails, the TabletReplica instance remains in a NOT_INITIALIZED
+  // state.
+  Status Init(ThreadPool* raft_pool);
+
   // Starts the TabletReplica, making it available for Write()s. If this
   // TabletReplica is part of a consensus configuration this will connect it to other replicas
   // in the consensus configuration.
@@ -102,14 +108,19 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
                std::shared_ptr<rpc::Messenger> messenger,
                scoped_refptr<rpc::ResultTracker> result_tracker,
                scoped_refptr<log::Log> log,
-               ThreadPool* raft_pool,
                ThreadPool* prepare_pool);
 
-  // Shutdown this tablet replica. If a shutdown is already in progress,
-  // blocks until that shutdown is complete.
+  // Synchronously transition this replica to STOPPED state from any other
+  // state. This also stops RaftConsensus. If a Stop() operation is already in
+  // progress, blocks until that operation is complete.
+  // See tablet/metadata.proto for a description of legal state transitions.
+  void Stop();
+
+  // Synchronously transition this replica to SHUTDOWN state from any other state.
+  // See tablet/metadata.proto for a description of legal state transitions.
   //
   // If 'error_' has been set to a non-OK status, the final state will be
-  // FAILED, and SHUTDOWN otherwise.
+  // FAILED instead of SHUTDOWN.
   void Shutdown();
 
   // Check that the tablet is in a RUNNING state.
@@ -176,17 +187,8 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
   // Returns the current Raft configuration.
   const consensus::RaftConfigPB RaftConfig() const;
 
-  // If any peers in the consensus configuration lack permanent uuids, get them via an
-  // RPC call and update.
-  // TODO: move this to raft_consensus.h.
-  Status UpdatePermanentUuids();
-
   // Sets the tablet to a BOOTSTRAPPING state, indicating it is starting up.
-  void SetBootstrapping() {
-    std::lock_guard<simple_spinlock> lock(lock_);
-    CHECK_EQ(NOT_STARTED, state_);
-    state_ = BOOTSTRAPPING;
-  }
+  void SetBootstrapping();
 
   // Set a user-readable status message about the tablet. This may appear on
   // the Web UI, for example.
@@ -287,14 +289,15 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
 
   ~TabletReplica();
 
-  // Wait until the TabletReplica is fully in a FAILED or SHUTDOWN state.
-  void WaitUntilShutdown();
+  // Wait until the TabletReplica is fully in STOPPED, SHUTDOWN, or FAILED
+  // state.
+  void WaitUntilStopped();
 
-  // After bootstrap is complete and consensus is setup this initiates the transactions
-  // that were not complete on bootstrap.
-  // Not implemented yet. See .cc file.
-  Status StartPendingTransactions(consensus::RaftPeerPB::Role my_role,
-                                  const consensus::ConsensusBootstrapInfo& bootstrap_info);
+  std::string LogPrefix() const;
+  // Transition to another state. Requires that the caller hold 'lock_' if the
+  // object has already published to other threads. See tablet/metadata.proto
+  // for state descriptions and legal state transitions.
+  void set_state(TabletStatePB new_state);
 
   const scoped_refptr<TabletMetadata> meta_;
   const scoped_refptr<consensus::ConsensusMetadataManager> cmeta_manager_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/tools/kudu-admin-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/kudu-admin-test.cc b/src/kudu/tools/kudu-admin-test.cc
index f2b6e5b..fbb1be4 100644
--- a/src/kudu/tools/kudu-admin-test.cc
+++ b/src/kudu/tools/kudu-admin-test.cc
@@ -377,8 +377,8 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigOnSingleFollower) {
   // we should see the tablet in TOMBSTONED state on these servers.
   ASSERT_OK(cluster_->tablet_server_by_uuid(leader_ts->uuid())->Restart());
   ASSERT_OK(cluster_->tablet_server_by_uuid(followers[1]->uuid())->Restart());
-  ASSERT_OK(WaitUntilTabletInState(leader_ts, tablet_id, tablet::SHUTDOWN, kTimeout));
-  ASSERT_OK(WaitUntilTabletInState(followers[1], tablet_id, tablet::SHUTDOWN, kTimeout));
+  ASSERT_OK(WaitUntilTabletInState(leader_ts, tablet_id, tablet::STOPPED, kTimeout));
+  ASSERT_OK(WaitUntilTabletInState(followers[1], tablet_id, tablet::STOPPED, kTimeout));
 }
 
 // Test unsafe config change when there is one leader survivor in the cluster.

http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/tools/kudu-ts-cli-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/kudu-ts-cli-test.cc b/src/kudu/tools/kudu-ts-cli-test.cc
index 7ae58ee..91163aa 100644
--- a/src/kudu/tools/kudu-ts-cli-test.cc
+++ b/src/kudu/tools/kudu-ts-cli-test.cc
@@ -86,7 +86,7 @@ TEST_F(KuduTsCliTest, TestDeleteTablet) {
 
   ASSERT_OK(inspect_->WaitForTabletDataStateOnTS(0, tablet_id, { tablet::TABLET_DATA_TOMBSTONED }));
   TServerDetails* ts = ts_map_[cluster_->tablet_server(0)->uuid()];
-  ASSERT_OK(itest::WaitUntilTabletInState(ts, tablet_id, tablet::SHUTDOWN, timeout));
+  ASSERT_OK(itest::WaitUntilTabletInState(ts, tablet_id, tablet::STOPPED, timeout));
 }
 
 // Test dumping a tablet using kudu-ts-cli tool.

http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/tserver/tablet_copy_client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_client.cc b/src/kudu/tserver/tablet_copy_client.cc
index a75750c..07b20ff 100644
--- a/src/kudu/tserver/tablet_copy_client.cc
+++ b/src/kudu/tserver/tablet_copy_client.cc
@@ -295,6 +295,10 @@ Status TabletCopyClient::Start(const HostPort& copy_source_addr,
   CHECK(fs_manager_->dd_manager()->GetDataDirGroupPB(tablet_id_,
                                                      superblock_->mutable_data_dir_group()));
 
+  // Create the ConsensusMetadata before returning from Start() so that it's
+  // possible to vote while we are copying the replica for the first time.
+  RETURN_NOT_OK(WriteConsensusMetadata());
+
   state_ = kStarted;
   if (meta) {
     *meta = meta_;
@@ -319,8 +323,6 @@ Status TabletCopyClient::Finish() {
   CHECK_EQ(kStarted, state_);
   state_ = kFinished;
 
-  RETURN_NOT_OK(WriteConsensusMetadata());
-
   // Replace tablet metadata superblock. This will set the tablet metadata state
   // to TABLET_DATA_READY, since we checked above that the response
   // superblock is in a valid state to bootstrap from.

http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/tserver/tablet_copy_client.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_client.h b/src/kudu/tserver/tablet_copy_client.h
index 7262ae5..429c05b 100644
--- a/src/kudu/tserver/tablet_copy_client.h
+++ b/src/kudu/tserver/tablet_copy_client.h
@@ -205,7 +205,8 @@ class TabletCopyClient {
   scoped_refptr<tablet::TabletMetadata> meta_;
 
   // Local Consensus metadata file. This may initially be NULL if this is
-  // bootstrapping a new replica (rather than replacing an old one).
+  // bootstrapping a new replica (rather than replacing an old one) but it is
+  // guaranteed to be set after a successful call to Start().
   scoped_refptr<consensus::ConsensusMetadata> cmeta_;
 
   scoped_refptr<tablet::TabletReplica> tablet_replica_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/tserver/tablet_copy_source_session-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_copy_source_session-test.cc b/src/kudu/tserver/tablet_copy_source_session-test.cc
index 05cb6e4..85d9340 100644
--- a/src/kudu/tserver/tablet_copy_source_session-test.cc
+++ b/src/kudu/tserver/tablet_copy_source_session-test.cc
@@ -119,14 +119,14 @@ class TabletCopyTest : public KuduTabletTest {
     CHECK_OK(ThreadPoolBuilder("raft").Build(&raft_pool_));
   }
 
-  virtual void SetUp() OVERRIDE {
+  void SetUp() override {
     NO_FATALS(KuduTabletTest::SetUp());
     NO_FATALS(SetUpTabletReplica());
     NO_FATALS(PopulateTablet());
     NO_FATALS(InitSession());
   }
 
-  virtual void TearDown() OVERRIDE {
+  void TearDown() override {
     session_.reset();
     tablet_replica_->Shutdown();
     KuduTabletTest::TearDown();
@@ -137,37 +137,37 @@ class TabletCopyTest : public KuduTabletTest {
     scoped_refptr<Log> log;
     ASSERT_OK(Log::Open(LogOptions(), fs_manager(), tablet()->tablet_id(),
                         *tablet()->schema(),
-                        0, // schema_version
-                        NULL, &log));
+                        /*schema_version=*/ 0,
+                        /*metric_entity=*/ nullptr,
+                        &log));
 
     scoped_refptr<MetricEntity> metric_entity =
       METRIC_ENTITY_tablet.Instantiate(&metric_registry_, CURRENT_TEST_NAME());
 
-    RaftPeerPB config_peer;
-    config_peer.set_permanent_uuid(fs_manager()->uuid());
-    config_peer.mutable_last_known_addr()->set_host("0.0.0.0");
-    config_peer.mutable_last_known_addr()->set_port(0);
-    config_peer.set_member_type(RaftPeerPB::VOTER);
+    // TODO(mpercy): Similar to code in tablet_replica-test, consider refactor.
+    RaftConfigPB config;
+    config.set_opid_index(consensus::kInvalidOpIdIndex);
+    RaftPeerPB* config_peer = config.add_peers();
+    config_peer->set_permanent_uuid(fs_manager()->uuid());
+    config_peer->mutable_last_known_addr()->set_host("0.0.0.0");
+    config_peer->mutable_last_known_addr()->set_port(0);
+    config_peer->set_member_type(RaftPeerPB::VOTER);
 
     scoped_refptr<ConsensusMetadataManager> cmeta_manager(
         new ConsensusMetadataManager(fs_manager()));
+    scoped_refptr<ConsensusMetadata> cmeta;
+    ASSERT_OK(cmeta_manager->Create(tablet()->tablet_id(),
+                                    config, consensus::kMinimumTerm, &cmeta));
 
     tablet_replica_.reset(
         new TabletReplica(tablet()->metadata(),
                           cmeta_manager,
-                          config_peer,
+                          *config_peer,
                           apply_pool_.get(),
                           Bind(&TabletCopyTest::TabletReplicaStateChangedCallback,
                                Unretained(this),
                                tablet()->tablet_id())));
-    // TODO(dralves) similar to code in tablet_replica-test, consider refactor.
-    RaftConfigPB config;
-    config.add_peers()->CopyFrom(config_peer);
-    config.set_opid_index(consensus::kInvalidOpIdIndex);
-
-    scoped_refptr<ConsensusMetadata> cmeta;
-    ASSERT_OK(cmeta_manager->Create(tablet()->tablet_id(),
-                                    config, consensus::kMinimumTerm, &cmeta));
+    ASSERT_OK(tablet_replica_->Init(raft_pool_.get()));
 
     shared_ptr<Messenger> messenger;
     MessengerBuilder mbuilder(CURRENT_TEST_NAME());
@@ -182,7 +182,6 @@ class TabletCopyTest : public KuduTabletTest {
                                      messenger,
                                      scoped_refptr<rpc::ResultTracker>(),
                                      log,
-                                     raft_pool_.get(),
                                      prepare_pool_.get()));
     ASSERT_OK(tablet_replica_->WaitUntilConsensusRunning(MonoDelta::FromSeconds(10)));
     ASSERT_OK(tablet_replica_->consensus()->WaitUntilLeaderForTests(MonoDelta::FromSeconds(10)));

http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index aad6045..d0d8442 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -162,6 +162,9 @@ using kudu::rpc::RpcContext;
 using kudu::rpc::RpcSidecar;
 using kudu::server::ServerBase;
 using kudu::tablet::AlterSchemaTransactionState;
+using kudu::tablet::TABLET_DATA_COPYING;
+using kudu::tablet::TABLET_DATA_DELETED;
+using kudu::tablet::TABLET_DATA_TOMBSTONED;
 using kudu::tablet::Tablet;
 using kudu::tablet::TabletReplica;
 using kudu::tablet::TransactionCompletionCallback;
@@ -184,8 +187,8 @@ namespace tserver {
 
 namespace {
 
-// Lookup the given tablet, ensuring that it both exists and is RUNNING.
-// If it is not, responds to the RPC associated with 'context' after setting
+// Lookup the given tablet, only ensuring that it exists.
+// If it does not, responds to the RPC associated with 'context' after setting
 // resp->mutable_error() to indicate the failure reason.
 //
 // Returns true if successful.
@@ -195,24 +198,65 @@ bool LookupTabletReplicaOrRespond(TabletReplicaLookupIf* tablet_manager,
                                   RespClass* resp,
                                   rpc::RpcContext* context,
                                   scoped_refptr<TabletReplica>* replica) {
-  if (PREDICT_FALSE(!tablet_manager->GetTabletReplica(tablet_id, replica).ok())) {
-    SetupErrorAndRespond(resp->mutable_error(),
-                         Status::NotFound("Tablet not found"),
+  Status s = tablet_manager->GetTabletReplica(tablet_id, replica);
+  if (PREDICT_FALSE(!s.ok())) {
+    SetupErrorAndRespond(resp->mutable_error(), s,
                          TabletServerErrorPB::TABLET_NOT_FOUND, context);
     return false;
   }
+  return true;
+}
+
+template<class RespClass>
+void RespondTabletNotRunning(const scoped_refptr<TabletReplica>& replica,
+                             tablet::TabletStatePB tablet_state,
+                             RespClass* resp,
+                             rpc::RpcContext* context) {
+  Status s = Status::IllegalState("Tablet not RUNNING",
+                                  tablet::TabletStatePB_Name(tablet_state));
+  auto error_code = TabletServerErrorPB::TABLET_NOT_RUNNING;
+  if (replica->tablet_metadata()->tablet_data_state() == TABLET_DATA_TOMBSTONED ||
+      replica->tablet_metadata()->tablet_data_state() == TABLET_DATA_DELETED) {
+    // Treat tombstoned tablets as if they don't exist for most purposes.
+    // This takes precedence over failed, since we don't reset the failed
+    // status of a TabletReplica when deleting it. Only tablet copy does that.
+    error_code = TabletServerErrorPB::TABLET_NOT_FOUND;
+  } else if (tablet_state == tablet::FAILED) {
+    s = s.CloneAndAppend(replica->error().ToString());
+    error_code = TabletServerErrorPB::TABLET_FAILED;
+  }
+  SetupErrorAndRespond(resp->mutable_error(), s, error_code, context);
+}
 
+// Check if the replica is running.
+template<class RespClass>
+bool CheckTabletReplicaRunningOrRespond(const scoped_refptr<TabletReplica>& replica,
+                                        RespClass* resp,
+                                        rpc::RpcContext* context) {
   // Check RUNNING state.
-  tablet::TabletStatePB state = (*replica)->state();
+  tablet::TabletStatePB state = replica->state();
   if (PREDICT_FALSE(state != tablet::RUNNING)) {
-    Status s = Status::IllegalState("Tablet not RUNNING",
-                                    tablet::TabletStatePB_Name(state));
-    TabletServerErrorPB::Code code = TabletServerErrorPB::TABLET_NOT_RUNNING;
-    if (state == tablet::FAILED) {
-      s = s.CloneAndAppend((*replica)->error().ToString());
-      code = TabletServerErrorPB::TABLET_FAILED;
-    }
-    SetupErrorAndRespond(resp->mutable_error(), s, code, context);
+    RespondTabletNotRunning(replica, state, resp, context);
+    return false;
+  }
+  return true;
+}
+
+// Lookup the given tablet, ensuring that it both exists and is RUNNING.
+// If it is not, responds to the RPC associated with 'context' after setting
+// resp->mutable_error() to indicate the failure reason.
+//
+// Returns true if successful.
+template<class RespClass>
+bool LookupRunningTabletReplicaOrRespond(TabletReplicaLookupIf* tablet_manager,
+                                         const string& tablet_id,
+                                         RespClass* resp,
+                                         rpc::RpcContext* context,
+                                         scoped_refptr<TabletReplica>* replica) {
+  if (!LookupTabletReplicaOrRespond(tablet_manager, tablet_id, resp, context, replica)) {
+    return false;
+  }
+  if (!CheckTabletReplicaRunningOrRespond(*replica, resp, context)) {
     return false;
   }
   return true;
@@ -253,14 +297,16 @@ template<class RespClass>
 bool GetConsensusOrRespond(const scoped_refptr<TabletReplica>& replica,
                            RespClass* resp,
                            rpc::RpcContext* context,
-                           shared_ptr<RaftConsensus>* consensus) {
-  *consensus = replica->shared_consensus();
-  if (!*consensus) {
-    Status s = Status::ServiceUnavailable("Raft Consensus unavailable. Tablet not running");
+                           shared_ptr<RaftConsensus>* consensus_out) {
+  shared_ptr<RaftConsensus> tmp_consensus = replica->shared_consensus();
+  if (!tmp_consensus) {
+    Status s = Status::ServiceUnavailable("Raft Consensus unavailable",
+                                          "Tablet replica not initialized");
     SetupErrorAndRespond(resp->mutable_error(), s,
                          TabletServerErrorPB::TABLET_NOT_RUNNING, context);
     return false;
   }
+  *consensus_out = std::move(tmp_consensus);
   return true;
 }
 
@@ -620,8 +666,8 @@ void TabletServiceAdminImpl::AlterSchema(const AlterSchemaRequestPB* req,
   DVLOG(3) << "Received Alter Schema RPC: " << SecureDebugString(*req);
 
   scoped_refptr<TabletReplica> replica;
-  if (!LookupTabletReplicaOrRespond(server_->tablet_manager(), req->tablet_id(), resp, context,
-                                    &replica)) {
+  if (!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(), req->tablet_id(), resp,
+                                           context, &replica)) {
     return;
   }
 
@@ -786,8 +832,8 @@ void TabletServiceImpl::Write(const WriteRequestPB* req,
   DVLOG(3) << "Received Write RPC: " << SecureDebugString(*req);
 
   scoped_refptr<TabletReplica> replica;
-  if (!LookupTabletReplicaOrRespond(server_->tablet_manager(), req->tablet_id(), resp, context,
-                                    &replica)) {
+  if (!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(), req->tablet_id(), resp,
+                                           context, &replica)) {
     return;
   }
 
@@ -895,7 +941,8 @@ void ConsensusServiceImpl::UpdateConsensus(const ConsensusRequestPB* req,
     return;
   }
   scoped_refptr<TabletReplica> replica;
-  if (!LookupTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context, &replica)) {
+  if (!LookupRunningTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context,
+                                           &replica)) {
     return;
   }
 
@@ -926,15 +973,51 @@ void ConsensusServiceImpl::RequestConsensusVote(const VoteRequestPB* req,
   if (!CheckUuidMatchOrRespond(tablet_manager_, "RequestConsensusVote", req, resp, context)) {
     return;
   }
+
+  // Because the last-logged opid is stored in the TabletMetadata we go through
+  // the following dance:
+  // 1. Get a reference to the currently-registered TabletReplica.
+  // 2. Fetch (non-atomically) the current data state and last-logged opid from
+  //    the TabletMetadata.
+  // 3. If the data state is COPYING or TOMBSTONED, pass the last-logged opid
+  // from the TabletMetadata into RaftConsensus::RequestVote().
+  //
+  // The reason this sequence is safe to do without atomic locks is the
+  // RaftConsensus object associated with the TabletReplica will be Shutdown()
+  // and thus unable to vote if another TabletCopy operation comes between
+  // steps 1 and 3.
+  //
+  // TODO(mpercy): Move the last-logged opid into ConsensusMetadata to avoid
+  // this hacky plumbing. An additional benefit would be that we would be able
+  // to easily "tombstoned vote" while the tablet is bootstrapping.
+
   scoped_refptr<TabletReplica> replica;
   if (!LookupTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context, &replica)) {
     return;
   }
 
+  boost::optional<OpId> last_logged_opid;
+  tablet::TabletDataState data_state = replica->tablet_metadata()->tablet_data_state();
+
+  LOG(INFO) << "Received RequestConsensusVote() RPC: " << SecureShortDebugString(*req);
+
+  // We cannot vote while DELETED. This check is not racy because DELETED is a
+  // terminal state; it is not possible to transition out of DELETED.
+  if (data_state == TABLET_DATA_DELETED) {
+    RespondTabletNotRunning(replica, replica->state(), resp, context);
+    return;
+  }
+
+  // Attempt to vote while copying or tombstoned.
+  if (data_state == TABLET_DATA_COPYING || data_state == TABLET_DATA_TOMBSTONED) {
+    last_logged_opid = replica->tablet_metadata()->tombstone_last_logged_opid();
+  }
+
   // Submit the vote request directly to the consensus instance.
   shared_ptr<RaftConsensus> consensus;
   if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return;
-  Status s = consensus->RequestVote(req, resp);
+
+  Status s = consensus->RequestVote(req, std::move(last_logged_opid), resp);
   if (PREDICT_FALSE(!s.ok())) {
     SetupErrorAndRespond(resp->mutable_error(), s,
                          TabletServerErrorPB::UNKNOWN_ERROR,
@@ -952,8 +1035,8 @@ void ConsensusServiceImpl::ChangeConfig(const ChangeConfigRequestPB* req,
     return;
   }
   scoped_refptr<TabletReplica> replica;
-  if (!LookupTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context,
-                                    &replica)) {
+  if (!LookupRunningTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context,
+                                           &replica)) {
     return;
   }
 
@@ -976,8 +1059,8 @@ void ConsensusServiceImpl::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB*
     return;
   }
   scoped_refptr<TabletReplica> replica;
-  if (!LookupTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context,
-                                    &replica)) {
+  if (!LookupRunningTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context,
+                                           &replica)) {
     return;
   }
 
@@ -1008,7 +1091,8 @@ void ConsensusServiceImpl::RunLeaderElection(const RunLeaderElectionRequestPB* r
     return;
   }
   scoped_refptr<TabletReplica> replica;
-  if (!LookupTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context, &replica)) {
+  if (!LookupRunningTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context,
+                                           &replica)) {
     return;
   }
 
@@ -1034,7 +1118,8 @@ void ConsensusServiceImpl::LeaderStepDown(const LeaderStepDownRequestPB* req,
     return;
   }
   scoped_refptr<TabletReplica> replica;
-  if (!LookupTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context, &replica)) {
+  if (!LookupRunningTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context,
+                                           &replica)) {
     return;
   }
 
@@ -1058,7 +1143,8 @@ void ConsensusServiceImpl::GetLastOpId(const consensus::GetLastOpIdRequestPB *re
     return;
   }
   scoped_refptr<TabletReplica> replica;
-  if (!LookupTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context, &replica)) {
+  if (!LookupRunningTabletReplicaOrRespond(tablet_manager_, req->tablet_id(), resp, context,
+                                           &replica)) {
     return;
   }
 
@@ -1186,8 +1272,8 @@ void TabletServiceImpl::Scan(const ScanRequestPB* req,
   if (req->has_new_scan_request()) {
     const NewScanRequestPB& scan_pb = req->new_scan_request();
     scoped_refptr<TabletReplica> replica;
-    if (!LookupTabletReplicaOrRespond(server_->tablet_manager(), scan_pb.tablet_id(), resp, context,
-                                      &replica)) {
+    if (!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(), scan_pb.tablet_id(), resp,
+                                             context, &replica)) {
       return;
     }
     string scanner_id;
@@ -1297,8 +1383,8 @@ void TabletServiceImpl::Checksum(const ChecksumRequestPB* req,
     scan_req.mutable_new_scan_request()->CopyFrom(req->new_request());
     const NewScanRequestPB& new_req = req->new_request();
     scoped_refptr<TabletReplica> replica;
-    if (!LookupTabletReplicaOrRespond(server_->tablet_manager(), new_req.tablet_id(), resp, context,
-                                      &replica)) {
+    if (!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(), new_req.tablet_id(), resp,
+                                             context, &replica)) {
       return;
     }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/tserver/ts_tablet_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index 3b9b20e..0a60fd8 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -218,7 +218,8 @@ Status TSTabletManager::Init() {
       CHECK_OK(StartTabletStateTransitionUnlocked(meta->tablet_id(), "opening tablet", &deleter));
     }
 
-    scoped_refptr<TabletReplica> replica = CreateAndRegisterTabletReplica(meta, NEW_REPLICA);
+    scoped_refptr<TabletReplica> replica;
+    RETURN_NOT_OK(CreateAndRegisterTabletReplica(meta, NEW_REPLICA, &replica));
     RETURN_NOT_OK(open_tablet_pool_->SubmitFunc(boost::bind(&TSTabletManager::OpenTablet,
                                                             this, replica, deleter)));
   }
@@ -297,7 +298,8 @@ Status TSTabletManager::CreateNewTablet(const string& table_id,
   scoped_refptr<ConsensusMetadata> cmeta;
   RETURN_NOT_OK_PREPEND(cmeta_manager_->Create(tablet_id, config, kMinimumTerm, &cmeta),
                         "Unable to create new ConsensusMetadata for tablet " + tablet_id);
-  scoped_refptr<TabletReplica> new_replica = CreateAndRegisterTabletReplica(meta, NEW_REPLICA);
+  scoped_refptr<TabletReplica> new_replica;
+  RETURN_NOT_OK(CreateAndRegisterTabletReplica(meta, NEW_REPLICA, &new_replica));
 
   // We can run this synchronously since there is nothing to bootstrap.
   RETURN_NOT_OK(open_tablet_pool_->SubmitFunc(boost::bind(&TSTabletManager::OpenTablet,
@@ -499,12 +501,15 @@ void TSTabletManager::RunTabletCopy(
             CheckLeaderTermNotLower(tablet_id, leader_term, opt_last_logged_opid->term()),
             TabletServerErrorPB::INVALID_CONFIG);
 
-        // Tombstone the tablet and store the last-logged OpId.
+        // Shut down the old TabletReplica so that it is no longer allowed to
+        // mutate the ConsensusMetadata.
         old_replica->Shutdown();
+
         // Note that this leaves the data dir manager without any references to
         // tablet_id. This is okay because the tablet_copy_client should
         // generate a new disk group during the call to Start().
-        //
+
+        // Tombstone the tablet and store the last-logged OpId.
         // TODO(mpercy): Because we begin shutdown of the tablet after we check our
         // last-logged term against the leader's term, there may be operations
         // in flight and it may be possible for the same check in the tablet
@@ -544,16 +549,21 @@ void TSTabletManager::RunTabletCopy(
   }
   CALLBACK_RETURN_NOT_OK(tc_client.Start(copy_source_addr, &meta));
 
-  // From this point onward, the superblock is persisted in TABLET_DATA_COPYING
-  // state, and we need to tombtone the tablet if additional steps prior to
-  // getting to a TABLET_DATA_READY state fail.
+  // After calling TabletCopyClient::Start(), the superblock is persisted in
+  // TABLET_DATA_COPYING state. TabletCopyClient will automatically tombstone
+  // the tablet by implicitly calling Abort() on itself if it is destroyed
+  // prior to calling TabletCopyClient::Finish(), which if successful
+  // transitions the tablet into the TABLET_DATA_READY state.
 
-  // Registering a non-initialized TabletReplica offers visibility through the Web UI.
+  // Registering an unstarted TabletReplica allows for tombstoned voting and
+  // offers visibility through the Web UI.
   RegisterTabletReplicaMode mode = replacing_tablet ? REPLACEMENT_REPLICA : NEW_REPLICA;
-  scoped_refptr<TabletReplica> replica = CreateAndRegisterTabletReplica(meta, mode);
+  scoped_refptr<TabletReplica> replica;
+  CALLBACK_RETURN_NOT_OK(CreateAndRegisterTabletReplica(meta, mode, &replica));
 
   // Now we invoke the StartTabletCopy callback and respond success to the
-  // remote caller. Then we proceed to do most of the actual tablet copying work.
+  // remote caller, since StartTabletCopy() is an asynchronous RPC call. Then
+  // we proceed with the Tablet Copy process.
   cb(Status::OK(), TabletServerErrorPB::UNKNOWN_ERROR);
   cb = [](const Status&, TabletServerErrorPB::Code) {
     LOG(FATAL) << "Callback invoked twice from TSTabletManager::RunTabletCopy()";
@@ -561,7 +571,7 @@ void TSTabletManager::RunTabletCopy(
 
   // From this point onward, we do not notify the caller about progress or success.
 
-  // Download all of the remote files.
+  // Go through and synchronously download the remote blocks and WAL segments.
   Status s = tc_client.FetchAll(replica);
   if (!s.ok()) {
     LOG(WARNING) << LogPrefix(tablet_id) << "Tablet Copy: Unable to fetch data from remote peer "
@@ -580,14 +590,15 @@ void TSTabletManager::RunTabletCopy(
     return;
   }
 
-  // startup it's still in a valid, fully-copied state.
+  // Bootstrap and start the fully-copied tablet.
   OpenTablet(replica, deleter);
 }
 
 // Create and register a new TabletReplica, given tablet metadata.
-scoped_refptr<TabletReplica> TSTabletManager::CreateAndRegisterTabletReplica(
+Status TSTabletManager::CreateAndRegisterTabletReplica(
     scoped_refptr<TabletMetadata> meta,
-    RegisterTabletReplicaMode mode) {
+    RegisterTabletReplicaMode mode,
+    scoped_refptr<TabletReplica>* replica_out) {
   const string& tablet_id = meta->tablet_id();
   scoped_refptr<TabletReplica> replica(
       new TabletReplica(std::move(meta),
@@ -597,8 +608,14 @@ scoped_refptr<TabletReplica> TSTabletManager::CreateAndRegisterTabletReplica(
                         Bind(&TSTabletManager::MarkTabletDirty,
                              Unretained(this),
                              tablet_id)));
+  Status s = replica->Init(server_->raft_pool());
+  if (PREDICT_FALSE(!s.ok())) {
+    replica->SetError(s);
+    replica->Shutdown();
+  }
   RegisterTablet(tablet_id, replica, mode);
-  return replica;
+  *replica_out = std::move(replica);
+  return Status::OK();
 }
 
 Status TSTabletManager::DeleteTablet(
@@ -641,8 +658,7 @@ Status TSTabletManager::DeleteTablet(
   // consensus and therefore the log is not available.
   TabletDataState data_state = replica->tablet_metadata()->tablet_data_state();
   bool tablet_deleted = (data_state == TABLET_DATA_DELETED ||
-                         data_state == TABLET_DATA_TOMBSTONED ||
-                         replica->state() == tablet::FAILED);
+                         data_state == TABLET_DATA_TOMBSTONED);
 
   // They specified an "atomic" delete. Check the committed config's opid_index.
   // TODO(mpercy): There's actually a race here between the check and shutdown,
@@ -665,7 +681,7 @@ Status TSTabletManager::DeleteTablet(
     }
   }
 
-  replica->Shutdown();
+  replica->Stop();
 
   boost::optional<OpId> opt_last_logged_opid;
   if (consensus) {
@@ -825,7 +841,6 @@ void TSTabletManager::OpenTablet(const scoped_refptr<TabletReplica>& replica,
                        server_->messenger(),
                        server_->result_tracker(),
                        log,
-                       server_->raft_pool(),
                        server_->tablet_prepare_pool());
     if (!s.ok()) {
       LOG(ERROR) << LogPrefix(tablet_id) << "Tablet failed to start: "
@@ -940,12 +955,6 @@ Status TSTabletManager::GetTabletReplica(const string& tablet_id,
   if (!LookupTablet(tablet_id, replica)) {
     return Status::NotFound("Tablet not found", tablet_id);
   }
-  TabletDataState data_state = (*replica)->tablet_metadata()->tablet_data_state();
-  if (data_state != TABLET_DATA_READY) {
-    return Status::IllegalState("Tablet data state not TABLET_DATA_READY: " +
-                                TabletDataState_Name(data_state),
-                                tablet_id);
-  }
   return Status::OK();
 }
 
@@ -1062,7 +1071,8 @@ Status TSTabletManager::HandleNonReadyTabletOnStartup(const scoped_refptr<Tablet
   // allows us to permanently delete replica tombstones when a table gets
   // deleted.
   if (data_state == TABLET_DATA_TOMBSTONED) {
-    CreateAndRegisterTabletReplica(meta, NEW_REPLICA);
+    scoped_refptr<TabletReplica> dummy;
+    RETURN_NOT_OK(CreateAndRegisterTabletReplica(meta, NEW_REPLICA, &dummy));
   }
 
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/tserver/ts_tablet_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/ts_tablet_manager.h b/src/kudu/tserver/ts_tablet_manager.h
index b0420da..7257d84 100644
--- a/src/kudu/tserver/ts_tablet_manager.h
+++ b/src/kudu/tserver/ts_tablet_manager.h
@@ -260,9 +260,9 @@ class TSTabletManager : public tserver::TabletReplicaLookupIf {
   // Calls RegisterTablet() with the given 'mode' parameter after constructing
   // the TablerPeer object. See RegisterTablet() for details about the
   // semantics of 'mode' and the locking requirements.
-  scoped_refptr<tablet::TabletReplica> CreateAndRegisterTabletReplica(
-      scoped_refptr<tablet::TabletMetadata> meta,
-      RegisterTabletReplicaMode mode);
+  Status CreateAndRegisterTabletReplica(scoped_refptr<tablet::TabletMetadata> meta,
+                                        RegisterTabletReplicaMode mode,
+                                        scoped_refptr<tablet::TabletReplica>* replica_out);
 
   // Helper to generate the report for a single tablet.
   void CreateReportedTabletPB(const std::string& tablet_id,

http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/tserver/tserver-path-handlers.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tserver-path-handlers.cc b/src/kudu/tserver/tserver-path-handlers.cc
index 8578939..c348341 100644
--- a/src/kudu/tserver/tserver-path-handlers.cc
+++ b/src/kudu/tserver/tserver-path-handlers.cc
@@ -490,7 +490,7 @@ void TabletServerPathHandlers::HandleConsensusStatusPage(const Webserver::WebReq
   if (!LoadTablet(tserver_, req, &id, &replica, output)) return;
   shared_ptr<consensus::RaftConsensus> consensus = replica->shared_consensus();
   if (!consensus) {
-    *output << "Tablet " << EscapeForHtmlToString(id) << " not running";
+    *output << "Tablet " << EscapeForHtmlToString(id) << " not initialized";
     return;
   }
   consensus->DumpStatusHtml(*output);

http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/util/make_shared.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/make_shared.h b/src/kudu/util/make_shared.h
index c534ebf..ddba52c 100644
--- a/src/kudu/util/make_shared.h
+++ b/src/kudu/util/make_shared.h
@@ -15,8 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef KUDU_UTIL_MAKE_SHARED_H_
-#define KUDU_UTIL_MAKE_SHARED_H_
+#pragma once
 
 #ifdef __GLIBCXX__
 #include <ext/alloc_traits.h>  // IWYU pragma: export
@@ -64,5 +63,3 @@
 #else
   #error "Need to implement ALLOW_MAKE_SHARED for your platform!"
 #endif
-
-#endif // KUDU_UTIL_MAKE_SHARED_H_


[2/2] kudu git commit: KUDU-871. Support tombstoned voting

Posted by to...@apache.org.
KUDU-871. Support tombstoned voting

This patch makes it possible for tombstoned tablet replicas to vote in
Raft elections.

Changes:

* Add Stop() method to TabletReplica + Consensus lifecycle.
  * Includes new STOPPED state.
  * Tombstoning a replica should call Stop().
  * Deleting a replica should call Shutdown().
* Persist ConsensusMetadata before returning from
  TabletCopyClient::Start() because we need cmeta to Init()
  RaftConsensus, which happens when registering the replica in
  TSTabletManager.
* TSTabletManager::DeleteTablet() should not consider FAILED == deleted,
  since we no longer destroy RaftConsensus when tombstoning a replica.
* Add positive and negative tests for tombstoned voting.
* Add a stress test that induces lots of tombstoned voting
  while running TabletCopy, TabletBootstrap, and DeleteTablet.
* Fix DeleteTableITest.TestMergeConsensusMetadata after tombstoned
  voting changed its assumption that tombstoned tablets would not vote.
* Fix several tests that expected tombstoned tablets to be SHUTDOWN when
  now they are STOPPED.

Change-Id: Ia19d75b185299443b27f41e468bbae20065e7570
Reviewed-on: http://gerrit.cloudera.org:8080/6960
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/5bca7d8b
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/5bca7d8b
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/5bca7d8b

Branch: refs/heads/master
Commit: 5bca7d8ba185d62952fb3e3163cbe88d20453da0
Parents: b37bde7
Author: Mike Percy <mp...@apache.org>
Authored: Wed Aug 23 00:01:04 2017 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Thu Aug 24 04:43:25 2017 +0000

----------------------------------------------------------------------
 src/kudu/consensus/consensus-test-util.h        |   2 +-
 src/kudu/consensus/raft_consensus.cc            | 244 +++++++---
 src/kudu/consensus/raft_consensus.h             |  88 +++-
 .../consensus/raft_consensus_quorum-test.cc     |  29 +-
 src/kudu/integration-tests/CMakeLists.txt       |   2 +
 .../integration-tests/cluster_itest_util.cc     |  29 ++
 src/kudu/integration-tests/cluster_itest_util.h |  12 +
 .../integration-tests/delete_table-itest.cc     |  19 +-
 .../external_mini_cluster-itest-base.cc         |   6 +-
 .../tombstoned_voting-itest.cc                  | 461 +++++++++++++++++++
 .../tombstoned_voting-stress-test.cc            | 313 +++++++++++++
 src/kudu/master/sys_catalog.cc                  |   7 +-
 src/kudu/tablet/metadata.proto                  |  31 +-
 src/kudu/tablet/tablet_metadata.h               |   7 +-
 src/kudu/tablet/tablet_replica-test.cc          |  30 +-
 src/kudu/tablet/tablet_replica.cc               | 127 +++--
 src/kudu/tablet/tablet_replica.h                |  45 +-
 src/kudu/tools/kudu-admin-test.cc               |   4 +-
 src/kudu/tools/kudu-ts-cli-test.cc              |   2 +-
 src/kudu/tserver/tablet_copy_client.cc          |   6 +-
 src/kudu/tserver/tablet_copy_client.h           |   3 +-
 .../tserver/tablet_copy_source_session-test.cc  |  37 +-
 src/kudu/tserver/tablet_service.cc              | 156 +++++--
 src/kudu/tserver/ts_tablet_manager.cc           |  62 +--
 src/kudu/tserver/ts_tablet_manager.h            |   6 +-
 src/kudu/tserver/tserver-path-handlers.cc       |   2 +-
 src/kudu/util/make_shared.h                     |   5 +-
 27 files changed, 1439 insertions(+), 296 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/consensus/consensus-test-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus-test-util.h b/src/kudu/consensus/consensus-test-util.h
index ac4408b..c73e50b 100644
--- a/src/kudu/consensus/consensus-test-util.h
+++ b/src/kudu/consensus/consensus-test-util.h
@@ -534,7 +534,7 @@ class LocalTestPeerProxy : public TestPeerProxy {
     Status s = peers_->GetPeerByUuid(peer_uuid_, &peer);
 
     if (s.ok()) {
-      s = peer->RequestVote(&other_peer_req, &other_peer_resp);
+      s = peer->RequestVote(&other_peer_req, boost::none, &other_peer_resp);
     }
     if (!s.ok()) {
       LOG(WARNING) << "Could not RequestVote from replica with request: "

http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 042c5d1..11d1e40 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -118,6 +118,11 @@ DEFINE_bool(raft_enable_pre_election, true,
 TAG_FLAG(raft_enable_pre_election, experimental);
 TAG_FLAG(raft_enable_pre_election, runtime);
 
+DEFINE_bool(raft_enable_tombstoned_voting, true,
+            "When enabled, tombstoned tablets may vote in elections.");
+TAG_FLAG(raft_enable_tombstoned_voting, experimental);
+TAG_FLAG(raft_enable_tombstoned_voting, runtime);
+
 DECLARE_int32(memory_limit_warn_threshold_percentage);
 
 // Metrics
@@ -133,9 +138,11 @@ METRIC_DEFINE_gauge_int64(tablet, raft_term,
                           "Current Term of the Raft Consensus algorithm. This number increments "
                           "each time a leader election is started.");
 
+using boost::optional;
 using kudu::pb_util::SecureShortDebugString;
 using kudu::rpc::PeriodicTimer;
 using kudu::tserver::TabletServerErrorPB;
+using std::shared_ptr;
 using std::string;
 using std::unique_ptr;
 using std::weak_ptr;
@@ -165,10 +172,8 @@ RaftConsensus::RaftConsensus(
 
 Status RaftConsensus::Init() {
   DCHECK_EQ(kNew, state_) << State_Name(state_);
-
   RETURN_NOT_OK(cmeta_manager_->Load(options_.tablet_id, &cmeta_));
-
-  state_ = kInitialized;
+  SetStateUnlocked(kInitialized);
   return Status::OK();
 }
 
@@ -176,6 +181,20 @@ RaftConsensus::~RaftConsensus() {
   Shutdown();
 }
 
+Status RaftConsensus::Create(ConsensusOptions options,
+                             RaftPeerPB local_peer_pb,
+                             scoped_refptr<ConsensusMetadataManager> cmeta_manager,
+                             ThreadPool* raft_pool,
+                             shared_ptr<RaftConsensus>* consensus_out) {
+  shared_ptr<RaftConsensus> consensus(std::make_shared<RaftConsensus>(std::move(options),
+                                                                      std::move(local_peer_pb),
+                                                                      std::move(cmeta_manager),
+                                                                      raft_pool));
+  RETURN_NOT_OK_PREPEND(consensus->Init(), "Unable to initialize Raft consensus");
+  *consensus_out = std::move(consensus);
+  return Status::OK();
+}
+
 Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
                             gscoped_ptr<PeerProxyFactory> peer_proxy_factory,
                             scoped_refptr<log::Log> log,
@@ -257,13 +276,13 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
 
     // Our last persisted term can be higher than the last persisted operation
     // (i.e. if we called an election) but reverse should never happen.
-    if (info.last_id.term() > GetCurrentTermUnlocked()) {
+    if (info.last_id.term() > CurrentTermUnlocked()) {
       return Status::Corruption(Substitute("Unable to start RaftConsensus: "
           "The last op in the WAL with id $0 has a term ($1) that is greater "
           "than the latest recorded term, which is $2",
           OpIdToString(info.last_id),
           info.last_id.term(),
-          GetCurrentTermUnlocked()));
+          CurrentTermUnlocked()));
     }
 
     // Append any uncommitted replicate messages found during log replay to the queue.
@@ -283,7 +302,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
     // If this is the first term expire the FD immediately so that we have a
     // fast first election, otherwise we just let the timer expire normally.
     boost::optional<MonoDelta> initial_delta;
-    if (GetCurrentTermUnlocked() == 0) {
+    if (CurrentTermUnlocked() == 0) {
       // The failure detector is initialized to a low value to trigger an early
       // election (unless someone else requested a vote from us first, which
       // resets the election timer).
@@ -304,7 +323,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
     // Now assume "follower" duties.
     RETURN_NOT_OK(BecomeReplicaUnlocked());
 
-    state_ = kRunning;
+    SetStateUnlocked(kRunning);
   }
 
   if (IsSingleVoterConfig() && FLAGS_enable_leader_failure_detection) {
@@ -336,7 +355,7 @@ Status RaftConsensus::EmulateElection() {
   LOG_WITH_PREFIX_UNLOCKED(INFO) << "Emulating election...";
 
   // Assume leadership of new term.
-  RETURN_NOT_OK(HandleTermAdvanceUnlocked(GetCurrentTermUnlocked() + 1));
+  RETURN_NOT_OK(HandleTermAdvanceUnlocked(CurrentTermUnlocked() + 1));
   SetLeaderUuidUnlocked(peer_uuid());
   return BecomeLeaderUnlocked();
 }
@@ -408,7 +427,7 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
 
       // We skip flushing the term to disk because setting the vote just below also
       // flushes to disk, and the double fsync doesn't buy us anything.
-      RETURN_NOT_OK(HandleTermAdvanceUnlocked(GetCurrentTermUnlocked() + 1,
+      RETURN_NOT_OK(HandleTermAdvanceUnlocked(CurrentTermUnlocked() + 1,
                                               SKIP_FLUSH_TO_DISK));
       RETURN_NOT_OK(SetVotedForCurrentTermUnlocked(peer_uuid()));
     }
@@ -427,7 +446,7 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
     RETURN_NOT_OK(counter->RegisterVote(peer_uuid(), VOTE_GRANTED, &duplicate));
     CHECK(!duplicate) << LogPrefixUnlocked()
                       << "Inexplicable duplicate self-vote for term "
-                      << GetCurrentTermUnlocked();
+                      << CurrentTermUnlocked();
 
     VoteRequestPB request;
     request.set_ignore_live_leader(mode == ELECT_EVEN_IF_LEADER_IS_ALIVE);
@@ -436,9 +455,9 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
       // In a pre-election, we haven't bumped our own term yet, so we need to be
       // asking for votes for the next term.
       request.set_is_pre_election(true);
-      request.set_candidate_term(GetCurrentTermUnlocked() + 1);
+      request.set_candidate_term(CurrentTermUnlocked() + 1);
     } else {
-      request.set_candidate_term(GetCurrentTermUnlocked());
+      request.set_candidate_term(CurrentTermUnlocked());
     }
     request.set_tablet_id(options_.tablet_id);
     *request.mutable_candidate_status()->mutable_last_received() =
@@ -581,7 +600,7 @@ Status RaftConsensus::Replicate(const scoped_refptr<ConsensusRound>& round) {
     ThreadRestrictions::AssertWaitAllowed();
     LockGuard l(lock_);
     RETURN_NOT_OK(CheckSafeToReplicateUnlocked(*round->replicate_msg()));
-    RETURN_NOT_OK(round->CheckBoundTerm(GetCurrentTermUnlocked()));
+    RETURN_NOT_OK(round->CheckBoundTerm(CurrentTermUnlocked()));
     RETURN_NOT_OK(AppendNewRoundToQueueUnlocked(round));
   }
 
@@ -593,7 +612,7 @@ Status RaftConsensus::CheckLeadershipAndBindTerm(const scoped_refptr<ConsensusRo
   ThreadRestrictions::AssertWaitAllowed();
   LockGuard l(lock_);
   RETURN_NOT_OK(CheckSafeToReplicateUnlocked(*round->replicate_msg()));
-  round->BindToTerm(GetCurrentTermUnlocked());
+  round->BindToTerm(CurrentTermUnlocked());
   return Status::OK();
 }
 
@@ -669,7 +688,7 @@ void RaftConsensus::NotifyCommitIndex(int64_t commit_index) {
   // We will process commit notifications while shutting down because a replica
   // which has initiated a Prepare() / Replicate() may eventually commit even if
   // its state has changed after the initial Append() / Update().
-  if (PREDICT_FALSE(state_ != kRunning && state_ != kShuttingDown)) {
+  if (PREDICT_FALSE(state_ != kRunning && state_ != kStopping)) {
     LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Unable to update committed index: "
                                       << "Replica not in running state: "
                                       << State_Name(state_);
@@ -716,7 +735,7 @@ void RaftConsensus::NotifyFailedFollower(const string& uuid,
   {
     ThreadRestrictions::AssertWaitAllowed();
     LockGuard l(lock_);
-    int64_t current_term = GetCurrentTermUnlocked();
+    int64_t current_term = CurrentTermUnlocked();
     if (current_term != term) {
       LOG_WITH_PREFIX_UNLOCKED(INFO) << fail_msg << "Notified about a follower failure in "
                                      << "previous term " << term << ", but a leader election "
@@ -899,16 +918,16 @@ Status RaftConsensus::HandleLeaderRequestTermUnlocked(const ConsensusRequestPB*
                                                       ConsensusResponsePB* response) {
   DCHECK(lock_.is_locked());
   // Do term checks first:
-  if (PREDICT_FALSE(request->caller_term() != GetCurrentTermUnlocked())) {
+  if (PREDICT_FALSE(request->caller_term() != CurrentTermUnlocked())) {
 
     // If less, reject.
-    if (request->caller_term() < GetCurrentTermUnlocked()) {
+    if (request->caller_term() < CurrentTermUnlocked()) {
       string msg = Substitute("Rejecting Update request from peer $0 for earlier term $1. "
                               "Current term is $2. Ops: $3",
 
                               request->caller_uuid(),
                               request->caller_term(),
-                              GetCurrentTermUnlocked(),
+                              CurrentTermUnlocked(),
                               OpsRangeString(*request));
       LOG_WITH_PREFIX_UNLOCKED(INFO) << msg;
       FillConsensusResponseError(response,
@@ -1371,7 +1390,7 @@ Status RaftConsensus::UpdateReplica(const ConsensusRequestPB* request,
 void RaftConsensus::FillConsensusResponseOKUnlocked(ConsensusResponsePB* response) {
   DCHECK(lock_.is_locked());
   TRACE("Filling consensus response to leader.");
-  response->set_responder_term(GetCurrentTermUnlocked());
+  response->set_responder_term(CurrentTermUnlocked());
   response->mutable_status()->mutable_last_received()->CopyFrom(
       queue_->GetLastOpIdInLog());
   response->mutable_status()->mutable_last_received_current_leader()->CopyFrom(
@@ -1388,7 +1407,9 @@ void RaftConsensus::FillConsensusResponseError(ConsensusResponsePB* response,
   StatusToPB(status, error->mutable_status());
 }
 
-Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB* response) {
+Status RaftConsensus::RequestVote(const VoteRequestPB* request,
+                                  optional<OpId> tombstone_last_logged_opid,
+                                  VoteResponsePB* response) {
   TRACE_EVENT2("consensus", "RaftConsensus::RequestVote",
                "peer", peer_uuid(),
                "tablet", options_.tablet_id);
@@ -1414,14 +1435,41 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB*
     // We still need to take the state lock in order to respond with term info, etc.
     ThreadRestrictions::AssertWaitAllowed();
     LockGuard l(lock_);
-    RETURN_NOT_OK(CheckRunningUnlocked());
     return RequestVoteRespondIsBusy(request, response);
   }
 
   // Acquire the replica state lock so we can read / modify the consensus state.
   ThreadRestrictions::AssertWaitAllowed();
   LockGuard l(lock_);
-  RETURN_NOT_OK(CheckRunningUnlocked());
+
+  // Ensure our lifecycle state is compatible with voting.
+  // If RaftConsensus is running, we use the latest OpId from the WAL to vote.
+  // Otherwise, we must be voting while tombstoned.
+  OpId local_last_logged_opid;
+  switch (state_) {
+    case kShutdown:
+      return Status::IllegalState("cannot vote while shut down");
+    case kRunning:
+      // Note: it is (theoretically) possible for 'tombstone_last_logged_opid'
+      // to be passed in and by the time we reach here the state is kRunning.
+      // That may occur when a vote request comes in at the end of a tablet
+      // copy and then tablet bootstrap completes quickly. In that case, we
+      // ignore the passed-in value and use the latest OpId from our queue.
+      local_last_logged_opid = queue_->GetLastOpIdInLog();
+      break;
+    default:
+      if (!tombstone_last_logged_opid) {
+        return Status::IllegalState("must be running to vote when last-logged opid is not known");
+      }
+      if (!FLAGS_raft_enable_tombstoned_voting) {
+        return Status::IllegalState("must be running to vote when tombstoned voting is disabled");
+      }
+      local_last_logged_opid = *tombstone_last_logged_opid;
+      LOG_WITH_PREFIX_UNLOCKED(INFO) << "voting while tombstoned based on last-logged opid "
+                                     << local_last_logged_opid;
+      break;
+  }
+  DCHECK(local_last_logged_opid.IsInitialized());
 
   // If the node is not in the configuration, allow the vote (this is required by Raft)
   // but log an informational message anyway.
@@ -1452,12 +1500,12 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB*
   }
 
   // Candidate is running behind.
-  if (request->candidate_term() < GetCurrentTermUnlocked()) {
+  if (request->candidate_term() < CurrentTermUnlocked()) {
     return RequestVoteRespondInvalidTerm(request, response);
   }
 
   // We already voted this term.
-  if (request->candidate_term() == GetCurrentTermUnlocked() &&
+  if (request->candidate_term() == CurrentTermUnlocked() &&
       HasVotedCurrentTermUnlocked()) {
 
     // Already voted for the same candidate in the current term.
@@ -1471,7 +1519,6 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB*
 
   // Candidate must have last-logged OpId at least as large as our own to get
   // our vote.
-  OpId local_last_logged_opid = queue_->GetLastOpIdInLog();
   bool vote_yes = !OpIdLessThan(request->candidate_status().last_received(),
                                 local_last_logged_opid);
 
@@ -1480,14 +1527,14 @@ Status RaftConsensus::RequestVote(const VoteRequestPB* request, VoteResponsePB*
   // has actually now successfully become leader of the prior term, in which case
   // bumping our term here would disrupt it.
   if (!request->is_pre_election() &&
-      request->candidate_term() > GetCurrentTermUnlocked()) {
+      request->candidate_term() > CurrentTermUnlocked()) {
     // If we are going to vote for this peer, then we will flush the consensus metadata
     // to disk below when we record the vote, and we can skip flushing the term advancement
     // to disk here.
     auto flush = vote_yes ? SKIP_FLUSH_TO_DISK : FLUSH_TO_DISK;
     RETURN_NOT_OK_PREPEND(HandleTermAdvanceUnlocked(request->candidate_term(), flush),
         Substitute("Could not step down in RequestVote. Current term: $0, candidate term: $1",
-                   GetCurrentTermUnlocked(), request->candidate_term()));
+                   CurrentTermUnlocked(), request->candidate_term()));
   }
 
   if (!vote_yes) {
@@ -1629,7 +1676,7 @@ Status RaftConsensus::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB& req,
     // we can stick them in the consensus update request later.
     ThreadRestrictions::AssertWaitAllowed();
     LockGuard l(lock_);
-    current_term = GetCurrentTermUnlocked();
+    current_term = CurrentTermUnlocked();
     committed_config = cmeta_->CommittedConfig();
     if (cmeta_->has_pending_config()) {
       LOG_WITH_PREFIX_UNLOCKED(WARNING)
@@ -1746,23 +1793,17 @@ Status RaftConsensus::UnsafeChangeConfig(const UnsafeChangeConfigRequestPB& req,
   return s;
 }
 
-void RaftConsensus::Shutdown() {
+void RaftConsensus::Stop() {
   TRACE_EVENT2("consensus", "RaftConsensus::Shutdown",
                "peer", peer_uuid(),
                "tablet", options_.tablet_id);
 
-  // Avoid taking locks if already shut down so we don't violate
-  // ThreadRestrictions assertions in the case where the RaftConsensus
-  // destructor runs on the reactor thread due to an election callback being
-  // the last outstanding reference.
-  if (shutdown_.Load(kMemOrderAcquire)) return;
-
   {
     ThreadRestrictions::AssertWaitAllowed();
     LockGuard l(lock_);
-    // Transition to kShuttingDown state.
-    CHECK_NE(kShutDown, state_) << State_Name(state_); // We are protected here by 'shutdown_'.
-    state_ = kShuttingDown;
+    if (state_ == kStopping || state_ == kStopped || state_ == kShutdown) return;
+    // Transition to kStopping state.
+    SetStateUnlocked(kStopping);
     LOG_WITH_PREFIX_UNLOCKED(INFO) << "Raft consensus shutting down.";
   }
 
@@ -1776,15 +1817,40 @@ void RaftConsensus::Shutdown() {
     ThreadRestrictions::AssertWaitAllowed();
     LockGuard l(lock_);
     if (pending_) CHECK_OK(pending_->CancelPendingTransactions());
-    CHECK_EQ(kShuttingDown, state_) << State_Name(state_);
-    state_ = kShutDown;
+    SetStateUnlocked(kStopped);
+
+    // Clear leader status on Stop(), in case this replica was the leader. If
+    // we don't do this, the log messages still show this node as the leader.
+    // No need to sync it since it's not persistent state.
+    if (cmeta_) {
+      ClearLeaderUnlocked();
+    }
+
+    // If we were the leader, stop witholding votes.
+    if (withhold_votes_until_ == MonoTime::Max()) {
+      withhold_votes_until_ = MonoTime::Min();
+    }
+
     LOG_WITH_PREFIX_UNLOCKED(INFO) << "Raft consensus is shut down!";
   }
 
   // Shut down things that might acquire locks during destruction.
   if (raft_pool_token_) raft_pool_token_->Shutdown();
   if (failure_detector_) DisableFailureDetector();
+}
+
+void RaftConsensus::Shutdown() {
+  // Avoid taking locks if already shut down so we don't violate
+  // ThreadRestrictions assertions in the case where the RaftConsensus
+  // destructor runs on the reactor thread due to an election callback being
+  // the last outstanding reference.
+  if (shutdown_.Load(kMemOrderAcquire)) return;
 
+  Stop();
+  {
+    LockGuard l(lock_);
+    SetStateUnlocked(kShutdown);
+  }
   shutdown_.Store(true, kMemOrderRelease);
 }
 
@@ -1826,13 +1892,13 @@ std::string RaftConsensus::GetRequestVoteLogPrefixUnlocked(const VoteRequestPB&
 }
 
 void RaftConsensus::FillVoteResponseVoteGranted(VoteResponsePB* response) {
-  response->set_responder_term(GetCurrentTermUnlocked());
+  response->set_responder_term(CurrentTermUnlocked());
   response->set_vote_granted(true);
 }
 
 void RaftConsensus::FillVoteResponseVoteDenied(ConsensusErrorPB::Code error_code,
                                                VoteResponsePB* response) {
-  response->set_responder_term(GetCurrentTermUnlocked());
+  response->set_responder_term(CurrentTermUnlocked());
   response->set_vote_granted(false);
   response->mutable_consensus_error()->set_code(error_code);
 }
@@ -1845,7 +1911,7 @@ Status RaftConsensus::RequestVoteRespondInvalidTerm(const VoteRequestPB* request
                           GetRequestVoteLogPrefixUnlocked(*request),
                           request->candidate_uuid(),
                           request->candidate_term(),
-                          GetCurrentTermUnlocked());
+                          CurrentTermUnlocked());
   LOG(INFO) << msg;
   StatusToPB(Status::InvalidArgument(msg), response->mutable_consensus_error()->mutable_status());
   return Status::OK();
@@ -1869,7 +1935,7 @@ Status RaftConsensus::RequestVoteRespondAlreadyVotedForOther(const VoteRequestPB
                           "Already voted for candidate $3 in this term.",
                           GetRequestVoteLogPrefixUnlocked(*request),
                           request->candidate_uuid(),
-                          GetCurrentTermUnlocked(),
+                          CurrentTermUnlocked(),
                           GetVotedForCurrentTermUnlocked());
   LOG(INFO) << msg;
   StatusToPB(Status::InvalidArgument(msg), response->mutable_consensus_error()->mutable_status());
@@ -1944,7 +2010,7 @@ Status RaftConsensus::RequestVoteRespondVoteGranted(const VoteRequestPB* request
   LOG(INFO) << Substitute("$0: Granting yes vote for candidate $1 in term $2.",
                           GetRequestVoteLogPrefixUnlocked(*request),
                           request->candidate_uuid(),
-                          GetCurrentTermUnlocked());
+                          CurrentTermUnlocked());
   return Status::OK();
 }
 
@@ -1954,6 +2020,35 @@ RaftPeerPB::Role RaftConsensus::role() const {
   return cmeta_->active_role();
 }
 
+int64_t RaftConsensus::CurrentTerm() const {
+  LockGuard l(lock_);
+  return CurrentTermUnlocked();
+}
+
+void RaftConsensus::SetStateUnlocked(State new_state) {
+  switch (new_state) {
+    case kInitialized:
+      CHECK_EQ(kNew, state_);
+      break;
+    case kRunning:
+      CHECK_EQ(kInitialized, state_);
+      break;
+    case kStopping:
+      CHECK(state_ != kStopped && state_ != kShutdown) << "State = " << State_Name(state_);
+      break;
+    case kStopped:
+      CHECK_EQ(kStopping, state_);
+      break;
+    case kShutdown:
+      CHECK(state_ == kStopped || state_ == kShutdown) << "State = " << State_Name(state_);
+      break;
+    default:
+      LOG(FATAL) << "Disallowed transition to state = " << State_Name(new_state);
+      break;
+  }
+  state_ = new_state;
+}
+
 const char* RaftConsensus::State_Name(State state) {
   switch (state) {
     case kNew:
@@ -1962,9 +2057,11 @@ const char* RaftConsensus::State_Name(State state) {
       return "Initialized";
     case kRunning:
       return "Running";
-    case kShuttingDown:
-      return "Shutting down";
-    case kShutDown:
+    case kStopping:
+      return "Stopping";
+    case kStopped:
+      return "Stopped";
+    case kShutdown:
       return "Shut down";
     default:
       LOG(DFATAL) << "Unknown State value: " << state;
@@ -2018,7 +2115,7 @@ Status RaftConsensus::RefreshConsensusQueueAndPeersUnlocked() {
   // TODO(todd): should use queue committed index here? in that case do
   // we need to pass it in at all?
   queue_->SetLeaderMode(pending_->GetCommittedIndex(),
-                        GetCurrentTermUnlocked(),
+                        CurrentTermUnlocked(),
                         active_config);
   RETURN_NOT_OK(peer_manager_->UpdateRaftConfig(active_config));
   return Status::OK();
@@ -2045,6 +2142,16 @@ RaftConfigPB RaftConsensus::CommittedConfig() const {
 }
 
 void RaftConsensus::DumpStatusHtml(std::ostream& out) const {
+  RaftPeerPB::Role role;
+  {
+    LockGuard l(lock_);
+    if (state_ != kRunning) {
+      out << "Tablet " << EscapeForHtmlToString(tablet_id()) << " not running" << std::endl;
+      return;
+    }
+    role = cmeta_->active_role();
+  }
+
   out << "<h1>Raft Consensus State</h1>" << std::endl;
 
   out << "<h2>State</h2>" << std::endl;
@@ -2053,12 +2160,6 @@ void RaftConsensus::DumpStatusHtml(std::ostream& out) const {
   out << "<pre>" << EscapeForHtmlToString(queue_->ToString()) << "</pre>" << std::endl;
 
   // Dump the queues on a leader.
-  RaftPeerPB::Role role;
-  {
-    ThreadRestrictions::AssertWaitAllowed();
-    LockGuard l(lock_);
-    role = cmeta_->active_role();
-  }
   if (role == RaftPeerPB::LEADER) {
     out << "<h2>Queue overview</h2>" << std::endl;
     out << "<pre>" << EscapeForHtmlToString(queue_->ToString()) << "</pre>" << std::endl;
@@ -2108,7 +2209,7 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
       // because it already voted in term 2. The check below ensures that peer B
       // will bump to term 2 when it gets the vote rejection, such that its
       // next pre-election (for term 3) would succeed.
-      if (result.highest_voter_term > GetCurrentTermUnlocked()) {
+      if (result.highest_voter_term > CurrentTermUnlocked()) {
         HandleTermAdvanceUnlocked(result.highest_voter_term);
       }
 
@@ -2138,7 +2239,7 @@ void RaftConsensus::DoElectionCallback(ElectionReason reason, const ElectionResu
     election_started_in_term--;
   }
 
-  if (election_started_in_term != GetCurrentTermUnlocked()) {
+  if (election_started_in_term != CurrentTermUnlocked()) {
     LOG_WITH_PREFIX_UNLOCKED(INFO)
         << "Leader " << election_type << " decision vote started in "
         << "defunct term " << election_started_in_term << ": "
@@ -2350,7 +2451,7 @@ void RaftConsensus::DisableFailureDetector() {
 
 void RaftConsensus::SnoozeFailureDetector(AllowLogging allow_logging,
                                           boost::optional<MonoDelta> delta) {
-  if (PREDICT_TRUE(FLAGS_enable_leader_failure_detection)) {
+  if (PREDICT_TRUE(failure_detector_ && FLAGS_enable_leader_failure_detection)) {
     if (allow_logging == ALLOW_LOGGING) {
       LOG(INFO) << LogPrefixThreadSafe()
                 << Substitute("Snoozing failure detection for $0",
@@ -2393,13 +2494,13 @@ MonoDelta RaftConsensus::LeaderElectionExpBackoffDeltaUnlocked() {
 Status RaftConsensus::HandleTermAdvanceUnlocked(ConsensusTerm new_term,
                                                 FlushToDisk flush) {
   DCHECK(lock_.is_locked());
-  if (new_term <= GetCurrentTermUnlocked()) {
+  if (new_term <= CurrentTermUnlocked()) {
     return Status::IllegalState(Substitute("Can't advance term to: $0 current term: $1 is higher.",
-                                           new_term, GetCurrentTermUnlocked()));
+                                           new_term, CurrentTermUnlocked()));
   }
   if (cmeta_->active_role() == RaftPeerPB::LEADER) {
     LOG_WITH_PREFIX_UNLOCKED(INFO) << "Stepping down as leader of term "
-                                   << GetCurrentTermUnlocked();
+                                   << CurrentTermUnlocked();
     RETURN_NOT_OK(BecomeReplicaUnlocked());
   }
 
@@ -2508,10 +2609,10 @@ Status RaftConsensus::SetCurrentTermUnlocked(int64_t new_term,
   TRACE_EVENT1("consensus", "RaftConsensus::SetCurrentTermUnlocked",
                "term", new_term);
   DCHECK(lock_.is_locked());
-  if (PREDICT_FALSE(new_term <= GetCurrentTermUnlocked())) {
+  if (PREDICT_FALSE(new_term <= CurrentTermUnlocked())) {
     return Status::IllegalState(
         Substitute("Cannot change term to a term that is lower than or equal to the current one. "
-                   "Current: $0, Proposed: $1", GetCurrentTermUnlocked(), new_term));
+                   "Current: $0, Proposed: $1", CurrentTermUnlocked(), new_term));
   }
   cmeta_->set_current_term(new_term);
   cmeta_->clear_voted_for();
@@ -2522,7 +2623,7 @@ Status RaftConsensus::SetCurrentTermUnlocked(int64_t new_term,
   return Status::OK();
 }
 
-const int64_t RaftConsensus::GetCurrentTermUnlocked() const {
+const int64_t RaftConsensus::CurrentTermUnlocked() const {
   DCHECK(lock_.is_locked());
   return cmeta_->current_term();
 }
@@ -2574,11 +2675,14 @@ string RaftConsensus::LogPrefix() const {
 
 string RaftConsensus::LogPrefixUnlocked() const {
   DCHECK(lock_.is_locked());
-  return Substitute("T $0 P $1 [term $2 $3]: ",
-                    options_.tablet_id,
-                    peer_uuid(),
-                    GetCurrentTermUnlocked(),
-                    RaftPeerPB::Role_Name(cmeta_->active_role()));
+  // 'cmeta_' may not be set if initialization failed.
+  string cmeta_info;
+  if (cmeta_) {
+    cmeta_info = Substitute(" [term $0 $1]",
+                            cmeta_->current_term(),
+                            RaftPeerPB::Role_Name(cmeta_->active_role()));
+  }
+  return Substitute("T $0 P $1$2: ", options_.tablet_id, peer_uuid(), cmeta_info);
 }
 
 string RaftConsensus::LogPrefixThreadSafe() const {

http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index efcc06e..ee61fff 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -44,6 +44,7 @@
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/atomic.h"
 #include "kudu/util/locks.h"
+#include "kudu/util/make_shared.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/random.h"
@@ -126,16 +127,14 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
     EXTERNAL_REQUEST
   };
 
-  RaftConsensus(ConsensusOptions options,
-                RaftPeerPB local_peer_pb,
-                scoped_refptr<ConsensusMetadataManager> cmeta_manager,
-                ThreadPool* raft_pool);
   ~RaftConsensus();
 
-  // Initializes the RaftConsensus object. This should be called before
-  // publishing this object to any thread other than the thread that invoked
-  // the constructor.
-  Status Init();
+  // Factory method to construct and initialize a RaftConsensus instance.
+  static Status Create(ConsensusOptions options,
+                       RaftPeerPB local_peer_pb,
+                       scoped_refptr<ConsensusMetadataManager> cmeta_manager,
+                       ThreadPool* raft_pool,
+                       std::shared_ptr<RaftConsensus>* consensus_out);
 
   // Starts running the Raft consensus algorithm.
   // Start() is not thread-safe. Calls to Start() should be externally
@@ -245,7 +244,12 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
 
   // Messages sent from CANDIDATEs to voting peers to request their vote
   // in leader election.
+  //
+  // If 'tombstone_last_logged_opid' is set, this replica will attempt to vote
+  // in kInitialized and kStopped states, instead of just in the kRunning
+  // state.
   Status RequestVote(const VoteRequestPB* request,
+                     boost::optional<OpId> tombstone_last_logged_opid,
                      VoteResponsePB* response);
 
   // Implement a ChangeConfig() request.
@@ -265,6 +269,9 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
   // Returns the current Raft role of this instance.
   RaftPeerPB::Role role() const;
 
+  // Returns the current term.
+  int64_t CurrentTerm() const;
+
   // Returns the uuid of this peer.
   // Thread-safe.
   const std::string& peer_uuid() const;
@@ -283,7 +290,14 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
 
   void DumpStatusHtml(std::ostream& out) const;
 
-  // Stop running the Raft consensus algorithm.
+  // Transition to kStopped state. See State enum definition for details.
+  // This is a no-op if the tablet is already in kStopped or kShutdown state;
+  // otherwise, Raft will pass through the kStopping state on the way to
+  // kStopped.
+  void Stop();
+
+  // Transition to kShutdown state. See State enum definition for details.
+  // It is legal to call this method while in any lifecycle state.
   void Shutdown();
 
   // Makes this peer advance it's term (and step down if leader), for tests.
@@ -317,7 +331,7 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
   log::RetentionIndexes GetRetentionIndexes();
 
  private:
-  friend class RefCountedThreadSafe<RaftConsensus>;
+  ALLOW_MAKE_SHARED(RaftConsensus);
   friend class RaftConsensusQuorumTest;
   FRIEND_TEST(RaftConsensusQuorumTest, TestConsensusContinuesIfAMinorityFallsBehind);
   FRIEND_TEST(RaftConsensusQuorumTest, TestConsensusStopsIfAMajorityFallsBehind);
@@ -325,26 +339,40 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
   FRIEND_TEST(RaftConsensusQuorumTest, TestReplicasEnforceTheLogMatchingProperty);
   FRIEND_TEST(RaftConsensusQuorumTest, TestRequestVote);
 
+  // RaftConsensus lifecycle states.
+  //
+  // Legal state transitions:
+  //
+  //   kNew -> kInitialized -+-> kRunning -> kStopping -> kStopped -> kShutdown
+  //                          `----------------^
+  //
   // NOTE: When adding / changing values in this enum, add the corresponding
-  // values to State_Name().
+  // values to State_Name() as well.
+  //
   enum State {
-    // RaftConsensus has been freshly constructed.
+    // The RaftConsensus object has been freshly constructed and is not yet
+    // initialized. A RaftConsensus object will never be made externally
+    // visible in this state.
     kNew,
 
-    // RaftConsensus has been initialized.
+    // Raft has been initialized. It cannot accept writes, but it may be able
+    // to vote. See RequestVote() for details.
     kInitialized,
 
-    // State signaling the replica accepts requests (from clients
-    // if leader, from leader if follower)
+    // Raft is running normally and will accept write requests and vote
+    // requests.
     kRunning,
 
-    // State signaling that the replica is shutting down and no longer accepting
-    // new transactions or commits.
-    kShuttingDown,
+    // Raft is in the process of stopping and will not accept writes. Voting
+    // may still be allowed. See RequestVote() for details.
+    kStopping,
+
+    // Raft is stopped and no longer accepting writes. However, voting may
+    // still be allowed; See RequestVote() for details.
+    kStopped,
 
-    // State signaling the replica is shut down and does not accept
-    // any more requests.
-    kShutDown,
+    // Raft is fully shut down and cannot accept writes or vote requests.
+    kShutdown,
   };
 
   // Control whether printing of log messages should be done for a particular
@@ -376,6 +404,19 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
   using LockGuard = std::lock_guard<simple_spinlock>;
   using UniqueLock = std::unique_lock<simple_spinlock>;
 
+  RaftConsensus(ConsensusOptions options,
+                RaftPeerPB local_peer_pb,
+                scoped_refptr<ConsensusMetadataManager> cmeta_manager,
+                ThreadPool* raft_pool);
+
+  // Initializes the RaftConsensus object, including loading the consensus
+  // metadata.
+  Status Init();
+
+  // Change the lifecycle state of RaftConsensus. The definition of the State
+  // enum documents legal state transitions.
+  void SetStateUnlocked(State new_state);
+
   // Returns string description for State enum value.
   static const char* State_Name(State state);
 
@@ -658,7 +699,7 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
                                 FlushToDisk flush) WARN_UNUSED_RESULT;
 
   // Returns the term set in the last config change round.
-  const int64_t GetCurrentTermUnlocked() const;
+  const int64_t CurrentTermUnlocked() const;
 
   // Accessors for the leader of the current term.
   std::string GetLeaderUuidUnlocked() const;
@@ -761,6 +802,9 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
 
   Callback<void(const std::string& reason)> mark_dirty_clbk_;
 
+  // A flag to help us avoid taking a lock on the reactor thread if the object
+  // is already in kShutdown state.
+  // TODO(mpercy): Try to get rid of this extra flag.
   AtomicBool shutdown_;
 
   // The number of times Update() has been called, used for some test assertions.

http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/consensus/raft_consensus_quorum-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index 45d5cec..a6ecf6f 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -194,13 +194,12 @@ class RaftConsensusQuorumTest : public KuduTest {
       RaftPeerPB local_peer_pb;
       RETURN_NOT_OK(GetRaftConfigMember(config_, fs_managers_[i]->uuid(), &local_peer_pb));
 
-      shared_ptr<RaftConsensus> peer(
-          new RaftConsensus(options_,
-                            config_.peers(i),
-                            cmeta_managers_[i],
-                            raft_pool_.get()));
-      RETURN_NOT_OK(peer->Init());
-
+      shared_ptr<RaftConsensus> peer;
+      RETURN_NOT_OK(RaftConsensus::Create(options_,
+                                          config_.peers(i),
+                                          cmeta_managers_[i],
+                                          raft_pool_.get(),
+                                          &peer));
       peers_->AddPeer(config_.peers(i).permanent_uuid(), peer);
     }
     return Status::OK();
@@ -1062,7 +1061,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
   VoteResponsePB response;
   request.set_candidate_uuid(fs_managers_[0]->uuid());
   request.set_candidate_term(last_op_id.term() + 1);
-  ASSERT_OK(peer->RequestVote(&request, &response));
+  ASSERT_OK(peer->RequestVote(&request, boost::none, &response));
   ASSERT_FALSE(response.vote_granted());
   ASSERT_EQ(ConsensusErrorPB::LEADER_IS_ALIVE, response.consensus_error().code());
   ASSERT_EQ(0, flush_count() - flush_count_before)
@@ -1074,7 +1073,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
   // This will allow the rest of the requests in the test to go through.
   flush_count_before = flush_count();
   request.set_ignore_live_leader(true);
-  ASSERT_OK(peer->RequestVote(&request, &response));
+  ASSERT_OK(peer->RequestVote(&request, boost::none, &response));
   ASSERT_TRUE(response.vote_granted());
   ASSERT_EQ(last_op_id.term() + 1, response.responder_term());
   ASSERT_NO_FATAL_FAILURE(AssertDurableTermAndVote(kPeerIndex, last_op_id.term() + 1,
@@ -1085,7 +1084,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
   // Ensure we get same response for same term and same UUID.
   response.Clear();
   flush_count_before = flush_count();
-  ASSERT_OK(peer->RequestVote(&request, &response));
+  ASSERT_OK(peer->RequestVote(&request, boost::none, &response));
   ASSERT_TRUE(response.vote_granted());
   ASSERT_EQ(0, flush_count() - flush_count_before)
       << "Confirming a previous vote should not flush";
@@ -1094,7 +1093,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
   flush_count_before = flush_count();
   response.Clear();
   request.set_candidate_uuid(fs_managers_[2]->uuid());
-  ASSERT_OK(peer->RequestVote(&request, &response));
+  ASSERT_OK(peer->RequestVote(&request, boost::none, &response));
   ASSERT_FALSE(response.vote_granted());
   ASSERT_TRUE(response.has_consensus_error());
   ASSERT_EQ(ConsensusErrorPB::ALREADY_VOTED, response.consensus_error().code());
@@ -1114,7 +1113,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
   request.set_candidate_uuid(fs_managers_[0]->uuid());
   request.set_candidate_term(last_op_id.term() + 2);
   response.Clear();
-  ASSERT_OK(peer->RequestVote(&request, &response));
+  ASSERT_OK(peer->RequestVote(&request, boost::none, &response));
   ASSERT_TRUE(response.vote_granted());
   ASSERT_EQ(last_op_id.term() + 2, response.responder_term());
   ASSERT_NO_FATAL_FAILURE(AssertDurableTermAndVote(kPeerIndex, last_op_id.term() + 2,
@@ -1128,7 +1127,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
   flush_count_before = flush_count();
   request.set_candidate_term(last_op_id.term() + 1);
   response.Clear();
-  ASSERT_OK(peer->RequestVote(&request, &response));
+  ASSERT_OK(peer->RequestVote(&request, boost::none, &response));
   ASSERT_FALSE(response.vote_granted());
   ASSERT_TRUE(response.has_consensus_error());
   ASSERT_EQ(ConsensusErrorPB::INVALID_TERM, response.consensus_error().code());
@@ -1144,7 +1143,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
   request.set_candidate_term(last_op_id.term() + 3);
   request.set_is_pre_election(true);
   response.Clear();
-  ASSERT_OK(peer->RequestVote(&request, &response));
+  ASSERT_OK(peer->RequestVote(&request, boost::none, &response));
   ASSERT_TRUE(response.vote_granted());
   ASSERT_FALSE(response.has_consensus_error());
   ASSERT_EQ(last_op_id.term() + 2, response.responder_term());
@@ -1163,7 +1162,7 @@ TEST_F(RaftConsensusQuorumTest, TestRequestVote) {
   request.set_candidate_term(last_op_id.term() + 3);
   request.mutable_candidate_status()->mutable_last_received()->CopyFrom(MinimumOpId());
   response.Clear();
-  ASSERT_OK(peer->RequestVote(&request, &response));
+  ASSERT_OK(peer->RequestVote(&request, boost::none, &response));
   ASSERT_FALSE(response.vote_granted());
   ASSERT_TRUE(response.has_consensus_error());
   ASSERT_EQ(ConsensusErrorPB::LAST_OPID_TOO_OLD, response.consensus_error().code());

http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/integration-tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index d13d396..4920e88 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -95,6 +95,8 @@ ADD_KUDU_TEST(tablet_copy-itest)
 ADD_KUDU_TEST(tablet_copy_client_session-itest)
 ADD_KUDU_TEST(tablet_history_gc-itest)
 ADD_KUDU_TEST(tablet_replacement-itest)
+ADD_KUDU_TEST(tombstoned_voting-itest)
+ADD_KUDU_TEST(tombstoned_voting-stress-test RUN_SERIAL true)
 ADD_KUDU_TEST(token_signer-itest RESOURCE_LOCK "master-rpc-ports")
 ADD_KUDU_TEST(ts_recovery-itest)
 ADD_KUDU_TEST(ts_tablet_manager-itest)

http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/integration-tests/cluster_itest_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_itest_util.cc b/src/kudu/integration-tests/cluster_itest_util.cc
index 051c8b5..7d6f55a 100644
--- a/src/kudu/integration-tests/cluster_itest_util.cc
+++ b/src/kudu/integration-tests/cluster_itest_util.cc
@@ -73,6 +73,8 @@ using consensus::OpIdType;
 using consensus::RaftPeerPB;
 using consensus::RunLeaderElectionResponsePB;
 using consensus::RunLeaderElectionRequestPB;
+using consensus::VoteRequestPB;
+using consensus::VoteResponsePB;
 using consensus::kInvalidOpIdIndex;
 using master::ListTabletServersResponsePB;
 using master::ListTabletServersResponsePB_Entry;
@@ -601,6 +603,33 @@ Status StartElection(const TServerDetails* replica,
   return Status::OK();
 }
 
+Status RequestVote(const TServerDetails* replica,
+                   const std::string& tablet_id,
+                   const std::string& candidate_uuid,
+                   int64_t candidate_term,
+                   const consensus::OpId& last_logged_opid,
+                   boost::optional<bool> ignore_live_leader,
+                   boost::optional<bool> is_pre_election,
+                   const MonoDelta& timeout) {
+  DCHECK(last_logged_opid.IsInitialized());
+  VoteRequestPB req;
+  req.set_dest_uuid(replica->uuid());
+  req.set_tablet_id(tablet_id);
+  req.set_candidate_uuid(candidate_uuid);
+  req.set_candidate_term(candidate_term);
+  *req.mutable_candidate_status()->mutable_last_received() = last_logged_opid;
+  if (ignore_live_leader) req.set_ignore_live_leader(*ignore_live_leader);
+  if (is_pre_election) req.set_is_pre_election(*is_pre_election);
+  VoteResponsePB resp;
+  RpcController rpc;
+  rpc.set_timeout(timeout);
+  RETURN_NOT_OK(replica->consensus_proxy->RequestConsensusVote(req, &resp, &rpc));
+  if (resp.has_vote_granted() && resp.vote_granted()) return Status::OK();
+  if (resp.has_error()) return StatusFromPB(resp.error().status());
+  if (resp.has_consensus_error()) return StatusFromPB(resp.consensus_error().status());
+  return Status::IllegalState("Unknown error");
+}
+
 Status LeaderStepDown(const TServerDetails* replica,
                       const string& tablet_id,
                       const MonoDelta& timeout,

http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/integration-tests/cluster_itest_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_itest_util.h b/src/kudu/integration-tests/cluster_itest_util.h
index e8e49b5..49bf0ff 100644
--- a/src/kudu/integration-tests/cluster_itest_util.h
+++ b/src/kudu/integration-tests/cluster_itest_util.h
@@ -233,6 +233,18 @@ Status StartElection(const TServerDetails* replica,
                      const std::string& tablet_id,
                      const MonoDelta& timeout);
 
+// Request the given replica to vote. This is thin wrapper around
+// RequestConsensusVote(). See the definition of VoteRequestPB in
+// consensus.proto for parameter details.
+Status RequestVote(const TServerDetails* replica,
+                   const std::string& tablet_id,
+                   const std::string& candidate_uuid,
+                   int64_t candidate_term,
+                   const consensus::OpId& last_logged_opid,
+                   boost::optional<bool> ignore_live_leader,
+                   boost::optional<bool> is_pre_election,
+                   const MonoDelta& timeout);
+
 // Cause a leader to step down on the specified server.
 // 'timeout' refers to the RPC timeout waiting synchronously for stepdown to
 // complete on the leader side. Since that does not require communication with

http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/integration-tests/delete_table-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/delete_table-itest.cc b/src/kudu/integration-tests/delete_table-itest.cc
index 5fdcc9d..105fd77 100644
--- a/src/kudu/integration-tests/delete_table-itest.cc
+++ b/src/kudu/integration-tests/delete_table-itest.cc
@@ -817,14 +817,22 @@ TEST_F(DeleteTableITest, TestMergeConsensusMetadata) {
                                            TABLET_DATA_TOMBSTONED, boost::none, timeout));
   NO_FATALS(WaitForTabletTombstonedOnTS(kTsIndex, tablet_id, CMETA_EXPECTED));
 
+  // Shut down the tablet server so it won't vote while tombstoned.
+  cluster_->tablet_server(kTsIndex)->Shutdown();
+
   ASSERT_OK(cluster_->tablet_server(1)->Restart());
   ASSERT_OK(cluster_->tablet_server(2)->Restart());
   NO_FATALS(WaitUntilTabletRunning(1, tablet_id));
   NO_FATALS(WaitUntilTabletRunning(2, tablet_id));
   ASSERT_OK(itest::StartElection(leader, tablet_id, timeout));
+  ASSERT_OK(itest::WaitUntilLeader(leader, tablet_id, timeout));
+
+  // Now restart the replica. It will get tablet copied by the leader.
+  ASSERT_OK(cluster_->tablet_server(kTsIndex)->Restart());
   ASSERT_OK(inspect_->WaitForTabletDataStateOnTS(kTsIndex, tablet_id, { TABLET_DATA_READY }));
 
-  // The election history should have been wiped out.
+  // The election history should have been wiped out for the new term, since
+  // this node did not participate.
   ASSERT_OK(inspect_->ReadConsensusMetadataOnTS(kTsIndex, tablet_id, &cmeta_pb));
   ASSERT_EQ(3, cmeta_pb.current_term());
   ASSERT_TRUE(!cmeta_pb.has_voted_for()) << SecureShortDebugString(cmeta_pb);
@@ -1066,7 +1074,8 @@ TEST_F(DeleteTableITest, TestWebPageForTombstonedTablet) {
         cluster_->tablet_server(0)->bound_http_hostport().ToString(),
         page,
         tablet_id), &buf));
-    ASSERT_STR_CONTAINS(buf.ToString(), tablet_id);
+    ASSERT_STR_CONTAINS(buf.ToString(), tablet_id)
+        << "Page: " << page << "; tablet_id: " << tablet_id;
   }
 }
 
@@ -1292,7 +1301,7 @@ TEST_P(DeleteTableTombstonedParamTest, TestTabletTombstone) {
   ASSERT_OK(itest::WaitForNumTabletsOnTS(ts, 2, timeout, &tablets));
   for (const ListTabletsResponsePB::StatusAndSchemaPB& t : tablets) {
     if (t.tablet_status().tablet_id() == tablet_id) {
-      ASSERT_EQ(tablet::SHUTDOWN, t.tablet_status().state());
+      ASSERT_EQ(tablet::STOPPED, t.tablet_status().state());
       ASSERT_EQ(TABLET_DATA_TOMBSTONED, t.tablet_status().tablet_data_state())
           << t.tablet_status().tablet_id() << " not tombstoned";
     }
@@ -1313,10 +1322,10 @@ TEST_P(DeleteTableTombstonedParamTest, TestTabletTombstone) {
   NO_FATALS(WaitForTabletTombstonedOnTS(kTsIndex, tablet_id, CMETA_EXPECTED));
   // The tombstoned tablets will still show up in ListTablets(),
   // just with their data state set as TOMBSTONED. They should also be listed
-  // as NOT_STARTED because we restarted the server.
+  // as INITIALIZED because we restarted the server.
   ASSERT_OK(itest::WaitForNumTabletsOnTS(ts, 2, timeout, &tablets));
   for (const ListTabletsResponsePB::StatusAndSchemaPB& t : tablets) {
-    ASSERT_EQ(tablet::NOT_STARTED, t.tablet_status().state());
+    ASSERT_EQ(tablet::INITIALIZED, t.tablet_status().state());
     ASSERT_EQ(TABLET_DATA_TOMBSTONED, t.tablet_status().tablet_data_state())
         << t.tablet_status().tablet_id() << " not tombstoned";
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/integration-tests/external_mini_cluster-itest-base.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/external_mini_cluster-itest-base.cc b/src/kudu/integration-tests/external_mini_cluster-itest-base.cc
index 71d7573..94c313e 100644
--- a/src/kudu/integration-tests/external_mini_cluster-itest-base.cc
+++ b/src/kudu/integration-tests/external_mini_cluster-itest-base.cc
@@ -22,6 +22,7 @@
 #include <string>
 #include <vector>
 
+#include <gflags/gflags.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
@@ -33,6 +34,9 @@
 #include "kudu/util/pstack_watcher.h"
 #include "kudu/util/test_macros.h"
 
+DEFINE_bool(test_dump_stacks_on_failure, true,
+            "Whether to dump ExternalMiniCluster process stacks on test failure");
+
 namespace kudu {
 
 void ExternalMiniClusterITestBase::TearDown() {
@@ -69,7 +73,7 @@ void ExternalMiniClusterITestBase::StopCluster() {
     return;
   }
 
-  if (HasFatalFailure()) {
+  if (HasFatalFailure() && FLAGS_test_dump_stacks_on_failure) {
     LOG(INFO) << "Found fatal failure";
     for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
       if (!cluster_->tablet_server(i)->IsProcessAlive()) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/integration-tests/tombstoned_voting-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/tombstoned_voting-itest.cc b/src/kudu/integration-tests/tombstoned_voting-itest.cc
new file mode 100644
index 0000000..b723738
--- /dev/null
+++ b/src/kudu/integration-tests/tombstoned_voting-itest.cc
@@ -0,0 +1,461 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/metadata.pb.h"
+#include "kudu/consensus/opid.pb.h"
+#include "kudu/consensus/opid_util.h"
+#include "kudu/consensus/raft_consensus.h"
+#include "kudu/fs/fs_manager.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
+#include "kudu/integration-tests/internal_mini_cluster-itest-base.h"
+#include "kudu/integration-tests/internal_mini_cluster.h"
+#include "kudu/integration-tests/test_workload.h"
+#include "kudu/tablet/metadata.pb.h"
+#include "kudu/tablet/tablet_replica.h"
+#include "kudu/tserver/mini_tablet_server.h"
+#include "kudu/tserver/tablet_server.h"
+#include "kudu/tserver/ts_tablet_manager.h"
+#include "kudu/tserver/tserver.pb.h"
+#include "kudu/util/env.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+DECLARE_bool(allow_unsafe_replication_factor);
+DECLARE_bool(enable_tablet_copy);
+DECLARE_bool(raft_enable_tombstoned_voting);
+
+using kudu::consensus::MakeOpId;
+using kudu::consensus::LeaderStepDownResponsePB;
+using kudu::consensus::OpId;
+using kudu::consensus::RECEIVED_OPID;
+using kudu::consensus::RaftConsensus;
+using kudu::consensus::RaftPeerPB;
+using kudu::itest::DeleteTablet;
+using kudu::itest::TServerDetails;
+using kudu::itest::WaitForServersToAgree;
+using kudu::tablet::TABLET_DATA_TOMBSTONED;
+using kudu::tablet::TabletReplica;
+using kudu::tablet::TabletStatePB;
+using kudu::tserver::TabletServerErrorPB;
+using kudu::tserver::TSTabletManager;
+using std::string;
+using std::vector;
+
+namespace kudu {
+
+class TombstonedVotingITest : public MiniClusterITestBase {
+};
+
+// Ensure that a tombstoned replica cannot vote after we call Shutdown() on it.
+TEST_F(TombstonedVotingITest, TestNoVoteAfterShutdown) {
+  // This test waits for several seconds, so only run it in slow mode.
+  if (!AllowSlowTests()) return;
+
+  FLAGS_allow_unsafe_replication_factor = true; // Allow an even replication factor.
+  FLAGS_enable_tablet_copy = false; // Tablet copy would interfere with this test.
+
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+
+  NO_FATALS(StartCluster(/*num_tablet_servers=*/ 2));
+  TestWorkload workload(cluster_.get());
+  workload.set_num_replicas(2); // Two servers and replicas makes the test easy to debug.
+  workload.Setup();
+  workload.Start();
+  while (workload.rows_inserted() < 50) {
+    SleepFor(MonoDelta::FromMilliseconds(50));
+  }
+  workload.StopAndJoin();
+
+  // Figure out the tablet id to mess with.
+  vector<string> tablet_ids = cluster_->mini_tablet_server(0)->ListTablets();
+  ASSERT_EQ(1, tablet_ids.size());
+  const string& tablet_id = tablet_ids[0];
+
+  // Ensure all servers are up to date.
+  ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id, workload.batches_completed()));
+
+  // Manually tombstone the replica on TS1, start an election on TS0, and wait
+  // until TS0 gets elected. If TS0 gets elected then TS1 was able to vote
+  // while tombstoned.
+  TSTabletManager* ts_tablet_manager = cluster_->mini_tablet_server(1)->server()->tablet_manager();
+  scoped_refptr<TabletReplica> ts1_replica;
+  ASSERT_OK(ts_tablet_manager->GetTabletReplica(tablet_id, &ts1_replica));
+
+  // Tombstone TS1's replica.
+  LOG(INFO) << "Tombstoning ts1...";
+  boost::optional<TabletServerErrorPB::Code> error_code;
+  ASSERT_OK(ts_tablet_manager->DeleteTablet(tablet_id, TABLET_DATA_TOMBSTONED, boost::none,
+                                            &error_code));
+  ASSERT_EQ(TabletStatePB::STOPPED, ts1_replica->state());
+
+  scoped_refptr<TabletReplica> ts0_replica;
+  ASSERT_OK(cluster_->mini_tablet_server(0)->server()->tablet_manager()->GetTabletReplica(
+      tablet_id, &ts0_replica));
+  LeaderStepDownResponsePB resp;
+  ts0_replica->consensus()->StepDown(&resp); // Ignore result, in case TS1 was the leader.
+  ASSERT_EQ(RaftPeerPB::FOLLOWER, ts0_replica->consensus()->role());
+  ASSERT_OK(ts0_replica->consensus()->StartElection(
+      RaftConsensus::ELECT_EVEN_IF_LEADER_IS_ALIVE, RaftConsensus::EXTERNAL_REQUEST));
+
+  // Wait until TS0 is leader.
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_EQ(RaftPeerPB::LEADER, ts0_replica->consensus()->role());
+  });
+
+  // Now shut down TS1. This will ensure that TS0 cannot get re-elected.
+  LOG(INFO) << "Shutting down ts1...";
+  ts1_replica->Shutdown();
+
+  // Start another election and wait for some time to see if it can get elected.
+  ASSERT_OK(ts0_replica->consensus()->StepDown(&resp));
+  ASSERT_OK(ts0_replica->consensus()->StartElection(
+      RaftConsensus::ELECT_EVEN_IF_LEADER_IS_ALIVE, RaftConsensus::EXTERNAL_REQUEST));
+
+  // Wait for some time to ensure TS0 cannot get elected.
+  MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(5);
+  while (MonoTime::Now() < deadline) {
+    ASSERT_EQ(RaftPeerPB::FOLLOWER, ts0_replica->consensus()->role());
+    SleepFor(MonoDelta::FromMilliseconds(50));
+  }
+}
+
+// Test that a tombstoned replica will vote correctly.
+// This is implemented by directly exercising the RPC API with different vote request parameters.
+TEST_F(TombstonedVotingITest, TestVotingLogic) {
+  // This test waits for several seconds, so only run it in slow mode.
+  if (!AllowSlowTests()) return;
+
+  FLAGS_allow_unsafe_replication_factor = true; // Allow an even replication factor.
+  FLAGS_enable_tablet_copy = false; // Tablet copy would interfere with this test.
+
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+
+  NO_FATALS(StartCluster(/*num_tablet_servers=*/ 2));
+  TestWorkload workload(cluster_.get());
+  workload.set_num_replicas(2); // Two servers and replicas makes the test easy to debug.
+  workload.Setup();
+  workload.Start();
+  while (workload.rows_inserted() < 50) {
+    SleepFor(MonoDelta::FromMilliseconds(50));
+  }
+  workload.StopAndJoin();
+
+  // Figure out the tablet id to mess with.
+  vector<string> tablet_ids = cluster_->mini_tablet_server(0)->ListTablets();
+  ASSERT_EQ(1, tablet_ids.size());
+  const string& tablet_id = tablet_ids[0];
+
+  // Ensure all servers are up to date.
+  ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id, workload.batches_completed()));
+
+  // Shut down TS0 so it doesn't interfere with our testing.
+  cluster_->mini_tablet_server(0)->Shutdown();
+
+  // Figure out the last logged opid of TS1.
+  OpId last_logged_opid;
+  ASSERT_OK(itest::GetLastOpIdForReplica(tablet_id,
+                                         ts_map_[cluster_->mini_tablet_server(1)->uuid()],
+                                         RECEIVED_OPID,
+                                         kTimeout,
+                                         &last_logged_opid));
+
+  // Tombstone TS1 (actually, the tablet replica hosted on TS1).
+  ASSERT_OK(itest::DeleteTablet(ts_map_[cluster_->mini_tablet_server(1)->uuid()], tablet_id,
+                                TABLET_DATA_TOMBSTONED, boost::none, kTimeout));
+
+  // Loop this series of tests twice: the first time without restarting the TS,
+  // the 2nd time after a restart.
+  for (int i = 0; i < 2; i++) {
+    if (i == 1) {
+      // Restart tablet server #1 on the 2nd loop.
+      LOG(INFO) << "Restarting TS1...";
+      cluster_->mini_tablet_server(1)->Shutdown();
+      ASSERT_OK(cluster_->mini_tablet_server(1)->Restart());
+      ASSERT_OK(cluster_->mini_tablet_server(1)->WaitStarted());
+    }
+
+    scoped_refptr<TabletReplica> replica;
+    ASSERT_OK(cluster_->mini_tablet_server(1)->server()->tablet_manager()->GetTabletReplica(
+        tablet_id, &replica));
+    ASSERT_EQ(i == 0 ? tablet::STOPPED : tablet::INITIALIZED, replica->state());
+
+    int64_t current_term = replica->consensus()->CurrentTerm();
+    current_term++;
+
+    // Ask TS1 for a vote that should be granted (new term, acceptable opid).
+    // Note: peers are required to vote regardless of whether they recognize the
+    // candidate's UUID or not, so the ID used here ("A") is not important.
+    TServerDetails* ts1_ets = ts_map_[cluster_->mini_tablet_server(1)->uuid()];
+    ASSERT_OK(itest::RequestVote(ts1_ets, tablet_id, "A", current_term, last_logged_opid,
+                                /*ignore_live_leader=*/ true, /*is_pre_election=*/ false, kTimeout))
+
+    // Ask TS1 for a vote that should be denied (different candidate, same term).
+    Status s = itest::RequestVote(ts1_ets, tablet_id, "B", current_term, last_logged_opid,
+                                  /*ignore_live_leader=*/ true, /*is_pre_election=*/ false,
+                                  kTimeout);
+    ASSERT_TRUE(s.IsInvalidArgument());
+    ASSERT_STR_CONTAINS(s.ToString(), "Already voted for candidate A in this term");
+
+    // Ask TS1 for a vote that should be denied (old term).
+    s = itest::RequestVote(ts1_ets, tablet_id, "B", current_term - 1, last_logged_opid,
+                          /*ignore_live_leader=*/ true, /*is_pre_election=*/ false, kTimeout);
+    ASSERT_TRUE(s.IsInvalidArgument());
+    ASSERT_STR_MATCHES(s.ToString(), "Denying vote to candidate B for earlier term");
+
+    // Increment term.
+    current_term++;
+    OpId old_opid = MakeOpId(last_logged_opid.term(), last_logged_opid.index() - 1);
+
+    // Ask TS1 for a vote that should be denied (old last-logged opid).
+    s = itest::RequestVote(ts1_ets, tablet_id, "B", current_term, old_opid,
+                          /*ignore_live_leader=*/ true, /*is_pre_election=*/ false, kTimeout);
+    ASSERT_TRUE(s.IsInvalidArgument());
+    ASSERT_STR_MATCHES(s.ToString(),
+                      "Denying vote to candidate B.*greater than that of the candidate");
+
+    // Ask for a successful vote for candidate B.
+    ASSERT_OK(itest::RequestVote(ts1_ets, tablet_id, "B", current_term, last_logged_opid,
+                                /*ignore_live_leader=*/ true, /*is_pre_election=*/ false, kTimeout))
+  }
+}
+
+// Disable tombstoned voting and ensure that an election that would require it fails.
+TEST_F(TombstonedVotingITest, TestNoVoteIfTombstonedVotingDisabled) {
+  // This test waits for several seconds, so only run it in slow mode.
+  if (!AllowSlowTests()) return;
+
+  FLAGS_raft_enable_tombstoned_voting = false; // Disable tombstoned voting.
+  FLAGS_allow_unsafe_replication_factor = true; // Allow an even replication factor.
+  FLAGS_enable_tablet_copy = false; // Tablet copy would interfere with this test.
+
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+
+  NO_FATALS(StartCluster(/*num_tablet_servers=*/ 2));
+  TestWorkload workload(cluster_.get());
+  workload.set_num_replicas(2); // Two servers and replicas makes the test easy to debug.
+  workload.Setup();
+  workload.Start();
+  while (workload.rows_inserted() < 50) {
+    SleepFor(MonoDelta::FromMilliseconds(50));
+  }
+  workload.StopAndJoin();
+
+  // Figure out the tablet id to mess with.
+  vector<string> tablet_ids = cluster_->mini_tablet_server(0)->ListTablets();
+  ASSERT_EQ(1, tablet_ids.size());
+  const string& tablet_id = tablet_ids[0];
+
+  // Ensure all servers are up to date.
+  ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id, workload.batches_completed()));
+
+  // Tombstone TS1 and try to get TS0 to vote for it.
+  TServerDetails* ts1 = ts_map_[cluster_->mini_tablet_server(1)->uuid()];
+  ASSERT_OK(DeleteTablet(ts1, tablet_id, TABLET_DATA_TOMBSTONED, boost::none, kTimeout));
+
+  scoped_refptr<TabletReplica> ts0_replica;
+  ASSERT_OK(cluster_->mini_tablet_server(0)->server()->tablet_manager()->GetTabletReplica(
+      tablet_id, &ts0_replica));
+  LeaderStepDownResponsePB resp;
+  ts0_replica->consensus()->StepDown(&resp); // Ignore result, in case TS1 was the leader.
+  ASSERT_OK(ts0_replica->consensus()->StartElection(
+      RaftConsensus::ELECT_EVEN_IF_LEADER_IS_ALIVE, RaftConsensus::EXTERNAL_REQUEST));
+
+  // Wait for some time to ensure TS0 cannot get elected.
+  MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(5);
+  while (MonoTime::Now() < deadline) {
+    ASSERT_EQ(RaftPeerPB::FOLLOWER, ts0_replica->consensus()->role());
+    SleepFor(MonoDelta::FromMilliseconds(50));
+  }
+}
+
+// Test that a replica will not vote while tombstoned if it was deleted while
+// the last-logged opid was unknown. This may occur if a tablet is tombstoned
+// while in a FAILED state.
+TEST_F(TombstonedVotingITest, TestNoVoteIfNoLastLoggedOpId) {
+  if (!AllowSlowTests()) return; // This test waits for several seconds.
+
+  FLAGS_allow_unsafe_replication_factor = true; // Allow an even replication factor.
+
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+
+  NO_FATALS(StartCluster(/*num_tablet_servers=*/ 2));
+  TestWorkload workload(cluster_.get());
+  workload.set_num_replicas(2); // Two servers and replicas makes the test easy to debug.
+  workload.Setup();
+  workload.Start();
+  while (workload.rows_inserted() < 50) {
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+  workload.StopAndJoin();
+
+  tserver::MiniTabletServer* ts0 = cluster_->mini_tablet_server(0);
+  string ts0_uuid = ts0->uuid();
+  tserver::MiniTabletServer* ts1 = cluster_->mini_tablet_server(1);
+  string ts1_uuid = ts0->uuid();
+
+  // Determine the tablet id.
+  vector<string> tablet_ids = ts0->ListTablets();
+  ASSERT_EQ(1, tablet_ids.size());
+  const string& tablet_id = tablet_ids[0];
+
+  // Ensure all servers are in sync.
+  ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id, workload.batches_completed()));
+
+  // Shut down each TS, then corrupt the TS0 cmeta.
+  string ts0_cmeta_path = ts0->server()->fs_manager()->GetConsensusMetadataPath(tablet_id);
+  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+    cluster_->mini_tablet_server(i)->Shutdown();
+  }
+
+  std::unique_ptr<WritableFile> file;
+  ASSERT_OK(env_->NewWritableFile(ts0_cmeta_path, &file));
+  ASSERT_OK(file->Append("\0"));
+  ASSERT_OK(file->Close());
+
+  // Restart each TS so it comes back up on the same ports.
+  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+    ASSERT_OK(cluster_->mini_tablet_server(i)->Restart());
+  }
+
+  // Wait until the tablet is in FAILED state.
+  ASSERT_OK(itest::WaitUntilTabletInState(ts_map_[ts0_uuid], tablet_id, TabletStatePB::FAILED,
+                                          kTimeout));
+  scoped_refptr<TabletReplica> replica;
+  ASSERT_TRUE(ts0->server() != nullptr);
+  ASSERT_TRUE(ts0->server()->tablet_manager() != nullptr);
+  ASSERT_TRUE(ts0->server()->tablet_manager()->LookupTablet(tablet_id, &replica));
+  ASSERT_EQ(tablet::FAILED, replica->state());
+
+  // Now tombstone the failed replica on TS0.
+  ASSERT_OK(itest::DeleteTablet(ts_map_[ts0_uuid], tablet_id,
+                                TABLET_DATA_TOMBSTONED, boost::none, kTimeout));
+
+  // Wait until TS1 is running.
+  ASSERT_EVENTUALLY([&] {
+    TSTabletManager* tablet_manager = ts1->server()->tablet_manager();
+    ASSERT_TRUE(tablet_manager->LookupTablet(tablet_id, &replica));
+    ASSERT_EQ(tablet::RUNNING, replica->state());
+  });
+
+  // Ensure that TS1 cannot become leader because TS0 will not vote.
+  MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(5);
+  while (MonoTime::Now() < deadline) {
+    scoped_refptr<TabletReplica> replica;
+    TSTabletManager* tablet_manager = ts1->server()->tablet_manager();
+    ASSERT_TRUE(tablet_manager != nullptr);
+    ASSERT_TRUE(tablet_manager->LookupTablet(tablet_id, &replica));
+    std::shared_ptr<RaftConsensus> consensus = replica->shared_consensus();
+    if (consensus) {
+      ASSERT_EQ(RaftPeerPB::FOLLOWER, consensus->role());
+    }
+  }
+}
+
+enum RestartAfterTombstone {
+  kNoRestart,
+  kRestart,
+};
+
+class TsRecoveryTombstonedITest : public MiniClusterITestBase,
+                                  public ::testing::WithParamInterface<RestartAfterTombstone> {
+};
+
+INSTANTIATE_TEST_CASE_P(Restart, TsRecoveryTombstonedITest,
+                        ::testing::Values(kNoRestart, kRestart));
+
+// Basic tombstoned voting test.
+TEST_P(TsRecoveryTombstonedITest, TestTombstonedVoter) {
+  const RestartAfterTombstone to_restart = GetParam();
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+
+  FLAGS_allow_unsafe_replication_factor = true; // Allow an even replication factor.
+  NO_FATALS(StartCluster(/*num_tablet_servers=*/ 2));
+  TestWorkload workload(cluster_.get());
+  workload.set_num_replicas(2); // Two servers and replicas makes the test easy to debug.
+  workload.Setup();
+  workload.Start();
+  while (workload.rows_inserted() < 50) {
+    SleepFor(MonoDelta::FromMilliseconds(50));
+  }
+  workload.StopAndJoin();
+
+  // Figure out the tablet id to Tablet Copy.
+  vector<string> tablet_ids = cluster_->mini_tablet_server(0)->ListTablets();
+  ASSERT_EQ(1, tablet_ids.size());
+  const string& tablet_id = tablet_ids[0];
+
+  // Ensure all servers are up to date.
+  ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id, workload.batches_completed()));
+
+  auto live_ts_map = ts_map_;
+  ASSERT_EQ(1, live_ts_map.erase(cluster_->mini_tablet_server(1)->uuid()));
+
+  // Shut down TS 0 then tombstone TS 1. Restart TS 0.
+  // TS 0 should get a vote from TS 1 and then make a copy on TS 1, bringing
+  // the cluster back up to full strength.
+  LOG(INFO) << "shutting down TS " << cluster_->mini_tablet_server(0)->uuid();
+  cluster_->mini_tablet_server(0)->Shutdown();
+
+  LOG(INFO) << "tombstoning replica on TS " << cluster_->mini_tablet_server(1)->uuid();
+  TServerDetails* ts1 = ts_map_[cluster_->mini_tablet_server(1)->uuid()];
+  ASSERT_OK(DeleteTablet(ts1, tablet_id, TABLET_DATA_TOMBSTONED, boost::none, kTimeout));
+
+  if (to_restart == kRestart) {
+    LOG(INFO) << "restarting tombstoned TS " << cluster_->mini_tablet_server(1)->uuid();
+    cluster_->mini_tablet_server(1)->Shutdown();
+    ASSERT_OK(cluster_->mini_tablet_server(1)->Restart());
+  }
+
+  LOG(INFO) << "restarting TS " << cluster_->mini_tablet_server(1)->uuid();
+  ASSERT_OK(cluster_->mini_tablet_server(0)->Restart());
+
+  // Wait for the tablet copy to complete.
+  LOG(INFO) << "waiting for leader election and tablet copy to complete...";
+  ASSERT_OK(WaitForServersToAgree(kTimeout, live_ts_map, tablet_id, workload.batches_completed()));
+
+  LOG(INFO) << "attempting to write a few more rows...";
+
+  // Write a little bit more.
+  int target_rows = workload.rows_inserted() + 100;
+  workload.Start();
+  while (workload.rows_inserted() < target_rows) {
+    SleepFor(MonoDelta::FromMilliseconds(50));
+  }
+  workload.StopAndJoin();
+
+  // Do a final verification that the servers match.
+  LOG(INFO) << "waiting for final agreement...";
+  ASSERT_OK(WaitForServersToAgree(kTimeout, live_ts_map, tablet_id, workload.batches_completed()));
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/integration-tests/tombstoned_voting-stress-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/tombstoned_voting-stress-test.cc b/src/kudu/integration-tests/tombstoned_voting-stress-test.cc
new file mode 100644
index 0000000..2c1cecb
--- /dev/null
+++ b/src/kudu/integration-tests/tombstoned_voting-stress-test.cc
@@ -0,0 +1,313 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <atomic>
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <ostream>
+#include <string>
+#include <thread>
+#include <unordered_map>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/common/wire_protocol.h"
+#include "kudu/common/wire_protocol.pb.h"
+#include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/opid.pb.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
+#include "kudu/integration-tests/external_mini_cluster-itest-base.h"
+#include "kudu/integration-tests/external_mini_cluster.h"
+#include "kudu/integration-tests/external_mini_cluster_fs_inspector.h"
+#include "kudu/integration-tests/test_workload.h"
+#include "kudu/tablet/metadata.pb.h"
+#include "kudu/util/condition_variable.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+DEFINE_int32(test_num_iterations, 5,
+             "Number of tombstoned voting stress test iterations");
+
+using kudu::consensus::COMMITTED_OPID;
+using kudu::consensus::OpId;
+using kudu::itest::DeleteTablet;
+using kudu::itest::TServerDetails;
+using kudu::itest::WaitForServersToAgree;
+using kudu::tablet::TABLET_DATA_TOMBSTONED;
+using std::atomic;
+using std::string;
+using std::thread;
+using std::unique_lock;
+using std::vector;
+
+namespace kudu {
+
+static const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+
+class TombstonedVotingStressTest : public ExternalMiniClusterITestBase {
+ public:
+  TombstonedVotingStressTest()
+      : num_workers_(1),
+        cond_all_workers_blocked_(&lock_),
+        cond_workers_unblocked_(&lock_),
+        current_term_(1) {
+  }
+
+ protected:
+  enum State {
+    kRunning,       // The tablet is running normally.
+    kTombstoning,   // We are tombstoning the tablet.
+    kTombstoned,    // The tombstoning is complete.
+    kCopying,       // We are copying the tablet.
+    kTestComplete,  // The test is complete and about to exit.
+  };
+
+  string State_Name(State state);
+
+  // 1. Check if workers should block, block if required.
+  // 2. Return current state.
+  State GetState();
+
+  // 1. Block worker threads.
+  // 2. Wait for all workers to be blocked.
+  // 3. Change state.
+  // 4. Unblock workers.
+  void SetState(State state);
+
+  // Thread that loops and requests votes from TS1.
+  void RunVoteRequestLoop();
+
+  // Set-once shared state.
+  string tablet_id_;
+  OpId last_logged_opid_;
+
+  Mutex lock_;
+  const int num_workers_;
+  int num_workers_blocked_ = 0;
+  bool block_workers_ = false;
+  ConditionVariable cond_all_workers_blocked_;  // Triggers once all worker threads are blocked.
+  ConditionVariable cond_workers_unblocked_;    // Triggers when the workers become unblocked.
+
+  // Protected by lock_.
+  State state_ = kRunning;
+
+  // State for the voter thread.
+  atomic<int64_t> current_term_;
+};
+
+string TombstonedVotingStressTest::State_Name(State state) {
+  switch (state) {
+    case kRunning:
+      return "kRunning";
+    case kTombstoning:
+      return "kTombstoning";
+    case kTombstoned:
+      return "kTombstoned";
+    case kCopying:
+      return "kCopying";
+    case kTestComplete:
+      return "kTestComplete";
+    default:
+      LOG(FATAL) << "Unknown state: " << state;
+      __builtin_unreachable();
+  }
+}
+
+TombstonedVotingStressTest::State TombstonedVotingStressTest::GetState() {
+  unique_lock<Mutex> l(lock_);
+  bool blocked = false;
+  if (block_workers_) {
+    num_workers_blocked_++;
+    blocked = true;
+    if (num_workers_blocked_ == num_workers_) {
+      cond_all_workers_blocked_.Signal();
+    }
+  }
+  while (block_workers_) {
+    cond_workers_unblocked_.Wait();
+  }
+  if (blocked) num_workers_blocked_--;
+  return state_;
+}
+
+void TombstonedVotingStressTest::SetState(State state) {
+  // 1. Block worker threads.
+  // 2. Wait for all workers to be blocked.
+  // 3. Change state.
+  // 4. Unblock workers.
+  LOG(INFO) << "setting state to " << State_Name(state);
+  unique_lock<Mutex> l(lock_);
+  block_workers_ = true;
+  while (num_workers_blocked_ != num_workers_) {
+    cond_all_workers_blocked_.Wait();
+  }
+  state_ = state;
+  block_workers_ = false;
+  cond_workers_unblocked_.Broadcast();
+}
+
+void TombstonedVotingStressTest::RunVoteRequestLoop() {
+  TServerDetails* ts1_ets = ts_map_[cluster_->tablet_server(1)->uuid()];
+  while (true) {
+    State state = GetState();
+    if (state == kTestComplete) break;
+    ++current_term_;
+    Status s = itest::RequestVote(ts1_ets, tablet_id_, "A", current_term_, last_logged_opid_,
+                                  /*ignore_live_leader=*/ true, /*is_pre_election=*/ false,
+                                  kTimeout);
+    switch (state) {
+      case kRunning: FALLTHROUGH_INTENDED;
+      case kTombstoned:
+        // We should always be able to vote in this case.
+        if (s.ok()) {
+          LOG(INFO) << "Vote OK: state = " << state;
+        } else {
+          LOG(FATAL) << s.ToString() << ": tablet = " << tablet_id_ << ": state = " << state;
+        }
+        break;
+
+      // The vote can fail while in the process of tombstoning a replica
+      // because there is a small window of time where we have stopped
+      // RaftConsensus but we haven't yet recorded the last-logged opid in the
+      // tablet metadata.
+      case kTombstoning: FALLTHROUGH_INTENDED;
+      case kCopying:
+        if (s.ok()) {
+          LOG(INFO) << "Vote OK: state = " << state;
+        } else {
+          LOG(WARNING) << "Got bad vote while copying or tombstoning: " << s.ToString()
+                       << ": state = " << state;
+        }
+        break;
+
+      default:
+        // We're shutting down.
+        continue;
+    }
+    SleepFor(MonoDelta::FromMilliseconds(1)); // Don't run too hot.
+  }
+}
+
+// Stress test for tombstoned voting, including tombstoning, deleting, and
+// copying replicas.
+TEST_F(TombstonedVotingStressTest, TestTombstonedVotingUnderStress) {
+  // This test waits for several seconds, so only run it in slow mode.
+  if (!AllowSlowTests()) return;
+
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+
+  // We want to control leader election manually and we only want 2 replicas.
+  NO_FATALS(StartCluster({ "--enable_leader_failure_detection=false" },
+                         { "--catalog_manager_wait_for_new_tablets_to_elect_leader=false",
+                           "--allow_unsafe_replication_factor=true" },
+                         /*num_tablet_servers=*/ 2));
+  TestWorkload workload(cluster_.get());
+  workload.set_num_replicas(2); // Two servers and replicas makes the test easy to debug.
+  workload.Setup();
+  ASSERT_OK(inspect_->WaitForReplicaCount(2));
+
+  // Figure out the tablet id.
+  vector<string> tablets = inspect_->ListTabletsOnTS(1);
+  ASSERT_EQ(1, tablets.size());
+  tablet_id_ = tablets[0];
+
+  for (int i = 1; i < cluster_->num_tablet_servers(); i++) {
+    ASSERT_OK(itest::WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(i)->uuid()],
+                                            tablet_id_, kTimeout));
+    LOG(INFO) << "TabletReplica is RUNNING: T " << tablet_id_
+              << " P " << cluster_->tablet_server(i)->uuid();
+  }
+
+  // Elect a leader and run some data through the cluster.
+  LOG(INFO) << "electing a leader...";
+  TServerDetails* ts0_ets = ts_map_[cluster_->tablet_server(0)->uuid()];
+  TServerDetails* ts1_ets = ts_map_[cluster_->tablet_server(1)->uuid()];
+  ASSERT_EVENTUALLY([&] {
+    // The tablet can report that it's running but still be bootstrapping, so
+    // retry until the election starts.
+    ASSERT_OK(itest::StartElection(ts0_ets, tablet_id_, kTimeout));
+  });
+
+  LOG(INFO) << "loading data...";
+  workload.Start();
+  while (workload.rows_inserted() < 100) {
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+  workload.StopAndJoin();
+  ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id_, workload.batches_completed()));
+  ASSERT_OK(itest::GetLastOpIdForReplica(tablet_id_, ts0_ets, COMMITTED_OPID, kTimeout,
+                                         &last_logged_opid_));
+
+  // Have the leader step down so we can test voting on the other replica.
+  // We don't shut this node down because it will serve as the tablet copy
+  // "source" during the test.
+  LOG(INFO) << "forcing leader to step down...";
+  ASSERT_OK(itest::LeaderStepDown(ts0_ets, tablet_id_, kTimeout));
+
+  // Now we are done with setup. Start the "stress" part of the test.
+  // Startup the voting thread.
+  LOG(INFO) << "starting stress thread...";
+  thread voter_thread([this] { RunVoteRequestLoop(); });
+  auto cleanup = MakeScopedCleanup([&] {
+    SetState(kTestComplete);
+    voter_thread.join();
+  });
+
+  int iter = 0;
+  while (iter++ < FLAGS_test_num_iterations) {
+    LOG(INFO) << "iteration " << (iter + 1) << " of " << FLAGS_test_num_iterations;
+    // Loop on voting for a while in running state. We want to give an
+    // opportunity for many votes during this time, and since voting involves
+    // fsyncing to disk, we wait for plenty of time here (and below).
+    SleepFor(MonoDelta::FromMilliseconds(500));
+
+    // 1. Tombstone tablet.
+    LOG(INFO) << "tombstoning tablet...";
+    SetState(kTombstoning);
+    ASSERT_OK(itest::DeleteTablet(ts1_ets, tablet_id_, TABLET_DATA_TOMBSTONED, boost::none,
+                                  kTimeout));
+    SetState(kTombstoned);
+
+    // Loop on voting for a while in tombstoned state.
+    SleepFor(MonoDelta::FromMilliseconds(500));
+
+    // 2. Copy tablet.
+    LOG(INFO) << "copying tablet...";
+    HostPort source_hp;
+    ASSERT_OK(HostPortFromPB(ts0_ets->registration.rpc_addresses(0), &source_hp));
+    SetState(kCopying);
+    ASSERT_OK(itest::StartTabletCopy(ts1_ets, tablet_id_, ts0_ets->uuid(), source_hp, current_term_,
+                                     kTimeout));
+    LOG(INFO) << "waiting for servers to agree...";
+    ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id_, workload.batches_completed()));
+
+    SetState(kRunning);
+  }
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/master/sys_catalog.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index f6d7a19..0e3a9f1 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -344,6 +344,12 @@ Status SysCatalogTable::SetupTablet(const scoped_refptr<tablet::TabletMetadata>&
       local_peer_pb_,
       master_->tablet_apply_pool(),
       Bind(&SysCatalogTable::SysCatalogStateChanged, Unretained(this), metadata->tablet_id())));
+  Status s = tablet_replica_->Init(master_->raft_pool());
+  if (!s.ok()) {
+    tablet_replica_->SetError(s);
+    tablet_replica_->Shutdown();
+    return s;
+  }
 
   scoped_refptr<ConsensusMetadata> cmeta;
   RETURN_NOT_OK(cmeta_manager_->Load(metadata->tablet_id(), &cmeta));
@@ -371,7 +377,6 @@ Status SysCatalogTable::SetupTablet(const scoped_refptr<tablet::TabletMetadata>&
                                                master_->messenger(),
                                                scoped_refptr<rpc::ResultTracker>(),
                                                log,
-                                               master_->raft_pool(),
                                                master_->tablet_prepare_pool()),
                         "Failed to Start() TabletReplica");
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/5bca7d8b/src/kudu/tablet/metadata.proto
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/metadata.proto b/src/kudu/tablet/metadata.proto
index 83bffbc..70d8c1f 100644
--- a/src/kudu/tablet/metadata.proto
+++ b/src/kudu/tablet/metadata.proto
@@ -131,13 +131,29 @@ message TabletSuperBlockPB {
   optional DataDirGroupPB data_dir_group = 15;
 }
 
-// The enum of tablet states.
-// Tablet states are sent in TabletReports and kept in TabletReplica.
+// Tablet states represent stages of a TabletReplica's object lifecycle and are
+// reported to the master via tablet reports.
+//
+// Legal state transitions for a single TabletReplica object:
+//
+// NOT_INITIALIZED -> INITIALIZED -> BOOTSTRAPPING -> RUNNING -> STOPPING -> STOPPED -> SHUTDOWN
+//             |              |                |                  ^ ^ ^
+//             |              |                |                  | | |
+//             |              |                +------------------+ | |
+//             |              +-------------------------------------+ |
+//             +------------------------------------------------------+
+//
+// Since a TabletReplica instance is replaced when a Tablet Copy operation
+// occurs, from a remote perspective it is possible for a tablet replica to
+// appear to transition from SHUTDOWN back to NOT_INITIALIZED.
 enum TabletStatePB {
   UNKNOWN = 999;
 
-  // Tablet has not yet started.
-  NOT_STARTED = 5;
+  // Tablet has not yet been initialized.
+  NOT_INITIALIZED = 6;
+
+  // Tablet has been initialized but not yet started.
+  INITIALIZED = 5;
 
   // Indicates the Tablet is bootstrapping, i.e. that the Tablet is not
   // available for RPC.
@@ -152,8 +168,11 @@ enum TabletStatePB {
   FAILED = 2;
 
   // The Tablet is shutting down, and will not accept further requests.
-  QUIESCING = 3;
+  STOPPING = 3;
+
+  // The tablet has been stopped, possibly because it has been tombstoned.
+  STOPPED = 7;
 
-  // The Tablet has been stopped.
+  // The Tablet has been completely shut down.
   SHUTDOWN = 4;
 }