You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2020/01/16 03:28:44 UTC
[kudu] 01/02: KUDU-3011 p5: transfer leadership when quiescing
This is an automated email from the ASF dual-hosted git repository.
awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 54db215511e84785a8649ba1e52911f8adfb11e4
Author: Andrew Wong <aw...@apache.org>
AuthorDate: Wed Jan 8 23:28:12 2020 -0800
KUDU-3011 p5: transfer leadership when quiescing
This amends the behavior of quiescing such that when a tablet server is
quiescing, it will transfer leadership to a caught-up follower as soon
as it can.
While in this state, unlike during a graceful stepdown period, the
tablet can still be written to, so as not to obstruct ongoing workloads.
Tests are added to exercise:
- The basic behavior: even without injecting any errors that might cause
elections, a quiescing leader will relinquish leadership.
- The behavior when there are followers being caught up. In such cases,
the leader won't immediately relinquish leadership -- instead, it will
wait for the followers to catch up before stepping down.
- The behavior when being written to. The fact that a leader is
quiescing shouldn't affect its ability to be written to.
- The behavior of the PeerMessageQueue when responding to various peer
responses.
I also removed the election-causing fault injection from a couple of
existing tests; it was previously required to transfer leadership while
quiescing.
Note: right now, if all tablet servers are quiescing while a write
workload is ongoing, a large number of StartElection requests will
be sent from the leaders to the followers. A follow-up patch will
address this.
Change-Id: Idbf0716f5c9455f83ff5f6f601b0f5042f77d078
Reviewed-on: http://gerrit.cloudera.org:8080/15012
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Tested-by: Andrew Wong <aw...@cloudera.com>
---
src/kudu/consensus/consensus_peers-test.cc | 1 +
src/kudu/consensus/consensus_queue-test.cc | 116 ++++++++++++-
src/kudu/consensus/consensus_queue.cc | 20 ++-
src/kudu/consensus/consensus_queue.h | 6 +
src/kudu/consensus/raft_consensus.cc | 8 +-
.../tablet_server_quiescing-itest.cc | 180 +++++++++++++++++++--
6 files changed, 306 insertions(+), 25 deletions(-)
diff --git a/src/kudu/consensus/consensus_peers-test.cc b/src/kudu/consensus/consensus_peers-test.cc
index 8ac75a8..49c9812 100644
--- a/src/kudu/consensus/consensus_peers-test.cc
+++ b/src/kudu/consensus/consensus_peers-test.cc
@@ -103,6 +103,7 @@ class ConsensusPeersTest : public KuduTest {
FakeRaftPeerPB(kLeaderUuid),
kTabletId,
raft_pool_->NewToken(ThreadPool::ExecutionMode::SERIAL),
+ /*server_quiescing*/nullptr,
MinimumOpId(),
MinimumOpId()));
diff --git a/src/kudu/consensus/consensus_queue-test.cc b/src/kudu/consensus/consensus_queue-test.cc
index 094f2cc..092d475 100644
--- a/src/kudu/consensus/consensus_queue-test.cc
+++ b/src/kudu/consensus/consensus_queue-test.cc
@@ -17,16 +17,20 @@
#include "kudu/consensus/consensus_queue.h"
-#include <cstddef>
+#include <algorithm>
+#include <atomic>
#include <cstdint>
+#include <deque>
#include <memory>
#include <ostream>
#include <string>
#include <vector>
+#include <boost/optional/optional.hpp>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
+#include <gtest/gtest_prod.h>
#include "kudu/clock/clock.h"
#include "kudu/clock/hybrid_clock.h"
@@ -63,6 +67,9 @@ DECLARE_int32(consensus_max_batch_size_bytes);
DECLARE_int32(follower_unavailable_considered_failed_sec);
using kudu::consensus::HealthReportPB;
+using std::atomic;
+using std::deque;
+using std::string;
using std::unique_ptr;
using std::vector;
@@ -78,7 +85,8 @@ class ConsensusQueueTest : public KuduTest {
ConsensusQueueTest()
: schema_(GetSimpleTestSchema()),
metric_entity_(METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "queue-test")),
- registry_(new log::LogAnchorRegistry) {
+ registry_(new log::LogAnchorRegistry),
+ quiescing_(false) {
}
virtual void SetUp() OVERRIDE {
@@ -90,8 +98,8 @@ class ConsensusQueueTest : public KuduTest {
fs_manager_.get(),
kTestTablet,
schema_,
- 0, // schema_version
- NULL,
+ /*schema_version*/0,
+ /*metric_entity*/nullptr,
&log_));
clock_.reset(new clock::HybridClock());
ASSERT_OK(clock_->Init());
@@ -109,6 +117,7 @@ class ConsensusQueueTest : public KuduTest {
FakeRaftPeerPB(kLeaderUuid),
kTestTablet,
raft_pool_->NewToken(ThreadPool::ExecutionMode::SERIAL),
+ &quiescing_,
replicated_opid,
committed_opid));
}
@@ -233,8 +242,105 @@ class ConsensusQueueTest : public KuduTest {
gscoped_ptr<PeerMessageQueue> queue_;
scoped_refptr<log::LogAnchorRegistry> registry_;
unique_ptr<clock::Clock> clock_;
+ atomic<bool> quiescing_;
};
+// Observer of a PeerMessageQueue that tracks the notifications sent to
+// observers.
+class SimpleObserver : public PeerMessageQueueObserver {
+ public:
+ SimpleObserver() = default;
+
+ void NotifyPeerToStartElection(const string& peer_uuid) override {
+ peers_to_start_election_.emplace_back(peer_uuid);
+ }
+
+ // Other notifications aren't implemented. Just no-op.
+ void NotifyCommitIndex(int64_t /*commit_index*/) override {}
+ void NotifyTermChange(int64_t /*term*/) override {}
+ void NotifyFailedFollower(const string& /*peer_uuid*/, int64_t /*term*/,
+ const string& /*reason*/) override {}
+ void NotifyPeerToPromote(const string& /*peer_uuid*/) override {}
+ void NotifyPeerHealthChange() override {}
+
+ private:
+ FRIEND_TEST(ConsensusQueueTest, TestTransferLeadershipWhenAppropriate);
+
+ // The following track the notifications sent in chronological order.
+ deque<string> peers_to_start_election_;
+};
+
+// Test that the leader consensus queue will only attempt to trigger elections
+// when appropriate.
+TEST_F(ConsensusQueueTest, TestTransferLeadershipWhenAppropriate) {
+ SimpleObserver observer;
+ queue_->RegisterObserver(&observer);
+ RaftConfigPB config = BuildRaftConfigPBForTests(/*num_voters*/2);
+ queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, config);
+ RaftPeerPB follower = MakePeer(kPeerUuid, RaftPeerPB::VOTER);
+ queue_->TrackPeer(follower);
+
+ AppendReplicateMessagesToQueue(queue_.get(), clock_.get(), 1, 10);
+ WaitForLocalPeerToAckIndex(10);
+
+ ConsensusResponsePB peer_response;
+ peer_response.set_responder_term(1);
+ peer_response.set_responder_uuid(kPeerUuid);
+ SetLastReceivedAndLastCommitted(&peer_response, MakeOpId(1, 9), MinimumOpId().index());
+
+ int elections_so_far = 0;
+ // Simulates receiving the peer's response and checks that, upon receiving
+ // it, the PeerMessageQueue either did or didn't notify that the peer should
+ // start an election.
+ auto verify_elections = [&] (bool election_happened) {
+ ASSERT_TRUE(queue_->ResponseFromPeer(kPeerUuid, peer_response));
+ // Notifications are communicated via the Raft threadpool, so wait for any
+ // such notifying tasks to finish.
+ raft_pool_->Wait();
+ if (election_happened) {
+ elections_so_far++;
+ }
+ ASSERT_EQ(elections_so_far, observer.peers_to_start_election_.size());
+ };
+ // We haven't begun watching for a successor yet and our conditions aren't
+ // met for this peer to become a leader.
+ NO_FATALS(verify_elections(/*election_happened*/false));
+
+ // Even after waiting for a successor, this peer isn't ready yet.
+ queue_->BeginWatchForSuccessor(boost::none);
+ NO_FATALS(verify_elections(/*election_happened*/false));
+
+ // Once the peer says it's gotten the last-appended op, we should be good to
+ // transfer leadership to it.
+ SetLastReceivedAndLastCommitted(&peer_response, MakeOpId(1, 10), MinimumOpId().index());
+ NO_FATALS(verify_elections(/*election_happened*/true));
+
+ // After we've triggered our election, we shouldn't trigger another.
+ NO_FATALS(verify_elections(/*election_happened*/false));
+
+ // And if we try to step down but specify a different peer, we also won't try
+ // electing the peer in-hand.
+ queue_->BeginWatchForSuccessor(boost::make_optional<string>("different-peer"));
+ NO_FATALS(verify_elections(/*election_happened*/false));
+
+ // Even if we begin quiescing, because we're looking for a specific
+ // successor, we shouldn't see an election.
+ quiescing_ = true;
+ NO_FATALS(verify_elections(/*election_happened*/false));
+
+ // If we stop watching for that successor and we're quiescing, we'll trigger
+ // elections.
+ queue_->EndWatchForSuccessor();
+ for (int i = 0; i < 3; i++) {
+ NO_FATALS(verify_elections(/*election_happened*/true));
+ }
+
+ // If the peer weren't a voter, we would also not trigger elections.
+ config.mutable_peers(1)->set_member_type(RaftPeerPB::NON_VOTER);
+ queue_->SetLeaderMode(10, 1, config);
+ NO_FATALS(verify_elections(/*election_happened*/false));
+}
+
// Tests that the queue is able to track a peer when it starts tracking a peer
// after the initial message in the queue. In particular this creates a queue
// with several messages and then starts to track a peer whose watermark
@@ -928,7 +1034,7 @@ TEST_F(ConsensusQueueTest, TestFollowerCommittedIndexAndMetrics) {
AppendReplicateMessagesToQueue(queue_.get(), clock_.get(), 1, 10);
WaitForLocalPeerToAckIndex(10);
- // The committed_index should be MinimumOpId() since UpdateFollowerCommittedIndex
+ // The committed_index should be MinimumOpId() since UpdateFollowerWatermarks
// has not been called.
ASSERT_EQ(0, queue_->GetCommittedIndex());
diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc
index a8aaa8f..3c6373f 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -17,6 +17,7 @@
#include "kudu/consensus/consensus_queue.h"
#include <algorithm>
+#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
@@ -80,6 +81,7 @@ DECLARE_int64(rpc_max_message_size);
using kudu::log::Log;
using kudu::pb_util::SecureDebugString;
using kudu::pb_util::SecureShortDebugString;
+using std::atomic;
using std::string;
using std::unique_ptr;
using std::unordered_map;
@@ -174,9 +176,11 @@ PeerMessageQueue::PeerMessageQueue(const scoped_refptr<MetricEntity>& metric_ent
RaftPeerPB local_peer_pb,
string tablet_id,
unique_ptr<ThreadPoolToken> raft_pool_observers_token,
+ const atomic<bool>* server_quiescing,
OpId last_locally_replicated,
const OpId& last_locally_committed)
: raft_pool_observers_token_(std::move(raft_pool_observers_token)),
+ server_quiescing_(server_quiescing),
local_peer_pb_(std::move(local_peer_pb)),
tablet_id_(std::move(tablet_id)),
successor_watch_in_progress_(false),
@@ -1065,16 +1069,24 @@ void PeerMessageQueue::PromoteIfNeeded(TrackedPeer* peer, const TrackedPeer& pre
void PeerMessageQueue::TransferLeadershipIfNeeded(const TrackedPeer& peer,
const ConsensusStatusPB& status) {
DCHECK(queue_lock_.is_locked());
- if (!successor_watch_in_progress_) {
+ bool server_quiescing = server_quiescing_ && *server_quiescing_;
+ // Only transfer leadership if the local peer has begun looking for a
+ // successor, or if the server is quiescing. Otherwise, exit early.
+ if (!successor_watch_in_progress_ && !server_quiescing) {
return;
}
- if (designated_successor_uuid_ && peer.uuid() != designated_successor_uuid_.get()) {
+ // Do some basic sanity checks that we can actually transfer leadership to
+ // the given peer.
+ if (queue_state_.mode != PeerMessageQueue::LEADER ||
+ peer.last_exchange_status != PeerStatus::OK ||
+ local_peer_pb_.permanent_uuid() == peer.uuid()) {
return;
}
- if (queue_state_.mode != PeerMessageQueue::LEADER ||
- peer.last_exchange_status != PeerStatus::OK) {
+ // If looking for a specific successor, ignore peers as appropriate.
+ if (successor_watch_in_progress_ &&
+ designated_successor_uuid_ && peer.uuid() != designated_successor_uuid_.get()) {
return;
}
diff --git a/src/kudu/consensus/consensus_queue.h b/src/kudu/consensus/consensus_queue.h
index d2257bc..430a313 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -16,6 +16,7 @@
// under the License.
#pragma once
+#include <atomic>
#include <cstdint>
#include <functional>
#include <ostream>
@@ -186,6 +187,7 @@ class PeerMessageQueue {
RaftPeerPB local_peer_pb,
std::string tablet_id,
std::unique_ptr<ThreadPoolToken> raft_pool_observers_token,
+ const std::atomic<bool>* server_quiescing,
OpId last_locally_replicated,
const OpId& last_locally_committed);
@@ -545,6 +547,10 @@ class PeerMessageQueue {
// The pool token which executes observer notifications.
std::unique_ptr<ThreadPoolToken> raft_pool_observers_token_;
+ // Shared boolean that indicates whether the server is quiescing, in which
+ // case leadership should be transferred away from this peer.
+ const std::atomic<bool>* server_quiescing_;
+
// PB containing identifying information about the local peer.
const RaftPeerPB local_peer_pb_;
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 618e542..4f99b71 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -277,6 +277,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
local_peer_pb_,
options_.tablet_id,
raft_pool->NewToken(ThreadPool::ExecutionMode::SERIAL),
+ server_ctx_.quiescing,
info.last_id,
info.last_committed_id));
@@ -913,11 +914,12 @@ void RaftConsensus::NotifyPeerToPromote(const string& peer_uuid) {
}
void RaftConsensus::NotifyPeerToStartElection(const string& peer_uuid) {
- LOG(INFO) << "Instructing follower " << peer_uuid << " to start an election";
+ const auto& log_prefix = LogPrefixThreadSafe();
+ LOG(INFO) << log_prefix << ": Instructing follower " << peer_uuid << " to start an election";
WARN_NOT_OK(raft_pool_token_->SubmitFunc(std::bind(&RaftConsensus::TryStartElectionOnPeerTask,
shared_from_this(),
peer_uuid)),
- LogPrefixThreadSafe() + "Unable to start TryStartElectionOnPeerTask");
+ log_prefix + "Unable to start TryStartElectionOnPeerTask");
}
void RaftConsensus::NotifyPeerHealthChange() {
@@ -1002,7 +1004,7 @@ void RaftConsensus::TryStartElectionOnPeerTask(const string& peer_uuid) {
return;
}
LOG_WITH_PREFIX_UNLOCKED(INFO) << "Signalling peer " << peer_uuid
- << "to start an election";
+ << " to start an election";
WARN_NOT_OK(peer_manager_->StartElection(peer_uuid),
Substitute("unable to start election on peer $0", peer_uuid));
}
diff --git a/src/kudu/integration-tests/tablet_server_quiescing-itest.cc b/src/kudu/integration-tests/tablet_server_quiescing-itest.cc
index bf4fcec..70c4eb9 100644
--- a/src/kudu/integration-tests/tablet_server_quiescing-itest.cc
+++ b/src/kudu/integration-tests/tablet_server_quiescing-itest.cc
@@ -36,6 +36,7 @@
#include "kudu/integration-tests/internal_mini_cluster-itest-base.h"
#include "kudu/integration-tests/test_workload.h"
#include "kudu/mini-cluster/internal_mini_cluster.h"
+#include "kudu/tablet/metadata.pb.h"
#include "kudu/tserver/mini_tablet_server.h"
#include "kudu/tserver/scanners.h"
#include "kudu/tserver/tablet_server.h"
@@ -50,6 +51,8 @@ DECLARE_bool(enable_leader_failure_detection);
DECLARE_bool(catalog_manager_wait_for_new_tablets_to_elect_leader);
DECLARE_double(leader_failure_max_missed_heartbeat_periods);
DECLARE_int32(consensus_inject_latency_ms_in_notifications);
+DECLARE_int32(tablet_copy_download_file_inject_latency_ms);
+DECLARE_int32(tablet_copy_transfer_chunk_size_bytes);
DECLARE_int32(scanner_default_batch_size_bytes);
DECLARE_int32(scanner_ttl_ms);
DECLARE_int32(raft_heartbeat_interval_ms);
@@ -137,16 +140,16 @@ TEST_F(TServerQuiescingITest, TestQuiescingServerDoesntTriggerElections) {
LOG(INFO) << Substitute("Quiescing ts $0", ts->uuid());
*ts->server()->mutable_quiescing() = true;
- // Cause a bunch of elections.
- FLAGS_leader_failure_max_missed_heartbeat_periods = 1;
- FLAGS_consensus_inject_latency_ms_in_notifications = FLAGS_raft_heartbeat_interval_ms;
-
// Soon enough, elections will occur, and our quiescing server will cease to
// be leader.
ASSERT_EVENTUALLY([&] {
ASSERT_EQ(0, ts->server()->num_raft_leaders()->value());
});
+ // Cause a bunch of elections.
+ FLAGS_leader_failure_max_missed_heartbeat_periods = 1;
+ FLAGS_consensus_inject_latency_ms_in_notifications = FLAGS_raft_heartbeat_interval_ms;
+
// When we stop quiescing the server, we should eventually see some
// leadership return to the server.
*ts->server()->mutable_quiescing() = false;
@@ -155,11 +158,36 @@ TEST_F(TServerQuiescingITest, TestQuiescingServerDoesntTriggerElections) {
});
}
+// Test that after quiescing a tablet's leader, leadership will be transferred
+// elsewhere.
+TEST_F(TServerQuiescingITest, TestQuiescingLeaderTransfersLeadership) {
+ const int kNumReplicas = 3;
+ NO_FATALS(StartCluster(kNumReplicas));
+ vector<string> tablet_ids;
+ NO_FATALS(CreateWorkloadTable(/*num_tablets*/1, &tablet_ids));
+ string tablet_id = tablet_ids[0];
+
+ const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
+ TServerDetails* leader_details;
+ ASSERT_OK(FindTabletLeader(ts_map_, tablet_id, kTimeout, &leader_details));
+
+ // Start quiescing the leader.
+ const auto& orig_leader_uuid = leader_details->uuid();
+ auto* leader_ts = cluster_->mini_tablet_server_by_uuid(orig_leader_uuid);
+ *leader_ts->server()->mutable_quiescing() = true;
+
+ // The leader tserver will relinquish leadership soon enough.
+ ASSERT_EVENTUALLY([&] {
+ ASSERT_OK(FindTabletLeader(ts_map_, tablet_id, kTimeout, &leader_details));
+ ASSERT_NE(orig_leader_uuid, leader_details->uuid());
+ });
+}
+
// Test that even if a majority of replicas are quiescing, a tablet is still
// able to elect a leader.
TEST_F(TServerQuiescingITest, TestMajorityQuiescingElectsLeader) {
const int kNumReplicas = 3;
- FLAGS_raft_heartbeat_interval_ms = 50;
+ FLAGS_raft_heartbeat_interval_ms = 100;
NO_FATALS(StartCluster(kNumReplicas));
vector<string> tablet_ids;
NO_FATALS(CreateWorkloadTable(/*num_tablets*/1, &tablet_ids));
@@ -170,10 +198,6 @@ TEST_F(TServerQuiescingITest, TestMajorityQuiescingElectsLeader) {
*cluster_->mini_tablet_server(i)->server()->mutable_quiescing() = true;
}
- // Cause a bunch of elections.
- FLAGS_leader_failure_max_missed_heartbeat_periods = 1;
- FLAGS_consensus_inject_latency_ms_in_notifications = FLAGS_raft_heartbeat_interval_ms;
-
// Eventually the first tserver will be elected leader.
const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
TServerDetails* leader_details;
@@ -234,10 +258,6 @@ TEST_F(TServerQuiescingITest, TestDoesntAllowNewScansLeadersOnly) {
rw_workload->Setup();
rw_workload->Start();
- // Inject a bunch of leader elections to stress leadership changes.
- FLAGS_leader_failure_max_missed_heartbeat_periods = 1;
- FLAGS_consensus_inject_latency_ms_in_notifications = FLAGS_raft_heartbeat_interval_ms;
-
// Wait for the scans to begin.
MiniTabletServer* ts = nullptr;
ASSERT_EVENTUALLY([&] {
@@ -263,6 +283,74 @@ TEST_F(TServerQuiescingITest, TestDoesntAllowNewScansLeadersOnly) {
NO_FATALS(rw_workload->StopAndJoin());
}
+// Test that when all followers are behind (e.g. because the others are down),
+// the leader, even while quiescing, will remain leader.
+TEST_F(TServerQuiescingITest, TestQuiesceLeaderWhileFollowersCatchingUp) {
+ const int kNumReplicas = 3;
+ FLAGS_raft_heartbeat_interval_ms = 100;
+ NO_FATALS(StartCluster(kNumReplicas));
+ auto rw_workload = CreateFaultIntolerantRWWorkload();
+ rw_workload->set_num_tablets(1);
+ rw_workload->Setup();
+ rw_workload->Start();
+ while (rw_workload->rows_inserted() < 10000) {
+ SleepFor(MonoDelta::FromMilliseconds(50));
+ }
+ TServerDetails* leader_details;
+ const auto kTimeout = MonoDelta::FromSeconds(10);
+ const string tablet_id = cluster_->mini_tablet_server(0)->ListTablets()[0];
+ ASSERT_OK(FindTabletLeader(ts_map_, tablet_id, kTimeout, &leader_details));
+ const string leader_uuid = leader_details->uuid();
+
+ // Slow down tablet copies so our leader will be catching up followers long
+ // enough for us to observe.
+ FLAGS_tablet_copy_transfer_chunk_size_bytes = 512;
+ FLAGS_tablet_copy_download_file_inject_latency_ms = 500;
+
+ // Stop our writes and delete the replicas on the follower servers, setting
+ // them up for tablet copies.
+ NO_FATALS(rw_workload->StopAndJoin());
+ for (const auto& ts_and_details : ts_map_) {
+ const auto& ts_uuid = ts_and_details.first;
+ if (ts_uuid != leader_uuid) {
+ const auto* ts_details = ts_and_details.second;
+ ASSERT_OK(DeleteTablet(ts_details, tablet_id,
+ tablet::TabletDataState::TABLET_DATA_TOMBSTONED,
+ kTimeout));
+ ASSERT_EVENTUALLY([&] {
+ vector<string> running_tablets;
+ ASSERT_OK(ListRunningTabletIds(ts_details, kTimeout, &running_tablets));
+ ASSERT_EQ(0, running_tablets.size());
+ });
+ }
+ }
+ // Quiesce the leader and wait for a bit. While the leader is catching up
+ // replicas, it shouldn't relinquish leadership.
+ auto* leader_ts = cluster_->mini_tablet_server_by_uuid(leader_uuid);
+ *leader_ts->server()->mutable_quiescing() = true;
+ SleepFor(MonoDelta::FromSeconds(3));
+ ASSERT_EQ(1, leader_ts->server()->num_raft_leaders()->value());
+ ASSERT_OK(FindTabletLeader(ts_map_, tablet_id, kTimeout, &leader_details));
+ ASSERT_EQ(leader_uuid, leader_details->uuid());
+
+ // Once we let the copy finish, the leader should relinquish leadership.
+ FLAGS_tablet_copy_download_file_inject_latency_ms = 0;
+ FLAGS_tablet_copy_transfer_chunk_size_bytes = 4 * 1024 * 1024;
+ for (const auto& ts_and_details : ts_map_) {
+ ASSERT_EVENTUALLY([&] {
+ vector<string> running_tablets;
+ ASSERT_OK(ListRunningTabletIds(ts_and_details.second, kTimeout, &running_tablets));
+ ASSERT_EQ(1, running_tablets.size());
+ });
+ }
+ ASSERT_EVENTUALLY([&] {
+ ASSERT_EQ(0, leader_ts->server()->num_raft_leaders()->value());
+ TServerDetails* new_leader_details;
+ ASSERT_OK(FindTabletLeader(ts_map_, tablet_id, kTimeout, &new_leader_details));
+ ASSERT_NE(leader_uuid, new_leader_details->uuid());
+ });
+}
+
class TServerQuiescingParamITest : public TServerQuiescingITest,
public testing::WithParamInterface<int> {};
@@ -394,6 +482,72 @@ TEST_P(TServerQuiescingParamITest, TestScansRetry) {
}
}
+// Test that when all the tablet servers hosting a replica are quiescing, we
+// can still write (assuming a leader had previously been elected).
+TEST_P(TServerQuiescingParamITest, TestWriteWhileAllQuiescing) {
+ const int kNumReplicas = GetParam();
+ NO_FATALS(StartCluster(kNumReplicas));
+ auto start_write_workload = [&] {
+ // Start up a workload with some writes, with no write error tolerance.
+ unique_ptr<TestWorkload> workload(new TestWorkload(cluster_.get()));
+ workload->set_num_replicas(kNumReplicas);
+ workload->set_num_write_threads(3);
+ workload->set_num_tablets(1);
+ workload->Setup();
+ workload->Start();
+ return workload;
+ };
+ auto first_workload = start_write_workload();
+ string tablet_id;
+ ASSERT_EVENTUALLY([&] {
+ vector<string> tablet_ids;
+ tablet_ids = cluster_->mini_tablet_server(0)->ListTablets();
+ ASSERT_EQ(1, tablet_ids.size());
+ tablet_id = tablet_ids[0];
+ });
+
+ TServerDetails* leader_details;
+ const auto kLeaderTimeout = MonoDelta::FromSeconds(10);
+ ASSERT_OK(FindTabletLeader(ts_map_, tablet_id, kLeaderTimeout, &leader_details));
+
+ // Now quiesce all the tablet servers.
+ for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+ *cluster_->mini_tablet_server(i)->server()->mutable_quiescing() = true;
+ }
+
+ // We should continue to write uninterrupted.
+ int start_rows = first_workload->rows_inserted();
+ ASSERT_EVENTUALLY([&] {
+ ASSERT_GT(first_workload->rows_inserted(), start_rows + 1000);
+ });
+}
+
+TEST_P(TServerQuiescingParamITest, TestAbruptStepdownWhileAllQuiescing) {
+ SKIP_IF_SLOW_NOT_ALLOWED();
+
+ const int kNumReplicas = GetParam();
+ NO_FATALS(StartCluster(kNumReplicas));
+ vector<string> tablet_ids;
+ NO_FATALS(CreateWorkloadTable(/*num_tablets*/1, &tablet_ids));
+
+ TServerDetails* leader_details;
+ const auto kLeaderTimeout = MonoDelta::FromSeconds(10);
+ const auto& tablet_id = tablet_ids[0];
+ ASSERT_OK(FindTabletLeader(ts_map_, tablet_id, kLeaderTimeout, &leader_details));
+
+ // Now quiesce all the tablet servers.
+ for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+ *cluster_->mini_tablet_server(i)->server()->mutable_quiescing() = true;
+ }
+ // Once we've stepped down, while quiescing, no new leader should be elected.
+ // Wait extra long to be sure.
+ ASSERT_OK(LeaderStepDown(leader_details, tablet_id, kLeaderTimeout));
+ MonoDelta election_timeout = MonoDelta::FromMilliseconds(
+ 2 * FLAGS_raft_heartbeat_interval_ms * FLAGS_leader_failure_max_missed_heartbeat_periods);
+ Status s = FindTabletLeader(ts_map_, tablet_id, election_timeout, &leader_details);
+ ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+}
+
INSTANTIATE_TEST_CASE_P(NumReplicas, TServerQuiescingParamITest, ::testing::Values(1, 3));
} // namespace itest