You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2017/10/17 01:10:32 UTC

[1/3] kudu git commit: [raft_consensus-itest] separate Raft election tests

Repository: kudu
Updated Branches:
  refs/heads/master 441408908 -> ab77ce025


http://git-wip-us.apache.org/repos/asf/kudu/blob/ab77ce02/src/kudu/integration-tests/raft_consensus_election-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus_election-itest.cc b/src/kudu/integration-tests/raft_consensus_election-itest.cc
new file mode 100644
index 0000000..ba4d538
--- /dev/null
+++ b/src/kudu/integration-tests/raft_consensus_election-itest.cc
@@ -0,0 +1,532 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <ostream>
+#include <string>
+#include <unordered_set>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/common/wire_protocol.pb.h"
+#include "kudu/consensus/metadata.pb.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
+#include "kudu/integration-tests/cluster_verifier.h"
+#include "kudu/integration-tests/external_mini_cluster_fs_inspector.h"
+#include "kudu/integration-tests/raft_consensus-itest-base.h"
+#include "kudu/integration-tests/test_workload.h"
+#include "kudu/mini-cluster/external_mini_cluster.h"
+#include "kudu/tablet/metadata.pb.h"
+#include "kudu/tserver/tablet_server-test-base.h"
+#include "kudu/tserver/tserver.pb.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/status.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+DECLARE_int64(client_inserts_per_thread);
+DECLARE_int64(client_num_batches_per_thread);
+DECLARE_int32(consensus_rpc_timeout_ms);
+DECLARE_int32(num_client_threads);
+DECLARE_int32(num_replicas);
+DECLARE_int32(num_tablet_servers);
+
+METRIC_DECLARE_entity(tablet);
+METRIC_DECLARE_counter(transaction_memory_pressure_rejections);
+METRIC_DECLARE_gauge_int64(raft_term);
+
+using kudu::cluster::ExternalTabletServer;
+using kudu::consensus::RaftPeerPB;
+using kudu::itest::AddServer;
+using kudu::itest::GetReplicaStatusAndCheckIfLeader;
+using kudu::itest::LeaderStepDown;
+using kudu::itest::RemoveServer;
+using kudu::itest::StartElection;
+using kudu::itest::TabletServerMap;
+using kudu::itest::TServerDetails;
+using kudu::itest::WaitUntilLeader;
+using kudu::itest::WriteSimpleTestRow;
+using kudu::pb_util::SecureShortDebugString;
+using kudu::tablet::TABLET_DATA_COPYING;
+using std::string;
+using std::unordered_set;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace tserver {
+
+static const int kTestRowKey = 1234;
+static const int kTestRowIntVal = 5678;
+
+class RaftConsensusElectionITest : public RaftConsensusITestBase {
+ protected:
+  void CreateClusterForChurnyElectionsTests(const vector<string>& extra_ts_flags);
+  void DoTestChurnyElections(TestWorkload* workload, int max_rows_to_insert);
+};
+
+void RaftConsensusElectionITest::CreateClusterForChurnyElectionsTests(
+    const vector<string>& extra_ts_flags) {
+  vector<string> ts_flags;
+
+#ifdef THREAD_SANITIZER
+  // On TSAN builds, we need to be a little bit less churny in order to make
+  // any progress at all.
+  ts_flags.push_back("--raft_heartbeat_interval_ms=5");
+#else
+  ts_flags.emplace_back("--raft_heartbeat_interval_ms=1");
+#endif
+
+  ts_flags.insert(ts_flags.end(), extra_ts_flags.cbegin(), extra_ts_flags.cend());
+
+  CreateCluster("raft_consensus-itest-cluster", ts_flags, {});
+}
+
+void RaftConsensusElectionITest::DoTestChurnyElections(TestWorkload* workload,
+                                                       int max_rows_to_insert) {
+  workload->set_num_replicas(FLAGS_num_replicas);
+  // Set a really high write timeout so that even in the presence of many failures we
+  // can verify an exact number of rows in the end, thanks to exactly once semantics.
+  workload->set_write_timeout_millis(60 * 1000 /* 60 seconds */);
+  workload->set_num_write_threads(2);
+  workload->set_write_batch_size(1);
+  workload->Setup();
+  workload->Start();
+
+  // Run for either a prescribed number of writes, or 30 seconds,
+  // whichever comes first. This prevents test timeouts on slower
+  // build machines, TSAN builds, etc.
+  Stopwatch sw;
+  sw.start();
+  while (workload->rows_inserted() < max_rows_to_insert &&
+      sw.elapsed().wall_seconds() < 30) {
+    SleepFor(MonoDelta::FromMilliseconds(10));
+        NO_FATALS(AssertNoTabletServersCrashed());
+  }
+  workload->StopAndJoin();
+  ASSERT_GT(workload->rows_inserted(), 0) << "No rows inserted";
+
+  // Ensure that the replicas converge.
+  // We expect an exact result due to exactly once semantics and snapshot scans.
+  ClusterVerifier v(cluster_.get());
+  NO_FATALS(v.CheckCluster());
+  NO_FATALS(v.CheckRowCount(workload->table_name(),
+                            ClusterVerifier::EXACTLY,
+                            workload->rows_inserted()));
+  NO_FATALS(AssertNoTabletServersCrashed());
+}
+
+TEST_F(RaftConsensusElectionITest, RunLeaderElection) {
+  // Reset consensus RPC timeout to the default value, otherwise the election
+  // might fail often, making the test flaky.
+  FLAGS_consensus_rpc_timeout_ms = 1000;
+  NO_FATALS(BuildAndStart());
+
+  int num_iters = AllowSlowTests() ? 10 : 1;
+
+  InsertTestRowsRemoteThread(0,
+                             FLAGS_client_inserts_per_thread * num_iters,
+                             FLAGS_client_num_batches_per_thread,
+                             {});
+
+  NO_FATALS(AssertAllReplicasAgree(FLAGS_client_inserts_per_thread * num_iters));
+
+  // Select the last follower to be new leader.
+  vector<TServerDetails*> followers;
+  GetOnlyLiveFollowerReplicas(tablet_id_, &followers);
+
+  // Now shutdown the current leader.
+  TServerDetails* leader = DCHECK_NOTNULL(GetLeaderReplicaOrNull(tablet_id_));
+  ExternalTabletServer* leader_ets = cluster_->tablet_server_by_uuid(leader->uuid());
+  leader_ets->Shutdown();
+
+  TServerDetails* replica = followers.back();
+  CHECK_NE(leader->instance_id.permanent_uuid(), replica->instance_id.permanent_uuid());
+
+  // Make the new replica leader.
+  ASSERT_OK(StartElection(replica, tablet_id_, MonoDelta::FromSeconds(10)));
+
+  // Insert a bunch more rows.
+  InsertTestRowsRemoteThread(FLAGS_client_inserts_per_thread * num_iters,
+                             FLAGS_client_inserts_per_thread * num_iters,
+                             FLAGS_client_num_batches_per_thread,
+                             {});
+
+  // Restart the original replica and make sure they all agree.
+  ASSERT_OK(leader_ets->Restart());
+
+  NO_FATALS(AssertAllReplicasAgree(FLAGS_client_inserts_per_thread * num_iters * 2));
+}
+
+// This test sets all of the election timers to be very short, resulting
+// in a lot of churn. We expect to make some progress and not diverge or
+// crash, despite the frequent re-elections and races.
+TEST_F(RaftConsensusElectionITest, ChurnyElections) {
+  const int kNumWrites = AllowSlowTests() ? 10000 : 1000;
+  CreateClusterForChurnyElectionsTests({});
+  TestWorkload workload(cluster_.get());
+  workload.set_write_batch_size(1);
+  workload.set_num_read_threads(2);
+  DoTestChurnyElections(&workload, kNumWrites);
+}
+
+// The same test, except inject artificial latency when propagating notifications
+// from the queue back to consensus. This previously reproduced bugs like KUDU-1078 which
+// normally only appear under high load.
+TEST_F(RaftConsensusElectionITest, ChurnyElections_WithNotificationLatency) {
+  CreateClusterForChurnyElectionsTests({"--consensus_inject_latency_ms_in_notifications=50"});
+  const int kNumWrites = AllowSlowTests() ? 10000 : 1000;
+  TestWorkload workload(cluster_.get());
+  workload.set_write_batch_size(1);
+  workload.set_num_read_threads(2);
+  DoTestChurnyElections(&workload, kNumWrites);
+}
+
+// The same as TestChurnyElections except insert many duplicated rows.
+// This emulates cases where there are many duplicate keys which, due to two phase
+// locking, may cause deadlocks and other anomalies that cannot be observed when
+// keys are unique.
+TEST_F(RaftConsensusElectionITest, ChurnyElections_WithDuplicateKeys) {
+  CreateClusterForChurnyElectionsTests({});
+  const int kNumWrites = AllowSlowTests() ? 10000 : 1000;
+  TestWorkload workload(cluster_.get());
+  workload.set_write_pattern(TestWorkload::INSERT_WITH_MANY_DUP_KEYS);
+  // Increase the number of rows per batch to get a higher chance of key collision.
+  workload.set_write_batch_size(3);
+  DoTestChurnyElections(&workload, kNumWrites);
+}
+
+// Test automatic leader election by killing leaders.
+TEST_F(RaftConsensusElectionITest, AutomaticLeaderElection) {
+  if (AllowSlowTests()) {
+    FLAGS_num_tablet_servers = 5;
+    FLAGS_num_replicas = 5;
+  }
+  NO_FATALS(BuildAndStart());
+
+  TServerDetails* leader;
+  ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader));
+
+  unordered_set<TServerDetails*> killed_leaders;
+
+  const int kNumLeadersToKill = FLAGS_num_replicas / 2;
+  const int kFinalNumReplicas = FLAGS_num_replicas / 2 + 1;
+
+  for (int leaders_killed = 0; leaders_killed < kFinalNumReplicas; leaders_killed++) {
+    LOG(INFO) << Substitute("Writing data to leader of $0-node config ($1 alive)...",
+                            FLAGS_num_replicas, FLAGS_num_replicas - leaders_killed);
+
+    InsertTestRowsRemoteThread(leaders_killed * FLAGS_client_inserts_per_thread,
+                               FLAGS_client_inserts_per_thread,
+                               FLAGS_client_num_batches_per_thread,
+                               {});
+
+    // At this point, the writes are flushed but the commit index may not be
+    // propagated to all replicas. We kill the leader anyway.
+    if (leaders_killed < kNumLeadersToKill) {
+      LOG(INFO) << "Killing current leader " << leader->instance_id.permanent_uuid() << "...";
+      cluster_->tablet_server_by_uuid(leader->uuid())->Shutdown();
+      InsertOrDie(&killed_leaders, leader);
+
+      LOG(INFO) << "Waiting for new guy to be elected leader.";
+      ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader));
+    }
+  }
+
+  // Restart every node that was killed, and wait for the nodes to converge
+  for (TServerDetails* killed_node : killed_leaders) {
+    CHECK_OK(cluster_->tablet_server_by_uuid(killed_node->uuid())->Restart());
+  }
+  // Verify the data on the remaining replicas.
+  NO_FATALS(AssertAllReplicasAgree(FLAGS_client_inserts_per_thread * kFinalNumReplicas));
+}
+
+// Single-replica leader election test.
+TEST_F(RaftConsensusElectionITest, AutomaticLeaderElectionOneReplica) {
+  FLAGS_num_tablet_servers = 1;
+  FLAGS_num_replicas = 1;
+  NO_FATALS(BuildAndStart());
+  // Ensure that single-node Raft configs elect themselves as leader
+  // immediately upon Consensus startup.
+  ASSERT_OK(GetReplicaStatusAndCheckIfLeader(tablet_servers_[cluster_->tablet_server(0)->uuid()],
+                                             tablet_id_, MonoDelta::FromMilliseconds(500)));
+}
+
+TEST_F(RaftConsensusElectionITest, LeaderStepDown) {
+  const vector<string> kTsFlags = {
+    "--enable_leader_failure_detection=false"
+  };
+  const vector<string> kMasterFlags = {
+    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"
+  };
+
+  FLAGS_num_replicas = 3;
+  FLAGS_num_tablet_servers = 3;
+  NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags));
+
+  vector<TServerDetails*> tservers;
+  AppendValuesFromMap(tablet_servers_, &tservers);
+
+  // Start with no leader.
+  Status s = GetReplicaStatusAndCheckIfLeader(tservers[0], tablet_id_, MonoDelta::FromSeconds(10));
+  ASSERT_TRUE(s.IsIllegalState()) << "TS #0 should not be leader yet: " << s.ToString();
+
+  // Become leader.
+  ASSERT_OK(StartElection(tservers[0], tablet_id_, MonoDelta::FromSeconds(10)));
+  ASSERT_OK(WaitUntilLeader(tservers[0], tablet_id_, MonoDelta::FromSeconds(10)));
+  ASSERT_OK(WriteSimpleTestRow(tservers[0], tablet_id_, RowOperationsPB::INSERT,
+                               kTestRowKey, kTestRowIntVal, "foo", MonoDelta::FromSeconds(10)));
+  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, tablet_id_, 2));
+
+  // Step down and test that a 2nd stepdown returns the expected result.
+  ASSERT_OK(LeaderStepDown(tservers[0], tablet_id_, MonoDelta::FromSeconds(10)));
+  TabletServerErrorPB error;
+  s = LeaderStepDown(tservers[0], tablet_id_, MonoDelta::FromSeconds(10), &error);
+  ASSERT_TRUE(s.IsIllegalState()) << "TS #0 should not be leader anymore: " << s.ToString();
+  ASSERT_EQ(TabletServerErrorPB::NOT_THE_LEADER, error.code()) << SecureShortDebugString(error);
+
+  s = WriteSimpleTestRow(tservers[0], tablet_id_, RowOperationsPB::INSERT,
+                         kTestRowKey, kTestRowIntVal, "foo", MonoDelta::FromSeconds(10));
+  ASSERT_TRUE(s.IsIllegalState()) << "TS #0 should not accept writes as follower: "
+                                  << s.ToString();
+}
+
+// Test for KUDU-699: sets the consensus RPC timeout to be long,
+// and freezes both followers before asking the leader to step down.
+// Prior to fixing KUDU-699, the step-down process would block
+// until the pending requests timed out.
+TEST_F(RaftConsensusElectionITest, StepDownWithSlowFollower) {
+  const vector<string> kTsFlags = {
+    "--enable_leader_failure_detection=false",
+    // Bump up the RPC timeout, so that we can verify that the stepdown responds
+    // quickly even when an outbound request is hung.
+    "--consensus_rpc_timeout_ms=15000"
+  };
+  const vector<string> kMasterFlags = {
+    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"
+  };
+
+  NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags));
+
+  vector<TServerDetails*> tservers;
+  AppendValuesFromMap(tablet_servers_, &tservers);
+  ASSERT_OK(StartElection(tservers[0], tablet_id_, MonoDelta::FromSeconds(10)));
+  ASSERT_OK(WaitUntilLeader(tservers[0], tablet_id_, MonoDelta::FromSeconds(10)));
+
+  // Stop both followers.
+  for (int i = 1; i < 3; i++) {
+    ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[i]->uuid())->Pause());
+  }
+
+  // Sleep a little bit of time to make sure that the leader has outstanding heartbeats
+  // to the paused followers before requesting the stepdown.
+  SleepFor(MonoDelta::FromSeconds(1));
+
+  // Step down should respond quickly despite the hung requests.
+  ASSERT_OK(LeaderStepDown(tservers[0], tablet_id_, MonoDelta::FromSeconds(3)));
+}
+
+// Ensure that we can elect a server that is in the "pending" configuration.
+// This is required by the Raft protocol. See Diego Ongaro's PhD thesis, section
+// 4.1, where it states that "it is the caller’s configuration that is used in
+// reaching consensus, both for voting and for log replication".
+//
+// This test also tests the case where a node comes back from the dead to a
+// leader that was not in its configuration when it died. That should also work, i.e.
+// the revived node should accept writes from the new leader.
+TEST_F(RaftConsensusElectionITest, ElectPendingVoter) {
+  // Test plan:
+  //  1. Disable failure detection to avoid non-deterministic behavior.
+  //  2. Start with a configuration size of 5, all servers synced.
+  //  3. Remove one server from the configuration, wait until committed.
+  //  4. Pause the 3 remaining non-leaders (SIGSTOP).
+  //  5. Run a config change to add back the previously-removed server.
+  //     Ensure that, while the op cannot be committed yet due to lack of a
+  //     majority in the new config (only 2 out of 5 servers are alive), the op
+  //     has been replicated to both the local leader and the new member.
+  //  6. Force the existing leader to step down.
+  //  7. Resume one of the paused nodes so that a majority (of the 5-node
+  //     configuration, but not the original 4-node configuration) will be available.
+  //  8. Start a leader election on the new (pending) node. It should win.
+  //  9. Unpause the two remaining stopped nodes.
+  // 10. Wait for all nodes to sync to the new leader's log.
+  const vector<string> kTsFlags = {
+    "--enable_leader_failure_detection=false",
+  };
+  const vector<string> kMasterFlags = {
+    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false",
+  };
+
+  FLAGS_num_tablet_servers = 5;
+  FLAGS_num_replicas = 5;
+  NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags));
+
+  vector<TServerDetails*> tservers;
+  AppendValuesFromMap(tablet_servers_, &tservers);
+  ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
+
+  // Elect server 0 as leader and wait for log index 1 to propagate to all servers.
+  TServerDetails* initial_leader = tservers[0];
+  ASSERT_OK(StartElection(initial_leader, tablet_id_, MonoDelta::FromSeconds(10)));
+  ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, initial_leader, tablet_id_,
+                                          MonoDelta::FromSeconds(10)));
+
+  // The server we will remove and then bring back.
+  TServerDetails* final_leader = tservers[4];
+
+  // Kill the master, so we can change the config without interference.
+  cluster_->master()->Shutdown();
+
+  // Now remove server 4 from the configuration.
+  TabletServerMap active_tablet_servers = tablet_servers_;
+  LOG(INFO) << "Removing tserver with uuid " << final_leader->uuid();
+  ASSERT_OK(RemoveServer(initial_leader, tablet_id_, final_leader, boost::none,
+                         MonoDelta::FromSeconds(10)));
+  ASSERT_EQ(1, active_tablet_servers.erase(final_leader->uuid()));
+  int64_t cur_log_index = 2;
+  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10),
+                                  active_tablet_servers, tablet_id_, cur_log_index));
+
+  // Pause tablet servers 1 through 3, so they won't see the operation to add
+  // server 4 back.
+  LOG(INFO) << "Pausing 3 replicas...";
+  for (int i = 1; i <= 3; i++) {
+    ExternalTabletServer* replica_ts = cluster_->tablet_server_by_uuid(tservers[i]->uuid());
+    ASSERT_OK(replica_ts->Pause());
+  }
+
+  // Now add server 4 back to the peers.
+  // This operation will time out on the client side.
+  LOG(INFO) << "Adding back Peer " << final_leader->uuid() << " and expecting timeout...";
+  Status s = AddServer(initial_leader, tablet_id_, final_leader, RaftPeerPB::VOTER, boost::none,
+                       MonoDelta::FromMilliseconds(100));
+  ASSERT_TRUE(s.IsTimedOut()) << "Expected AddServer() to time out. Result: " << s.ToString();
+  LOG(INFO) << "Timeout achieved.";
+  active_tablet_servers = tablet_servers_; // Reset to the unpaused servers.
+  for (int i = 1; i <= 3; i++) {
+    ASSERT_EQ(1, active_tablet_servers.erase(tservers[i]->uuid()));
+  }
+  // Only wait for TS 0 and 4 to agree that the new change config op has been
+  // replicated.
+  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10),
+                                  active_tablet_servers, tablet_id_, ++cur_log_index));
+
+  // Now that TS 4 is electable (and pending), have TS 0 step down.
+  LOG(INFO) << "Forcing Peer " << initial_leader->uuid() << " to step down...";
+  ASSERT_OK(LeaderStepDown(initial_leader, tablet_id_, MonoDelta::FromSeconds(10)));
+
+  // Resume TS 1 so we have a majority of 3 to elect a new leader.
+  LOG(INFO) << "Resuming Peer " << tservers[1]->uuid() << " ...";
+  ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[1]->uuid())->Resume());
+  InsertOrDie(&active_tablet_servers, tservers[1]->uuid(), tservers[1]);
+
+  // Now try to get TS 4 elected. It should succeed and push a NO_OP.
+  LOG(INFO) << "Trying to elect Peer " << tservers[4]->uuid() << " ...";
+  ASSERT_OK(StartElection(final_leader, tablet_id_, MonoDelta::FromSeconds(10)));
+  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10),
+                                  active_tablet_servers, tablet_id_, ++cur_log_index));
+
+  // Resume the remaining paused nodes.
+  LOG(INFO) << "Resuming remaining nodes...";
+  ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[2]->uuid())->Resume());
+  ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[3]->uuid())->Resume());
+  active_tablet_servers = tablet_servers_;
+
+  // Do one last operation on the new leader: an insert.
+  ASSERT_OK(WriteSimpleTestRow(final_leader, tablet_id_, RowOperationsPB::INSERT,
+                               kTestRowKey, kTestRowIntVal, "Ob-La-Di, Ob-La-Da",
+                               MonoDelta::FromSeconds(10)));
+
+  // Wait for all servers to replicate everything up through the last write op.
+  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10),
+                                  active_tablet_servers, tablet_id_, ++cur_log_index));
+}
+
+// Have a replica fall behind the leader's log, then fail a tablet copy. It
+// should still be able to vote in leader elections.
+TEST_F(RaftConsensusElectionITest, TombstonedVoteAfterFailedTabletCopy) {
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+
+  vector<string> ts_flags;
+  AddFlagsForLogRolls(&ts_flags); // For CauseFollowerToFallBehindLogGC().
+  const vector<string> kMasterFlags = {
+    "--master_add_server_when_underreplicated=false",
+  };
+  NO_FATALS(BuildAndStart(ts_flags, kMasterFlags));
+
+  TabletServerMap active_tablet_servers = tablet_servers_;
+  ASSERT_EQ(3, FLAGS_num_replicas);
+  ASSERT_EQ(3, active_tablet_servers.size());
+
+  string leader_uuid;
+  int64_t orig_term;
+  string follower_uuid;
+  NO_FATALS(CauseFollowerToFallBehindLogGC(&leader_uuid, &orig_term, &follower_uuid));
+
+  // Wait for the abandoned follower to be evicted.
+  ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(2, tablet_servers_[leader_uuid],
+                                                tablet_id_, kTimeout));
+  ASSERT_EQ(1, active_tablet_servers.erase(follower_uuid));
+  ASSERT_OK(WaitForServersToAgree(kTimeout, active_tablet_servers, tablet_id_, 2));
+
+  // Now the follower is evicted and will be tombstoned.
+  // We'll add it back to the config and then crash the follower during the
+  // resulting tablet copy.
+  ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server_by_uuid(follower_uuid),
+            "tablet_copy_fault_crash_on_fetch_all", "1.0"));
+  auto* leader_ts = tablet_servers_[leader_uuid];
+  auto* follower_ts = tablet_servers_[follower_uuid];
+  ASSERT_OK(itest::AddServer(leader_ts, tablet_id_, follower_ts, RaftPeerPB::VOTER,
+                             boost::none, kTimeout));
+  ASSERT_OK(cluster_->tablet_server_by_uuid(follower_uuid)->WaitForInjectedCrash(kTimeout));
+
+  // Shut down the rest of the cluster, then only bring back the node we
+  // crashed, plus one other node. The tombstoned replica should still be able
+  // to vote and the tablet should come online.
+  cluster_->Shutdown();
+
+  int follower_idx = cluster_->tablet_server_index_by_uuid(follower_uuid);
+  ASSERT_OK(inspect_->CheckTabletDataStateOnTS(follower_idx, tablet_id_, { TABLET_DATA_COPYING }));
+
+  ASSERT_OK(cluster_->master()->Restart());
+  ASSERT_OK(cluster_->tablet_server_by_uuid(leader_uuid)->Restart());
+  ASSERT_OK(cluster_->tablet_server_by_uuid(follower_uuid)->Restart());
+
+  TestWorkload workload(cluster_.get());
+  workload.set_table_name(kTableId);
+  workload.set_timeout_allowed(true);
+  workload.Setup();
+  workload.Start();
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_GE(workload.rows_inserted(), 100);
+  });
+  workload.StopAndJoin();
+}
+
+}  // namespace tserver
+}  // namespace kudu
+

http://git-wip-us.apache.org/repos/asf/kudu/blob/ab77ce02/src/kudu/tablet/mt-tablet-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/mt-tablet-test.cc b/src/kudu/tablet/mt-tablet-test.cc
index 10bb8dc..d46155b 100644
--- a/src/kudu/tablet/mt-tablet-test.cc
+++ b/src/kudu/tablet/mt-tablet-test.cc
@@ -59,11 +59,11 @@ DECLARE_int32(tablet_delta_store_minor_compact_max);
 DEFINE_int32(num_insert_threads, 8, "Number of inserting threads to launch");
 DEFINE_int32(num_counter_threads, 8, "Number of counting threads to launch");
 DEFINE_int32(num_summer_threads, 1, "Number of summing threads to launch");
-DEFINE_int32(num_updater_threads, 1, "Number of updating threads to launch");
 DEFINE_int32(num_slowreader_threads, 1, "Number of 'slow' reader threads to launch");
 DEFINE_int32(num_flush_threads, 1, "Number of flusher reader threads to launch");
 DEFINE_int32(num_compact_threads, 1, "Number of compactor threads to launch");
 DEFINE_int32(num_undo_delta_gc_threads, 1, "Number of undo delta gc threads to launch");
+DEFINE_int32(num_updater_threads, 1, "Number of updating threads to launch");
 DEFINE_int32(num_flush_delta_threads, 1, "Number of delta flusher reader threads to launch");
 DEFINE_int32(num_minor_compact_deltas_threads, 1,
              "Number of delta minor compactor threads to launch");

http://git-wip-us.apache.org/repos/asf/kudu/blob/ab77ce02/src/kudu/tserver/tablet_server-test-base.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server-test-base.cc b/src/kudu/tserver/tablet_server-test-base.cc
index 192e3a0..8e701ce 100644
--- a/src/kudu/tserver/tablet_server-test-base.cc
+++ b/src/kudu/tserver/tablet_server-test-base.cc
@@ -63,7 +63,6 @@
 #include "kudu/util/test_util.h"
 
 DEFINE_int32(rpc_timeout, 1000, "Timeout for RPC calls, in seconds");
-DEFINE_int32(num_updater_threads, 1, "Number of updating threads to launch");
 DECLARE_bool(enable_maintenance_manager);
 DECLARE_int32(heartbeat_rpc_timeout_ms);
 


[2/3] kudu git commit: [raft_consensus-itest] separate Raft election tests

Posted by al...@apache.org.
http://git-wip-us.apache.org/repos/asf/kudu/blob/ab77ce02/src/kudu/integration-tests/raft_consensus-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index b3b63f9..4d030c5 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <algorithm>
 #include <cstdint>
 #include <cstdlib>
 #include <memory>
@@ -27,13 +26,11 @@
 #include <vector>
 
 #include <boost/optional/optional.hpp>
-#include <gflags/gflags.h>
 #include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <glog/stl_logging.h>
 #include <gtest/gtest.h>
 
-#include "kudu/client/client-test-util.h"
 #include "kudu/client/client.h"
 #include "kudu/client/shared_ptr.h"
 #include "kudu/client/write_op.h"
@@ -52,10 +49,8 @@
 #include "kudu/gutil/basictypes.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/map-util.h"
-#include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/stl_util.h"
-#include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/strcat.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/strings/util.h"
@@ -63,8 +58,8 @@
 #include "kudu/integration-tests/cluster_verifier.h"
 #include "kudu/integration-tests/external_mini_cluster_fs_inspector.h"
 #include "kudu/integration-tests/log_verifier.h"
+#include "kudu/integration-tests/raft_consensus-itest-base.h"
 #include "kudu/integration-tests/test_workload.h"
-#include "kudu/integration-tests/ts_itest-base.h"
 #include "kudu/master/master.pb.h"
 #include "kudu/mini-cluster/external_mini_cluster.h"
 #include "kudu/rpc/messenger.h"
@@ -81,23 +76,18 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/random.h"
-#include "kudu/util/slice.h"
 #include "kudu/util/status.h"
-#include "kudu/util/stopwatch.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 #include "kudu/util/thread.h"
 
+DECLARE_int64(client_inserts_per_thread);
+DECLARE_int64(client_num_batches_per_thread);
 DECLARE_int32(consensus_rpc_timeout_ms);
+DECLARE_int32(num_client_threads);
 DECLARE_int32(num_replicas);
 DECLARE_int32(num_tablet_servers);
 DECLARE_int32(rpc_timeout);
-DEFINE_int32(num_client_threads, 8,
-             "Number of client threads to launch");
-DEFINE_int64(client_inserts_per_thread, 50,
-             "Number of rows inserted by each client thread");
-DEFINE_int64(client_num_batches_per_thread, 5,
-             "In how many batches to group the rows, for each client");
 
 METRIC_DECLARE_entity(tablet);
 METRIC_DECLARE_counter(transaction_memory_pressure_rejections);
@@ -148,50 +138,9 @@ static const int kTestRowIntVal = 5678;
 
 // Integration test for the raft consensus implementation.
 // Uses the whole tablet server stack with ExternalMiniCluster.
-class RaftConsensusITest : public TabletServerIntegrationTestBase {
+class RaftConsensusITest : public RaftConsensusITestBase {
  public:
-  RaftConsensusITest()
-      : inserters_(FLAGS_num_client_threads) {
-  }
-
-  virtual void SetUp() OVERRIDE {
-    TabletServerIntegrationTestBase::SetUp();
-    FLAGS_consensus_rpc_timeout_ms = kConsensusRpcTimeoutForTests;
-  }
-
-  void ScanReplica(TabletServerServiceProxy* replica_proxy,
-                   vector<string>* results) {
-
-    ScanRequestPB req;
-    ScanResponsePB resp;
-    RpcController rpc;
-    rpc.set_timeout(MonoDelta::FromSeconds(10)); // Squelch warnings.
-
-    NewScanRequestPB* scan = req.mutable_new_scan_request();
-    scan->set_tablet_id(tablet_id_);
-    ASSERT_OK(SchemaToColumnPBs(schema_, scan->mutable_projected_columns()));
-
-    // Send the call
-    {
-      req.set_batch_size_bytes(0);
-      SCOPED_TRACE(SecureDebugString(req));
-      ASSERT_OK(replica_proxy->Scan(req, &resp, &rpc));
-      SCOPED_TRACE(SecureDebugString(resp));
-      if (resp.has_error()) {
-        ASSERT_OK(StatusFromPB(resp.error().status()));
-      }
-    }
-
-    if (!resp.has_more_results())
-      return;
-
-    // Drain all the rows from the scanner.
-    NO_FATALS(DrainScannerToStrings(resp.scanner_id(),
-                                    schema_,
-                                    results,
-                                    replica_proxy));
-
-    std::sort(results->begin(), results->end());
+  RaftConsensusITest() {
   }
 
   // Scan the given replica in a loop until the number of rows
@@ -199,28 +148,7 @@ class RaftConsensusITest : public TabletServerIntegrationTestBase {
   // fails the test.
   void WaitForRowCount(TabletServerServiceProxy* replica_proxy,
                        int expected_count,
-                       vector<string>* results) {
-    LOG(INFO) << "Waiting for row count " << expected_count << "...";
-    MonoTime start = MonoTime::Now();
-    MonoTime deadline = start + MonoDelta::FromSeconds(90);
-    while (true) {
-      results->clear();
-      NO_FATALS(ScanReplica(replica_proxy, results));
-      if (results->size() == expected_count) {
-        return;
-      }
-      SleepFor(MonoDelta::FromMilliseconds(10));
-      if (MonoTime::Now() >= deadline) {
-        break;
-      }
-    }
-    MonoTime end = MonoTime::Now();
-    LOG(WARNING) << "Didn't reach row count " << expected_count;
-    FAIL() << "Did not reach expected row count " << expected_count
-           << " after " << (end - start).ToString()
-           << ": rows: " << *results;
-  }
-
+                       vector<string>* results);
 
   // Add an Insert operation to the given consensus request.
   // The row to be inserted is generated based on the OpId.
@@ -230,100 +158,19 @@ class RaftConsensusITest : public TabletServerIntegrationTestBase {
                            int32_t key,
                            ConsensusRequestPB* req);
 
-
   string DumpToString(TServerDetails* leader,
                       const vector<string>& leader_results,
                       TServerDetails* replica,
-                      const vector<string>& replica_results) {
-    string ret = strings::Substitute("Replica results did not match the leaders."
-                                     "\nLeader: $0\nReplica: $1. Results size "
-                                     "L: $2 R: $3",
-                                     leader->ToString(),
-                                     replica->ToString(),
-                                     leader_results.size(),
-                                     replica_results.size());
-
-    StrAppend(&ret, "Leader Results: \n");
-    for (const string& result : leader_results) {
-      StrAppend(&ret, result, "\n");
-    }
-
-    StrAppend(&ret, "Replica Results: \n");
-    for (const string& result : replica_results) {
-      StrAppend(&ret, result, "\n");
-    }
-
-    return ret;
-  }
+                      const vector<string>& replica_results);
 
   // Insert 'num_rows' rows starting with row key 'start_row'.
   // Each row will have size 'payload_size'. A short (100ms) timeout is
   // used. If the flush generates any errors they will be ignored.
   void InsertPayloadIgnoreErrors(int start_row, int num_rows, int payload_size);
 
-  void InsertTestRowsRemoteThread(uint64_t first_row,
-                                  uint64_t count,
-                                  uint64_t num_batches,
-                                  const vector<CountDownLatch*>& latches) {
-    shared_ptr<KuduTable> table;
-    CHECK_OK(client_->OpenTable(kTableId, &table));
-
-    shared_ptr<KuduSession> session = client_->NewSession();
-    session->SetTimeoutMillis(60000);
-    CHECK_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
-
-    for (int i = 0; i < num_batches; i++) {
-      uint64_t first_row_in_batch = first_row + (i * count / num_batches);
-      uint64_t last_row_in_batch = first_row_in_batch + count / num_batches;
-
-      for (int j = first_row_in_batch; j < last_row_in_batch; j++) {
-        gscoped_ptr<KuduInsert> insert(table->NewInsert());
-        KuduPartialRow* row = insert->mutable_row();
-        CHECK_OK(row->SetInt32(0, j));
-        CHECK_OK(row->SetInt32(1, j * 2));
-        CHECK_OK(row->SetStringCopy(2, Slice(StringPrintf("hello %d", j))));
-        CHECK_OK(session->Apply(insert.release()));
-      }
-
-      FlushSessionOrDie(session);
-
-      int inserted = last_row_in_batch - first_row_in_batch;
-      for (CountDownLatch* latch : latches) {
-        latch->CountDown(inserted);
-      }
-    }
-
-    inserters_.CountDown();
-  }
-
   // Brings Chaos to a MiniTabletServer by introducing random delays. Does this by
   // pausing the daemon a random amount of time.
-  void DelayInjectorThread(ExternalTabletServer* tablet_server, int timeout_msec) {
-    while (inserters_.count() > 0) {
-
-      // Adjust the value obtained from the normalized gauss. dist. so that we steal the lock
-      // longer than the the timeout a small (~5%) percentage of the times.
-      // (95% corresponds to 1.64485, in a normalized (0,1) gaussian distribution).
-      double sleep_time_usec = 1000 *
-          ((random_.Normal(0, 1) * timeout_msec) / 1.64485);
-
-      if (sleep_time_usec < 0) sleep_time_usec = 0;
-
-      // Additionally only cause timeouts at all 50% of the time, otherwise sleep.
-      double val = (rand() * 1.0) / RAND_MAX;
-      if (val < 0.5) {
-        SleepFor(MonoDelta::FromMicroseconds(sleep_time_usec));
-        continue;
-      }
-
-      ASSERT_OK(tablet_server->Pause());
-      LOG_IF(INFO, sleep_time_usec > 0.0)
-          << "Delay injector thread for TS " << tablet_server->instance_id().permanent_uuid()
-          << " SIGSTOPped the ts, sleeping for " << sleep_time_usec << " usec...";
-      SleepFor(MonoDelta::FromMicroseconds(sleep_time_usec));
-      ASSERT_OK(tablet_server->Resume());
-    }
-  }
+  void DelayInjectorThread(ExternalTabletServer* tablet_server, int timeout_msec);
 
   // Thread which loops until '*finish' becomes true, trying to insert a row
   // on the given tablet server identified by 'replica_idx'.
@@ -332,49 +179,7 @@ class RaftConsensusITest : public TabletServerIntegrationTestBase {
   // Stops the current leader of the configuration, runs leader election and then brings it back.
   // Before stopping the leader this pauses all follower nodes in regular intervals so that
   // we get an increased chance of stuff being pending.
-  void StopOrKillLeaderAndElectNewOne() {
-    TServerDetails* leader;
-    vector<TServerDetails*> followers;
-    CHECK_OK(GetTabletLeaderAndFollowers(tablet_id_, &leader, &followers));
-    ExternalTabletServer* leader_ets = cluster_->tablet_server_by_uuid(leader->uuid());
-
-    for (const auto* ts : followers) {
-      ExternalTabletServer* ets = cluster_->tablet_server_by_uuid(ts->uuid());
-      CHECK_OK(ets->Pause());
-      SleepFor(MonoDelta::FromMilliseconds(100));
-    }
-
-    // When all are paused also pause or kill the current leader. Since we've waited a bit
-    // the old leader is likely to have operations that must be aborted.
-    const bool do_kill = rand() % 2 == 0;
-    if (do_kill) {
-      leader_ets->Shutdown();
-    } else {
-      CHECK_OK(leader_ets->Pause());
-    }
-
-    // Resume the replicas.
-    for (const auto* ts : followers) {
-      ExternalTabletServer* ets = cluster_->tablet_server_by_uuid(ts->uuid());
-      CHECK_OK(ets->Resume());
-    }
-
-    // Get the new leader.
-    TServerDetails* new_leader;
-    CHECK_OK(GetLeaderReplicaWithRetries(tablet_id_, &new_leader));
-
-    // Bring the old leader back.
-    if (do_kill) {
-      CHECK_OK(leader_ets->Restart());
-      // Wait until we have the same number of followers.
-      const auto initial_followers = followers.size();
-      do {
-        GetOnlyLiveFollowerReplicas(tablet_id_, &followers);
-      } while (followers.size() < initial_followers);
-    } else {
-      CHECK_OK(leader_ets->Resume());
-    }
-  }
+  void StopOrKillLeaderAndElectNewOne();
 
   // Writes 'num_writes' operations to the current leader. Each of the operations
   // has a payload of around 128KB. Causes a gtest failure on error.
@@ -388,8 +193,6 @@ class RaftConsensusITest : public TabletServerIntegrationTestBase {
   void AssertMajorityRequiredForElectionsAndWrites(const TabletServerMap& tablet_servers,
                                                    const string& leader_uuid);
 
-  void CreateClusterForChurnyElectionsTests(const vector<string>& extra_ts_flags);
-  void DoTestChurnyElections(TestWorkload* workload, int max_rows_to_insert);
   void CreateClusterForCrashyNodesTests();
   void DoTestCrashyNodes(TestWorkload* workload, int max_rows_to_insert);
 
@@ -398,57 +201,414 @@ class RaftConsensusITest : public TabletServerIntegrationTestBase {
   void SetupSingleReplicaTest(TServerDetails** replica_ts);
 
  protected:
-  // Flags needed for CauseFollowerToFallBehindLogGC() to work well.
-  void AddFlagsForLogRolls(vector<string>* extra_tserver_flags);
-
-  // Pause one of the followers and write enough data to the remaining replicas
-  // to cause log GC, then resume the paused follower. On success,
-  // 'leader_uuid' will be set to the UUID of the leader, 'orig_term' will be
-  // set to the term of the leader before un-pausing the follower, and
-  // 'fell_behind_uuid' will be set to the UUID of the follower that was paused
-  // and caused to fall behind. These can be used for verification purposes.
-  //
-  // Certain flags should be set. You can add the required flags with
-  // AddFlagsForLogRolls() before starting the cluster.
-  void CauseFollowerToFallBehindLogGC(string* leader_uuid,
-                                      int64_t* orig_term,
-                                      string* fell_behind_uuid);
-
   // Retrieve the current term of the first tablet on this tablet server.
   Status GetTermMetricValue(ExternalTabletServer* ts, int64_t* term);
 
   shared_ptr<KuduTable> table_;
   vector<scoped_refptr<kudu::Thread> > threads_;
-  CountDownLatch inserters_;
 };
 
+void RaftConsensusITest::WaitForRowCount(TabletServerServiceProxy* replica_proxy,
+                                         int expected_count,
+                                         vector<string>* results) {
+  LOG(INFO) << "Waiting for row count " << expected_count << "...";
+  MonoTime start = MonoTime::Now();
+  MonoTime deadline = start + MonoDelta::FromSeconds(90);
+  while (true) {
+    results->clear();
+    NO_FATALS(ScanReplica(replica_proxy, results));
+    if (results->size() == expected_count) {
+      return;
+    }
+    SleepFor(MonoDelta::FromMilliseconds(10));
+    if (MonoTime::Now() >= deadline) {
+      break;
+    }
+  }
+  FAIL() << "Did not reach expected row count " << expected_count
+         << " after " << (MonoTime::Now() - start).ToString()
+         << ": rows: " << *results;
+}
+
+void RaftConsensusITest::AddOp(const OpId& id, ConsensusRequestPB* req) {
+  AddOpWithTypeAndKey(id, RowOperationsPB::INSERT,
+                      id.index() * 10000 + id.term(), req);
+}
+
+void RaftConsensusITest::AddOpWithTypeAndKey(const OpId& id,
+                                             RowOperationsPB::Type op_type,
+                                             int32_t key,
+                                             ConsensusRequestPB* req) {
+  ReplicateMsg* msg = req->add_ops();
+  msg->mutable_id()->CopyFrom(id);
+  msg->set_timestamp(id.index());
+  msg->set_op_type(consensus::WRITE_OP);
+  WriteRequestPB* write_req = msg->mutable_write_request();
+  CHECK_OK(SchemaToPB(schema_, write_req->mutable_schema()));
+  write_req->set_tablet_id(tablet_id_);
+  AddTestRowToPB(op_type, schema_, key, id.term(),
+                 SecureShortDebugString(id), write_req->mutable_row_operations());
+}
+
+string RaftConsensusITest::DumpToString(TServerDetails* leader,
+                                        const vector<string>& leader_results,
+                                        TServerDetails* replica,
+                                        const vector<string>& replica_results) {
+  string ret = strings::Substitute("Replica results did not match the leaders."
+                                   "\nLeader: $0\nReplica: $1. Results size "
+                                   "L: $2 R: $3",
+                                   leader->ToString(),
+                                   replica->ToString(),
+                                   leader_results.size(),
+                                   replica_results.size());
+
+  StrAppend(&ret, "Leader Results: \n");
+  for (const string& result : leader_results) {
+    StrAppend(&ret, result, "\n");
+  }
+
+  StrAppend(&ret, "Replica Results: \n");
+  for (const string& result : replica_results) {
+    StrAppend(&ret, result, "\n");
+  }
+
+  return ret;
+}
+
+void RaftConsensusITest::InsertPayloadIgnoreErrors(int start_row,
+                                                   int num_rows,
+                                                   int payload_size) {
+  shared_ptr<KuduTable> table;
+  CHECK_OK(client_->OpenTable(kTableId, &table));
+  shared_ptr<KuduSession> session = client_->NewSession();
+  session->SetTimeoutMillis(100);
+  CHECK_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+  string payload(payload_size, 'x');
+  for (int i = 0; i < num_rows; i++) {
+    gscoped_ptr<KuduInsert> insert(table->NewInsert());
+    KuduPartialRow* row = insert->mutable_row();
+    CHECK_OK(row->SetInt32(0, i + start_row));
+    CHECK_OK(row->SetInt32(1, 0));
+    CHECK_OK(row->SetStringCopy(2, payload));
+    CHECK_OK(session->Apply(insert.release()));
+    ignore_result(session->Flush());
+  }
+}
+
+void RaftConsensusITest::DelayInjectorThread(
+    ExternalTabletServer* tablet_server, int timeout_msec) {
+
+  while (inserters_.count() > 0) {
+    // Adjust the value obtained from the normalized gauss. dist. so that we steal the lock
+    // longer than the the timeout a small (~5%) percentage of the times.
+    // (95% corresponds to 1.64485, in a normalized (0,1) gaussian distribution).
+    double sleep_time_usec = 1000 *
+        ((random_.Normal(0, 1) * timeout_msec) / 1.64485);
+
+    if (sleep_time_usec < 0) sleep_time_usec = 0;
+
+    // Additionally only cause timeouts at all 50% of the time, otherwise sleep.
+    double val = (rand() * 1.0) / RAND_MAX;
+    if (val < 0.5) {
+      SleepFor(MonoDelta::FromMicroseconds(sleep_time_usec));
+      continue;
+    }
+
+    ASSERT_OK(tablet_server->Pause());
+    LOG_IF(INFO, sleep_time_usec > 0.0)
+        << "Delay injector thread for TS " << tablet_server->instance_id().permanent_uuid()
+        << " SIGSTOPped the ts, sleeping for " << sleep_time_usec << " usec...";
+    SleepFor(MonoDelta::FromMicroseconds(sleep_time_usec));
+    ASSERT_OK(tablet_server->Resume());
+  }
+}
+
+void RaftConsensusITest::StubbornlyWriteSameRowThread(int replica_idx, const AtomicBool* finish) {
+  vector<TServerDetails*> servers;
+  AppendValuesFromMap(tablet_servers_, &servers);
+  CHECK_LT(replica_idx, servers.size());
+  TServerDetails* ts = servers[replica_idx];
+
+  // Manually construct an RPC to our target replica. We expect most of the calls
+  // to fail either with an "already present" or an error because we are writing
+  // to a follower. That's OK, though - what we care about for this test is
+  // just that the operations Apply() in the same order everywhere (even though
+  // in this case the result will just be an error).
+  WriteRequestPB req;
+  WriteResponsePB resp;
+  RpcController rpc;
+  req.set_tablet_id(tablet_id_);
+  ASSERT_OK(SchemaToPB(schema_, req.mutable_schema()));
+  AddTestRowToPB(RowOperationsPB::INSERT, schema_, kTestRowKey, kTestRowIntVal,
+                 "hello world", req.mutable_row_operations());
+
+  while (!finish->Load()) {
+    resp.Clear();
+    rpc.Reset();
+    rpc.set_timeout(MonoDelta::FromSeconds(10));
+    ignore_result(ts->tserver_proxy->Write(req, &resp, &rpc));
+    VLOG(1) << "Response from server " << replica_idx << ": "
+            << SecureShortDebugString(resp);
+  }
+}
+
+void RaftConsensusITest::StopOrKillLeaderAndElectNewOne() {
+  TServerDetails* leader;
+  vector<TServerDetails*> followers;
+  CHECK_OK(GetTabletLeaderAndFollowers(tablet_id_, &leader, &followers));
+  ExternalTabletServer* leader_ets = cluster_->tablet_server_by_uuid(leader->uuid());
+
+  for (const auto* ts : followers) {
+    ExternalTabletServer* ets = cluster_->tablet_server_by_uuid(ts->uuid());
+    CHECK_OK(ets->Pause());
+    SleepFor(MonoDelta::FromMilliseconds(100));
+  }
+
+  // When all are paused also pause or kill the current leader. Since we've waited a bit
+  // the old leader is likely to have operations that must be aborted.
+  const bool do_kill = rand() % 2 == 0;
+  if (do_kill) {
+    leader_ets->Shutdown();
+  } else {
+    CHECK_OK(leader_ets->Pause());
+  }
+
+  // Resume the replicas.
+  for (const auto* ts : followers) {
+    ExternalTabletServer* ets = cluster_->tablet_server_by_uuid(ts->uuid());
+    CHECK_OK(ets->Resume());
+  }
+
+  // Get the new leader.
+  TServerDetails* new_leader;
+  CHECK_OK(GetLeaderReplicaWithRetries(tablet_id_, &new_leader));
+
+  // Bring the old leader back.
+  if (do_kill) {
+    CHECK_OK(leader_ets->Restart());
+    // Wait until we have the same number of followers.
+    const auto initial_followers = followers.size();
+    do {
+      GetOnlyLiveFollowerReplicas(tablet_id_, &followers);
+    } while (followers.size() < initial_followers);
+  } else {
+    CHECK_OK(leader_ets->Resume());
+  }
+}
+
+void RaftConsensusITest::Write128KOpsToLeader(int num_writes) {
+  TServerDetails* leader = nullptr;
+  ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader));
+
+  WriteRequestPB req;
+  req.set_tablet_id(tablet_id_);
+  ASSERT_OK(SchemaToPB(schema_, req.mutable_schema()));
+  RowOperationsPB* data = req.mutable_row_operations();
+  WriteResponsePB resp;
+  RpcController rpc;
+  rpc.set_timeout(MonoDelta::FromMilliseconds(10000));
+  int key = 0;
+
+  // generate a 128Kb dummy payload
+  string test_payload(128 * 1024, '0');
+  for (int i = 0; i < num_writes; i++) {
+    rpc.Reset();
+    data->Clear();
+    AddTestRowToPB(RowOperationsPB::INSERT, schema_, key, key,
+                   test_payload, data);
+    key++;
+    ASSERT_OK(leader->tserver_proxy->Write(req, &resp, &rpc));
+
+    ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp);
+  }
+}
+
+void RaftConsensusITest::AssertMajorityRequiredForElectionsAndWrites(
+    const TabletServerMap& tablet_servers, const string& leader_uuid) {
+
+  TServerDetails* initial_leader = FindOrDie(tablet_servers, leader_uuid);
+
+  // Calculate number of servers to leave unpaused (minority).
+  // This math is a little unintuitive but works for cluster sizes including 2 and 1.
+  // Note: We assume all of these TSes are voters.
+  int config_size = tablet_servers.size();
+  int minority_to_retain = MajoritySize(config_size) - 1;
+
+  // Only perform this part of the test if we have some servers to pause, else
+  // the failure assertions will throw.
+  if (config_size > 1) {
+    // Pause enough replicas to prevent a majority.
+    int num_to_pause = config_size - minority_to_retain;
+    LOG(INFO) << "Pausing " << num_to_pause << " tablet servers in config of size " << config_size;
+    vector<string> paused_uuids;
+    for (const TabletServerMap::value_type& entry : tablet_servers) {
+      if (paused_uuids.size() == num_to_pause) {
+        continue;
+      }
+      const string& replica_uuid = entry.first;
+      if (replica_uuid == leader_uuid) {
+        // Always leave this one alone.
+        continue;
+      }
+      ExternalTabletServer* replica_ts = cluster_->tablet_server_by_uuid(replica_uuid);
+      ASSERT_OK(replica_ts->Pause());
+      paused_uuids.push_back(replica_uuid);
+    }
+
+    // Ensure writes timeout while only a minority is alive.
+    Status s = WriteSimpleTestRow(initial_leader, tablet_id_, RowOperationsPB::UPDATE,
+                                  kTestRowKey, kTestRowIntVal, "foo",
+                                  MonoDelta::FromMilliseconds(100));
+    ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+
+    // Step down.
+    ASSERT_OK(LeaderStepDown(initial_leader, tablet_id_, MonoDelta::FromSeconds(10)));
+
+    // Assert that elections time out without a live majority.
+    // We specify a very short timeout here to keep the tests fast.
+    ASSERT_OK(StartElection(initial_leader, tablet_id_, MonoDelta::FromSeconds(10)));
+    s = WaitUntilLeader(initial_leader, tablet_id_, MonoDelta::FromMilliseconds(100));
+    ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+    LOG(INFO) << "Expected timeout encountered on election with weakened config: " << s.ToString();
+
+    // Resume the paused servers.
+    LOG(INFO) << "Resuming " << num_to_pause << " tablet servers in config of size " << config_size;
+    for (const string& replica_uuid : paused_uuids) {
+      ExternalTabletServer* replica_ts = cluster_->tablet_server_by_uuid(replica_uuid);
+      ASSERT_OK(replica_ts->Resume());
+    }
+  }
+
+  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(20), tablet_servers, tablet_id_, 1));
+
+  // Now an election should succeed.
+  ASSERT_OK(StartElection(initial_leader, tablet_id_, MonoDelta::FromSeconds(10)));
+  ASSERT_OK(WaitUntilLeader(initial_leader, tablet_id_, MonoDelta::FromSeconds(10)));
+  LOG(INFO) << "Successful election with full config of size " << config_size;
+
+  // And a write should also succeed.
+  ASSERT_OK(WriteSimpleTestRow(initial_leader, tablet_id_, RowOperationsPB::UPDATE,
+                               kTestRowKey, kTestRowIntVal, Substitute("qsz=$0", config_size),
+                               MonoDelta::FromSeconds(10)));
+}
+
+void RaftConsensusITest::CreateClusterForCrashyNodesTests() {
+  if (AllowSlowTests()) {
+    FLAGS_num_tablet_servers = 7;
+    FLAGS_num_replicas = 7;
+  }
+
+  vector<string> ts_flags;
+
+  // Crash 5% of the time just before sending an RPC. With 7 servers,
+  // this means we crash about 30% of the time before we've fully
+  // replicated the NO_OP at the start of the term.
+  ts_flags.emplace_back("--fault_crash_on_leader_request_fraction=0.05");
+
+  // Inject latency to encourage the replicas to fall out of sync
+  // with each other.
+  ts_flags.emplace_back("--log_inject_latency");
+  ts_flags.emplace_back("--log_inject_latency_ms_mean=30");
+  ts_flags.emplace_back("--log_inject_latency_ms_stddev=60");
+
+  // Make leader elections faster so we get through more cycles of leaders.
+  ts_flags.emplace_back("--raft_heartbeat_interval_ms=100");
+
+  // Avoid preallocating segments since bootstrap is a little bit
+  // faster if it doesn't have to scan forward through the preallocated
+  // log area.
+  ts_flags.emplace_back("--log_preallocate_segments=false");
+
+  CreateCluster("raft_consensus-itest-crashy-nodes-cluster", ts_flags, {});
+}
+
+void RaftConsensusITest::DoTestCrashyNodes(TestWorkload* workload, int max_rows_to_insert) {
+  int crashes_to_cause = 3;
+  if (AllowSlowTests()) {
+    crashes_to_cause = 15;
+  }
+
+  workload->set_num_replicas(FLAGS_num_replicas);
+  // Set a really high write timeout so that even in the presence of many failures we
+  // can verify an exact number of rows in the end, thanks to exactly once semantics.
+  workload->set_write_timeout_millis(60 * 1000 /* 60 seconds */);
+  workload->set_num_write_threads(10);
+  workload->set_num_read_threads(2);
+  workload->Setup();
+  workload->Start();
+
+  int num_crashes = 0;
+  while (num_crashes < crashes_to_cause &&
+      workload->rows_inserted() < max_rows_to_insert) {
+    num_crashes += RestartAnyCrashedTabletServers();
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+
+  // Writers are likely ongoing. To have some chance of completing all writes,
+  // restart the tablets servers, otherwise they'll keep crashing and the writes
+  // can never complete.
+  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+    ExternalTabletServer* ts = cluster_->tablet_server(i);
+    vector<string>* flags = ts->mutable_flags();
+    bool removed_flag = false;
+    for (auto it = flags->begin(); it != flags->end(); ++it) {
+      if (HasPrefixString(*it, "--fault_crash")) {
+        flags->erase(it);
+        removed_flag = true;
+        break;
+      }
+    }
+    ASSERT_TRUE(removed_flag) << "could not remove flag from TS " << i
+                              << "\nFlags:\n" << *flags;
+    ts->Shutdown();
+    ASSERT_OK(ts->Restart());
+  }
+
+  workload->StopAndJoin();
+
+  // Ensure that the replicas converge.
+  ClusterVerifier v(cluster_.get());
+  NO_FATALS(v.CheckCluster());
+  NO_FATALS(v.CheckRowCount(workload->table_name(),
+                            ClusterVerifier::EXACTLY,
+                            workload->rows_inserted()));
+}
+
+void RaftConsensusITest::SetupSingleReplicaTest(TServerDetails** replica_ts) {
+  const vector<string> kTsFlags = {
+    // Don't use the hybrid clock as we set logical timestamps on ops.
+    "--use_hybrid_clock=false",
+    "--enable_leader_failure_detection=false",
+  };
+  const vector<string> kMasterFlags = {
+    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"
+  };
+
+  FLAGS_num_replicas = 3;
+  FLAGS_num_tablet_servers = 3;
+  NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags));
+
+  // Kill all the servers but one.
+  vector<TServerDetails*> tservers;
+  AppendValuesFromMap(tablet_servers_, &tservers);
+  ASSERT_EQ(3, tservers.size());
+
+  // Elect server 2 as leader and wait for log index 1 to propagate to all servers.
+  ASSERT_OK(StartElection(tservers[2], tablet_id_, MonoDelta::FromSeconds(10)));
+  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, tablet_id_, 1));
+
+  cluster_->tablet_server_by_uuid(tservers[1]->uuid())->Shutdown();
+  cluster_->tablet_server_by_uuid(tservers[2]->uuid())->Shutdown();
+
+  *replica_ts = tservers[0];
+  LOG(INFO) << "================================== Cluster setup complete.";
+}
+
 Status RaftConsensusITest::GetTermMetricValue(ExternalTabletServer* ts,
                                               int64_t *term) {
   return ts->GetInt64Metric(&METRIC_ENTITY_tablet, nullptr, &METRIC_raft_term, "value", term);
 }
 
-void RaftConsensusITest::AddFlagsForLogRolls(vector<string>* extra_tserver_flags) {
-  // We configure a small log segment size so that we roll frequently,
-  // configure a small cache size so that we evict data from the cache, and
-  // retain as few segments as possible. We also turn off async segment
-  // allocation -- this ensures that we roll many segments of logs (with async
-  // allocation, it's possible that the preallocation is slow and we wouldn't
-  // roll deterministically).
-  //
-  // Additionally, we disable log compression, since these tests write a lot of
-  // repetitive data to cause the rolls, and compression would make it all tiny.
-  extra_tserver_flags->push_back("--log_compression_codec=none");
-  extra_tserver_flags->push_back("--log_cache_size_limit_mb=1");
-  extra_tserver_flags->push_back("--log_segment_size_mb=1");
-  extra_tserver_flags->push_back("--log_async_preallocate_segments=false");
-  extra_tserver_flags->push_back("--log_min_segments_to_retain=1");
-  extra_tserver_flags->push_back("--log_max_segments_to_retain=3");
-  extra_tserver_flags->push_back("--maintenance_manager_polling_interval_ms=100");
-  extra_tserver_flags->push_back("--log_target_replay_size_mb=1");
-  // We write 128KB cells in CauseFollowerToFallBehindLogGC(): bump the limit.
-  extra_tserver_flags->push_back("--max_cell_size_bytes=1000000");
-}
-
 // Test that we can retrieve the permanent uuid of a server running
 // consensus service via RPC.
 TEST_F(RaftConsensusITest, TestGetPermanentUuid) {
@@ -571,102 +731,34 @@ TEST_F(RaftConsensusITest, MultiThreadedMutateAndInsertThroughConsensus) {
 TEST_F(RaftConsensusITest, TestInsertOnNonLeader) {
   NO_FATALS(BuildAndStart());
 
-  // Wait for the initial leader election to complete.
-  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_,
-                                  tablet_id_, 1));
-
-  // Manually construct a write RPC to a replica and make sure it responds
-  // with the correct error code.
-  WriteRequestPB req;
-  WriteResponsePB resp;
-  RpcController rpc;
-  req.set_tablet_id(tablet_id_);
-  ASSERT_OK(SchemaToPB(schema_, req.mutable_schema()));
-  AddTestRowToPB(RowOperationsPB::INSERT, schema_, kTestRowKey, kTestRowIntVal,
-                 "hello world via RPC", req.mutable_row_operations());
-
-  // Get the leader.
-  vector<TServerDetails*> followers;
-  GetOnlyLiveFollowerReplicas(tablet_id_, &followers);
-
-  ASSERT_OK(followers[0]->tserver_proxy->Write(req, &resp, &rpc));
-  SCOPED_TRACE(SecureDebugString(resp));
-  ASSERT_TRUE(resp.has_error());
-  Status s = StatusFromPB(resp.error().status());
-  EXPECT_TRUE(s.IsIllegalState());
-  ASSERT_STR_CONTAINS(s.ToString(), "is not leader of this config. Role: FOLLOWER");
-  // TODO: need to change the error code to be something like REPLICA_NOT_LEADER
-  // so that the client can properly handle this case! plumbing this is a little difficult
-  // so not addressing at the moment.
-  NO_FATALS(AssertAllReplicasAgree(0));
-}
-
-TEST_F(RaftConsensusITest, TestRunLeaderElection) {
-  // Reset consensus rpc timeout to the default value or the election might fail often.
-  FLAGS_consensus_rpc_timeout_ms = 1000;
-  NO_FATALS(BuildAndStart());
-
-  int num_iters = AllowSlowTests() ? 10 : 1;
-
-  InsertTestRowsRemoteThread(0,
-                             FLAGS_client_inserts_per_thread * num_iters,
-                             FLAGS_client_num_batches_per_thread,
-                             vector<CountDownLatch*>());
-
-  NO_FATALS(AssertAllReplicasAgree(FLAGS_client_inserts_per_thread * num_iters));
-
-  // Select the last follower to be new leader.
-  vector<TServerDetails*> followers;
-  GetOnlyLiveFollowerReplicas(tablet_id_, &followers);
-
-  // Now shutdown the current leader.
-  TServerDetails* leader = DCHECK_NOTNULL(GetLeaderReplicaOrNull(tablet_id_));
-  ExternalTabletServer* leader_ets = cluster_->tablet_server_by_uuid(leader->uuid());
-  leader_ets->Shutdown();
-
-  TServerDetails* replica = followers.back();
-  CHECK_NE(leader->instance_id.permanent_uuid(), replica->instance_id.permanent_uuid());
-
-  // Make the new replica leader.
-  ASSERT_OK(StartElection(replica, tablet_id_, MonoDelta::FromSeconds(10)));
-
-  // Insert a bunch more rows.
-  InsertTestRowsRemoteThread(FLAGS_client_inserts_per_thread * num_iters,
-                             FLAGS_client_inserts_per_thread * num_iters,
-                             FLAGS_client_num_batches_per_thread,
-                             vector<CountDownLatch*>());
-
-  // Restart the original replica and make sure they all agree.
-  ASSERT_OK(leader_ets->Restart());
-
-  NO_FATALS(AssertAllReplicasAgree(FLAGS_client_inserts_per_thread * num_iters * 2));
-}
-
-void RaftConsensusITest::Write128KOpsToLeader(int num_writes) {
-  TServerDetails* leader = nullptr;
-  ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader));
+  // Wait for the initial leader election to complete.
+  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_,
+                                  tablet_id_, 1));
 
+  // Manually construct a write RPC to a replica and make sure it responds
+  // with the correct error code.
   WriteRequestPB req;
-  req.set_tablet_id(tablet_id_);
-  ASSERT_OK(SchemaToPB(schema_, req.mutable_schema()));
-  RowOperationsPB* data = req.mutable_row_operations();
   WriteResponsePB resp;
   RpcController rpc;
-  rpc.set_timeout(MonoDelta::FromMilliseconds(10000));
-  int key = 0;
+  req.set_tablet_id(tablet_id_);
+  ASSERT_OK(SchemaToPB(schema_, req.mutable_schema()));
+  AddTestRowToPB(RowOperationsPB::INSERT, schema_, kTestRowKey, kTestRowIntVal,
+                 "hello world via RPC", req.mutable_row_operations());
 
-  // generate a 128Kb dummy payload
-  string test_payload(128 * 1024, '0');
-  for (int i = 0; i < num_writes; i++) {
-    rpc.Reset();
-    data->Clear();
-    AddTestRowToPB(RowOperationsPB::INSERT, schema_, key, key,
-                   test_payload, data);
-    key++;
-    ASSERT_OK(leader->tserver_proxy->Write(req, &resp, &rpc));
+  // Get the leader.
+  vector<TServerDetails*> followers;
+  GetOnlyLiveFollowerReplicas(tablet_id_, &followers);
 
-    ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp);
-  }
+  ASSERT_OK(followers[0]->tserver_proxy->Write(req, &resp, &rpc));
+  SCOPED_TRACE(SecureDebugString(resp));
+  ASSERT_TRUE(resp.has_error());
+  Status s = StatusFromPB(resp.error().status());
+  EXPECT_TRUE(s.IsIllegalState());
+  ASSERT_STR_CONTAINS(s.ToString(), "is not leader of this config. Role: FOLLOWER");
+  // TODO(unknown): need to change the error code to be something like REPLICA_NOT_LEADER
+  // so that the client can properly handle this case! plumbing this is a little difficult
+  // so not addressing at the moment.
+  NO_FATALS(AssertAllReplicasAgree(0));
 }
 
 // Test that when a follower is stopped for a long time, the log cache
@@ -736,85 +828,6 @@ TEST_F(RaftConsensusITest, TestCatchupAfterOpsEvicted) {
     });
 }
 
-void RaftConsensusITest::CauseFollowerToFallBehindLogGC(string* leader_uuid,
-                                                        int64_t* orig_term,
-                                                        string* fell_behind_uuid) {
-  MonoDelta kTimeout = MonoDelta::FromSeconds(10);
-  // Wait for all of the replicas to have acknowledged the elected
-  // leader and logged the first NO_OP.
-  ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_, tablet_id_, 1));
-
-  // Pause one server. This might be the leader, but pausing it will cause
-  // a leader election to happen.
-  TServerDetails* replica = (*tablet_replicas_.begin()).second;
-  ExternalTabletServer* replica_ets = cluster_->tablet_server_by_uuid(replica->uuid());
-  ASSERT_OK(replica_ets->Pause());
-
-  // Find a leader. In case we paused the leader above, this will wait until
-  // we have elected a new one.
-  TServerDetails* leader = nullptr;
-  while (true) {
-    Status s = GetLeaderReplicaWithRetries(tablet_id_, &leader);
-    if (s.ok() && leader != nullptr && leader != replica) {
-      break;
-    }
-    SleepFor(MonoDelta::FromMilliseconds(10));
-  }
-  *leader_uuid = leader->uuid();
-  int leader_index = cluster_->tablet_server_index_by_uuid(*leader_uuid);
-
-  TestWorkload workload(cluster_.get());
-  workload.set_table_name(kTableId);
-  workload.set_timeout_allowed(true);
-  workload.set_payload_bytes(128 * 1024); // Write ops of size 128KB.
-  workload.set_write_batch_size(1);
-  workload.set_num_write_threads(4);
-  workload.Setup();
-  workload.Start();
-
-  LOG(INFO) << "Waiting until we've written at least 4MB...";
-  while (workload.rows_inserted() < 8 * 4) {
-    SleepFor(MonoDelta::FromMilliseconds(10));
-  }
-  workload.StopAndJoin();
-
-  LOG(INFO) << "Waiting for log GC on " << leader->uuid();
-  // Some WAL segments must exist, but wal segment 1 must not exist.
-  ASSERT_OK(inspect_->WaitForFilePatternInTabletWalDirOnTs(
-      leader_index, tablet_id_, { "wal-" }, { "wal-000000001" }));
-
-  LOG(INFO) << "Log GC complete on " << leader->uuid();
-
-  // Then wait another couple of seconds to be sure that it has bothered to try
-  // to write to the paused peer.
-  // TODO: would be nice to be able to poll the leader with an RPC like
-  // GetLeaderStatus() which could tell us whether it has made any requests
-  // since the log GC.
-  SleepFor(MonoDelta::FromSeconds(2));
-
-  // Make a note of whatever the current term of the cluster is,
-  // before we resume the follower.
-  {
-    OpId op_id;
-    ASSERT_OK(GetLastOpIdForReplica(tablet_id_, leader, consensus::RECEIVED_OPID, kTimeout,
-                                    &op_id));
-    *orig_term = op_id.term();
-    LOG(INFO) << "Servers converged with original term " << *orig_term;
-  }
-
-  // Resume the follower.
-  LOG(INFO) << "Resuming  " << replica->uuid();
-  ASSERT_OK(replica_ets->Resume());
-
-  // Ensure that none of the tablet servers crashed.
-  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
-    // Make sure it didn't crash.
-    ASSERT_TRUE(cluster_->tablet_server(i)->IsProcessAlive())
-      << "Tablet server " << i << " crashed";
-  }
-  *fell_behind_uuid = replica->uuid();
-}
-
 // Test that the leader doesn't crash if one of its followers has
 // fallen behind so far that the logs necessary to catch it up
 // have been GCed.
@@ -874,88 +887,6 @@ TEST_F(RaftConsensusITest, TestFollowerFallsBehindLeaderGC) {
   }
 }
 
-void RaftConsensusITest::CreateClusterForCrashyNodesTests() {
-  if (AllowSlowTests()) {
-    FLAGS_num_tablet_servers = 7;
-    FLAGS_num_replicas = 7;
-  }
-
-  vector<string> ts_flags;
-
-  // Crash 5% of the time just before sending an RPC. With 7 servers,
-  // this means we crash about 30% of the time before we've fully
-  // replicated the NO_OP at the start of the term.
-  ts_flags.emplace_back("--fault_crash_on_leader_request_fraction=0.05");
-
-  // Inject latency to encourage the replicas to fall out of sync
-  // with each other.
-  ts_flags.emplace_back("--log_inject_latency");
-  ts_flags.emplace_back("--log_inject_latency_ms_mean=30");
-  ts_flags.emplace_back("--log_inject_latency_ms_stddev=60");
-
-  // Make leader elections faster so we get through more cycles of leaders.
-  ts_flags.emplace_back("--raft_heartbeat_interval_ms=100");
-
-  // Avoid preallocating segments since bootstrap is a little bit
-  // faster if it doesn't have to scan forward through the preallocated
-  // log area.
-  ts_flags.emplace_back("--log_preallocate_segments=false");
-
-  CreateCluster("raft_consensus-itest-crashy-nodes-cluster", ts_flags, {});
-}
-
-void RaftConsensusITest::DoTestCrashyNodes(TestWorkload* workload, int max_rows_to_insert) {
-  int crashes_to_cause = 3;
-  if (AllowSlowTests()) {
-    crashes_to_cause = 15;
-  }
-
-  workload->set_num_replicas(FLAGS_num_replicas);
-  // Set a really high write timeout so that even in the presence of many failures we
-  // can verify an exact number of rows in the end, thanks to exactly once semantics.
-  workload->set_write_timeout_millis(60 * 1000 /* 60 seconds */);
-  workload->set_num_write_threads(10);
-  workload->set_num_read_threads(2);
-  workload->Setup();
-  workload->Start();
-
-  int num_crashes = 0;
-  while (num_crashes < crashes_to_cause &&
-      workload->rows_inserted() < max_rows_to_insert) {
-    num_crashes += RestartAnyCrashedTabletServers();
-    SleepFor(MonoDelta::FromMilliseconds(10));
-  }
-
-  // Writers are likely ongoing. To have some chance of completing all writes,
-  // restart the tablets servers, otherwise they'll keep crashing and the writes
-  // can never complete.
-  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
-    ExternalTabletServer* ts = cluster_->tablet_server(i);
-    vector<string>* flags = ts->mutable_flags();
-    bool removed_flag = false;
-    for (auto it = flags->begin(); it != flags->end(); ++it) {
-      if (HasPrefixString(*it, "--fault_crash")) {
-        flags->erase(it);
-        removed_flag = true;
-        break;
-      }
-    }
-    ASSERT_TRUE(removed_flag) << "could not remove flag from TS " << i
-                              << "\nFlags:\n" << *flags;
-    ts->Shutdown();
-    ASSERT_OK(ts->Restart());
-  }
-
-  workload->StopAndJoin();
-
-  // Ensure that the replicas converge.
-  ClusterVerifier v(cluster_.get());
-  NO_FATALS(v.CheckCluster());
-  NO_FATALS(v.CheckRowCount(workload->table_name(),
-                            ClusterVerifier::EXACTLY,
-                            workload->rows_inserted()));
-}
-
 // This test starts several tablet servers, and configures them with
 // fault injection so that the leaders frequently crash just before
 // sending RPCs to followers.
@@ -988,93 +919,6 @@ TEST_F(RaftConsensusITest, InsertDuplicateKeysWithCrashyNodes) {
   NO_FATALS(DoTestCrashyNodes(&workload, 300));
 }
 
-void RaftConsensusITest::CreateClusterForChurnyElectionsTests(
-    const vector<string>& extra_ts_flags) {
-  vector<string> ts_flags;
-
-#ifdef THREAD_SANITIZER
-  // On TSAN builds, we need to be a little bit less churny in order to make
-  // any progress at all.
-  ts_flags.push_back("--raft_heartbeat_interval_ms=5");
-#else
-  ts_flags.emplace_back("--raft_heartbeat_interval_ms=1");
-#endif
-
-  ts_flags.insert(ts_flags.end(), extra_ts_flags.cbegin(), extra_ts_flags.cend());
-
-  CreateCluster("raft_consensus-itest-cluster", ts_flags, {});
-}
-
-void RaftConsensusITest::DoTestChurnyElections(TestWorkload* workload, int max_rows_to_insert) {
-  workload->set_num_replicas(FLAGS_num_replicas);
-  // Set a really high write timeout so that even in the presence of many failures we
-  // can verify an exact number of rows in the end, thanks to exactly once semantics.
-  workload->set_write_timeout_millis(60 * 1000 /* 60 seconds */);
-  workload->set_num_write_threads(2);
-  workload->set_write_batch_size(1);
-  workload->Setup();
-  workload->Start();
-
-  // Run for either a prescribed number of writes, or 30 seconds,
-  // whichever comes first. This prevents test timeouts on slower
-  // build machines, TSAN builds, etc.
-  Stopwatch sw;
-  sw.start();
-  while (workload->rows_inserted() < max_rows_to_insert &&
-      sw.elapsed().wall_seconds() < 30) {
-    SleepFor(MonoDelta::FromMilliseconds(10));
-        NO_FATALS(AssertNoTabletServersCrashed());
-  }
-  workload->StopAndJoin();
-  ASSERT_GT(workload->rows_inserted(), 0) << "No rows inserted";
-
-  // Ensure that the replicas converge.
-  // We expect an exact result due to exactly once semantics and snapshot scans.
-  ClusterVerifier v(cluster_.get());
-  NO_FATALS(v.CheckCluster());
-  NO_FATALS(v.CheckRowCount(workload->table_name(),
-                            ClusterVerifier::EXACTLY,
-                            workload->rows_inserted()));
-  NO_FATALS(AssertNoTabletServersCrashed());
-}
-
-// This test sets all of the election timers to be very short, resulting
-// in a lot of churn. We expect to make some progress and not diverge or
-// crash, despite the frequent re-elections and races.
-TEST_F(RaftConsensusITest, TestChurnyElections) {
-  const int kNumWrites = AllowSlowTests() ? 10000 : 1000;
-  CreateClusterForChurnyElectionsTests({});
-  TestWorkload workload(cluster_.get());
-  workload.set_write_batch_size(1);
-  workload.set_num_read_threads(2);
-  DoTestChurnyElections(&workload, kNumWrites);
-}
-
-// The same test, except inject artificial latency when propagating notifications
-// from the queue back to consensus. This previously reproduced bugs like KUDU-1078 which
-// normally only appear under high load.
-TEST_F(RaftConsensusITest, TestChurnyElections_WithNotificationLatency) {
-  CreateClusterForChurnyElectionsTests({"--consensus_inject_latency_ms_in_notifications=50"});
-  const int kNumWrites = AllowSlowTests() ? 10000 : 1000;
-  TestWorkload workload(cluster_.get());
-  workload.set_write_batch_size(1);
-  workload.set_num_read_threads(2);
-  DoTestChurnyElections(&workload, kNumWrites);
-}
-
-// The same as TestChurnyElections except insert many duplicated rows.
-// This emulates cases where there are many duplicate keys which, due to two phase
-// locking, may cause deadlocks and other anomalies that cannot be observed when
-// keys are unique.
-TEST_F(RaftConsensusITest, TestChurnyElections_WithDuplicateKeys) {
-  CreateClusterForChurnyElectionsTests({});
-  const int kNumWrites = AllowSlowTests() ? 10000 : 1000;
-  TestWorkload workload(cluster_.get());
-  workload.set_write_pattern(TestWorkload::INSERT_WITH_MANY_DUP_KEYS);
-  // Increase the number of rows per batch to get a higher chance of key collision.
-  workload.set_write_batch_size(3);
-  DoTestChurnyElections(&workload, kNumWrites);
-}
 
 TEST_F(RaftConsensusITest, MultiThreadedInsertWithFailovers) {
   int kNumElections = FLAGS_num_replicas;
@@ -1126,99 +970,14 @@ TEST_F(RaftConsensusITest, MultiThreadedInsertWithFailovers) {
     NO_FATALS(cluster_->AssertNoCrashes());
     latch->Wait();
     StopOrKillLeaderAndElectNewOne();
-  }
-
-  for (const auto& thr : threads_) {
-    CHECK_OK(ThreadJoiner(thr.get()).Join());
-  }
-
-  NO_FATALS(AssertAllReplicasAgree(FLAGS_client_inserts_per_thread * FLAGS_num_client_threads));
-  STLDeleteElements(&latches);
-}
-
-// Test automatic leader election by killing leaders.
-TEST_F(RaftConsensusITest, TestAutomaticLeaderElection) {
-  if (AllowSlowTests()) {
-    FLAGS_num_tablet_servers = 5;
-    FLAGS_num_replicas = 5;
-  }
-  NO_FATALS(BuildAndStart());
-
-  TServerDetails* leader;
-  ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader));
-
-  unordered_set<TServerDetails*> killed_leaders;
-
-  const int kNumLeadersToKill = FLAGS_num_replicas / 2;
-  const int kFinalNumReplicas = FLAGS_num_replicas / 2 + 1;
-
-  for (int leaders_killed = 0; leaders_killed < kFinalNumReplicas; leaders_killed++) {
-    LOG(INFO) << Substitute("Writing data to leader of $0-node config ($1 alive)...",
-                            FLAGS_num_replicas, FLAGS_num_replicas - leaders_killed);
-
-    InsertTestRowsRemoteThread(leaders_killed * FLAGS_client_inserts_per_thread,
-                               FLAGS_client_inserts_per_thread,
-                               FLAGS_client_num_batches_per_thread,
-                               vector<CountDownLatch*>());
-
-    // At this point, the writes are flushed but the commit index may not be
-    // propagated to all replicas. We kill the leader anyway.
-    if (leaders_killed < kNumLeadersToKill) {
-      LOG(INFO) << "Killing current leader " << leader->instance_id.permanent_uuid() << "...";
-      cluster_->tablet_server_by_uuid(leader->uuid())->Shutdown();
-      InsertOrDie(&killed_leaders, leader);
-
-      LOG(INFO) << "Waiting for new guy to be elected leader.";
-      ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader));
-    }
-  }
-
-  // Restart every node that was killed, and wait for the nodes to converge
-  for (TServerDetails* killed_node : killed_leaders) {
-    CHECK_OK(cluster_->tablet_server_by_uuid(killed_node->uuid())->Restart());
-  }
-  // Verify the data on the remaining replicas.
-  NO_FATALS(AssertAllReplicasAgree(FLAGS_client_inserts_per_thread * kFinalNumReplicas));
-}
-
-// Single-replica leader election test.
-TEST_F(RaftConsensusITest, TestAutomaticLeaderElectionOneReplica) {
-  FLAGS_num_tablet_servers = 1;
-  FLAGS_num_replicas = 1;
-  NO_FATALS(BuildAndStart());
-  // Ensure that single-node Raft configs elect themselves as leader
-  // immediately upon Consensus startup.
-  ASSERT_OK(GetReplicaStatusAndCheckIfLeader(tablet_servers_[cluster_->tablet_server(0)->uuid()],
-                                             tablet_id_, MonoDelta::FromMilliseconds(500)));
-}
-
-void RaftConsensusITest::StubbornlyWriteSameRowThread(int replica_idx, const AtomicBool* finish) {
-  vector<TServerDetails*> servers;
-  AppendValuesFromMap(tablet_servers_, &servers);
-  CHECK_LT(replica_idx, servers.size());
-  TServerDetails* ts = servers[replica_idx];
-
-  // Manually construct an RPC to our target replica. We expect most of the calls
-  // to fail either with an "already present" or an error because we are writing
-  // to a follower. That's OK, though - what we care about for this test is
-  // just that the operations Apply() in the same order everywhere (even though
-  // in this case the result will just be an error).
-  WriteRequestPB req;
-  WriteResponsePB resp;
-  RpcController rpc;
-  req.set_tablet_id(tablet_id_);
-  ASSERT_OK(SchemaToPB(schema_, req.mutable_schema()));
-  AddTestRowToPB(RowOperationsPB::INSERT, schema_, kTestRowKey, kTestRowIntVal,
-                 "hello world", req.mutable_row_operations());
+  }
 
-  while (!finish->Load()) {
-    resp.Clear();
-    rpc.Reset();
-    rpc.set_timeout(MonoDelta::FromSeconds(10));
-    ignore_result(ts->tserver_proxy->Write(req, &resp, &rpc));
-    VLOG(1) << "Response from server " << replica_idx << ": "
-            << SecureShortDebugString(resp);
+  for (const auto& thr : threads_) {
+    CHECK_OK(ThreadJoiner(thr.get()).Join());
   }
+
+  NO_FATALS(AssertAllReplicasAgree(FLAGS_client_inserts_per_thread * FLAGS_num_client_threads));
+  STLDeleteElements(&latches);
 }
 
 // Regression test for KUDU-597, an issue where we could mis-order operations on
@@ -1263,56 +1022,6 @@ TEST_F(RaftConsensusITest, TestKUDU_597) {
   }
 }
 
-void RaftConsensusITest::AddOp(const OpId& id, ConsensusRequestPB* req) {
-  AddOpWithTypeAndKey(id, RowOperationsPB::INSERT,
-                      id.index() * 10000 + id.term(), req);
-}
-
-void RaftConsensusITest::AddOpWithTypeAndKey(const OpId& id,
-                                             RowOperationsPB::Type op_type,
-                                             int32_t key,
-                                             ConsensusRequestPB* req) {
-  ReplicateMsg* msg = req->add_ops();
-  msg->mutable_id()->CopyFrom(id);
-  msg->set_timestamp(id.index());
-  msg->set_op_type(consensus::WRITE_OP);
-  WriteRequestPB* write_req = msg->mutable_write_request();
-  CHECK_OK(SchemaToPB(schema_, write_req->mutable_schema()));
-  write_req->set_tablet_id(tablet_id_);
-  AddTestRowToPB(op_type, schema_, key, id.term(),
-                 SecureShortDebugString(id), write_req->mutable_row_operations());
-}
-
-void RaftConsensusITest::SetupSingleReplicaTest(TServerDetails** replica_ts) {
-  const vector<string> kTsFlags = {
-    // Don't use the hybrid clock as we set logical timestamps on ops.
-    "--use_hybrid_clock=false",
-    "--enable_leader_failure_detection=false",
-  };
-  const vector<string> kMasterFlags = {
-    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"
-  };
-
-  FLAGS_num_replicas = 3;
-  FLAGS_num_tablet_servers = 3;
-  NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags));
-
-  // Kill all the servers but one.
-  vector<TServerDetails*> tservers;
-  AppendValuesFromMap(tablet_servers_, &tservers);
-  ASSERT_EQ(3, tservers.size());
-
-  // Elect server 2 as leader and wait for log index 1 to propagate to all servers.
-  ASSERT_OK(StartElection(tservers[2], tablet_id_, MonoDelta::FromSeconds(10)));
-  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, tablet_id_, 1));
-
-  cluster_->tablet_server_by_uuid(tservers[1]->uuid())->Shutdown();
-  cluster_->tablet_server_by_uuid(tservers[2]->uuid())->Shutdown();
-
-  *replica_ts = tservers[0];
-  LOG(INFO) << "================================== Cluster setup complete.";
-}
-
 // Regression test for KUDU-1775: when a replica is restarted, and the first
 // request it receives from a leader results in a LMP mismatch error, the
 // replica should still respond with the correct 'last_committed_idx'.
@@ -1628,149 +1337,6 @@ TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) {
   }
 }
 
-TEST_F(RaftConsensusITest, TestLeaderStepDown) {
-  const vector<string> kTsFlags = {
-    "--enable_leader_failure_detection=false"
-  };
-  const vector<string> kMasterFlags = {
-    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"
-  };
-
-  FLAGS_num_replicas = 3;
-  FLAGS_num_tablet_servers = 3;
-  NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags));
-
-  vector<TServerDetails*> tservers;
-  AppendValuesFromMap(tablet_servers_, &tservers);
-
-  // Start with no leader.
-  Status s = GetReplicaStatusAndCheckIfLeader(tservers[0], tablet_id_, MonoDelta::FromSeconds(10));
-  ASSERT_TRUE(s.IsIllegalState()) << "TS #0 should not be leader yet: " << s.ToString();
-
-  // Become leader.
-  ASSERT_OK(StartElection(tservers[0], tablet_id_, MonoDelta::FromSeconds(10)));
-  ASSERT_OK(WaitUntilLeader(tservers[0], tablet_id_, MonoDelta::FromSeconds(10)));
-  ASSERT_OK(WriteSimpleTestRow(tservers[0], tablet_id_, RowOperationsPB::INSERT,
-                               kTestRowKey, kTestRowIntVal, "foo", MonoDelta::FromSeconds(10)));
-  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, tablet_id_, 2));
-
-  // Step down and test that a 2nd stepdown returns the expected result.
-  ASSERT_OK(LeaderStepDown(tservers[0], tablet_id_, MonoDelta::FromSeconds(10)));
-  TabletServerErrorPB error;
-  s = LeaderStepDown(tservers[0], tablet_id_, MonoDelta::FromSeconds(10), &error);
-  ASSERT_TRUE(s.IsIllegalState()) << "TS #0 should not be leader anymore: " << s.ToString();
-  ASSERT_EQ(TabletServerErrorPB::NOT_THE_LEADER, error.code()) << SecureShortDebugString(error);
-
-  s = WriteSimpleTestRow(tservers[0], tablet_id_, RowOperationsPB::INSERT,
-                         kTestRowKey, kTestRowIntVal, "foo", MonoDelta::FromSeconds(10));
-  ASSERT_TRUE(s.IsIllegalState()) << "TS #0 should not accept writes as follower: "
-                                  << s.ToString();
-}
-
-// Test for KUDU-699: sets the consensus RPC timeout to be long,
-// and freezes both followers before asking the leader to step down.
-// Prior to fixing KUDU-699, the step-down process would block
-// until the pending requests timed out.
-TEST_F(RaftConsensusITest, TestStepDownWithSlowFollower) {
-  const vector<string> kTsFlags = {
-    "--enable_leader_failure_detection=false",
-    // Bump up the RPC timeout, so that we can verify that the stepdown responds
-    // quickly even when an outbound request is hung.
-    "--consensus_rpc_timeout_ms=15000"
-  };
-  const vector<string> kMasterFlags = {
-    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"
-  };
-
-  NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags));
-
-  vector<TServerDetails*> tservers;
-  AppendValuesFromMap(tablet_servers_, &tservers);
-  ASSERT_OK(StartElection(tservers[0], tablet_id_, MonoDelta::FromSeconds(10)));
-  ASSERT_OK(WaitUntilLeader(tservers[0], tablet_id_, MonoDelta::FromSeconds(10)));
-
-  // Stop both followers.
-  for (int i = 1; i < 3; i++) {
-    ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[i]->uuid())->Pause());
-  }
-
-  // Sleep a little bit of time to make sure that the leader has outstanding heartbeats
-  // to the paused followers before requesting the stepdown.
-  SleepFor(MonoDelta::FromSeconds(1));
-
-  // Step down should respond quickly despite the hung requests.
-  ASSERT_OK(LeaderStepDown(tservers[0], tablet_id_, MonoDelta::FromSeconds(3)));
-}
-
-void RaftConsensusITest::AssertMajorityRequiredForElectionsAndWrites(
-    const TabletServerMap& tablet_servers, const string& leader_uuid) {
-
-  TServerDetails* initial_leader = FindOrDie(tablet_servers, leader_uuid);
-
-  // Calculate number of servers to leave unpaused (minority).
-  // This math is a little unintuitive but works for cluster sizes including 2 and 1.
-  // Note: We assume all of these TSes are voters.
-  int config_size = tablet_servers.size();
-  int minority_to_retain = MajoritySize(config_size) - 1;
-
-  // Only perform this part of the test if we have some servers to pause, else
-  // the failure assertions will throw.
-  if (config_size > 1) {
-    // Pause enough replicas to prevent a majority.
-    int num_to_pause = config_size - minority_to_retain;
-    LOG(INFO) << "Pausing " << num_to_pause << " tablet servers in config of size " << config_size;
-    vector<string> paused_uuids;
-    for (const TabletServerMap::value_type& entry : tablet_servers) {
-      if (paused_uuids.size() == num_to_pause) {
-        continue;
-      }
-      const string& replica_uuid = entry.first;
-      if (replica_uuid == leader_uuid) {
-        // Always leave this one alone.
-        continue;
-      }
-      ExternalTabletServer* replica_ts = cluster_->tablet_server_by_uuid(replica_uuid);
-      ASSERT_OK(replica_ts->Pause());
-      paused_uuids.push_back(replica_uuid);
-    }
-
-    // Ensure writes timeout while only a minority is alive.
-    Status s = WriteSimpleTestRow(initial_leader, tablet_id_, RowOperationsPB::UPDATE,
-                                  kTestRowKey, kTestRowIntVal, "foo",
-                                  MonoDelta::FromMilliseconds(100));
-    ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
-
-    // Step down.
-    ASSERT_OK(LeaderStepDown(initial_leader, tablet_id_, MonoDelta::FromSeconds(10)));
-
-    // Assert that elections time out without a live majority.
-    // We specify a very short timeout here to keep the tests fast.
-    ASSERT_OK(StartElection(initial_leader, tablet_id_, MonoDelta::FromSeconds(10)));
-    s = WaitUntilLeader(initial_leader, tablet_id_, MonoDelta::FromMilliseconds(100));
-    ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
-    LOG(INFO) << "Expected timeout encountered on election with weakened config: " << s.ToString();
-
-    // Resume the paused servers.
-    LOG(INFO) << "Resuming " << num_to_pause << " tablet servers in config of size " << config_size;
-    for (const string& replica_uuid : paused_uuids) {
-      ExternalTabletServer* replica_ts = cluster_->tablet_server_by_uuid(replica_uuid);
-      ASSERT_OK(replica_ts->Resume());
-    }
-  }
-
-  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(20), tablet_servers, tablet_id_, 1));
-
-  // Now an election should succeed.
-  ASSERT_OK(StartElection(initial_leader, tablet_id_, MonoDelta::FromSeconds(10)));
-  ASSERT_OK(WaitUntilLeader(initial_leader, tablet_id_, MonoDelta::FromSeconds(10)));
-  LOG(INFO) << "Successful election with full config of size " << config_size;
-
-  // And a write should also succeed.
-  ASSERT_OK(WriteSimpleTestRow(initial_leader, tablet_id_, RowOperationsPB::UPDATE,
-                               kTestRowKey, kTestRowIntVal, Substitute("qsz=$0", config_size),
-                               MonoDelta::FromSeconds(10)));
-}
-
 // Basic test of adding and removing servers from a configuration.
 TEST_F(RaftConsensusITest, TestAddRemoveServer) {
   const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
@@ -1997,122 +1563,6 @@ TEST_F(RaftConsensusITest, TestAtomicAddRemoveServer) {
                                   active_tablet_servers, tablet_id_, ++cur_log_index));
 }
 
-// Ensure that we can elect a server that is in the "pending" configuration.
-// This is required by the Raft protocol. See Diego Ongaro's PhD thesis, section
-// 4.1, where it states that "it is the caller’s configuration that is used in
-// reaching consensus, both for voting and for log replication".
-//
-// This test also tests the case where a node comes back from the dead to a
-// leader that was not in its configuration when it died. That should also work, i.e.
-// the revived node should accept writes from the new leader.
-TEST_F(RaftConsensusITest, TestElectPendingVoter) {
-  // Test plan:
-  //  1. Disable failure detection to avoid non-deterministic behavior.
-  //  2. Start with a configuration size of 5, all servers synced.
-  //  3. Remove one server from the configuration, wait until committed.
-  //  4. Pause the 3 remaining non-leaders (SIGSTOP).
-  //  5. Run a config change to add back the previously-removed server.
-  //     Ensure that, while the op cannot be committed yet due to lack of a
-  //     majority in the new config (only 2 out of 5 servers are alive), the op
-  //     has been replicated to both the local leader and the new member.
-  //  6. Force the existing leader to step down.
-  //  7. Resume one of the paused nodes so that a majority (of the 5-node
-  //     configuration, but not the original 4-node configuration) will be available.
-  //  8. Start a leader election on the new (pending) node. It should win.
-  //  9. Unpause the two remaining stopped nodes.
-  // 10. Wait for all nodes to sync to the new leader's log.
-  const vector<string> kTsFlags = {
-    "--enable_leader_failure_detection=false",
-  };
-  const vector<string> kMasterFlags = {
-    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false",
-  };
-
-  FLAGS_num_tablet_servers = 5;
-  FLAGS_num_replicas = 5;
-  NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags));
-
-  vector<TServerDetails*> tservers;
-  AppendValuesFromMap(tablet_servers_, &tservers);
-  ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
-
-  // Elect server 0 as leader and wait for log index 1 to propagate to all servers.
-  TServerDetails* initial_leader = tservers[0];
-  ASSERT_OK(StartElection(initial_leader, tablet_id_, MonoDelta::FromSeconds(10)));
-  ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, initial_leader, tablet_id_,
-                                          MonoDelta::FromSeconds(10)));
-
-  // The server we will remove and then bring back.
-  TServerDetails* final_leader = tservers[4];
-
-  // Kill the master, so we can change the config without interference.
-  cluster_->master()->Shutdown();
-
-  // Now remove server 4 from the configuration.
-  TabletServerMap active_tablet_servers = tablet_servers_;
-  LOG(INFO) << "Removing tserver with uuid " << final_leader->uuid();
-  ASSERT_OK(RemoveServer(initial_leader, tablet_id_, final_leader, boost::none,
-                         MonoDelta::FromSeconds(10)));
-  ASSERT_EQ(1, active_tablet_servers.erase(final_leader->uuid()));
-  int64_t cur_log_index = 2;
-  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10),
-                                  active_tablet_servers, tablet_id_, cur_log_index));
-
-  // Pause tablet servers 1 through 3, so they won't see the operation to add
-  // server 4 back.
-  LOG(INFO) << "Pausing 3 replicas...";
-  for (int i = 1; i <= 3; i++) {
-    ExternalTabletServer* replica_ts = cluster_->tablet_server_by_uuid(tservers[i]->uuid());
-    ASSERT_OK(replica_ts->Pause());
-  }
-
-  // Now add server 4 back to the peers.
-  // This operation will time out on the client side.
-  LOG(INFO) << "Adding back Peer " << final_leader->uuid() << " and expecting timeout...";
-  Status s = AddServer(initial_leader, tablet_id_, final_leader, RaftPeerPB::VOTER, boost::none,
-                       MonoDelta::FromMilliseconds(100));
-  ASSERT_TRUE(s.IsTimedOut()) << "Expected AddServer() to time out. Result: " << s.ToString();
-  LOG(INFO) << "Timeout achieved.";
-  active_tablet_servers = tablet_servers_; // Reset to the unpaused servers.
-  for (int i = 1; i <= 3; i++) {
-    ASSERT_EQ(1, active_tablet_servers.erase(tservers[i]->uuid()));
-  }
-  // Only wait for TS 0 and 4 to agree that the new change config op has been
-  // replicated.
-  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10),
-                                  active_tablet_servers, tablet_id_, ++cur_log_index));
-
-  // Now that TS 4 is electable (and pending), have TS 0 step down.
-  LOG(INFO) << "Forcing Peer " << initial_leader->uuid() << " to step down...";
-  ASSERT_OK(LeaderStepDown(initial_leader, tablet_id_, MonoDelta::FromSeconds(10)));
-
-  // Resume TS 1 so we have a majority of 3 to elect a new leader.
-  LOG(INFO) << "Resuming Peer " << tservers[1]->uuid() << " ...";
-  ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[1]->uuid())->Resume());
-  InsertOrDie(&active_tablet_servers, tservers[1]->uuid(), tservers[1]);
-
-  // Now try to get TS 4 elected. It should succeed and push a NO_OP.
-  LOG(INFO) << "Trying to elect Peer " << tservers[4]->uuid() << " ...";
-  ASSERT_OK(StartElection(final_leader, tablet_id_, MonoDelta::FromSeconds(10)));
-  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10),
-                                  active_tablet_servers, tablet_id_, ++cur_log_index));
-
-  // Resume the remaining paused nodes.
-  LOG(INFO) << "Resuming remaining nodes...";
-  ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[2]->uuid())->Resume());
-  ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[3]->uuid())->Resume());
-  active_tablet_servers = tablet_servers_;
-
-  // Do one last operation on the new leader: an insert.
-  ASSERT_OK(WriteSimpleTestRow(final_leader, tablet_id_, RowOperationsPB::INSERT,
-                               kTestRowKey, kTestRowIntVal, "Ob-La-Di, Ob-La-Da",
-                               MonoDelta::FromSeconds(10)));
-
-  // Wait for all servers to replicate everything up through the last write op.
-  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10),
-                                  active_tablet_servers, tablet_id_, ++cur_log_index));
-}
-
 // Writes test rows in ascending order to a single tablet server.
 // Essentially a poor-man's version of TestWorkload that only operates on a
 // single tablet. Does not batch, does not tolerate timeouts, and does not
@@ -2121,12 +1571,11 @@ TEST_F(RaftConsensusITest, TestElectPendingVoter) {
 // a crash, as long as there is no crash then 'rows_inserted' will have a
 // correct count at the end of the run.
 // Crashes on any failure, so 'write_timeout' should be high.
-void DoWriteTestRows(const TServerDetails* leader_tserver,
-                     const string& tablet_id,
-                     const MonoDelta& write_timeout,
-                     AtomicInt<int32_t>* rows_inserted,
-                     const AtomicBool* finish) {
-
+static void DoWriteTestRows(const TServerDetails* leader_tserver,
+                            const string& tablet_id,
+                            const MonoDelta& write_timeout,
+                            AtomicInt<int32_t>* rows_inserted,
+                            const AtomicBool* finish) {
   while (!finish->Load()) {
     int row_key = rows_inserted->Increment();
     CHECK_OK(WriteSimpleTestRow(leader_tserver, tablet_id, RowOperationsPB::INSERT,
@@ -2626,23 +2075,6 @@ TEST_F(RaftConsensusITest, TestLargeBatches) {
   ASSERT_LE(num_wals, num_batches + 2);
 }
 
-void RaftConsensusITest::InsertPayloadIgnoreErrors(int start_row, int num_rows, int payload_size) {
-  shared_ptr<KuduTable> table;
-  CHECK_OK(client_->OpenTable(kTableId, &table));
-  shared_ptr<KuduSession> session = client_->NewSession();
-  session->SetTimeoutMillis(100);
-  CHECK_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
-  string payload(payload_size, 'x');
-  for (int i = 0; i < num_rows; i++) {
-    gscoped_ptr<KuduInsert> insert(table->NewInsert());
-    KuduPartialRow* row = insert->mutable_row();
-    CHECK_OK(row->SetInt32(0, i + start_row));
-    CHECK_OK(row->SetInt32(1, 0));
-    CHECK_OK(row->SetStringCopy(2, payload));
-    CHECK_OK(session->Apply(insert.release()));
-    ignore_result(session->Flush());
-  }
-}
 
 // Regression test for KUDU-1469, a case in which a leader and follower could get "stuck"
 // in a tight RPC loop, in which the leader would repeatedly send a batch of ops that the
@@ -2835,67 +2267,6 @@ TEST_F(RaftConsensusITest, TestEvictAbandonedFollowers) {
   ASSERT_OK(WaitForServersToAgree(timeout, active_tablet_servers, tablet_id_, 2));
 }
 
-// Have a replica fall behind the leader's log, then fail a tablet copy. It
-// should still be able to vote in leader elections.
-TEST_F(RaftConsensusITest, TestTombstonedVoteAfterFailedTabletCopy) {
-  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
-
-  vector<string> ts_flags;
-  AddFlagsForLogRolls(&ts_flags); // For CauseFollowerToFallBehindLogGC().
-  const vector<string> kMasterFlags = {
-    "--master_add_server_when_underreplicated=false",
-  };
-  NO_FATALS(BuildAndStart(ts_flags, kMasterFlags));
-
-  TabletServerMap active_tablet_servers = tablet_servers_;
-  ASSERT_EQ(3, FLAGS_num_replicas);
-  ASSERT_EQ(3, active_tablet_servers.size());
-
-  string leader_uuid;
-  int64_t orig_term;
-  string follower_uuid;
-  NO_FATALS(CauseFollowerToFallBehindLogGC(&leader_uuid, &orig_term, &follower_uuid));
-
-  // Wait for the abandoned follower to be evicted.
-  ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(2, tablet_servers_[leader_uuid],
-                                                tablet_id_, kTimeout));
-  ASSERT_EQ(1, active_tablet_servers.erase(follower_uuid));
-  ASSERT_OK(WaitForServersToAgree(kTimeout, active_tablet_servers, tablet_id_, 2));
-
-  // Now the follower is evicted and will be tombstoned.
-  // We'll add it back to the config and then crash the follower during the
-  // resulting tablet copy.
-  ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server_by_uuid(follower_uuid),
-            "tablet_copy_fault_crash_on_fetch_all", "1.0"));
-  auto* leader_ts = tablet_servers_[leader_uuid];
-  auto* follower_ts = tablet_servers_[follower_uuid];
-  ASSERT_OK(itest::AddServer(leader_ts, tablet_id_, follower_ts, RaftPeerPB::VOTER,
-                             boost::none, kTimeout));
-  ASSERT_OK(cluster_->tablet_server_by_uuid(follower_uuid)->WaitForInjectedCrash(kTimeout));
-
-  // Shut down the rest of the cluster, then only bring back the node we
-  // crashed, plus one other node. The tombstoned replica should still be able
-  // to vote and the tablet should come online.
-  cluster_->Shutdown();
-
-  int follower_idx = cluster_->tablet_server_index_by_uuid(follower_uuid);
-  ASSERT_OK(inspect_->CheckTabletDataStateOnTS(follower_idx, tablet_id_, { TABLET_DATA_COPYING }));
-
-  ASSERT_OK(cluster_->master()->Restart());
-  ASSERT_OK(cluster_->tablet_server_by_uuid(leader_uuid)->Restart());
-  ASSERT_OK(cluster_->tablet_server_by_uuid(follower_uuid)->Restart());
-
-  TestWorkload workload(cluster_.get());
-  workload.set_table_name(kTableId);
-  workload.set_timeout_allowed(true);
-  workload.Setup();
-  workload.Start();
-  ASSERT_EVENTUALLY([&] {
-    ASSERT_GE(workload.rows_inserted(), 100);
-  });
-  workload.StopAndJoin();
-}
-
 // Test that, after followers are evicted from the config, the master re-adds a new
 // replica for that follower and it eventually catches back up.
 TEST_F(RaftConsensusITest, TestMasterReplacesEvictedFollowers) {


[3/3] kudu git commit: [raft_consensus-itest] separate Raft election tests

Posted by al...@apache.org.
[raft_consensus-itest] separate Raft election tests

Separated election-related tests from raft_consensus-itest
into raft_consensus-election-itest.

This changelist does not contain any functional changes.

Change-Id: Ic5289b3de097f9a2ca152034fa2635b67ccddb93
Reviewed-on: http://gerrit.cloudera.org:8080/8278
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Alexey Serbin <as...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/ab77ce02
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/ab77ce02
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/ab77ce02

Branch: refs/heads/master
Commit: ab77ce025d9fe68c993d26e15b604c4188e36860
Parents: 4414089
Author: Alexey Serbin <as...@cloudera.com>
Authored: Sat Oct 14 23:18:59 2017 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Tue Oct 17 01:09:23 2017 +0000

----------------------------------------------------------------------
 build-support/dist_test.py                      |    2 +-
 src/kudu/integration-tests/CMakeLists.txt       |    2 +
 .../raft_consensus-itest-base.cc                |  273 ++++
 .../raft_consensus-itest-base.h                 |   68 +
 .../integration-tests/raft_consensus-itest.cc   | 1507 +++++-------------
 .../raft_consensus_election-itest.cc            |  532 +++++++
 src/kudu/tablet/mt-tablet-test.cc               |    2 +-
 src/kudu/tserver/tablet_server-test-base.cc     |    1 -
 8 files changed, 1316 insertions(+), 1071 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/ab77ce02/build-support/dist_test.py
----------------------------------------------------------------------
diff --git a/build-support/dist_test.py b/build-support/dist_test.py
index a91d7d3..328d2b8 100755
--- a/build-support/dist_test.py
+++ b/build-support/dist_test.py
@@ -89,7 +89,7 @@ NUM_SHARDS_BY_TEST = {
   'delete_table-test': 8,
   'flex_partitioning-itest': 8,
   'mt-tablet-test': 4,
-  'raft_consensus-itest': 8
+  'raft_consensus-itest': 6
 }
 
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/ab77ce02/src/kudu/integration-tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index d124662..ecdda20 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -27,6 +27,7 @@ set(INTEGRATION_TESTS_SRCS
   external_mini_cluster_fs_inspector.cc
   internal_mini_cluster-itest-base.cc
   log_verifier.cc
+  raft_consensus-itest-base.cc
   test_workload.cc
   ts_itest-base.cc
 )
@@ -86,6 +87,7 @@ ADD_KUDU_TEST(multidir_cluster-itest)
 ADD_KUDU_TEST(open-readonly-fs-itest)
 ADD_KUDU_TEST(raft_config_change-itest)
 ADD_KUDU_TEST(raft_consensus-itest RUN_SERIAL true)
+ADD_KUDU_TEST(raft_consensus_election-itest)
 ADD_KUDU_TEST(registration-test RESOURCE_LOCK "master-web-port")
 ADD_KUDU_TEST(security-faults-itest)
 ADD_KUDU_TEST(security-itest)

http://git-wip-us.apache.org/repos/asf/kudu/blob/ab77ce02/src/kudu/integration-tests/raft_consensus-itest-base.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest-base.cc b/src/kudu/integration-tests/raft_consensus-itest-base.cc
new file mode 100644
index 0000000..afd8c64
--- /dev/null
+++ b/src/kudu/integration-tests/raft_consensus-itest-base.cc
@@ -0,0 +1,273 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/integration-tests/raft_consensus-itest-base.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <ostream>
+#include <string>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/client/client-test-util.h"
+#include "kudu/client/client.h"
+#include "kudu/client/shared_ptr.h"
+#include "kudu/client/write_op.h"
+#include "kudu/common/partial_row.h"
+#include "kudu/common/wire_protocol.h"
+#include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/opid.pb.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
+#include "kudu/integration-tests/external_mini_cluster_fs_inspector.h"
+#include "kudu/integration-tests/test_workload.h"
+#include "kudu/integration-tests/ts_itest-base.h"
+#include "kudu/mini-cluster/external_mini_cluster.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/tserver/tablet_server-test-base.h"
+#include "kudu/tserver/tserver.pb.h"
+#include "kudu/tserver/tserver_service.proxy.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+
+DEFINE_int32(num_client_threads, 8,
+             "Number of client threads to launch");
+DEFINE_int64(client_inserts_per_thread, 50,
+             "Number of rows inserted by each client thread");
+DECLARE_int32(consensus_rpc_timeout_ms);
+DEFINE_int64(client_num_batches_per_thread, 5,
+             "In how many batches to group the rows, for each client");
+
+METRIC_DECLARE_entity(tablet);
+METRIC_DECLARE_counter(transaction_memory_pressure_rejections);
+METRIC_DECLARE_gauge_int64(raft_term);
+
+using kudu::client::KuduInsert;
+using kudu::client::KuduSession;
+using kudu::client::KuduTable;
+using kudu::client::sp::shared_ptr;
+using kudu::cluster::ExternalTabletServer;
+using kudu::consensus::OpId;
+using kudu::itest::TServerDetails;
+using kudu::pb_util::SecureDebugString;
+using kudu::rpc::RpcController;
+using std::string;
+using std::vector;
+
+namespace kudu {
+namespace tserver {
+
+static const int kConsensusRpcTimeoutForTests = 50;
+
+RaftConsensusITestBase::RaftConsensusITestBase()
+      : inserters_(FLAGS_num_client_threads) {
+}
+
+void RaftConsensusITestBase::SetUp() {
+  TabletServerIntegrationTestBase::SetUp();
+  FLAGS_consensus_rpc_timeout_ms = kConsensusRpcTimeoutForTests;
+}
+
+void RaftConsensusITestBase::ScanReplica(TabletServerServiceProxy* replica_proxy,
+                                         vector<string>* results) {
+  ScanRequestPB req;
+  ScanResponsePB resp;
+  RpcController rpc;
+  rpc.set_timeout(MonoDelta::FromSeconds(10)); // Squelch warnings.
+
+  NewScanRequestPB* scan = req.mutable_new_scan_request();
+  scan->set_tablet_id(tablet_id_);
+  ASSERT_OK(SchemaToColumnPBs(schema_, scan->mutable_projected_columns()));
+
+  // Send the call
+  {
+    req.set_batch_size_bytes(0);
+    SCOPED_TRACE(SecureDebugString(req));
+    ASSERT_OK(replica_proxy->Scan(req, &resp, &rpc));
+    SCOPED_TRACE(SecureDebugString(resp));
+    if (resp.has_error()) {
+      ASSERT_OK(StatusFromPB(resp.error().status()));
+    }
+  }
+
+  if (!resp.has_more_results()) {
+    return;
+  }
+
+  // Drain all the rows from the scanner.
+  NO_FATALS(DrainScannerToStrings(resp.scanner_id(),
+                                  schema_,
+                                  results,
+                                  replica_proxy));
+
+  std::sort(results->begin(), results->end());
+}
+
+void RaftConsensusITestBase::InsertTestRowsRemoteThread(
+    uint64_t first_row,
+    uint64_t count,
+    uint64_t num_batches,
+    const vector<CountDownLatch*>& latches) {
+  shared_ptr<KuduTable> table;
+  CHECK_OK(client_->OpenTable(kTableId, &table));
+
+  shared_ptr<KuduSession> session = client_->NewSession();
+  session->SetTimeoutMillis(60000);
+  CHECK_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+
+  for (int i = 0; i < num_batches; i++) {
+    uint64_t first_row_in_batch = first_row + (i * count / num_batches);
+    uint64_t last_row_in_batch = first_row_in_batch + count / num_batches;
+
+    for (int j = first_row_in_batch; j < last_row_in_batch; j++) {
+      gscoped_ptr<KuduInsert> insert(table->NewInsert());
+      KuduPartialRow* row = insert->mutable_row();
+      CHECK_OK(row->SetInt32(0, j));
+      CHECK_OK(row->SetInt32(1, j * 2));
+      CHECK_OK(row->SetStringCopy(2, Slice(StringPrintf("hello %d", j))));
+      CHECK_OK(session->Apply(insert.release()));
+    }
+
+    FlushSessionOrDie(session);
+
+    int inserted = last_row_in_batch - first_row_in_batch;
+    for (CountDownLatch* latch : latches) {
+      latch->CountDown(inserted);
+    }
+  }
+
+  inserters_.CountDown();
+}
+
+void RaftConsensusITestBase::AddFlagsForLogRolls(vector<string>* extra_tserver_flags) {
+  // We configure a small log segment size so that we roll frequently,
+  // configure a small cache size so that we evict data from the cache, and
+  // retain as few segments as possible. We also turn off async segment
+  // allocation -- this ensures that we roll many segments of logs (with async
+  // allocation, it's possible that the preallocation is slow and we wouldn't
+  // roll deterministically).
+  //
+  // Additionally, we disable log compression, since these tests write a lot of
+  // repetitive data to cause the rolls, and compression would make it all tiny.
+  extra_tserver_flags->push_back("--log_compression_codec=none");
+  extra_tserver_flags->push_back("--log_cache_size_limit_mb=1");
+  extra_tserver_flags->push_back("--log_segment_size_mb=1");
+  extra_tserver_flags->push_back("--log_async_preallocate_segments=false");
+  extra_tserver_flags->push_back("--log_min_segments_to_retain=1");
+  extra_tserver_flags->push_back("--log_max_segments_to_retain=3");
+  extra_tserver_flags->push_back("--maintenance_manager_polling_interval_ms=100");
+  extra_tserver_flags->push_back("--log_target_replay_size_mb=1");
+  // We write 128KB cells in CauseFollowerToFallBehindLogGC(): bump the limit.
+  extra_tserver_flags->push_back("--max_cell_size_bytes=1000000");
+}
+
+void RaftConsensusITestBase::CauseFollowerToFallBehindLogGC(
+    string* leader_uuid,
+    int64_t* orig_term,
+    string* fell_behind_uuid) {
+  MonoDelta kTimeout = MonoDelta::FromSeconds(10);
+  // Wait for all of the replicas to have acknowledged the elected
+  // leader and logged the first NO_OP.
+  ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_, tablet_id_, 1));
+
+  // Pause one server. This might be the leader, but pausing it will cause
+  // a leader election to happen.
+  TServerDetails* replica = (*tablet_replicas_.begin()).second;
+  ExternalTabletServer* replica_ets = cluster_->tablet_server_by_uuid(replica->uuid());
+  ASSERT_OK(replica_ets->Pause());
+
+  // Find a leader. In case we paused the leader above, this will wait until
+  // we have elected a new one.
+  TServerDetails* leader = nullptr;
+  while (true) {
+    Status s = GetLeaderReplicaWithRetries(tablet_id_, &leader);
+    if (s.ok() && leader != nullptr && leader != replica) {
+      break;
+    }
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+  *leader_uuid = leader->uuid();
+  int leader_index = cluster_->tablet_server_index_by_uuid(*leader_uuid);
+
+  TestWorkload workload(cluster_.get());
+  workload.set_table_name(kTableId);
+  workload.set_timeout_allowed(true);
+  workload.set_payload_bytes(128 * 1024); // Write ops of size 128KB.
+  workload.set_write_batch_size(1);
+  workload.set_num_write_threads(4);
+  workload.Setup();
+  workload.Start();
+
+  LOG(INFO) << "Waiting until we've written at least 4MB...";
+  while (workload.rows_inserted() < 8 * 4) {
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+  workload.StopAndJoin();
+
+  LOG(INFO) << "Waiting for log GC on " << leader->uuid();
+  // Some WAL segments must exist, but wal segment 1 must not exist.
+  ASSERT_OK(inspect_->WaitForFilePatternInTabletWalDirOnTs(
+      leader_index, tablet_id_, { "wal-" }, { "wal-000000001" }));
+
+  LOG(INFO) << "Log GC complete on " << leader->uuid();
+
+  // Then wait another couple of seconds to be sure that it has bothered to try
+  // to write to the paused peer.
+  // TODO(unknown): would be nice to be able to poll the leader with an RPC like
+  // GetLeaderStatus() which could tell us whether it has made any requests
+  // since the log GC.
+  SleepFor(MonoDelta::FromSeconds(2));
+
+  // Make a note of whatever the current term of the cluster is,
+  // before we resume the follower.
+  {
+    OpId op_id;
+    ASSERT_OK(GetLastOpIdForReplica(tablet_id_, leader, consensus::RECEIVED_OPID, kTimeout,
+                                    &op_id));
+    *orig_term = op_id.term();
+    LOG(INFO) << "Servers converged with original term " << *orig_term;
+  }
+
+  // Resume the follower.
+  LOG(INFO) << "Resuming  " << replica->uuid();
+  ASSERT_OK(replica_ets->Resume());
+
+  // Ensure that none of the tablet servers crashed.
+  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+    // Make sure it didn't crash.
+    ASSERT_TRUE(cluster_->tablet_server(i)->IsProcessAlive())
+      << "Tablet server " << i << " crashed";
+  }
+  *fell_behind_uuid = replica->uuid();
+}
+
+}  // namespace tserver
+}  // namespace kudu
+

http://git-wip-us.apache.org/repos/asf/kudu/blob/ab77ce02/src/kudu/integration-tests/raft_consensus-itest-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest-base.h b/src/kudu/integration-tests/raft_consensus-itest-base.h
new file mode 100644
index 0000000..98a379e
--- /dev/null
+++ b/src/kudu/integration-tests/raft_consensus-itest-base.h
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include "kudu/integration-tests/ts_itest-base.h"
+#include "kudu/util/countdown_latch.h"
+
+namespace kudu {
+namespace tserver {
+
+class TabletServerServiceProxy;
+
+// Integration test for the raft consensus implementation.
+// Uses the whole tablet server stack with ExternalMiniCluster.
+class RaftConsensusITestBase : public TabletServerIntegrationTestBase {
+ public:
+  RaftConsensusITestBase();
+
+  void SetUp() override;
+
+  void ScanReplica(TabletServerServiceProxy* replica_proxy,
+                   std::vector<std::string>* results);
+
+  void InsertTestRowsRemoteThread(uint64_t first_row,
+                                  uint64_t count,
+                                  uint64_t num_batches,
+                                  const std::vector<CountDownLatch*>& latches);
+ protected:
+  // Flags needed for CauseFollowerToFallBehindLogGC() to work well.
+  void AddFlagsForLogRolls(std::vector<std::string>* extra_tserver_flags);
+
+  // Pause one of the followers and write enough data to the remaining replicas
+  // to cause log GC, then resume the paused follower. On success,
+  // 'leader_uuid' will be set to the UUID of the leader, 'orig_term' will be
+  // set to the term of the leader before un-pausing the follower, and
+  // 'fell_behind_uuid' will be set to the UUID of the follower that was paused
+  // and caused to fall behind. These can be used for verification purposes.
+  //
+  // Certain flags should be set. You can add the required flags with
+  // AddFlagsForLogRolls() before starting the cluster.
+  void CauseFollowerToFallBehindLogGC(std::string* leader_uuid,
+                                      int64_t* orig_term,
+                                      std::string* fell_behind_uuid);
+
+  CountDownLatch inserters_;
+};
+
+}  // namespace tserver
+}  // namespace kudu