Posted to commits@kudu.apache.org by al...@apache.org on 2017/10/17 01:10:32 UTC
[1/3] kudu git commit: [raft_consensus-itest] separate Raft election tests
Repository: kudu
Updated Branches:
refs/heads/master 441408908 -> ab77ce025
http://git-wip-us.apache.org/repos/asf/kudu/blob/ab77ce02/src/kudu/integration-tests/raft_consensus_election-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus_election-itest.cc b/src/kudu/integration-tests/raft_consensus_election-itest.cc
new file mode 100644
index 0000000..ba4d538
--- /dev/null
+++ b/src/kudu/integration-tests/raft_consensus_election-itest.cc
@@ -0,0 +1,532 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <ostream>
+#include <string>
+#include <unordered_set>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/common/wire_protocol.pb.h"
+#include "kudu/consensus/metadata.pb.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
+#include "kudu/integration-tests/cluster_verifier.h"
+#include "kudu/integration-tests/external_mini_cluster_fs_inspector.h"
+#include "kudu/integration-tests/raft_consensus-itest-base.h"
+#include "kudu/integration-tests/test_workload.h"
+#include "kudu/mini-cluster/external_mini_cluster.h"
+#include "kudu/tablet/metadata.pb.h"
+#include "kudu/tserver/tablet_server-test-base.h"
+#include "kudu/tserver/tserver.pb.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/status.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+DECLARE_int64(client_inserts_per_thread);
+DECLARE_int64(client_num_batches_per_thread);
+DECLARE_int32(consensus_rpc_timeout_ms);
+DECLARE_int32(num_client_threads);
+DECLARE_int32(num_replicas);
+DECLARE_int32(num_tablet_servers);
+
+METRIC_DECLARE_entity(tablet);
+METRIC_DECLARE_counter(transaction_memory_pressure_rejections);
+METRIC_DECLARE_gauge_int64(raft_term);
+
+using kudu::cluster::ExternalTabletServer;
+using kudu::consensus::RaftPeerPB;
+using kudu::itest::AddServer;
+using kudu::itest::GetReplicaStatusAndCheckIfLeader;
+using kudu::itest::LeaderStepDown;
+using kudu::itest::RemoveServer;
+using kudu::itest::StartElection;
+using kudu::itest::TabletServerMap;
+using kudu::itest::TServerDetails;
+using kudu::itest::WaitUntilLeader;
+using kudu::itest::WriteSimpleTestRow;
+using kudu::pb_util::SecureShortDebugString;
+using kudu::tablet::TABLET_DATA_COPYING;
+using std::string;
+using std::unordered_set;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace tserver {
+
+static const int kTestRowKey = 1234;
+static const int kTestRowIntVal = 5678;
+
+class RaftConsensusElectionITest : public RaftConsensusITestBase {
+ protected:
+ void CreateClusterForChurnyElectionsTests(const vector<string>& extra_ts_flags);
+ void DoTestChurnyElections(TestWorkload* workload, int max_rows_to_insert);
+};
+
+void RaftConsensusElectionITest::CreateClusterForChurnyElectionsTests(
+ const vector<string>& extra_ts_flags) {
+ vector<string> ts_flags;
+
+#ifdef THREAD_SANITIZER
+ // On TSAN builds, we need to be a little bit less churny in order to make
+ // any progress at all.
+ ts_flags.push_back("--raft_heartbeat_interval_ms=5");
+#else
+ ts_flags.emplace_back("--raft_heartbeat_interval_ms=1");
+#endif
+
+ ts_flags.insert(ts_flags.end(), extra_ts_flags.cbegin(), extra_ts_flags.cend());
+
+ CreateCluster("raft_consensus-itest-cluster", ts_flags, {});
+}
+
+void RaftConsensusElectionITest::DoTestChurnyElections(TestWorkload* workload,
+ int max_rows_to_insert) {
+ workload->set_num_replicas(FLAGS_num_replicas);
+ // Set a really high write timeout so that even in the presence of many failures we
+ // can verify an exact number of rows in the end, thanks to exactly once semantics.
+ workload->set_write_timeout_millis(60 * 1000 /* 60 seconds */);
+ workload->set_num_write_threads(2);
+ workload->set_write_batch_size(1);
+ workload->Setup();
+ workload->Start();
+
+ // Run for either a prescribed number of writes, or 30 seconds,
+ // whichever comes first. This prevents test timeouts on slower
+ // build machines, TSAN builds, etc.
+ Stopwatch sw;
+ sw.start();
+ while (workload->rows_inserted() < max_rows_to_insert &&
+ sw.elapsed().wall_seconds() < 30) {
+ SleepFor(MonoDelta::FromMilliseconds(10));
+ NO_FATALS(AssertNoTabletServersCrashed());
+ }
+ workload->StopAndJoin();
+ ASSERT_GT(workload->rows_inserted(), 0) << "No rows inserted";
+
+ // Ensure that the replicas converge.
+ // We expect an exact result due to exactly once semantics and snapshot scans.
+ ClusterVerifier v(cluster_.get());
+ NO_FATALS(v.CheckCluster());
+ NO_FATALS(v.CheckRowCount(workload->table_name(),
+ ClusterVerifier::EXACTLY,
+ workload->rows_inserted()));
+ NO_FATALS(AssertNoTabletServersCrashed());
+}
+
+TEST_F(RaftConsensusElectionITest, RunLeaderElection) {
+ // Reset consensus RPC timeout to the default value, otherwise the election
+ // might fail often, making the test flaky.
+ FLAGS_consensus_rpc_timeout_ms = 1000;
+ NO_FATALS(BuildAndStart());
+
+ int num_iters = AllowSlowTests() ? 10 : 1;
+
+ InsertTestRowsRemoteThread(0,
+ FLAGS_client_inserts_per_thread * num_iters,
+ FLAGS_client_num_batches_per_thread,
+ {});
+
+ NO_FATALS(AssertAllReplicasAgree(FLAGS_client_inserts_per_thread * num_iters));
+
+ // Select the last follower to be new leader.
+ vector<TServerDetails*> followers;
+ GetOnlyLiveFollowerReplicas(tablet_id_, &followers);
+
+ // Now shutdown the current leader.
+ TServerDetails* leader = DCHECK_NOTNULL(GetLeaderReplicaOrNull(tablet_id_));
+ ExternalTabletServer* leader_ets = cluster_->tablet_server_by_uuid(leader->uuid());
+ leader_ets->Shutdown();
+
+ TServerDetails* replica = followers.back();
+ CHECK_NE(leader->instance_id.permanent_uuid(), replica->instance_id.permanent_uuid());
+
+ // Make the new replica leader.
+ ASSERT_OK(StartElection(replica, tablet_id_, MonoDelta::FromSeconds(10)));
+
+ // Insert a bunch more rows.
+ InsertTestRowsRemoteThread(FLAGS_client_inserts_per_thread * num_iters,
+ FLAGS_client_inserts_per_thread * num_iters,
+ FLAGS_client_num_batches_per_thread,
+ {});
+
+ // Restart the original replica and make sure they all agree.
+ ASSERT_OK(leader_ets->Restart());
+
+ NO_FATALS(AssertAllReplicasAgree(FLAGS_client_inserts_per_thread * num_iters * 2));
+}
+
+// This test sets all of the election timers to be very short, resulting
+// in a lot of churn. We expect to make some progress and not diverge or
+// crash, despite the frequent re-elections and races.
+TEST_F(RaftConsensusElectionITest, ChurnyElections) {
+ const int kNumWrites = AllowSlowTests() ? 10000 : 1000;
+ CreateClusterForChurnyElectionsTests({});
+ TestWorkload workload(cluster_.get());
+ workload.set_write_batch_size(1);
+ workload.set_num_read_threads(2);
+ DoTestChurnyElections(&workload, kNumWrites);
+}
+
+// The same test, except that it injects artificial latency when propagating
+// notifications from the queue back to consensus. This previously reproduced
+// bugs like KUDU-1078 that normally appear only under high load.
+TEST_F(RaftConsensusElectionITest, ChurnyElections_WithNotificationLatency) {
+ CreateClusterForChurnyElectionsTests({"--consensus_inject_latency_ms_in_notifications=50"});
+ const int kNumWrites = AllowSlowTests() ? 10000 : 1000;
+ TestWorkload workload(cluster_.get());
+ workload.set_write_batch_size(1);
+ workload.set_num_read_threads(2);
+ DoTestChurnyElections(&workload, kNumWrites);
+}
+
+// The same as the ChurnyElections test above, except it inserts many duplicate rows.
+// This emulates cases where there are many duplicate keys which, due to two-phase
+// locking, may cause deadlocks and other anomalies that cannot be observed when
+// keys are unique.
+TEST_F(RaftConsensusElectionITest, ChurnyElections_WithDuplicateKeys) {
+ CreateClusterForChurnyElectionsTests({});
+ const int kNumWrites = AllowSlowTests() ? 10000 : 1000;
+ TestWorkload workload(cluster_.get());
+ workload.set_write_pattern(TestWorkload::INSERT_WITH_MANY_DUP_KEYS);
+ // Increase the number of rows per batch to get a higher chance of key collision.
+ workload.set_write_batch_size(3);
+ DoTestChurnyElections(&workload, kNumWrites);
+}
+
+// Test automatic leader election by killing leaders.
+TEST_F(RaftConsensusElectionITest, AutomaticLeaderElection) {
+ if (AllowSlowTests()) {
+ FLAGS_num_tablet_servers = 5;
+ FLAGS_num_replicas = 5;
+ }
+ NO_FATALS(BuildAndStart());
+
+ TServerDetails* leader;
+ ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader));
+
+ unordered_set<TServerDetails*> killed_leaders;
+
+ const int kNumLeadersToKill = FLAGS_num_replicas / 2;
+ const int kFinalNumReplicas = FLAGS_num_replicas / 2 + 1;
+
+ for (int leaders_killed = 0; leaders_killed < kFinalNumReplicas; leaders_killed++) {
+ LOG(INFO) << Substitute("Writing data to leader of $0-node config ($1 alive)...",
+ FLAGS_num_replicas, FLAGS_num_replicas - leaders_killed);
+
+ InsertTestRowsRemoteThread(leaders_killed * FLAGS_client_inserts_per_thread,
+ FLAGS_client_inserts_per_thread,
+ FLAGS_client_num_batches_per_thread,
+ {});
+
+ // At this point, the writes are flushed but the commit index may not be
+ // propagated to all replicas. We kill the leader anyway.
+ if (leaders_killed < kNumLeadersToKill) {
+ LOG(INFO) << "Killing current leader " << leader->instance_id.permanent_uuid() << "...";
+ cluster_->tablet_server_by_uuid(leader->uuid())->Shutdown();
+ InsertOrDie(&killed_leaders, leader);
+
+ LOG(INFO) << "Waiting for new guy to be elected leader.";
+ ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader));
+ }
+ }
+
+ // Restart every node that was killed, and wait for the nodes to converge
+ for (TServerDetails* killed_node : killed_leaders) {
+ CHECK_OK(cluster_->tablet_server_by_uuid(killed_node->uuid())->Restart());
+ }
+ // Verify the data on the remaining replicas.
+ NO_FATALS(AssertAllReplicasAgree(FLAGS_client_inserts_per_thread * kFinalNumReplicas));
+}
+
+// Single-replica leader election test.
+TEST_F(RaftConsensusElectionITest, AutomaticLeaderElectionOneReplica) {
+ FLAGS_num_tablet_servers = 1;
+ FLAGS_num_replicas = 1;
+ NO_FATALS(BuildAndStart());
+ // Ensure that single-node Raft configs elect themselves as leader
+ // immediately upon Consensus startup.
+ ASSERT_OK(GetReplicaStatusAndCheckIfLeader(tablet_servers_[cluster_->tablet_server(0)->uuid()],
+ tablet_id_, MonoDelta::FromMilliseconds(500)));
+}
+
+TEST_F(RaftConsensusElectionITest, LeaderStepDown) {
+ const vector<string> kTsFlags = {
+ "--enable_leader_failure_detection=false"
+ };
+ const vector<string> kMasterFlags = {
+ "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"
+ };
+
+ FLAGS_num_replicas = 3;
+ FLAGS_num_tablet_servers = 3;
+ NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags));
+
+ vector<TServerDetails*> tservers;
+ AppendValuesFromMap(tablet_servers_, &tservers);
+
+ // Start with no leader.
+ Status s = GetReplicaStatusAndCheckIfLeader(tservers[0], tablet_id_, MonoDelta::FromSeconds(10));
+ ASSERT_TRUE(s.IsIllegalState()) << "TS #0 should not be leader yet: " << s.ToString();
+
+ // Become leader.
+ ASSERT_OK(StartElection(tservers[0], tablet_id_, MonoDelta::FromSeconds(10)));
+ ASSERT_OK(WaitUntilLeader(tservers[0], tablet_id_, MonoDelta::FromSeconds(10)));
+ ASSERT_OK(WriteSimpleTestRow(tservers[0], tablet_id_, RowOperationsPB::INSERT,
+ kTestRowKey, kTestRowIntVal, "foo", MonoDelta::FromSeconds(10)));
+ ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, tablet_id_, 2));
+
+ // Step down and test that a 2nd stepdown returns the expected result.
+ ASSERT_OK(LeaderStepDown(tservers[0], tablet_id_, MonoDelta::FromSeconds(10)));
+ TabletServerErrorPB error;
+ s = LeaderStepDown(tservers[0], tablet_id_, MonoDelta::FromSeconds(10), &error);
+ ASSERT_TRUE(s.IsIllegalState()) << "TS #0 should not be leader anymore: " << s.ToString();
+ ASSERT_EQ(TabletServerErrorPB::NOT_THE_LEADER, error.code()) << SecureShortDebugString(error);
+
+ s = WriteSimpleTestRow(tservers[0], tablet_id_, RowOperationsPB::INSERT,
+ kTestRowKey, kTestRowIntVal, "foo", MonoDelta::FromSeconds(10));
+ ASSERT_TRUE(s.IsIllegalState()) << "TS #0 should not accept writes as follower: "
+ << s.ToString();
+}
+
+// Test for KUDU-699: sets the consensus RPC timeout to be long,
+// and freezes both followers before asking the leader to step down.
+// Prior to fixing KUDU-699, the step-down process would block
+// until the pending requests timed out.
+TEST_F(RaftConsensusElectionITest, StepDownWithSlowFollower) {
+ const vector<string> kTsFlags = {
+ "--enable_leader_failure_detection=false",
+ // Bump up the RPC timeout, so that we can verify that the stepdown responds
+ // quickly even when an outbound request is hung.
+ "--consensus_rpc_timeout_ms=15000"
+ };
+ const vector<string> kMasterFlags = {
+ "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"
+ };
+
+ NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags));
+
+ vector<TServerDetails*> tservers;
+ AppendValuesFromMap(tablet_servers_, &tservers);
+ ASSERT_OK(StartElection(tservers[0], tablet_id_, MonoDelta::FromSeconds(10)));
+ ASSERT_OK(WaitUntilLeader(tservers[0], tablet_id_, MonoDelta::FromSeconds(10)));
+
+ // Stop both followers.
+ for (int i = 1; i < 3; i++) {
+ ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[i]->uuid())->Pause());
+ }
+
+ // Sleep a little bit of time to make sure that the leader has outstanding heartbeats
+ // to the paused followers before requesting the stepdown.
+ SleepFor(MonoDelta::FromSeconds(1));
+
+ // Step down should respond quickly despite the hung requests.
+ ASSERT_OK(LeaderStepDown(tservers[0], tablet_id_, MonoDelta::FromSeconds(3)));
+}
+
+// Ensure that we can elect a server that is in the "pending" configuration.
+// This is required by the Raft protocol. See Diego Ongaro's PhD thesis, section
+// 4.1, where it states that "it is the caller’s configuration that is used in
+// reaching consensus, both for voting and for log replication".
+//
+// This test also tests the case where a node comes back from the dead to a
+// leader that was not in its configuration when it died. That should also work, i.e.
+// the revived node should accept writes from the new leader.
+TEST_F(RaftConsensusElectionITest, ElectPendingVoter) {
+ // Test plan:
+ // 1. Disable failure detection to avoid non-deterministic behavior.
+ // 2. Start with a configuration size of 5, all servers synced.
+ // 3. Remove one server from the configuration, wait until committed.
+ // 4. Pause the 3 remaining non-leaders (SIGSTOP).
+ // 5. Run a config change to add back the previously-removed server.
+ // Ensure that, while the op cannot be committed yet due to lack of a
+ // majority in the new config (only 2 out of 5 servers are alive), the op
+ // has been replicated to both the local leader and the new member.
+ // 6. Force the existing leader to step down.
+ // 7. Resume one of the paused nodes so that a majority (of the 5-node
+ // configuration, but not the original 4-node configuration) will be available.
+ // 8. Start a leader election on the new (pending) node. It should win.
+ // 9. Unpause the two remaining stopped nodes.
+ // 10. Wait for all nodes to sync to the new leader's log.
+ const vector<string> kTsFlags = {
+ "--enable_leader_failure_detection=false",
+ };
+ const vector<string> kMasterFlags = {
+ "--catalog_manager_wait_for_new_tablets_to_elect_leader=false",
+ };
+
+ FLAGS_num_tablet_servers = 5;
+ FLAGS_num_replicas = 5;
+ NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags));
+
+ vector<TServerDetails*> tservers;
+ AppendValuesFromMap(tablet_servers_, &tservers);
+ ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
+
+ // Elect server 0 as leader and wait for log index 1 to propagate to all servers.
+ TServerDetails* initial_leader = tservers[0];
+ ASSERT_OK(StartElection(initial_leader, tablet_id_, MonoDelta::FromSeconds(10)));
+ ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, initial_leader, tablet_id_,
+ MonoDelta::FromSeconds(10)));
+
+ // The server we will remove and then bring back.
+ TServerDetails* final_leader = tservers[4];
+
+ // Kill the master, so we can change the config without interference.
+ cluster_->master()->Shutdown();
+
+ // Now remove server 4 from the configuration.
+ TabletServerMap active_tablet_servers = tablet_servers_;
+ LOG(INFO) << "Removing tserver with uuid " << final_leader->uuid();
+ ASSERT_OK(RemoveServer(initial_leader, tablet_id_, final_leader, boost::none,
+ MonoDelta::FromSeconds(10)));
+ ASSERT_EQ(1, active_tablet_servers.erase(final_leader->uuid()));
+ int64_t cur_log_index = 2;
+ ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10),
+ active_tablet_servers, tablet_id_, cur_log_index));
+
+ // Pause tablet servers 1 through 3, so they won't see the operation to add
+ // server 4 back.
+ LOG(INFO) << "Pausing 3 replicas...";
+ for (int i = 1; i <= 3; i++) {
+ ExternalTabletServer* replica_ts = cluster_->tablet_server_by_uuid(tservers[i]->uuid());
+ ASSERT_OK(replica_ts->Pause());
+ }
+
+ // Now add server 4 back to the peers.
+ // This operation will time out on the client side.
+ LOG(INFO) << "Adding back Peer " << final_leader->uuid() << " and expecting timeout...";
+ Status s = AddServer(initial_leader, tablet_id_, final_leader, RaftPeerPB::VOTER, boost::none,
+ MonoDelta::FromMilliseconds(100));
+ ASSERT_TRUE(s.IsTimedOut()) << "Expected AddServer() to time out. Result: " << s.ToString();
+ LOG(INFO) << "Timeout achieved.";
+ active_tablet_servers = tablet_servers_; // Reset to the unpaused servers.
+ for (int i = 1; i <= 3; i++) {
+ ASSERT_EQ(1, active_tablet_servers.erase(tservers[i]->uuid()));
+ }
+ // Only wait for TS 0 and 4 to agree that the new change config op has been
+ // replicated.
+ ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10),
+ active_tablet_servers, tablet_id_, ++cur_log_index));
+
+ // Now that TS 4 is electable (and pending), have TS 0 step down.
+ LOG(INFO) << "Forcing Peer " << initial_leader->uuid() << " to step down...";
+ ASSERT_OK(LeaderStepDown(initial_leader, tablet_id_, MonoDelta::FromSeconds(10)));
+
+ // Resume TS 1 so we have a majority of 3 to elect a new leader.
+ LOG(INFO) << "Resuming Peer " << tservers[1]->uuid() << " ...";
+ ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[1]->uuid())->Resume());
+ InsertOrDie(&active_tablet_servers, tservers[1]->uuid(), tservers[1]);
+
+ // Now try to get TS 4 elected. It should succeed and push a NO_OP.
+ LOG(INFO) << "Trying to elect Peer " << tservers[4]->uuid() << " ...";
+ ASSERT_OK(StartElection(final_leader, tablet_id_, MonoDelta::FromSeconds(10)));
+ ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10),
+ active_tablet_servers, tablet_id_, ++cur_log_index));
+
+ // Resume the remaining paused nodes.
+ LOG(INFO) << "Resuming remaining nodes...";
+ ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[2]->uuid())->Resume());
+ ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[3]->uuid())->Resume());
+ active_tablet_servers = tablet_servers_;
+
+ // Do one last operation on the new leader: an insert.
+ ASSERT_OK(WriteSimpleTestRow(final_leader, tablet_id_, RowOperationsPB::INSERT,
+ kTestRowKey, kTestRowIntVal, "Ob-La-Di, Ob-La-Da",
+ MonoDelta::FromSeconds(10)));
+
+ // Wait for all servers to replicate everything up through the last write op.
+ ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10),
+ active_tablet_servers, tablet_id_, ++cur_log_index));
+}
+
+// Have a replica fall behind the leader's log, then fail a tablet copy. It
+// should still be able to vote in leader elections.
+TEST_F(RaftConsensusElectionITest, TombstonedVoteAfterFailedTabletCopy) {
+ const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+
+ vector<string> ts_flags;
+ AddFlagsForLogRolls(&ts_flags); // For CauseFollowerToFallBehindLogGC().
+ const vector<string> kMasterFlags = {
+ "--master_add_server_when_underreplicated=false",
+ };
+ NO_FATALS(BuildAndStart(ts_flags, kMasterFlags));
+
+ TabletServerMap active_tablet_servers = tablet_servers_;
+ ASSERT_EQ(3, FLAGS_num_replicas);
+ ASSERT_EQ(3, active_tablet_servers.size());
+
+ string leader_uuid;
+ int64_t orig_term;
+ string follower_uuid;
+ NO_FATALS(CauseFollowerToFallBehindLogGC(&leader_uuid, &orig_term, &follower_uuid));
+
+ // Wait for the abandoned follower to be evicted.
+ ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(2, tablet_servers_[leader_uuid],
+ tablet_id_, kTimeout));
+ ASSERT_EQ(1, active_tablet_servers.erase(follower_uuid));
+ ASSERT_OK(WaitForServersToAgree(kTimeout, active_tablet_servers, tablet_id_, 2));
+
+ // Now the follower is evicted and will be tombstoned.
+ // We'll add it back to the config and then crash the follower during the
+ // resulting tablet copy.
+ ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server_by_uuid(follower_uuid),
+ "tablet_copy_fault_crash_on_fetch_all", "1.0"));
+ auto* leader_ts = tablet_servers_[leader_uuid];
+ auto* follower_ts = tablet_servers_[follower_uuid];
+ ASSERT_OK(itest::AddServer(leader_ts, tablet_id_, follower_ts, RaftPeerPB::VOTER,
+ boost::none, kTimeout));
+ ASSERT_OK(cluster_->tablet_server_by_uuid(follower_uuid)->WaitForInjectedCrash(kTimeout));
+
+ // Shut down the rest of the cluster, then only bring back the node we
+ // crashed, plus one other node. The tombstoned replica should still be able
+ // to vote and the tablet should come online.
+ cluster_->Shutdown();
+
+ int follower_idx = cluster_->tablet_server_index_by_uuid(follower_uuid);
+ ASSERT_OK(inspect_->CheckTabletDataStateOnTS(follower_idx, tablet_id_, { TABLET_DATA_COPYING }));
+
+ ASSERT_OK(cluster_->master()->Restart());
+ ASSERT_OK(cluster_->tablet_server_by_uuid(leader_uuid)->Restart());
+ ASSERT_OK(cluster_->tablet_server_by_uuid(follower_uuid)->Restart());
+
+ TestWorkload workload(cluster_.get());
+ workload.set_table_name(kTableId);
+ workload.set_timeout_allowed(true);
+ workload.Setup();
+ workload.Start();
+ ASSERT_EVENTUALLY([&] {
+ ASSERT_GE(workload.rows_inserted(), 100);
+ });
+ workload.StopAndJoin();
+}
+
+} // namespace tserver
+} // namespace kudu
+
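
As an aside on the AutomaticLeaderElection test above: it leans on standard Raft
majority arithmetic, killing FLAGS_num_replicas / 2 leaders in sequence and
expecting the remaining FLAGS_num_replicas / 2 + 1 replicas to keep electing new
ones. A minimal standalone sketch of that arithmetic (not part of this commit;
MajoritySize mirrors the helper of the same name used elsewhere in Kudu's tests):

#include <iostream>

// floor(N/2) + 1 voters form a Raft majority.
int MajoritySize(int num_voters) {
  return num_voters / 2 + 1;
}

int main() {
  for (int n : {1, 3, 5, 7}) {
    std::cout << n << " voters: majority = " << MajoritySize(n)
              << ", tolerated failures = " << n - MajoritySize(n) << "\n";
  }
  return 0;
}

With 5 replicas this gives a majority of 3 and 2 tolerated failures, which is
why kNumLeadersToKill is 2 and kFinalNumReplicas is 3 in the test.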
http://git-wip-us.apache.org/repos/asf/kudu/blob/ab77ce02/src/kudu/tablet/mt-tablet-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/mt-tablet-test.cc b/src/kudu/tablet/mt-tablet-test.cc
index 10bb8dc..d46155b 100644
--- a/src/kudu/tablet/mt-tablet-test.cc
+++ b/src/kudu/tablet/mt-tablet-test.cc
@@ -59,11 +59,11 @@ DECLARE_int32(tablet_delta_store_minor_compact_max);
DEFINE_int32(num_insert_threads, 8, "Number of inserting threads to launch");
DEFINE_int32(num_counter_threads, 8, "Number of counting threads to launch");
DEFINE_int32(num_summer_threads, 1, "Number of summing threads to launch");
-DEFINE_int32(num_updater_threads, 1, "Number of updating threads to launch");
DEFINE_int32(num_slowreader_threads, 1, "Number of 'slow' reader threads to launch");
DEFINE_int32(num_flush_threads, 1, "Number of flusher reader threads to launch");
DEFINE_int32(num_compact_threads, 1, "Number of compactor threads to launch");
DEFINE_int32(num_undo_delta_gc_threads, 1, "Number of undo delta gc threads to launch");
+DEFINE_int32(num_updater_threads, 1, "Number of updating threads to launch");
DEFINE_int32(num_flush_delta_threads, 1, "Number of delta flusher reader threads to launch");
DEFINE_int32(num_minor_compact_deltas_threads, 1,
"Number of delta minor compactor threads to launch");
http://git-wip-us.apache.org/repos/asf/kudu/blob/ab77ce02/src/kudu/tserver/tablet_server-test-base.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server-test-base.cc b/src/kudu/tserver/tablet_server-test-base.cc
index 192e3a0..8e701ce 100644
--- a/src/kudu/tserver/tablet_server-test-base.cc
+++ b/src/kudu/tserver/tablet_server-test-base.cc
@@ -63,7 +63,6 @@
#include "kudu/util/test_util.h"
DEFINE_int32(rpc_timeout, 1000, "Timeout for RPC calls, in seconds");
-DEFINE_int32(num_updater_threads, 1, "Number of updating threads to launch");
DECLARE_bool(enable_maintenance_manager);
DECLARE_int32(heartbeat_rpc_timeout_ms);
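
A note on the flag shuffling above: gflags allows each flag to be DEFINEd
exactly once per binary, and every other translation unit must use DECLARE.
Removing the duplicate DEFINE of num_updater_threads from
tablet_server-test-base.cc (presumably to avoid double registration once the
test bases are linked into the same binaries) follows the usual pattern,
sketched below with hypothetical file names (not from this commit):

// common_flags.cc -- the single translation unit that owns the flag.
#include <gflags/gflags.h>
DEFINE_int32(num_updater_threads, 1, "Number of updating threads to launch");

// user.cc -- any other file that reads the flag.
#include <gflags/gflags_declare.h>
DECLARE_int32(num_updater_threads);

int NumUpdaterThreads() {
  // A second DEFINE_int32 of the same name here would make gflags abort at
  // startup with a "flag defined more than once" error.
  return FLAGS_num_updater_threads;
}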
[2/3] kudu git commit: [raft_consensus-itest] separate Raft election tests
Posted by al...@apache.org.
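
One piece of the code moved in this part deserves a gloss: DelayInjectorThread
(further down) scales its random pauses by timeout_msec / 1.64485, where 1.64485
is the 95th percentile of the standard normal distribution, so roughly 5% of
pauses exceed the RPC timeout. A standalone sketch (not from the commit) that
checks that arithmetic empirically:

#include <iostream>
#include <random>

int main() {
  std::mt19937 rng(42);
  std::normal_distribution<double> normal(0.0, 1.0);
  const double timeout_msec = 100;
  const int trials = 100000;
  int over = 0;
  for (int i = 0; i < trials; i++) {
    // Same formula as DelayInjectorThread: the sleep exceeds the timeout
    // exactly when the normal draw exceeds 1.64485, i.e. ~5% of the time.
    double sleep_time_usec = 1000 * (normal(rng) * timeout_msec) / 1.64485;
    if (sleep_time_usec < 0) sleep_time_usec = 0;
    if (sleep_time_usec > timeout_msec * 1000) over++;
  }
  std::cout << "fraction over timeout: "
            << static_cast<double>(over) / trials << "\n";  // prints ~0.05
  return 0;
}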
http://git-wip-us.apache.org/repos/asf/kudu/blob/ab77ce02/src/kudu/integration-tests/raft_consensus-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index b3b63f9..4d030c5 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <memory>
@@ -27,13 +26,11 @@
#include <vector>
#include <boost/optional/optional.hpp>
-#include <gflags/gflags.h>
#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <glog/stl_logging.h>
#include <gtest/gtest.h>
-#include "kudu/client/client-test-util.h"
#include "kudu/client/client.h"
#include "kudu/client/shared_ptr.h"
#include "kudu/client/write_op.h"
@@ -52,10 +49,8 @@
#include "kudu/gutil/basictypes.h"
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/map-util.h"
-#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/stl_util.h"
-#include "kudu/gutil/stringprintf.h"
#include "kudu/gutil/strings/strcat.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/strings/util.h"
@@ -63,8 +58,8 @@
#include "kudu/integration-tests/cluster_verifier.h"
#include "kudu/integration-tests/external_mini_cluster_fs_inspector.h"
#include "kudu/integration-tests/log_verifier.h"
+#include "kudu/integration-tests/raft_consensus-itest-base.h"
#include "kudu/integration-tests/test_workload.h"
-#include "kudu/integration-tests/ts_itest-base.h"
#include "kudu/master/master.pb.h"
#include "kudu/mini-cluster/external_mini_cluster.h"
#include "kudu/rpc/messenger.h"
@@ -81,23 +76,18 @@
#include "kudu/util/monotime.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/random.h"
-#include "kudu/util/slice.h"
#include "kudu/util/status.h"
-#include "kudu/util/stopwatch.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
#include "kudu/util/thread.h"
+DECLARE_int64(client_inserts_per_thread);
+DECLARE_int64(client_num_batches_per_thread);
DECLARE_int32(consensus_rpc_timeout_ms);
+DECLARE_int32(num_client_threads);
DECLARE_int32(num_replicas);
DECLARE_int32(num_tablet_servers);
DECLARE_int32(rpc_timeout);
-DEFINE_int32(num_client_threads, 8,
- "Number of client threads to launch");
-DEFINE_int64(client_inserts_per_thread, 50,
- "Number of rows inserted by each client thread");
-DEFINE_int64(client_num_batches_per_thread, 5,
- "In how many batches to group the rows, for each client");
METRIC_DECLARE_entity(tablet);
METRIC_DECLARE_counter(transaction_memory_pressure_rejections);
@@ -148,50 +138,9 @@ static const int kTestRowIntVal = 5678;
// Integration test for the raft consensus implementation.
// Uses the whole tablet server stack with ExternalMiniCluster.
-class RaftConsensusITest : public TabletServerIntegrationTestBase {
+class RaftConsensusITest : public RaftConsensusITestBase {
public:
- RaftConsensusITest()
- : inserters_(FLAGS_num_client_threads) {
- }
-
- virtual void SetUp() OVERRIDE {
- TabletServerIntegrationTestBase::SetUp();
- FLAGS_consensus_rpc_timeout_ms = kConsensusRpcTimeoutForTests;
- }
-
- void ScanReplica(TabletServerServiceProxy* replica_proxy,
- vector<string>* results) {
-
- ScanRequestPB req;
- ScanResponsePB resp;
- RpcController rpc;
- rpc.set_timeout(MonoDelta::FromSeconds(10)); // Squelch warnings.
-
- NewScanRequestPB* scan = req.mutable_new_scan_request();
- scan->set_tablet_id(tablet_id_);
- ASSERT_OK(SchemaToColumnPBs(schema_, scan->mutable_projected_columns()));
-
- // Send the call
- {
- req.set_batch_size_bytes(0);
- SCOPED_TRACE(SecureDebugString(req));
- ASSERT_OK(replica_proxy->Scan(req, &resp, &rpc));
- SCOPED_TRACE(SecureDebugString(resp));
- if (resp.has_error()) {
- ASSERT_OK(StatusFromPB(resp.error().status()));
- }
- }
-
- if (!resp.has_more_results())
- return;
-
- // Drain all the rows from the scanner.
- NO_FATALS(DrainScannerToStrings(resp.scanner_id(),
- schema_,
- results,
- replica_proxy));
-
- std::sort(results->begin(), results->end());
+ RaftConsensusITest() {
}
// Scan the given replica in a loop until the number of rows
@@ -199,28 +148,7 @@ class RaftConsensusITest : public TabletServerIntegrationTestBase {
// fails the test.
void WaitForRowCount(TabletServerServiceProxy* replica_proxy,
int expected_count,
- vector<string>* results) {
- LOG(INFO) << "Waiting for row count " << expected_count << "...";
- MonoTime start = MonoTime::Now();
- MonoTime deadline = start + MonoDelta::FromSeconds(90);
- while (true) {
- results->clear();
- NO_FATALS(ScanReplica(replica_proxy, results));
- if (results->size() == expected_count) {
- return;
- }
- SleepFor(MonoDelta::FromMilliseconds(10));
- if (MonoTime::Now() >= deadline) {
- break;
- }
- }
- MonoTime end = MonoTime::Now();
- LOG(WARNING) << "Didn't reach row count " << expected_count;
- FAIL() << "Did not reach expected row count " << expected_count
- << " after " << (end - start).ToString()
- << ": rows: " << *results;
- }
-
+ vector<string>* results);
// Add an Insert operation to the given consensus request.
// The row to be inserted is generated based on the OpId.
@@ -230,100 +158,19 @@ class RaftConsensusITest : public TabletServerIntegrationTestBase {
int32_t key,
ConsensusRequestPB* req);
-
string DumpToString(TServerDetails* leader,
const vector<string>& leader_results,
TServerDetails* replica,
- const vector<string>& replica_results) {
- string ret = strings::Substitute("Replica results did not match the leaders."
- "\nLeader: $0\nReplica: $1. Results size "
- "L: $2 R: $3",
- leader->ToString(),
- replica->ToString(),
- leader_results.size(),
- replica_results.size());
-
- StrAppend(&ret, "Leader Results: \n");
- for (const string& result : leader_results) {
- StrAppend(&ret, result, "\n");
- }
-
- StrAppend(&ret, "Replica Results: \n");
- for (const string& result : replica_results) {
- StrAppend(&ret, result, "\n");
- }
-
- return ret;
- }
+ const vector<string>& replica_results);
// Insert 'num_rows' rows starting with row key 'start_row'.
// Each row will have size 'payload_size'. A short (100ms) timeout is
// used. If the flush generates any errors they will be ignored.
void InsertPayloadIgnoreErrors(int start_row, int num_rows, int payload_size);
- void InsertTestRowsRemoteThread(uint64_t first_row,
- uint64_t count,
- uint64_t num_batches,
- const vector<CountDownLatch*>& latches) {
- shared_ptr<KuduTable> table;
- CHECK_OK(client_->OpenTable(kTableId, &table));
-
- shared_ptr<KuduSession> session = client_->NewSession();
- session->SetTimeoutMillis(60000);
- CHECK_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
-
- for (int i = 0; i < num_batches; i++) {
- uint64_t first_row_in_batch = first_row + (i * count / num_batches);
- uint64_t last_row_in_batch = first_row_in_batch + count / num_batches;
-
- for (int j = first_row_in_batch; j < last_row_in_batch; j++) {
- gscoped_ptr<KuduInsert> insert(table->NewInsert());
- KuduPartialRow* row = insert->mutable_row();
- CHECK_OK(row->SetInt32(0, j));
- CHECK_OK(row->SetInt32(1, j * 2));
- CHECK_OK(row->SetStringCopy(2, Slice(StringPrintf("hello %d", j))));
- CHECK_OK(session->Apply(insert.release()));
- }
-
- FlushSessionOrDie(session);
-
- int inserted = last_row_in_batch - first_row_in_batch;
- for (CountDownLatch* latch : latches) {
- latch->CountDown(inserted);
- }
- }
-
- inserters_.CountDown();
- }
-
// Brings Chaos to a MiniTabletServer by introducing random delays. Does this by
// pausing the daemon a random amount of time.
- void DelayInjectorThread(ExternalTabletServer* tablet_server, int timeout_msec) {
- while (inserters_.count() > 0) {
-
- // Adjust the value obtained from the normalized gauss. dist. so that we steal the lock
- // longer than the timeout a small (~5%) percentage of the times.
- // (95% corresponds to 1.64485, in a normalized (0,1) gaussian distribution).
- double sleep_time_usec = 1000 *
- ((random_.Normal(0, 1) * timeout_msec) / 1.64485);
-
- if (sleep_time_usec < 0) sleep_time_usec = 0;
-
- // Additionally only cause timeouts at all 50% of the time, otherwise sleep.
- double val = (rand() * 1.0) / RAND_MAX;
- if (val < 0.5) {
- SleepFor(MonoDelta::FromMicroseconds(sleep_time_usec));
- continue;
- }
-
- ASSERT_OK(tablet_server->Pause());
- LOG_IF(INFO, sleep_time_usec > 0.0)
- << "Delay injector thread for TS " << tablet_server->instance_id().permanent_uuid()
- << " SIGSTOPped the ts, sleeping for " << sleep_time_usec << " usec...";
- SleepFor(MonoDelta::FromMicroseconds(sleep_time_usec));
- ASSERT_OK(tablet_server->Resume());
- }
- }
+ void DelayInjectorThread(ExternalTabletServer* tablet_server, int timeout_msec);
// Thread which loops until '*finish' becomes true, trying to insert a row
// on the given tablet server identified by 'replica_idx'.
@@ -332,49 +179,7 @@ class RaftConsensusITest : public TabletServerIntegrationTestBase {
// Stops the current leader of the configuration, runs leader election and then brings it back.
// Before stopping the leader this pauses all follower nodes in regular intervals so that
// we get an increased chance of stuff being pending.
- void StopOrKillLeaderAndElectNewOne() {
- TServerDetails* leader;
- vector<TServerDetails*> followers;
- CHECK_OK(GetTabletLeaderAndFollowers(tablet_id_, &leader, &followers));
- ExternalTabletServer* leader_ets = cluster_->tablet_server_by_uuid(leader->uuid());
-
- for (const auto* ts : followers) {
- ExternalTabletServer* ets = cluster_->tablet_server_by_uuid(ts->uuid());
- CHECK_OK(ets->Pause());
- SleepFor(MonoDelta::FromMilliseconds(100));
- }
-
- // When all are paused also pause or kill the current leader. Since we've waited a bit
- // the old leader is likely to have operations that must be aborted.
- const bool do_kill = rand() % 2 == 0;
- if (do_kill) {
- leader_ets->Shutdown();
- } else {
- CHECK_OK(leader_ets->Pause());
- }
-
- // Resume the replicas.
- for (const auto* ts : followers) {
- ExternalTabletServer* ets = cluster_->tablet_server_by_uuid(ts->uuid());
- CHECK_OK(ets->Resume());
- }
-
- // Get the new leader.
- TServerDetails* new_leader;
- CHECK_OK(GetLeaderReplicaWithRetries(tablet_id_, &new_leader));
-
- // Bring the old leader back.
- if (do_kill) {
- CHECK_OK(leader_ets->Restart());
- // Wait until we have the same number of followers.
- const auto initial_followers = followers.size();
- do {
- GetOnlyLiveFollowerReplicas(tablet_id_, &followers);
- } while (followers.size() < initial_followers);
- } else {
- CHECK_OK(leader_ets->Resume());
- }
- }
+ void StopOrKillLeaderAndElectNewOne();
// Writes 'num_writes' operations to the current leader. Each of the operations
// has a payload of around 128KB. Causes a gtest failure on error.
@@ -388,8 +193,6 @@ class RaftConsensusITest : public TabletServerIntegrationTestBase {
void AssertMajorityRequiredForElectionsAndWrites(const TabletServerMap& tablet_servers,
const string& leader_uuid);
- void CreateClusterForChurnyElectionsTests(const vector<string>& extra_ts_flags);
- void DoTestChurnyElections(TestWorkload* workload, int max_rows_to_insert);
void CreateClusterForCrashyNodesTests();
void DoTestCrashyNodes(TestWorkload* workload, int max_rows_to_insert);
@@ -398,57 +201,414 @@ class RaftConsensusITest : public TabletServerIntegrationTestBase {
void SetupSingleReplicaTest(TServerDetails** replica_ts);
protected:
- // Flags needed for CauseFollowerToFallBehindLogGC() to work well.
- void AddFlagsForLogRolls(vector<string>* extra_tserver_flags);
-
- // Pause one of the followers and write enough data to the remaining replicas
- // to cause log GC, then resume the paused follower. On success,
- // 'leader_uuid' will be set to the UUID of the leader, 'orig_term' will be
- // set to the term of the leader before un-pausing the follower, and
- // 'fell_behind_uuid' will be set to the UUID of the follower that was paused
- // and caused to fall behind. These can be used for verification purposes.
- //
- // Certain flags should be set. You can add the required flags with
- // AddFlagsForLogRolls() before starting the cluster.
- void CauseFollowerToFallBehindLogGC(string* leader_uuid,
- int64_t* orig_term,
- string* fell_behind_uuid);
-
// Retrieve the current term of the first tablet on this tablet server.
Status GetTermMetricValue(ExternalTabletServer* ts, int64_t* term);
shared_ptr<KuduTable> table_;
vector<scoped_refptr<kudu::Thread> > threads_;
- CountDownLatch inserters_;
};
+void RaftConsensusITest::WaitForRowCount(TabletServerServiceProxy* replica_proxy,
+ int expected_count,
+ vector<string>* results) {
+ LOG(INFO) << "Waiting for row count " << expected_count << "...";
+ MonoTime start = MonoTime::Now();
+ MonoTime deadline = start + MonoDelta::FromSeconds(90);
+ while (true) {
+ results->clear();
+ NO_FATALS(ScanReplica(replica_proxy, results));
+ if (results->size() == expected_count) {
+ return;
+ }
+ SleepFor(MonoDelta::FromMilliseconds(10));
+ if (MonoTime::Now() >= deadline) {
+ break;
+ }
+ }
+ FAIL() << "Did not reach expected row count " << expected_count
+ << " after " << (MonoTime::Now() - start).ToString()
+ << ": rows: " << *results;
+}
+
+void RaftConsensusITest::AddOp(const OpId& id, ConsensusRequestPB* req) {
+ AddOpWithTypeAndKey(id, RowOperationsPB::INSERT,
+ id.index() * 10000 + id.term(), req);
+}
+
+void RaftConsensusITest::AddOpWithTypeAndKey(const OpId& id,
+ RowOperationsPB::Type op_type,
+ int32_t key,
+ ConsensusRequestPB* req) {
+ ReplicateMsg* msg = req->add_ops();
+ msg->mutable_id()->CopyFrom(id);
+ msg->set_timestamp(id.index());
+ msg->set_op_type(consensus::WRITE_OP);
+ WriteRequestPB* write_req = msg->mutable_write_request();
+ CHECK_OK(SchemaToPB(schema_, write_req->mutable_schema()));
+ write_req->set_tablet_id(tablet_id_);
+ AddTestRowToPB(op_type, schema_, key, id.term(),
+ SecureShortDebugString(id), write_req->mutable_row_operations());
+}
+
+string RaftConsensusITest::DumpToString(TServerDetails* leader,
+ const vector<string>& leader_results,
+ TServerDetails* replica,
+ const vector<string>& replica_results) {
+ string ret = strings::Substitute("Replica results did not match the leaders."
+ "\nLeader: $0\nReplica: $1. Results size "
+ "L: $2 R: $3",
+ leader->ToString(),
+ replica->ToString(),
+ leader_results.size(),
+ replica_results.size());
+
+ StrAppend(&ret, "Leader Results: \n");
+ for (const string& result : leader_results) {
+ StrAppend(&ret, result, "\n");
+ }
+
+ StrAppend(&ret, "Replica Results: \n");
+ for (const string& result : replica_results) {
+ StrAppend(&ret, result, "\n");
+ }
+
+ return ret;
+}
+
+void RaftConsensusITest::InsertPayloadIgnoreErrors(int start_row,
+ int num_rows,
+ int payload_size) {
+ shared_ptr<KuduTable> table;
+ CHECK_OK(client_->OpenTable(kTableId, &table));
+ shared_ptr<KuduSession> session = client_->NewSession();
+ session->SetTimeoutMillis(100);
+ CHECK_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+ string payload(payload_size, 'x');
+ for (int i = 0; i < num_rows; i++) {
+ gscoped_ptr<KuduInsert> insert(table->NewInsert());
+ KuduPartialRow* row = insert->mutable_row();
+ CHECK_OK(row->SetInt32(0, i + start_row));
+ CHECK_OK(row->SetInt32(1, 0));
+ CHECK_OK(row->SetStringCopy(2, payload));
+ CHECK_OK(session->Apply(insert.release()));
+ ignore_result(session->Flush());
+ }
+}
+
+void RaftConsensusITest::DelayInjectorThread(
+ ExternalTabletServer* tablet_server, int timeout_msec) {
+
+ while (inserters_.count() > 0) {
+ // Adjust the value obtained from the normalized gauss. dist. so that we steal the lock
+ // longer than the timeout a small (~5%) percentage of the times.
+ // (95% corresponds to 1.64485, in a normalized (0,1) gaussian distribution).
+ double sleep_time_usec = 1000 *
+ ((random_.Normal(0, 1) * timeout_msec) / 1.64485);
+
+ if (sleep_time_usec < 0) sleep_time_usec = 0;
+
+ // Additionally only cause timeouts at all 50% of the time, otherwise sleep.
+ double val = (rand() * 1.0) / RAND_MAX;
+ if (val < 0.5) {
+ SleepFor(MonoDelta::FromMicroseconds(sleep_time_usec));
+ continue;
+ }
+
+ ASSERT_OK(tablet_server->Pause());
+ LOG_IF(INFO, sleep_time_usec > 0.0)
+ << "Delay injector thread for TS " << tablet_server->instance_id().permanent_uuid()
+ << " SIGSTOPped the ts, sleeping for " << sleep_time_usec << " usec...";
+ SleepFor(MonoDelta::FromMicroseconds(sleep_time_usec));
+ ASSERT_OK(tablet_server->Resume());
+ }
+}
+
+void RaftConsensusITest::StubbornlyWriteSameRowThread(int replica_idx, const AtomicBool* finish) {
+ vector<TServerDetails*> servers;
+ AppendValuesFromMap(tablet_servers_, &servers);
+ CHECK_LT(replica_idx, servers.size());
+ TServerDetails* ts = servers[replica_idx];
+
+ // Manually construct an RPC to our target replica. We expect most of the calls
+ // to fail either with an "already present" or an error because we are writing
+ // to a follower. That's OK, though - what we care about for this test is
+ // just that the operations Apply() in the same order everywhere (even though
+ // in this case the result will just be an error).
+ WriteRequestPB req;
+ WriteResponsePB resp;
+ RpcController rpc;
+ req.set_tablet_id(tablet_id_);
+ ASSERT_OK(SchemaToPB(schema_, req.mutable_schema()));
+ AddTestRowToPB(RowOperationsPB::INSERT, schema_, kTestRowKey, kTestRowIntVal,
+ "hello world", req.mutable_row_operations());
+
+ while (!finish->Load()) {
+ resp.Clear();
+ rpc.Reset();
+ rpc.set_timeout(MonoDelta::FromSeconds(10));
+ ignore_result(ts->tserver_proxy->Write(req, &resp, &rpc));
+ VLOG(1) << "Response from server " << replica_idx << ": "
+ << SecureShortDebugString(resp);
+ }
+}
+
+void RaftConsensusITest::StopOrKillLeaderAndElectNewOne() {
+ TServerDetails* leader;
+ vector<TServerDetails*> followers;
+ CHECK_OK(GetTabletLeaderAndFollowers(tablet_id_, &leader, &followers));
+ ExternalTabletServer* leader_ets = cluster_->tablet_server_by_uuid(leader->uuid());
+
+ for (const auto* ts : followers) {
+ ExternalTabletServer* ets = cluster_->tablet_server_by_uuid(ts->uuid());
+ CHECK_OK(ets->Pause());
+ SleepFor(MonoDelta::FromMilliseconds(100));
+ }
+
+ // When all are paused also pause or kill the current leader. Since we've waited a bit
+ // the old leader is likely to have operations that must be aborted.
+ const bool do_kill = rand() % 2 == 0;
+ if (do_kill) {
+ leader_ets->Shutdown();
+ } else {
+ CHECK_OK(leader_ets->Pause());
+ }
+
+ // Resume the replicas.
+ for (const auto* ts : followers) {
+ ExternalTabletServer* ets = cluster_->tablet_server_by_uuid(ts->uuid());
+ CHECK_OK(ets->Resume());
+ }
+
+ // Get the new leader.
+ TServerDetails* new_leader;
+ CHECK_OK(GetLeaderReplicaWithRetries(tablet_id_, &new_leader));
+
+ // Bring the old leader back.
+ if (do_kill) {
+ CHECK_OK(leader_ets->Restart());
+ // Wait until we have the same number of followers.
+ const auto initial_followers = followers.size();
+ do {
+ GetOnlyLiveFollowerReplicas(tablet_id_, &followers);
+ } while (followers.size() < initial_followers);
+ } else {
+ CHECK_OK(leader_ets->Resume());
+ }
+}
+
+void RaftConsensusITest::Write128KOpsToLeader(int num_writes) {
+ TServerDetails* leader = nullptr;
+ ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader));
+
+ WriteRequestPB req;
+ req.set_tablet_id(tablet_id_);
+ ASSERT_OK(SchemaToPB(schema_, req.mutable_schema()));
+ RowOperationsPB* data = req.mutable_row_operations();
+ WriteResponsePB resp;
+ RpcController rpc;
+ rpc.set_timeout(MonoDelta::FromMilliseconds(10000));
+ int key = 0;
+
+ // Generate a 128KB dummy payload.
+ string test_payload(128 * 1024, '0');
+ for (int i = 0; i < num_writes; i++) {
+ rpc.Reset();
+ data->Clear();
+ AddTestRowToPB(RowOperationsPB::INSERT, schema_, key, key,
+ test_payload, data);
+ key++;
+ ASSERT_OK(leader->tserver_proxy->Write(req, &resp, &rpc));
+
+ ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp);
+ }
+}
+
+void RaftConsensusITest::AssertMajorityRequiredForElectionsAndWrites(
+ const TabletServerMap& tablet_servers, const string& leader_uuid) {
+
+ TServerDetails* initial_leader = FindOrDie(tablet_servers, leader_uuid);
+
+ // Calculate number of servers to leave unpaused (minority).
+ // This math is a little unintuitive but works for cluster sizes including 2 and 1.
+ // Note: We assume all of these TSes are voters.
+ int config_size = tablet_servers.size();
+ int minority_to_retain = MajoritySize(config_size) - 1;
+
+ // Only perform this part of the test if we have some servers to pause, else
+ // the failure assertions will throw.
+ if (config_size > 1) {
+ // Pause enough replicas to prevent a majority.
+ int num_to_pause = config_size - minority_to_retain;
+ LOG(INFO) << "Pausing " << num_to_pause << " tablet servers in config of size " << config_size;
+ vector<string> paused_uuids;
+ for (const TabletServerMap::value_type& entry : tablet_servers) {
+ if (paused_uuids.size() == num_to_pause) {
+ continue;
+ }
+ const string& replica_uuid = entry.first;
+ if (replica_uuid == leader_uuid) {
+ // Always leave this one alone.
+ continue;
+ }
+ ExternalTabletServer* replica_ts = cluster_->tablet_server_by_uuid(replica_uuid);
+ ASSERT_OK(replica_ts->Pause());
+ paused_uuids.push_back(replica_uuid);
+ }
+
+ // Ensure writes timeout while only a minority is alive.
+ Status s = WriteSimpleTestRow(initial_leader, tablet_id_, RowOperationsPB::UPDATE,
+ kTestRowKey, kTestRowIntVal, "foo",
+ MonoDelta::FromMilliseconds(100));
+ ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+
+ // Step down.
+ ASSERT_OK(LeaderStepDown(initial_leader, tablet_id_, MonoDelta::FromSeconds(10)));
+
+ // Assert that elections time out without a live majority.
+ // We specify a very short timeout here to keep the tests fast.
+ ASSERT_OK(StartElection(initial_leader, tablet_id_, MonoDelta::FromSeconds(10)));
+ s = WaitUntilLeader(initial_leader, tablet_id_, MonoDelta::FromMilliseconds(100));
+ ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+ LOG(INFO) << "Expected timeout encountered on election with weakened config: " << s.ToString();
+
+ // Resume the paused servers.
+ LOG(INFO) << "Resuming " << num_to_pause << " tablet servers in config of size " << config_size;
+ for (const string& replica_uuid : paused_uuids) {
+ ExternalTabletServer* replica_ts = cluster_->tablet_server_by_uuid(replica_uuid);
+ ASSERT_OK(replica_ts->Resume());
+ }
+ }
+
+ ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(20), tablet_servers, tablet_id_, 1));
+
+ // Now an election should succeed.
+ ASSERT_OK(StartElection(initial_leader, tablet_id_, MonoDelta::FromSeconds(10)));
+ ASSERT_OK(WaitUntilLeader(initial_leader, tablet_id_, MonoDelta::FromSeconds(10)));
+ LOG(INFO) << "Successful election with full config of size " << config_size;
+
+ // And a write should also succeed.
+ ASSERT_OK(WriteSimpleTestRow(initial_leader, tablet_id_, RowOperationsPB::UPDATE,
+ kTestRowKey, kTestRowIntVal, Substitute("qsz=$0", config_size),
+ MonoDelta::FromSeconds(10)));
+}
+
+void RaftConsensusITest::CreateClusterForCrashyNodesTests() {
+ if (AllowSlowTests()) {
+ FLAGS_num_tablet_servers = 7;
+ FLAGS_num_replicas = 7;
+ }
+
+ vector<string> ts_flags;
+
+ // Crash 5% of the time just before sending an RPC. With 7 servers,
+ // this means we crash about 30% of the time before we've fully
+ // replicated the NO_OP at the start of the term.
+ ts_flags.emplace_back("--fault_crash_on_leader_request_fraction=0.05");
+
+ // Inject latency to encourage the replicas to fall out of sync
+ // with each other.
+ ts_flags.emplace_back("--log_inject_latency");
+ ts_flags.emplace_back("--log_inject_latency_ms_mean=30");
+ ts_flags.emplace_back("--log_inject_latency_ms_stddev=60");
+
+ // Make leader elections faster so we get through more cycles of leaders.
+ ts_flags.emplace_back("--raft_heartbeat_interval_ms=100");
+
+ // Avoid preallocating segments since bootstrap is a little bit
+ // faster if it doesn't have to scan forward through the preallocated
+ // log area.
+ ts_flags.emplace_back("--log_preallocate_segments=false");
+
+ CreateCluster("raft_consensus-itest-crashy-nodes-cluster", ts_flags, {});
+}
+
+void RaftConsensusITest::DoTestCrashyNodes(TestWorkload* workload, int max_rows_to_insert) {
+ int crashes_to_cause = 3;
+ if (AllowSlowTests()) {
+ crashes_to_cause = 15;
+ }
+
+ workload->set_num_replicas(FLAGS_num_replicas);
+ // Set a really high write timeout so that even in the presence of many failures we
+ // can verify an exact number of rows in the end, thanks to exactly once semantics.
+ workload->set_write_timeout_millis(60 * 1000 /* 60 seconds */);
+ workload->set_num_write_threads(10);
+ workload->set_num_read_threads(2);
+ workload->Setup();
+ workload->Start();
+
+ int num_crashes = 0;
+ while (num_crashes < crashes_to_cause &&
+ workload->rows_inserted() < max_rows_to_insert) {
+ num_crashes += RestartAnyCrashedTabletServers();
+ SleepFor(MonoDelta::FromMilliseconds(10));
+ }
+
+ // Writers are likely ongoing. To have some chance of completing all writes,
+ // restart the tablet servers; otherwise they'll keep crashing and the writes
+ // can never complete.
+ for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+ ExternalTabletServer* ts = cluster_->tablet_server(i);
+ vector<string>* flags = ts->mutable_flags();
+ bool removed_flag = false;
+ for (auto it = flags->begin(); it != flags->end(); ++it) {
+ if (HasPrefixString(*it, "--fault_crash")) {
+ flags->erase(it);
+ removed_flag = true;
+ break;
+ }
+ }
+ ASSERT_TRUE(removed_flag) << "could not remove flag from TS " << i
+ << "\nFlags:\n" << *flags;
+ ts->Shutdown();
+ ASSERT_OK(ts->Restart());
+ }
+
+ workload->StopAndJoin();
+
+ // Ensure that the replicas converge.
+ ClusterVerifier v(cluster_.get());
+ NO_FATALS(v.CheckCluster());
+ NO_FATALS(v.CheckRowCount(workload->table_name(),
+ ClusterVerifier::EXACTLY,
+ workload->rows_inserted()));
+}
+
+void RaftConsensusITest::SetupSingleReplicaTest(TServerDetails** replica_ts) {
+ const vector<string> kTsFlags = {
+ // Don't use the hybrid clock as we set logical timestamps on ops.
+ "--use_hybrid_clock=false",
+ "--enable_leader_failure_detection=false",
+ };
+ const vector<string> kMasterFlags = {
+ "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"
+ };
+
+ FLAGS_num_replicas = 3;
+ FLAGS_num_tablet_servers = 3;
+ NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags));
+
+ // Kill all the servers but one.
+ vector<TServerDetails*> tservers;
+ AppendValuesFromMap(tablet_servers_, &tservers);
+ ASSERT_EQ(3, tservers.size());
+
+ // Elect server 2 as leader and wait for log index 1 to propagate to all servers.
+ ASSERT_OK(StartElection(tservers[2], tablet_id_, MonoDelta::FromSeconds(10)));
+ ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, tablet_id_, 1));
+
+ cluster_->tablet_server_by_uuid(tservers[1]->uuid())->Shutdown();
+ cluster_->tablet_server_by_uuid(tservers[2]->uuid())->Shutdown();
+
+ *replica_ts = tservers[0];
+ LOG(INFO) << "================================== Cluster setup complete.";
+}
+
Status RaftConsensusITest::GetTermMetricValue(ExternalTabletServer* ts,
int64_t *term) {
return ts->GetInt64Metric(&METRIC_ENTITY_tablet, nullptr, &METRIC_raft_term, "value", term);
}
-void RaftConsensusITest::AddFlagsForLogRolls(vector<string>* extra_tserver_flags) {
- // We configure a small log segment size so that we roll frequently,
- // configure a small cache size so that we evict data from the cache, and
- // retain as few segments as possible. We also turn off async segment
- // allocation -- this ensures that we roll many segments of logs (with async
- // allocation, it's possible that the preallocation is slow and we wouldn't
- // roll deterministically).
- //
- // Additionally, we disable log compression, since these tests write a lot of
- // repetitive data to cause the rolls, and compression would make it all tiny.
- extra_tserver_flags->push_back("--log_compression_codec=none");
- extra_tserver_flags->push_back("--log_cache_size_limit_mb=1");
- extra_tserver_flags->push_back("--log_segment_size_mb=1");
- extra_tserver_flags->push_back("--log_async_preallocate_segments=false");
- extra_tserver_flags->push_back("--log_min_segments_to_retain=1");
- extra_tserver_flags->push_back("--log_max_segments_to_retain=3");
- extra_tserver_flags->push_back("--maintenance_manager_polling_interval_ms=100");
- extra_tserver_flags->push_back("--log_target_replay_size_mb=1");
- // We write 128KB cells in CauseFollowerToFallBehindLogGC(): bump the limit.
- extra_tserver_flags->push_back("--max_cell_size_bytes=1000000");
-}
-
// Test that we can retrieve the permanent uuid of a server running
// consensus service via RPC.
TEST_F(RaftConsensusITest, TestGetPermanentUuid) {
@@ -571,102 +731,34 @@ TEST_F(RaftConsensusITest, MultiThreadedMutateAndInsertThroughConsensus) {
TEST_F(RaftConsensusITest, TestInsertOnNonLeader) {
NO_FATALS(BuildAndStart());
- // Wait for the initial leader election to complete.
- ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_,
- tablet_id_, 1));
-
- // Manually construct a write RPC to a replica and make sure it responds
- // with the correct error code.
- WriteRequestPB req;
- WriteResponsePB resp;
- RpcController rpc;
- req.set_tablet_id(tablet_id_);
- ASSERT_OK(SchemaToPB(schema_, req.mutable_schema()));
- AddTestRowToPB(RowOperationsPB::INSERT, schema_, kTestRowKey, kTestRowIntVal,
- "hello world via RPC", req.mutable_row_operations());
-
- // Get the leader.
- vector<TServerDetails*> followers;
- GetOnlyLiveFollowerReplicas(tablet_id_, &followers);
-
- ASSERT_OK(followers[0]->tserver_proxy->Write(req, &resp, &rpc));
- SCOPED_TRACE(SecureDebugString(resp));
- ASSERT_TRUE(resp.has_error());
- Status s = StatusFromPB(resp.error().status());
- EXPECT_TRUE(s.IsIllegalState());
- ASSERT_STR_CONTAINS(s.ToString(), "is not leader of this config. Role: FOLLOWER");
- // TODO: need to change the error code to be something like REPLICA_NOT_LEADER
- // so that the client can properly handle this case! plumbing this is a little difficult
- // so not addressing at the moment.
- NO_FATALS(AssertAllReplicasAgree(0));
-}
-
-TEST_F(RaftConsensusITest, TestRunLeaderElection) {
- // Reset consensus rpc timeout to the default value or the election might fail often.
- FLAGS_consensus_rpc_timeout_ms = 1000;
- NO_FATALS(BuildAndStart());
-
- int num_iters = AllowSlowTests() ? 10 : 1;
-
- InsertTestRowsRemoteThread(0,
- FLAGS_client_inserts_per_thread * num_iters,
- FLAGS_client_num_batches_per_thread,
- vector<CountDownLatch*>());
-
- NO_FATALS(AssertAllReplicasAgree(FLAGS_client_inserts_per_thread * num_iters));
-
- // Select the last follower to be new leader.
- vector<TServerDetails*> followers;
- GetOnlyLiveFollowerReplicas(tablet_id_, &followers);
-
- // Now shutdown the current leader.
- TServerDetails* leader = DCHECK_NOTNULL(GetLeaderReplicaOrNull(tablet_id_));
- ExternalTabletServer* leader_ets = cluster_->tablet_server_by_uuid(leader->uuid());
- leader_ets->Shutdown();
-
- TServerDetails* replica = followers.back();
- CHECK_NE(leader->instance_id.permanent_uuid(), replica->instance_id.permanent_uuid());
-
- // Make the new replica leader.
- ASSERT_OK(StartElection(replica, tablet_id_, MonoDelta::FromSeconds(10)));
-
- // Insert a bunch more rows.
- InsertTestRowsRemoteThread(FLAGS_client_inserts_per_thread * num_iters,
- FLAGS_client_inserts_per_thread * num_iters,
- FLAGS_client_num_batches_per_thread,
- vector<CountDownLatch*>());
-
- // Restart the original replica and make sure they all agree.
- ASSERT_OK(leader_ets->Restart());
-
- NO_FATALS(AssertAllReplicasAgree(FLAGS_client_inserts_per_thread * num_iters * 2));
-}
-
-void RaftConsensusITest::Write128KOpsToLeader(int num_writes) {
- TServerDetails* leader = nullptr;
- ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader));
+ // Wait for the initial leader election to complete.
+ ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_,
+ tablet_id_, 1));
+ // Manually construct a write RPC to a replica and make sure it responds
+ // with the correct error code.
WriteRequestPB req;
- req.set_tablet_id(tablet_id_);
- ASSERT_OK(SchemaToPB(schema_, req.mutable_schema()));
- RowOperationsPB* data = req.mutable_row_operations();
WriteResponsePB resp;
RpcController rpc;
- rpc.set_timeout(MonoDelta::FromMilliseconds(10000));
- int key = 0;
+ req.set_tablet_id(tablet_id_);
+ ASSERT_OK(SchemaToPB(schema_, req.mutable_schema()));
+ AddTestRowToPB(RowOperationsPB::INSERT, schema_, kTestRowKey, kTestRowIntVal,
+ "hello world via RPC", req.mutable_row_operations());
-  // generate a 128KB dummy payload
- string test_payload(128 * 1024, '0');
- for (int i = 0; i < num_writes; i++) {
- rpc.Reset();
- data->Clear();
- AddTestRowToPB(RowOperationsPB::INSERT, schema_, key, key,
- test_payload, data);
- key++;
- ASSERT_OK(leader->tserver_proxy->Write(req, &resp, &rpc));
+ // Get the leader.
+ vector<TServerDetails*> followers;
+ GetOnlyLiveFollowerReplicas(tablet_id_, &followers);
- ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp);
- }
+ ASSERT_OK(followers[0]->tserver_proxy->Write(req, &resp, &rpc));
+ SCOPED_TRACE(SecureDebugString(resp));
+ ASSERT_TRUE(resp.has_error());
+ Status s = StatusFromPB(resp.error().status());
+ EXPECT_TRUE(s.IsIllegalState());
+ ASSERT_STR_CONTAINS(s.ToString(), "is not leader of this config. Role: FOLLOWER");
+ // TODO(unknown): need to change the error code to be something like REPLICA_NOT_LEADER
+ // so that the client can properly handle this case! plumbing this is a little difficult
+ // so not addressing at the moment.
+ NO_FATALS(AssertAllReplicasAgree(0));
}
// Test that when a follower is stopped for a long time, the log cache
@@ -736,85 +828,6 @@ TEST_F(RaftConsensusITest, TestCatchupAfterOpsEvicted) {
});
}
-void RaftConsensusITest::CauseFollowerToFallBehindLogGC(string* leader_uuid,
- int64_t* orig_term,
- string* fell_behind_uuid) {
- MonoDelta kTimeout = MonoDelta::FromSeconds(10);
- // Wait for all of the replicas to have acknowledged the elected
- // leader and logged the first NO_OP.
- ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_, tablet_id_, 1));
-
- // Pause one server. This might be the leader, but pausing it will cause
- // a leader election to happen.
- TServerDetails* replica = (*tablet_replicas_.begin()).second;
- ExternalTabletServer* replica_ets = cluster_->tablet_server_by_uuid(replica->uuid());
- ASSERT_OK(replica_ets->Pause());
-
- // Find a leader. In case we paused the leader above, this will wait until
- // we have elected a new one.
- TServerDetails* leader = nullptr;
- while (true) {
- Status s = GetLeaderReplicaWithRetries(tablet_id_, &leader);
- if (s.ok() && leader != nullptr && leader != replica) {
- break;
- }
- SleepFor(MonoDelta::FromMilliseconds(10));
- }
- *leader_uuid = leader->uuid();
- int leader_index = cluster_->tablet_server_index_by_uuid(*leader_uuid);
-
- TestWorkload workload(cluster_.get());
- workload.set_table_name(kTableId);
- workload.set_timeout_allowed(true);
- workload.set_payload_bytes(128 * 1024); // Write ops of size 128KB.
- workload.set_write_batch_size(1);
- workload.set_num_write_threads(4);
- workload.Setup();
- workload.Start();
-
- LOG(INFO) << "Waiting until we've written at least 4MB...";
- while (workload.rows_inserted() < 8 * 4) {
- SleepFor(MonoDelta::FromMilliseconds(10));
- }
- workload.StopAndJoin();
-
- LOG(INFO) << "Waiting for log GC on " << leader->uuid();
- // Some WAL segments must exist, but wal segment 1 must not exist.
- ASSERT_OK(inspect_->WaitForFilePatternInTabletWalDirOnTs(
- leader_index, tablet_id_, { "wal-" }, { "wal-000000001" }));
-
- LOG(INFO) << "Log GC complete on " << leader->uuid();
-
- // Then wait another couple of seconds to be sure that it has bothered to try
- // to write to the paused peer.
- // TODO: would be nice to be able to poll the leader with an RPC like
- // GetLeaderStatus() which could tell us whether it has made any requests
- // since the log GC.
- SleepFor(MonoDelta::FromSeconds(2));
-
- // Make a note of whatever the current term of the cluster is,
- // before we resume the follower.
- {
- OpId op_id;
- ASSERT_OK(GetLastOpIdForReplica(tablet_id_, leader, consensus::RECEIVED_OPID, kTimeout,
- &op_id));
- *orig_term = op_id.term();
- LOG(INFO) << "Servers converged with original term " << *orig_term;
- }
-
- // Resume the follower.
- LOG(INFO) << "Resuming " << replica->uuid();
- ASSERT_OK(replica_ets->Resume());
-
- // Ensure that none of the tablet servers crashed.
- for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
- // Make sure it didn't crash.
- ASSERT_TRUE(cluster_->tablet_server(i)->IsProcessAlive())
- << "Tablet server " << i << " crashed";
- }
- *fell_behind_uuid = replica->uuid();
-}
-
// Test that the leader doesn't crash if one of its followers has
// fallen behind so far that the logs necessary to catch it up
// have been GCed.
@@ -874,88 +887,6 @@ TEST_F(RaftConsensusITest, TestFollowerFallsBehindLeaderGC) {
}
}
-void RaftConsensusITest::CreateClusterForCrashyNodesTests() {
- if (AllowSlowTests()) {
- FLAGS_num_tablet_servers = 7;
- FLAGS_num_replicas = 7;
- }
-
- vector<string> ts_flags;
-
- // Crash 5% of the time just before sending an RPC. With 7 servers,
- // this means we crash about 30% of the time before we've fully
- // replicated the NO_OP at the start of the term.
- ts_flags.emplace_back("--fault_crash_on_leader_request_fraction=0.05");
-
- // Inject latency to encourage the replicas to fall out of sync
- // with each other.
- ts_flags.emplace_back("--log_inject_latency");
- ts_flags.emplace_back("--log_inject_latency_ms_mean=30");
- ts_flags.emplace_back("--log_inject_latency_ms_stddev=60");
-
- // Make leader elections faster so we get through more cycles of leaders.
- ts_flags.emplace_back("--raft_heartbeat_interval_ms=100");
-
- // Avoid preallocating segments since bootstrap is a little bit
- // faster if it doesn't have to scan forward through the preallocated
- // log area.
- ts_flags.emplace_back("--log_preallocate_segments=false");
-
- CreateCluster("raft_consensus-itest-crashy-nodes-cluster", ts_flags, {});
-}
-
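As a sanity check on the "about 30%" figure in the comment above: with a 5%
crash chance before each outbound RPC and six followers to reach in a 7-server
config, the probability of at least one crash while replicating the NO_OP is
roughly 1 - 0.95^6 ≈ 0.26, i.e. about 30%, assuming the per-RPC crash events
are independent.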
-void RaftConsensusITest::DoTestCrashyNodes(TestWorkload* workload, int max_rows_to_insert) {
- int crashes_to_cause = 3;
- if (AllowSlowTests()) {
- crashes_to_cause = 15;
- }
-
- workload->set_num_replicas(FLAGS_num_replicas);
- // Set a really high write timeout so that even in the presence of many failures we
- // can verify an exact number of rows in the end, thanks to exactly once semantics.
- workload->set_write_timeout_millis(60 * 1000 /* 60 seconds */);
- workload->set_num_write_threads(10);
- workload->set_num_read_threads(2);
- workload->Setup();
- workload->Start();
-
- int num_crashes = 0;
- while (num_crashes < crashes_to_cause &&
- workload->rows_inserted() < max_rows_to_insert) {
- num_crashes += RestartAnyCrashedTabletServers();
- SleepFor(MonoDelta::FromMilliseconds(10));
- }
-
- // Writers are likely ongoing. To have some chance of completing all writes,
-  // restart the tablet servers, otherwise they'll keep crashing and the writes
- // can never complete.
- for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
- ExternalTabletServer* ts = cluster_->tablet_server(i);
- vector<string>* flags = ts->mutable_flags();
- bool removed_flag = false;
- for (auto it = flags->begin(); it != flags->end(); ++it) {
- if (HasPrefixString(*it, "--fault_crash")) {
- flags->erase(it);
- removed_flag = true;
- break;
- }
- }
- ASSERT_TRUE(removed_flag) << "could not remove flag from TS " << i
- << "\nFlags:\n" << *flags;
- ts->Shutdown();
- ASSERT_OK(ts->Restart());
- }
-
- workload->StopAndJoin();
-
- // Ensure that the replicas converge.
- ClusterVerifier v(cluster_.get());
- NO_FATALS(v.CheckCluster());
- NO_FATALS(v.CheckRowCount(workload->table_name(),
- ClusterVerifier::EXACTLY,
- workload->rows_inserted()));
-}
-
// This test starts several tablet servers, and configures them with
// fault injection so that the leaders frequently crash just before
// sending RPCs to followers.
@@ -988,93 +919,6 @@ TEST_F(RaftConsensusITest, InsertDuplicateKeysWithCrashyNodes) {
NO_FATALS(DoTestCrashyNodes(&workload, 300));
}
-void RaftConsensusITest::CreateClusterForChurnyElectionsTests(
- const vector<string>& extra_ts_flags) {
- vector<string> ts_flags;
-
-#ifdef THREAD_SANITIZER
- // On TSAN builds, we need to be a little bit less churny in order to make
- // any progress at all.
- ts_flags.push_back("--raft_heartbeat_interval_ms=5");
-#else
- ts_flags.emplace_back("--raft_heartbeat_interval_ms=1");
-#endif
-
- ts_flags.insert(ts_flags.end(), extra_ts_flags.cbegin(), extra_ts_flags.cend());
-
- CreateCluster("raft_consensus-itest-cluster", ts_flags, {});
-}
-
-void RaftConsensusITest::DoTestChurnyElections(TestWorkload* workload, int max_rows_to_insert) {
- workload->set_num_replicas(FLAGS_num_replicas);
- // Set a really high write timeout so that even in the presence of many failures we
- // can verify an exact number of rows in the end, thanks to exactly once semantics.
- workload->set_write_timeout_millis(60 * 1000 /* 60 seconds */);
- workload->set_num_write_threads(2);
- workload->set_write_batch_size(1);
- workload->Setup();
- workload->Start();
-
- // Run for either a prescribed number of writes, or 30 seconds,
- // whichever comes first. This prevents test timeouts on slower
- // build machines, TSAN builds, etc.
- Stopwatch sw;
- sw.start();
- while (workload->rows_inserted() < max_rows_to_insert &&
- sw.elapsed().wall_seconds() < 30) {
- SleepFor(MonoDelta::FromMilliseconds(10));
- NO_FATALS(AssertNoTabletServersCrashed());
- }
- workload->StopAndJoin();
- ASSERT_GT(workload->rows_inserted(), 0) << "No rows inserted";
-
- // Ensure that the replicas converge.
- // We expect an exact result due to exactly once semantics and snapshot scans.
- ClusterVerifier v(cluster_.get());
- NO_FATALS(v.CheckCluster());
- NO_FATALS(v.CheckRowCount(workload->table_name(),
- ClusterVerifier::EXACTLY,
- workload->rows_inserted()));
- NO_FATALS(AssertNoTabletServersCrashed());
-}
-
-// This test sets all of the election timers to be very short, resulting
-// in a lot of churn. We expect to make some progress and not diverge or
-// crash, despite the frequent re-elections and races.
-TEST_F(RaftConsensusITest, TestChurnyElections) {
- const int kNumWrites = AllowSlowTests() ? 10000 : 1000;
- CreateClusterForChurnyElectionsTests({});
- TestWorkload workload(cluster_.get());
- workload.set_write_batch_size(1);
- workload.set_num_read_threads(2);
- DoTestChurnyElections(&workload, kNumWrites);
-}
-
-// The same test, except inject artificial latency when propagating notifications
-// from the queue back to consensus. This previously reproduced bugs like KUDU-1078 which
-// normally only appear under high load.
-TEST_F(RaftConsensusITest, TestChurnyElections_WithNotificationLatency) {
- CreateClusterForChurnyElectionsTests({"--consensus_inject_latency_ms_in_notifications=50"});
- const int kNumWrites = AllowSlowTests() ? 10000 : 1000;
- TestWorkload workload(cluster_.get());
- workload.set_write_batch_size(1);
- workload.set_num_read_threads(2);
- DoTestChurnyElections(&workload, kNumWrites);
-}
-
-// The same as TestChurnyElections except insert many duplicated rows.
-// This emulates cases where there are many duplicate keys which, due to two phase
-// locking, may cause deadlocks and other anomalies that cannot be observed when
-// keys are unique.
-TEST_F(RaftConsensusITest, TestChurnyElections_WithDuplicateKeys) {
- CreateClusterForChurnyElectionsTests({});
- const int kNumWrites = AllowSlowTests() ? 10000 : 1000;
- TestWorkload workload(cluster_.get());
- workload.set_write_pattern(TestWorkload::INSERT_WITH_MANY_DUP_KEYS);
- // Increase the number of rows per batch to get a higher chance of key collision.
- workload.set_write_batch_size(3);
- DoTestChurnyElections(&workload, kNumWrites);
-}
TEST_F(RaftConsensusITest, MultiThreadedInsertWithFailovers) {
int kNumElections = FLAGS_num_replicas;
@@ -1126,99 +970,14 @@ TEST_F(RaftConsensusITest, MultiThreadedInsertWithFailovers) {
NO_FATALS(cluster_->AssertNoCrashes());
latch->Wait();
StopOrKillLeaderAndElectNewOne();
- }
-
- for (const auto& thr : threads_) {
- CHECK_OK(ThreadJoiner(thr.get()).Join());
- }
-
- NO_FATALS(AssertAllReplicasAgree(FLAGS_client_inserts_per_thread * FLAGS_num_client_threads));
- STLDeleteElements(&latches);
-}
-
-// Test automatic leader election by killing leaders.
-TEST_F(RaftConsensusITest, TestAutomaticLeaderElection) {
- if (AllowSlowTests()) {
- FLAGS_num_tablet_servers = 5;
- FLAGS_num_replicas = 5;
- }
- NO_FATALS(BuildAndStart());
-
- TServerDetails* leader;
- ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader));
-
- unordered_set<TServerDetails*> killed_leaders;
-
- const int kNumLeadersToKill = FLAGS_num_replicas / 2;
- const int kFinalNumReplicas = FLAGS_num_replicas / 2 + 1;
-
- for (int leaders_killed = 0; leaders_killed < kFinalNumReplicas; leaders_killed++) {
- LOG(INFO) << Substitute("Writing data to leader of $0-node config ($1 alive)...",
- FLAGS_num_replicas, FLAGS_num_replicas - leaders_killed);
-
- InsertTestRowsRemoteThread(leaders_killed * FLAGS_client_inserts_per_thread,
- FLAGS_client_inserts_per_thread,
- FLAGS_client_num_batches_per_thread,
- vector<CountDownLatch*>());
-
- // At this point, the writes are flushed but the commit index may not be
- // propagated to all replicas. We kill the leader anyway.
- if (leaders_killed < kNumLeadersToKill) {
- LOG(INFO) << "Killing current leader " << leader->instance_id.permanent_uuid() << "...";
- cluster_->tablet_server_by_uuid(leader->uuid())->Shutdown();
- InsertOrDie(&killed_leaders, leader);
-
- LOG(INFO) << "Waiting for new guy to be elected leader.";
- ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &leader));
- }
- }
-
- // Restart every node that was killed, and wait for the nodes to converge
- for (TServerDetails* killed_node : killed_leaders) {
- CHECK_OK(cluster_->tablet_server_by_uuid(killed_node->uuid())->Restart());
- }
- // Verify the data on the remaining replicas.
- NO_FATALS(AssertAllReplicasAgree(FLAGS_client_inserts_per_thread * kFinalNumReplicas));
-}
-
-// Single-replica leader election test.
-TEST_F(RaftConsensusITest, TestAutomaticLeaderElectionOneReplica) {
- FLAGS_num_tablet_servers = 1;
- FLAGS_num_replicas = 1;
- NO_FATALS(BuildAndStart());
- // Ensure that single-node Raft configs elect themselves as leader
- // immediately upon Consensus startup.
- ASSERT_OK(GetReplicaStatusAndCheckIfLeader(tablet_servers_[cluster_->tablet_server(0)->uuid()],
- tablet_id_, MonoDelta::FromMilliseconds(500)));
-}
-
-void RaftConsensusITest::StubbornlyWriteSameRowThread(int replica_idx, const AtomicBool* finish) {
- vector<TServerDetails*> servers;
- AppendValuesFromMap(tablet_servers_, &servers);
- CHECK_LT(replica_idx, servers.size());
- TServerDetails* ts = servers[replica_idx];
-
- // Manually construct an RPC to our target replica. We expect most of the calls
- // to fail either with an "already present" or an error because we are writing
- // to a follower. That's OK, though - what we care about for this test is
- // just that the operations Apply() in the same order everywhere (even though
- // in this case the result will just be an error).
- WriteRequestPB req;
- WriteResponsePB resp;
- RpcController rpc;
- req.set_tablet_id(tablet_id_);
- ASSERT_OK(SchemaToPB(schema_, req.mutable_schema()));
- AddTestRowToPB(RowOperationsPB::INSERT, schema_, kTestRowKey, kTestRowIntVal,
- "hello world", req.mutable_row_operations());
+ }
- while (!finish->Load()) {
- resp.Clear();
- rpc.Reset();
- rpc.set_timeout(MonoDelta::FromSeconds(10));
- ignore_result(ts->tserver_proxy->Write(req, &resp, &rpc));
- VLOG(1) << "Response from server " << replica_idx << ": "
- << SecureShortDebugString(resp);
+ for (const auto& thr : threads_) {
+ CHECK_OK(ThreadJoiner(thr.get()).Join());
}
+
+ NO_FATALS(AssertAllReplicasAgree(FLAGS_client_inserts_per_thread * FLAGS_num_client_threads));
+ STLDeleteElements(&latches);
}
// Regression test for KUDU-597, an issue where we could mis-order operations on
@@ -1263,56 +1022,6 @@ TEST_F(RaftConsensusITest, TestKUDU_597) {
}
}
-void RaftConsensusITest::AddOp(const OpId& id, ConsensusRequestPB* req) {
- AddOpWithTypeAndKey(id, RowOperationsPB::INSERT,
- id.index() * 10000 + id.term(), req);
-}
-
-void RaftConsensusITest::AddOpWithTypeAndKey(const OpId& id,
- RowOperationsPB::Type op_type,
- int32_t key,
- ConsensusRequestPB* req) {
- ReplicateMsg* msg = req->add_ops();
- msg->mutable_id()->CopyFrom(id);
- msg->set_timestamp(id.index());
- msg->set_op_type(consensus::WRITE_OP);
- WriteRequestPB* write_req = msg->mutable_write_request();
- CHECK_OK(SchemaToPB(schema_, write_req->mutable_schema()));
- write_req->set_tablet_id(tablet_id_);
- AddTestRowToPB(op_type, schema_, key, id.term(),
- SecureShortDebugString(id), write_req->mutable_row_operations());
-}
-
-void RaftConsensusITest::SetupSingleReplicaTest(TServerDetails** replica_ts) {
- const vector<string> kTsFlags = {
- // Don't use the hybrid clock as we set logical timestamps on ops.
- "--use_hybrid_clock=false",
- "--enable_leader_failure_detection=false",
- };
- const vector<string> kMasterFlags = {
- "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"
- };
-
- FLAGS_num_replicas = 3;
- FLAGS_num_tablet_servers = 3;
- NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags));
-
- // Kill all the servers but one.
- vector<TServerDetails*> tservers;
- AppendValuesFromMap(tablet_servers_, &tservers);
- ASSERT_EQ(3, tservers.size());
-
- // Elect server 2 as leader and wait for log index 1 to propagate to all servers.
- ASSERT_OK(StartElection(tservers[2], tablet_id_, MonoDelta::FromSeconds(10)));
- ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, tablet_id_, 1));
-
- cluster_->tablet_server_by_uuid(tservers[1]->uuid())->Shutdown();
- cluster_->tablet_server_by_uuid(tservers[2]->uuid())->Shutdown();
-
- *replica_ts = tservers[0];
- LOG(INFO) << "================================== Cluster setup complete.";
-}
-
// Regression test for KUDU-1775: when a replica is restarted, and the first
// request it receives from a leader results in a LMP mismatch error, the
// replica should still respond with the correct 'last_committed_idx'.
@@ -1628,149 +1337,6 @@ TEST_F(RaftConsensusITest, TestReplicaBehaviorViaRPC) {
}
}
-TEST_F(RaftConsensusITest, TestLeaderStepDown) {
- const vector<string> kTsFlags = {
- "--enable_leader_failure_detection=false"
- };
- const vector<string> kMasterFlags = {
- "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"
- };
-
- FLAGS_num_replicas = 3;
- FLAGS_num_tablet_servers = 3;
- NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags));
-
- vector<TServerDetails*> tservers;
- AppendValuesFromMap(tablet_servers_, &tservers);
-
- // Start with no leader.
- Status s = GetReplicaStatusAndCheckIfLeader(tservers[0], tablet_id_, MonoDelta::FromSeconds(10));
- ASSERT_TRUE(s.IsIllegalState()) << "TS #0 should not be leader yet: " << s.ToString();
-
- // Become leader.
- ASSERT_OK(StartElection(tservers[0], tablet_id_, MonoDelta::FromSeconds(10)));
- ASSERT_OK(WaitUntilLeader(tservers[0], tablet_id_, MonoDelta::FromSeconds(10)));
- ASSERT_OK(WriteSimpleTestRow(tservers[0], tablet_id_, RowOperationsPB::INSERT,
- kTestRowKey, kTestRowIntVal, "foo", MonoDelta::FromSeconds(10)));
- ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10), tablet_servers_, tablet_id_, 2));
-
- // Step down and test that a 2nd stepdown returns the expected result.
- ASSERT_OK(LeaderStepDown(tservers[0], tablet_id_, MonoDelta::FromSeconds(10)));
- TabletServerErrorPB error;
- s = LeaderStepDown(tservers[0], tablet_id_, MonoDelta::FromSeconds(10), &error);
- ASSERT_TRUE(s.IsIllegalState()) << "TS #0 should not be leader anymore: " << s.ToString();
- ASSERT_EQ(TabletServerErrorPB::NOT_THE_LEADER, error.code()) << SecureShortDebugString(error);
-
- s = WriteSimpleTestRow(tservers[0], tablet_id_, RowOperationsPB::INSERT,
- kTestRowKey, kTestRowIntVal, "foo", MonoDelta::FromSeconds(10));
- ASSERT_TRUE(s.IsIllegalState()) << "TS #0 should not accept writes as follower: "
- << s.ToString();
-}
-
-// Test for KUDU-699: sets the consensus RPC timeout to be long,
-// and freezes both followers before asking the leader to step down.
-// Prior to fixing KUDU-699, the step-down process would block
-// until the pending requests timed out.
-TEST_F(RaftConsensusITest, TestStepDownWithSlowFollower) {
- const vector<string> kTsFlags = {
- "--enable_leader_failure_detection=false",
- // Bump up the RPC timeout, so that we can verify that the stepdown responds
- // quickly even when an outbound request is hung.
- "--consensus_rpc_timeout_ms=15000"
- };
- const vector<string> kMasterFlags = {
- "--catalog_manager_wait_for_new_tablets_to_elect_leader=false"
- };
-
- NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags));
-
- vector<TServerDetails*> tservers;
- AppendValuesFromMap(tablet_servers_, &tservers);
- ASSERT_OK(StartElection(tservers[0], tablet_id_, MonoDelta::FromSeconds(10)));
- ASSERT_OK(WaitUntilLeader(tservers[0], tablet_id_, MonoDelta::FromSeconds(10)));
-
- // Stop both followers.
- for (int i = 1; i < 3; i++) {
- ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[i]->uuid())->Pause());
- }
-
- // Sleep a little bit of time to make sure that the leader has outstanding heartbeats
- // to the paused followers before requesting the stepdown.
- SleepFor(MonoDelta::FromSeconds(1));
-
- // Step down should respond quickly despite the hung requests.
- ASSERT_OK(LeaderStepDown(tservers[0], tablet_id_, MonoDelta::FromSeconds(3)));
-}
-
-void RaftConsensusITest::AssertMajorityRequiredForElectionsAndWrites(
- const TabletServerMap& tablet_servers, const string& leader_uuid) {
-
- TServerDetails* initial_leader = FindOrDie(tablet_servers, leader_uuid);
-
- // Calculate number of servers to leave unpaused (minority).
- // This math is a little unintuitive but works for cluster sizes including 2 and 1.
- // Note: We assume all of these TSes are voters.
- int config_size = tablet_servers.size();
- int minority_to_retain = MajoritySize(config_size) - 1;
-
- // Only perform this part of the test if we have some servers to pause, else
- // the failure assertions will throw.
- if (config_size > 1) {
- // Pause enough replicas to prevent a majority.
- int num_to_pause = config_size - minority_to_retain;
- LOG(INFO) << "Pausing " << num_to_pause << " tablet servers in config of size " << config_size;
- vector<string> paused_uuids;
- for (const TabletServerMap::value_type& entry : tablet_servers) {
- if (paused_uuids.size() == num_to_pause) {
- continue;
- }
- const string& replica_uuid = entry.first;
- if (replica_uuid == leader_uuid) {
- // Always leave this one alone.
- continue;
- }
- ExternalTabletServer* replica_ts = cluster_->tablet_server_by_uuid(replica_uuid);
- ASSERT_OK(replica_ts->Pause());
- paused_uuids.push_back(replica_uuid);
- }
-
- // Ensure writes timeout while only a minority is alive.
- Status s = WriteSimpleTestRow(initial_leader, tablet_id_, RowOperationsPB::UPDATE,
- kTestRowKey, kTestRowIntVal, "foo",
- MonoDelta::FromMilliseconds(100));
- ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
-
- // Step down.
- ASSERT_OK(LeaderStepDown(initial_leader, tablet_id_, MonoDelta::FromSeconds(10)));
-
- // Assert that elections time out without a live majority.
- // We specify a very short timeout here to keep the tests fast.
- ASSERT_OK(StartElection(initial_leader, tablet_id_, MonoDelta::FromSeconds(10)));
- s = WaitUntilLeader(initial_leader, tablet_id_, MonoDelta::FromMilliseconds(100));
- ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
- LOG(INFO) << "Expected timeout encountered on election with weakened config: " << s.ToString();
-
- // Resume the paused servers.
- LOG(INFO) << "Resuming " << num_to_pause << " tablet servers in config of size " << config_size;
- for (const string& replica_uuid : paused_uuids) {
- ExternalTabletServer* replica_ts = cluster_->tablet_server_by_uuid(replica_uuid);
- ASSERT_OK(replica_ts->Resume());
- }
- }
-
- ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(20), tablet_servers, tablet_id_, 1));
-
- // Now an election should succeed.
- ASSERT_OK(StartElection(initial_leader, tablet_id_, MonoDelta::FromSeconds(10)));
- ASSERT_OK(WaitUntilLeader(initial_leader, tablet_id_, MonoDelta::FromSeconds(10)));
- LOG(INFO) << "Successful election with full config of size " << config_size;
-
- // And a write should also succeed.
- ASSERT_OK(WriteSimpleTestRow(initial_leader, tablet_id_, RowOperationsPB::UPDATE,
- kTestRowKey, kTestRowIntVal, Substitute("qsz=$0", config_size),
- MonoDelta::FromSeconds(10)));
-}
-
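To make the pause arithmetic above concrete (assuming MajoritySize(n) returns
n / 2 + 1, as in Kudu's quorum utilities), the values work out as follows:

    config_size   MajoritySize   minority_to_retain   num_to_pause
    5             3              2                    3
    3             2              1                    2
    2             2              1                    1
    1             1              0                    (pause block skipped)

In every multi-server case the retained set (the leader plus
minority_to_retain - 1 others) is exactly one server short of a majority,
which is what the write-timeout and election-timeout assertions rely on.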
// Basic test of adding and removing servers from a configuration.
TEST_F(RaftConsensusITest, TestAddRemoveServer) {
const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
@@ -1997,122 +1563,6 @@ TEST_F(RaftConsensusITest, TestAtomicAddRemoveServer) {
active_tablet_servers, tablet_id_, ++cur_log_index));
}
-// Ensure that we can elect a server that is in the "pending" configuration.
-// This is required by the Raft protocol. See Diego Ongaro's PhD thesis, section
-// 4.1, where it states that "it is the caller’s configuration that is used in
-// reaching consensus, both for voting and for log replication".
-//
-// This test also tests the case where a node comes back from the dead to a
-// leader that was not in its configuration when it died. That should also work, i.e.
-// the revived node should accept writes from the new leader.
-TEST_F(RaftConsensusITest, TestElectPendingVoter) {
- // Test plan:
- // 1. Disable failure detection to avoid non-deterministic behavior.
- // 2. Start with a configuration size of 5, all servers synced.
- // 3. Remove one server from the configuration, wait until committed.
- // 4. Pause the 3 remaining non-leaders (SIGSTOP).
- // 5. Run a config change to add back the previously-removed server.
- // Ensure that, while the op cannot be committed yet due to lack of a
- // majority in the new config (only 2 out of 5 servers are alive), the op
- // has been replicated to both the local leader and the new member.
- // 6. Force the existing leader to step down.
- // 7. Resume one of the paused nodes so that a majority (of the 5-node
- // configuration, but not the original 4-node configuration) will be available.
- // 8. Start a leader election on the new (pending) node. It should win.
- // 9. Unpause the two remaining stopped nodes.
- // 10. Wait for all nodes to sync to the new leader's log.
- const vector<string> kTsFlags = {
- "--enable_leader_failure_detection=false",
- };
- const vector<string> kMasterFlags = {
- "--catalog_manager_wait_for_new_tablets_to_elect_leader=false",
- };
-
- FLAGS_num_tablet_servers = 5;
- FLAGS_num_replicas = 5;
- NO_FATALS(BuildAndStart(kTsFlags, kMasterFlags));
-
- vector<TServerDetails*> tservers;
- AppendValuesFromMap(tablet_servers_, &tservers);
- ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
-
- // Elect server 0 as leader and wait for log index 1 to propagate to all servers.
- TServerDetails* initial_leader = tservers[0];
- ASSERT_OK(StartElection(initial_leader, tablet_id_, MonoDelta::FromSeconds(10)));
- ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, initial_leader, tablet_id_,
- MonoDelta::FromSeconds(10)));
-
- // The server we will remove and then bring back.
- TServerDetails* final_leader = tservers[4];
-
- // Kill the master, so we can change the config without interference.
- cluster_->master()->Shutdown();
-
- // Now remove server 4 from the configuration.
- TabletServerMap active_tablet_servers = tablet_servers_;
- LOG(INFO) << "Removing tserver with uuid " << final_leader->uuid();
- ASSERT_OK(RemoveServer(initial_leader, tablet_id_, final_leader, boost::none,
- MonoDelta::FromSeconds(10)));
- ASSERT_EQ(1, active_tablet_servers.erase(final_leader->uuid()));
- int64_t cur_log_index = 2;
- ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10),
- active_tablet_servers, tablet_id_, cur_log_index));
-
- // Pause tablet servers 1 through 3, so they won't see the operation to add
- // server 4 back.
- LOG(INFO) << "Pausing 3 replicas...";
- for (int i = 1; i <= 3; i++) {
- ExternalTabletServer* replica_ts = cluster_->tablet_server_by_uuid(tservers[i]->uuid());
- ASSERT_OK(replica_ts->Pause());
- }
-
- // Now add server 4 back to the peers.
- // This operation will time out on the client side.
- LOG(INFO) << "Adding back Peer " << final_leader->uuid() << " and expecting timeout...";
- Status s = AddServer(initial_leader, tablet_id_, final_leader, RaftPeerPB::VOTER, boost::none,
- MonoDelta::FromMilliseconds(100));
- ASSERT_TRUE(s.IsTimedOut()) << "Expected AddServer() to time out. Result: " << s.ToString();
- LOG(INFO) << "Timeout achieved.";
- active_tablet_servers = tablet_servers_; // Reset to the unpaused servers.
- for (int i = 1; i <= 3; i++) {
- ASSERT_EQ(1, active_tablet_servers.erase(tservers[i]->uuid()));
- }
- // Only wait for TS 0 and 4 to agree that the new change config op has been
- // replicated.
- ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10),
- active_tablet_servers, tablet_id_, ++cur_log_index));
-
- // Now that TS 4 is electable (and pending), have TS 0 step down.
- LOG(INFO) << "Forcing Peer " << initial_leader->uuid() << " to step down...";
- ASSERT_OK(LeaderStepDown(initial_leader, tablet_id_, MonoDelta::FromSeconds(10)));
-
- // Resume TS 1 so we have a majority of 3 to elect a new leader.
- LOG(INFO) << "Resuming Peer " << tservers[1]->uuid() << " ...";
- ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[1]->uuid())->Resume());
- InsertOrDie(&active_tablet_servers, tservers[1]->uuid(), tservers[1]);
-
- // Now try to get TS 4 elected. It should succeed and push a NO_OP.
- LOG(INFO) << "Trying to elect Peer " << tservers[4]->uuid() << " ...";
- ASSERT_OK(StartElection(final_leader, tablet_id_, MonoDelta::FromSeconds(10)));
- ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10),
- active_tablet_servers, tablet_id_, ++cur_log_index));
-
- // Resume the remaining paused nodes.
- LOG(INFO) << "Resuming remaining nodes...";
- ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[2]->uuid())->Resume());
- ASSERT_OK(cluster_->tablet_server_by_uuid(tservers[3]->uuid())->Resume());
- active_tablet_servers = tablet_servers_;
-
- // Do one last operation on the new leader: an insert.
- ASSERT_OK(WriteSimpleTestRow(final_leader, tablet_id_, RowOperationsPB::INSERT,
- kTestRowKey, kTestRowIntVal, "Ob-La-Di, Ob-La-Da",
- MonoDelta::FromSeconds(10)));
-
- // Wait for all servers to replicate everything up through the last write op.
- ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(10),
- active_tablet_servers, tablet_id_, ++cur_log_index));
-}
-
// Writes test rows in ascending order to a single tablet server.
// Essentially a poor-man's version of TestWorkload that only operates on a
// single tablet. Does not batch, does not tolerate timeouts, and does not
@@ -2121,12 +1571,11 @@ TEST_F(RaftConsensusITest, TestElectPendingVoter) {
// a crash, as long as there is no crash then 'rows_inserted' will have a
// correct count at the end of the run.
// Crashes on any failure, so 'write_timeout' should be high.
-void DoWriteTestRows(const TServerDetails* leader_tserver,
- const string& tablet_id,
- const MonoDelta& write_timeout,
- AtomicInt<int32_t>* rows_inserted,
- const AtomicBool* finish) {
-
+static void DoWriteTestRows(const TServerDetails* leader_tserver,
+ const string& tablet_id,
+ const MonoDelta& write_timeout,
+ AtomicInt<int32_t>* rows_inserted,
+ const AtomicBool* finish) {
while (!finish->Load()) {
int row_key = rows_inserted->Increment();
CHECK_OK(WriteSimpleTestRow(leader_tserver, tablet_id, RowOperationsPB::INSERT,
@@ -2626,23 +2075,6 @@ TEST_F(RaftConsensusITest, TestLargeBatches) {
ASSERT_LE(num_wals, num_batches + 2);
}
-void RaftConsensusITest::InsertPayloadIgnoreErrors(int start_row, int num_rows, int payload_size) {
- shared_ptr<KuduTable> table;
- CHECK_OK(client_->OpenTable(kTableId, &table));
- shared_ptr<KuduSession> session = client_->NewSession();
- session->SetTimeoutMillis(100);
- CHECK_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
- string payload(payload_size, 'x');
- for (int i = 0; i < num_rows; i++) {
- gscoped_ptr<KuduInsert> insert(table->NewInsert());
- KuduPartialRow* row = insert->mutable_row();
- CHECK_OK(row->SetInt32(0, i + start_row));
- CHECK_OK(row->SetInt32(1, 0));
- CHECK_OK(row->SetStringCopy(2, payload));
- CHECK_OK(session->Apply(insert.release()));
- ignore_result(session->Flush());
- }
-}
// Regression test for KUDU-1469, a case in which a leader and follower could get "stuck"
// in a tight RPC loop, in which the leader would repeatedly send a batch of ops that the
@@ -2835,67 +2267,6 @@ TEST_F(RaftConsensusITest, TestEvictAbandonedFollowers) {
ASSERT_OK(WaitForServersToAgree(timeout, active_tablet_servers, tablet_id_, 2));
}
-// Have a replica fall behind the leader's log, then fail a tablet copy. It
-// should still be able to vote in leader elections.
-TEST_F(RaftConsensusITest, TestTombstonedVoteAfterFailedTabletCopy) {
- const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
-
- vector<string> ts_flags;
- AddFlagsForLogRolls(&ts_flags); // For CauseFollowerToFallBehindLogGC().
- const vector<string> kMasterFlags = {
- "--master_add_server_when_underreplicated=false",
- };
- NO_FATALS(BuildAndStart(ts_flags, kMasterFlags));
-
- TabletServerMap active_tablet_servers = tablet_servers_;
- ASSERT_EQ(3, FLAGS_num_replicas);
- ASSERT_EQ(3, active_tablet_servers.size());
-
- string leader_uuid;
- int64_t orig_term;
- string follower_uuid;
- NO_FATALS(CauseFollowerToFallBehindLogGC(&leader_uuid, &orig_term, &follower_uuid));
-
- // Wait for the abandoned follower to be evicted.
- ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(2, tablet_servers_[leader_uuid],
- tablet_id_, kTimeout));
- ASSERT_EQ(1, active_tablet_servers.erase(follower_uuid));
- ASSERT_OK(WaitForServersToAgree(kTimeout, active_tablet_servers, tablet_id_, 2));
-
- // Now the follower is evicted and will be tombstoned.
- // We'll add it back to the config and then crash the follower during the
- // resulting tablet copy.
- ASSERT_OK(cluster_->SetFlag(cluster_->tablet_server_by_uuid(follower_uuid),
- "tablet_copy_fault_crash_on_fetch_all", "1.0"));
- auto* leader_ts = tablet_servers_[leader_uuid];
- auto* follower_ts = tablet_servers_[follower_uuid];
- ASSERT_OK(itest::AddServer(leader_ts, tablet_id_, follower_ts, RaftPeerPB::VOTER,
- boost::none, kTimeout));
- ASSERT_OK(cluster_->tablet_server_by_uuid(follower_uuid)->WaitForInjectedCrash(kTimeout));
-
- // Shut down the rest of the cluster, then only bring back the node we
- // crashed, plus one other node. The tombstoned replica should still be able
- // to vote and the tablet should come online.
- cluster_->Shutdown();
-
- int follower_idx = cluster_->tablet_server_index_by_uuid(follower_uuid);
- ASSERT_OK(inspect_->CheckTabletDataStateOnTS(follower_idx, tablet_id_, { TABLET_DATA_COPYING }));
-
- ASSERT_OK(cluster_->master()->Restart());
- ASSERT_OK(cluster_->tablet_server_by_uuid(leader_uuid)->Restart());
- ASSERT_OK(cluster_->tablet_server_by_uuid(follower_uuid)->Restart());
-
- TestWorkload workload(cluster_.get());
- workload.set_table_name(kTableId);
- workload.set_timeout_allowed(true);
- workload.Setup();
- workload.Start();
- ASSERT_EVENTUALLY([&] {
- ASSERT_GE(workload.rows_inserted(), 100);
- });
- workload.StopAndJoin();
-}
-
// Test that, after followers are evicted from the config, the master re-adds a new
// replica for that follower and it eventually catches back up.
TEST_F(RaftConsensusITest, TestMasterReplacesEvictedFollowers) {
[3/3] kudu git commit: [raft_consensus-itest] separate Raft election tests

Posted by al...@apache.org.
[raft_consensus-itest] separate Raft election tests
Separated election-related tests from raft_consensus-itest
into raft_consensus_election-itest.
This changelist does not contain any functional changes.
Change-Id: Ic5289b3de097f9a2ca152034fa2635b67ccddb93
Reviewed-on: http://gerrit.cloudera.org:8080/8278
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Alexey Serbin <as...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/ab77ce02
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/ab77ce02
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/ab77ce02
Branch: refs/heads/master
Commit: ab77ce025d9fe68c993d26e15b604c4188e36860
Parents: 4414089
Author: Alexey Serbin <as...@cloudera.com>
Authored: Sat Oct 14 23:18:59 2017 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Tue Oct 17 01:09:23 2017 +0000
----------------------------------------------------------------------
build-support/dist_test.py | 2 +-
src/kudu/integration-tests/CMakeLists.txt | 2 +
.../raft_consensus-itest-base.cc | 273 ++++
.../raft_consensus-itest-base.h | 68 +
.../integration-tests/raft_consensus-itest.cc | 1507 +++++-------------
.../raft_consensus_election-itest.cc | 532 +++++++
src/kudu/tablet/mt-tablet-test.cc | 2 +-
src/kudu/tserver/tablet_server-test-base.cc | 1 -
8 files changed, 1316 insertions(+), 1071 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/ab77ce02/build-support/dist_test.py
----------------------------------------------------------------------
diff --git a/build-support/dist_test.py b/build-support/dist_test.py
index a91d7d3..328d2b8 100755
--- a/build-support/dist_test.py
+++ b/build-support/dist_test.py
@@ -89,7 +89,7 @@ NUM_SHARDS_BY_TEST = {
'delete_table-test': 8,
'flex_partitioning-itest': 8,
'mt-tablet-test': 4,
- 'raft_consensus-itest': 8
+ 'raft_consensus-itest': 6
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/ab77ce02/src/kudu/integration-tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index d124662..ecdda20 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -27,6 +27,7 @@ set(INTEGRATION_TESTS_SRCS
external_mini_cluster_fs_inspector.cc
internal_mini_cluster-itest-base.cc
log_verifier.cc
+ raft_consensus-itest-base.cc
test_workload.cc
ts_itest-base.cc
)
@@ -86,6 +87,7 @@ ADD_KUDU_TEST(multidir_cluster-itest)
ADD_KUDU_TEST(open-readonly-fs-itest)
ADD_KUDU_TEST(raft_config_change-itest)
ADD_KUDU_TEST(raft_consensus-itest RUN_SERIAL true)
+ADD_KUDU_TEST(raft_consensus_election-itest)
ADD_KUDU_TEST(registration-test RESOURCE_LOCK "master-web-port")
ADD_KUDU_TEST(security-faults-itest)
ADD_KUDU_TEST(security-itest)
http://git-wip-us.apache.org/repos/asf/kudu/blob/ab77ce02/src/kudu/integration-tests/raft_consensus-itest-base.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest-base.cc b/src/kudu/integration-tests/raft_consensus-itest-base.cc
new file mode 100644
index 0000000..afd8c64
--- /dev/null
+++ b/src/kudu/integration-tests/raft_consensus-itest-base.cc
@@ -0,0 +1,273 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/integration-tests/raft_consensus-itest-base.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <ostream>
+#include <string>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/client/client-test-util.h"
+#include "kudu/client/client.h"
+#include "kudu/client/shared_ptr.h"
+#include "kudu/client/write_op.h"
+#include "kudu/common/partial_row.h"
+#include "kudu/common/wire_protocol.h"
+#include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/opid.pb.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
+#include "kudu/integration-tests/external_mini_cluster_fs_inspector.h"
+#include "kudu/integration-tests/test_workload.h"
+#include "kudu/integration-tests/ts_itest-base.h"
+#include "kudu/mini-cluster/external_mini_cluster.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/tserver/tablet_server-test-base.h"
+#include "kudu/tserver/tserver.pb.h"
+#include "kudu/tserver/tserver_service.proxy.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+
+DEFINE_int32(num_client_threads, 8,
+ "Number of client threads to launch");
+DEFINE_int64(client_inserts_per_thread, 50,
+ "Number of rows inserted by each client thread");
+DECLARE_int32(consensus_rpc_timeout_ms);
+DEFINE_int64(client_num_batches_per_thread, 5,
+ "In how many batches to group the rows, for each client");
+
+METRIC_DECLARE_entity(tablet);
+METRIC_DECLARE_counter(transaction_memory_pressure_rejections);
+METRIC_DECLARE_gauge_int64(raft_term);
+
+using kudu::client::KuduInsert;
+using kudu::client::KuduSession;
+using kudu::client::KuduTable;
+using kudu::client::sp::shared_ptr;
+using kudu::cluster::ExternalTabletServer;
+using kudu::consensus::OpId;
+using kudu::itest::TServerDetails;
+using kudu::pb_util::SecureDebugString;
+using kudu::rpc::RpcController;
+using std::string;
+using std::vector;
+
+namespace kudu {
+namespace tserver {
+
+static const int kConsensusRpcTimeoutForTests = 50;
+
+RaftConsensusITestBase::RaftConsensusITestBase()
+ : inserters_(FLAGS_num_client_threads) {
+}
+
+void RaftConsensusITestBase::SetUp() {
+ TabletServerIntegrationTestBase::SetUp();
+ FLAGS_consensus_rpc_timeout_ms = kConsensusRpcTimeoutForTests;
+}
+
+void RaftConsensusITestBase::ScanReplica(TabletServerServiceProxy* replica_proxy,
+ vector<string>* results) {
+ ScanRequestPB req;
+ ScanResponsePB resp;
+ RpcController rpc;
+ rpc.set_timeout(MonoDelta::FromSeconds(10)); // Squelch warnings.
+
+ NewScanRequestPB* scan = req.mutable_new_scan_request();
+ scan->set_tablet_id(tablet_id_);
+ ASSERT_OK(SchemaToColumnPBs(schema_, scan->mutable_projected_columns()));
+
+ // Send the call
+ {
+ req.set_batch_size_bytes(0);
+ SCOPED_TRACE(SecureDebugString(req));
+ ASSERT_OK(replica_proxy->Scan(req, &resp, &rpc));
+ SCOPED_TRACE(SecureDebugString(resp));
+ if (resp.has_error()) {
+ ASSERT_OK(StatusFromPB(resp.error().status()));
+ }
+ }
+
+ if (!resp.has_more_results()) {
+ return;
+ }
+
+ // Drain all the rows from the scanner.
+ NO_FATALS(DrainScannerToStrings(resp.scanner_id(),
+ schema_,
+ results,
+ replica_proxy));
+
+ std::sort(results->begin(), results->end());
+}
+
+void RaftConsensusITestBase::InsertTestRowsRemoteThread(
+ uint64_t first_row,
+ uint64_t count,
+ uint64_t num_batches,
+ const vector<CountDownLatch*>& latches) {
+ shared_ptr<KuduTable> table;
+ CHECK_OK(client_->OpenTable(kTableId, &table));
+
+ shared_ptr<KuduSession> session = client_->NewSession();
+ session->SetTimeoutMillis(60000);
+ CHECK_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+
+ for (int i = 0; i < num_batches; i++) {
+ uint64_t first_row_in_batch = first_row + (i * count / num_batches);
+ uint64_t last_row_in_batch = first_row_in_batch + count / num_batches;
+
+ for (int j = first_row_in_batch; j < last_row_in_batch; j++) {
+ gscoped_ptr<KuduInsert> insert(table->NewInsert());
+ KuduPartialRow* row = insert->mutable_row();
+ CHECK_OK(row->SetInt32(0, j));
+ CHECK_OK(row->SetInt32(1, j * 2));
+ CHECK_OK(row->SetStringCopy(2, Slice(StringPrintf("hello %d", j))));
+ CHECK_OK(session->Apply(insert.release()));
+ }
+
+ FlushSessionOrDie(session);
+
+ int inserted = last_row_in_batch - first_row_in_batch;
+ for (CountDownLatch* latch : latches) {
+ latch->CountDown(inserted);
+ }
+ }
+
+ inserters_.CountDown();
+}
+
+void RaftConsensusITestBase::AddFlagsForLogRolls(vector<string>* extra_tserver_flags) {
+ // We configure a small log segment size so that we roll frequently,
+ // configure a small cache size so that we evict data from the cache, and
+ // retain as few segments as possible. We also turn off async segment
+ // allocation -- this ensures that we roll many segments of logs (with async
+ // allocation, it's possible that the preallocation is slow and we wouldn't
+ // roll deterministically).
+ //
+ // Additionally, we disable log compression, since these tests write a lot of
+ // repetitive data to cause the rolls, and compression would make it all tiny.
+ extra_tserver_flags->push_back("--log_compression_codec=none");
+ extra_tserver_flags->push_back("--log_cache_size_limit_mb=1");
+ extra_tserver_flags->push_back("--log_segment_size_mb=1");
+ extra_tserver_flags->push_back("--log_async_preallocate_segments=false");
+ extra_tserver_flags->push_back("--log_min_segments_to_retain=1");
+ extra_tserver_flags->push_back("--log_max_segments_to_retain=3");
+ extra_tserver_flags->push_back("--maintenance_manager_polling_interval_ms=100");
+ extra_tserver_flags->push_back("--log_target_replay_size_mb=1");
+ // We write 128KB cells in CauseFollowerToFallBehindLogGC(): bump the limit.
+ extra_tserver_flags->push_back("--max_cell_size_bytes=1000000");
+}
+
+void RaftConsensusITestBase::CauseFollowerToFallBehindLogGC(
+ string* leader_uuid,
+ int64_t* orig_term,
+ string* fell_behind_uuid) {
+ MonoDelta kTimeout = MonoDelta::FromSeconds(10);
+ // Wait for all of the replicas to have acknowledged the elected
+ // leader and logged the first NO_OP.
+ ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_, tablet_id_, 1));
+
+ // Pause one server. This might be the leader, but pausing it will cause
+ // a leader election to happen.
+ TServerDetails* replica = (*tablet_replicas_.begin()).second;
+ ExternalTabletServer* replica_ets = cluster_->tablet_server_by_uuid(replica->uuid());
+ ASSERT_OK(replica_ets->Pause());
+
+ // Find a leader. In case we paused the leader above, this will wait until
+ // we have elected a new one.
+ TServerDetails* leader = nullptr;
+ while (true) {
+ Status s = GetLeaderReplicaWithRetries(tablet_id_, &leader);
+ if (s.ok() && leader != nullptr && leader != replica) {
+ break;
+ }
+ SleepFor(MonoDelta::FromMilliseconds(10));
+ }
+ *leader_uuid = leader->uuid();
+ int leader_index = cluster_->tablet_server_index_by_uuid(*leader_uuid);
+
+ TestWorkload workload(cluster_.get());
+ workload.set_table_name(kTableId);
+ workload.set_timeout_allowed(true);
+ workload.set_payload_bytes(128 * 1024); // Write ops of size 128KB.
+ workload.set_write_batch_size(1);
+ workload.set_num_write_threads(4);
+ workload.Setup();
+ workload.Start();
+
+ LOG(INFO) << "Waiting until we've written at least 4MB...";
+ while (workload.rows_inserted() < 8 * 4) {
+ SleepFor(MonoDelta::FromMilliseconds(10));
+ }
+ workload.StopAndJoin();
+
+ LOG(INFO) << "Waiting for log GC on " << leader->uuid();
+ // Some WAL segments must exist, but wal segment 1 must not exist.
+ ASSERT_OK(inspect_->WaitForFilePatternInTabletWalDirOnTs(
+ leader_index, tablet_id_, { "wal-" }, { "wal-000000001" }));
+
+ LOG(INFO) << "Log GC complete on " << leader->uuid();
+
+ // Then wait another couple of seconds to be sure that it has bothered to try
+ // to write to the paused peer.
+ // TODO(unknown): would be nice to be able to poll the leader with an RPC like
+ // GetLeaderStatus() which could tell us whether it has made any requests
+ // since the log GC.
+ SleepFor(MonoDelta::FromSeconds(2));
+
+ // Make a note of whatever the current term of the cluster is,
+ // before we resume the follower.
+ {
+ OpId op_id;
+ ASSERT_OK(GetLastOpIdForReplica(tablet_id_, leader, consensus::RECEIVED_OPID, kTimeout,
+ &op_id));
+ *orig_term = op_id.term();
+ LOG(INFO) << "Servers converged with original term " << *orig_term;
+ }
+
+ // Resume the follower.
+ LOG(INFO) << "Resuming " << replica->uuid();
+ ASSERT_OK(replica_ets->Resume());
+
+ // Ensure that none of the tablet servers crashed.
+ for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+ // Make sure it didn't crash.
+ ASSERT_TRUE(cluster_->tablet_server(i)->IsProcessAlive())
+ << "Tablet server " << i << " crashed";
+ }
+ *fell_behind_uuid = replica->uuid();
+}
+
+} // namespace tserver
+} // namespace kudu
+
http://git-wip-us.apache.org/repos/asf/kudu/blob/ab77ce02/src/kudu/integration-tests/raft_consensus-itest-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest-base.h b/src/kudu/integration-tests/raft_consensus-itest-base.h
new file mode 100644
index 0000000..98a379e
--- /dev/null
+++ b/src/kudu/integration-tests/raft_consensus-itest-base.h
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include "kudu/integration-tests/ts_itest-base.h"
+#include "kudu/util/countdown_latch.h"
+
+namespace kudu {
+namespace tserver {
+
+class TabletServerServiceProxy;
+
+// Integration test for the raft consensus implementation.
+// Uses the whole tablet server stack with ExternalMiniCluster.
+class RaftConsensusITestBase : public TabletServerIntegrationTestBase {
+ public:
+ RaftConsensusITestBase();
+
+ void SetUp() override;
+
+ void ScanReplica(TabletServerServiceProxy* replica_proxy,
+ std::vector<std::string>* results);
+
+ void InsertTestRowsRemoteThread(uint64_t first_row,
+ uint64_t count,
+ uint64_t num_batches,
+ const std::vector<CountDownLatch*>& latches);
+ protected:
+ // Flags needed for CauseFollowerToFallBehindLogGC() to work well.
+ void AddFlagsForLogRolls(std::vector<std::string>* extra_tserver_flags);
+
+ // Pause one of the followers and write enough data to the remaining replicas
+ // to cause log GC, then resume the paused follower. On success,
+ // 'leader_uuid' will be set to the UUID of the leader, 'orig_term' will be
+ // set to the term of the leader before un-pausing the follower, and
+ // 'fell_behind_uuid' will be set to the UUID of the follower that was paused
+ // and caused to fall behind. These can be used for verification purposes.
+ //
+ // Certain flags should be set. You can add the required flags with
+ // AddFlagsForLogRolls() before starting the cluster.
+ void CauseFollowerToFallBehindLogGC(std::string* leader_uuid,
+ int64_t* orig_term,
+ std::string* fell_behind_uuid);
+
+ CountDownLatch inserters_;
+};
+
+} // namespace tserver
+} // namespace kudu
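For orientation, a hypothetical example (not part of this change) of how a
test derived from the new base class is expected to wire the two helpers
together, following the same pattern as TestTombstonedVoteAfterFailedTabletCopy
above; the fixture and test names here are illustrative only:

    class ExampleFallBehindITest : public kudu::tserver::RaftConsensusITestBase {};

    TEST_F(ExampleFallBehindITest, FollowerFallsBehindLogGC) {
      // The log-roll flags must be in place before the cluster starts.
      std::vector<std::string> ts_flags;
      AddFlagsForLogRolls(&ts_flags);
      NO_FATALS(BuildAndStart(ts_flags, {}));

      std::string leader_uuid;
      int64_t orig_term;
      std::string fell_behind_uuid;
      NO_FATALS(CauseFollowerToFallBehindLogGC(&leader_uuid, &orig_term,
                                               &fell_behind_uuid));

      // At this point the replica named by 'fell_behind_uuid' is missing log
      // entries that were GCed on 'leader_uuid', and assertions about eviction
      // or catch-up behavior can follow.
    }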