You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2017/10/17 15:23:22 UTC

[5/5] kudu git commit: [consensus] adding/removing NON_VOTER members

[consensus] adding/removing NON_VOTER members

Added ability to add and remove NON_VOTER member replicas.
Updated the kudu CLI accordingly.  Also, added new integration test:
  * RaftConsensusITest.AddNonVoterReplica
  * RaftConsensusITest.AddThenRemoveNonVoterReplica
  * RaftConsensusITest.NonVoterReplicasDoNotVote

Change-Id: I2662d45ad9bb6a4bf325d4202c2ee619ffad02b7
Reviewed-on: http://gerrit.cloudera.org:8080/8138
Reviewed-by: Todd Lipcon <to...@apache.org>
Reviewed-by: Mike Percy <mp...@apache.org>
Tested-by: Alexey Serbin <as...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/fe23710c
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/fe23710c
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/fe23710c

Branch: refs/heads/master
Commit: fe23710c63c90e561b2622f6f67563572d10dbf9
Parents: 8c23c97
Author: Alexey Serbin <as...@cloudera.com>
Authored: Mon Oct 16 16:23:57 2017 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Tue Oct 17 15:20:31 2017 +0000

----------------------------------------------------------------------
 src/kudu/consensus/leader_election-test.cc      |   5 +-
 src/kudu/consensus/leader_election.cc           |  22 +-
 src/kudu/consensus/leader_election.h            |   4 +-
 src/kudu/consensus/quorum_util-test.cc          |  47 +-
 src/kudu/consensus/quorum_util.cc               |  10 +-
 src/kudu/consensus/quorum_util.h                |   4 +
 src/kudu/consensus/raft_consensus.cc            |  22 +-
 src/kudu/integration-tests/CMakeLists.txt       |   1 +
 .../integration-tests/cluster_itest_util.cc     |   4 +-
 .../raft_consensus-itest-base.cc                |   6 +
 .../raft_consensus-itest-base.h                 |  12 +-
 .../integration-tests/raft_consensus-itest.cc   |  82 ++-
 .../raft_consensus_nonvoter-itest.cc            | 534 +++++++++++++++++++
 13 files changed, 671 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/fe23710c/src/kudu/consensus/leader_election-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/leader_election-test.cc b/src/kudu/consensus/leader_election-test.cc
index cec04c3..1dc26d9 100644
--- a/src/kudu/consensus/leader_election-test.cc
+++ b/src/kudu/consensus/leader_election-test.cc
@@ -161,7 +161,8 @@ void LeaderElectionTest::ElectionCallback(const ElectionResult& result) {
 
 void LeaderElectionTest::InitUUIDs(int num_voters) {
   voter_uuids_ = GenVoterUUIDs(num_voters);
-  candidate_uuid_ = voter_uuids_[num_voters - 1];
+  CHECK(!voter_uuids_.empty());
+  candidate_uuid_ = voter_uuids_.back();
   voter_uuids_.pop_back();
 }
 
@@ -170,6 +171,7 @@ void LeaderElectionTest::InitNoOpPeerProxies() {
   for (const string& uuid : voter_uuids_) {
     RaftPeerPB* peer_pb = config_.add_peers();
     peer_pb->set_permanent_uuid(uuid);
+    peer_pb->set_member_type(RaftPeerPB::VOTER);
     PeerProxy* proxy = new NoOpTestPeerProxy(pool_.get(), *peer_pb);
     InsertOrDie(&proxies_, uuid, proxy);
   }
@@ -180,6 +182,7 @@ void LeaderElectionTest::InitDelayableMockedProxies(bool enable_delay) {
   for (const string& uuid : voter_uuids_) {
     RaftPeerPB* peer_pb = config_.add_peers();
     peer_pb->set_permanent_uuid(uuid);
+    peer_pb->set_member_type(RaftPeerPB::VOTER);
     auto proxy = new DelayablePeerProxy<MockedPeerProxy>(pool_.get(),
                                                          new MockedPeerProxy(pool_.get()));
     if (enable_delay) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/fe23710c/src/kudu/consensus/leader_election.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/leader_election.cc b/src/kudu/consensus/leader_election.cc
index 07b8a01..b8cbddc 100644
--- a/src/kudu/consensus/leader_election.cc
+++ b/src/kudu/consensus/leader_election.cc
@@ -161,8 +161,18 @@ LeaderElection::LeaderElection(const RaftConfigPB& config,
       decision_callback_(std::move(decision_callback)),
       highest_voter_term_(0) {
   for (const RaftPeerPB& peer : config.peers()) {
-    if (request.candidate_uuid() == peer.permanent_uuid()) continue;
-    follower_uuids_.push_back(peer.permanent_uuid());
+    if (request.candidate_uuid() == peer.permanent_uuid()) {
+      DCHECK_EQ(peer.member_type(), RaftPeerPB::VOTER)
+          << Substitute("non-voter member $0 tried to start an election; "
+                        "Raft config {$1}",
+                        peer.permanent_uuid(),
+                        pb_util::SecureShortDebugString(config));
+      continue;
+    }
+    if (peer.member_type() != RaftPeerPB::VOTER) {
+      continue;
+    }
+    other_voter_uuids_.push_back(peer.permanent_uuid());
 
     gscoped_ptr<VoterState> state(new VoterState());
     state->proxy_status = proxy_factory->NewProxy(peer, &state->proxy);
@@ -173,10 +183,10 @@ LeaderElection::LeaderElection(const RaftConfigPB& config,
   CHECK_EQ(1, vote_counter_->GetTotalVotesCounted()) << "Candidate must vote for itself first";
 
   // Ensure that existing votes + future votes add up to the expected total.
-  CHECK_EQ(vote_counter_->GetTotalVotesCounted() + follower_uuids_.size(),
+  CHECK_EQ(vote_counter_->GetTotalVotesCounted() + other_voter_uuids_.size(),
            vote_counter_->GetTotalExpectedVotes())
-      << "Expected different number of followers. Follower UUIDs: ["
-      << JoinStringsIterator(follower_uuids_.begin(), follower_uuids_.end(), ", ")
+      << "Expected different number of voters. Voter UUIDs: ["
+      << JoinStringsIterator(other_voter_uuids_.begin(), other_voter_uuids_.end(), ", ")
       << "]; RaftConfig: {" << pb_util::SecureShortDebugString(config) << "}";
 }
 
@@ -194,7 +204,7 @@ void LeaderElection::Run() {
   CheckForDecision();
 
   // The rest of the code below is for a typical multi-node configuration.
-  for (const std::string& voter_uuid : follower_uuids_) {
+  for (const std::string& voter_uuid : other_voter_uuids_) {
     VoterState* state = nullptr;
     {
       std::lock_guard<Lock> guard(lock_);

http://git-wip-us.apache.org/repos/asf/kudu/blob/fe23710c/src/kudu/consensus/leader_election.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/leader_election.h b/src/kudu/consensus/leader_election.h
index 3d3620f..33e4241 100644
--- a/src/kudu/consensus/leader_election.h
+++ b/src/kudu/consensus/leader_election.h
@@ -221,9 +221,9 @@ class LeaderElection : public RefCountedThreadSafe<LeaderElection> {
   // Callback invoked to notify the caller of an election decision.
   const ElectionDecisionCallback decision_callback_;
 
-  // List of all potential followers to request votes from.
+  // List of all other voters to request votes from.
   // The candidate's own UUID must not be included.
-  std::vector<std::string> follower_uuids_;
+  std::vector<std::string> other_voter_uuids_;
 
   // Map of UUID -> VoterState.
   VoterStateMap voter_state_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/fe23710c/src/kudu/consensus/quorum_util-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/quorum_util-test.cc b/src/kudu/consensus/quorum_util-test.cc
index 87b30cd..9982c74 100644
--- a/src/kudu/consensus/quorum_util-test.cc
+++ b/src/kudu/consensus/quorum_util-test.cc
@@ -30,19 +30,21 @@ namespace consensus {
 
 using std::string;
 
-static void SetPeerInfo(const string& uuid,
-                        RaftPeerPB::MemberType type,
-                        RaftPeerPB* peer) {
+// Add a consensus peer into the specified configuration.
+static void AddPeer(RaftConfigPB* config,
+                    const string& uuid,
+                    RaftPeerPB::MemberType type) {
+  RaftPeerPB* peer = config->add_peers();
   peer->set_permanent_uuid(uuid);
-  peer->set_member_type(type);
   peer->mutable_last_known_addr()->set_host(uuid + ".example.com");
+  peer->set_member_type(type);
 }
 
 TEST(QuorumUtilTest, TestMemberExtraction) {
   RaftConfigPB config;
-  SetPeerInfo("A", RaftPeerPB::VOTER, config.add_peers());
-  SetPeerInfo("B", RaftPeerPB::VOTER, config.add_peers());
-  SetPeerInfo("C", RaftPeerPB::VOTER, config.add_peers());
+  AddPeer(&config, "A", RaftPeerPB::VOTER);
+  AddPeer(&config, "B", RaftPeerPB::VOTER);
+  AddPeer(&config, "C", RaftPeerPB::VOTER);
 
   // Basic test for GetRaftConfigMember().
   RaftPeerPB peer_pb;
@@ -63,9 +65,9 @@ TEST(QuorumUtilTest, TestMemberExtraction) {
 
 TEST(QuorumUtilTest, TestDiffConsensusStates) {
   ConsensusStatePB old_cs;
-  SetPeerInfo("A", RaftPeerPB::VOTER, old_cs.mutable_committed_config()->add_peers());
-  SetPeerInfo("B", RaftPeerPB::VOTER, old_cs.mutable_committed_config()->add_peers());
-  SetPeerInfo("C", RaftPeerPB::VOTER, old_cs.mutable_committed_config()->add_peers());
+  AddPeer(old_cs.mutable_committed_config(), "A", RaftPeerPB::VOTER);
+  AddPeer(old_cs.mutable_committed_config(), "B", RaftPeerPB::VOTER);
+  AddPeer(old_cs.mutable_committed_config(), "C", RaftPeerPB::VOTER);
   old_cs.set_current_term(1);
   old_cs.set_leader_uuid("A");
   old_cs.mutable_committed_config()->set_opid_index(1);
@@ -100,7 +102,7 @@ TEST(QuorumUtilTest, TestDiffConsensusStates) {
   {
     auto new_cs = old_cs;
     new_cs.mutable_committed_config()->set_opid_index(2);
-    SetPeerInfo("D", RaftPeerPB::NON_VOTER, new_cs.mutable_committed_config()->add_peers());
+    AddPeer(new_cs.mutable_committed_config(), "D", RaftPeerPB::NON_VOTER);
 
     EXPECT_EQ("config changed from index 1 to 2, "
               "NON_VOTER D (D.example.com) added",
@@ -148,7 +150,7 @@ TEST(QuorumUtilTest, TestDiffConsensusStates) {
   // Simulate a change in a pending config
   {
     auto before_cs = old_cs;
-    SetPeerInfo("A", RaftPeerPB::VOTER, before_cs.mutable_pending_config()->add_peers());
+    AddPeer(before_cs.mutable_pending_config(), "A", RaftPeerPB::VOTER);
     auto after_cs = before_cs;
     after_cs.mutable_pending_config()
       ->mutable_peers()->Mutable(0)->set_member_type(RaftPeerPB::NON_VOTER);
@@ -158,5 +160,26 @@ TEST(QuorumUtilTest, TestDiffConsensusStates) {
   }
 }
 
+TEST(QuorumUtilTest, TestIsRaftConfigVoter) {
+  RaftConfigPB config;
+  AddPeer(&config, "A", RaftPeerPB::VOTER);
+  AddPeer(&config, "B", RaftPeerPB::NON_VOTER);
+  AddPeer(&config, "C", RaftPeerPB::UNKNOWN_MEMBER_TYPE);
+
+  // The case when membership type is not specified. That sort of configuration
+  // would not pass VerifyRaftConfig(), though. Anyway, that should result
+  // in non-voter since the member_type is initialized with UNKNOWN_MEMBER_TYPE.
+  const string no_member_type_peer_uuid = "D";
+  RaftPeerPB* no_member_type_peer = config.add_peers();
+  no_member_type_peer->set_permanent_uuid(no_member_type_peer_uuid);
+  no_member_type_peer->mutable_last_known_addr()->set_host(
+      no_member_type_peer_uuid + ".example.com");
+
+  ASSERT_TRUE(IsRaftConfigVoter("A", config));
+  ASSERT_FALSE(IsRaftConfigVoter("B", config));
+  ASSERT_FALSE(IsRaftConfigVoter("C", config));
+  ASSERT_FALSE(IsRaftConfigVoter(no_member_type_peer_uuid, config));
+}
+
 } // namespace consensus
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/fe23710c/src/kudu/consensus/quorum_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/quorum_util.cc b/src/kudu/consensus/quorum_util.cc
index ca074b4..55c5bb9 100644
--- a/src/kudu/consensus/quorum_util.cc
+++ b/src/kudu/consensus/quorum_util.cc
@@ -60,6 +60,10 @@ bool IsRaftConfigVoter(const std::string& uuid, const RaftConfigPB& config) {
   return false;
 }
 
+bool IsVoterRole(RaftPeerPB::Role role) {
+  return role == RaftPeerPB::LEADER || role == RaftPeerPB::FOLLOWER;
+}
+
 Status GetRaftConfigMember(const RaftConfigPB& config,
                            const std::string& uuid,
                            RaftPeerPB* peer_pb) {
@@ -171,12 +175,6 @@ Status VerifyRaftConfig(const RaftConfigPB& config) {
           Substitute("Peer: $0 has no member type set. RaftConfig: $1", peer.permanent_uuid(),
                      SecureShortDebugString(config)));
     }
-    if (peer.member_type() == RaftPeerPB::NON_VOTER) {
-      return Status::IllegalState(
-          Substitute(
-              "Peer: $0 is a NON_VOTER, but this isn't supported yet. RaftConfig: $1",
-              peer.permanent_uuid(), SecureShortDebugString(config)));
-    }
   }
 
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/kudu/blob/fe23710c/src/kudu/consensus/quorum_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/quorum_util.h b/src/kudu/consensus/quorum_util.h
index 822a789..a8d9032 100644
--- a/src/kudu/consensus/quorum_util.h
+++ b/src/kudu/consensus/quorum_util.h
@@ -35,6 +35,10 @@ enum RaftConfigState {
 bool IsRaftConfigMember(const std::string& uuid, const RaftConfigPB& config);
 bool IsRaftConfigVoter(const std::string& uuid, const RaftConfigPB& config);
 
+// Whether the specified Raft role is attributed to a peer which can participate
+// in leader elections.
+bool IsVoterRole(RaftPeerPB::Role role);
+
 // Get the specified member of the config.
 // Returns Status::NotFound if a member with the specified uuid could not be
 // found in the config.

http://git-wip-us.apache.org/repos/asf/kudu/blob/fe23710c/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 50b60bf..66a3c81 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -401,10 +401,17 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
       LOG_WITH_PREFIX_UNLOCKED(INFO) << "Not starting " << mode << " -- already leader";
       return Status::OK();
     }
+    if (PREDICT_FALSE(!consensus::IsVoterRole(active_role))) {
+      // A non-voter should not start leader elections. The leader failure
+      // detector should be re-enabled once the non-voter replica is promoted
+      // to voter replica.
+      return Status::IllegalState("only voting members can start elections",
+          SecureShortDebugString(cmeta_->ActiveConfig()));
+    }
     if (PREDICT_FALSE(active_role == RaftPeerPB::NON_PARTICIPANT)) {
       SnoozeFailureDetector();
-      return Status::IllegalState("Not starting election: Node is currently "
-                                  "a non-participant in the raft config",
+      return Status::IllegalState("Not starting election: node is currently "
+                                  "a non-participant in the Raft config",
                                   SecureShortDebugString(cmeta_->ActiveConfig()));
     }
     LOG_WITH_PREFIX_UNLOCKED(INFO)
@@ -578,8 +585,15 @@ Status RaftConsensus::BecomeReplicaUnlocked(boost::optional<MonoDelta> fd_delta)
                                  << ToStringUnlocked();
   ClearLeaderUnlocked();
 
-  // FD should be running while we are a follower.
-  EnableFailureDetector(std::move(fd_delta));
+  if (consensus::IsVoterRole(cmeta_->active_role())) {
+    // A voter should run failure detector, if not a leader.
+    EnableFailureDetector(std::move(fd_delta));
+  } else {
+    // A non-voter should not start leader elections. The leader failure
+    // detector should be re-enabled once the non-voter replica is promoted
+    // to voter replica.
+    DisableFailureDetector();
+  }
 
   // Now that we're a replica, we can allow voting for other nodes.
   withhold_votes_until_ = MonoTime::Min();

http://git-wip-us.apache.org/repos/asf/kudu/blob/fe23710c/src/kudu/integration-tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index ecdda20..e83b56d 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -88,6 +88,7 @@ ADD_KUDU_TEST(open-readonly-fs-itest)
 ADD_KUDU_TEST(raft_config_change-itest)
 ADD_KUDU_TEST(raft_consensus-itest RUN_SERIAL true)
 ADD_KUDU_TEST(raft_consensus_election-itest)
+ADD_KUDU_TEST(raft_consensus_nonvoter-itest)
 ADD_KUDU_TEST(registration-test RESOURCE_LOCK "master-web-port")
 ADD_KUDU_TEST(security-faults-itest)
 ADD_KUDU_TEST(security-itest)

http://git-wip-us.apache.org/repos/asf/kudu/blob/fe23710c/src/kudu/integration-tests/cluster_itest_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_itest_util.cc b/src/kudu/integration-tests/cluster_itest_util.cc
index fa4c1d4..ca4cb75 100644
--- a/src/kudu/integration-tests/cluster_itest_util.cc
+++ b/src/kudu/integration-tests/cluster_itest_util.cc
@@ -429,7 +429,8 @@ Status WaitForNumTabletServers(
 
 Status WaitForReplicasReportedToMaster(
     const shared_ptr<master::MasterServiceProxy>& master_proxy,
-    int num_replicas, const string& tablet_id,
+    int num_replicas,
+    const string& tablet_id,
     const MonoDelta& timeout,
     WaitForLeader wait_for_leader,
     bool* has_leader,
@@ -701,7 +702,6 @@ Status AddServer(const TServerDetails* leader,
   if (cas_config_opid_index) {
     req.set_cas_config_opid_index(*cas_config_opid_index);
   }
-
   RETURN_NOT_OK(leader->consensus_proxy->ChangeConfig(req, &resp, &rpc));
   if (resp.has_error()) {
     if (error_code) *error_code = resp.error().code();

http://git-wip-us.apache.org/repos/asf/kudu/blob/fe23710c/src/kudu/integration-tests/raft_consensus-itest-base.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest-base.cc b/src/kudu/integration-tests/raft_consensus-itest-base.cc
index afd8c64..08b2871 100644
--- a/src/kudu/integration-tests/raft_consensus-itest-base.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest-base.cc
@@ -268,6 +268,12 @@ void RaftConsensusITestBase::CauseFollowerToFallBehindLogGC(
   *fell_behind_uuid = replica->uuid();
 }
 
+Status RaftConsensusITestBase::GetTermMetricValue(ExternalTabletServer* ts,
+                                                  int64_t *term) {
+  return ts->GetInt64Metric(&METRIC_ENTITY_tablet, nullptr, &METRIC_raft_term,
+                            "value", term);
+}
+
 }  // namespace tserver
 }  // namespace kudu
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/fe23710c/src/kudu/integration-tests/raft_consensus-itest-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest-base.h b/src/kudu/integration-tests/raft_consensus-itest-base.h
index 98a379e..18aa82a 100644
--- a/src/kudu/integration-tests/raft_consensus-itest-base.h
+++ b/src/kudu/integration-tests/raft_consensus-itest-base.h
@@ -23,8 +23,14 @@
 
 #include "kudu/integration-tests/ts_itest-base.h"
 #include "kudu/util/countdown_latch.h"
+#include "kudu/util/status.h"
 
 namespace kudu {
+
+namespace cluster {
+class ExternalTabletServer;
+}
+
 namespace tserver {
 
 class TabletServerServiceProxy;
@@ -45,8 +51,12 @@ class RaftConsensusITestBase : public TabletServerIntegrationTestBase {
                                   uint64_t num_batches,
                                   const std::vector<CountDownLatch*>& latches);
  protected:
+  // Retrieve the current term of the first tablet on this tablet server.
+  static Status GetTermMetricValue(cluster::ExternalTabletServer* ts,
+                                   int64_t* term);
+
   // Flags needed for CauseFollowerToFallBehindLogGC() to work well.
-  void AddFlagsForLogRolls(std::vector<std::string>* extra_tserver_flags);
+  static void AddFlagsForLogRolls(std::vector<std::string>* extra_tserver_flags);
 
   // Pause one of the followers and write enough data to the remaining replicas
   // to cause log GC, then resume the paused follower. On success,

http://git-wip-us.apache.org/repos/asf/kudu/blob/fe23710c/src/kudu/integration-tests/raft_consensus-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index 4d030c5..d012867 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -21,7 +21,6 @@
 #include <ostream>
 #include <string>
 #include <unordered_map>
-#include <unordered_set>
 #include <utility>
 #include <vector>
 
@@ -91,7 +90,6 @@ DECLARE_int32(rpc_timeout);
 
 METRIC_DECLARE_entity(tablet);
 METRIC_DECLARE_counter(transaction_memory_pressure_rejections);
-METRIC_DECLARE_gauge_int64(raft_term);
 
 using kudu::client::KuduInsert;
 using kudu::client::KuduSession;
@@ -107,24 +105,24 @@ using kudu::consensus::OpId;
 using kudu::consensus::RaftPeerPB;
 using kudu::consensus::ReplicateMsg;
 using kudu::itest::AddServer;
-using kudu::itest::GetReplicaStatusAndCheckIfLeader;
+using kudu::itest::DONT_WAIT_FOR_LEADER;
 using kudu::itest::LeaderStepDown;
 using kudu::itest::RemoveServer;
 using kudu::itest::StartElection;
-using kudu::itest::TabletServerMap;
 using kudu::itest::TServerDetails;
+using kudu::itest::TabletServerMap;
+using kudu::itest::WAIT_FOR_LEADER;
+using kudu::itest::WaitForReplicasReportedToMaster;
 using kudu::itest::WaitUntilLeader;
 using kudu::itest::WriteSimpleTestRow;
 using kudu::master::TabletLocationsPB;
-using kudu::pb_util::SecureShortDebugString;
 using kudu::pb_util::SecureDebugString;
+using kudu::pb_util::SecureShortDebugString;
 using kudu::rpc::RpcController;
 using kudu::server::SetFlagRequestPB;
 using kudu::server::SetFlagResponsePB;
-using kudu::tablet::TABLET_DATA_COPYING;
 using std::string;
 using std::unordered_map;
-using std::unordered_set;
 using std::vector;
 using strings::Substitute;
 
@@ -201,9 +199,6 @@ class RaftConsensusITest : public RaftConsensusITestBase {
   void SetupSingleReplicaTest(TServerDetails** replica_ts);
 
  protected:
-  // Retrieve the current term of the first tablet on this tablet server.
-  Status GetTermMetricValue(ExternalTabletServer* ts, int64_t* term);
-
   shared_ptr<KuduTable> table_;
   vector<scoped_refptr<kudu::Thread> > threads_;
 };
@@ -254,13 +249,13 @@ string RaftConsensusITest::DumpToString(TServerDetails* leader,
                                         const vector<string>& leader_results,
                                         TServerDetails* replica,
                                         const vector<string>& replica_results) {
-  string ret = strings::Substitute("Replica results did not match the leaders."
-                                   "\nLeader: $0\nReplica: $1. Results size "
-                                   "L: $2 R: $3",
-                                   leader->ToString(),
-                                   replica->ToString(),
-                                   leader_results.size(),
-                                   replica_results.size());
+  string ret = Substitute("Replica results did not match the leaders."
+                          "\nLeader: $0\nReplica: $1. Results size "
+                          "L: $2 R: $3",
+                          leader->ToString(),
+                          replica->ToString(),
+                          leader_results.size(),
+                          replica_results.size());
 
   StrAppend(&ret, "Leader Results: \n");
   for (const string& result : leader_results) {
@@ -604,11 +599,6 @@ void RaftConsensusITest::SetupSingleReplicaTest(TServerDetails** replica_ts) {
   LOG(INFO) << "================================== Cluster setup complete.";
 }
 
-Status RaftConsensusITest::GetTermMetricValue(ExternalTabletServer* ts,
-                                              int64_t *term) {
-  return ts->GetInt64Metric(&METRIC_ENTITY_tablet, nullptr, &METRIC_raft_term, "value", term);
-}
-
 // Test that we can retrieve the permanent uuid of a server running
 // consensus service via RPC.
 TEST_F(RaftConsensusITest, TestGetPermanentUuid) {
@@ -703,7 +693,7 @@ TEST_F(RaftConsensusITest, MultiThreadedMutateAndInsertThroughConsensus) {
   int num_threads = FLAGS_num_client_threads;
   for (int i = 0; i < num_threads; i++) {
     scoped_refptr<kudu::Thread> new_thread;
-    CHECK_OK(kudu::Thread::Create("test", strings::Substitute("ts-test$0", i),
+    CHECK_OK(kudu::Thread::Create("test", Substitute("ts-test$0", i),
                                   &RaftConsensusITest::InsertTestRowsRemoteThread,
                                   this, i * FLAGS_client_inserts_per_thread,
                                   FLAGS_client_inserts_per_thread,
@@ -714,7 +704,7 @@ TEST_F(RaftConsensusITest, MultiThreadedMutateAndInsertThroughConsensus) {
   }
   for (int i = 0; i < FLAGS_num_replicas; i++) {
     scoped_refptr<kudu::Thread> new_thread;
-    CHECK_OK(kudu::Thread::Create("test", strings::Substitute("chaos-test$0", i),
+    CHECK_OK(kudu::Thread::Create("test", Substitute("chaos-test$0", i),
                                   &RaftConsensusITest::DelayInjectorThread,
                                   this, cluster_->tablet_server(i),
                                   kConsensusRpcTimeoutForTests,
@@ -939,10 +929,10 @@ TEST_F(RaftConsensusITest, MultiThreadedInsertWithFailovers) {
 
   OverrideFlagForSlowTests(
       "client_inserts_per_thread",
-      strings::Substitute("$0", (FLAGS_client_inserts_per_thread * 100)));
+      Substitute("$0", (FLAGS_client_inserts_per_thread * 100)));
   OverrideFlagForSlowTests(
       "client_num_batches_per_thread",
-      strings::Substitute("$0", (FLAGS_client_num_batches_per_thread * 100)));
+      Substitute("$0", (FLAGS_client_num_batches_per_thread * 100)));
 
   int num_threads = FLAGS_num_client_threads;
   int64_t total_num_rows = num_threads * FLAGS_client_inserts_per_thread;
@@ -956,7 +946,7 @@ TEST_F(RaftConsensusITest, MultiThreadedInsertWithFailovers) {
 
   for (int i = 0; i < num_threads; i++) {
     scoped_refptr<kudu::Thread> new_thread;
-    CHECK_OK(kudu::Thread::Create("test", strings::Substitute("ts-test$0", i),
+    CHECK_OK(kudu::Thread::Create("test", Substitute("ts-test$0", i),
                                   &RaftConsensusITest::InsertTestRowsRemoteThread,
                                   this, i * FLAGS_client_inserts_per_thread,
                                   FLAGS_client_inserts_per_thread,
@@ -1003,7 +993,7 @@ TEST_F(RaftConsensusITest, TestKUDU_597) {
   AtomicBool finish(false);
   for (int i = 0; i < FLAGS_num_tablet_servers; i++) {
     scoped_refptr<kudu::Thread> new_thread;
-    CHECK_OK(kudu::Thread::Create("test", strings::Substitute("ts-test$0", i),
+    CHECK_OK(kudu::Thread::Create("test", Substitute("ts-test$0", i),
                                   &RaftConsensusITest::StubbornlyWriteSameRowThread,
                                   this, i, &finish, &new_thread));
     threads_.push_back(new_thread);
@@ -1459,10 +1449,8 @@ TEST_F(RaftConsensusITest, TestReplaceChangeConfigOperation) {
 
   // Now try to replicate a ChangeConfig operation. This should get stuck and time out
   // because the server can't replicate any operations.
-  TabletServerErrorPB::Code error_code;
   Status s = RemoveServer(leader_tserver, tablet_id_, tservers[1],
-                          -1, MonoDelta::FromSeconds(1),
-                          &error_code);
+                          -1, MonoDelta::FromSeconds(1));
   ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
 
   // Pause the leader, and restart the other servers.
@@ -1490,8 +1478,7 @@ TEST_F(RaftConsensusITest, TestReplaceChangeConfigOperation) {
   // This acts as a regression test for KUDU-1338, in which aborting the original
   // config change didn't properly unset the 'pending' configuration.
   ASSERT_OK(RemoveServer(leader_tserver, tablet_id_, tservers[2],
-                         -1, MonoDelta::FromSeconds(5),
-                         &error_code));
+                         -1, MonoDelta::FromSeconds(5)));
   NO_FATALS(InsertTestRowsRemoteThread(10, 10, 1, vector<CountDownLatch*>()));
 }
 
@@ -1713,9 +1700,9 @@ TEST_F(RaftConsensusITest, TestMasterNotifiedOnConfigChange) {
   LOG(INFO) << "Waiting for Master to see the current replicas...";
   master::TabletLocationsPB tablet_locations;
   bool has_leader;
-  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
-                                                   2, tablet_id, timeout, itest::WAIT_FOR_LEADER,
-                                                   &has_leader, &tablet_locations));
+  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                            2, tablet_id, timeout, WAIT_FOR_LEADER,
+                                            &has_leader, &tablet_locations));
   LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
 
   // Wait for initial NO_OP to be committed by the leader.
@@ -1733,10 +1720,10 @@ TEST_F(RaftConsensusITest, TestMasterNotifiedOnConfigChange) {
   // Wait for the master to be notified of the config change.
   // It should continue to have the same leader, even without waiting.
   LOG(INFO) << "Waiting for Master to see config change...";
-  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
-                                                   3, tablet_id, timeout,
-                                                   itest::DONT_WAIT_FOR_LEADER,
-                                                   &has_leader, &tablet_locations));
+  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                            3, tablet_id, timeout,
+                                            DONT_WAIT_FOR_LEADER,
+                                            &has_leader, &tablet_locations));
   ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
   LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
 
@@ -1749,10 +1736,10 @@ TEST_F(RaftConsensusITest, TestMasterNotifiedOnConfigChange) {
 
   // Wait for the master to be notified of the removal.
   LOG(INFO) << "Waiting for Master to see config change...";
-  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
-                                                   2, tablet_id, timeout,
-                                                   itest::DONT_WAIT_FOR_LEADER,
-                                                   &has_leader, &tablet_locations));
+  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                            2, tablet_id, timeout,
+                                            DONT_WAIT_FOR_LEADER,
+                                            &has_leader, &tablet_locations));
   ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
   LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
 }
@@ -2326,9 +2313,9 @@ TEST_F(RaftConsensusITest, TestChangeConfigRejectedUnlessNoopReplicated) {
   // Now attempt to do a config change. It should be rejected because there
   // have not been any ops (notably the initial NO_OP) from the leader's term
   // that have been committed yet.
-  Status s = itest::RemoveServer(leader_ts, tablet_id_,
-                                 tablet_servers_[cluster_->tablet_server(1)->uuid()],
-                                 boost::none, timeout);
+  Status s = RemoveServer(leader_ts, tablet_id_,
+                          tablet_servers_[cluster_->tablet_server(1)->uuid()],
+                          boost::none, timeout);
   ASSERT_TRUE(!s.ok()) << s.ToString();
   ASSERT_STR_CONTAINS(s.ToString(), "Leader has not yet committed an operation in its own term");
 }
@@ -2587,4 +2574,3 @@ TEST_F(RaftConsensusITest, TestLogIOErrorIsFatal) {
 
 }  // namespace tserver
 }  // namespace kudu
-

http://git-wip-us.apache.org/repos/asf/kudu/blob/fe23710c/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc b/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
new file mode 100644
index 0000000..fa32bb9
--- /dev/null
+++ b/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
@@ -0,0 +1,534 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <string>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <gflags/gflags_declare.h>
+#include <gtest/gtest.h>
+
+#include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/metadata.pb.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
+#include "kudu/integration-tests/cluster_verifier.h"
+#include "kudu/integration-tests/external_mini_cluster_fs_inspector.h"
+#include "kudu/integration-tests/raft_consensus-itest-base.h"
+#include "kudu/integration-tests/test_workload.h"
+#include "kudu/master/master.pb.h"
+#include "kudu/mini-cluster/external_mini_cluster.h"
+#include "kudu/tablet/metadata.pb.h"
+#include "kudu/tserver/tablet_server-test-base.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+DECLARE_int32(num_replicas);
+DECLARE_int32(num_tablet_servers);
+
+METRIC_DECLARE_gauge_int32(tablet_copy_open_client_sessions);
+METRIC_DECLARE_gauge_int32(tablet_copy_open_source_sessions);
+
+using kudu::cluster::ExternalDaemon;
+using kudu::cluster::ExternalTabletServer;
+using kudu::consensus::RaftPeerPB;
+using kudu::itest::AddServer;
+using kudu::itest::LeaderStepDown;
+using kudu::itest::RemoveServer;
+using kudu::itest::StartElection;
+using kudu::itest::TServerDetails;
+using kudu::itest::TabletServerMap;
+using kudu::itest::WAIT_FOR_LEADER;
+using kudu::itest::WaitForReplicasReportedToMaster;
+using kudu::master::TabletLocationsPB;
+using kudu::tablet::TABLET_DATA_COPYING;
+using kudu::tablet::TABLET_DATA_TOMBSTONED;
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace tserver {
+
+// Integration test for the raft consensus implementation.
+// Uses the whole tablet server stack with ExternalMiniCluster.
+class RaftConsensusNonVoterITest : public RaftConsensusITestBase {
+ public:
+  RaftConsensusNonVoterITest() = default;
+
+ protected:
+  // Get number of source tablet copy sessions at the specified server.
+  Status GetTabletCopySourceSessionsCount(const ExternalDaemon& server,
+                                          int64_t* count);
+
+  // Get number of target/client tablet copy sessions at the specified server.
+  Status GetTabletCopyTargetSessionsCount(const ExternalDaemon& server,
+                                          int64_t* count);
+
+  // Add replica of the specified type for the specified tablet.
+  Status AddReplica(const string& tablet_id,
+                    const TServerDetails* replica,
+                    RaftPeerPB::MemberType replica_type,
+                    const MonoDelta& timeout);
+
+  // Remove replica of the specified tablet.
+  Status RemoveReplica(const string& tablet_id,
+                       const TServerDetails* replica,
+                       const MonoDelta& timeout);
+};
+
+Status RaftConsensusNonVoterITest::GetTabletCopySourceSessionsCount(
+    const ExternalDaemon& server, int64_t* count) {
+  return server.GetInt64Metric(
+      &METRIC_ENTITY_server, "kudu.tabletserver",
+      &METRIC_tablet_copy_open_source_sessions, "value", count);
+}
+
+Status RaftConsensusNonVoterITest::GetTabletCopyTargetSessionsCount(
+    const ExternalDaemon& server, int64_t* count) {
+  return server.GetInt64Metric(
+      &METRIC_ENTITY_server, "kudu.tabletserver",
+      &METRIC_tablet_copy_open_client_sessions, "value", count);
+}
+
+Status RaftConsensusNonVoterITest::AddReplica(const string& tablet_id,
+                                              const TServerDetails* replica,
+                                              RaftPeerPB::MemberType replica_type,
+                                              const MonoDelta& timeout) {
+  TServerDetails* leader = nullptr;
+  RETURN_NOT_OK(GetLeaderReplicaWithRetries(tablet_id, &leader));
+
+  // Wait for at least one operation committed by the leader in current term.
+  // Otherwise, any Raft configuration change attempt might end up with error
+  // 'Illegal state: Leader has not yet committed an operation in its own term'.
+  RETURN_NOT_OK(WaitForOpFromCurrentTerm(leader, tablet_id,
+                                         consensus::COMMITTED_OPID, timeout));
+  return AddServer(leader, tablet_id, replica, replica_type,
+                   boost::none, timeout);
+}
+
+Status RaftConsensusNonVoterITest::RemoveReplica(const string& tablet_id,
+                                                 const TServerDetails* replica,
+                                                 const MonoDelta& timeout) {
+  TServerDetails* leader = nullptr;
+  RETURN_NOT_OK(GetLeaderReplicaWithRetries(tablet_id, &leader));
+
+  // Wait for at least one operation committed by the leader in current term.
+  // Otherwise, any Raft configuration change attempt might end up with error
+  // 'Illegal state: Leader has not yet committed an operation in its own term'.
+  RETURN_NOT_OK(WaitForOpFromCurrentTerm(leader, tablet_id,
+                                         consensus::COMMITTED_OPID, timeout));
+  return RemoveServer(leader, tablet_id, replica, boost::none, timeout);
+}
+
+// Ensure that adding a NON_VOTER replica is properly handled by the system:
+//
+//   * Updating Raft configuration for tablet by adding a NON_VOTER replica
+//     succeeds, no errors reported.
+//
+//   * After adding a replica, the system should start tablet copying
+//     to the newly added replica: both the source and the target copy sessions
+//     should be active for some time.
+//
+//   * By the time the newly added replica changes its state to RUNNING,
+//     the tablet copy session should end at both sides.
+//
+//   * Tablet leader reports about the newly added replica to the master.
+//
+//   * If the leader steps down, a new one can be elected and it's possible
+//     to insert data into the table which contains the tablet.
+//
+//   * The tablet stays consistent: ksck verification reports no error,
+//     replicated operation indices match across all replicas,
+//     tablet row count matches the expected number.
+//
+TEST_F(RaftConsensusNonVoterITest, AddNonVoterReplica) {
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(60);
+  const vector<string> kMasterFlags = {
+    // Allow replication factor of 2.
+    "--allow_unsafe_replication_factor=true",
+  };
+  const vector<string> kTserverFlags = {
+    // Slow down tablet copy to observe active source and target sessions.
+    "--tablet_copy_download_file_inject_latency_ms=1000",
+  };
+  const int kOriginalReplicasNum = 2;
+
+  FLAGS_num_tablet_servers = 3;
+  FLAGS_num_replicas = kOriginalReplicasNum;
+  NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags));
+  ASSERT_EQ(3, tablet_servers_.size());
+  ASSERT_EQ(kOriginalReplicasNum, tablet_replicas_.size());
+
+  const string& tablet_id = tablet_id_;
+  TabletServerMap replica_servers;
+  for (const auto& e : tablet_replicas_) {
+    if (e.first == tablet_id) {
+      replica_servers.emplace(e.second->uuid(), e.second);
+    }
+  }
+  ASSERT_EQ(FLAGS_num_replicas, replica_servers.size());
+
+  // Create a test table and insert some data into the table,
+  // so the special flag --tablet_copy_download_file_inject_latency_ms
+  // could take affect while tablet copy happens down the road.
+  TestWorkload workload(cluster_.get());
+  workload.set_table_name(kTableId);
+  workload.Setup();
+  workload.Start();
+  while (workload.rows_inserted() < 100) {
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+  workload.StopAndJoin();
+
+  ASSERT_OK(WaitForServersToAgree(kTimeout, replica_servers, tablet_id, 1));
+
+  TServerDetails* new_replica = nullptr;
+  for (const auto& ts : tablet_servers_) {
+    if (replica_servers.find(ts.first) == replica_servers.end()) {
+      new_replica = ts.second;
+      break;
+    }
+  }
+  ASSERT_NE(nullptr, new_replica);
+
+  ASSERT_OK(AddReplica(tablet_id, new_replica, RaftPeerPB::NON_VOTER, kTimeout));
+
+  const int new_replica_idx =
+      cluster_->tablet_server_index_by_uuid(new_replica->uuid());
+  // Wait for the tablet copying to start.
+  ASSERT_OK(inspect_->WaitForTabletDataStateOnTS(
+      new_replica_idx, tablet_id, { TABLET_DATA_COPYING }, kTimeout));
+
+  TServerDetails* leader = nullptr;
+  ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id, &leader));
+  const ExternalDaemon& ed_leader =
+      *cluster_->tablet_server_by_uuid(leader->uuid());
+  const ExternalDaemon& ed_new_replica =
+      *cluster_->tablet_server_by_uuid(new_replica->uuid());
+  {
+    int64_t num_sessions;
+    ASSERT_OK(GetTabletCopySourceSessionsCount(ed_leader, &num_sessions));
+    EXPECT_EQ(1, num_sessions);
+  }
+  {
+    int64_t num_sessions;
+    ASSERT_OK(GetTabletCopyTargetSessionsCount(ed_new_replica, &num_sessions));
+    EXPECT_EQ(1, num_sessions);
+  }
+
+  // The newly copied replica should be able to start.
+  ASSERT_OK(WaitForNumTabletsOnTS(
+      new_replica, 1, kTimeout, nullptr, tablet::RUNNING));
+
+  // The tablet copying should complete shortly after tablet state becomes
+  // RUNNING. Sampling the counters right after seeing RUNNING tablet status
+  // is a little racy: it takes some time to end tablet copy sessions at both
+  // sides. So, using ASSERT_EVENTUALLY here to avoid flakiness.
+  ASSERT_EVENTUALLY([&]() {
+      int64_t num_sessions;
+      ASSERT_OK(GetTabletCopySourceSessionsCount(ed_leader, &num_sessions));
+      EXPECT_EQ(0, num_sessions);
+      ASSERT_OK(GetTabletCopyTargetSessionsCount(ed_new_replica, &num_sessions));
+      EXPECT_EQ(0, num_sessions);
+    });
+
+  // The master should report about the newly added NON_VOTER tablet replica
+  // to the established leader.
+  bool has_leader;
+  master::TabletLocationsPB tablet_locations;
+  ASSERT_OK(WaitForReplicasReportedToMaster(
+      cluster_->master_proxy(), kOriginalReplicasNum + 1, tablet_id, kTimeout,
+      WAIT_FOR_LEADER, &has_leader, &tablet_locations));
+  ASSERT_TRUE(has_leader);
+
+  // Check the update cluster is able to elect a leader.
+  {
+    TServerDetails* leader = nullptr;
+    ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id, &leader));
+    ASSERT_OK(LeaderStepDown(leader, tablet_id, kTimeout));
+  }
+
+  // Make sure it's possible to insert more data into the table once it's backed
+  // by one more (NON_VOTER) replica.
+  const int64_t prev_inserted = workload.rows_inserted();
+  workload.Start();
+  while (workload.rows_inserted() < 2 * prev_inserted) {
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+  workload.StopAndJoin();
+
+  NO_FATALS(cluster_->AssertNoCrashes());
+  // Ensure that the replicas converge. Along with other verification steps,
+  // ClusterVerifier employs VerifyCommittedOpIdsMatch() to verify that
+  // all OpIds match in local files under all tablet servers of the cluster,
+  // so NON_VOTER replicas are covered by this check as well.
+  ClusterVerifier v(cluster_.get());
+  NO_FATALS(v.CheckCluster());
+  NO_FATALS(v.CheckRowCount(workload.table_name(),
+                            ClusterVerifier::EXACTLY,
+                            workload.rows_inserted()));
+}
+
+// Test how the system reacts on removing a NON_VOTER replica from
+// tablet cluster:
+//
+//   * First, add a NON_VOTER member into the cluster (covered by other test).
+//
+//   * Make sure that changing Raft configuration by removing a NON_VOTER
+//     replica does not return errors.
+//
+//   * After removing such a non-voter replica, the system should not try
+//     to add a new replica instead of the removed one.
+//
+//   * Tablet leader is established and it reports about the removed replica
+//     to the master.
+//
+//   * The updated tablet is still available: it's possible to insert data
+//     into the table which is hosted by the tablet.
+//
+//   * The tablet stays consistent: ksck verification reports no error,
+//     replicated operation indices match across all remaining replicas,
+//     tablet row count matches the expected number.
+//
+TEST_F(RaftConsensusNonVoterITest, AddThenRemoveNonVoterReplica) {
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(60);
+  const vector<string> kMasterFlags = {
+    // Allow replication factor of 2.
+    "--allow_unsafe_replication_factor=true",
+  };
+  const int kOriginalReplicasNum = 2;
+
+  FLAGS_num_tablet_servers = 3;
+  FLAGS_num_replicas = kOriginalReplicasNum;
+  NO_FATALS(BuildAndStart({}, kMasterFlags));
+  ASSERT_EQ(3, tablet_servers_.size());
+  ASSERT_EQ(kOriginalReplicasNum, tablet_replicas_.size());
+
+  const string& tablet_id = tablet_id_;
+  TabletServerMap replica_servers;
+  for (const auto& e : tablet_replicas_) {
+    if (e.first == tablet_id) {
+      replica_servers.emplace(e.second->uuid(), e.second);
+    }
+  }
+  ASSERT_EQ(FLAGS_num_replicas, replica_servers.size());
+
+  TServerDetails* new_replica = nullptr;
+  for (const auto& ts : tablet_servers_) {
+    if (replica_servers.find(ts.first) == replica_servers.end()) {
+      new_replica = ts.second;
+      break;
+    }
+  }
+  ASSERT_NE(nullptr, new_replica);
+  ASSERT_OK(AddReplica(tablet_id, new_replica, RaftPeerPB::NON_VOTER, kTimeout));
+
+  TestWorkload workload(cluster_.get());
+  workload.set_table_name(kTableId);
+  workload.Setup();
+  workload.Start();
+  while (workload.rows_inserted() < 100) {
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+  workload.StopAndJoin();
+  ASSERT_OK(WaitForServersToAgree(kTimeout, replica_servers, tablet_id, 1));
+
+  // The newly copied replica should be able to start.
+  ASSERT_OK(WaitForNumTabletsOnTS(
+      new_replica, 1, kTimeout, nullptr, tablet::RUNNING));
+
+  // Ensure that nothing crashes and the replicas converge.
+  NO_FATALS(cluster_->AssertNoCrashes());
+  ClusterVerifier v(cluster_.get());
+  NO_FATALS(v.CheckCluster());
+  NO_FATALS(v.CheckRowCount(workload.table_name(),
+                            ClusterVerifier::EXACTLY,
+                            workload.rows_inserted()));
+
+  // Remove the newly added replica.
+  ASSERT_OK(RemoveReplica(tablet_id, new_replica, kTimeout));
+  ASSERT_OK(WaitForServersToAgree(kTimeout, replica_servers, tablet_id, 1));
+
+  // Verify the removed replica gets tombstoned.
+  const int new_replica_idx =
+      cluster_->tablet_server_index_by_uuid(new_replica->uuid());
+  ASSERT_OK(inspect_->WaitForTabletDataStateOnTS(
+      new_replica_idx, tablet_id, { TABLET_DATA_TOMBSTONED }, kTimeout));
+
+  // The added and then removed tablet replica should be gone, and the master
+  // should report approrpiate replica count at this point. The tablet leader
+  // should be established.
+  bool has_leader;
+  master::TabletLocationsPB tablet_locations;
+  ASSERT_OK(WaitForReplicasReportedToMaster(
+      cluster_->master_proxy(), kOriginalReplicasNum, tablet_id, kTimeout,
+      WAIT_FOR_LEADER, &has_leader, &tablet_locations));
+  ASSERT_TRUE(has_leader);
+
+  // Make sure it's possible to insert data into the tablet once the NON_VOTER
+  // replica is gone.
+  const int64_t prev_inserted = workload.rows_inserted();
+  workload.Start();
+  while (workload.rows_inserted() < 2 * prev_inserted) {
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+  workload.StopAndJoin();
+
+  // Ensure that nothing crashed and the replicas converge.
+  NO_FATALS(cluster_->AssertNoCrashes());
+  NO_FATALS(v.CheckCluster());
+  NO_FATALS(v.CheckRowCount(workload.table_name(),
+                            ClusterVerifier::EXACTLY,
+                            workload.rows_inserted()));
+}
+
+// Test to ensure that a non-voter replica:
+//  * does not vote
+//  * does not start leader elections
+//  * returns an error on RunLeaderElection() RPC call
+TEST_F(RaftConsensusNonVoterITest, NonVoterReplicasDoNotVote) {
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(60);
+  const int kOriginalReplicasNum = 2;
+  const int kHbIntervalMs = 64;
+  const int kHbLeaderMissedNum = 1;
+  const vector<string> kMasterFlags = {
+    // Allow replication factor of 2.
+    "--allow_unsafe_replication_factor=true",
+  };
+  const vector<string> kTserverFlags = {
+    Substitute("--raft_heartbeat_interval_ms=$0", kHbIntervalMs),
+    Substitute("--leader_failure_max_missed_heartbeat_periods=$0",
+        kHbLeaderMissedNum),
+  };
+
+  FLAGS_num_tablet_servers = 3;
+  FLAGS_num_replicas = kOriginalReplicasNum;
+  NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags));
+  ASSERT_EQ(3, tablet_servers_.size());
+  ASSERT_EQ(kOriginalReplicasNum, tablet_replicas_.size());
+
+  const string& tablet_id = tablet_id_;
+  TabletServerMap replica_servers;
+  for (const auto& e : tablet_replicas_) {
+    if (e.first == tablet_id) {
+      replica_servers.emplace(e.second->uuid(), e.second);
+    }
+  }
+  ASSERT_EQ(FLAGS_num_replicas, replica_servers.size());
+
+  ASSERT_OK(WaitForServersToAgree(kTimeout, replica_servers, tablet_id, 1));
+
+  TServerDetails* new_replica = nullptr;
+  for (const auto& ts : tablet_servers_) {
+    if (replica_servers.find(ts.first) == replica_servers.end()) {
+      new_replica = ts.second;
+      break;
+    }
+  }
+  ASSERT_NE(nullptr, new_replica);
+
+  ASSERT_OK(AddReplica(tablet_id, new_replica, RaftPeerPB::NON_VOTER, kTimeout));
+
+  // The newly copied replica should be able to start.
+  ASSERT_OK(WaitForNumTabletsOnTS(
+      new_replica, 1, kTimeout, nullptr, tablet::RUNNING));
+
+  // Ensure that nothing crashes: all tservers must be alive for next step
+  // of the scenario.
+  NO_FATALS(cluster_->AssertNoCrashes());
+
+  // Make sure a NON_VOTER replica doesn't vote.
+  {
+    // Pause the current leader and make sure the majority is not achievable.
+    // It would not be the case if the non-voter replica could vote in the
+    // election initiated after the failure of the current leader was detected.
+    TServerDetails* leader;
+    ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id, &leader));
+    ExternalTabletServer* leader_ts =
+        cluster_->tablet_server_by_uuid(leader->uuid());
+
+    int64_t term_leader;
+    ASSERT_OK(GetTermMetricValue(leader_ts, &term_leader));
+
+    ASSERT_OK(leader_ts->Pause());
+    auto cleanup = MakeScopedCleanup([&]() {
+      ASSERT_OK(leader_ts->Resume());
+    });
+    TServerDetails* new_leader;
+    const Status s = GetLeaderReplicaWithRetries(tablet_id, &new_leader, 10);
+    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+    ASSERT_OK(leader_ts->Resume());
+
+    // The majority should be achievable once the leader replica is resumed.
+    ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id, &new_leader));
+    NO_FATALS(cluster_->AssertNoCrashes());
+  }
+
+  // Make sure a NON_VOTER replica does not start leader election on start-up.
+  {
+    // Disable failure detection for all replicas.
+    for (int i = 0; i < cluster_->num_tablet_servers(); ++i) {
+      ExternalTabletServer* ts = cluster_->tablet_server(i);
+      ASSERT_OK(cluster_->SetFlag(ts,
+          "enable_leader_failure_detection", "false"));
+    }
+    ExternalTabletServer* new_replica_ts =
+        cluster_->tablet_server_by_uuid(new_replica->uuid());
+
+    // Get the current Raft term for the tablet.
+    int64_t term_before_restart = 0;
+    ASSERT_OK(GetTermMetricValue(new_replica_ts, &term_before_restart));
+
+    new_replica_ts->Shutdown();
+    ASSERT_OK(new_replica_ts->Restart());
+    // Wait for the tablet server to start up.
+    ASSERT_OK(cluster_->WaitForTabletsRunning(new_replica_ts, 1, kTimeout));
+
+    // Once restarted, the tablet server will have the default disposition
+    // for the enable_leader_failure_detection flag, i.e. our new NON_VOTER
+    // replica will have leader failure detection enabled. That said,
+    // the leader election could trigger if the replica was of VOTER type.
+    // However, it's not and no election should be started, and the term
+    // must be the same as before starting this NON_VOTER replica.
+    // So, give a chance for a new election to happen and compare the terms.
+    SleepFor(MonoDelta::FromMilliseconds(
+        3L * kHbLeaderMissedNum * kHbIntervalMs));
+
+    int64_t term_after_restart = 0;
+    ASSERT_OK(GetTermMetricValue(new_replica_ts, &term_after_restart));
+    EXPECT_EQ(term_before_restart, term_after_restart);
+  }
+
+  // Make sure a non-voter replica returns an error on RunLeaderElection()
+  // RPC call.
+  {
+    const Status s = StartElection(new_replica, tablet_id_, kTimeout);
+    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+  }
+}
+
+}  // namespace tserver
+}  // namespace kudu