You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2020/01/07 17:49:28 UTC
[kudu] 01/03: KUDU-3011 p2: mechanism to quiesce leadership
This is an automated email from the ASF dual-hosted git repository.
awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 5b4085619747b074d89c2762f39ac1eae3580ace
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Fri Jan 3 16:57:24 2020 -0800
KUDU-3011 p2: mechanism to quiesce leadership
This plumbs a shared flag from the TabletServer down to the
RaftConsensus instance of each TabletReplica that indicates whether the
server is in a quiescing state.
When in this state, any election started by a RaftConsensus instance on
the server will fail with an error.
Change-Id: I0eb3778fe094647bc5f61f97971087d7efb8af5e
Reviewed-on: http://gerrit.cloudera.org:8080/14977
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Tested-by: Andrew Wong <aw...@cloudera.com>
---
src/kudu/consensus/raft_consensus.cc | 6 +-
src/kudu/consensus/raft_consensus.h | 4 +
src/kudu/consensus/raft_consensus_quorum-test.cc | 3 +-
src/kudu/integration-tests/CMakeLists.txt | 1 +
.../tablet_server_quiescing-itest.cc | 232 +++++++++++++++++++++
src/kudu/master/sys_catalog.cc | 3 +-
src/kudu/tablet/tablet_replica-test.cc | 3 +-
.../tserver/tablet_copy_source_session-test.cc | 3 +-
src/kudu/tserver/tablet_server.cc | 1 +
src/kudu/tserver/tablet_server.h | 17 +-
src/kudu/tserver/ts_tablet_manager.cc | 3 +-
11 files changed, 264 insertions(+), 12 deletions(-)
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index 69c19c8..204c79f 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -379,7 +379,8 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
if (IsSingleVoterConfig() && FLAGS_enable_leader_failure_detection) {
LOG_WITH_PREFIX(INFO) << "Only one voter in the Raft config. Triggering election immediately";
- RETURN_NOT_OK(StartElection(NORMAL_ELECTION, INITIAL_SINGLE_NODE_ELECTION));
+ WARN_NOT_OK(StartElection(NORMAL_ELECTION, INITIAL_SINGLE_NODE_ELECTION),
+ "Couldn't start leader election");
}
// Report become visible to the Master.
@@ -440,6 +441,9 @@ string ReasonString(RaftConsensus::ElectionReason reason, StringPiece leader_uui
} // anonymous namespace
Status RaftConsensus::StartElection(ElectionMode mode, ElectionReason reason) {
+ if (server_ctx_.quiescing && server_ctx_.quiescing->load()) {
+ return Status::IllegalState("leader elections are disabled");
+ }
const char* const mode_str = ModeString(mode);
TRACE_EVENT2("consensus", "RaftConsensus::StartElection",
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index b038bfd..900e792 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -82,6 +82,10 @@ struct ElectionResult;
// Context containing resources shared by the Raft consensus instances on a
// single server.
struct ServerContext {
+ // Shared boolean that indicates whether the server is quiescing, in which
+ // case this replica should not attempt to become leader.
+ std::atomic<bool>* quiescing;
+
// Gauge indicating how many Raft tablet leaders are hosted on the server.
scoped_refptr<AtomicGauge<int32_t>> num_leaders;
diff --git a/src/kudu/consensus/raft_consensus_quorum-test.cc b/src/kudu/consensus/raft_consensus_quorum-test.cc
index 090371a..83c6d80 100644
--- a/src/kudu/consensus/raft_consensus_quorum-test.cc
+++ b/src/kudu/consensus/raft_consensus_quorum-test.cc
@@ -192,7 +192,8 @@ class RaftConsensusQuorumTest : public KuduTest {
RETURN_NOT_OK(GetRaftConfigMember(&config_, fs->uuid(), &local_peer_pb));
shared_ptr<RaftConsensus> peer;
- ServerContext ctx({ /*num_leaders*/nullptr,
+ ServerContext ctx({ /*quiescing*/nullptr,
+ /*num_leaders*/nullptr,
raft_pool_.get() });
RETURN_NOT_OK(RaftConsensus::Create(options_,
config_.peers(i),
diff --git a/src/kudu/integration-tests/CMakeLists.txt b/src/kudu/integration-tests/CMakeLists.txt
index c76a445..4dd2f4c 100644
--- a/src/kudu/integration-tests/CMakeLists.txt
+++ b/src/kudu/integration-tests/CMakeLists.txt
@@ -120,6 +120,7 @@ ADD_KUDU_TEST(tablet_copy-itest NUM_SHARDS 6 PROCESSORS 4)
ADD_KUDU_TEST(tablet_copy_client_session-itest)
ADD_KUDU_TEST(tablet_history_gc-itest)
ADD_KUDU_TEST(tablet_replacement-itest)
+ADD_KUDU_TEST(tablet_server_quiescing-itest)
ADD_KUDU_TEST(timestamp_advancement-itest)
ADD_KUDU_TEST(tombstoned_voting-imc-itest)
ADD_KUDU_TEST(tombstoned_voting-itest)
diff --git a/src/kudu/integration-tests/tablet_server_quiescing-itest.cc b/src/kudu/integration-tests/tablet_server_quiescing-itest.cc
new file mode 100644
index 0000000..60ec1af
--- /dev/null
+++ b/src/kudu/integration-tests/tablet_server_quiescing-itest.cc
@@ -0,0 +1,232 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <atomic>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
+#include "kudu/integration-tests/internal_mini_cluster-itest-base.h"
+#include "kudu/integration-tests/test_workload.h"
+#include "kudu/mini-cluster/internal_mini_cluster.h"
+#include "kudu/tserver/mini_tablet_server.h"
+#include "kudu/tserver/tablet_server.h"
+#include "kudu/tserver/ts_tablet_manager.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+DECLARE_bool(enable_leader_failure_detection);
+DECLARE_bool(catalog_manager_wait_for_new_tablets_to_elect_leader);
+DECLARE_double(leader_failure_max_missed_heartbeat_periods);
+DECLARE_int32(consensus_inject_latency_ms_in_notifications);
+DECLARE_int32(raft_heartbeat_interval_ms);
+
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace itest {
+
+class TServerQuiescingITest : public MiniClusterITestBase {
+ public:
+ // Creates a table with 'num_tablets' partitions and as many replicas as
+ // there are tablet servers, waiting for the tablets to show up on each
+  // server before returning. If non-null, 'tablet_ids' gets the tablet IDs.
+ void CreateWorkloadTable(int num_tablets, vector<string>* tablet_ids = nullptr) {
+ TestWorkload workload(cluster_.get());
+ workload.set_num_replicas(cluster_->num_tablet_servers());
+ workload.set_num_tablets(num_tablets);
+ workload.Setup();
+ ASSERT_EVENTUALLY([&] {
+ for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
+ auto* ts = cluster_->mini_tablet_server(i);
+ ASSERT_EQ(num_tablets, ts->server()->tablet_manager()->GetNumLiveTablets());
+ }
+ if (tablet_ids) {
+ *tablet_ids = cluster_->mini_tablet_server(0)->ListTablets();
+ }
+ });
+ }
+};
+
+// Test that a quiescing server won't trigger an election by natural means (i.e.
+// by detecting a Raft timeout).
+TEST_F(TServerQuiescingITest, TestQuiescingServerDoesntTriggerElections) {
+ const int kNumReplicas = 3;
+ const int kNumTablets = 10;
+ FLAGS_raft_heartbeat_interval_ms = 100;
+ NO_FATALS(StartCluster(kNumReplicas));
+
+ // Set up a table with some replicas.
+ vector<string> tablet_ids;
+ NO_FATALS(CreateWorkloadTable(kNumTablets, &tablet_ids));
+
+ const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
+  // Wait for all of our replicas to have leaders.
+ for (const auto& tablet_id : tablet_ids) {
+ TServerDetails* leader_details;
+ ASSERT_EVENTUALLY([&] {
+ ASSERT_OK(FindTabletLeader(ts_map_, tablet_id, kTimeout, &leader_details));
+ });
+ LOG(INFO) << Substitute("Tablet $0 has leader $1", tablet_id, leader_details->uuid());
+ }
+
+ auto* ts = cluster_->mini_tablet_server(0);
+ LOG(INFO) << Substitute("Quiescing ts $0", ts->uuid());
+ *ts->server()->mutable_quiescing() = true;
+
+ // Cause a bunch of elections.
+ FLAGS_leader_failure_max_missed_heartbeat_periods = 1;
+ FLAGS_consensus_inject_latency_ms_in_notifications = FLAGS_raft_heartbeat_interval_ms;
+
+ // Soon enough, elections will occur, and our quiescing server will cease to
+ // be leader.
+ ASSERT_EVENTUALLY([&] {
+ ASSERT_EQ(0, ts->server()->num_raft_leaders()->value());
+ });
+
+ // When we stop quiescing the server, we should eventually see some
+ // leadership return to the server.
+ *ts->server()->mutable_quiescing() = false;
+ ASSERT_EVENTUALLY([&] {
+ ASSERT_LT(0, ts->server()->num_raft_leaders()->value());
+ });
+}
+
+// Test that even if a majority of replicas are quiescing, a tablet is still
+// able to elect a leader.
+TEST_F(TServerQuiescingITest, TestMajorityQuiescingElectsLeader) {
+ const int kNumReplicas = 3;
+ FLAGS_raft_heartbeat_interval_ms = 50;
+ NO_FATALS(StartCluster(kNumReplicas));
+ vector<string> tablet_ids;
+ NO_FATALS(CreateWorkloadTable(/*num_tablets*/1, &tablet_ids));
+ string tablet_id = tablet_ids[0];
+
+ // Start quiescing all but the first tserver.
+ for (int i = 1; i < kNumReplicas; i++) {
+ *cluster_->mini_tablet_server(i)->server()->mutable_quiescing() = true;
+ }
+
+ // Cause a bunch of elections.
+ FLAGS_leader_failure_max_missed_heartbeat_periods = 1;
+ FLAGS_consensus_inject_latency_ms_in_notifications = FLAGS_raft_heartbeat_interval_ms;
+
+ // Eventually the first tserver will be elected leader.
+ const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
+ TServerDetails* leader_details;
+ ASSERT_EVENTUALLY([&] {
+ ASSERT_OK(FindTabletLeader(ts_map_, tablet_id, kTimeout, &leader_details));
+ ASSERT_EQ(leader_details->uuid(), cluster_->mini_tablet_server(0)->uuid());
+ });
+}
+
+class TServerQuiescingParamITest : public TServerQuiescingITest,
+ public testing::WithParamInterface<int> {};
+
+// Test that a quiescing server won't trigger an election, even when prompted
+// via RPC.
+TEST_P(TServerQuiescingParamITest, TestQuiescingServerRejectsElectionRequests) {
+ const int kNumReplicas = GetParam();
+ NO_FATALS(StartCluster(kNumReplicas));
+
+ // We'll trigger elections manually, so turn off leader failure detection.
+ FLAGS_enable_leader_failure_detection = false;
+ FLAGS_catalog_manager_wait_for_new_tablets_to_elect_leader = false;
+
+ vector<string> tablet_ids;
+ NO_FATALS(CreateWorkloadTable(/*num_tablets*/1, &tablet_ids));
+ string tablet_id = tablet_ids[0];
+
+ // First, do a sanity check that we don't have a leader.
+ MonoDelta kLeaderTimeout = MonoDelta::FromMilliseconds(500);
+ TServerDetails* leader_details;
+ Status s = FindTabletLeader(ts_map_, tablet_id, kLeaderTimeout, &leader_details);
+ ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "Unable to find leader");
+
+ // Quiesce one of the tablet servers and try prompting it to become leader.
+ // This should fail outright.
+ auto* ts = cluster_->mini_tablet_server(0);
+ *ts->server()->mutable_quiescing() = true;
+ s = StartElection(FindOrDie(ts_map_, ts->uuid()), tablet_id, kLeaderTimeout);
+ ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "leader elections are disabled");
+
+ // And we should still have no leader.
+ s = FindTabletLeader(ts_map_, tablet_id, kLeaderTimeout, &leader_details);
+ ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "Unable to find leader");
+}
+
+// Test that if all tservers are quiescing, there will be no leaders elected.
+TEST_P(TServerQuiescingParamITest, TestNoElectionsForNewReplicas) {
+  // NOTE: this test will prevent leaders from being elected for our new
+  // tablets. In practice, users should have tablet creation not wait to
+  // finish if all tservers are being quiesced.
+ FLAGS_catalog_manager_wait_for_new_tablets_to_elect_leader = false;
+ const int kNumReplicas = GetParam();
+ const int kNumTablets = 10;
+ NO_FATALS(StartCluster(kNumReplicas));
+
+ // Quiesce every tablet server.
+ for (int i = 0; i < kNumReplicas; i++) {
+ *cluster_->mini_tablet_server(i)->server()->mutable_quiescing() = true;
+ }
+
+ NO_FATALS(CreateWorkloadTable(kNumTablets));
+
+ // Sleep for a bit to let any would-be elections happen.
+ SleepFor(MonoDelta::FromSeconds(1));
+
+ // Since we've quiesced all our servers, none should have leaders.
+ for (int i = 0; i < kNumReplicas; i++) {
+ ASSERT_EQ(0, cluster_->mini_tablet_server(i)->server()->num_raft_leaders()->value());
+ }
+
+ // Now stop quiescing the servers and ensure that we eventually start getting
+ // leaders again.
+ for (int i = 0; i < kNumReplicas; i++) {
+ *cluster_->mini_tablet_server(i)->server()->mutable_quiescing() = false;
+ }
+ ASSERT_EVENTUALLY([&] {
+ int num_leaders = 0;
+ for (int i = 0; i < kNumReplicas; i++) {
+ num_leaders += cluster_->mini_tablet_server(i)->server()->num_raft_leaders()->value();
+ }
+ ASSERT_EQ(kNumTablets, num_leaders);
+ });
+}
+
+INSTANTIATE_TEST_CASE_P(NumReplicas, TServerQuiescingParamITest, ::testing::Values(1, 3));
+
+} // namespace itest
+} // namespace kudu
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index 34043e0..1ef6e7b 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -380,7 +380,8 @@ Status SysCatalogTable::SetupTablet(
Unretained(this),
metadata->tablet_id())));
// TODO(awong): plumb master_->num_raft_leaders() here.
- RETURN_NOT_OK_SHUTDOWN(tablet_replica_->Init({ /*num_leaders*/nullptr,
+ RETURN_NOT_OK_SHUTDOWN(tablet_replica_->Init({ /*quiescing*/nullptr,
+ /*num_leaders*/nullptr,
master_->raft_pool() }),
"failed to initialize system catalog replica");
diff --git a/src/kudu/tablet/tablet_replica-test.cc b/src/kudu/tablet/tablet_replica-test.cc
index ecf980e..b1bf7df 100644
--- a/src/kudu/tablet/tablet_replica-test.cc
+++ b/src/kudu/tablet/tablet_replica-test.cc
@@ -153,7 +153,8 @@ class TabletReplicaTest : public KuduTabletTest {
Bind(&TabletReplicaTest::TabletReplicaStateChangedCallback,
Unretained(this),
tablet()->tablet_id())));
- ASSERT_OK(tablet_replica_->Init({ /*num_leaders*/nullptr,
+ ASSERT_OK(tablet_replica_->Init({ /*quiescing*/nullptr,
+ /*num_leaders*/nullptr,
raft_pool_.get() }));
// Make TabletReplica use the same LogAnchorRegistry as the Tablet created by the harness.
// TODO(mpercy): Refactor TabletHarness to allow taking a
diff --git a/src/kudu/tserver/tablet_copy_source_session-test.cc b/src/kudu/tserver/tablet_copy_source_session-test.cc
index 4801beb..8aa5eec 100644
--- a/src/kudu/tserver/tablet_copy_source_session-test.cc
+++ b/src/kudu/tserver/tablet_copy_source_session-test.cc
@@ -164,7 +164,8 @@ class TabletCopyTest : public KuduTabletTest {
Bind(&TabletCopyTest::TabletReplicaStateChangedCallback,
Unretained(this),
tablet()->tablet_id())));
- ASSERT_OK(tablet_replica_->Init({ /*num_leaders*/nullptr,
+ ASSERT_OK(tablet_replica_->Init({ /*quiescing*/nullptr,
+ /*num_leaders*/nullptr,
raft_pool_.get() }));
shared_ptr<Messenger> messenger;
diff --git a/src/kudu/tserver/tablet_server.cc b/src/kudu/tserver/tablet_server.cc
index c6513e7..7e51b53 100644
--- a/src/kudu/tserver/tablet_server.cc
+++ b/src/kudu/tserver/tablet_server.cc
@@ -54,6 +54,7 @@ namespace tserver {
TabletServer::TabletServer(const TabletServerOptions& opts)
: KuduServer("TabletServer", opts, "kudu.tabletserver"),
state_(kStopped),
+ quiescing_(false),
fail_heartbeats_for_tests_(false),
opts_(opts),
tablet_manager_(new TSTabletManager(this)),
diff --git a/src/kudu/tserver/tablet_server.h b/src/kudu/tserver/tablet_server.h
index 343a1de..90b4b45 100644
--- a/src/kudu/tserver/tablet_server.h
+++ b/src/kudu/tserver/tablet_server.h
@@ -14,9 +14,9 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-#ifndef KUDU_TSERVER_TABLET_SERVER_H
-#define KUDU_TSERVER_TABLET_SERVER_H
+#pragma once
+#include <atomic>
#include <cstdint>
#include <memory>
#include <string>
@@ -36,8 +36,8 @@ namespace tserver {
class Heartbeater;
class ScannerManager;
-class TabletServerPathHandlers;
class TSTabletManager;
+class TabletServerPathHandlers;
class TabletServer : public kserver::KuduServer {
public:
@@ -59,8 +59,8 @@ class TabletServer : public kserver::KuduServer {
// Waits for the tablet server to complete the initialization.
Status WaitInited();
- virtual Status Start() override;
- virtual void Shutdown() override;
+ Status Start() override;
+ void Shutdown() override;
std::string ToString() const;
@@ -82,6 +82,10 @@ class TabletServer : public kserver::KuduServer {
return maintenance_manager_.get();
}
+ std::atomic<bool>* mutable_quiescing() {
+ return &quiescing_;
+ }
+
private:
friend class TabletServerTestBase;
@@ -93,6 +97,8 @@ class TabletServer : public kserver::KuduServer {
TabletServerState state_;
+ std::atomic<bool> quiescing_;
+
// If true, all heartbeats will be seen as failed.
Atomic32 fail_heartbeats_for_tests_;
@@ -121,4 +127,3 @@ class TabletServer : public kserver::KuduServer {
} // namespace tserver
} // namespace kudu
-#endif
diff --git a/src/kudu/tserver/ts_tablet_manager.cc b/src/kudu/tserver/ts_tablet_manager.cc
index f3c2e2e..4c5d240 100644
--- a/src/kudu/tserver/ts_tablet_manager.cc
+++ b/src/kudu/tserver/ts_tablet_manager.cc
@@ -822,7 +822,8 @@ Status TSTabletManager::CreateAndRegisterTabletReplica(
Bind(&TSTabletManager::MarkTabletDirty,
Unretained(this),
tablet_id)));
- Status s = replica->Init({ server_->num_raft_leaders(),
+ Status s = replica->Init({ server_->mutable_quiescing(),
+ server_->num_raft_leaders(),
server_->raft_pool() });
if (PREDICT_FALSE(!s.ok())) {
replica->SetError(s);