You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2018/01/08 19:38:47 UTC

[3/3] kudu git commit: consensus: Fix NON_VOTER ack-counting bug

consensus: Fix NON_VOTER ack-counting bug

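This patch fixes an issue where the consensus queue did not differentiate
between voters and non-voters when counting replication acks, so an ack
from a NON_VOTER replica could count toward the majority needed to commit.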

This enables the previously disabled system test written by Alexey
(NonVoterReplicasInConsensusQueue) and also makes some changes to it. The
test fails without this patch and passes with it.

Tests added:
* Added a unit test in consensus_queue-test
* Updated and enabled the system test in raft_consensus_nonvoter-itest

Change-Id: I13143e9bb4b76af3fd6dada28fcec05b27d24476
Reviewed-on: http://gerrit.cloudera.org:8080/8868
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <as...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/1277f69a
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/1277f69a
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/1277f69a

Branch: refs/heads/master
Commit: 1277f69a1feb3715750552991bc19444282f652e
Parents: 710b238
Author: Mike Percy <mp...@apache.org>
Authored: Mon Dec 18 15:13:16 2017 -0800
Committer: Mike Percy <mp...@apache.org>
Committed: Mon Jan 8 19:38:17 2018 +0000

----------------------------------------------------------------------
 src/kudu/consensus/consensus-test-util.h        |  13 ++-
 src/kudu/consensus/consensus_peers-test.cc      |   1 +
 src/kudu/consensus/consensus_peers.cc           |   2 +-
 src/kudu/consensus/consensus_queue-test.cc      |  91 ++++++++++++++--
 src/kudu/consensus/consensus_queue.cc           | 105 +++++++++++++------
 src/kudu/consensus/consensus_queue.h            |  34 ++++--
 src/kudu/consensus/raft_consensus.cc            |   8 +-
 .../raft_consensus_nonvoter-itest.cc            |  63 ++++++-----
 8 files changed, 231 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/1277f69a/src/kudu/consensus/consensus-test-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus-test-util.h b/src/kudu/consensus/consensus-test-util.h
index 409841b..9cb0473 100644
--- a/src/kudu/consensus/consensus-test-util.h
+++ b/src/kudu/consensus/consensus-test-util.h
@@ -80,6 +80,7 @@ inline gscoped_ptr<ReplicateMsg> CreateDummyReplicate(int64_t term,
 inline RaftPeerPB FakeRaftPeerPB(const std::string& uuid) {
   RaftPeerPB peer_pb;
   peer_pb.set_permanent_uuid(uuid);
+  peer_pb.set_member_type(RaftPeerPB::VOTER);
   peer_pb.mutable_last_known_addr()->set_host(strings::Substitute(
       "$0-fake-hostname", CURRENT_TEST_NAME()));
   peer_pb.mutable_last_known_addr()->set_port(0);
@@ -107,9 +108,9 @@ inline void AppendReplicateMessagesToQueue(
 }
 
 // Builds a configuration of 'num' voters.
-inline RaftConfigPB BuildRaftConfigPBForTests(int num) {
+inline RaftConfigPB BuildRaftConfigPBForTests(int num_voters, int num_non_voters = 0) {
   RaftConfigPB raft_config;
-  for (int i = 0; i < num; i++) {
+  for (int i = 0; i < num_voters; i++) {
     RaftPeerPB* peer_pb = raft_config.add_peers();
     peer_pb->set_member_type(RaftPeerPB::VOTER);
     peer_pb->set_permanent_uuid(strings::Substitute("peer-$0", i));
@@ -117,6 +118,14 @@ inline RaftConfigPB BuildRaftConfigPBForTests(int num) {
     hp->set_host(strings::Substitute("peer-$0.fake-domain-for-tests", i));
     hp->set_port(0);
   }
+  for (int i = 0; i < num_non_voters; i++) {
+    RaftPeerPB* peer_pb = raft_config.add_peers();
+    peer_pb->set_member_type(RaftPeerPB::NON_VOTER);
+    peer_pb->set_permanent_uuid(strings::Substitute("non-voter-peer-$0", i));
+    HostPortPB* hp = peer_pb->mutable_last_known_addr();
+    hp->set_host(strings::Substitute("non-voter-peer-$0.fake-domain-for-tests", i));
+    hp->set_port(0);
+  }
   return raft_config;
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/1277f69a/src/kudu/consensus/consensus_peers-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers-test.cc b/src/kudu/consensus/consensus_peers-test.cc
index 7f4eb57..f10cb34 100644
--- a/src/kudu/consensus/consensus_peers-test.cc
+++ b/src/kudu/consensus/consensus_peers-test.cc
@@ -126,6 +126,7 @@ class ConsensusPeersTest : public KuduTest {
       shared_ptr<Peer>* peer) {
     RaftPeerPB peer_pb;
     peer_pb.set_permanent_uuid(peer_name);
+    peer_pb.set_member_type(RaftPeerPB::VOTER);
     auto proxy_ptr = new DelayablePeerProxy<NoOpTestPeerProxy>(
         raft_pool_.get(), new NoOpTestPeerProxy(raft_pool_.get(), peer_pb));
     gscoped_ptr<PeerProxy> proxy(proxy_ptr);

http://git-wip-us.apache.org/repos/asf/kudu/blob/1277f69a/src/kudu/consensus/consensus_peers.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers.cc b/src/kudu/consensus/consensus_peers.cc
index ffa373d..6d68528 100644
--- a/src/kudu/consensus/consensus_peers.cc
+++ b/src/kudu/consensus/consensus_peers.cc
@@ -142,7 +142,7 @@ Peer::Peer(RaftPeerPB peer_pb,
 Status Peer::Init() {
   {
     std::lock_guard<simple_spinlock> l(peer_lock_);
-    queue_->TrackPeer(peer_pb_.permanent_uuid());
+    queue_->TrackPeer(peer_pb_);
   }
 
   // Capture a weak_ptr reference into the functor so it can safely handle

http://git-wip-us.apache.org/repos/asf/kudu/blob/1277f69a/src/kudu/consensus/consensus_queue-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue-test.cc b/src/kudu/consensus/consensus_queue-test.cc
index 62a2345..c8dc086 100644
--- a/src/kudu/consensus/consensus_queue-test.cc
+++ b/src/kudu/consensus/consensus_queue-test.cc
@@ -126,6 +126,14 @@ class ConsensusQueueTest : public KuduTest {
                                                           payload_size).release()));
   }
 
+  RaftPeerPB MakePeer(const std::string& peer_uuid,
+                      RaftPeerPB::MemberType member_type) {
+    RaftPeerPB peer_pb;
+    *peer_pb.mutable_permanent_uuid() = peer_uuid;
+    peer_pb.set_member_type(member_type);
+    return peer_pb;
+  }
+
   // Updates the peer's watermark in the queue so that it matches
   // the operation we want, since the queue always assumes that
   // when a peer gets tracked it's always tracked starting at the
@@ -137,7 +145,7 @@ class ConsensusQueueTest : public KuduTest {
                                int last_committed_idx,
                                bool* more_pending) {
 
-    queue_->TrackPeer(kPeerUuid);
+    queue_->TrackPeer(MakePeer(kPeerUuid, RaftPeerPB::VOTER));
     response->set_responder_uuid(kPeerUuid);
 
     // Ask for a request. The queue assumes the peer is up-to-date so
@@ -406,10 +414,10 @@ TEST_F(ConsensusQueueTest, TestPeersDontAckBeyondWatermarks) {
 TEST_F(ConsensusQueueTest, TestQueueAdvancesCommittedIndex) {
   queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, BuildRaftConfigPBForTests(5));
   // Track 4 additional peers (in addition to the local peer)
-  queue_->TrackPeer("peer-1");
-  queue_->TrackPeer("peer-2");
-  queue_->TrackPeer("peer-3");
-  queue_->TrackPeer("peer-4");
+  queue_->TrackPeer(MakePeer("peer-1", RaftPeerPB::VOTER));
+  queue_->TrackPeer(MakePeer("peer-2", RaftPeerPB::VOTER));
+  queue_->TrackPeer(MakePeer("peer-3", RaftPeerPB::VOTER));
+  queue_->TrackPeer(MakePeer("peer-4", RaftPeerPB::VOTER));
 
   // Append 10 messages to the queue.
   // This should add messages 0.1 -> 0.7, 1.8 -> 1.10 to the queue.
@@ -479,6 +487,68 @@ TEST_F(ConsensusQueueTest, TestQueueAdvancesCommittedIndex) {
   ASSERT_EQ(queue_->GetAllReplicatedIndex(), 5);
 }
 
+// Ensure that the acks for a non-voter don't count toward the majority.
+TEST_F(ConsensusQueueTest, TestNonVoterAcksDontCountTowardMajority) {
+  const auto kOtherVoterPeer = "peer-1";
+  const auto kNonVoterPeer = "non-voter-peer-0";
+
+  // 1. Add a non-voter to the config where there are 2 voters.
+  queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm,
+                        BuildRaftConfigPBForTests(/*num_voters=*/ 2,
+                                                  /*num_non_voters=*/ 1));
+  // Track 2 additional peers (in addition to the local peer)
+  queue_->TrackPeer(MakePeer(kOtherVoterPeer, RaftPeerPB::VOTER));
+  queue_->TrackPeer(MakePeer(kNonVoterPeer, RaftPeerPB::NON_VOTER));
+
+  // 2. Add some writes. Only the local leader immediately acks them, which is
+  // not enough to commit in a 2-voter + 1 non-voter config.
+  //
+  // Append 10 messages to the queue.
+  // This should add messages 0.1 -> 0.7, 1.8 -> 1.10 to the queue.
+  const int kNumMessages = 10;
+  AppendReplicateMessagesToQueue(queue_.get(), clock_,
+                                 /*first=*/ 1, /*count=*/ kNumMessages);
+  WaitForLocalPeerToAckIndex(kNumMessages);
+
+  // Since only the local log has acked at this point, the committed_index
+  // should be 0.
+  const int64_t kNoneCommittedIndex = 0;
+  ASSERT_EQ(kNoneCommittedIndex, queue_->GetCommittedIndex());
+
+  // 3. Ack the operations from the NON_VOTER peer. The writes will not have
+  // been committed yet, because the 2nd VOTER has not yet acked them.
+  ConsensusResponsePB response;
+  response.set_responder_uuid(kNonVoterPeer);
+  const int64_t kCurrentTerm = 1;
+  response.set_responder_term(kCurrentTerm);
+  SetLastReceivedAndLastCommitted(&response,
+                                  /*last_received=*/ MakeOpId(kCurrentTerm, kNumMessages),
+                                  /*last_committed_idx=*/ kNoneCommittedIndex);
+
+  bool more_pending;
+  queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
+  ASSERT_FALSE(more_pending);
+
+  // Committed index should be the same.
+  ASSERT_EQ(kNoneCommittedIndex, queue_->GetCommittedIndex());
+
+  // 4. Send an identical ack from the 2nd VOTER peer. This should cause the
+  // operation to be committed.
+  response.set_responder_uuid(kOtherVoterPeer);
+  queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
+  ASSERT_TRUE(more_pending); // The committed index has increased.
+
+  // The committed index should include the full set of ops now.
+  ASSERT_EQ(kNumMessages, queue_->GetCommittedIndex());
+
+  SetLastReceivedAndLastCommitted(&response,
+                                  /*last_received=*/ MakeOpId(kCurrentTerm, kNumMessages),
+                                  /*last_committed_idx=*/ kNumMessages);
+
+  queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
+  ASSERT_FALSE(more_pending);
+}
+
 // In this test we append a sequence of operations to a log
 // and then start tracking a peer whose first required operation
 // is before the first operation in the queue.
@@ -588,7 +658,7 @@ TEST_F(ConsensusQueueTest, TestQueueHandlesOperationOverwriting) {
   response.set_responder_uuid(kPeerUuid);
   bool more_pending = false;
 
-  queue_->TrackPeer(kPeerUuid);
+  queue_->TrackPeer(MakePeer(kPeerUuid, RaftPeerPB::VOTER));
 
   // Ask for a request. The queue assumes the peer is up-to-date so
   // this should contain no operations.
@@ -653,7 +723,7 @@ TEST_F(ConsensusQueueTest, TestQueueHandlesOperationOverwriting) {
 // operations, which would cause a check failure on the write immediately
 // following the overwriting write.
 TEST_F(ConsensusQueueTest, TestQueueMovesWatermarksBackward) {
-  queue_->SetNonLeaderMode();
+  queue_->SetNonLeaderMode(BuildRaftConfigPBForTests(3));
   // Append a bunch of messages and update as if they were also appeneded to the leader.
   queue_->UpdateLastIndexAppendedToLeader(10);
   AppendReplicateMessagesToQueue(queue_.get(), clock_, 1, 10);
@@ -820,7 +890,7 @@ TEST_F(ConsensusQueueTest, TestTriggerTabletCopyIfTabletNotFound) {
   ConsensusRequestPB request;
   ConsensusResponsePB response;
   response.set_responder_uuid(kPeerUuid);
-  queue_->TrackPeer(kPeerUuid);
+  queue_->TrackPeer(MakePeer(kPeerUuid, RaftPeerPB::VOTER));
 
   // Create request for new peer.
   vector<ReplicateRefPtr> refs;
@@ -848,7 +918,7 @@ TEST_F(ConsensusQueueTest, TestTriggerTabletCopyIfTabletNotFound) {
 }
 
 TEST_F(ConsensusQueueTest, TestFollowerCommittedIndexAndMetrics) {
-  queue_->SetNonLeaderMode();
+  queue_->SetNonLeaderMode(BuildRaftConfigPBForTests(3));
 
   // Emulate a follower sending a request to replicate 10 messages.
   queue_->UpdateLastIndexAppendedToLeader(10);
@@ -861,7 +931,8 @@ TEST_F(ConsensusQueueTest, TestFollowerCommittedIndexAndMetrics) {
 
   // Update the committed index. In real life, this would be done by the consensus
   // implementation when it receives an updated committed index from the leader.
-  queue_->UpdateFollowerWatermarks(10, 10);
+  queue_->UpdateFollowerWatermarks(/*committed_index=*/ 10,
+                                   /*all_replicated_index=*/ 10);
   ASSERT_EQ(10, queue_->GetCommittedIndex());
 
   // Check the metrics have the right values based on the updated committed index.

http://git-wip-us.apache.org/repos/asf/kudu/blob/1277f69a/src/kudu/consensus/consensus_queue.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc
index 6d09b33..f6d0db1 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -123,7 +123,7 @@ const char* PeerStatusToString(PeerStatus p) {
 std::string PeerMessageQueue::TrackedPeer::ToString() const {
   return Substitute("Peer: $0, Status: $1, Last received: $2, Next index: $3, "
                     "Last known committed idx: $4, Time since last communication: $5",
-                    uuid,
+                    SecureShortDebugString(peer_pb),
                     PeerStatusToString(last_exchange_status),
                     OpIdToString(last_received), next_index,
                     last_known_committed_index,
@@ -170,7 +170,6 @@ PeerMessageQueue::PeerMessageQueue(scoped_refptr<MetricEntity> metric_entity,
   queue_state_.state = kQueueOpen;
   // TODO(mpercy): Merge LogCache::Init() with its constructor.
   log_cache_.Init(queue_state_.last_appended);
-  TrackPeer(local_peer_pb_.permanent_uuid());
 }
 
 void PeerMessageQueue::SetLeaderMode(int64_t committed_index,
@@ -186,49 +185,53 @@ void PeerMessageQueue::SetLeaderMode(int64_t committed_index,
   queue_state_.committed_index = committed_index;
   queue_state_.majority_replicated_index = committed_index;
   queue_state_.active_config.reset(new RaftConfigPB(active_config));
-  CHECK(IsRaftConfigVoter(local_peer_pb_.permanent_uuid(), *queue_state_.active_config))
-      << SecureShortDebugString(local_peer_pb_) << " not a voter in config: "
-      << SecureShortDebugString(*queue_state_.active_config);
   queue_state_.majority_size_ = MajoritySize(CountVoters(*queue_state_.active_config));
   queue_state_.mode = LEADER;
 
-  LOG_WITH_PREFIX_UNLOCKED(INFO) << "Queue going to LEADER mode. State: "
-      << queue_state_.ToString();
+  TrackLocalPeerUnlocked();
   CheckPeersInActiveConfigIfLeaderUnlocked();
 
+  LOG_WITH_PREFIX_UNLOCKED(INFO) << "Queue going to LEADER mode. State: "
+                                 << queue_state_.ToString();
+
   // Reset last communication time with all peers to reset the clock on the
   // failure timeout.
-  MonoTime now(MonoTime::Now());
+  const auto now = MonoTime::Now();
   for (const PeersMap::value_type& entry : peers_map_) {
     entry.second->last_communication_time = now;
   }
   time_manager_->SetLeaderMode();
 }
 
-void PeerMessageQueue::SetNonLeaderMode() {
+void PeerMessageQueue::SetNonLeaderMode(const RaftConfigPB& active_config) {
   std::lock_guard<simple_spinlock> lock(queue_lock_);
-  queue_state_.active_config.reset();
+  queue_state_.active_config.reset(new RaftConfigPB(active_config));
   queue_state_.mode = NON_LEADER;
   queue_state_.majority_size_ = -1;
 
   // Update this when stepping down, since it doesn't get tracked as LEADER.
   queue_state_.last_idx_appended_to_leader = queue_state_.last_appended.index();
+
+  TrackLocalPeerUnlocked();
+
   LOG_WITH_PREFIX_UNLOCKED(INFO) << "Queue going to NON_LEADER mode. State: "
-      << queue_state_.ToString();
+                                 << queue_state_.ToString();
+
   time_manager_->SetNonLeaderMode();
 }
 
-void PeerMessageQueue::TrackPeer(const string& uuid) {
+void PeerMessageQueue::TrackPeer(const RaftPeerPB& peer_pb) {
   std::lock_guard<simple_spinlock> lock(queue_lock_);
-  TrackPeerUnlocked(uuid);
+  TrackPeerUnlocked(peer_pb);
 }
 
-void PeerMessageQueue::TrackPeerUnlocked(const string& uuid) {
-  CHECK(!uuid.empty()) << "Got request to track peer with empty UUID";
+void PeerMessageQueue::TrackPeerUnlocked(const RaftPeerPB& peer_pb) {
+  CHECK(!peer_pb.permanent_uuid().empty()) << SecureShortDebugString(peer_pb);
+  CHECK(peer_pb.has_member_type()) << SecureShortDebugString(peer_pb);
   DCHECK(queue_lock_.is_locked());
   DCHECK_EQ(queue_state_.state, kQueueOpen);
 
-  TrackedPeer* tracked_peer = new TrackedPeer(uuid);
+  TrackedPeer* tracked_peer = new TrackedPeer(peer_pb);
   // We don't know the last operation received by the peer so, following the
   // Raft protocol, we set next_index to one past the end of our own log. This
   // way, if calling this method is the result of a successful leader election
@@ -238,7 +241,7 @@ void PeerMessageQueue::TrackPeerUnlocked(const string& uuid) {
   // does not have a log that matches ours, the normal queue negotiation
   // process will eventually find the right point to resume from.
   tracked_peer->next_index = queue_state_.last_appended.index() + 1;
-  InsertOrDie(&peers_map_, uuid, tracked_peer);
+  InsertOrDie(&peers_map_, tracked_peer->uuid(), tracked_peer);
 
   CheckPeersInActiveConfigIfLeaderUnlocked();
 
@@ -249,10 +252,39 @@ void PeerMessageQueue::TrackPeerUnlocked(const string& uuid) {
 
 void PeerMessageQueue::UntrackPeer(const string& uuid) {
   std::lock_guard<simple_spinlock> lock(queue_lock_);
+  UntrackPeerUnlocked(uuid);
+}
+
+void PeerMessageQueue::UntrackPeerUnlocked(const string& uuid) {
+  DCHECK(queue_lock_.is_locked());
   TrackedPeer* peer = EraseKeyReturnValuePtr(&peers_map_, uuid);
-  if (peer != nullptr) {
-    delete peer;
+  delete peer; // Deleting a nullptr is safe.
+}
+
+void PeerMessageQueue::TrackLocalPeerUnlocked() {
+  DCHECK(queue_lock_.is_locked());
+  RaftPeerPB* local_peer_in_config;
+  Status s = GetRaftConfigMember(queue_state_.active_config.get(),
+                                 local_peer_pb_.permanent_uuid(),
+                                 &local_peer_in_config);
+  auto local_copy = local_peer_pb_;
+  if (!s.ok()) {
+    // The local peer is not a member of the config. The queue requires the
+    // 'member_type' field to be set for any tracked peer, so we explicitly
+    // mark the local peer as a NON_VOTER. This case is only possible when the
+    // local peer is not the leader, so the choice is not particularly
+    // important, but NON_VOTER is the most reasonable option.
+    local_copy.set_member_type(RaftPeerPB::NON_VOTER);
+    local_peer_in_config = &local_copy;
+  }
+  CHECK(local_peer_in_config->member_type() == RaftPeerPB::VOTER ||
+        queue_state_.mode != LEADER)
+      << "local peer " << local_peer_pb_.permanent_uuid()
+      << " is not a voter in config: " << queue_state_.ToString();
+  if (ContainsKey(peers_map_, local_peer_pb_.permanent_uuid())) {
+    UntrackPeerUnlocked(local_peer_pb_.permanent_uuid());
   }
+  TrackPeerUnlocked(*local_peer_in_config);
 }
 
 unordered_map<string, HealthReportPB> PeerMessageQueue::ReportHealthOfPeers() const {
@@ -471,15 +503,15 @@ void PeerMessageQueue::UpdatePeerHealthUnlocked(TrackedPeer* peer) {
   string error_msg;
   if (overall_health_status == HealthReportPB::FAILED) {
     if (peer->last_exchange_status == PeerStatus::TABLET_FAILED) {
-      error_msg = Substitute("The tablet replica hosted on peer $0 has failed", peer->uuid);
+      error_msg = Substitute("The tablet replica hosted on peer $0 has failed", peer->uuid());
     } else if (!peer->wal_catchup_possible) {
       error_msg = Substitute("The logs necessary to catch up peer $0 have been "
                              "garbage collected. The replica will never be able "
-                             "to catch up", peer->uuid);
+                             "to catch up", peer->uuid());
     } else {
       error_msg = Substitute("Leader has been unable to successfully communicate "
                              "with peer $0 for more than $1 seconds ($2)",
-                             peer->uuid,
+                             peer->uuid(),
                              FLAGS_follower_unavailable_considered_failed_sec,
                              (MonoTime::Now() - peer->last_communication_time).ToString());
     }
@@ -499,8 +531,8 @@ void PeerMessageQueue::UpdatePeerHealthUnlocked(TrackedPeer* peer) {
     }
   } else {
     if (overall_health_status == HealthReportPB::FAILED &&
-        SafeToEvictUnlocked(peer->uuid)) {
-      NotifyObserversOfFailedFollower(peer->uuid, queue_state_.current_term, error_msg);
+        SafeToEvictUnlocked(peer->uuid())) {
+      NotifyObserversOfFailedFollower(peer->uuid(), queue_state_.current_term, error_msg);
     }
   }
 }
@@ -711,6 +743,7 @@ void PeerMessageQueue::AdvanceQueueWatermark(const char* type,
                                              const OpId& replicated_before,
                                              const OpId& replicated_after,
                                              int num_peers_required,
+                                             ReplicaTypes replica_types,
                                              const TrackedPeer* who_caused) {
 
   if (VLOG_IS_ON(2)) {
@@ -729,7 +762,11 @@ void PeerMessageQueue::AdvanceQueueWatermark(const char* type,
   //   will be the new 'watermark'.
   vector<int64_t> watermarks;
   for (const PeersMap::value_type& peer : peers_map_) {
-    // TODO: The fact that we only consider peers whose last exchange was
+    if (replica_types == VOTER_REPLICAS &&
+        peer.second->peer_pb.member_type() != RaftPeerPB::VOTER) {
+      continue;
+    }
+    // TODO(todd): The fact that we only consider peers whose last exchange was
     // successful can cause the "all_replicated" watermark to lag behind
     // farther than necessary. For example:
     // - local peer has replicated opid 100
@@ -922,7 +959,7 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
         // in the TrackedPeer data structure. The downside is that we'd end up
         // with multiple sources of truth that would need to be kept in sync.
         Status s = GetRaftConfigMember(DCHECK_NOTNULL(queue_state_.active_config.get()),
-                                       peer->uuid, &peer_pb);
+                                       peer->uuid(), &peer_pb);
         if (s.ok() &&
             peer_pb &&
             peer_pb->member_type() == RaftPeerPB::NON_VOTER &&
@@ -936,7 +973,7 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
           // committed config's opid_index because if we're in the middle of a
           // config change, this requested config change will be rejected
           // anyway.
-          NotifyObserversOfPeerToPromote(peer->uuid,
+          NotifyObserversOfPeerToPromote(peer->uuid(),
                                          queue_state_.current_term,
                                          queue_state_.active_config->opid_index());
         }
@@ -1022,17 +1059,19 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
       // Advance the majority replicated index.
       AdvanceQueueWatermark("majority_replicated",
                             &queue_state_.majority_replicated_index,
-                            previous.last_received,
-                            peer->last_received,
-                            queue_state_.majority_size_,
+                            /*replicated_before=*/ previous.last_received,
+                            /*replicated_after=*/ peer->last_received,
+                            /*num_peers_required=*/ queue_state_.majority_size_,
+                            VOTER_REPLICAS,
                             peer);
 
       // Advance the all replicated index.
       AdvanceQueueWatermark("all_replicated",
                             &queue_state_.all_replicated_index,
-                            previous.last_received,
-                            peer->last_received,
-                            peers_map_.size(),
+                            /*replicated_before=*/ previous.last_received,
+                            /*replicated_after=*/ peer->last_received,
+                            /*num_peers_required=*/ peers_map_.size(),
+                            ALL_REPLICAS,
                             peer);
 
       // If the majority-replicated index is in our current term,

http://git-wip-us.apache.org/repos/asf/kudu/blob/1277f69a/src/kudu/consensus/consensus_queue.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.h b/src/kudu/consensus/consensus_queue.h
index 59d0f19..0808c69 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -113,8 +113,8 @@ const char* PeerStatusToString(PeerStatus p);
 class PeerMessageQueue {
  public:
   struct TrackedPeer {
-    explicit TrackedPeer(std::string uuid)
-        : uuid(std::move(uuid)),
+    explicit TrackedPeer(RaftPeerPB peer_pb)
+        : peer_pb(std::move(peer_pb)),
           next_index(kInvalidOpIdIndex),
           last_received(MinimumOpId()),
           last_known_committed_index(MinimumOpId().index()),
@@ -136,10 +136,13 @@ class PeerMessageQueue {
       last_seen_term_ = term;
     }
 
+    const std::string& uuid() const {
+      return peer_pb.permanent_uuid();
+    }
+
     std::string ToString() const;
 
-    // UUID of the peer.
-    std::string uuid;
+    RaftPeerPB peer_pb;
 
     // Next index to send to the peer.
     // This corresponds to "nextIndex" as specified in Raft.
@@ -212,10 +215,10 @@ class PeerMessageQueue {
   // be tracked so that the cache is only evicted when the peers no longer need
   // the operations but the queue will no longer advance the majority replicated
   // index or notify observers of its advancement.
-  void SetNonLeaderMode();
+  void SetNonLeaderMode(const RaftConfigPB& active_config);
 
   // Makes the queue track this peer.
-  void TrackPeer(const std::string& uuid);
+  void TrackPeer(const RaftPeerPB& peer_pb);
 
   // Makes the queue untrack this peer.
   void UntrackPeer(const std::string& uuid);
@@ -378,6 +381,12 @@ class PeerMessageQueue {
     kQueueClosed
   };
 
+  // Types of replicas to count when advancing a queue watermark.
+  enum ReplicaTypes {
+    ALL_REPLICAS,
+    VOTER_REPLICAS,
+  };
+
   struct QueueState {
 
     // The first operation that has been replicated to all currently
@@ -485,7 +494,13 @@ class PeerMessageQueue {
   // 'preceding_first_op_in_queue_' if the queue is empty.
   const OpId& GetLastOp() const;
 
-  void TrackPeerUnlocked(const std::string& uuid);
+  void TrackPeerUnlocked(const RaftPeerPB& peer_pb);
+
+  void UntrackPeerUnlocked(const std::string& uuid);
+
+  // We need the local peer in the config because it contains the current
+  // 'member_type' of the local node while 'local_peer_pb_' does not.
+  void TrackLocalPeerUnlocked();
 
   // Checks that if the queue is in LEADER mode then all registered peers are
   // in the active config. Crashes with a FATAL log message if this invariant
@@ -498,11 +513,16 @@ class PeerMessageQueue {
                                const Status& status);
 
   // Advances 'watermark' to the smallest op that 'num_peers_required' have.
+  // If 'replica_types' is set to VOTER_REPLICAS, the 'num_peers_required' is
+  // interpreted as "number of voters required". If 'replica_types' is set to
+  // ALL_REPLICAS, 'num_peers_required' counts any peer, regardless of its
+  // voting status.
   void AdvanceQueueWatermark(const char* type,
                              int64_t* watermark,
                              const OpId& replicated_before,
                              const OpId& replicated_after,
                              int num_peers_required,
+                             ReplicaTypes replica_types,
                              const TrackedPeer* who_caused);
 
   std::vector<PeerMessageQueueObserver*> observers_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/1277f69a/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index cb3118e..0892f9f 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -604,12 +604,12 @@ Status RaftConsensus::BecomeReplicaUnlocked(boost::optional<MonoDelta> fd_delta)
   // Now that we're a replica, we can allow voting for other nodes.
   withhold_votes_until_ = MonoTime::Min();
 
+  // Deregister ourselves from the queue. We no longer need to track what gets
+  // replicated since we're stepping down.
   queue_->UnRegisterObserver(this);
-  // Deregister ourselves from the queue. We don't care what get's replicated, since
-  // we're stepping down.
-  queue_->SetNonLeaderMode();
-
+  queue_->SetNonLeaderMode(cmeta_->ActiveConfig());
   peer_manager_->Close();
+
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/1277f69a/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc b/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
index 740c559..f0350dc 100644
--- a/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
@@ -20,6 +20,7 @@
 #include <numeric>
 #include <ostream>
 #include <string>
+#include <thread>
 #include <unordered_map>
 #include <utility>
 #include <vector>
@@ -1724,7 +1725,7 @@ TEST_F(RaftConsensusNonVoterITest, RestartClusterWithNonVoter) {
 // The test scenario is simple: try to make a configuration change in a 3 voter
 // Raft cluster, adding a new non-voter replica, when a majority of voters
 // is not online. Make sure the configuration change is not committed.
-TEST_F(RaftConsensusNonVoterITest, DISABLED_NonVoterReplicasInConsensusQueue) {
+TEST_F(RaftConsensusNonVoterITest, NonVoterReplicasInConsensusQueue) {
   if (!AllowSlowTests()) {
     LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
     return;
@@ -1775,50 +1776,54 @@ TEST_F(RaftConsensusNonVoterITest, DISABLED_NonVoterReplicasInConsensusQueue) {
 
   // Pause all but the leader replica and try to add a new non-voter into the
   // configuration. It should not pass.
+  LOG(INFO) << "Getting leader replica...";
   TServerDetails* leader;
   ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id, &leader));
 
-  const auto do_resume = [&] {
-    for (auto& e : replica_servers) {
-      const auto& uuid = e.first;
-      if (uuid == leader->uuid()) {
-        continue;
-      }
-      ExternalTabletServer* ts = cluster_->tablet_server_by_uuid(uuid);
-      ASSERT_OK(ts->Resume());
-    }
-  };
-  auto resumer = MakeScopedCleanup([&] {
-    do_resume();
-  });
-
+  LOG(INFO) << "Shutting down non-leader replicas...";
   for (auto& e : replica_servers) {
     const auto& uuid = e.first;
-    if (uuid == leader->uuid()) {
-      continue;
-    }
-    ExternalTabletServer* ts = cluster_->tablet_server_by_uuid(uuid);
-    ASSERT_OK(ts->Pause());
+    if (uuid == leader->uuid()) continue;
+    cluster_->tablet_server_by_uuid(uuid)->Shutdown();
   }
 
-  const Status s = AddServer(leader, tablet_id, new_replica,
-                             RaftPeerPB::NON_VOTER, kTimeout);
-  EXPECT_FALSE(s.ok()) << s.ToString();
-
-  NO_FATALS(do_resume());
-  resumer.cancel();
+  LOG(INFO) << "Adding NON_VOTER replica...";
+  std::thread t([&] {
+      AddServer(leader, tablet_id, new_replica, RaftPeerPB::NON_VOTER, kTimeout);
+  });
+  SCOPED_CLEANUP({ t.join(); });
 
   // Verify that the configuration hasn't changed.
+  LOG(INFO) << "Waiting for pending config...";
   consensus::ConsensusStatePB cstate;
   ASSERT_EVENTUALLY([&] {
     ASSERT_OK(GetConsensusState(leader, tablet_id, kTimeout, &cstate));
-    ASSERT_FALSE(cstate.has_pending_config());
+    ASSERT_TRUE(cstate.has_pending_config());
   });
+
+  // Ensure it does not commit.
+  SleepFor(MonoDelta::FromSeconds(5));
+  ASSERT_OK(GetConsensusState(leader, tablet_id, kTimeout, &cstate));
+  ASSERT_TRUE(cstate.has_pending_config());
+
   const auto& new_replica_uuid = new_replica->uuid();
-  EXPECT_FALSE(IsRaftConfigMember(new_replica_uuid, cstate.committed_config()))
+  ASSERT_FALSE(IsRaftConfigMember(new_replica_uuid, cstate.committed_config()))
       << pb_util::SecureDebugString(cstate.committed_config())
       << "new non-voter replica UUID: " << new_replica_uuid;
-  EXPECT_EQ(kOriginalReplicasNum, cstate.committed_config().peers_size());
+  ASSERT_EQ(kOriginalReplicasNum, cstate.committed_config().peers_size());
+
+  // Restart the tablet servers.
+  for (auto& e : replica_servers) {
+    const auto& uuid = e.first;
+    if (uuid == leader->uuid()) continue;
+    ASSERT_OK(cluster_->tablet_server_by_uuid(uuid)->Restart());
+  }
+
+  // Once the new replicas come back online, this should be committed.
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_OK(GetConsensusState(leader, tablet_id, kTimeout, &cstate));
+    ASSERT_FALSE(cstate.has_pending_config());
+  });
 
   NO_FATALS(cluster_->AssertNoCrashes());
 }