You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2017/10/17 15:23:22 UTC
[5/5] kudu git commit: [consensus] adding/removing NON_VOTER members
[consensus] adding/removing NON_VOTER members
Added the ability to add and remove NON_VOTER member replicas.
Updated the kudu CLI accordingly. Also, added new integration tests:
* RaftConsensusITest.AddNonVoterReplica
* RaftConsensusITest.AddThenRemoveNonVoterReplica
* RaftConsensusITest.NonVoterReplicasDoNotVote
Change-Id: I2662d45ad9bb6a4bf325d4202c2ee619ffad02b7
Reviewed-on: http://gerrit.cloudera.org:8080/8138
Reviewed-by: Todd Lipcon <to...@apache.org>
Reviewed-by: Mike Percy <mp...@apache.org>
Tested-by: Alexey Serbin <as...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/fe23710c
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/fe23710c
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/fe23710c
Branch: refs/heads/master
Commit: fe23710c63c90e561b2622f6f67563572d10dbf9
Parents: 8c23c97
Author: Alexey Serbin <as...@cloudera.com>
Authored: Mon Oct 16 16:23:57 2017 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Tue Oct 17 15:20:31 2017 +0000
----------------------------------------------------------------------
src/kudu/consensus/leader_election-test.cc | 5 +-
src/kudu/consensus/leader_election.cc | 22 +-
src/kudu/consensus/leader_election.h | 4 +-
src/kudu/consensus/quorum_util-test.cc | 47 +-
src/kudu/consensus/quorum_util.cc | 10 +-
src/kudu/consensus/quorum_util.h | 4 +
src/kudu/consensus/raft_consensus.cc | 22 +-
src/kudu/integration-tests/CMakeLists.txt | 1 +
.../integration-tests/cluster_itest_util.cc | 4 +-
.../raft_consensus-itest-base.cc | 6 +
.../raft_consensus-itest-base.h | 12 +-
.../integration-tests/raft_consensus-itest.cc | 82 ++-
.../raft_consensus_nonvoter-itest.cc | 534 +++++++++++++++++++
13 files changed, 671 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/fe23710c/src/kudu/consensus/leader_election-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/leader_election-test.cc b/src/kudu/consensus/leader_election-test.cc
index cec04c3..1dc26d9 100644
--- a/src/kudu/consensus/leader_election-test.cc
+++ b/src/kudu/consensus/leader_election-test.cc
@@ -161,7 +161,8 @@ void LeaderElectionTest::ElectionCallback(const ElectionResult& result) {
void LeaderElectionTest::InitUUIDs(int num_voters) {
voter_uuids_ = GenVoterUUIDs(num_voters);
- candidate_uuid_ = voter_uuids_[num_voters - 1];
+ CHECK(!voter_uuids_.empty());
+ candidate_uuid_ = voter_uuids_.back();
voter_uuids_.pop_back();
}
@@ -170,6 +171,7 @@ void LeaderElectionTest::InitNoOpPeerProxies() {
for (const string& uuid : voter_uuids_) {
RaftPeerPB* peer_pb = config_.add_peers();
peer_pb->set_permanent_uuid(uuid);
+ peer_pb->set_member_type(RaftPeerPB::VOTER);
PeerProxy* proxy = new NoOpTestPeerProxy(pool_.get(), *peer_pb);
InsertOrDie(&proxies_, uuid, proxy);
}
@@ -180,6 +182,7 @@ void LeaderElectionTest::InitDelayableMockedProxies(bool enable_delay) {
for (const string& uuid : voter_uuids_) {
RaftPeerPB* peer_pb = config_.add_peers();
peer_pb->set_permanent_uuid(uuid);
+ peer_pb->set_member_type(RaftPeerPB::VOTER);
auto proxy = new DelayablePeerProxy<MockedPeerProxy>(pool_.get(),
new MockedPeerProxy(pool_.get()));
if (enable_delay) {
http://git-wip-us.apache.org/repos/asf/kudu/blob/fe23710c/src/kudu/consensus/leader_election.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/leader_election.cc b/src/kudu/consensus/leader_election.cc
index 07b8a01..b8cbddc 100644
--- a/src/kudu/consensus/leader_election.cc
+++ b/src/kudu/consensus/leader_election.cc
@@ -161,8 +161,18 @@ LeaderElection::LeaderElection(const RaftConfigPB& config,
decision_callback_(std::move(decision_callback)),
highest_voter_term_(0) {
for (const RaftPeerPB& peer : config.peers()) {
- if (request.candidate_uuid() == peer.permanent_uuid()) continue;
- follower_uuids_.push_back(peer.permanent_uuid());
+ if (request.candidate_uuid() == peer.permanent_uuid()) {
+ DCHECK_EQ(peer.member_type(), RaftPeerPB::VOTER)
+ << Substitute("non-voter member $0 tried to start an election; "
+ "Raft config {$1}",
+ peer.permanent_uuid(),
+ pb_util::SecureShortDebugString(config));
+ continue;
+ }
+ if (peer.member_type() != RaftPeerPB::VOTER) {
+ continue;
+ }
+ other_voter_uuids_.push_back(peer.permanent_uuid());
gscoped_ptr<VoterState> state(new VoterState());
state->proxy_status = proxy_factory->NewProxy(peer, &state->proxy);
@@ -173,10 +183,10 @@ LeaderElection::LeaderElection(const RaftConfigPB& config,
CHECK_EQ(1, vote_counter_->GetTotalVotesCounted()) << "Candidate must vote for itself first";
// Ensure that existing votes + future votes add up to the expected total.
- CHECK_EQ(vote_counter_->GetTotalVotesCounted() + follower_uuids_.size(),
+ CHECK_EQ(vote_counter_->GetTotalVotesCounted() + other_voter_uuids_.size(),
vote_counter_->GetTotalExpectedVotes())
- << "Expected different number of followers. Follower UUIDs: ["
- << JoinStringsIterator(follower_uuids_.begin(), follower_uuids_.end(), ", ")
+ << "Expected different number of voters. Voter UUIDs: ["
+ << JoinStringsIterator(other_voter_uuids_.begin(), other_voter_uuids_.end(), ", ")
<< "]; RaftConfig: {" << pb_util::SecureShortDebugString(config) << "}";
}
@@ -194,7 +204,7 @@ void LeaderElection::Run() {
CheckForDecision();
// The rest of the code below is for a typical multi-node configuration.
- for (const std::string& voter_uuid : follower_uuids_) {
+ for (const std::string& voter_uuid : other_voter_uuids_) {
VoterState* state = nullptr;
{
std::lock_guard<Lock> guard(lock_);
http://git-wip-us.apache.org/repos/asf/kudu/blob/fe23710c/src/kudu/consensus/leader_election.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/leader_election.h b/src/kudu/consensus/leader_election.h
index 3d3620f..33e4241 100644
--- a/src/kudu/consensus/leader_election.h
+++ b/src/kudu/consensus/leader_election.h
@@ -221,9 +221,9 @@ class LeaderElection : public RefCountedThreadSafe<LeaderElection> {
// Callback invoked to notify the caller of an election decision.
const ElectionDecisionCallback decision_callback_;
- // List of all potential followers to request votes from.
+ // List of all other voters to request votes from.
// The candidate's own UUID must not be included.
- std::vector<std::string> follower_uuids_;
+ std::vector<std::string> other_voter_uuids_;
// Map of UUID -> VoterState.
VoterStateMap voter_state_;
http://git-wip-us.apache.org/repos/asf/kudu/blob/fe23710c/src/kudu/consensus/quorum_util-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/quorum_util-test.cc b/src/kudu/consensus/quorum_util-test.cc
index 87b30cd..9982c74 100644
--- a/src/kudu/consensus/quorum_util-test.cc
+++ b/src/kudu/consensus/quorum_util-test.cc
@@ -30,19 +30,21 @@ namespace consensus {
using std::string;
-static void SetPeerInfo(const string& uuid,
- RaftPeerPB::MemberType type,
- RaftPeerPB* peer) {
+// Add a consensus peer into the specified configuration.
+static void AddPeer(RaftConfigPB* config,
+ const string& uuid,
+ RaftPeerPB::MemberType type) {
+ RaftPeerPB* peer = config->add_peers();
peer->set_permanent_uuid(uuid);
- peer->set_member_type(type);
peer->mutable_last_known_addr()->set_host(uuid + ".example.com");
+ peer->set_member_type(type);
}
TEST(QuorumUtilTest, TestMemberExtraction) {
RaftConfigPB config;
- SetPeerInfo("A", RaftPeerPB::VOTER, config.add_peers());
- SetPeerInfo("B", RaftPeerPB::VOTER, config.add_peers());
- SetPeerInfo("C", RaftPeerPB::VOTER, config.add_peers());
+ AddPeer(&config, "A", RaftPeerPB::VOTER);
+ AddPeer(&config, "B", RaftPeerPB::VOTER);
+ AddPeer(&config, "C", RaftPeerPB::VOTER);
// Basic test for GetRaftConfigMember().
RaftPeerPB peer_pb;
@@ -63,9 +65,9 @@ TEST(QuorumUtilTest, TestMemberExtraction) {
TEST(QuorumUtilTest, TestDiffConsensusStates) {
ConsensusStatePB old_cs;
- SetPeerInfo("A", RaftPeerPB::VOTER, old_cs.mutable_committed_config()->add_peers());
- SetPeerInfo("B", RaftPeerPB::VOTER, old_cs.mutable_committed_config()->add_peers());
- SetPeerInfo("C", RaftPeerPB::VOTER, old_cs.mutable_committed_config()->add_peers());
+ AddPeer(old_cs.mutable_committed_config(), "A", RaftPeerPB::VOTER);
+ AddPeer(old_cs.mutable_committed_config(), "B", RaftPeerPB::VOTER);
+ AddPeer(old_cs.mutable_committed_config(), "C", RaftPeerPB::VOTER);
old_cs.set_current_term(1);
old_cs.set_leader_uuid("A");
old_cs.mutable_committed_config()->set_opid_index(1);
@@ -100,7 +102,7 @@ TEST(QuorumUtilTest, TestDiffConsensusStates) {
{
auto new_cs = old_cs;
new_cs.mutable_committed_config()->set_opid_index(2);
- SetPeerInfo("D", RaftPeerPB::NON_VOTER, new_cs.mutable_committed_config()->add_peers());
+ AddPeer(new_cs.mutable_committed_config(), "D", RaftPeerPB::NON_VOTER);
EXPECT_EQ("config changed from index 1 to 2, "
"NON_VOTER D (D.example.com) added",
@@ -148,7 +150,7 @@ TEST(QuorumUtilTest, TestDiffConsensusStates) {
// Simulate a change in a pending config
{
auto before_cs = old_cs;
- SetPeerInfo("A", RaftPeerPB::VOTER, before_cs.mutable_pending_config()->add_peers());
+ AddPeer(before_cs.mutable_pending_config(), "A", RaftPeerPB::VOTER);
auto after_cs = before_cs;
after_cs.mutable_pending_config()
->mutable_peers()->Mutable(0)->set_member_type(RaftPeerPB::NON_VOTER);
@@ -158,5 +160,26 @@ TEST(QuorumUtilTest, TestDiffConsensusStates) {
}
}
+TEST(QuorumUtilTest, TestIsRaftConfigVoter) {
+ RaftConfigPB config;
+ AddPeer(&config, "A", RaftPeerPB::VOTER);
+ AddPeer(&config, "B", RaftPeerPB::NON_VOTER);
+ AddPeer(&config, "C", RaftPeerPB::UNKNOWN_MEMBER_TYPE);
+
+ // The case when membership type is not specified. That sort of configuration
+ // would not pass VerifyRaftConfig(), though. Anyway, that should result
+ // in non-voter since the member_type is initialized with UNKNOWN_MEMBER_TYPE.
+ const string no_member_type_peer_uuid = "D";
+ RaftPeerPB* no_member_type_peer = config.add_peers();
+ no_member_type_peer->set_permanent_uuid(no_member_type_peer_uuid);
+ no_member_type_peer->mutable_last_known_addr()->set_host(
+ no_member_type_peer_uuid + ".example.com");
+
+ ASSERT_TRUE(IsRaftConfigVoter("A", config));
+ ASSERT_FALSE(IsRaftConfigVoter("B", config));
+ ASSERT_FALSE(IsRaftConfigVoter("C", config));
+ ASSERT_FALSE(IsRaftConfigVoter(no_member_type_peer_uuid, config));
+}
+
} // namespace consensus
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/fe23710c/src/kudu/consensus/quorum_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/quorum_util.cc b/src/kudu/consensus/quorum_util.cc
index ca074b4..55c5bb9 100644
--- a/src/kudu/consensus/quorum_util.cc
+++ b/src/kudu/consensus/quorum_util.cc
@@ -60,6 +60,10 @@ bool IsRaftConfigVoter(const std::string& uuid, const RaftConfigPB& config) {
return false;
}
+bool IsVoterRole(RaftPeerPB::Role role) {
+ return role == RaftPeerPB::LEADER || role == RaftPeerPB::FOLLOWER;
+}
+
Status GetRaftConfigMember(const RaftConfigPB& config,
const std::string& uuid,
RaftPeerPB* peer_pb) {
@@ -171,12 +175,6 @@ Status VerifyRaftConfig(const RaftConfigPB& config) {
Substitute("Peer: $0 has no member type set. RaftConfig: $1", peer.permanent_uuid(),
SecureShortDebugString(config)));
}
- if (peer.member_type() == RaftPeerPB::NON_VOTER) {
- return Status::IllegalState(
- Substitute(
- "Peer: $0 is a NON_VOTER, but this isn't supported yet. RaftConfig: $1",
- peer.permanent_uuid(), SecureShortDebugString(config)));
- }
}
return Status::OK();
http://git-wip-us.apache.org/repos/asf/kudu/blob/fe23710c/src/kudu/consensus/quorum_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/quorum_util.h b/src/kudu/consensus/quorum_util.h
index 822a789..a8d9032 100644
--- a/src/kudu/consensus/quorum_util.h
+++ b/src/kudu/consensus/quorum_util.h
@@ -35,6 +35,10 @@ enum RaftConfigState {
bool IsRaftConfigMember(const std::string& uuid, const RaftConfigPB& config);
bool IsRaftConfigVoter(const std::string& uuid, const RaftConfigPB& config);
+// Whether the specified Raft role is attributed to a peer which can participate
+// in leader elections.
+bool IsVoterRole(RaftPeerPB::Role role);
+
// Get the specified member of the config.
// Returns Status::NotFound if a member with the specified uuid could not be
// found in the config.
http://git-wip-us.apache.org/repos/asf/kudu/blob/fe23710c/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 50b60bf..66a3c81 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -401,10 +401,17 @@ Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Not starting " << mode << " -- already leader";
return Status::OK();
}
+ if (PREDICT_FALSE(!consensus::IsVoterRole(active_role))) {
+ // A non-voter should not start leader elections. The leader failure
+ // detector should be re-enabled once the non-voter replica is promoted
+ // to voter replica.
+ return Status::IllegalState("only voting members can start elections",
+ SecureShortDebugString(cmeta_->ActiveConfig()));
+ }
if (PREDICT_FALSE(active_role == RaftPeerPB::NON_PARTICIPANT)) {
SnoozeFailureDetector();
- return Status::IllegalState("Not starting election: Node is currently "
- "a non-participant in the raft config",
+ return Status::IllegalState("Not starting election: node is currently "
+ "a non-participant in the Raft config",
SecureShortDebugString(cmeta_->ActiveConfig()));
}
LOG_WITH_PREFIX_UNLOCKED(INFO)
@@ -578,8 +585,15 @@ Status RaftConsensus::BecomeReplicaUnlocked(boost::optional<MonoDelta> fd_delta)
<< ToStringUnlocked();
ClearLeaderUnlocked();
- // FD should be running while we are a follower.
- EnableFailureDetector(std::move(fd_delta));
+ if (consensus::IsVoterRole(cmeta_->active_role())) {
+ // A voter should run failure detector, if not a leader.
+ EnableFailureDetector(std::move(fd_delta));
+ } else {
+ // A non-voter should not start leader elections. The leader failure
+ // detector should be re-enabled once the non-voter replica is promoted
+ // to voter replica.
+ DisableFailureDetector();
+ }
// Now that we're a replica, we can allow voting for other nodes.
withhold_votes_until_ = MonoTime::Min();
http://git-wip-us.apache.org/repos/asf/kudu/blob/fe23710c/src/kudu/integration-tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index ecdda20..e83b56d 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -88,6 +88,7 @@ ADD_KUDU_TEST(open-readonly-fs-itest)
ADD_KUDU_TEST(raft_config_change-itest)
ADD_KUDU_TEST(raft_consensus-itest RUN_SERIAL true)
ADD_KUDU_TEST(raft_consensus_election-itest)
+ADD_KUDU_TEST(raft_consensus_nonvoter-itest)
ADD_KUDU_TEST(registration-test RESOURCE_LOCK "master-web-port")
ADD_KUDU_TEST(security-faults-itest)
ADD_KUDU_TEST(security-itest)
http://git-wip-us.apache.org/repos/asf/kudu/blob/fe23710c/src/kudu/integration-tests/cluster_itest_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_itest_util.cc b/src/kudu/integration-tests/cluster_itest_util.cc
index fa4c1d4..ca4cb75 100644
--- a/src/kudu/integration-tests/cluster_itest_util.cc
+++ b/src/kudu/integration-tests/cluster_itest_util.cc
@@ -429,7 +429,8 @@ Status WaitForNumTabletServers(
Status WaitForReplicasReportedToMaster(
const shared_ptr<master::MasterServiceProxy>& master_proxy,
- int num_replicas, const string& tablet_id,
+ int num_replicas,
+ const string& tablet_id,
const MonoDelta& timeout,
WaitForLeader wait_for_leader,
bool* has_leader,
@@ -701,7 +702,6 @@ Status AddServer(const TServerDetails* leader,
if (cas_config_opid_index) {
req.set_cas_config_opid_index(*cas_config_opid_index);
}
-
RETURN_NOT_OK(leader->consensus_proxy->ChangeConfig(req, &resp, &rpc));
if (resp.has_error()) {
if (error_code) *error_code = resp.error().code();
http://git-wip-us.apache.org/repos/asf/kudu/blob/fe23710c/src/kudu/integration-tests/raft_consensus-itest-base.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest-base.cc b/src/kudu/integration-tests/raft_consensus-itest-base.cc
index afd8c64..08b2871 100644
--- a/src/kudu/integration-tests/raft_consensus-itest-base.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest-base.cc
@@ -268,6 +268,12 @@ void RaftConsensusITestBase::CauseFollowerToFallBehindLogGC(
*fell_behind_uuid = replica->uuid();
}
+Status RaftConsensusITestBase::GetTermMetricValue(ExternalTabletServer* ts,
+ int64_t *term) {
+ return ts->GetInt64Metric(&METRIC_ENTITY_tablet, nullptr, &METRIC_raft_term,
+ "value", term);
+}
+
} // namespace tserver
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/fe23710c/src/kudu/integration-tests/raft_consensus-itest-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest-base.h b/src/kudu/integration-tests/raft_consensus-itest-base.h
index 98a379e..18aa82a 100644
--- a/src/kudu/integration-tests/raft_consensus-itest-base.h
+++ b/src/kudu/integration-tests/raft_consensus-itest-base.h
@@ -23,8 +23,14 @@
#include "kudu/integration-tests/ts_itest-base.h"
#include "kudu/util/countdown_latch.h"
+#include "kudu/util/status.h"
namespace kudu {
+
+namespace cluster {
+class ExternalTabletServer;
+}
+
namespace tserver {
class TabletServerServiceProxy;
@@ -45,8 +51,12 @@ class RaftConsensusITestBase : public TabletServerIntegrationTestBase {
uint64_t num_batches,
const std::vector<CountDownLatch*>& latches);
protected:
+ // Retrieve the current term of the first tablet on this tablet server.
+ static Status GetTermMetricValue(cluster::ExternalTabletServer* ts,
+ int64_t* term);
+
// Flags needed for CauseFollowerToFallBehindLogGC() to work well.
- void AddFlagsForLogRolls(std::vector<std::string>* extra_tserver_flags);
+ static void AddFlagsForLogRolls(std::vector<std::string>* extra_tserver_flags);
// Pause one of the followers and write enough data to the remaining replicas
// to cause log GC, then resume the paused follower. On success,
http://git-wip-us.apache.org/repos/asf/kudu/blob/fe23710c/src/kudu/integration-tests/raft_consensus-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index 4d030c5..d012867 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -21,7 +21,6 @@
#include <ostream>
#include <string>
#include <unordered_map>
-#include <unordered_set>
#include <utility>
#include <vector>
@@ -91,7 +90,6 @@ DECLARE_int32(rpc_timeout);
METRIC_DECLARE_entity(tablet);
METRIC_DECLARE_counter(transaction_memory_pressure_rejections);
-METRIC_DECLARE_gauge_int64(raft_term);
using kudu::client::KuduInsert;
using kudu::client::KuduSession;
@@ -107,24 +105,24 @@ using kudu::consensus::OpId;
using kudu::consensus::RaftPeerPB;
using kudu::consensus::ReplicateMsg;
using kudu::itest::AddServer;
-using kudu::itest::GetReplicaStatusAndCheckIfLeader;
+using kudu::itest::DONT_WAIT_FOR_LEADER;
using kudu::itest::LeaderStepDown;
using kudu::itest::RemoveServer;
using kudu::itest::StartElection;
-using kudu::itest::TabletServerMap;
using kudu::itest::TServerDetails;
+using kudu::itest::TabletServerMap;
+using kudu::itest::WAIT_FOR_LEADER;
+using kudu::itest::WaitForReplicasReportedToMaster;
using kudu::itest::WaitUntilLeader;
using kudu::itest::WriteSimpleTestRow;
using kudu::master::TabletLocationsPB;
-using kudu::pb_util::SecureShortDebugString;
using kudu::pb_util::SecureDebugString;
+using kudu::pb_util::SecureShortDebugString;
using kudu::rpc::RpcController;
using kudu::server::SetFlagRequestPB;
using kudu::server::SetFlagResponsePB;
-using kudu::tablet::TABLET_DATA_COPYING;
using std::string;
using std::unordered_map;
-using std::unordered_set;
using std::vector;
using strings::Substitute;
@@ -201,9 +199,6 @@ class RaftConsensusITest : public RaftConsensusITestBase {
void SetupSingleReplicaTest(TServerDetails** replica_ts);
protected:
- // Retrieve the current term of the first tablet on this tablet server.
- Status GetTermMetricValue(ExternalTabletServer* ts, int64_t* term);
-
shared_ptr<KuduTable> table_;
vector<scoped_refptr<kudu::Thread> > threads_;
};
@@ -254,13 +249,13 @@ string RaftConsensusITest::DumpToString(TServerDetails* leader,
const vector<string>& leader_results,
TServerDetails* replica,
const vector<string>& replica_results) {
- string ret = strings::Substitute("Replica results did not match the leaders."
- "\nLeader: $0\nReplica: $1. Results size "
- "L: $2 R: $3",
- leader->ToString(),
- replica->ToString(),
- leader_results.size(),
- replica_results.size());
+ string ret = Substitute("Replica results did not match the leaders."
+ "\nLeader: $0\nReplica: $1. Results size "
+ "L: $2 R: $3",
+ leader->ToString(),
+ replica->ToString(),
+ leader_results.size(),
+ replica_results.size());
StrAppend(&ret, "Leader Results: \n");
for (const string& result : leader_results) {
@@ -604,11 +599,6 @@ void RaftConsensusITest::SetupSingleReplicaTest(TServerDetails** replica_ts) {
LOG(INFO) << "================================== Cluster setup complete.";
}
-Status RaftConsensusITest::GetTermMetricValue(ExternalTabletServer* ts,
- int64_t *term) {
- return ts->GetInt64Metric(&METRIC_ENTITY_tablet, nullptr, &METRIC_raft_term, "value", term);
-}
-
// Test that we can retrieve the permanent uuid of a server running
// consensus service via RPC.
TEST_F(RaftConsensusITest, TestGetPermanentUuid) {
@@ -703,7 +693,7 @@ TEST_F(RaftConsensusITest, MultiThreadedMutateAndInsertThroughConsensus) {
int num_threads = FLAGS_num_client_threads;
for (int i = 0; i < num_threads; i++) {
scoped_refptr<kudu::Thread> new_thread;
- CHECK_OK(kudu::Thread::Create("test", strings::Substitute("ts-test$0", i),
+ CHECK_OK(kudu::Thread::Create("test", Substitute("ts-test$0", i),
&RaftConsensusITest::InsertTestRowsRemoteThread,
this, i * FLAGS_client_inserts_per_thread,
FLAGS_client_inserts_per_thread,
@@ -714,7 +704,7 @@ TEST_F(RaftConsensusITest, MultiThreadedMutateAndInsertThroughConsensus) {
}
for (int i = 0; i < FLAGS_num_replicas; i++) {
scoped_refptr<kudu::Thread> new_thread;
- CHECK_OK(kudu::Thread::Create("test", strings::Substitute("chaos-test$0", i),
+ CHECK_OK(kudu::Thread::Create("test", Substitute("chaos-test$0", i),
&RaftConsensusITest::DelayInjectorThread,
this, cluster_->tablet_server(i),
kConsensusRpcTimeoutForTests,
@@ -939,10 +929,10 @@ TEST_F(RaftConsensusITest, MultiThreadedInsertWithFailovers) {
OverrideFlagForSlowTests(
"client_inserts_per_thread",
- strings::Substitute("$0", (FLAGS_client_inserts_per_thread * 100)));
+ Substitute("$0", (FLAGS_client_inserts_per_thread * 100)));
OverrideFlagForSlowTests(
"client_num_batches_per_thread",
- strings::Substitute("$0", (FLAGS_client_num_batches_per_thread * 100)));
+ Substitute("$0", (FLAGS_client_num_batches_per_thread * 100)));
int num_threads = FLAGS_num_client_threads;
int64_t total_num_rows = num_threads * FLAGS_client_inserts_per_thread;
@@ -956,7 +946,7 @@ TEST_F(RaftConsensusITest, MultiThreadedInsertWithFailovers) {
for (int i = 0; i < num_threads; i++) {
scoped_refptr<kudu::Thread> new_thread;
- CHECK_OK(kudu::Thread::Create("test", strings::Substitute("ts-test$0", i),
+ CHECK_OK(kudu::Thread::Create("test", Substitute("ts-test$0", i),
&RaftConsensusITest::InsertTestRowsRemoteThread,
this, i * FLAGS_client_inserts_per_thread,
FLAGS_client_inserts_per_thread,
@@ -1003,7 +993,7 @@ TEST_F(RaftConsensusITest, TestKUDU_597) {
AtomicBool finish(false);
for (int i = 0; i < FLAGS_num_tablet_servers; i++) {
scoped_refptr<kudu::Thread> new_thread;
- CHECK_OK(kudu::Thread::Create("test", strings::Substitute("ts-test$0", i),
+ CHECK_OK(kudu::Thread::Create("test", Substitute("ts-test$0", i),
&RaftConsensusITest::StubbornlyWriteSameRowThread,
this, i, &finish, &new_thread));
threads_.push_back(new_thread);
@@ -1459,10 +1449,8 @@ TEST_F(RaftConsensusITest, TestReplaceChangeConfigOperation) {
// Now try to replicate a ChangeConfig operation. This should get stuck and time out
// because the server can't replicate any operations.
- TabletServerErrorPB::Code error_code;
Status s = RemoveServer(leader_tserver, tablet_id_, tservers[1],
- -1, MonoDelta::FromSeconds(1),
- &error_code);
+ -1, MonoDelta::FromSeconds(1));
ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
// Pause the leader, and restart the other servers.
@@ -1490,8 +1478,7 @@ TEST_F(RaftConsensusITest, TestReplaceChangeConfigOperation) {
// This acts as a regression test for KUDU-1338, in which aborting the original
// config change didn't properly unset the 'pending' configuration.
ASSERT_OK(RemoveServer(leader_tserver, tablet_id_, tservers[2],
- -1, MonoDelta::FromSeconds(5),
- &error_code));
+ -1, MonoDelta::FromSeconds(5)));
NO_FATALS(InsertTestRowsRemoteThread(10, 10, 1, vector<CountDownLatch*>()));
}
@@ -1713,9 +1700,9 @@ TEST_F(RaftConsensusITest, TestMasterNotifiedOnConfigChange) {
LOG(INFO) << "Waiting for Master to see the current replicas...";
master::TabletLocationsPB tablet_locations;
bool has_leader;
- ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
- 2, tablet_id, timeout, itest::WAIT_FOR_LEADER,
- &has_leader, &tablet_locations));
+ ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+ 2, tablet_id, timeout, WAIT_FOR_LEADER,
+ &has_leader, &tablet_locations));
LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
// Wait for initial NO_OP to be committed by the leader.
@@ -1733,10 +1720,10 @@ TEST_F(RaftConsensusITest, TestMasterNotifiedOnConfigChange) {
// Wait for the master to be notified of the config change.
// It should continue to have the same leader, even without waiting.
LOG(INFO) << "Waiting for Master to see config change...";
- ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
- 3, tablet_id, timeout,
- itest::DONT_WAIT_FOR_LEADER,
- &has_leader, &tablet_locations));
+ ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+ 3, tablet_id, timeout,
+ DONT_WAIT_FOR_LEADER,
+ &has_leader, &tablet_locations));
ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
@@ -1749,10 +1736,10 @@ TEST_F(RaftConsensusITest, TestMasterNotifiedOnConfigChange) {
// Wait for the master to be notified of the removal.
LOG(INFO) << "Waiting for Master to see config change...";
- ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
- 2, tablet_id, timeout,
- itest::DONT_WAIT_FOR_LEADER,
- &has_leader, &tablet_locations));
+ ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+ 2, tablet_id, timeout,
+ DONT_WAIT_FOR_LEADER,
+ &has_leader, &tablet_locations));
ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
}
@@ -2326,9 +2313,9 @@ TEST_F(RaftConsensusITest, TestChangeConfigRejectedUnlessNoopReplicated) {
// Now attempt to do a config change. It should be rejected because there
// have not been any ops (notably the initial NO_OP) from the leader's term
// that have been committed yet.
- Status s = itest::RemoveServer(leader_ts, tablet_id_,
- tablet_servers_[cluster_->tablet_server(1)->uuid()],
- boost::none, timeout);
+ Status s = RemoveServer(leader_ts, tablet_id_,
+ tablet_servers_[cluster_->tablet_server(1)->uuid()],
+ boost::none, timeout);
ASSERT_TRUE(!s.ok()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(), "Leader has not yet committed an operation in its own term");
}
@@ -2587,4 +2574,3 @@ TEST_F(RaftConsensusITest, TestLogIOErrorIsFatal) {
} // namespace tserver
} // namespace kudu
-
http://git-wip-us.apache.org/repos/asf/kudu/blob/fe23710c/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc b/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
new file mode 100644
index 0000000..fa32bb9
--- /dev/null
+++ b/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
@@ -0,0 +1,534 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <string>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <gflags/gflags_declare.h>
+#include <gtest/gtest.h>
+
+#include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/metadata.pb.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
+#include "kudu/integration-tests/cluster_verifier.h"
+#include "kudu/integration-tests/external_mini_cluster_fs_inspector.h"
+#include "kudu/integration-tests/raft_consensus-itest-base.h"
+#include "kudu/integration-tests/test_workload.h"
+#include "kudu/master/master.pb.h"
+#include "kudu/mini-cluster/external_mini_cluster.h"
+#include "kudu/tablet/metadata.pb.h"
+#include "kudu/tserver/tablet_server-test-base.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+DECLARE_int32(num_replicas);
+DECLARE_int32(num_tablet_servers);
+
+METRIC_DECLARE_gauge_int32(tablet_copy_open_client_sessions);
+METRIC_DECLARE_gauge_int32(tablet_copy_open_source_sessions);
+
+using kudu::cluster::ExternalDaemon;
+using kudu::cluster::ExternalTabletServer;
+using kudu::consensus::RaftPeerPB;
+using kudu::itest::AddServer;
+using kudu::itest::LeaderStepDown;
+using kudu::itest::RemoveServer;
+using kudu::itest::StartElection;
+using kudu::itest::TServerDetails;
+using kudu::itest::TabletServerMap;
+using kudu::itest::WAIT_FOR_LEADER;
+using kudu::itest::WaitForReplicasReportedToMaster;
+using kudu::master::TabletLocationsPB;
+using kudu::tablet::TABLET_DATA_COPYING;
+using kudu::tablet::TABLET_DATA_TOMBSTONED;
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace tserver {
+
+// Integration tests for NON_VOTER replicas: adding them to and removing them
+// from a tablet's Raft configuration, and making sure they do not take part
+// in leader elections. The scenarios exercise the whole tablet server stack
+// running in an ExternalMiniCluster.
+class RaftConsensusNonVoterITest : public RaftConsensusITestBase {
+ public:
+  RaftConsensusNonVoterITest() = default;
+
+ protected:
+  // Read the 'tablet_copy_open_source_sessions' gauge of 'server' into
+  // 'count', i.e. the number of tablet copy sessions it serves as a source.
+  Status GetTabletCopySourceSessionsCount(const ExternalDaemon& server,
+                                          int64_t* count);
+
+  // Read the 'tablet_copy_open_client_sessions' gauge of 'server' into
+  // 'count', i.e. the number of tablet copy sessions it runs as a target.
+  Status GetTabletCopyTargetSessionsCount(const ExternalDaemon& server,
+                                          int64_t* count);
+
+  // Find the leader of 'tablet_id' and ask it to add 'replica' to the Raft
+  // configuration as a member of type 'replica_type'.
+  Status AddReplica(const string& tablet_id,
+                    const TServerDetails* replica,
+                    RaftPeerPB::MemberType replica_type,
+                    const MonoDelta& timeout);
+
+  // Find the leader of 'tablet_id' and ask it to remove 'replica' from the
+  // Raft configuration.
+  Status RemoveReplica(const string& tablet_id,
+                       const TServerDetails* replica,
+                       const MonoDelta& timeout);
+};
+
+// Fetches the number of tablet copy sessions for which 'server' acts as the
+// source. The gauge is read from the server-level metric entity.
+Status RaftConsensusNonVoterITest::GetTabletCopySourceSessionsCount(
+    const ExternalDaemon& server, int64_t* count) {
+  const auto* const source_sessions_gauge =
+      &METRIC_tablet_copy_open_source_sessions;
+  return server.GetInt64Metric(&METRIC_ENTITY_server, "kudu.tabletserver",
+                               source_sessions_gauge, "value", count);
+}
+
+// Fetches the number of tablet copy sessions for which 'server' acts as the
+// target (client). The gauge is read from the server-level metric entity.
+Status RaftConsensusNonVoterITest::GetTabletCopyTargetSessionsCount(
+    const ExternalDaemon& server, int64_t* count) {
+  const auto* const client_sessions_gauge =
+      &METRIC_tablet_copy_open_client_sessions;
+  return server.GetInt64Metric(&METRIC_ENTITY_server, "kudu.tabletserver",
+                               client_sessions_gauge, "value", count);
+}
+
+Status RaftConsensusNonVoterITest::AddReplica(const string& tablet_id,
+                                              const TServerDetails* replica,
+                                              RaftPeerPB::MemberType replica_type,
+                                              const MonoDelta& timeout) {
+  TServerDetails* leader_replica = nullptr;
+  RETURN_NOT_OK(GetLeaderReplicaWithRetries(tablet_id, &leader_replica));
+
+  // A Raft configuration change is rejected with
+  // 'Illegal state: Leader has not yet committed an operation in its own term'
+  // until the leader has committed at least one operation in its current
+  // term, so wait for such an operation before requesting the change.
+  RETURN_NOT_OK(WaitForOpFromCurrentTerm(
+      leader_replica, tablet_id, consensus::COMMITTED_OPID, timeout));
+
+  return AddServer(leader_replica, tablet_id, replica, replica_type,
+                   boost::none, timeout);
+}
+
+Status RaftConsensusNonVoterITest::RemoveReplica(const string& tablet_id,
+                                                 const TServerDetails* replica,
+                                                 const MonoDelta& timeout) {
+  TServerDetails* leader_replica = nullptr;
+  RETURN_NOT_OK(GetLeaderReplicaWithRetries(tablet_id, &leader_replica));
+
+  // A Raft configuration change is rejected with
+  // 'Illegal state: Leader has not yet committed an operation in its own term'
+  // until the leader has committed at least one operation in its current
+  // term, so wait for such an operation before requesting the change.
+  RETURN_NOT_OK(WaitForOpFromCurrentTerm(
+      leader_replica, tablet_id, consensus::COMMITTED_OPID, timeout));
+
+  return RemoveServer(leader_replica, tablet_id, replica, boost::none, timeout);
+}
+
+// Ensure that adding a NON_VOTER replica is properly handled by the system:
+//
+// * Updating the Raft configuration of a tablet by adding a NON_VOTER
+//   replica succeeds, with no errors reported.
+//
+// * After the replica is added, the system starts copying the tablet to the
+//   newly added replica: both the source and the target tablet copy sessions
+//   should be active for some time.
+//
+// * By the time the newly added replica changes its state to RUNNING,
+//   the tablet copy sessions should end on both sides.
+//
+// * The tablet leader reports the newly added replica to the master.
+//
+// * If the leader steps down, a new leader (never the NON_VOTER replica)
+//   is elected and it's possible to insert data into the table which
+//   contains the tablet.
+//
+// * The tablet stays consistent: ksck verification reports no error,
+//   replicated operation indices match across all replicas,
+//   tablet row count matches the expected number.
+//
+TEST_F(RaftConsensusNonVoterITest, AddNonVoterReplica) {
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(60);
+  const vector<string> kMasterFlags = {
+    // Allow replication factor of 2.
+    "--allow_unsafe_replication_factor=true",
+  };
+  const vector<string> kTserverFlags = {
+    // Slow down tablet copy to observe active source and target sessions.
+    "--tablet_copy_download_file_inject_latency_ms=1000",
+  };
+  const int kOriginalReplicasNum = 2;
+
+  FLAGS_num_tablet_servers = 3;
+  FLAGS_num_replicas = kOriginalReplicasNum;
+  NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags));
+  ASSERT_EQ(3, tablet_servers_.size());
+  ASSERT_EQ(kOriginalReplicasNum, tablet_replicas_.size());
+
+  const string& tablet_id = tablet_id_;
+  TabletServerMap replica_servers;
+  for (const auto& e : tablet_replicas_) {
+    if (e.first == tablet_id) {
+      replica_servers.emplace(e.second->uuid(), e.second);
+    }
+  }
+  ASSERT_EQ(FLAGS_num_replicas, replica_servers.size());
+
+  // Create a test table and insert some data into the table,
+  // so the special flag --tablet_copy_download_file_inject_latency_ms
+  // can take effect while the tablet copy happens down the road.
+  TestWorkload workload(cluster_.get());
+  workload.set_table_name(kTableId);
+  workload.Setup();
+  workload.Start();
+  while (workload.rows_inserted() < 100) {
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+  workload.StopAndJoin();
+
+  ASSERT_OK(WaitForServersToAgree(kTimeout, replica_servers, tablet_id, 1));
+
+  // Pick the tablet server which does not host a replica of the tablet yet.
+  TServerDetails* new_replica = nullptr;
+  for (const auto& ts : tablet_servers_) {
+    if (replica_servers.find(ts.first) == replica_servers.end()) {
+      new_replica = ts.second;
+      break;
+    }
+  }
+  ASSERT_NE(nullptr, new_replica);
+
+  ASSERT_OK(AddReplica(tablet_id, new_replica, RaftPeerPB::NON_VOTER, kTimeout));
+
+  const int new_replica_idx =
+      cluster_->tablet_server_index_by_uuid(new_replica->uuid());
+  // Wait for the tablet copying to start.
+  ASSERT_OK(inspect_->WaitForTabletDataStateOnTS(
+      new_replica_idx, tablet_id, { TABLET_DATA_COPYING }, kTimeout));
+
+  TServerDetails* leader = nullptr;
+  ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id, &leader));
+  const ExternalDaemon& ed_leader =
+      *cluster_->tablet_server_by_uuid(leader->uuid());
+  const ExternalDaemon& ed_new_replica =
+      *cluster_->tablet_server_by_uuid(new_replica->uuid());
+  {
+    int64_t num_sessions = 0;
+    ASSERT_OK(GetTabletCopySourceSessionsCount(ed_leader, &num_sessions));
+    EXPECT_EQ(1, num_sessions);
+  }
+  {
+    int64_t num_sessions = 0;
+    ASSERT_OK(GetTabletCopyTargetSessionsCount(ed_new_replica, &num_sessions));
+    EXPECT_EQ(1, num_sessions);
+  }
+
+  // The newly copied replica should be able to start.
+  ASSERT_OK(WaitForNumTabletsOnTS(
+      new_replica, 1, kTimeout, nullptr, tablet::RUNNING));
+
+  // The tablet copying should complete shortly after tablet state becomes
+  // RUNNING. Sampling the counters right after seeing RUNNING tablet status
+  // is a little racy: it takes some time to end tablet copy sessions at both
+  // sides. So, using ASSERT_EVENTUALLY here to avoid flakiness.
+  ASSERT_EVENTUALLY([&]() {
+    int64_t num_sessions = 0;
+    ASSERT_OK(GetTabletCopySourceSessionsCount(ed_leader, &num_sessions));
+    EXPECT_EQ(0, num_sessions);
+    ASSERT_OK(GetTabletCopyTargetSessionsCount(ed_new_replica, &num_sessions));
+    EXPECT_EQ(0, num_sessions);
+  });
+
+  // The tablet leader should report the newly added NON_VOTER replica to the
+  // master: the master should see one more replica and an established leader.
+  bool has_leader = false;
+  TabletLocationsPB tablet_locations;
+  ASSERT_OK(WaitForReplicasReportedToMaster(
+      cluster_->master_proxy(), kOriginalReplicasNum + 1, tablet_id, kTimeout,
+      WAIT_FOR_LEADER, &has_leader, &tablet_locations));
+  ASSERT_TRUE(has_leader);
+
+  // Check the updated cluster is able to elect a leader: make the current
+  // leader step down and wait until a leader is established again.
+  // The NON_VOTER replica must never become the leader.
+  {
+    TServerDetails* current_leader = nullptr;
+    ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id, &current_leader));
+    ASSERT_OK(LeaderStepDown(current_leader, tablet_id, kTimeout));
+
+    TServerDetails* elected_leader = nullptr;
+    ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id, &elected_leader));
+    ASSERT_NE(new_replica->uuid(), elected_leader->uuid());
+  }
+
+  // Make sure it's possible to insert more data into the table once it's backed
+  // by one more (NON_VOTER) replica.
+  const int64_t prev_inserted = workload.rows_inserted();
+  workload.Start();
+  while (workload.rows_inserted() < 2 * prev_inserted) {
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+  workload.StopAndJoin();
+
+  NO_FATALS(cluster_->AssertNoCrashes());
+  // Ensure that the replicas converge. Along with other verification steps,
+  // ClusterVerifier employs VerifyCommittedOpIdsMatch() to verify that
+  // all OpIds match in local files under all tablet servers of the cluster,
+  // so NON_VOTER replicas are covered by this check as well.
+  ClusterVerifier v(cluster_.get());
+  NO_FATALS(v.CheckCluster());
+  NO_FATALS(v.CheckRowCount(workload.table_name(),
+                            ClusterVerifier::EXACTLY,
+                            workload.rows_inserted()));
+}
+
+// Test how the system reacts to removing a NON_VOTER replica from
+// a tablet's Raft configuration:
+//
+// * First, add a NON_VOTER member into the configuration (adding itself is
+//   covered in more detail by the AddNonVoterReplica scenario).
+//
+// * Make sure that changing the Raft configuration by removing the NON_VOTER
+//   replica does not return errors.
+//
+// * After removing such a non-voter replica, the system should not try
+//   to add a new replica instead of the removed one.
+//
+// * The tablet leader is established and it reports the removal of the
+//   replica to the master.
+//
+// * The updated tablet is still available: it's possible to insert data
+//   into the table which is hosted by the tablet.
+//
+// * The tablet stays consistent: ksck verification reports no error,
+//   replicated operation indices match across all remaining replicas,
+//   tablet row count matches the expected number.
+//
+TEST_F(RaftConsensusNonVoterITest, AddThenRemoveNonVoterReplica) {
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(60);
+  const vector<string> kMasterFlags = {
+    // Allow replication factor of 2.
+    "--allow_unsafe_replication_factor=true",
+  };
+  const int kOriginalReplicasNum = 2;
+
+  FLAGS_num_tablet_servers = 3;
+  FLAGS_num_replicas = kOriginalReplicasNum;
+  NO_FATALS(BuildAndStart({}, kMasterFlags));
+  ASSERT_EQ(3, tablet_servers_.size());
+  ASSERT_EQ(kOriginalReplicasNum, tablet_replicas_.size());
+
+  const string& tablet_id = tablet_id_;
+  TabletServerMap replica_servers;
+  for (const auto& e : tablet_replicas_) {
+    if (e.first == tablet_id) {
+      replica_servers.emplace(e.second->uuid(), e.second);
+    }
+  }
+  ASSERT_EQ(FLAGS_num_replicas, replica_servers.size());
+
+  // Pick the tablet server which does not host a replica of the tablet yet.
+  TServerDetails* new_replica = nullptr;
+  for (const auto& ts : tablet_servers_) {
+    if (replica_servers.find(ts.first) == replica_servers.end()) {
+      new_replica = ts.second;
+      break;
+    }
+  }
+  ASSERT_NE(nullptr, new_replica);
+  ASSERT_OK(AddReplica(tablet_id, new_replica, RaftPeerPB::NON_VOTER, kTimeout));
+
+  TestWorkload workload(cluster_.get());
+  workload.set_table_name(kTableId);
+  workload.Setup();
+  workload.Start();
+  while (workload.rows_inserted() < 100) {
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+  workload.StopAndJoin();
+  ASSERT_OK(WaitForServersToAgree(kTimeout, replica_servers, tablet_id, 1));
+
+  // The newly copied replica should be able to start.
+  ASSERT_OK(WaitForNumTabletsOnTS(
+      new_replica, 1, kTimeout, nullptr, tablet::RUNNING));
+
+  // Ensure that nothing crashes and the replicas converge.
+  NO_FATALS(cluster_->AssertNoCrashes());
+  ClusterVerifier v(cluster_.get());
+  NO_FATALS(v.CheckCluster());
+  NO_FATALS(v.CheckRowCount(workload.table_name(),
+                            ClusterVerifier::EXACTLY,
+                            workload.rows_inserted()));
+
+  // Remove the newly added replica.
+  ASSERT_OK(RemoveReplica(tablet_id, new_replica, kTimeout));
+  ASSERT_OK(WaitForServersToAgree(kTimeout, replica_servers, tablet_id, 1));
+
+  // Verify the removed replica gets tombstoned.
+  const int new_replica_idx =
+      cluster_->tablet_server_index_by_uuid(new_replica->uuid());
+  ASSERT_OK(inspect_->WaitForTabletDataStateOnTS(
+      new_replica_idx, tablet_id, { TABLET_DATA_TOMBSTONED }, kTimeout));
+
+  // The added and then removed tablet replica should be gone, and the master
+  // should report the appropriate replica count at this point. The tablet
+  // leader should be established.
+  bool has_leader = false;
+  TabletLocationsPB tablet_locations;
+  ASSERT_OK(WaitForReplicasReportedToMaster(
+      cluster_->master_proxy(), kOriginalReplicasNum, tablet_id, kTimeout,
+      WAIT_FOR_LEADER, &has_leader, &tablet_locations));
+  ASSERT_TRUE(has_leader);
+
+  // Make sure it's possible to insert data into the tablet once the NON_VOTER
+  // replica is gone.
+  const int64_t prev_inserted = workload.rows_inserted();
+  workload.Start();
+  while (workload.rows_inserted() < 2 * prev_inserted) {
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+  workload.StopAndJoin();
+
+  // Ensure that nothing crashed and the replicas converge.
+  NO_FATALS(cluster_->AssertNoCrashes());
+  NO_FATALS(v.CheckCluster());
+  NO_FATALS(v.CheckRowCount(workload.table_name(),
+                            ClusterVerifier::EXACTLY,
+                            workload.rows_inserted()));
+}
+
+// Test to ensure that a NON_VOTER replica:
+// * does not vote, so it cannot help form a majority in a leader election
+// * does not start leader elections, even with leader failure detection
+//   enabled on its tablet server
+// * returns an IllegalState error on the RunLeaderElection() RPC call
+TEST_F(RaftConsensusNonVoterITest, NonVoterReplicasDoNotVote) {
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(60);
+  const int kOriginalReplicasNum = 2;
+  const int kHbIntervalMs = 64;
+  const int kHbLeaderMissedNum = 1;
+  const vector<string> kMasterFlags = {
+    // Allow replication factor of 2.
+    "--allow_unsafe_replication_factor=true",
+  };
+  const vector<string> kTserverFlags = {
+    Substitute("--raft_heartbeat_interval_ms=$0", kHbIntervalMs),
+    Substitute("--leader_failure_max_missed_heartbeat_periods=$0",
+               kHbLeaderMissedNum),
+  };
+
+  FLAGS_num_tablet_servers = 3;
+  FLAGS_num_replicas = kOriginalReplicasNum;
+  NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags));
+  ASSERT_EQ(3, tablet_servers_.size());
+  ASSERT_EQ(kOriginalReplicasNum, tablet_replicas_.size());
+
+  const string& tablet_id = tablet_id_;
+  TabletServerMap replica_servers;
+  for (const auto& e : tablet_replicas_) {
+    if (e.first == tablet_id) {
+      replica_servers.emplace(e.second->uuid(), e.second);
+    }
+  }
+  ASSERT_EQ(FLAGS_num_replicas, replica_servers.size());
+
+  ASSERT_OK(WaitForServersToAgree(kTimeout, replica_servers, tablet_id, 1));
+
+  // Pick the tablet server which does not host a replica of the tablet yet.
+  TServerDetails* new_replica = nullptr;
+  for (const auto& ts : tablet_servers_) {
+    if (replica_servers.find(ts.first) == replica_servers.end()) {
+      new_replica = ts.second;
+      break;
+    }
+  }
+  ASSERT_NE(nullptr, new_replica);
+
+  ASSERT_OK(AddReplica(tablet_id, new_replica, RaftPeerPB::NON_VOTER, kTimeout));
+
+  // The newly copied replica should be able to start.
+  ASSERT_OK(WaitForNumTabletsOnTS(
+      new_replica, 1, kTimeout, nullptr, tablet::RUNNING));
+
+  // Ensure that nothing crashes: all tservers must be alive for next step
+  // of the scenario.
+  NO_FATALS(cluster_->AssertNoCrashes());
+
+  // Make sure a NON_VOTER replica doesn't vote.
+  {
+    // Pause the current leader and make sure the majority is not achievable.
+    // It would not be the case if the non-voter replica could vote in the
+    // election initiated after the failure of the current leader was detected.
+    TServerDetails* leader = nullptr;
+    ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id, &leader));
+    ExternalTabletServer* leader_ts =
+        cluster_->tablet_server_by_uuid(leader->uuid());
+
+    ASSERT_OK(leader_ts->Pause());
+    // Make sure the paused leader is resumed even if an assertion below fails.
+    auto resume_leader = MakeScopedCleanup([&]() {
+      ASSERT_OK(leader_ts->Resume());
+    });
+    TServerDetails* new_leader = nullptr;
+    const Status s = GetLeaderReplicaWithRetries(tablet_id, &new_leader, 10);
+    ASSERT_TRUE(s.IsNotFound()) << s.ToString();
+    ASSERT_OK(leader_ts->Resume());
+    // The leader has been resumed explicitly: disarm the cleanup so it does
+    // not resume the process a second time when leaving this scope.
+    resume_leader.cancel();
+
+    // The majority should be achievable once the leader replica is resumed.
+    ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id, &new_leader));
+    NO_FATALS(cluster_->AssertNoCrashes());
+  }
+
+  // Make sure a NON_VOTER replica does not start leader election on start-up.
+  {
+    // Disable failure detection for all replicas.
+    for (int i = 0; i < cluster_->num_tablet_servers(); ++i) {
+      ExternalTabletServer* ts = cluster_->tablet_server(i);
+      ASSERT_OK(cluster_->SetFlag(ts,
+          "enable_leader_failure_detection", "false"));
+    }
+    ExternalTabletServer* new_replica_ts =
+        cluster_->tablet_server_by_uuid(new_replica->uuid());
+
+    // Get the current Raft term for the tablet.
+    int64_t term_before_restart = 0;
+    ASSERT_OK(GetTermMetricValue(new_replica_ts, &term_before_restart));
+
+    new_replica_ts->Shutdown();
+    ASSERT_OK(new_replica_ts->Restart());
+    // Wait for the tablet server to start up.
+    ASSERT_OK(cluster_->WaitForTabletsRunning(new_replica_ts, 1, kTimeout));
+
+    // Once restarted, the tablet server will have the default disposition
+    // for the enable_leader_failure_detection flag, i.e. our new NON_VOTER
+    // replica will have leader failure detection enabled. That said,
+    // the leader election could trigger if the replica was of VOTER type.
+    // However, it's not and no election should be started, and the term
+    // must be the same as before restarting this NON_VOTER replica.
+    // So, give a chance for a new election to happen and compare the terms.
+    SleepFor(MonoDelta::FromMilliseconds(
+        3L * kHbLeaderMissedNum * kHbIntervalMs));
+
+    int64_t term_after_restart = 0;
+    ASSERT_OK(GetTermMetricValue(new_replica_ts, &term_after_restart));
+    EXPECT_EQ(term_before_restart, term_after_restart);
+  }
+
+  // Make sure a non-voter replica returns an error on RunLeaderElection()
+  // RPC call.
+  {
+    const Status s = StartElection(new_replica, tablet_id, kTimeout);
+    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+  }
+}
+
+} // namespace tserver
+} // namespace kudu