You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2018/01/08 19:38:47 UTC
[3/3] kudu git commit: consensus: Fix NON_VOTER ack-counting bug
consensus: Fix NON_VOTER ack-counting bug
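This patch fixes an issue where the consensus queue did not differentiate
between voters and non-voters when counting replication acks, so acks from
NON_VOTER replicas could count toward the majority required to commit.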
It also enables the previously disabled NonVoterReplicasInConsensusQueue
integration test written by Alexey, and makes some changes to it. The test
fails without this patch and passes with it.
Tests added:
* Added a unit test in consensus_queue-test
* Updated and enabled the system test in raft_consensus_nonvoter-itest
Change-Id: I13143e9bb4b76af3fd6dada28fcec05b27d24476
Reviewed-on: http://gerrit.cloudera.org:8080/8868
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/1277f69a
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/1277f69a
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/1277f69a
Branch: refs/heads/master
Commit: 1277f69a1feb3715750552991bc19444282f652e
Parents: 710b238
Author: Mike Percy <mp...@apache.org>
Authored: Mon Dec 18 15:13:16 2017 -0800
Committer: Mike Percy <mp...@apache.org>
Committed: Mon Jan 8 19:38:17 2018 +0000
----------------------------------------------------------------------
src/kudu/consensus/consensus-test-util.h | 13 ++-
src/kudu/consensus/consensus_peers-test.cc | 1 +
src/kudu/consensus/consensus_peers.cc | 2 +-
src/kudu/consensus/consensus_queue-test.cc | 91 ++++++++++++++--
src/kudu/consensus/consensus_queue.cc | 105 +++++++++++++------
src/kudu/consensus/consensus_queue.h | 34 ++++--
src/kudu/consensus/raft_consensus.cc | 8 +-
.../raft_consensus_nonvoter-itest.cc | 63 ++++++-----
8 files changed, 231 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/1277f69a/src/kudu/consensus/consensus-test-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus-test-util.h b/src/kudu/consensus/consensus-test-util.h
index 409841b..9cb0473 100644
--- a/src/kudu/consensus/consensus-test-util.h
+++ b/src/kudu/consensus/consensus-test-util.h
@@ -80,6 +80,7 @@ inline gscoped_ptr<ReplicateMsg> CreateDummyReplicate(int64_t term,
inline RaftPeerPB FakeRaftPeerPB(const std::string& uuid) {
RaftPeerPB peer_pb;
peer_pb.set_permanent_uuid(uuid);
+ peer_pb.set_member_type(RaftPeerPB::VOTER);
peer_pb.mutable_last_known_addr()->set_host(strings::Substitute(
"$0-fake-hostname", CURRENT_TEST_NAME()));
peer_pb.mutable_last_known_addr()->set_port(0);
@@ -107,9 +108,9 @@ inline void AppendReplicateMessagesToQueue(
}
// Builds a configuration of 'num' voters.
-inline RaftConfigPB BuildRaftConfigPBForTests(int num) {
+inline RaftConfigPB BuildRaftConfigPBForTests(int num_voters, int num_non_voters = 0) {
RaftConfigPB raft_config;
- for (int i = 0; i < num; i++) {
+ for (int i = 0; i < num_voters; i++) {
RaftPeerPB* peer_pb = raft_config.add_peers();
peer_pb->set_member_type(RaftPeerPB::VOTER);
peer_pb->set_permanent_uuid(strings::Substitute("peer-$0", i));
@@ -117,6 +118,14 @@ inline RaftConfigPB BuildRaftConfigPBForTests(int num) {
hp->set_host(strings::Substitute("peer-$0.fake-domain-for-tests", i));
hp->set_port(0);
}
+ for (int i = 0; i < num_non_voters; i++) {
+ RaftPeerPB* peer_pb = raft_config.add_peers();
+ peer_pb->set_member_type(RaftPeerPB::NON_VOTER);
+ peer_pb->set_permanent_uuid(strings::Substitute("non-voter-peer-$0", i));
+ HostPortPB* hp = peer_pb->mutable_last_known_addr();
+ hp->set_host(strings::Substitute("non-voter-peer-$0.fake-domain-for-tests", i));
+ hp->set_port(0);
+ }
return raft_config;
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/1277f69a/src/kudu/consensus/consensus_peers-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers-test.cc b/src/kudu/consensus/consensus_peers-test.cc
index 7f4eb57..f10cb34 100644
--- a/src/kudu/consensus/consensus_peers-test.cc
+++ b/src/kudu/consensus/consensus_peers-test.cc
@@ -126,6 +126,7 @@ class ConsensusPeersTest : public KuduTest {
shared_ptr<Peer>* peer) {
RaftPeerPB peer_pb;
peer_pb.set_permanent_uuid(peer_name);
+ peer_pb.set_member_type(RaftPeerPB::VOTER);
auto proxy_ptr = new DelayablePeerProxy<NoOpTestPeerProxy>(
raft_pool_.get(), new NoOpTestPeerProxy(raft_pool_.get(), peer_pb));
gscoped_ptr<PeerProxy> proxy(proxy_ptr);
http://git-wip-us.apache.org/repos/asf/kudu/blob/1277f69a/src/kudu/consensus/consensus_peers.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers.cc b/src/kudu/consensus/consensus_peers.cc
index ffa373d..6d68528 100644
--- a/src/kudu/consensus/consensus_peers.cc
+++ b/src/kudu/consensus/consensus_peers.cc
@@ -142,7 +142,7 @@ Peer::Peer(RaftPeerPB peer_pb,
Status Peer::Init() {
{
std::lock_guard<simple_spinlock> l(peer_lock_);
- queue_->TrackPeer(peer_pb_.permanent_uuid());
+ queue_->TrackPeer(peer_pb_);
}
// Capture a weak_ptr reference into the functor so it can safely handle
http://git-wip-us.apache.org/repos/asf/kudu/blob/1277f69a/src/kudu/consensus/consensus_queue-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue-test.cc b/src/kudu/consensus/consensus_queue-test.cc
index 62a2345..c8dc086 100644
--- a/src/kudu/consensus/consensus_queue-test.cc
+++ b/src/kudu/consensus/consensus_queue-test.cc
@@ -126,6 +126,14 @@ class ConsensusQueueTest : public KuduTest {
payload_size).release()));
}
+ RaftPeerPB MakePeer(const std::string& peer_uuid,
+ RaftPeerPB::MemberType member_type) {
+ RaftPeerPB peer_pb;
+ *peer_pb.mutable_permanent_uuid() = peer_uuid;
+ peer_pb.set_member_type(member_type);
+ return peer_pb;
+ }
+
// Updates the peer's watermark in the queue so that it matches
// the operation we want, since the queue always assumes that
// when a peer gets tracked it's always tracked starting at the
@@ -137,7 +145,7 @@ class ConsensusQueueTest : public KuduTest {
int last_committed_idx,
bool* more_pending) {
- queue_->TrackPeer(kPeerUuid);
+ queue_->TrackPeer(MakePeer(kPeerUuid, RaftPeerPB::VOTER));
response->set_responder_uuid(kPeerUuid);
// Ask for a request. The queue assumes the peer is up-to-date so
@@ -406,10 +414,10 @@ TEST_F(ConsensusQueueTest, TestPeersDontAckBeyondWatermarks) {
TEST_F(ConsensusQueueTest, TestQueueAdvancesCommittedIndex) {
queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, BuildRaftConfigPBForTests(5));
// Track 4 additional peers (in addition to the local peer)
- queue_->TrackPeer("peer-1");
- queue_->TrackPeer("peer-2");
- queue_->TrackPeer("peer-3");
- queue_->TrackPeer("peer-4");
+ queue_->TrackPeer(MakePeer("peer-1", RaftPeerPB::VOTER));
+ queue_->TrackPeer(MakePeer("peer-2", RaftPeerPB::VOTER));
+ queue_->TrackPeer(MakePeer("peer-3", RaftPeerPB::VOTER));
+ queue_->TrackPeer(MakePeer("peer-4", RaftPeerPB::VOTER));
// Append 10 messages to the queue.
// This should add messages 0.1 -> 0.7, 1.8 -> 1.10 to the queue.
@@ -479,6 +487,68 @@ TEST_F(ConsensusQueueTest, TestQueueAdvancesCommittedIndex) {
ASSERT_EQ(queue_->GetAllReplicatedIndex(), 5);
}
+// Ensure that the acks for a non-voter don't count toward the majority.
+TEST_F(ConsensusQueueTest, TestNonVoterAcksDontCountTowardMajority) {
+ const auto kOtherVoterPeer = "peer-1";
+ const auto kNonVoterPeer = "non-voter-peer-0";
+
+ // 1. Add a non-voter to the config where there are 2 voters.
+ queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm,
+ BuildRaftConfigPBForTests(/*num_voters=*/ 2,
+ /*num_non_voters=*/ 1));
+ // Track 2 additional peers (in addition to the local peer)
+ queue_->TrackPeer(MakePeer(kOtherVoterPeer, RaftPeerPB::VOTER));
+ queue_->TrackPeer(MakePeer(kNonVoterPeer, RaftPeerPB::NON_VOTER));
+
+ // 2. Add some writes. Only the local leader immediately acks them, which is
+ // not enough to commit in a 2-voter + 1 non-voter config.
+ //
+ // Append 10 messages to the queue.
+ // This should add messages 0.1 -> 0.7, 1.8 -> 1.10 to the queue.
+ const int kNumMessages = 10;
+ AppendReplicateMessagesToQueue(queue_.get(), clock_,
+ /*first=*/ 1, /*count=*/ kNumMessages);
+ WaitForLocalPeerToAckIndex(kNumMessages);
+
+ // Since only the local log has acked at this point, the committed_index
+ // should be 0.
+ const int64_t kNoneCommittedIndex = 0;
+ ASSERT_EQ(kNoneCommittedIndex, queue_->GetCommittedIndex());
+
+ // 3. Ack the operations from the NON_VOTER peer. The writes will not have
+ // been committed yet, because the 2nd VOTER has not yet acked them.
+ ConsensusResponsePB response;
+ response.set_responder_uuid(kNonVoterPeer);
+ const int64_t kCurrentTerm = 1;
+ response.set_responder_term(kCurrentTerm);
+ SetLastReceivedAndLastCommitted(&response,
+ /*last_received=*/ MakeOpId(kCurrentTerm, kNumMessages),
+ /*last_committed_idx=*/ kNoneCommittedIndex);
+
+ bool more_pending;
+ queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
+ ASSERT_FALSE(more_pending);
+
+ // Committed index should be the same.
+ ASSERT_EQ(kNoneCommittedIndex, queue_->GetCommittedIndex());
+
+ // 4. Send an identical ack from the 2nd VOTER peer. This should cause the
+ // operation to be committed.
+ response.set_responder_uuid(kOtherVoterPeer);
+ queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
+ ASSERT_TRUE(more_pending); // The committed index has increased.
+
+ // The committed index should include the full set of ops now.
+ ASSERT_EQ(kNumMessages, queue_->GetCommittedIndex());
+
+ SetLastReceivedAndLastCommitted(&response,
+ /*last_received=*/ MakeOpId(kCurrentTerm, kNumMessages),
+ /*last_committed_idx=*/ kNumMessages);
+
+ queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
+ ASSERT_FALSE(more_pending);
+}
+
// In this test we append a sequence of operations to a log
// and then start tracking a peer whose first required operation
// is before the first operation in the queue.
@@ -588,7 +658,7 @@ TEST_F(ConsensusQueueTest, TestQueueHandlesOperationOverwriting) {
response.set_responder_uuid(kPeerUuid);
bool more_pending = false;
- queue_->TrackPeer(kPeerUuid);
+ queue_->TrackPeer(MakePeer(kPeerUuid, RaftPeerPB::VOTER));
// Ask for a request. The queue assumes the peer is up-to-date so
// this should contain no operations.
@@ -653,7 +723,7 @@ TEST_F(ConsensusQueueTest, TestQueueHandlesOperationOverwriting) {
// operations, which would cause a check failure on the write immediately
// following the overwriting write.
TEST_F(ConsensusQueueTest, TestQueueMovesWatermarksBackward) {
- queue_->SetNonLeaderMode();
+ queue_->SetNonLeaderMode(BuildRaftConfigPBForTests(3));
// Append a bunch of messages and update as if they were also appeneded to the leader.
queue_->UpdateLastIndexAppendedToLeader(10);
AppendReplicateMessagesToQueue(queue_.get(), clock_, 1, 10);
@@ -820,7 +890,7 @@ TEST_F(ConsensusQueueTest, TestTriggerTabletCopyIfTabletNotFound) {
ConsensusRequestPB request;
ConsensusResponsePB response;
response.set_responder_uuid(kPeerUuid);
- queue_->TrackPeer(kPeerUuid);
+ queue_->TrackPeer(MakePeer(kPeerUuid, RaftPeerPB::VOTER));
// Create request for new peer.
vector<ReplicateRefPtr> refs;
@@ -848,7 +918,7 @@ TEST_F(ConsensusQueueTest, TestTriggerTabletCopyIfTabletNotFound) {
}
TEST_F(ConsensusQueueTest, TestFollowerCommittedIndexAndMetrics) {
- queue_->SetNonLeaderMode();
+ queue_->SetNonLeaderMode(BuildRaftConfigPBForTests(3));
// Emulate a follower sending a request to replicate 10 messages.
queue_->UpdateLastIndexAppendedToLeader(10);
@@ -861,7 +931,8 @@ TEST_F(ConsensusQueueTest, TestFollowerCommittedIndexAndMetrics) {
// Update the committed index. In real life, this would be done by the consensus
// implementation when it receives an updated committed index from the leader.
- queue_->UpdateFollowerWatermarks(10, 10);
+ queue_->UpdateFollowerWatermarks(/*committed_index=*/ 10,
+ /*all_replicated_index=*/ 10);
ASSERT_EQ(10, queue_->GetCommittedIndex());
// Check the metrics have the right values based on the updated committed index.
http://git-wip-us.apache.org/repos/asf/kudu/blob/1277f69a/src/kudu/consensus/consensus_queue.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc
index 6d09b33..f6d0db1 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -123,7 +123,7 @@ const char* PeerStatusToString(PeerStatus p) {
std::string PeerMessageQueue::TrackedPeer::ToString() const {
return Substitute("Peer: $0, Status: $1, Last received: $2, Next index: $3, "
"Last known committed idx: $4, Time since last communication: $5",
- uuid,
+ SecureShortDebugString(peer_pb),
PeerStatusToString(last_exchange_status),
OpIdToString(last_received), next_index,
last_known_committed_index,
@@ -170,7 +170,6 @@ PeerMessageQueue::PeerMessageQueue(scoped_refptr<MetricEntity> metric_entity,
queue_state_.state = kQueueOpen;
// TODO(mpercy): Merge LogCache::Init() with its constructor.
log_cache_.Init(queue_state_.last_appended);
- TrackPeer(local_peer_pb_.permanent_uuid());
}
void PeerMessageQueue::SetLeaderMode(int64_t committed_index,
@@ -186,49 +185,53 @@ void PeerMessageQueue::SetLeaderMode(int64_t committed_index,
queue_state_.committed_index = committed_index;
queue_state_.majority_replicated_index = committed_index;
queue_state_.active_config.reset(new RaftConfigPB(active_config));
- CHECK(IsRaftConfigVoter(local_peer_pb_.permanent_uuid(), *queue_state_.active_config))
- << SecureShortDebugString(local_peer_pb_) << " not a voter in config: "
- << SecureShortDebugString(*queue_state_.active_config);
queue_state_.majority_size_ = MajoritySize(CountVoters(*queue_state_.active_config));
queue_state_.mode = LEADER;
- LOG_WITH_PREFIX_UNLOCKED(INFO) << "Queue going to LEADER mode. State: "
- << queue_state_.ToString();
+ TrackLocalPeerUnlocked();
CheckPeersInActiveConfigIfLeaderUnlocked();
+ LOG_WITH_PREFIX_UNLOCKED(INFO) << "Queue going to LEADER mode. State: "
+ << queue_state_.ToString();
+
// Reset last communication time with all peers to reset the clock on the
// failure timeout.
- MonoTime now(MonoTime::Now());
+ const auto now = MonoTime::Now();
for (const PeersMap::value_type& entry : peers_map_) {
entry.second->last_communication_time = now;
}
time_manager_->SetLeaderMode();
}
-void PeerMessageQueue::SetNonLeaderMode() {
+void PeerMessageQueue::SetNonLeaderMode(const RaftConfigPB& active_config) {
std::lock_guard<simple_spinlock> lock(queue_lock_);
- queue_state_.active_config.reset();
+ queue_state_.active_config.reset(new RaftConfigPB(active_config));
queue_state_.mode = NON_LEADER;
queue_state_.majority_size_ = -1;
// Update this when stepping down, since it doesn't get tracked as LEADER.
queue_state_.last_idx_appended_to_leader = queue_state_.last_appended.index();
+
+ TrackLocalPeerUnlocked();
+
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Queue going to NON_LEADER mode. State: "
- << queue_state_.ToString();
+ << queue_state_.ToString();
+
time_manager_->SetNonLeaderMode();
}
-void PeerMessageQueue::TrackPeer(const string& uuid) {
+void PeerMessageQueue::TrackPeer(const RaftPeerPB& peer_pb) {
std::lock_guard<simple_spinlock> lock(queue_lock_);
- TrackPeerUnlocked(uuid);
+ TrackPeerUnlocked(peer_pb);
}
-void PeerMessageQueue::TrackPeerUnlocked(const string& uuid) {
- CHECK(!uuid.empty()) << "Got request to track peer with empty UUID";
+void PeerMessageQueue::TrackPeerUnlocked(const RaftPeerPB& peer_pb) {
+ CHECK(!peer_pb.permanent_uuid().empty()) << SecureShortDebugString(peer_pb);
+ CHECK(peer_pb.has_member_type()) << SecureShortDebugString(peer_pb);
DCHECK(queue_lock_.is_locked());
DCHECK_EQ(queue_state_.state, kQueueOpen);
- TrackedPeer* tracked_peer = new TrackedPeer(uuid);
+ TrackedPeer* tracked_peer = new TrackedPeer(peer_pb);
// We don't know the last operation received by the peer so, following the
// Raft protocol, we set next_index to one past the end of our own log. This
// way, if calling this method is the result of a successful leader election
@@ -238,7 +241,7 @@ void PeerMessageQueue::TrackPeerUnlocked(const string& uuid) {
// does not have a log that matches ours, the normal queue negotiation
// process will eventually find the right point to resume from.
tracked_peer->next_index = queue_state_.last_appended.index() + 1;
- InsertOrDie(&peers_map_, uuid, tracked_peer);
+ InsertOrDie(&peers_map_, tracked_peer->uuid(), tracked_peer);
CheckPeersInActiveConfigIfLeaderUnlocked();
@@ -249,10 +252,39 @@ void PeerMessageQueue::TrackPeerUnlocked(const string& uuid) {
void PeerMessageQueue::UntrackPeer(const string& uuid) {
std::lock_guard<simple_spinlock> lock(queue_lock_);
+ UntrackPeerUnlocked(uuid);
+}
+
+void PeerMessageQueue::UntrackPeerUnlocked(const string& uuid) {
+ DCHECK(queue_lock_.is_locked());
TrackedPeer* peer = EraseKeyReturnValuePtr(&peers_map_, uuid);
- if (peer != nullptr) {
- delete peer;
+ delete peer; // Deleting a nullptr is safe.
+}
+
+void PeerMessageQueue::TrackLocalPeerUnlocked() {
+ DCHECK(queue_lock_.is_locked());
+ RaftPeerPB* local_peer_in_config;
+ Status s = GetRaftConfigMember(queue_state_.active_config.get(),
+ local_peer_pb_.permanent_uuid(),
+ &local_peer_in_config);
+ auto local_copy = local_peer_pb_;
+ if (!s.ok()) {
+ // The local peer is not a member of the config. The queue requires the
+ // 'member_type' field to be set for any tracked peer, so we explicitly
+ // mark the local peer as a NON_VOTER. This case is only possible when the
+ // local peer is not the leader, so the choice is not particularly
+ // important, but NON_VOTER is the most reasonable option.
+ local_copy.set_member_type(RaftPeerPB::NON_VOTER);
+ local_peer_in_config = &local_copy;
+ }
+ CHECK(local_peer_in_config->member_type() == RaftPeerPB::VOTER ||
+ queue_state_.mode != LEADER)
+ << "local peer " << local_peer_pb_.permanent_uuid()
+ << " is not a voter in config: " << queue_state_.ToString();
+ if (ContainsKey(peers_map_, local_peer_pb_.permanent_uuid())) {
+ UntrackPeerUnlocked(local_peer_pb_.permanent_uuid());
}
+ TrackPeerUnlocked(*local_peer_in_config);
}
unordered_map<string, HealthReportPB> PeerMessageQueue::ReportHealthOfPeers() const {
@@ -471,15 +503,15 @@ void PeerMessageQueue::UpdatePeerHealthUnlocked(TrackedPeer* peer) {
string error_msg;
if (overall_health_status == HealthReportPB::FAILED) {
if (peer->last_exchange_status == PeerStatus::TABLET_FAILED) {
- error_msg = Substitute("The tablet replica hosted on peer $0 has failed", peer->uuid);
+ error_msg = Substitute("The tablet replica hosted on peer $0 has failed", peer->uuid());
} else if (!peer->wal_catchup_possible) {
error_msg = Substitute("The logs necessary to catch up peer $0 have been "
"garbage collected. The replica will never be able "
- "to catch up", peer->uuid);
+ "to catch up", peer->uuid());
} else {
error_msg = Substitute("Leader has been unable to successfully communicate "
"with peer $0 for more than $1 seconds ($2)",
- peer->uuid,
+ peer->uuid(),
FLAGS_follower_unavailable_considered_failed_sec,
(MonoTime::Now() - peer->last_communication_time).ToString());
}
@@ -499,8 +531,8 @@ void PeerMessageQueue::UpdatePeerHealthUnlocked(TrackedPeer* peer) {
}
} else {
if (overall_health_status == HealthReportPB::FAILED &&
- SafeToEvictUnlocked(peer->uuid)) {
- NotifyObserversOfFailedFollower(peer->uuid, queue_state_.current_term, error_msg);
+ SafeToEvictUnlocked(peer->uuid())) {
+ NotifyObserversOfFailedFollower(peer->uuid(), queue_state_.current_term, error_msg);
}
}
}
@@ -711,6 +743,7 @@ void PeerMessageQueue::AdvanceQueueWatermark(const char* type,
const OpId& replicated_before,
const OpId& replicated_after,
int num_peers_required,
+ ReplicaTypes replica_types,
const TrackedPeer* who_caused) {
if (VLOG_IS_ON(2)) {
@@ -729,7 +762,11 @@ void PeerMessageQueue::AdvanceQueueWatermark(const char* type,
// will be the new 'watermark'.
vector<int64_t> watermarks;
for (const PeersMap::value_type& peer : peers_map_) {
- // TODO: The fact that we only consider peers whose last exchange was
+ if (replica_types == VOTER_REPLICAS &&
+ peer.second->peer_pb.member_type() != RaftPeerPB::VOTER) {
+ continue;
+ }
+ // TODO(todd): The fact that we only consider peers whose last exchange was
// successful can cause the "all_replicated" watermark to lag behind
// farther than necessary. For example:
// - local peer has replicated opid 100
@@ -922,7 +959,7 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
// in the TrackedPeer data structure. The downside is that we'd end up
// with multiple sources of truth that would need to be kept in sync.
Status s = GetRaftConfigMember(DCHECK_NOTNULL(queue_state_.active_config.get()),
- peer->uuid, &peer_pb);
+ peer->uuid(), &peer_pb);
if (s.ok() &&
peer_pb &&
peer_pb->member_type() == RaftPeerPB::NON_VOTER &&
@@ -936,7 +973,7 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
// committed config's opid_index because if we're in the middle of a
// config change, this requested config change will be rejected
// anyway.
- NotifyObserversOfPeerToPromote(peer->uuid,
+ NotifyObserversOfPeerToPromote(peer->uuid(),
queue_state_.current_term,
queue_state_.active_config->opid_index());
}
@@ -1022,17 +1059,19 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
// Advance the majority replicated index.
AdvanceQueueWatermark("majority_replicated",
&queue_state_.majority_replicated_index,
- previous.last_received,
- peer->last_received,
- queue_state_.majority_size_,
+ /*replicated_before=*/ previous.last_received,
+ /*replicated_after=*/ peer->last_received,
+ /*num_peers_required=*/ queue_state_.majority_size_,
+ VOTER_REPLICAS,
peer);
// Advance the all replicated index.
AdvanceQueueWatermark("all_replicated",
&queue_state_.all_replicated_index,
- previous.last_received,
- peer->last_received,
- peers_map_.size(),
+ /*replicated_before=*/ previous.last_received,
+ /*replicated_after=*/ peer->last_received,
+ /*num_peers_required=*/ peers_map_.size(),
+ ALL_REPLICAS,
peer);
// If the majority-replicated index is in our current term,
http://git-wip-us.apache.org/repos/asf/kudu/blob/1277f69a/src/kudu/consensus/consensus_queue.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.h b/src/kudu/consensus/consensus_queue.h
index 59d0f19..0808c69 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -113,8 +113,8 @@ const char* PeerStatusToString(PeerStatus p);
class PeerMessageQueue {
public:
struct TrackedPeer {
- explicit TrackedPeer(std::string uuid)
- : uuid(std::move(uuid)),
+ explicit TrackedPeer(RaftPeerPB peer_pb)
+ : peer_pb(std::move(peer_pb)),
next_index(kInvalidOpIdIndex),
last_received(MinimumOpId()),
last_known_committed_index(MinimumOpId().index()),
@@ -136,10 +136,13 @@ class PeerMessageQueue {
last_seen_term_ = term;
}
+ const std::string& uuid() const {
+ return peer_pb.permanent_uuid();
+ }
+
std::string ToString() const;
- // UUID of the peer.
- std::string uuid;
+ RaftPeerPB peer_pb;
// Next index to send to the peer.
// This corresponds to "nextIndex" as specified in Raft.
@@ -212,10 +215,10 @@ class PeerMessageQueue {
// be tracked so that the cache is only evicted when the peers no longer need
// the operations but the queue will no longer advance the majority replicated
// index or notify observers of its advancement.
- void SetNonLeaderMode();
+ void SetNonLeaderMode(const RaftConfigPB& active_config);
// Makes the queue track this peer.
- void TrackPeer(const std::string& uuid);
+ void TrackPeer(const RaftPeerPB& peer_pb);
// Makes the queue untrack this peer.
void UntrackPeer(const std::string& uuid);
@@ -378,6 +381,12 @@ class PeerMessageQueue {
kQueueClosed
};
+ // Types of replicas to count when advancing a queue watermark.
+ enum ReplicaTypes {
+ ALL_REPLICAS,
+ VOTER_REPLICAS,
+ };
+
struct QueueState {
// The first operation that has been replicated to all currently
@@ -485,7 +494,13 @@ class PeerMessageQueue {
// 'preceding_first_op_in_queue_' if the queue is empty.
const OpId& GetLastOp() const;
- void TrackPeerUnlocked(const std::string& uuid);
+ void TrackPeerUnlocked(const RaftPeerPB& peer_pb);
+
+ void UntrackPeerUnlocked(const std::string& uuid);
+
+ // We need the local peer in the config because it contains the current
+ // 'member_type' of the local node while 'local_peer_pb_' does not.
+ void TrackLocalPeerUnlocked();
// Checks that if the queue is in LEADER mode then all registered peers are
// in the active config. Crashes with a FATAL log message if this invariant
@@ -498,11 +513,16 @@ class PeerMessageQueue {
const Status& status);
// Advances 'watermark' to the smallest op that 'num_peers_required' have.
+ // If 'replica_types' is set to VOTER_REPLICAS, the 'num_peers_required' is
+ // interpreted as "number of voters required". If 'replica_types' is set to
+ // ALL_REPLICAS, 'num_peers_required' counts any peer, regardless of its
+ // voting status.
void AdvanceQueueWatermark(const char* type,
int64_t* watermark,
const OpId& replicated_before,
const OpId& replicated_after,
int num_peers_required,
+ ReplicaTypes replica_types,
const TrackedPeer* who_caused);
std::vector<PeerMessageQueueObserver*> observers_;
http://git-wip-us.apache.org/repos/asf/kudu/blob/1277f69a/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index cb3118e..0892f9f 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -604,12 +604,12 @@ Status RaftConsensus::BecomeReplicaUnlocked(boost::optional<MonoDelta> fd_delta)
// Now that we're a replica, we can allow voting for other nodes.
withhold_votes_until_ = MonoTime::Min();
+ // Deregister ourselves from the queue. We no longer need to track what gets
+ // replicated since we're stepping down.
queue_->UnRegisterObserver(this);
- // Deregister ourselves from the queue. We don't care what get's replicated, since
- // we're stepping down.
- queue_->SetNonLeaderMode();
-
+ queue_->SetNonLeaderMode(cmeta_->ActiveConfig());
peer_manager_->Close();
+
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/1277f69a/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc b/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
index 740c559..f0350dc 100644
--- a/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
@@ -20,6 +20,7 @@
#include <numeric>
#include <ostream>
#include <string>
+#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>
@@ -1724,7 +1725,7 @@ TEST_F(RaftConsensusNonVoterITest, RestartClusterWithNonVoter) {
// The test scenario is simple: try to make a configuration change in a 3 voter
// Raft cluster, adding a new non-voter replica, when a majority of voters
// is not online. Make sure the configuration change is not committed.
-TEST_F(RaftConsensusNonVoterITest, DISABLED_NonVoterReplicasInConsensusQueue) {
+TEST_F(RaftConsensusNonVoterITest, NonVoterReplicasInConsensusQueue) {
if (!AllowSlowTests()) {
LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
return;
@@ -1775,50 +1776,54 @@ TEST_F(RaftConsensusNonVoterITest, DISABLED_NonVoterReplicasInConsensusQueue) {
// Pause all but the leader replica and try to add a new non-voter into the
// configuration. It should not pass.
+ LOG(INFO) << "Getting leader replica...";
TServerDetails* leader;
ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id, &leader));
- const auto do_resume = [&] {
- for (auto& e : replica_servers) {
- const auto& uuid = e.first;
- if (uuid == leader->uuid()) {
- continue;
- }
- ExternalTabletServer* ts = cluster_->tablet_server_by_uuid(uuid);
- ASSERT_OK(ts->Resume());
- }
- };
- auto resumer = MakeScopedCleanup([&] {
- do_resume();
- });
-
+ LOG(INFO) << "Shutting down non-leader replicas...";
for (auto& e : replica_servers) {
const auto& uuid = e.first;
- if (uuid == leader->uuid()) {
- continue;
- }
- ExternalTabletServer* ts = cluster_->tablet_server_by_uuid(uuid);
- ASSERT_OK(ts->Pause());
+ if (uuid == leader->uuid()) continue;
+ cluster_->tablet_server_by_uuid(uuid)->Shutdown();
}
- const Status s = AddServer(leader, tablet_id, new_replica,
- RaftPeerPB::NON_VOTER, kTimeout);
- EXPECT_FALSE(s.ok()) << s.ToString();
-
- NO_FATALS(do_resume());
- resumer.cancel();
+ LOG(INFO) << "Adding NON_VOTER replica...";
+ std::thread t([&] {
+ AddServer(leader, tablet_id, new_replica, RaftPeerPB::NON_VOTER, kTimeout);
+ });
+ SCOPED_CLEANUP({ t.join(); });
// Verify that the configuration hasn't changed.
+ LOG(INFO) << "Waiting for pending config...";
consensus::ConsensusStatePB cstate;
ASSERT_EVENTUALLY([&] {
ASSERT_OK(GetConsensusState(leader, tablet_id, kTimeout, &cstate));
- ASSERT_FALSE(cstate.has_pending_config());
+ ASSERT_TRUE(cstate.has_pending_config());
});
+
+ // Ensure it does not commit.
+ SleepFor(MonoDelta::FromSeconds(5));
+ ASSERT_OK(GetConsensusState(leader, tablet_id, kTimeout, &cstate));
+ ASSERT_TRUE(cstate.has_pending_config());
+
const auto& new_replica_uuid = new_replica->uuid();
- EXPECT_FALSE(IsRaftConfigMember(new_replica_uuid, cstate.committed_config()))
+ ASSERT_FALSE(IsRaftConfigMember(new_replica_uuid, cstate.committed_config()))
<< pb_util::SecureDebugString(cstate.committed_config())
<< "new non-voter replica UUID: " << new_replica_uuid;
- EXPECT_EQ(kOriginalReplicasNum, cstate.committed_config().peers_size());
+ ASSERT_EQ(kOriginalReplicasNum, cstate.committed_config().peers_size());
+
+ // Restart the tablet servers.
+ for (auto& e : replica_servers) {
+ const auto& uuid = e.first;
+ if (uuid == leader->uuid()) continue;
+ ASSERT_OK(cluster_->tablet_server_by_uuid(uuid)->Restart());
+ }
+
+ // Once the new replicas come back online, this should be committed.
+ ASSERT_EVENTUALLY([&] {
+ ASSERT_OK(GetConsensusState(leader, tablet_id, kTimeout, &cstate));
+ ASSERT_FALSE(cstate.has_pending_config());
+ });
NO_FATALS(cluster_->AssertNoCrashes());
}