Posted to commits@kudu.apache.org by aw...@apache.org on 2020/01/16 03:28:43 UTC

[kudu] branch master updated (926bca8 -> 31ed4a1)

This is an automated email from the ASF dual-hosted git repository.

awong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from 926bca8  [metrics] Fix bugs when metrics do merge
     new 54db215  KUDU-3011 p5: transfer leadership when quiescing
     new 31ed4a1  KUDU-3011 p6: don't transfer leadership to quiescing followers

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/kudu/consensus/consensus.proto                 |   5 +
 src/kudu/consensus/consensus_peers-test.cc         |   1 +
 src/kudu/consensus/consensus_queue-test.cc         | 133 +++++++++++++-
 src/kudu/consensus/consensus_queue.cc              |  26 ++-
 src/kudu/consensus/consensus_queue.h               |  10 ++
 src/kudu/consensus/raft_consensus.cc               |  11 +-
 .../tablet_server_quiescing-itest.cc               | 200 +++++++++++++++++++--
 7 files changed, 361 insertions(+), 25 deletions(-)


[kudu] 01/02: KUDU-3011 p5: transfer leadership when quiescing

Posted by aw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 54db215511e84785a8649ba1e52911f8adfb11e4
Author: Andrew Wong <aw...@apache.org>
AuthorDate: Wed Jan 8 23:28:12 2020 -0800

    KUDU-3011 p5: transfer leadership when quiescing
    
    This amends the quiescing behavior such that when a tablet server is
    quiescing, it will transfer leadership to a caught-up follower as soon
    as it can.
    
    While in this state, unlike during a graceful stepdown period, the
    tablet can still be written to, so as not to obstruct ongoing workloads.
    
    Tests are added to exercise:
    - The basic behavior: even without injecting any errors that might cause
      elections, a quiescing leader will relinquish leadership.
    - The behavior when there are followers being caught up. In such cases,
      the leader won't immediately relinquish leadership -- instead, it will
      wait for the followers to catch up before stepping down.
    - The behavior when being written to. The fact that a leader is
      quiescing shouldn't affect its ability to be written to.
    - The behavior of the PeerMessageQueue when responding to various peer
      responses.
    
    I also removed some election-causing injection in a couple of existing
    tests that was previously required to transfer leadership while
    quiescing.
    
    Note: right now, if all tablet servers are quiescing while a write
    workload is ongoing, a large number of StartElection requests will be
    sent from the leaders to the followers. A follow-up patch will address
    this.
    
    Change-Id: Idbf0716f5c9455f83ff5f6f601b0f5042f77d078
    Reviewed-on: http://gerrit.cloudera.org:8080/15012
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Tested-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/consensus/consensus_peers-test.cc         |   1 +
 src/kudu/consensus/consensus_queue-test.cc         | 116 ++++++++++++-
 src/kudu/consensus/consensus_queue.cc              |  20 ++-
 src/kudu/consensus/consensus_queue.h               |   6 +
 src/kudu/consensus/raft_consensus.cc               |   8 +-
 .../tablet_server_quiescing-itest.cc               | 180 +++++++++++++++++++--
 6 files changed, 306 insertions(+), 25 deletions(-)
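
For readers who don't want to trace the full diff, here is a minimal
standalone sketch of the decision the leader-side queue makes for each peer
response once this patch is applied. It is not the Kudu implementation:
QueueSketch, PeerSketch, and ShouldAskPeerToStartElection are illustrative
names only, and the real logic (including the voter and catch-up checks)
lives in PeerMessageQueue::TransferLeadershipIfNeeded() in the diff below.

#include <atomic>
#include <string>

#include <boost/optional/optional.hpp>

struct PeerSketch {
  std::string uuid;
  bool is_voter = false;
  bool caught_up = false;  // last exchange OK; last-received op matches the leader's
};

struct QueueSketch {
  std::string local_uuid;
  bool is_leader = false;
  bool successor_watch_in_progress = false;  // set by BeginWatchForSuccessor()
  boost::optional<std::string> designated_successor_uuid;
  const std::atomic<bool>* server_quiescing = nullptr;  // shared with the tablet server

  bool ShouldAskPeerToStartElection(const PeerSketch& peer) const {
    const bool quiescing = server_quiescing && server_quiescing->load();
    // Only consider a transfer during an explicit step-down (successor watch)
    // or while the hosting server is quiescing.
    if (!successor_watch_in_progress && !quiescing) {
      return false;
    }
    // Basic sanity checks: we must be the leader, and the candidate must be a
    // caught-up voter other than ourselves.
    if (!is_leader || peer.uuid == local_uuid || !peer.is_voter || !peer.caught_up) {
      return false;
    }
    // If a specific successor was requested, ignore everyone else.
    if (successor_watch_in_progress && designated_successor_uuid &&
        peer.uuid != *designated_successor_uuid) {
      return false;
    }
    return true;
  }
};

In the real code, a positive decision results in the queue notifying its
observers (NotifyPeerToStartElection() in the raft_consensus.cc hunk below),
which in turn asks the chosen follower to start an election.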

diff --git a/src/kudu/consensus/consensus_peers-test.cc b/src/kudu/consensus/consensus_peers-test.cc
index 8ac75a8..49c9812 100644
--- a/src/kudu/consensus/consensus_peers-test.cc
+++ b/src/kudu/consensus/consensus_peers-test.cc
@@ -103,6 +103,7 @@ class ConsensusPeersTest : public KuduTest {
         FakeRaftPeerPB(kLeaderUuid),
         kTabletId,
         raft_pool_->NewToken(ThreadPool::ExecutionMode::SERIAL),
+        /*server_quiescing*/nullptr,
         MinimumOpId(),
         MinimumOpId()));
 
diff --git a/src/kudu/consensus/consensus_queue-test.cc b/src/kudu/consensus/consensus_queue-test.cc
index 094f2cc..092d475 100644
--- a/src/kudu/consensus/consensus_queue-test.cc
+++ b/src/kudu/consensus/consensus_queue-test.cc
@@ -17,16 +17,20 @@
 
 #include "kudu/consensus/consensus_queue.h"
 
-#include <cstddef>
+#include <algorithm>
+#include <atomic>
 #include <cstdint>
+#include <deque>
 #include <memory>
 #include <ostream>
 #include <string>
 #include <vector>
 
+#include <boost/optional/optional.hpp>
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
+#include <gtest/gtest_prod.h>
 
 #include "kudu/clock/clock.h"
 #include "kudu/clock/hybrid_clock.h"
@@ -63,6 +67,9 @@ DECLARE_int32(consensus_max_batch_size_bytes);
 DECLARE_int32(follower_unavailable_considered_failed_sec);
 
 using kudu::consensus::HealthReportPB;
+using std::atomic;
+using std::deque;
+using std::string;
 using std::unique_ptr;
 using std::vector;
 
@@ -78,7 +85,8 @@ class ConsensusQueueTest : public KuduTest {
   ConsensusQueueTest()
       : schema_(GetSimpleTestSchema()),
         metric_entity_(METRIC_ENTITY_tablet.Instantiate(&metric_registry_, "queue-test")),
-        registry_(new log::LogAnchorRegistry) {
+        registry_(new log::LogAnchorRegistry),
+        quiescing_(false) {
   }
 
   virtual void SetUp() OVERRIDE {
@@ -90,8 +98,8 @@ class ConsensusQueueTest : public KuduTest {
                             fs_manager_.get(),
                             kTestTablet,
                             schema_,
-                            0, // schema_version
-                            NULL,
+                            /*schema_version*/0,
+                            /*metric_entity*/nullptr,
                             &log_));
     clock_.reset(new clock::HybridClock());
     ASSERT_OK(clock_->Init());
@@ -109,6 +117,7 @@ class ConsensusQueueTest : public KuduTest {
         FakeRaftPeerPB(kLeaderUuid),
         kTestTablet,
         raft_pool_->NewToken(ThreadPool::ExecutionMode::SERIAL),
+        &quiescing_,
         replicated_opid,
         committed_opid));
   }
@@ -233,8 +242,105 @@ class ConsensusQueueTest : public KuduTest {
   gscoped_ptr<PeerMessageQueue> queue_;
   scoped_refptr<log::LogAnchorRegistry> registry_;
   unique_ptr<clock::Clock> clock_;
+  atomic<bool> quiescing_;
 };
 
+// Observer of a PeerMessageQueue that tracks the notifications sent to
+// observers.
+class SimpleObserver : public PeerMessageQueueObserver {
+ public:
+  SimpleObserver() = default;
+
+  void NotifyPeerToStartElection(const string& peer_uuid) override {
+    peers_to_start_election_.emplace_back(peer_uuid);
+  }
+
+  // Other notifications aren't implemented. Just no-op.
+  void NotifyCommitIndex(int64_t /*commit_index*/) override {}
+  void NotifyTermChange(int64_t /*term*/) override {}
+  void NotifyFailedFollower(const string& /*peer_uuid*/, int64_t /*term*/,
+                            const string& /*reason*/) override {}
+  void NotifyPeerToPromote(const string& /*peer_uuid*/) override {}
+  void NotifyPeerHealthChange() override {}
+
+ private:
+  FRIEND_TEST(ConsensusQueueTest, TestTransferLeadershipWhenAppropriate);
+
+  // The following track the notifications sent in chronological order.
+  deque<string> peers_to_start_election_;
+};
+
+// Test that the leader consensus queue will only attempt to trigger elections
+// when appropriate.
+TEST_F(ConsensusQueueTest, TestTransferLeadershipWhenAppropriate) {
+  SimpleObserver observer;
+  queue_->RegisterObserver(&observer);
+  RaftConfigPB config = BuildRaftConfigPBForTests(/*num_voters*/2);
+  queue_->SetLeaderMode(kMinimumOpIdIndex, kMinimumTerm, config);
+  RaftPeerPB follower = MakePeer(kPeerUuid, RaftPeerPB::VOTER);
+  queue_->TrackPeer(follower);
+
+  AppendReplicateMessagesToQueue(queue_.get(), clock_.get(), 1, 10);
+  WaitForLocalPeerToAckIndex(10);
+
+  ConsensusResponsePB peer_response;
+  peer_response.set_responder_term(1);
+  peer_response.set_responder_uuid(kPeerUuid);
+  SetLastReceivedAndLastCommitted(&peer_response, MakeOpId(1, 9), MinimumOpId().index());
+
+  int elections_so_far = 0;
+  // Simulates receiving the peer's response and checks that, upon receiving
+  // it, the PeerMessageQueue either did or didn't notify that the peer should
+  // start an election.
+  auto verify_elections = [&] (bool election_happened) {
+    ASSERT_TRUE(queue_->ResponseFromPeer(kPeerUuid, peer_response));
+    // Notifications are communicated via the Raft threadpool, so wait for any
+    // such notifying tasks to finish.
+    raft_pool_->Wait();
+    if (election_happened) {
+      elections_so_far++;
+    }
+    ASSERT_EQ(elections_so_far, observer.peers_to_start_election_.size());
+  };
+  // We haven't begun watching for a successor yet and our conditions aren't
+  // met for this peer to become a leader.
+  NO_FATALS(verify_elections(/*election_happened*/false));
+
+  // Even after waiting for a successor, this peer isn't ready yet.
+  queue_->BeginWatchForSuccessor(boost::none);
+  NO_FATALS(verify_elections(/*election_happened*/false));
+
+  // Once the peer says it's gotten the last-appended op, we should be good to
+  // transfer leadership to it.
+  SetLastReceivedAndLastCommitted(&peer_response, MakeOpId(1, 10), MinimumOpId().index());
+  NO_FATALS(verify_elections(/*election_happened*/true));
+
+  // After we've triggered our election, we shouldn't trigger another.
+  NO_FATALS(verify_elections(/*election_happened*/false));
+
+  // And if we try to step down but specify a different peer, we also won't try
+  // electing the peer in-hand.
+  queue_->BeginWatchForSuccessor(boost::make_optional<string>("different-peer"));
+  NO_FATALS(verify_elections(/*election_happened*/false));
+
+  // Even if we begin quiescing, because we're looking for a specific
+  // successor, we shouldn't see an election.
+  quiescing_ = true;
+  NO_FATALS(verify_elections(/*election_happened*/false));
+
+  // If we stop watching for that successor and we're quiescing, we'll trigger
+  // elections.
+  queue_->EndWatchForSuccessor();
+  for (int i = 0; i < 3; i++) {
+    NO_FATALS(verify_elections(/*election_happened*/true));
+  }
+
+  // If the peer weren't a voter, we would also not trigger elections.
+  config.mutable_peers(1)->set_member_type(RaftPeerPB::NON_VOTER);
+  queue_->SetLeaderMode(10, 1, config);
+  NO_FATALS(verify_elections(/*election_happened*/false));
+}
+
 // Tests that the queue is able to track a peer when it starts tracking a peer
 // after the initial message in the queue. In particular this creates a queue
 // with several messages and then starts to track a peer whose watermark
@@ -928,7 +1034,7 @@ TEST_F(ConsensusQueueTest, TestFollowerCommittedIndexAndMetrics) {
   AppendReplicateMessagesToQueue(queue_.get(), clock_.get(), 1, 10);
   WaitForLocalPeerToAckIndex(10);
 
-  // The committed_index should be MinimumOpId() since UpdateFollowerCommittedIndex
+  // The committed_index should be MinimumOpId() since UpdateFollowerWatermarks
   // has not been called.
   ASSERT_EQ(0, queue_->GetCommittedIndex());
 
diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc
index a8aaa8f..3c6373f 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -17,6 +17,7 @@
 #include "kudu/consensus/consensus_queue.h"
 
 #include <algorithm>
+#include <atomic>
 #include <cstdint>
 #include <functional>
 #include <memory>
@@ -80,6 +81,7 @@ DECLARE_int64(rpc_max_message_size);
 using kudu::log::Log;
 using kudu::pb_util::SecureDebugString;
 using kudu::pb_util::SecureShortDebugString;
+using std::atomic;
 using std::string;
 using std::unique_ptr;
 using std::unordered_map;
@@ -174,9 +176,11 @@ PeerMessageQueue::PeerMessageQueue(const scoped_refptr<MetricEntity>& metric_ent
                                    RaftPeerPB local_peer_pb,
                                    string tablet_id,
                                    unique_ptr<ThreadPoolToken> raft_pool_observers_token,
+                                   const atomic<bool>* server_quiescing,
                                    OpId last_locally_replicated,
                                    const OpId& last_locally_committed)
     : raft_pool_observers_token_(std::move(raft_pool_observers_token)),
+      server_quiescing_(server_quiescing),
       local_peer_pb_(std::move(local_peer_pb)),
       tablet_id_(std::move(tablet_id)),
       successor_watch_in_progress_(false),
@@ -1065,16 +1069,24 @@ void PeerMessageQueue::PromoteIfNeeded(TrackedPeer* peer, const TrackedPeer& pre
 void PeerMessageQueue::TransferLeadershipIfNeeded(const TrackedPeer& peer,
                                                   const ConsensusStatusPB& status) {
   DCHECK(queue_lock_.is_locked());
-  if (!successor_watch_in_progress_) {
+  bool server_quiescing = server_quiescing_ && *server_quiescing_;
+  // Only transfer leadership if the local peer has begun looking for a
+  // successor, or if the server is quiescing. Otherwise, exit early.
+  if (!successor_watch_in_progress_ && !server_quiescing) {
     return;
   }
 
-  if (designated_successor_uuid_ && peer.uuid() != designated_successor_uuid_.get()) {
+  // Do some basic sanity checks that we can actually transfer leadership to
+  // the given peer.
+  if (queue_state_.mode != PeerMessageQueue::LEADER ||
+      peer.last_exchange_status != PeerStatus::OK ||
+      local_peer_pb_.permanent_uuid() == peer.uuid()) {
     return;
   }
 
-  if (queue_state_.mode != PeerMessageQueue::LEADER ||
-      peer.last_exchange_status != PeerStatus::OK) {
+  // If looking for a specific successor, ignore peers as appropriate.
+  if (successor_watch_in_progress_ &&
+      designated_successor_uuid_ && peer.uuid() != designated_successor_uuid_.get()) {
     return;
   }
 
diff --git a/src/kudu/consensus/consensus_queue.h b/src/kudu/consensus/consensus_queue.h
index d2257bc..430a313 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -16,6 +16,7 @@
 // under the License.
 #pragma once
 
+#include <atomic>
 #include <cstdint>
 #include <functional>
 #include <ostream>
@@ -186,6 +187,7 @@ class PeerMessageQueue {
                    RaftPeerPB local_peer_pb,
                    std::string tablet_id,
                    std::unique_ptr<ThreadPoolToken> raft_pool_observers_token,
+                   const std::atomic<bool>* server_quiescing,
                    OpId last_locally_replicated,
                    const OpId& last_locally_committed);
 
@@ -545,6 +547,10 @@ class PeerMessageQueue {
   // The pool token which executes observer notifications.
   std::unique_ptr<ThreadPoolToken> raft_pool_observers_token_;
 
+  // Shared boolean that indicates whether the server is quiescing, in which
+  // case leadership should be transferred away from this peer.
+  const std::atomic<bool>* server_quiescing_;
+
   // PB containing identifying information about the local peer.
   const RaftPeerPB local_peer_pb_;
 
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 618e542..4f99b71 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -277,6 +277,7 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
       local_peer_pb_,
       options_.tablet_id,
       raft_pool->NewToken(ThreadPool::ExecutionMode::SERIAL),
+      server_ctx_.quiescing,
       info.last_id,
       info.last_committed_id));
 
@@ -913,11 +914,12 @@ void RaftConsensus::NotifyPeerToPromote(const string& peer_uuid) {
 }
 
 void RaftConsensus::NotifyPeerToStartElection(const string& peer_uuid) {
-  LOG(INFO) << "Instructing follower " << peer_uuid << " to start an election";
+  const auto& log_prefix = LogPrefixThreadSafe();
+  LOG(INFO) << log_prefix << ": Instructing follower " << peer_uuid << " to start an election";
   WARN_NOT_OK(raft_pool_token_->SubmitFunc(std::bind(&RaftConsensus::TryStartElectionOnPeerTask,
                                                      shared_from_this(),
                                                      peer_uuid)),
-              LogPrefixThreadSafe() + "Unable to start TryStartElectionOnPeerTask");
+              log_prefix + "Unable to start TryStartElectionOnPeerTask");
 }
 
 void RaftConsensus::NotifyPeerHealthChange() {
@@ -1002,7 +1004,7 @@ void RaftConsensus::TryStartElectionOnPeerTask(const string& peer_uuid) {
     return;
   }
   LOG_WITH_PREFIX_UNLOCKED(INFO) << "Signalling peer " << peer_uuid
-                                 << "to start an election";
+                                 << " to start an election";
   WARN_NOT_OK(peer_manager_->StartElection(peer_uuid),
               Substitute("unable to start election on peer $0", peer_uuid));
 }
diff --git a/src/kudu/integration-tests/tablet_server_quiescing-itest.cc b/src/kudu/integration-tests/tablet_server_quiescing-itest.cc
index bf4fcec..70c4eb9 100644
--- a/src/kudu/integration-tests/tablet_server_quiescing-itest.cc
+++ b/src/kudu/integration-tests/tablet_server_quiescing-itest.cc
@@ -36,6 +36,7 @@
 #include "kudu/integration-tests/internal_mini_cluster-itest-base.h"
 #include "kudu/integration-tests/test_workload.h"
 #include "kudu/mini-cluster/internal_mini_cluster.h"
+#include "kudu/tablet/metadata.pb.h"
 #include "kudu/tserver/mini_tablet_server.h"
 #include "kudu/tserver/scanners.h"
 #include "kudu/tserver/tablet_server.h"
@@ -50,6 +51,8 @@ DECLARE_bool(enable_leader_failure_detection);
 DECLARE_bool(catalog_manager_wait_for_new_tablets_to_elect_leader);
 DECLARE_double(leader_failure_max_missed_heartbeat_periods);
 DECLARE_int32(consensus_inject_latency_ms_in_notifications);
+DECLARE_int32(tablet_copy_download_file_inject_latency_ms);
+DECLARE_int32(tablet_copy_transfer_chunk_size_bytes);
 DECLARE_int32(scanner_default_batch_size_bytes);
 DECLARE_int32(scanner_ttl_ms);
 DECLARE_int32(raft_heartbeat_interval_ms);
@@ -137,16 +140,16 @@ TEST_F(TServerQuiescingITest, TestQuiescingServerDoesntTriggerElections) {
   LOG(INFO) << Substitute("Quiescing ts $0", ts->uuid());
   *ts->server()->mutable_quiescing() = true;
 
-  // Cause a bunch of elections.
-  FLAGS_leader_failure_max_missed_heartbeat_periods = 1;
-  FLAGS_consensus_inject_latency_ms_in_notifications = FLAGS_raft_heartbeat_interval_ms;
-
   // Soon enough, elections will occur, and our quiescing server will cease to
   // be leader.
   ASSERT_EVENTUALLY([&] {
     ASSERT_EQ(0, ts->server()->num_raft_leaders()->value());
   });
 
+  // Cause a bunch of elections.
+  FLAGS_leader_failure_max_missed_heartbeat_periods = 1;
+  FLAGS_consensus_inject_latency_ms_in_notifications = FLAGS_raft_heartbeat_interval_ms;
+
   // When we stop quiescing the server, we should eventually see some
   // leadership return to the server.
   *ts->server()->mutable_quiescing() = false;
@@ -155,11 +158,36 @@ TEST_F(TServerQuiescingITest, TestQuiescingServerDoesntTriggerElections) {
   });
 }
 
+// Test that after quiescing a tablet's leader, leadership will be transferred
+// elsewhere.
+TEST_F(TServerQuiescingITest, TestQuiescingLeaderTransfersLeadership) {
+  const int kNumReplicas = 3;
+  NO_FATALS(StartCluster(kNumReplicas));
+  vector<string> tablet_ids;
+  NO_FATALS(CreateWorkloadTable(/*num_tablets*/1, &tablet_ids));
+  string tablet_id = tablet_ids[0];
+
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
+  TServerDetails* leader_details;
+  ASSERT_OK(FindTabletLeader(ts_map_, tablet_id, kTimeout, &leader_details));
+
+  // Start quiescing the leader.
+  const auto& orig_leader_uuid = leader_details->uuid();
+  auto* leader_ts = cluster_->mini_tablet_server_by_uuid(orig_leader_uuid);
+  *leader_ts->server()->mutable_quiescing() = true;
+
+  // The leader tserver will relinquish leadership soon enough.
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_OK(FindTabletLeader(ts_map_, tablet_id, kTimeout, &leader_details));
+    ASSERT_NE(orig_leader_uuid, leader_details->uuid());
+  });
+}
+
 // Test that even if a majority of replicas are quiescing, a tablet is still
 // able to elect a leader.
 TEST_F(TServerQuiescingITest, TestMajorityQuiescingElectsLeader) {
   const int kNumReplicas = 3;
-  FLAGS_raft_heartbeat_interval_ms = 50;
+  FLAGS_raft_heartbeat_interval_ms = 100;
   NO_FATALS(StartCluster(kNumReplicas));
   vector<string> tablet_ids;
   NO_FATALS(CreateWorkloadTable(/*num_tablets*/1, &tablet_ids));
@@ -170,10 +198,6 @@ TEST_F(TServerQuiescingITest, TestMajorityQuiescingElectsLeader) {
     *cluster_->mini_tablet_server(i)->server()->mutable_quiescing() = true;
   }
 
-  // Cause a bunch of elections.
-  FLAGS_leader_failure_max_missed_heartbeat_periods = 1;
-  FLAGS_consensus_inject_latency_ms_in_notifications = FLAGS_raft_heartbeat_interval_ms;
-
   // Eventually the first tserver will be elected leader.
   const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
   TServerDetails* leader_details;
@@ -234,10 +258,6 @@ TEST_F(TServerQuiescingITest, TestDoesntAllowNewScansLeadersOnly) {
   rw_workload->Setup();
   rw_workload->Start();
 
-  // Inject a bunch of leader elections to stress leadership changes.
-  FLAGS_leader_failure_max_missed_heartbeat_periods = 1;
-  FLAGS_consensus_inject_latency_ms_in_notifications = FLAGS_raft_heartbeat_interval_ms;
-
   // Wait for the scans to begin.
   MiniTabletServer* ts = nullptr;
   ASSERT_EVENTUALLY([&] {
@@ -263,6 +283,74 @@ TEST_F(TServerQuiescingITest, TestDoesntAllowNewScansLeadersOnly) {
   NO_FATALS(rw_workload->StopAndJoin());
 }
 
+// Test that when all followers are behind (e.g. because the others are down),
+// the leader, even while quiescing, will remain leader.
+TEST_F(TServerQuiescingITest, TestQuiesceLeaderWhileFollowersCatchingUp) {
+  const int kNumReplicas = 3;
+  FLAGS_raft_heartbeat_interval_ms = 100;
+  NO_FATALS(StartCluster(kNumReplicas));
+  auto rw_workload = CreateFaultIntolerantRWWorkload();
+  rw_workload->set_num_tablets(1);
+  rw_workload->Setup();
+  rw_workload->Start();
+  while (rw_workload->rows_inserted() < 10000) {
+    SleepFor(MonoDelta::FromMilliseconds(50));
+  }
+  TServerDetails* leader_details;
+  const auto kTimeout = MonoDelta::FromSeconds(10);
+  const string tablet_id = cluster_->mini_tablet_server(0)->ListTablets()[0];
+  ASSERT_OK(FindTabletLeader(ts_map_, tablet_id, kTimeout, &leader_details));
+  const string leader_uuid = leader_details->uuid();
+
+  // Slow down tablet copies so our leader will be catching up followers long
+  // enough for us to observe.
+  FLAGS_tablet_copy_transfer_chunk_size_bytes = 512;
+  FLAGS_tablet_copy_download_file_inject_latency_ms = 500;
+
+  // Stop our writes and delete the replicas on the follower servers, setting
+  // them up for tablet copies.
+  NO_FATALS(rw_workload->StopAndJoin());
+  for (const auto& ts_and_details : ts_map_) {
+    const auto& ts_uuid = ts_and_details.first;
+    if (ts_uuid != leader_uuid) {
+      const auto* ts_details = ts_and_details.second;
+      ASSERT_OK(DeleteTablet(ts_details, tablet_id,
+                             tablet::TabletDataState::TABLET_DATA_TOMBSTONED,
+                             kTimeout));
+      ASSERT_EVENTUALLY([&] {
+        vector<string> running_tablets;
+        ASSERT_OK(ListRunningTabletIds(ts_details, kTimeout, &running_tablets));
+        ASSERT_EQ(0, running_tablets.size());
+      });
+    }
+  }
+  // Quiesce the leader and wait for a bit. While the leader is catching up
+  // replicas, it shouldn't relinquish leadership.
+  auto* leader_ts = cluster_->mini_tablet_server_by_uuid(leader_uuid);
+  *leader_ts->server()->mutable_quiescing() = true;
+  SleepFor(MonoDelta::FromSeconds(3));
+  ASSERT_EQ(1, leader_ts->server()->num_raft_leaders()->value());
+  ASSERT_OK(FindTabletLeader(ts_map_, tablet_id, kTimeout, &leader_details));
+  ASSERT_EQ(leader_uuid, leader_details->uuid());
+
+  // Once we let the copy finish, the leader should relinquish leadership.
+  FLAGS_tablet_copy_download_file_inject_latency_ms = 0;
+  FLAGS_tablet_copy_transfer_chunk_size_bytes = 4 * 1024 * 1024;
+  for (const auto& ts_and_details : ts_map_) {
+    ASSERT_EVENTUALLY([&] {
+      vector<string> running_tablets;
+      ASSERT_OK(ListRunningTabletIds(ts_and_details.second, kTimeout, &running_tablets));
+      ASSERT_EQ(1, running_tablets.size());
+    });
+  }
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_EQ(0, leader_ts->server()->num_raft_leaders()->value());
+    TServerDetails* new_leader_details;
+    ASSERT_OK(FindTabletLeader(ts_map_, tablet_id, kTimeout, &new_leader_details));
+    ASSERT_NE(leader_uuid, new_leader_details->uuid());
+  });
+}
+
 class TServerQuiescingParamITest : public TServerQuiescingITest,
                                    public testing::WithParamInterface<int> {};
 
@@ -394,6 +482,72 @@ TEST_P(TServerQuiescingParamITest, TestScansRetry) {
   }
 }
 
+// Test that when all the tablet servers hosting a replica are quiescing, we
+// can still write (assuming a leader had previously been elected).
+TEST_P(TServerQuiescingParamITest, TestWriteWhileAllQuiescing) {
+  const int kNumReplicas = GetParam();
+  NO_FATALS(StartCluster(kNumReplicas));
+  auto start_write_workload = [&] {
+    // Start up a workload with some writes, with no write error tolerance.
+    unique_ptr<TestWorkload> workload(new TestWorkload(cluster_.get()));
+    workload->set_num_replicas(kNumReplicas);
+    workload->set_num_write_threads(3);
+    workload->set_num_tablets(1);
+    workload->Setup();
+    workload->Start();
+    return workload;
+  };
+  auto first_workload = start_write_workload();
+  string tablet_id;
+  ASSERT_EVENTUALLY([&] {
+    vector<string> tablet_ids;
+    tablet_ids = cluster_->mini_tablet_server(0)->ListTablets();
+    ASSERT_EQ(1, tablet_ids.size());
+    tablet_id = tablet_ids[0];
+  });
+
+  TServerDetails* leader_details;
+  const auto kLeaderTimeout = MonoDelta::FromSeconds(10);
+  ASSERT_OK(FindTabletLeader(ts_map_, tablet_id, kLeaderTimeout, &leader_details));
+
+  // Now quiesce all the tablet servers.
+  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+    *cluster_->mini_tablet_server(i)->server()->mutable_quiescing() = true;
+  }
+
+  // We should continue to write uninterrupted.
+  int start_rows = first_workload->rows_inserted();
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_GT(first_workload->rows_inserted(), start_rows + 1000);
+  });
+}
+
+TEST_P(TServerQuiescingParamITest, TestAbruptStepdownWhileAllQuiescing) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  const int kNumReplicas = GetParam();
+  NO_FATALS(StartCluster(kNumReplicas));
+  vector<string> tablet_ids;
+  NO_FATALS(CreateWorkloadTable(/*num_tablets*/1, &tablet_ids));
+
+  TServerDetails* leader_details;
+  const auto kLeaderTimeout = MonoDelta::FromSeconds(10);
+  const auto& tablet_id = tablet_ids[0];
+  ASSERT_OK(FindTabletLeader(ts_map_, tablet_id, kLeaderTimeout, &leader_details));
+
+  // Now quiesce all the tablet servers.
+  for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+    *cluster_->mini_tablet_server(i)->server()->mutable_quiescing() = true;
+  }
+  // Once we've stepped down, while quiescing, no new leader should be elected.
+  // Wait extra long to be sure.
+  ASSERT_OK(LeaderStepDown(leader_details, tablet_id, kLeaderTimeout));
+  MonoDelta election_timeout = MonoDelta::FromMilliseconds(
+      2 * FLAGS_raft_heartbeat_interval_ms * FLAGS_leader_failure_max_missed_heartbeat_periods);
+  Status s = FindTabletLeader(ts_map_, tablet_id, election_timeout, &leader_details);
+  ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+}
+
 INSTANTIATE_TEST_CASE_P(NumReplicas, TServerQuiescingParamITest, ::testing::Values(1, 3));
 
 } // namespace itest


[kudu] 02/02: KUDU-3011 p6: don't transfer leadership to quiescing followers

Posted by aw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 31ed4a11de3f5a158d90d76b306c2d96fe07a55d
Author: Andrew Wong <aw...@apache.org>
AuthorDate: Tue Jan 14 15:39:43 2020 -0800

    KUDU-3011 p6: don't transfer leadership to quiescing followers
    
    When a tablet server is quiescing, any followers hosted on it should not
    be considered candidates to be the leader's successor.
    
    A quiescing follower would already never become a leader, because it
    would reject the StartElection request immediately. This patch improves
    on that by nipping such requests in the bud: followers now report in
    their ConsensusResponses whether or not they're quiescing, and if they
    are, the leader knows not to transfer leadership to them.
    
    I considered another approach to reducing the number of fruitless RPCs
    sent -- simply throttling the rate at which a leader can send
    StartElection requests. I opted for the current approach since it is
    more complete with regard to preventing extraneous StartElection
    requests.
    
    Change-Id: I74ec79d0bc4dbe42fce0cca2e001cd0b369cd066
    Reviewed-on: http://gerrit.cloudera.org:8080/15035
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 src/kudu/consensus/consensus.proto                   |  5 +++++
 src/kudu/consensus/consensus_queue-test.cc           | 17 +++++++++++++++++
 src/kudu/consensus/consensus_queue.cc                |  8 +++++++-
 src/kudu/consensus/consensus_queue.h                 |  4 ++++
 src/kudu/consensus/raft_consensus.cc                 |  3 +++
 .../tablet_server_quiescing-itest.cc                 | 20 ++++++++++++++++++++
 6 files changed, 56 insertions(+), 1 deletion(-)
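
As a rough end-to-end view of this change, here is a minimal sketch (not the
actual Kudu code) of how the quiescing bit travels from a follower to the
leader. The helper names FillResponseQuiescingSketch and
PeerIsEligibleSuccessorSketch are made up for illustration; the real logic is
in RaftConsensus::FillConsensusResponseOKUnlocked(),
PeerMessageQueue::ResponseFromPeer(), and TransferLeadershipIfNeeded() in the
diffs below.

#include <atomic>

#include "kudu/consensus/consensus.pb.h"

namespace sketch {

// Follower side: advertise the hosting server's quiescing state on every
// consensus response.
void FillResponseQuiescingSketch(const std::atomic<bool>* quiescing,
                                 kudu::consensus::ConsensusResponsePB* response) {
  if (quiescing && quiescing->load()) {
    response->set_server_quiescing(true);
  }
}

// Leader side: a peer whose server reports itself as quiescing is never a
// candidate for leadership transfer.
bool PeerIsEligibleSuccessorSketch(const kudu::consensus::ConsensusResponsePB& response) {
  return !(response.has_server_quiescing() && response.server_quiescing());
}

} // namespace sketch

Because server_quiescing is an optional field that is simply absent on older
followers, peers that never set it are treated exactly like non-quiescing
ones, so the change stays wire-compatible.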

diff --git a/src/kudu/consensus/consensus.proto b/src/kudu/consensus/consensus.proto
index b6a2d9a..dc770e7 100644
--- a/src/kudu/consensus/consensus.proto
+++ b/src/kudu/consensus/consensus.proto
@@ -404,6 +404,11 @@ message ConsensusResponsePB {
   // The current consensus status of the receiver peer.
   optional ConsensusStatusPB status = 3;
 
+  // Whether the server that hosts the peer is quiescing. This doesn't
+  // necessarily have any bearing on the state of the Raft peer itself, but it
+  // does indicate that the peer should not be a candidate for leadership.
+  optional bool server_quiescing = 4;
+
   // A generic error message (such as tablet not found), per operation
   // error messages are sent along with the consensus status.
   optional tserver.TabletServerErrorPB error = 999;
diff --git a/src/kudu/consensus/consensus_queue-test.cc b/src/kudu/consensus/consensus_queue-test.cc
index 092d475..eba226c 100644
--- a/src/kudu/consensus/consensus_queue-test.cc
+++ b/src/kudu/consensus/consensus_queue-test.cc
@@ -339,6 +339,23 @@ TEST_F(ConsensusQueueTest, TestTransferLeadershipWhenAppropriate) {
   config.mutable_peers(1)->set_member_type(RaftPeerPB::NON_VOTER);
   queue_->SetLeaderMode(10, 1, config);
   NO_FATALS(verify_elections(/*election_happened*/false));
+
+  // Now undo that.
+  config.mutable_peers(1)->set_member_type(RaftPeerPB::VOTER);
+  queue_->SetLeaderMode(10, 1, config);
+  NO_FATALS(verify_elections(/*election_happened*/true));
+
+  // If the peer reported itself as quiescing, we also wouldn't trigger an
+  // election.
+  peer_response.set_server_quiescing(true);
+  NO_FATALS(verify_elections(/*election_happened*/false));
+
+  // If the peer stops reporting its server as quiescing, elections will start
+  // up again.
+  peer_response.clear_server_quiescing();
+  NO_FATALS(verify_elections(/*election_happened*/true));
+  peer_response.set_server_quiescing(false);
+  NO_FATALS(verify_elections(/*election_happened*/true));
 }
 
 // Tests that the queue is able to track a peer when it starts tracking a peer
diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc
index 3c6373f..619bc18 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -146,6 +146,7 @@ PeerMessageQueue::TrackedPeer::TrackedPeer(RaftPeerPB peer_pb)
       last_exchange_status(PeerStatus::NEW),
       last_communication_time(MonoTime::Now()),
       wal_catchup_possible(true),
+      remote_server_quiescing(false),
       last_overall_health_status(HealthReportPB::UNKNOWN),
       status_log_throttler(std::make_shared<logging::LogThrottler>()),
       last_seen_term_(0) {
@@ -1079,8 +1080,9 @@ void PeerMessageQueue::TransferLeadershipIfNeeded(const TrackedPeer& peer,
   // Do some basic sanity checks that we can actually transfer leadership to
   // the given peer.
   if (queue_state_.mode != PeerMessageQueue::LEADER ||
+      local_peer_pb_.permanent_uuid() == peer.uuid() ||
       peer.last_exchange_status != PeerStatus::OK ||
-      local_peer_pb_.permanent_uuid() == peer.uuid()) {
+      peer.remote_server_quiescing) {
     return;
   }
 
@@ -1155,6 +1157,10 @@ bool PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
     // offset between the local leader and the remote peer.
     UpdateExchangeStatus(peer, prev_peer_state, response, &send_more_immediately);
 
+    // If the peer is hosted on a server that is quiescing, note that now.
+    peer->remote_server_quiescing = response.has_server_quiescing() &&
+                                    response.server_quiescing();
+
     // If the reported last-received op for the replica is in our local log,
     // then resume sending entries from that point onward. Otherwise, resume
     // after the last op they received from us. If we've never successfully
diff --git a/src/kudu/consensus/consensus_queue.h b/src/kudu/consensus/consensus_queue.h
index 430a313..def0ba3 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -167,6 +167,10 @@ class PeerMessageQueue {
     // the local peer's WAL.
     bool wal_catchup_possible;
 
+    // Whether the peer's server is quiescing, which dictates whether the peer
+    // is a candidate for leadership successor.
+    bool remote_server_quiescing;
+
     // The peer's latest overall health status.
     HealthReportPB::HealthStatus last_overall_health_status;
 
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 4f99b71..d9ca4f4 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -1641,6 +1641,9 @@ void RaftConsensus::FillConsensusResponseOKUnlocked(ConsensusResponsePB* respons
       last_received_cur_leader_);
   response->mutable_status()->set_last_committed_idx(
       queue_->GetCommittedIndex());
+  if (PREDICT_TRUE(server_ctx_.quiescing) && server_ctx_.quiescing->load()) {
+    response->set_server_quiescing(true);
+  }
 }
 
 void RaftConsensus::FillConsensusResponseError(ConsensusResponsePB* response,
diff --git a/src/kudu/integration-tests/tablet_server_quiescing-itest.cc b/src/kudu/integration-tests/tablet_server_quiescing-itest.cc
index 70c4eb9..dde8883 100644
--- a/src/kudu/integration-tests/tablet_server_quiescing-itest.cc
+++ b/src/kudu/integration-tests/tablet_server_quiescing-itest.cc
@@ -57,6 +57,8 @@ DECLARE_int32(scanner_default_batch_size_bytes);
 DECLARE_int32(scanner_ttl_ms);
 DECLARE_int32(raft_heartbeat_interval_ms);
 
+METRIC_DECLARE_histogram(handler_latency_kudu_consensus_ConsensusService_RunLeaderElection);
+
 using kudu::client::KuduClient;
 using kudu::client::KuduScanBatch;
 using kudu::client::KuduScanner;
@@ -514,12 +516,30 @@ TEST_P(TServerQuiescingParamITest, TestWriteWhileAllQuiescing) {
   for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
     *cluster_->mini_tablet_server(i)->server()->mutable_quiescing() = true;
   }
+  // Counts the number of times we were requested to start elections
+  // cluster-wide.
+  auto get_num_elections = [&] () {
+    int num_elections = 0;
+    for (int i = 0; i < kNumReplicas; i++) {
+      auto* ts = cluster_->mini_tablet_server(i)->server();
+      scoped_refptr<Histogram> hist(ts->metric_entity()->FindOrCreateHistogram(
+          &METRIC_handler_latency_kudu_consensus_ConsensusService_RunLeaderElection));
+      num_elections += hist->TotalCount();
+    }
+    return num_elections;
+  };
+
+  int initial_num_elections = get_num_elections();
+  ASSERT_LT(0, initial_num_elections);
 
   // We should continue to write uninterrupted.
   int start_rows = first_workload->rows_inserted();
   ASSERT_EVENTUALLY([&] {
     ASSERT_GT(first_workload->rows_inserted(), start_rows + 1000);
   });
+
+  // We also should not have triggered any elections.
+  ASSERT_EQ(initial_num_elections, get_num_elections());
 }
 
 TEST_P(TServerQuiescingParamITest, TestAbruptStepdownWhileAllQuiescing) {