You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2018/01/08 19:38:45 UTC

[1/3] kudu git commit: KUDU-2249 Avoid sharing the client between the InputFormat and RecordReader

Repository: kudu
Updated Branches:
  refs/heads/master 391e3255d -> 1277f69a1


KUDU-2249 Avoid sharing the client between the InputFormat and RecordReader

This commit prevents a possible race condition between the getSplits() method
and the TableRecordReader in KuduTableInputFormat, when both try to access and
shut down the KuduClient.

Both share the same client and shut it down after use. In some scenarios the
client might still be accessed after it has been shut down, which throws an
error. With this commit the TableRecordReader gets its own client. This
increases the number of Kudu clients opened by an MR application by at most one
(the one that was previously shared by getSplits() with a TableRecordReader).
This commit also clarifies the behaviour of MR applications and how many open
Kudu clients one should expect in total.

Change-Id: I24f45ee9253790c5348cabd0afe6c6a4b6d3f3d4
Reviewed-on: http://gerrit.cloudera.org:8080/8921
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <da...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/7f4157c4
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/7f4157c4
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/7f4157c4

Branch: refs/heads/master
Commit: 7f4157c416c1f17e6326753f2138bd17fa79ad66
Parents: 391e325
Author: Clemens Valiente <cl...@trivago.com>
Authored: Thu Dec 28 10:34:26 2017 +0100
Committer: David Ribeiro Alves <da...@gmail.com>
Committed: Mon Jan 8 18:57:11 2018 +0000

----------------------------------------------------------------------
 .../kudu/mapreduce/KuduTableInputFormat.java    | 66 +++++++++++++-------
 1 file changed, 45 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/7f4157c4/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java
index fcbf10e..a018ae2 100644
--- a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java
+++ b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java
@@ -70,9 +70,24 @@ import org.apache.kudu.client.RowResultIterator;
  *
  * <p>
  * Hadoop doesn't have the concept of "closing" the input format so in order to release the
- * resources we assume that once either {@link #getSplits(org.apache.hadoop.mapreduce.JobContext)}
- * or {@link KuduTableInputFormat.TableRecordReader#close()} have been called that
- * the object won't be used again and the AsyncKuduClient is shut down.
+ * resources (mainly, the Kudu client) we assume that once either
+ * {@link #getSplits(org.apache.hadoop.mapreduce.JobContext)}
+ * or {@link KuduTableInputFormat.TableRecordReader#close()}
+ * have been called that the object won't be used again and the AsyncKuduClient is shut down.
+ *
+ * To prevent a premature shutdown of the client, the KuduTableInputFormat and the
+ * TableRecordReader both get their own client that they don't share.
+ * </p>
+ *
+ * <p>
+ * Default behavior of hadoop is to call {@link #getSplits(org.apache.hadoop.mapreduce.JobContext)}
+ * in the MRAppMaster and for each inputSplit (in our case, Kudu tablet) will spawn one Mapper
+ * with a TableRecordReader reading one Tablet.
+ *
+ * Therefore, total number of Kudu clients opened over the course of a MR application can be
+ * estimated by (#Tablets +1). To reduce the number of concurrent open clients, it might be
+ * advisable to restrict resources of the MR application or implement the
+ * {@link org.apache.hadoop.mapred.lib.CombineFileInputFormat} over this InputFormat.
  * </p>
  */
 @InterfaceAudience.Public
@@ -161,15 +176,7 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
       }
       return splits;
     } finally {
-      shutdownClient();
-    }
-  }
-
-  private void shutdownClient() throws IOException {
-    try {
-      client.shutdown();
-    } catch (Exception e) {
-      throw new IOException(e);
+      shutdownClient(client);
     }
   }
 
@@ -214,12 +221,7 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
 
     String tableName = conf.get(INPUT_TABLE_KEY);
     String masterAddresses = conf.get(MASTER_ADDRESSES_KEY);
-    this.operationTimeoutMs = conf.getLong(OPERATION_TIMEOUT_MS_KEY,
-                                           AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS);
-    this.client = new KuduClient.KuduClientBuilder(masterAddresses)
-                                .defaultOperationTimeoutMs(operationTimeoutMs)
-                                .build();
-    KuduTableMapReduceUtil.importCredentialsFromCurrentSubject(client);
+    this.client = buildKuduClient();
     this.nameServer = conf.get(NAME_SERVER_KEY);
     this.cacheBlocks = conf.getBoolean(SCAN_CACHE_BLOCKS, false);
     this.isFaultTolerant = conf.getBoolean(FAULT_TOLERANT_SCAN, false);
@@ -263,6 +265,26 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
     }
   }
 
+  private KuduClient buildKuduClient() {
+
+    String masterAddresses = conf.get(MASTER_ADDRESSES_KEY);
+    this.operationTimeoutMs = conf.getLong(OPERATION_TIMEOUT_MS_KEY,
+        AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS);
+    KuduClient kuduClient = new KuduClient.KuduClientBuilder(masterAddresses)
+        .defaultOperationTimeoutMs(operationTimeoutMs)
+        .build();
+    KuduTableMapReduceUtil.importCredentialsFromCurrentSubject(kuduClient);
+    return kuduClient;
+  }
+
+  private void shutdownClient(KuduClient kuduClient) throws IOException {
+    try {
+      kuduClient.shutdown();
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
   /**
    * Given a PTR string generated via reverse DNS lookup, return everything
    * except the trailing period. Example for host.example.com., return
@@ -384,6 +406,7 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
     private RowResultIterator iterator;
     private KuduScanner scanner;
     private TableSplit split;
+    private KuduClient kuduClient;
 
     @Override
     public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
@@ -393,9 +416,10 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
       }
 
       split = (TableSplit) inputSplit;
+      kuduClient = buildKuduClient();
       LOG.debug("Creating scanner for token: {}",
-                KuduScanToken.stringifySerializedToken(split.getScanToken(), client));
-      scanner = KuduScanToken.deserializeIntoScanner(split.getScanToken(), client);
+                KuduScanToken.stringifySerializedToken(split.getScanToken(), kuduClient));
+      scanner = KuduScanToken.deserializeIntoScanner(split.getScanToken(), kuduClient);
 
       // Calling this now to set iterator.
       tryRefreshIterator();
@@ -452,7 +476,7 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
       } catch (Exception e) {
         throw new IOException(e);
       }
-      shutdownClient();
+      shutdownClient(kuduClient);
     }
   }
 }


[2/3] kudu git commit: [consensus] add test for non-voters in consensus queue

Posted by mp...@apache.org.
[consensus] add test for non-voters in consensus queue

Added an integration test to verify that the consensus queue correctly
distinguishes between voter and non-voter replica acknowledgements
of the Raft consensus messages.

The test is currently disabled because it fails: the consensus queue
does not yet differentiate between Raft acknowledgement messages sent
by voter and non-voter replicas.

Change-Id: I16382b75256a32d695d8bc9ce020f40276114511
Reviewed-on: http://gerrit.cloudera.org:8080/8867
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/710b238e
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/710b238e
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/710b238e

Branch: refs/heads/master
Commit: 710b238e775ae5d4b59cd0664e05972fcc0a7b2d
Parents: 7f4157c
Author: Alexey Serbin <as...@cloudera.com>
Authored: Mon Dec 18 15:56:24 2017 -0800
Committer: Mike Percy <mp...@apache.org>
Committed: Mon Jan 8 19:38:08 2018 +0000

----------------------------------------------------------------------
 .../raft_consensus_nonvoter-itest.cc            | 108 +++++++++++++++++++
 1 file changed, 108 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/710b238e/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc b/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
index 9c9121c..740c559 100644
--- a/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
@@ -77,6 +77,7 @@ using kudu::consensus::RaftPeerPB;
 using kudu::consensus::IsRaftConfigMember;
 using kudu::consensus::IsRaftConfigVoter;
 using kudu::itest::AddServer;
+using kudu::itest::GetConsensusState;
 using kudu::itest::GetInt64Metric;
 using kudu::itest::GetTableLocations;
 using kudu::itest::GetTabletLocations;
@@ -1715,5 +1716,112 @@ TEST_F(RaftConsensusNonVoterITest, RestartClusterWithNonVoter) {
       << "replacement replica UUID: " << new_replica_uuid;
 }
 
+// This test verifies that the consensus queue correctly distinguishes between
+// voter and non-voter acknowledgements of the Raft messages. Essentially, the
+// leader replica's consensus queue should not count an ack message from
+// a non-voter replica as if it was sent by a voter replica.
+//
+// The test scenario is simple: try to make a configuration change in a 3 voter
+// Raft cluster, adding a new non-voter replica, when a majority of voters
+// is not online. Make sure the configuration change is not committed.
+TEST_F(RaftConsensusNonVoterITest, DISABLED_NonVoterReplicasInConsensusQueue) {
+  if (!AllowSlowTests()) {
+    LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+    return;
+  }
+
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(60);
+  const int kOriginalReplicasNum = 3;
+  const int kHbIntervalMs = 50;
+  const vector<string> kMasterFlags = {
+    // Don't evict excess replicas to avoid races in the scenario.
+    "--catalog_manager_evict_excess_replicas=false",
+  };
+  const vector<string> kTserverFlags = {
+    Substitute("--raft_heartbeat_interval_ms=$0", kHbIntervalMs),
+  };
+
+  FLAGS_num_replicas = kOriginalReplicasNum;
+  FLAGS_num_tablet_servers = kOriginalReplicasNum + 1;
+  NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags));
+  ASSERT_EQ(kOriginalReplicasNum + 1, tablet_servers_.size());
+
+  const string& tablet_id = tablet_id_;
+  TabletServerMap replica_servers;
+  for (const auto& e : tablet_replicas_) {
+    if (e.first == tablet_id) {
+      replica_servers.emplace(e.second->uuid(), e.second);
+    }
+  }
+  ASSERT_EQ(FLAGS_num_replicas, replica_servers.size());
+
+  ASSERT_OK(WaitForServersToAgree(kTimeout, replica_servers, tablet_id, 1));
+
+  TServerDetails* new_replica = nullptr;
+  for (const auto& ts : tablet_servers_) {
+    if (replica_servers.find(ts.first) == replica_servers.end()) {
+      new_replica = ts.second;
+      break;
+    }
+  }
+  ASSERT_NE(nullptr, new_replica);
+
+  // Disable failure detection for all replicas.
+  for (int i = 0; i < cluster_->num_tablet_servers(); ++i) {
+    ExternalTabletServer* ts = cluster_->tablet_server(i);
+    ASSERT_OK(cluster_->SetFlag(ts,
+        "enable_leader_failure_detection", "false"));
+  }
+
+  // Pause all but the leader replica and try to add a new non-voter into the
+  // configuration. It should not pass.
+  TServerDetails* leader;
+  ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id, &leader));
+
+  const auto do_resume = [&] {
+    for (auto& e : replica_servers) {
+      const auto& uuid = e.first;
+      if (uuid == leader->uuid()) {
+        continue;
+      }
+      ExternalTabletServer* ts = cluster_->tablet_server_by_uuid(uuid);
+      ASSERT_OK(ts->Resume());
+    }
+  };
+  auto resumer = MakeScopedCleanup([&] {
+    do_resume();
+  });
+
+  for (auto& e : replica_servers) {
+    const auto& uuid = e.first;
+    if (uuid == leader->uuid()) {
+      continue;
+    }
+    ExternalTabletServer* ts = cluster_->tablet_server_by_uuid(uuid);
+    ASSERT_OK(ts->Pause());
+  }
+
+  const Status s = AddServer(leader, tablet_id, new_replica,
+                             RaftPeerPB::NON_VOTER, kTimeout);
+  EXPECT_FALSE(s.ok()) << s.ToString();
+
+  NO_FATALS(do_resume());
+  resumer.cancel();
+
+  // Verify that the configuration hasn't changed.
+  consensus::ConsensusStatePB cstate;
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_OK(GetConsensusState(leader, tablet_id, kTimeout, &cstate));
+    ASSERT_FALSE(cstate.has_pending_config());
+  });
+  const auto& new_replica_uuid = new_replica->uuid();
+  EXPECT_FALSE(IsRaftConfigMember(new_replica_uuid, cstate.committed_config()))
+      << pb_util::SecureDebugString(cstate.committed_config())
+      << "new non-voter replica UUID: " << new_replica_uuid;
+  EXPECT_EQ(kOriginalReplicasNum, cstate.committed_config().peers_size());
+
+  NO_FATALS(cluster_->AssertNoCrashes());
+}
+
 }  // namespace tserver
 }  // namespace kudu


[3/3] kudu git commit: consensus: Fix NON_VOTER ack-counting bug

Posted by mp...@apache.org.
consensus: Fix NON_VOTER ack-counting bug

This patch fixes an issue where we were not differentiating between
replicating to voters and non-voters.

This enables the test written by Alexey and also makes some changes to
it. The test fails without this patch and passes with the patch.

Tests added:
* Added a unit test in consensus_queue-test
* Updated and enabled the system test in raft_consensus_nonvoter-itest

Change-Id: I13143e9bb4b76af3fd6dada28fcec05b27d24476
Reviewed-on: http://gerrit.cloudera.org:8080/8868
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <as...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/1277f69a
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/1277f69a
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/1277f69a

Branch: refs/heads/master
Commit: 1277f69a1feb3715750552991bc19444282f652e
Parents: 710b238
Author: Mike Percy <mp...@apache.org>
Authored: Mon Dec 18 15:13:16 2017 -0800
Committer: Mike Percy <mp...@apache.org>
Committed: Mon Jan 8 19:38:17 2018 +0000

----------------------------------------------------------------------
 src/kudu/consensus/consensus-test-util.h        |  13 ++-
 src/kudu/consensus/consensus_peers-test.cc      |   1 +
 src/kudu/consensus/consensus_peers.cc           |   2 +-
 src/kudu/consensus/consensus_queue-test.cc      |  91 ++++++++++++++--
 src/kudu/consensus/consensus_queue.cc           | 105 +++++++++++++------
 src/kudu/consensus/consensus_queue.h            |  34 ++++--
 src/kudu/consensus/raft_consensus.cc            |   8 +-
 .../raft_consensus_nonvoter-itest.cc            |  63 ++++++-----
 8 files changed, 231 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/1277f69a/src/kudu/consensus/consensus-test-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus-test-util.h b/src/kudu/consensus/consensus-test-util.h
index 409841b..9cb0473 100644
--- a/src/kudu/consensus/consensus-test-util.h
+++ b/src/kudu/consensus/consensus-test-util.h
@@ -80,6 +80,7 @@ inline gscoped_ptr<ReplicateMsg> CreateDummyReplicate(int64_t term,
 inline RaftPeerPB FakeRaftPeerPB(const std::string& uuid) {
   RaftPeerPB peer_pb;
   peer_pb.set_permanent_uuid(uuid);
+  peer_pb.set_member_type(RaftPeerPB::VOTER);
   peer_pb.mutable_last_known_addr()->set_host(strings::Substitute(
       "$0-fake-hostname", CURRENT_TEST_NAME()));
   peer_pb.mutable_last_known_addr()->set_port(0);
@@ -107,9 +108,9 @@ inline void AppendReplicateMessagesToQueue(
 }
 
 // Builds a configuration of 'num' voters.
-inline RaftConfigPB BuildRaftConfigPBForTests(int num) {
+inline RaftConfigPB BuildRaftConfigPBForTests(int num_voters, int num_non_voters = 0) {
   RaftConfigPB raft_config;
-  for (int i = 0; i < num; i++) {
+  for (int i = 0; i < num_voters; i++) {
     RaftPeerPB* peer_pb = raft_config.add_peers();
     peer_pb->set_member_type(RaftPeerPB::VOTER);
     peer_pb->set_permanent_uuid(strings::Substitute("peer-$0", i));
@@ -117,6 +118,14 @@ inline RaftConfigPB BuildRaftConfigPBForTests(int num) {
     hp->set_host(strings::Substitute("peer-$0.fake-domain-for-tests", i));
     hp->set_port(0);
   }
+  for (int i = 0; i < num_non_voters; i++) {
+    RaftPeerPB* peer_pb = raft_config.add_peers();
+    peer_pb->set_member_type(RaftPeerPB::NON_VOTER);
+    peer_pb->set_permanent_uuid(strings::Substitute("non-voter-peer-$0", i));
+    HostPortPB* hp = peer_pb->mutable_last_known_addr();
+    hp->set_host(strings::Substitute("non-voter-peer-$0.fake-domain-for-tests", i));
+    hp->set_port(0);
+  }
   return raft_config;
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/1277f69a/src/kudu/consensus/consensus_peers-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers-test.cc b/src/kudu/consensus/consensus_peers-test.cc
index 7f4eb57..f10cb34 100644
--- a/src/kudu/consensus/consensus_peers-test.cc
+++ b/src/kudu/consensus/consensus_peers-test.cc
@@ -126,6 +126,7 @@ class ConsensusPeersTest : public KuduTest {
       shared_ptr<Peer>* peer) {
     RaftPeerPB peer_pb;
     peer_pb.set_permanent_uuid(peer_name);
+    peer_pb.set_member_type(RaftPeerPB::VOTER);
     auto proxy_ptr = new DelayablePeerProxy<NoOpTestPeerProxy>(
         raft_pool_.get(), new NoOpTestPeerProxy(raft_pool_.get(), peer_pb));
     gscoped_ptr<PeerProxy> proxy(proxy_ptr);

http://git-wip-us.apache.org/repos/asf/kudu/blob/1277f69a/src/kudu/consensus/consensus_peers.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers.cc b/src/kudu/consensus/consensus_peers.cc
index ffa373d..6d68528 100644
--- a/src/kudu/consensus/consensus_peers.cc
+++ b/src/kudu/consensus/consensus_peers.cc
@@ -142,7 +142,7 @@ Peer::Peer(RaftPeerPB peer_pb,
 Status Peer::Init() {
   {
     std::lock_guard<simple_spinlock> l(peer_lock_);
-    queue_->TrackPeer(peer_pb_.permanent_uuid());
+    queue_->TrackPeer(peer_pb_);
   }
 
   // Capture a weak_ptr reference into the functor so it can safely handle

http://git-wip-us.apache.org/repos/asf/kudu/blob/1277f69a/src/kudu/consensus/consensus_queue-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue-test.cc b/src/kudu/consensus/consensus_queue-test.cc
index 62a2345..c8dc086 100644
--- a/src/kudu/consensus/consensus_queue-test.cc
+++ b/src/kudu/consensus/consensus_queue-test.cc
@@ -126,6 +126,14 @@ class ConsensusQueueTest : public KuduTest {
                                                           payload_size).release()));
   }
 
+  RaftPeerPB MakePeer(const std::string& peer_uuid,
+                      RaftPeerPB::MemberType member_type) {
+    RaftPeerPB peer_pb;
+    *peer_pb.mutable_permanent_uuid() = peer_uuid;
+    peer_pb.set_member_type(member_type);
+    return peer_pb;
+  }
+
   // Updates the peer's watermark in the queue so that it matches
   // the operation we want, since the queue always assumes that
   // when a peer gets tracked it's always tracked starting at the
@@ -137,7 +145,7 @@ class ConsensusQueueTest : public KuduTest {
                                int last_committed_idx,
                                bool* more_pending) {
 
-    queue_->TrackPeer(kPeerUuid);
+    queue_->TrackPeer(MakePeer(kPeerUuid, RaftPeerPB::VOTER));
     response->set_responder_uuid(kPeerUuid);
 
     // Ask for a request. The queue assumes the peer is up-to-date so
@@ -406,10 +414,10 @@ TEST_F(ConsensusQueueTest, TestPeersDontAckBeyondWatermarks) {
 TEST_F(ConsensusQueueTest, TestQueueAdvancesCommittedIndex) {
   queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, BuildRaftConfigPBForTests(5));
   // Track 4 additional peers (in addition to the local peer)
-  queue_->TrackPeer("peer-1");
-  queue_->TrackPeer("peer-2");
-  queue_->TrackPeer("peer-3");
-  queue_->TrackPeer("peer-4");
+  queue_->TrackPeer(MakePeer("peer-1", RaftPeerPB::VOTER));
+  queue_->TrackPeer(MakePeer("peer-2", RaftPeerPB::VOTER));
+  queue_->TrackPeer(MakePeer("peer-3", RaftPeerPB::VOTER));
+  queue_->TrackPeer(MakePeer("peer-4", RaftPeerPB::VOTER));
 
   // Append 10 messages to the queue.
   // This should add messages 0.1 -> 0.7, 1.8 -> 1.10 to the queue.
@@ -479,6 +487,68 @@ TEST_F(ConsensusQueueTest, TestQueueAdvancesCommittedIndex) {
   ASSERT_EQ(queue_->GetAllReplicatedIndex(), 5);
 }
 
+// Ensure that the acks for a non-voter don't count toward the majority.
+TEST_F(ConsensusQueueTest, TestNonVoterAcksDontCountTowardMajority) {
+  const auto kOtherVoterPeer = "peer-1";
+  const auto kNonVoterPeer = "non-voter-peer-0";
+
+  // 1. Add a non-voter to the config where there are 2 voters.
+  queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm,
+                        BuildRaftConfigPBForTests(/*num_voters=*/ 2,
+                                                  /*num_non_voters=*/ 1));
+  // Track 2 additional peers (in addition to the local peer)
+  queue_->TrackPeer(MakePeer(kOtherVoterPeer, RaftPeerPB::VOTER));
+  queue_->TrackPeer(MakePeer(kNonVoterPeer, RaftPeerPB::NON_VOTER));
+
+  // 2. Add some writes. Only the local leader immediately acks them, which is
+  // not enough to commit in a 2-voter + 1 non-voter config.
+  //
+  // Append 10 messages to the queue.
+  // This should add messages 0.1 -> 0.7, 1.8 -> 1.10 to the queue.
+  const int kNumMessages = 10;
+  AppendReplicateMessagesToQueue(queue_.get(), clock_,
+                                 /*first=*/ 1, /*count=*/ kNumMessages);
+  WaitForLocalPeerToAckIndex(kNumMessages);
+
+  // Since only the local log has acked at this point, the committed_index
+  // should be 0.
+  const int64_t kNoneCommittedIndex = 0;
+  ASSERT_EQ(kNoneCommittedIndex, queue_->GetCommittedIndex());
+
+  // 3. Ack the operations from the NON_VOTER peer. The writes will not have
+  // been committed yet, because the 2nd VOTER has not yet acked them.
+  ConsensusResponsePB response;
+  response.set_responder_uuid(kNonVoterPeer);
+  const int64_t kCurrentTerm = 1;
+  response.set_responder_term(kCurrentTerm);
+  SetLastReceivedAndLastCommitted(&response,
+                                  /*last_received=*/ MakeOpId(kCurrentTerm, kNumMessages),
+                                  /*last_committed_idx=*/ kNoneCommittedIndex);
+
+  bool more_pending;
+  queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
+  ASSERT_FALSE(more_pending);
+
+  // Committed index should be the same.
+  ASSERT_EQ(kNoneCommittedIndex, queue_->GetCommittedIndex());
+
+  // 4. Send an identical ack from the 2nd VOTER peer. This should cause the
+  // operation to be committed.
+  response.set_responder_uuid(kOtherVoterPeer);
+  queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
+  ASSERT_TRUE(more_pending); // The committed index has increased.
+
+  // The committed index should include the full set of ops now.
+  ASSERT_EQ(kNumMessages, queue_->GetCommittedIndex());
+
+  SetLastReceivedAndLastCommitted(&response,
+                                  /*last_received=*/ MakeOpId(kCurrentTerm, kNumMessages),
+                                  /*last_committed_idx=*/ kNumMessages);
+
+  queue_->ResponseFromPeer(response.responder_uuid(), response, &more_pending);
+  ASSERT_FALSE(more_pending);
+}
+
 // In this test we append a sequence of operations to a log
 // and then start tracking a peer whose first required operation
 // is before the first operation in the queue.
@@ -588,7 +658,7 @@ TEST_F(ConsensusQueueTest, TestQueueHandlesOperationOverwriting) {
   response.set_responder_uuid(kPeerUuid);
   bool more_pending = false;
 
-  queue_->TrackPeer(kPeerUuid);
+  queue_->TrackPeer(MakePeer(kPeerUuid, RaftPeerPB::VOTER));
 
   // Ask for a request. The queue assumes the peer is up-to-date so
   // this should contain no operations.
@@ -653,7 +723,7 @@ TEST_F(ConsensusQueueTest, TestQueueHandlesOperationOverwriting) {
 // operations, which would cause a check failure on the write immediately
 // following the overwriting write.
 TEST_F(ConsensusQueueTest, TestQueueMovesWatermarksBackward) {
-  queue_->SetNonLeaderMode();
+  queue_->SetNonLeaderMode(BuildRaftConfigPBForTests(3));
   // Append a bunch of messages and update as if they were also appeneded to the leader.
   queue_->UpdateLastIndexAppendedToLeader(10);
   AppendReplicateMessagesToQueue(queue_.get(), clock_, 1, 10);
@@ -820,7 +890,7 @@ TEST_F(ConsensusQueueTest, TestTriggerTabletCopyIfTabletNotFound) {
   ConsensusRequestPB request;
   ConsensusResponsePB response;
   response.set_responder_uuid(kPeerUuid);
-  queue_->TrackPeer(kPeerUuid);
+  queue_->TrackPeer(MakePeer(kPeerUuid, RaftPeerPB::VOTER));
 
   // Create request for new peer.
   vector<ReplicateRefPtr> refs;
@@ -848,7 +918,7 @@ TEST_F(ConsensusQueueTest, TestTriggerTabletCopyIfTabletNotFound) {
 }
 
 TEST_F(ConsensusQueueTest, TestFollowerCommittedIndexAndMetrics) {
-  queue_->SetNonLeaderMode();
+  queue_->SetNonLeaderMode(BuildRaftConfigPBForTests(3));
 
   // Emulate a follower sending a request to replicate 10 messages.
   queue_->UpdateLastIndexAppendedToLeader(10);
@@ -861,7 +931,8 @@ TEST_F(ConsensusQueueTest, TestFollowerCommittedIndexAndMetrics) {
 
   // Update the committed index. In real life, this would be done by the consensus
   // implementation when it receives an updated committed index from the leader.
-  queue_->UpdateFollowerWatermarks(10, 10);
+  queue_->UpdateFollowerWatermarks(/*committed_index=*/ 10,
+                                   /*all_replicated_index=*/ 10);
   ASSERT_EQ(10, queue_->GetCommittedIndex());
 
   // Check the metrics have the right values based on the updated committed index.

http://git-wip-us.apache.org/repos/asf/kudu/blob/1277f69a/src/kudu/consensus/consensus_queue.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc
index 6d09b33..f6d0db1 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -123,7 +123,7 @@ const char* PeerStatusToString(PeerStatus p) {
 std::string PeerMessageQueue::TrackedPeer::ToString() const {
   return Substitute("Peer: $0, Status: $1, Last received: $2, Next index: $3, "
                     "Last known committed idx: $4, Time since last communication: $5",
-                    uuid,
+                    SecureShortDebugString(peer_pb),
                     PeerStatusToString(last_exchange_status),
                     OpIdToString(last_received), next_index,
                     last_known_committed_index,
@@ -170,7 +170,6 @@ PeerMessageQueue::PeerMessageQueue(scoped_refptr<MetricEntity> metric_entity,
   queue_state_.state = kQueueOpen;
   // TODO(mpercy): Merge LogCache::Init() with its constructor.
   log_cache_.Init(queue_state_.last_appended);
-  TrackPeer(local_peer_pb_.permanent_uuid());
 }
 
 void PeerMessageQueue::SetLeaderMode(int64_t committed_index,
@@ -186,49 +185,53 @@ void PeerMessageQueue::SetLeaderMode(int64_t committed_index,
   queue_state_.committed_index = committed_index;
   queue_state_.majority_replicated_index = committed_index;
   queue_state_.active_config.reset(new RaftConfigPB(active_config));
-  CHECK(IsRaftConfigVoter(local_peer_pb_.permanent_uuid(), *queue_state_.active_config))
-      << SecureShortDebugString(local_peer_pb_) << " not a voter in config: "
-      << SecureShortDebugString(*queue_state_.active_config);
   queue_state_.majority_size_ = MajoritySize(CountVoters(*queue_state_.active_config));
   queue_state_.mode = LEADER;
 
-  LOG_WITH_PREFIX_UNLOCKED(INFO) << "Queue going to LEADER mode. State: "
-      << queue_state_.ToString();
+  TrackLocalPeerUnlocked();
   CheckPeersInActiveConfigIfLeaderUnlocked();
 
+  LOG_WITH_PREFIX_UNLOCKED(INFO) << "Queue going to LEADER mode. State: "
+                                 << queue_state_.ToString();
+
   // Reset last communication time with all peers to reset the clock on the
   // failure timeout.
-  MonoTime now(MonoTime::Now());
+  const auto now = MonoTime::Now();
   for (const PeersMap::value_type& entry : peers_map_) {
     entry.second->last_communication_time = now;
   }
   time_manager_->SetLeaderMode();
 }
 
-void PeerMessageQueue::SetNonLeaderMode() {
+void PeerMessageQueue::SetNonLeaderMode(const RaftConfigPB& active_config) {
   std::lock_guard<simple_spinlock> lock(queue_lock_);
-  queue_state_.active_config.reset();
+  queue_state_.active_config.reset(new RaftConfigPB(active_config));
   queue_state_.mode = NON_LEADER;
   queue_state_.majority_size_ = -1;
 
   // Update this when stepping down, since it doesn't get tracked as LEADER.
   queue_state_.last_idx_appended_to_leader = queue_state_.last_appended.index();
+
+  TrackLocalPeerUnlocked();
+
   LOG_WITH_PREFIX_UNLOCKED(INFO) << "Queue going to NON_LEADER mode. State: "
-      << queue_state_.ToString();
+                                 << queue_state_.ToString();
+
   time_manager_->SetNonLeaderMode();
 }
 
-void PeerMessageQueue::TrackPeer(const string& uuid) {
+void PeerMessageQueue::TrackPeer(const RaftPeerPB& peer_pb) {
   std::lock_guard<simple_spinlock> lock(queue_lock_);
-  TrackPeerUnlocked(uuid);
+  TrackPeerUnlocked(peer_pb);
 }
 
-void PeerMessageQueue::TrackPeerUnlocked(const string& uuid) {
-  CHECK(!uuid.empty()) << "Got request to track peer with empty UUID";
+void PeerMessageQueue::TrackPeerUnlocked(const RaftPeerPB& peer_pb) {
+  CHECK(!peer_pb.permanent_uuid().empty()) << SecureShortDebugString(peer_pb);
+  CHECK(peer_pb.has_member_type()) << SecureShortDebugString(peer_pb);
   DCHECK(queue_lock_.is_locked());
   DCHECK_EQ(queue_state_.state, kQueueOpen);
 
-  TrackedPeer* tracked_peer = new TrackedPeer(uuid);
+  TrackedPeer* tracked_peer = new TrackedPeer(peer_pb);
   // We don't know the last operation received by the peer so, following the
   // Raft protocol, we set next_index to one past the end of our own log. This
   // way, if calling this method is the result of a successful leader election
@@ -238,7 +241,7 @@ void PeerMessageQueue::TrackPeerUnlocked(const string& uuid) {
   // does not have a log that matches ours, the normal queue negotiation
   // process will eventually find the right point to resume from.
   tracked_peer->next_index = queue_state_.last_appended.index() + 1;
-  InsertOrDie(&peers_map_, uuid, tracked_peer);
+  InsertOrDie(&peers_map_, tracked_peer->uuid(), tracked_peer);
 
   CheckPeersInActiveConfigIfLeaderUnlocked();
 
@@ -249,10 +252,39 @@ void PeerMessageQueue::TrackPeerUnlocked(const string& uuid) {
 
 void PeerMessageQueue::UntrackPeer(const string& uuid) {
   std::lock_guard<simple_spinlock> lock(queue_lock_);
+  UntrackPeerUnlocked(uuid);
+}
+
+void PeerMessageQueue::UntrackPeerUnlocked(const string& uuid) {
+  DCHECK(queue_lock_.is_locked());
   TrackedPeer* peer = EraseKeyReturnValuePtr(&peers_map_, uuid);
-  if (peer != nullptr) {
-    delete peer;
+  delete peer; // Deleting a nullptr is safe.
+}
+
+void PeerMessageQueue::TrackLocalPeerUnlocked() {
+  DCHECK(queue_lock_.is_locked());
+  RaftPeerPB* local_peer_in_config;
+  Status s = GetRaftConfigMember(queue_state_.active_config.get(),
+                                 local_peer_pb_.permanent_uuid(),
+                                 &local_peer_in_config);
+  auto local_copy = local_peer_pb_;
+  if (!s.ok()) {
+    // The local peer is not a member of the config. The queue requires the
+    // 'member_type' field to be set for any tracked peer, so we explicitly
+    // mark the local peer as a NON_VOTER. This case is only possible when the
+    // local peer is not the leader, so the choice is not particularly
+    // important, but NON_VOTER is the most reasonable option.
+    local_copy.set_member_type(RaftPeerPB::NON_VOTER);
+    local_peer_in_config = &local_copy;
+  }
+  CHECK(local_peer_in_config->member_type() == RaftPeerPB::VOTER ||
+        queue_state_.mode != LEADER)
+      << "local peer " << local_peer_pb_.permanent_uuid()
+      << " is not a voter in config: " << queue_state_.ToString();
+  if (ContainsKey(peers_map_, local_peer_pb_.permanent_uuid())) {
+    UntrackPeerUnlocked(local_peer_pb_.permanent_uuid());
   }
+  TrackPeerUnlocked(*local_peer_in_config);
 }
 
 unordered_map<string, HealthReportPB> PeerMessageQueue::ReportHealthOfPeers() const {
@@ -471,15 +503,15 @@ void PeerMessageQueue::UpdatePeerHealthUnlocked(TrackedPeer* peer) {
   string error_msg;
   if (overall_health_status == HealthReportPB::FAILED) {
     if (peer->last_exchange_status == PeerStatus::TABLET_FAILED) {
-      error_msg = Substitute("The tablet replica hosted on peer $0 has failed", peer->uuid);
+      error_msg = Substitute("The tablet replica hosted on peer $0 has failed", peer->uuid());
     } else if (!peer->wal_catchup_possible) {
       error_msg = Substitute("The logs necessary to catch up peer $0 have been "
                              "garbage collected. The replica will never be able "
-                             "to catch up", peer->uuid);
+                             "to catch up", peer->uuid());
     } else {
       error_msg = Substitute("Leader has been unable to successfully communicate "
                              "with peer $0 for more than $1 seconds ($2)",
-                             peer->uuid,
+                             peer->uuid(),
                              FLAGS_follower_unavailable_considered_failed_sec,
                              (MonoTime::Now() - peer->last_communication_time).ToString());
     }
@@ -499,8 +531,8 @@ void PeerMessageQueue::UpdatePeerHealthUnlocked(TrackedPeer* peer) {
     }
   } else {
     if (overall_health_status == HealthReportPB::FAILED &&
-        SafeToEvictUnlocked(peer->uuid)) {
-      NotifyObserversOfFailedFollower(peer->uuid, queue_state_.current_term, error_msg);
+        SafeToEvictUnlocked(peer->uuid())) {
+      NotifyObserversOfFailedFollower(peer->uuid(), queue_state_.current_term, error_msg);
     }
   }
 }
@@ -711,6 +743,7 @@ void PeerMessageQueue::AdvanceQueueWatermark(const char* type,
                                              const OpId& replicated_before,
                                              const OpId& replicated_after,
                                              int num_peers_required,
+                                             ReplicaTypes replica_types,
                                              const TrackedPeer* who_caused) {
 
   if (VLOG_IS_ON(2)) {
@@ -729,7 +762,11 @@ void PeerMessageQueue::AdvanceQueueWatermark(const char* type,
   //   will be the new 'watermark'.
   vector<int64_t> watermarks;
   for (const PeersMap::value_type& peer : peers_map_) {
-    // TODO: The fact that we only consider peers whose last exchange was
+    if (replica_types == VOTER_REPLICAS &&
+        peer.second->peer_pb.member_type() != RaftPeerPB::VOTER) {
+      continue;
+    }
+    // TODO(todd): The fact that we only consider peers whose last exchange was
     // successful can cause the "all_replicated" watermark to lag behind
     // farther than necessary. For example:
     // - local peer has replicated opid 100
@@ -922,7 +959,7 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
         // in the TrackedPeer data structure. The downside is that we'd end up
         // with multiple sources of truth that would need to be kept in sync.
         Status s = GetRaftConfigMember(DCHECK_NOTNULL(queue_state_.active_config.get()),
-                                       peer->uuid, &peer_pb);
+                                       peer->uuid(), &peer_pb);
         if (s.ok() &&
             peer_pb &&
             peer_pb->member_type() == RaftPeerPB::NON_VOTER &&
@@ -936,7 +973,7 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
           // committed config's opid_index because if we're in the middle of a
           // config change, this requested config change will be rejected
           // anyway.
-          NotifyObserversOfPeerToPromote(peer->uuid,
+          NotifyObserversOfPeerToPromote(peer->uuid(),
                                          queue_state_.current_term,
                                          queue_state_.active_config->opid_index());
         }
@@ -1022,17 +1059,19 @@ void PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
       // Advance the majority replicated index.
       AdvanceQueueWatermark("majority_replicated",
                             &queue_state_.majority_replicated_index,
-                            previous.last_received,
-                            peer->last_received,
-                            queue_state_.majority_size_,
+                            /*replicated_before=*/ previous.last_received,
+                            /*replicated_after=*/ peer->last_received,
+                            /*num_peers_required=*/ queue_state_.majority_size_,
+                            VOTER_REPLICAS,
                             peer);
 
       // Advance the all replicated index.
       AdvanceQueueWatermark("all_replicated",
                             &queue_state_.all_replicated_index,
-                            previous.last_received,
-                            peer->last_received,
-                            peers_map_.size(),
+                            /*replicated_before=*/ previous.last_received,
+                            /*replicated_after=*/ peer->last_received,
+                            /*num_peers_required=*/ peers_map_.size(),
+                            ALL_REPLICAS,
                             peer);
 
       // If the majority-replicated index is in our current term,

http://git-wip-us.apache.org/repos/asf/kudu/blob/1277f69a/src/kudu/consensus/consensus_queue.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.h b/src/kudu/consensus/consensus_queue.h
index 59d0f19..0808c69 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -113,8 +113,8 @@ const char* PeerStatusToString(PeerStatus p);
 class PeerMessageQueue {
  public:
   struct TrackedPeer {
-    explicit TrackedPeer(std::string uuid)
-        : uuid(std::move(uuid)),
+    explicit TrackedPeer(RaftPeerPB peer_pb)
+        : peer_pb(std::move(peer_pb)),
           next_index(kInvalidOpIdIndex),
           last_received(MinimumOpId()),
           last_known_committed_index(MinimumOpId().index()),
@@ -136,10 +136,13 @@ class PeerMessageQueue {
       last_seen_term_ = term;
     }
 
+    const std::string& uuid() const {
+      return peer_pb.permanent_uuid();
+    }
+
     std::string ToString() const;
 
-    // UUID of the peer.
-    std::string uuid;
+    RaftPeerPB peer_pb;
 
     // Next index to send to the peer.
     // This corresponds to "nextIndex" as specified in Raft.
@@ -212,10 +215,10 @@ class PeerMessageQueue {
   // be tracked so that the cache is only evicted when the peers no longer need
   // the operations but the queue will no longer advance the majority replicated
   // index or notify observers of its advancement.
-  void SetNonLeaderMode();
+  void SetNonLeaderMode(const RaftConfigPB& active_config);
 
   // Makes the queue track this peer.
-  void TrackPeer(const std::string& uuid);
+  void TrackPeer(const RaftPeerPB& peer_pb);
 
   // Makes the queue untrack this peer.
   void UntrackPeer(const std::string& uuid);
@@ -378,6 +381,12 @@ class PeerMessageQueue {
     kQueueClosed
   };
 
+  // Types of replicas to count when advancing a queue watermark.
+  enum ReplicaTypes {
+    ALL_REPLICAS,
+    VOTER_REPLICAS,
+  };
+
   struct QueueState {
 
     // The first operation that has been replicated to all currently
@@ -485,7 +494,13 @@ class PeerMessageQueue {
   // 'preceding_first_op_in_queue_' if the queue is empty.
   const OpId& GetLastOp() const;
 
-  void TrackPeerUnlocked(const std::string& uuid);
+  void TrackPeerUnlocked(const RaftPeerPB& peer_pb);
+
+  void UntrackPeerUnlocked(const std::string& uuid);
+
+  // Tracks the local peer, preferring its entry in the active config because that
+  // entry carries the current 'member_type' of the local node; 'local_peer_pb_' does not.
+  void TrackLocalPeerUnlocked();
 
   // Checks that if the queue is in LEADER mode then all registered peers are
   // in the active config. Crashes with a FATAL log message if this invariant
@@ -498,11 +513,16 @@ class PeerMessageQueue {
                                const Status& status);
 
   // Advances 'watermark' to the smallest op that 'num_peers_required' have.
+  // If 'replica_types' is set to VOTER_REPLICAS, the 'num_peers_required' is
+  // interpreted as "number of voters required". If 'replica_types' is set to
+  // ALL_REPLICAS, 'num_peers_required' counts any peer, regardless of its
+  // voting status.
   void AdvanceQueueWatermark(const char* type,
                              int64_t* watermark,
                              const OpId& replicated_before,
                              const OpId& replicated_after,
                              int num_peers_required,
+                             ReplicaTypes replica_types,
                              const TrackedPeer* who_caused);
 
   std::vector<PeerMessageQueueObserver*> observers_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/1277f69a/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index cb3118e..0892f9f 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -604,12 +604,12 @@ Status RaftConsensus::BecomeReplicaUnlocked(boost::optional<MonoDelta> fd_delta)
   // Now that we're a replica, we can allow voting for other nodes.
   withhold_votes_until_ = MonoTime::Min();
 
+  // Deregister ourselves from the queue. We no longer need to track what gets
+  // replicated since we're stepping down.
   queue_->UnRegisterObserver(this);
-  // Deregister ourselves from the queue. We don't care what get's replicated, since
-  // we're stepping down.
-  queue_->SetNonLeaderMode();
-
+  queue_->SetNonLeaderMode(cmeta_->ActiveConfig());
   peer_manager_->Close();
+
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/1277f69a/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc b/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
index 740c559..f0350dc 100644
--- a/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus_nonvoter-itest.cc
@@ -20,6 +20,7 @@
 #include <numeric>
 #include <ostream>
 #include <string>
+#include <thread>
 #include <unordered_map>
 #include <utility>
 #include <vector>
@@ -1724,7 +1725,7 @@ TEST_F(RaftConsensusNonVoterITest, RestartClusterWithNonVoter) {
 // The test scenario is simple: try to make a configuration change in a 3 voter
 // Raft cluster, adding a new non-voter replica, when a majority of voters
 // is not online. Make sure the configuration change is not committed.
-TEST_F(RaftConsensusNonVoterITest, DISABLED_NonVoterReplicasInConsensusQueue) {
+TEST_F(RaftConsensusNonVoterITest, NonVoterReplicasInConsensusQueue) {
   if (!AllowSlowTests()) {
     LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
     return;
@@ -1775,50 +1776,54 @@ TEST_F(RaftConsensusNonVoterITest, DISABLED_NonVoterReplicasInConsensusQueue) {
 
   // Pause all but the leader replica and try to add a new non-voter into the
   // configuration. It should not pass.
+  LOG(INFO) << "Getting leader replica...";
   TServerDetails* leader;
   ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id, &leader));
 
-  const auto do_resume = [&] {
-    for (auto& e : replica_servers) {
-      const auto& uuid = e.first;
-      if (uuid == leader->uuid()) {
-        continue;
-      }
-      ExternalTabletServer* ts = cluster_->tablet_server_by_uuid(uuid);
-      ASSERT_OK(ts->Resume());
-    }
-  };
-  auto resumer = MakeScopedCleanup([&] {
-    do_resume();
-  });
-
+  LOG(INFO) << "Shutting down non-leader replicas...";
   for (auto& e : replica_servers) {
     const auto& uuid = e.first;
-    if (uuid == leader->uuid()) {
-      continue;
-    }
-    ExternalTabletServer* ts = cluster_->tablet_server_by_uuid(uuid);
-    ASSERT_OK(ts->Pause());
+    if (uuid == leader->uuid()) continue;
+    cluster_->tablet_server_by_uuid(uuid)->Shutdown();
   }
 
-  const Status s = AddServer(leader, tablet_id, new_replica,
-                             RaftPeerPB::NON_VOTER, kTimeout);
-  EXPECT_FALSE(s.ok()) << s.ToString();
-
-  NO_FATALS(do_resume());
-  resumer.cancel();
+  LOG(INFO) << "Adding NON_VOTER replica...";
+  std::thread t([&] {
+      AddServer(leader, tablet_id, new_replica, RaftPeerPB::NON_VOTER, kTimeout);
+  });
+  SCOPED_CLEANUP({ t.join(); });
 
   // Verify that the configuration hasn't changed.
+  LOG(INFO) << "Waiting for pending config...";
   consensus::ConsensusStatePB cstate;
   ASSERT_EVENTUALLY([&] {
     ASSERT_OK(GetConsensusState(leader, tablet_id, kTimeout, &cstate));
-    ASSERT_FALSE(cstate.has_pending_config());
+    ASSERT_TRUE(cstate.has_pending_config());
   });
+
+  // Ensure it does not commit.
+  SleepFor(MonoDelta::FromSeconds(5));
+  ASSERT_OK(GetConsensusState(leader, tablet_id, kTimeout, &cstate));
+  ASSERT_TRUE(cstate.has_pending_config());
+
   const auto& new_replica_uuid = new_replica->uuid();
-  EXPECT_FALSE(IsRaftConfigMember(new_replica_uuid, cstate.committed_config()))
+  ASSERT_FALSE(IsRaftConfigMember(new_replica_uuid, cstate.committed_config()))
       << pb_util::SecureDebugString(cstate.committed_config())
       << "new non-voter replica UUID: " << new_replica_uuid;
-  EXPECT_EQ(kOriginalReplicasNum, cstate.committed_config().peers_size());
+  ASSERT_EQ(kOriginalReplicasNum, cstate.committed_config().peers_size());
+
+  // Restart the tablet servers.
+  for (auto& e : replica_servers) {
+    const auto& uuid = e.first;
+    if (uuid == leader->uuid()) continue;
+    ASSERT_OK(cluster_->tablet_server_by_uuid(uuid)->Restart());
+  }
+
+  // Once the restarted replicas come back online, the pending config change should commit.
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_OK(GetConsensusState(leader, tablet_id, kTimeout, &cstate));
+    ASSERT_FALSE(cstate.has_pending_config());
+  });
 
   NO_FATALS(cluster_->AssertNoCrashes());
 }