You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2020/01/07 17:49:28 UTC

[kudu] 01/03: KUDU-3011 p2: mechanism to quiesce leadership

This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 5b4085619747b074d89c2762f39ac1eae3580ace
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Fri Jan 3 16:57:24 2020 -0800

    KUDU-3011 p2: mechanism to quiesce leadership
    
    This plumbs a shared flag from the TabletServer down to the
    RaftConsensus instance of each TabletReplica that indicates whether the
    server is in a quiescing state.
    
    When in this state, any election started by a RaftConsensus instance on
    the server will fail with an error.
    
    Change-Id: I0eb3778fe094647bc5f61f97971087d7efb8af5e
    Reviewed-on: http://gerrit.cloudera.org:8080/14977
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Tested-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/consensus/raft_consensus.cc               |   6 +-
 src/kudu/consensus/raft_consensus.h                |   4 +
 src/kudu/consensus/raft_consensus_quorum-test.cc   |   3 +-
 src/kudu/integration-tests/CMakeLists.txt          |   1 +
 .../tablet_server_quiescing-itest.cc               | 232 +++++++++++++++++++++
 src/kudu/master/sys_catalog.cc                     |   3 +-
 src/kudu/tablet/tablet_replica-test.cc             |   3 +-
 .../tserver/tablet_copy_source_session-test.cc     |   3 +-
 src/kudu/tserver/tablet_server.cc                  |   1 +
 src/kudu/tserver/tablet_server.h                   |  17 +-
 src/kudu/tserver/ts_tablet_manager.cc              |   3 +-
 11 files changed, 264 insertions(+), 12 deletions(-)

diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 69c19c8..204c79f 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -379,7 +379,8 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
 
   if (IsSingleVoterConfig() && FLAGS_enable_leader_failure_detection) {
     LOG_WITH_PREFIX(INFO) << "Only one voter in the Raft config. Triggering election immediately";
-    RETURN_NOT_OK(StartElection(NORMAL_ELECTION, INITIAL_SINGLE_NODE_ELECTION));
+    WARN_NOT_OK(StartElection(NORMAL_ELECTION, INITIAL_SINGLE_NODE_ELECTION),
+                "Couldn't start leader election");
   }
 
   // Report become visible to the Master.
@@ -440,6 +441,9 @@ string ReasonString(RaftConsensus::ElectionReason reason, StringPiece leader_uui
 } // anonymous namespace
 
 Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
+  if (server_ctx_.quiescing && server_ctx_.quiescing->load()) {
+    return Status::IllegalState("leader elections are disabled");
+  }
   const char* const mode_str = ModeString(mode);
 
   TRACE_EVENT2("consensus", "RaftConsensus::StartElection",
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index b038bfd..900e792 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -82,6 +82,10 @@ struct ElectionResult;
 // Context containing resources shared by the Raft consensus instances on a
 // single server.
 struct ServerContext {
+  // Shared boolean that indicates whether the server is quiescing, in which
+  // case this replica should not attempt to become leader.
+  std::atomic<bool>* quiescing;
+
   // Gauge indicating how many Raft tablet leaders are hosted on the server.
   scoped_refptr<AtomicGauge<int32_t>> num_leaders;
 
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index 090371a..83c6d80 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -192,7 +192,8 @@ class RaftConsensusQuorumTest : public KuduTest {
       RETURN_NOT_OK(GetRaftConfigMember(&config_, fs->uuid(), &local_peer_pb));
 
       shared_ptr<RaftConsensus> peer;
-      ServerContext ctx({ /*num_leaders*/nullptr,
+      ServerContext ctx({ /*quiescing*/nullptr,
+                          /*num_leaders*/nullptr,
                           raft_pool_.get() });
       RETURN_NOT_OK(RaftConsensus::Create(options_,
                                           config_.peers(i),
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index c76a445..4dd2f4c 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -120,6 +120,7 @@ ADD_KUDU_TEST(tablet_copy-itest NUM_SHARDS 6 PROCESSORS 4)
 ADD_KUDU_TEST(tablet_copy_client_session-itest)
 ADD_KUDU_TEST(tablet_history_gc-itest)
 ADD_KUDU_TEST(tablet_replacement-itest)
+ADD_KUDU_TEST(tablet_server_quiescing-itest)
 ADD_KUDU_TEST(timestamp_advancement-itest)
 ADD_KUDU_TEST(tombstoned_voting-imc-itest)
 ADD_KUDU_TEST(tombstoned_voting-itest)
diff --git a/src/kudu/integration-tests/tablet_server_quiescing-itest.cc b/src/kudu/integration-tests/tablet_server_quiescing-itest.cc
new file mode 100644
index 0000000..60ec1af
--- /dev/null
+++ b/src/kudu/integration-tests/tablet_server_quiescing-itest.cc
@@ -0,0 +1,232 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <atomic>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
+#include "kudu/integration-tests/internal_mini_cluster-itest-base.h"
+#include "kudu/integration-tests/test_workload.h"
+#include "kudu/mini-cluster/internal_mini_cluster.h"
+#include "kudu/tserver/mini_tablet_server.h"
+#include "kudu/tserver/tablet_server.h"
+#include "kudu/tserver/ts_tablet_manager.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+DECLARE_bool(enable_leader_failure_detection);
+DECLARE_bool(catalog_manager_wait_for_new_tablets_to_elect_leader);
+DECLARE_double(leader_failure_max_missed_heartbeat_periods);
+DECLARE_int32(consensus_inject_latency_ms_in_notifications);
+DECLARE_int32(raft_heartbeat_interval_ms);
+
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace itest {
+
+class TServerQuiescingITest : public MiniClusterITestBase {
+ public:
+  // Creates a table with 'num_tablets' partitions and as many replicas as
+  // there are tablet servers, waiting for the tablets to show up on each server
+  // before returning. If 'tablet_ids' is non-null, sets it to the tablet IDs.
+  void CreateWorkloadTable(int num_tablets, vector<string>* tablet_ids = nullptr) {
+    TestWorkload workload(cluster_.get());
+    workload.set_num_replicas(cluster_->num_tablet_servers());
+    workload.set_num_tablets(num_tablets);
+    workload.Setup();
+    ASSERT_EVENTUALLY([&] {
+      for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+        auto* ts = cluster_->mini_tablet_server(i);
+        ASSERT_EQ(num_tablets, ts->server()->tablet_manager()->GetNumLiveTablets());
+      }
+      if (tablet_ids) {
+        *tablet_ids = cluster_->mini_tablet_server(0)->ListTablets();
+      }
+    });
+  }
+};
+
+// Test that a quiescing server won't trigger an election by natural means (i.e.
+// by detecting a Raft timeout).
+TEST_F(TServerQuiescingITest, TestQuiescingServerDoesntTriggerElections) {
+  const int kNumReplicas = 3;
+  const int kNumTablets = 10;
+  FLAGS_raft_heartbeat_interval_ms = 100;
+  NO_FATALS(StartCluster(kNumReplicas));
+
+  // Set up a table with some replicas.
+  vector<string> tablet_ids;
+  NO_FATALS(CreateWorkloadTable(kNumTablets, &tablet_ids));
+
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
+  // Wait for all of our replicas to have leaders.
+  for (const auto& tablet_id : tablet_ids) {
+    TServerDetails* leader_details;
+    ASSERT_EVENTUALLY([&] {
+      ASSERT_OK(FindTabletLeader(ts_map_, tablet_id, kTimeout, &leader_details));
+    });
+    LOG(INFO) << Substitute("Tablet $0 has leader $1", tablet_id, leader_details->uuid());
+  }
+
+  auto* ts = cluster_->mini_tablet_server(0);
+  LOG(INFO) << Substitute("Quiescing ts $0", ts->uuid());
+  *ts->server()->mutable_quiescing() = true;
+
+  // Cause a bunch of elections.
+  FLAGS_leader_failure_max_missed_heartbeat_periods = 1;
+  FLAGS_consensus_inject_latency_ms_in_notifications = FLAGS_raft_heartbeat_interval_ms;
+
+  // Soon enough, elections will occur, and our quiescing server will cease to
+  // be leader.
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_EQ(0, ts->server()->num_raft_leaders()->value());
+  });
+
+  // When we stop quiescing the server, we should eventually see some
+  // leadership return to the server.
+  *ts->server()->mutable_quiescing() = false;
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_LT(0, ts->server()->num_raft_leaders()->value());
+  });
+}
+
+// Test that even if a majority of replicas are quiescing, a tablet is still
+// able to elect a leader.
+TEST_F(TServerQuiescingITest, TestMajorityQuiescingElectsLeader) {
+  const int kNumReplicas = 3;
+  FLAGS_raft_heartbeat_interval_ms = 50;
+  NO_FATALS(StartCluster(kNumReplicas));
+  vector<string> tablet_ids;
+  NO_FATALS(CreateWorkloadTable(/*num_tablets*/1, &tablet_ids));
+  string tablet_id = tablet_ids[0];
+
+  // Start quiescing all but the first tserver.
+  for (int i = 1; i < kNumReplicas; i++) {
+    *cluster_->mini_tablet_server(i)->server()->mutable_quiescing() = true;
+  }
+
+  // Cause a bunch of elections.
+  FLAGS_leader_failure_max_missed_heartbeat_periods = 1;
+  FLAGS_consensus_inject_latency_ms_in_notifications = FLAGS_raft_heartbeat_interval_ms;
+
+  // Eventually the first tserver will be elected leader.
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
+  TServerDetails* leader_details;
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_OK(FindTabletLeader(ts_map_, tablet_id, kTimeout, &leader_details));
+    ASSERT_EQ(leader_details->uuid(), cluster_->mini_tablet_server(0)->uuid());
+  });
+}
+
+class TServerQuiescingParamITest : public TServerQuiescingITest,
+                                   public testing::WithParamInterface<int> {};
+
+// Test that a quiescing server won't trigger an election, even when prompted
+// via RPC.
+TEST_P(TServerQuiescingParamITest, TestQuiescingServerRejectsElectionRequests) {
+  const int kNumReplicas = GetParam();
+  NO_FATALS(StartCluster(kNumReplicas));
+
+  // We'll trigger elections manually, so turn off leader failure detection.
+  FLAGS_enable_leader_failure_detection = false;
+  FLAGS_catalog_manager_wait_for_new_tablets_to_elect_leader = false;
+
+  vector<string> tablet_ids;
+  NO_FATALS(CreateWorkloadTable(/*num_tablets*/1, &tablet_ids));
+  string tablet_id = tablet_ids[0];
+
+  // First, do a sanity check that we don't have a leader.
+  MonoDelta kLeaderTimeout = MonoDelta::FromMilliseconds(500);
+  TServerDetails* leader_details;
+  Status s = FindTabletLeader(ts_map_, tablet_id, kLeaderTimeout, &leader_details);
+  ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "Unable to find leader");
+
+  // Quiesce one of the tablet servers and try prompting it to become leader.
+  // This should fail outright.
+  auto* ts = cluster_->mini_tablet_server(0);
+  *ts->server()->mutable_quiescing() = true;
+  s = StartElection(FindOrDie(ts_map_, ts->uuid()), tablet_id, kLeaderTimeout);
+  ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "leader elections are disabled");
+
+  // And we should still have no leader.
+  s = FindTabletLeader(ts_map_, tablet_id, kLeaderTimeout, &leader_details);
+  ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+  ASSERT_STR_CONTAINS(s.ToString(), "Unable to find leader");
+}
+
+// Test that if all tservers are quiescing, there will be no leaders elected.
+TEST_P(TServerQuiescingParamITest, TestNoElectionsForNewReplicas) {
+  // NOTE: this test prevents leaders from being elected for our new tablets.
+  // In practice, users should not have tablet creation wait to finish if all
+  // tservers are being quiesced.
+  FLAGS_catalog_manager_wait_for_new_tablets_to_elect_leader = false;
+  const int kNumReplicas = GetParam();
+  const int kNumTablets = 10;
+  NO_FATALS(StartCluster(kNumReplicas));
+
+  // Quiesce every tablet server.
+  for (int i = 0; i < kNumReplicas; i++) {
+    *cluster_->mini_tablet_server(i)->server()->mutable_quiescing() = true;
+  }
+
+  NO_FATALS(CreateWorkloadTable(kNumTablets));
+
+  // Sleep for a bit to let any would-be elections happen.
+  SleepFor(MonoDelta::FromSeconds(1));
+
+  // Since we've quiesced all our servers, none should have leaders.
+  for (int i = 0; i < kNumReplicas; i++) {
+    ASSERT_EQ(0, cluster_->mini_tablet_server(i)->server()->num_raft_leaders()->value());
+  }
+
+  // Now stop quiescing the servers and ensure that we eventually start getting
+  // leaders again.
+  for (int i = 0; i < kNumReplicas; i++) {
+    *cluster_->mini_tablet_server(i)->server()->mutable_quiescing() = false;
+  }
+  ASSERT_EVENTUALLY([&] {
+    int num_leaders = 0;
+    for (int i = 0; i < kNumReplicas; i++) {
+      num_leaders += cluster_->mini_tablet_server(i)->server()->num_raft_leaders()->value();
+    }
+    ASSERT_EQ(kNumTablets, num_leaders);
+  });
+}
+
+INSTANTIATE_TEST_CASE_P(NumReplicas, TServerQuiescingParamITest, ::testing::Values(1, 3));
+
+} // namespace itest
+} // namespace kudu
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index 34043e0..1ef6e7b 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -380,7 +380,8 @@ Status SysCatalogTable::SetupTablet(
            Unretained(this),
            metadata->tablet_id())));
   // TODO(awong): plumb master_->num_raft_leaders() here.
-  RETURN_NOT_OK_SHUTDOWN(tablet_replica_->Init({ /*num_leaders*/nullptr,
+  RETURN_NOT_OK_SHUTDOWN(tablet_replica_->Init({ /*quiescing*/nullptr,
+                                                 /*num_leaders*/nullptr,
                                                  master_->raft_pool() }),
                          "failed to initialize system catalog replica");
 
diff --git a/src/kudu/tablet/tablet_replica-test.cc b/src/kudu/tablet/tablet_replica-test.cc
index ecf980e..b1bf7df 100644
--- a/src/kudu/tablet/tablet_replica-test.cc
+++ b/src/kudu/tablet/tablet_replica-test.cc
@@ -153,7 +153,8 @@ class TabletReplicaTest : public KuduTabletTest {
                         Bind(&TabletReplicaTest::TabletReplicaStateChangedCallback,
                              Unretained(this),
                              tablet()->tablet_id())));
-    ASSERT_OK(tablet_replica_->Init({ /*num_leaders*/nullptr,
+    ASSERT_OK(tablet_replica_->Init({ /*quiescing*/nullptr,
+                                      /*num_leaders*/nullptr,
                                       raft_pool_.get() }));
     // Make TabletReplica use the same LogAnchorRegistry as the Tablet created by the harness.
     // TODO(mpercy): Refactor TabletHarness to allow taking a
diff --git a/src/kudu/tserver/tablet_copy_source_session-test.cc b/src/kudu/tserver/tablet_copy_source_session-test.cc
index 4801beb..8aa5eec 100644
--- a/src/kudu/tserver/tablet_copy_source_session-test.cc
+++ b/src/kudu/tserver/tablet_copy_source_session-test.cc
@@ -164,7 +164,8 @@ class TabletCopyTest : public KuduTabletTest {
                           Bind(&TabletCopyTest::TabletReplicaStateChangedCallback,
                                Unretained(this),
                                tablet()->tablet_id())));
-    ASSERT_OK(tablet_replica_->Init({ /*num_leaders*/nullptr,
+    ASSERT_OK(tablet_replica_->Init({ /*quiescing*/nullptr,
+                                      /*num_leaders*/nullptr,
                                       raft_pool_.get() }));
 
     shared_ptr<Messenger> messenger;
diff --git a/src/kudu/tserver/tablet_server.cc b/src/kudu/tserver/tablet_server.cc
index c6513e7..7e51b53 100644
--- a/src/kudu/tserver/tablet_server.cc
+++ b/src/kudu/tserver/tablet_server.cc
@@ -54,6 +54,7 @@ namespace tserver {
 TabletServer::TabletServer(const TabletServerOptions& opts)
   : KuduServer("TabletServer", opts, "kudu.tabletserver"),
     state_(kStopped),
+    quiescing_(false),
     fail_heartbeats_for_tests_(false),
     opts_(opts),
     tablet_manager_(new TSTabletManager(this)),
diff --git a/src/kudu/tserver/tablet_server.h b/src/kudu/tserver/tablet_server.h
index 343a1de..90b4b45 100644
--- a/src/kudu/tserver/tablet_server.h
+++ b/src/kudu/tserver/tablet_server.h
@@ -14,9 +14,9 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#ifndef KUDU_TSERVER_TABLET_SERVER_H
-#define KUDU_TSERVER_TABLET_SERVER_H
+#pragma once
 
+#include <atomic>
 #include <cstdint>
 #include <memory>
 #include <string>
@@ -36,8 +36,8 @@ namespace tserver {
 
 class Heartbeater;
 class ScannerManager;
-class TabletServerPathHandlers;
 class TSTabletManager;
+class TabletServerPathHandlers;
 
 class TabletServer : public kserver::KuduServer {
  public:
@@ -59,8 +59,8 @@ class TabletServer : public kserver::KuduServer {
   // Waits for the tablet server to complete the initialization.
   Status WaitInited();
 
-  virtual Status Start() override;
-  virtual void Shutdown() override;
+  Status Start() override;
+  void Shutdown() override;
 
   std::string ToString() const;
 
@@ -82,6 +82,10 @@ class TabletServer : public kserver::KuduServer {
     return maintenance_manager_.get();
   }
 
+  std::atomic<bool>* mutable_quiescing() {
+    return &quiescing_;
+  }
+
  private:
   friend class TabletServerTestBase;
 
@@ -93,6 +97,8 @@ class TabletServer : public kserver::KuduServer {
 
   TabletServerState state_;
 
+  std::atomic<bool> quiescing_;
+
   // If true, all heartbeats will be seen as failed.
   Atomic32 fail_heartbeats_for_tests_;
 
@@ -121,4 +127,3 @@ class TabletServer : public kserver::KuduServer {
 
 } // namespace tserver
 } // namespace kudu
-#endif
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index f3c2e2e..4c5d240 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -822,7 +822,8 @@ Status TSTabletManager::CreateAndRegisterTabletReplica(
                         Bind(&TSTabletManager::MarkTabletDirty,
                              Unretained(this),
                              tablet_id)));
-  Status s = replica->Init({ server_->num_raft_leaders(),
+  Status s = replica->Init({ server_->mutable_quiescing(),
+                             server_->num_raft_leaders(),
                              server_->raft_pool() });
   if (PREDICT_FALSE(!s.ok())) {
     replica->SetError(s);