You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ba...@apache.org on 2020/09/24 18:07:34 UTC
[kudu] branch master updated: [master] KUDU-2181 Raft ChangeConfig
request to add a master
This is an automated email from the ASF dual-hosted git repository.
bankim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 9fc6c77 [master] KUDU-2181 Raft ChangeConfig request to add a master
9fc6c77 is described below
commit 9fc6c772aff2bbfa008f52da8e7ba6e2c2d9012f
Author: Bankim Bhavsar <ba...@cloudera.com>
AuthorDate: Fri Jul 24 16:33:25 2020 -0700
[master] KUDU-2181 Raft ChangeConfig request to add a master
This change:
- Adds hidden feature flag "--master_support_change_config" off by
default.
- RPC changes to add a master that initiates Raft config change and
responds asynchronously.
- RPC changes to report back member type(VOTER/NON_VOTER) of masters
- Removes the cached master_addresses in catalog manager as it's no
longer static.
- Updates and adds comments in MasterOptions such that it's used to
fetch master addresses only during master init time as masters can
be added/removed dynamically with this change.
- Updates ListMasters() to look at local Raft config instead of
MasterOptions as the masters can be added/removed dynamically.
If the new master can be caught up from the WAL, it gets promoted
to VOTER; otherwise it remains a NON_VOTER, because tablet copy is
not supported for masters in this change.
Change-Id: I0ac7e6e55220bcb01cad0fa386daaf656258088c
Reviewed-on: http://gerrit.cloudera.org:8080/16321
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Tested-by: Bankim Bhavsar <ba...@cloudera.com>
---
src/kudu/common/wire_protocol.proto | 6 +-
src/kudu/consensus/raft_consensus.cc | 17 +
src/kudu/consensus/raft_consensus.h | 6 +
src/kudu/master/CMakeLists.txt | 1 +
src/kudu/master/catalog_manager.cc | 75 ++-
src/kudu/master/catalog_manager.h | 33 +-
src/kudu/master/dynamic_multi_master-test.cc | 657 +++++++++++++++++++++++++
src/kudu/master/master.cc | 49 +-
src/kudu/master/master.h | 10 +-
src/kudu/master/master.proto | 18 +
src/kudu/master/master_options.cc | 12 +-
src/kudu/master/master_options.h | 23 +-
src/kudu/master/master_path_handlers.cc | 15 +-
src/kudu/master/master_service.cc | 52 +-
src/kudu/master/master_service.h | 9 +-
src/kudu/master/mini_master.cc | 2 +-
src/kudu/master/sys_catalog.cc | 23 +-
src/kudu/mini-cluster/external_mini_cluster.cc | 17 +-
src/kudu/mini-cluster/external_mini_cluster.h | 8 +
src/kudu/tools/tool_action_master.cc | 4 +
20 files changed, 984 insertions(+), 53 deletions(-)
diff --git a/src/kudu/common/wire_protocol.proto b/src/kudu/common/wire_protocol.proto
index 59682ba..837acc0 100644
--- a/src/kudu/common/wire_protocol.proto
+++ b/src/kudu/common/wire_protocol.proto
@@ -115,13 +115,17 @@ message ServerEntryPB {
optional NodeInstancePB instance_id = 2;
optional ServerRegistrationPB registration = 3;
- // If an error has occured earlier in the RPC call, the role
+ // If an error has occurred earlier in the RPC call, the role
// may be not be set.
optional consensus.RaftPeerPB.Role role = 4;
// The unique cluster ID of the cluster this server belongs too.
// TODO(ghenke): Currently only set for masters.
optional string cluster_id = 5;
+
+ // If an error has occurred earlier in the RPC call, the
+ // member_type may not be set.
+ optional consensus.RaftPeerPB.MemberType member_type = 6;
}
// A row block in which each row is stored contiguously.
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index c16cc80..50fcd0d 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -2460,6 +2460,23 @@ RaftPeerPB::Role RaftConsensus::role() const {
return cmeta_->active_role();
}
+RaftConsensus::RoleAndMemberType RaftConsensus::GetRoleAndMemberType() const {
+ ThreadRestrictions::AssertWaitAllowed();
+
+ auto member_type = RaftPeerPB::UNKNOWN_MEMBER_TYPE;
+ const auto& local_peer_uuid = peer_uuid();
+
+ LockGuard l(lock_);
+ for (const auto& peer : cmeta_->ActiveConfig().peers()) {
+ if (peer.permanent_uuid() == local_peer_uuid) {
+ member_type = peer.member_type();
+ break;
+ }
+ }
+
+ return std::make_pair(cmeta_->active_role(), member_type);
+}
+
int64_t RaftConsensus::CurrentTerm() const {
LockGuard l(lock_);
return CurrentTermUnlocked();
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index 5ad4f93..d418b44 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -142,6 +142,8 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
EXTERNAL_REQUEST
};
+ typedef std::pair<RaftPeerPB::Role, RaftPeerPB::MemberType> RoleAndMemberType;
+
~RaftConsensus();
// Factory method to construct and initialize a RaftConsensus instance.
@@ -317,6 +319,10 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
// Returns the current Raft role of this instance.
RaftPeerPB::Role role() const;
+ // Returns the current Raft role and member type of this instance.
+ // May return <UNKNOWN_ROLE, UNKNOWN_MEMBER_TYPE> if the information is not available.
+ RoleAndMemberType GetRoleAndMemberType() const;
+
// Returns the current term.
int64_t CurrentTerm() const;
diff --git a/src/kudu/master/CMakeLists.txt b/src/kudu/master/CMakeLists.txt
index 26059bd..1bcc6de 100644
--- a/src/kudu/master/CMakeLists.txt
+++ b/src/kudu/master/CMakeLists.txt
@@ -89,6 +89,7 @@ SET_KUDU_TEST_LINK_LIBS(
ADD_KUDU_TEST(auto_rebalancer-test)
ADD_KUDU_TEST(catalog_manager-test)
+ADD_KUDU_TEST(dynamic_multi_master-test)
ADD_KUDU_TEST(hms_notification_log_listener-test)
ADD_KUDU_TEST(location_cache-test DATA_FILES ../scripts/first_argument.sh)
ADD_KUDU_TEST(master_options-test)
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 3798ec6..ea122f9 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -80,7 +80,6 @@
#include "kudu/consensus/consensus.proxy.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/consensus/quorum_util.h"
-#include "kudu/consensus/raft_consensus.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/atomicops.h"
#include "kudu/gutil/basictypes.h"
@@ -139,6 +138,7 @@
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/mutex.h"
+#include "kudu/util/net/net_util.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/random_util.h"
#include "kudu/util/scoped_cleanup.h"
@@ -828,10 +828,11 @@ Status CatalogManager::Init(bool is_first_run) {
auto_rebalancer_ = std::move(task);
}
- RETURN_NOT_OK(master_->GetMasterHostPorts(&master_addresses_));
+ vector<HostPort> master_addresses;
+ RETURN_NOT_OK(master_->GetMasterHostPorts(&master_addresses));
if (hms::HmsCatalog::IsEnabled()) {
- string master_addresses = JoinMapped(
- master_addresses_,
+ string master_addresses_str = JoinMapped(
+ master_addresses,
[] (const HostPort& hostport) {
return Substitute("$0:$1", hostport.host(), hostport.port());
},
@@ -844,7 +845,7 @@ Status CatalogManager::Init(bool is_first_run) {
// holding leader_lock_, so this is the path of least resistance.
std::lock_guard<RWMutex> leader_lock_guard(leader_lock_);
- hms_catalog_.reset(new hms::HmsCatalog(std::move(master_addresses)));
+ hms_catalog_.reset(new hms::HmsCatalog(std::move(master_addresses_str)));
RETURN_NOT_OK_PREPEND(hms_catalog_->Start(HmsClientVerifyKuduSyncConfig::VERIFY),
"failed to start Hive Metastore catalog");
@@ -1414,6 +1415,12 @@ RaftPeerPB::Role CatalogManager::Role() const {
return consensus ? consensus->role() : RaftPeerPB::UNKNOWN_ROLE;
}
+RaftConsensus::RoleAndMemberType CatalogManager::GetRoleAndMemberType() const {
+ return IsInitialized() ?
+ sys_catalog_->tablet_replica()->shared_consensus()->GetRoleAndMemberType() :
+ std::make_pair(RaftPeerPB::UNKNOWN_ROLE, RaftPeerPB::UNKNOWN_MEMBER_TYPE);
+}
+
void CatalogManager::Shutdown() {
{
std::lock_guard<simple_spinlock> l(state_lock_);
@@ -5511,6 +5518,13 @@ const char* CatalogManager::StateToString(State state) {
__builtin_unreachable();
}
+const char* CatalogManager::ChangeConfigOpToString(ChangeConfigOp type) {
+ switch (type) {
+ case CatalogManager::kAddMaster: return "add";
+ }
+ __builtin_unreachable();
+}
+
void CatalogManager::ResetTableLocationsCache() {
const auto cache_capacity_bytes =
FLAGS_table_locations_cache_capacity_mb * 1024 * 1024;
@@ -5527,6 +5541,56 @@ void CatalogManager::ResetTableLocationsCache() {
VLOG(1) << "table locations cache has been reset";
}
+Status CatalogManager::InitiateMasterChangeConfig(ChangeConfigOp op, const HostPort& hp,
+ const std::string& uuid, rpc::RpcContext* rpc) {
+ auto consensus = master_consensus();
+ if (!consensus) {
+ return Status::IllegalState("Consensus not running");
+ }
+
+ consensus::ChangeConfigRequestPB req;
+ // Request is targeted to itself, the leader master.
+ req.set_dest_uuid(master_->fs_manager()->uuid());
+ req.set_tablet_id(sys_catalog()->tablet_id());
+ req.set_cas_config_opid_index(consensus->CommittedConfig().opid_index());
+ RaftPeerPB* peer = req.mutable_server();
+ peer->set_permanent_uuid(uuid);
+
+ switch (op) {
+ case CatalogManager::kAddMaster:
+ req.set_type(consensus::ADD_PEER);
+ *peer->mutable_last_known_addr() = HostPortToPB(hp);
+ // Adding the new master as a NON_VOTER that'll be promoted to VOTER once the tablet
+ // copy is complete and is sufficiently caught up.
+ peer->set_member_type(RaftPeerPB::NON_VOTER);
+ peer->mutable_attrs()->set_promote(true);
+ break;
+ default:
+ LOG(FATAL) << "Unsupported ChangeConfig operation: " << op;
+ }
+
+ const string op_str = ChangeConfigOpToString(op);
+ LOG(INFO) << Substitute("Initiating ChangeConfig request to $0 master $1: $2",
+ op_str, hp.ToString(), SecureDebugString(req));
+ auto completion_cb = [op_str, hp, rpc] (const Status& completion_status) {
+ if (completion_status.ok()) {
+ LOG(INFO) << Substitute("Successfully completed master ChangeConfig request to $0 master $1",
+ op_str, hp.ToString());
+ rpc->RespondSuccess();
+ } else {
+ LOG(WARNING) << Substitute("ChangeConfig request failed to $0 master $1: $2 ",
+ op_str, hp.ToString(), completion_status.ToString());
+ rpc->RespondFailure(completion_status);
+ }
+ };
+ boost::optional<TabletServerErrorPB::Code> err_code;
+ RETURN_NOT_OK_PREPEND(
+ consensus->ChangeConfig(req, completion_cb, &err_code),
+ Substitute("Failed initiating master Raft ChangeConfig request, error: $0",
+ err_code ? TabletServerErrorPB::Code_Name(*err_code) : "unknown"));
+ return Status::OK();
+}
+
////////////////////////////////////////////////////////////
// CatalogManager::TSInfosDict
////////////////////////////////////////////////////////////
@@ -5647,6 +5711,7 @@ bool CatalogManager::ScopedLeaderSharedLock::CheckIsInitializedAndIsLeaderOrResp
INITTED_OR_RESPOND(ConnectToMasterResponsePB);
INITTED_OR_RESPOND(GetMasterRegistrationResponsePB);
INITTED_OR_RESPOND(TSHeartbeatResponsePB);
+INITTED_AND_LEADER_OR_RESPOND(AddMasterResponsePB);
INITTED_AND_LEADER_OR_RESPOND(AlterTableResponsePB);
INITTED_AND_LEADER_OR_RESPOND(ChangeTServerStateResponsePB);
INITTED_AND_LEADER_OR_RESPOND(CreateTableResponsePB);
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index 091aa70..9d19224 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -37,6 +37,7 @@
#include <sparsehash/dense_hash_map>
#include "kudu/consensus/metadata.pb.h"
+#include "kudu/consensus/raft_consensus.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
@@ -48,7 +49,6 @@
#include "kudu/util/cow_object.h"
#include "kudu/util/locks.h"
#include "kudu/util/monotime.h"
-#include "kudu/util/net/net_util.h"
#include "kudu/util/oid_generator.h"
#include "kudu/util/random.h"
#include "kudu/util/rw_mutex.h"
@@ -65,6 +65,7 @@ namespace kudu {
class AuthzTokenTest_TestSingleMasterUnavailable_Test;
class CreateTableStressTest_TestConcurrentCreateTableAndReloadMetadata_Test;
+class HostPort;
class MetricEntity;
class MetricRegistry;
class MonitoredTask;
@@ -82,7 +83,6 @@ class ServiceUnavailableRetryClientTest_CreateTable_Test;
} // namespace client
namespace consensus {
-class RaftConsensus;
class StartTabletCopyRequestPB;
} // namespace consensus
@@ -773,10 +773,16 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
const consensus::StartTabletCopyRequestPB* req,
std::function<void(const Status&, tserver::TabletServerErrorPB::Code)> cb) override;
- // Returns this CatalogManager's role in a consensus configuration. CatalogManager
- // must be initialized before calling this method.
+ // Returns this CatalogManager's role in a consensus configuration.
+ // CatalogManager is expected to be initialized before calling this method otherwise
+ // UNKNOWN_ROLE is returned.
consensus::RaftPeerPB::Role Role() const;
+ // Returns this CatalogManager's role and member type in a consensus configuration.
+ // CatalogManager is expected to be initialized before calling this method otherwise
+ // <UNKNOWN_ROLE, UNKNOWN_MEMBER_TYPE> is returned.
+ consensus::RaftConsensus::RoleAndMemberType GetRoleAndMemberType() const;
+
hms::HmsCatalog* hms_catalog() const {
return hms_catalog_.get();
}
@@ -799,9 +805,15 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
// name is returned.
static std::string NormalizeTableName(const std::string& table_name);
- const std::vector<HostPort>& master_addresses() const {
- return master_addresses_;
- }
+ enum ChangeConfigOp {
+ kAddMaster
+ };
+
+ // Add/remove a master specified by 'hp' and 'uuid' by initiating change config request.
+ // RpcContext 'rpc' will be used to respond back to the client asynchronously.
+ // Returns the status of submission of the request.
+ Status InitiateMasterChangeConfig(ChangeConfigOp op, const HostPort& hp,
+ const std::string& uuid, rpc::RpcContext* rpc);
private:
// These tests call ElectedAsLeaderCb() directly.
@@ -1166,6 +1178,8 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
mutable simple_spinlock state_lock_;
State state_;
+ static const char* ChangeConfigOpToString(ChangeConfigOp type);
+
// Singleton pool that serializes invocations of ElectedAsLeaderCb().
std::unique_ptr<ThreadPool> leader_election_pool_;
@@ -1195,11 +1209,6 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
// Always acquire this lock before state_lock_.
RWMutex leader_lock_;
- // Cached information on master addresses. It's populated in Init() since the
- // membership of masters' Raft consensus is static (i.e. no new members are
- // added or any existing removed).
- std::vector<HostPort> master_addresses_;
-
// Async operations are accessing some private methods
// (TODO: this stuff should be deferred and done in the background thread)
friend class AsyncAlterTable;
diff --git a/src/kudu/master/dynamic_multi_master-test.cc b/src/kudu/master/dynamic_multi_master-test.cc
new file mode 100644
index 0000000..b8db27f
--- /dev/null
+++ b/src/kudu/master/dynamic_multi_master-test.cc
@@ -0,0 +1,657 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <ostream>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/client/client-test-util.h"
+#include "kudu/client/client.h"
+#include "kudu/client/schema.h"
+#include "kudu/common/common.pb.h"
+#include "kudu/common/wire_protocol.h"
+#include "kudu/common/wire_protocol.pb.h"
+#include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/consensus.proxy.h"
+#include "kudu/consensus/metadata.pb.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
+#include "kudu/master/master.h"
+#include "kudu/master/master.pb.h"
+#include "kudu/master/master.proxy.h"
+#include "kudu/master/sys_catalog.h"
+#include "kudu/mini-cluster/external_mini_cluster.h"
+#include "kudu/mini-cluster/mini_cluster.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+METRIC_DECLARE_histogram(log_gc_duration);
+METRIC_DECLARE_entity(tablet);
+
+using kudu::client::KuduClient;
+using kudu::client::KuduColumnSchema;
+using kudu::client::KuduSchema;
+using kudu::client::KuduSchemaBuilder;
+using kudu::client::KuduTable;
+using kudu::client::KuduTableCreator;
+using kudu::client::sp::shared_ptr;
+using kudu::cluster::ExternalDaemonOptions;
+using kudu::cluster::ExternalMaster;
+using kudu::cluster::ExternalMiniCluster;
+using kudu::cluster::ExternalMiniClusterOptions;
+using kudu::cluster::MiniCluster;
+using kudu::rpc::RpcController;
+using std::set;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace master {
+
+static Status CreateTable(ExternalMiniCluster* cluster,
+ const std::string& table_name) {
+ shared_ptr<KuduClient> client;
+ RETURN_NOT_OK(cluster->CreateClient(nullptr, &client));
+ KuduSchema schema;
+ KuduSchemaBuilder b;
+ b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
+ RETURN_NOT_OK(b.Build(&schema));
+ unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
+ return table_creator->table_name(table_name)
+ .schema(&schema)
+ .set_range_partition_columns({ "key" })
+ .num_replicas(1)
+ .Create();
+}
+
+// Test class for testing addition/removal of masters to a Kudu cluster.
+class DynamicMultiMasterTest : public KuduTest {
+ public:
+ void SetUpWithNumMasters(int num_masters) {
+ // Initial number of masters in the cluster before adding a master.
+ orig_num_masters_ = num_masters;
+
+ // Reserving a port upfront for the new master that'll be added to the cluster.
+ ASSERT_OK(MiniCluster::ReserveDaemonSocket(MiniCluster::MASTER, orig_num_masters_ /* index */,
+ kDefaultBindMode, &reserved_socket_));
+
+ ASSERT_OK(reserved_socket_->GetSocketAddress(&reserved_addr_));
+ reserved_hp_ = HostPort(reserved_addr_);
+ reserved_hp_str_ = reserved_hp_.ToString();
+ }
+
+ void StartCluster(const vector<string>& extra_master_flags,
+ bool supply_single_master_addr = true) {
+ opts_.num_masters = orig_num_masters_;
+ opts_.supply_single_master_addr = supply_single_master_addr;
+ opts_.extra_master_flags = extra_master_flags;
+
+ cluster_.reset(new ExternalMiniCluster(opts_));
+ ASSERT_OK(cluster_->Start());
+ ASSERT_OK(cluster_->WaitForTabletServerCount(cluster_->num_tablet_servers(),
+ MonoDelta::FromSeconds(5)));
+ }
+
+ // Functor that takes a leader_master_idx and runs the desired master RPC against
+ // the leader master returning the RPC status and the optional MasterErrorPB::Code.
+ typedef std::function<
+ std::pair<Status, boost::optional<MasterErrorPB::Code>>(int leader_master_idx)> MasterRPC;
+
+ // Helper function that runs the master RPC against the leader master and retries the RPC
+ // if the expected leader master returns NOT_THE_LEADER error due to leadership change.
+ // If 'cluster' is nullptr, the test's own 'cluster_' is used.
+ // The retry budget is 2 seconds of sleep time, consumed in 100 ms steps; the time spent
+ // in the RPC itself is not counted against the budget.
+ // Returns a single combined Status:
+ // - Non-OK status from looking up the leader master index, if that lookup fails.
+ // - RPC return status if not OK.
+ // - IllegalState for a master response error other than NOT_THE_LEADER error; the
+ // status message contains the name of the master error code.
+ // - TimedOut if all attempts to run the RPC against leader master are exhausted.
+ // - OK if the master RPC is successful.
+ Status RunLeaderMasterRPC(const MasterRPC& master_rpc, ExternalMiniCluster* cluster = nullptr) {
+ if (cluster == nullptr) {
+ cluster = cluster_.get();
+ }
+
+ int64_t time_left_to_sleep_msecs = 2000;
+ while (time_left_to_sleep_msecs > 0) {
+ // Re-resolve the leader on every attempt since leadership may have changed.
+ int leader_master_idx;
+ RETURN_NOT_OK(cluster->GetLeaderMasterIndex(&leader_master_idx));
+ const auto& rpc_result = master_rpc(leader_master_idx);
+ RETURN_NOT_OK(rpc_result.first);
+ const auto& master_error = rpc_result.second;
+ if (!master_error) {
+ return Status::OK();
+ }
+ if (master_error != MasterErrorPB::NOT_THE_LEADER) {
+ // Some other master error. The error code name is passed to Substitute() itself
+ // so that it fills in the $0 placeholder.
+ return Status::IllegalState(Substitute("Master error: $0",
+ MasterErrorPB_Code_Name(*master_error)));
+ }
+ // NOT_THE_LEADER error, so retry after some duration.
+ static const MonoDelta kSleepDuration = MonoDelta::FromMilliseconds(100);
+ SleepFor(kSleepDuration);
+ time_left_to_sleep_msecs -= kSleepDuration.ToMilliseconds();
+ }
+ return Status::TimedOut("Failed contacting the right leader master after multiple attempts");
+ }
+
+ // Run ListMasters RPC, retrying on leadership change, returning the response in 'resp'.
+ void RunListMasters(ListMastersResponsePB* resp, ExternalMiniCluster* cluster = nullptr) {
+ if (cluster == nullptr) {
+ cluster = cluster_.get();
+ }
+ auto list_masters = [&] (int leader_master_idx) {
+ ListMastersRequestPB req;
+ RpcController rpc;
+ Status s = cluster->master_proxy(leader_master_idx)->ListMasters(req, resp, &rpc);
+ boost::optional<MasterErrorPB::Code> err_code(resp->has_error(), resp->error().code());
+ return std::make_pair(s, err_code);
+ };
+ ASSERT_OK(RunLeaderMasterRPC(list_masters, cluster));
+ }
+
+ // Verify the cluster contains 'num_masters' and returns the master addresses in 'master_hps'.
+ void VerifyNumMastersAndGetAddresses(int num_masters, vector<HostPort>* master_hps) {
+ ListMastersResponsePB resp;
+ NO_FATALS(RunListMasters(&resp));
+ ASSERT_EQ(num_masters, resp.masters_size());
+ master_hps->clear();
+ for (const auto& master : resp.masters()) {
+ ASSERT_TRUE(master.role() == consensus::RaftPeerPB::LEADER ||
+ master.role() == consensus::RaftPeerPB::FOLLOWER);
+ ASSERT_EQ(consensus::RaftPeerPB::VOTER, master.member_type());
+ ASSERT_EQ(1, master.registration().rpc_addresses_size());
+ master_hps->emplace_back(HostPortFromPB(master.registration().rpc_addresses(0)));
+ }
+ }
+
+ // Brings up a new master where 'master_hps' contains master addresses including
+ // the new master to be added.
+ void StartNewMaster(const vector<HostPort>& master_hps,
+ bool master_supports_change_config = true) {
+ vector<string> master_addresses;
+ master_addresses.reserve(master_hps.size());
+ for (const auto& hp : master_hps) {
+ master_addresses.emplace_back(hp.ToString());
+ }
+
+ // Start with an existing master daemon's options, but modify them for use in a new master
+ ExternalDaemonOptions new_master_opts = cluster_->master(0)->opts();
+ const string new_master_id = Substitute("master-$0", orig_num_masters_);
+ new_master_opts.wal_dir = cluster_->GetWalPath(new_master_id);
+ new_master_opts.data_dirs = cluster_->GetDataPaths(new_master_id);
+ new_master_opts.log_dir = cluster_->GetLogPath(new_master_id);
+ new_master_opts.rpc_bind_address = reserved_hp_;
+ auto& flags = new_master_opts.extra_flags;
+ flags.insert(flags.end(),
+ {"--master_addresses=" + JoinStrings(master_addresses, ","),
+ "--master_address_add_new_master=" + reserved_hp_str_});
+
+ LOG(INFO) << "Bringing up the new master at: " << reserved_hp_str_;
+ new_master_.reset(new ExternalMaster(new_master_opts));
+ ASSERT_OK(new_master_->Start());
+ ASSERT_OK(new_master_->WaitForCatalogManager());
+
+ new_master_proxy_.reset(
+ new MasterServiceProxy(new_master_opts.messenger, reserved_addr_, reserved_addr_.host()));
+ {
+ GetMasterRegistrationRequestPB req;
+ GetMasterRegistrationResponsePB resp;
+ RpcController rpc;
+
+ ASSERT_OK(new_master_proxy_->GetMasterRegistration(req, &resp, &rpc));
+ ASSERT_FALSE(resp.has_error());
+ if (master_supports_change_config) {
+ ASSERT_EQ(consensus::RaftPeerPB::NON_VOTER, resp.member_type());
+ ASSERT_EQ(consensus::RaftPeerPB::LEARNER, resp.role());
+ } else {
+ // For a new master brought up that doesn't support change config, it'll be started
+ // as a VOTER and become FOLLOWER if the other masters are reachable.
+ ASSERT_EQ(consensus::RaftPeerPB::VOTER, resp.member_type());
+ ASSERT_EQ(consensus::RaftPeerPB::FOLLOWER, resp.role());
+ }
+ }
+
+ // Verify the cluster still has the same number of masters.
+ ListMastersResponsePB resp;
+ NO_FATALS(RunListMasters(&resp));
+ ASSERT_EQ(orig_num_masters_, resp.masters_size());
+ }
+
+ // Adds the specified master to the cluster returning the appropriate error Status for negative
+ // test cases.
+ Status AddMasterToCluster(const HostPort& master) {
+ auto add_master = [&] (int leader_master_idx) {
+ AddMasterRequestPB req;
+ AddMasterResponsePB resp;
+ RpcController rpc;
+ if (master != HostPort()) {
+ *req.mutable_rpc_addr() = HostPortToPB(master);
+ }
+ rpc.RequireServerFeature(MasterFeatures::DYNAMIC_MULTI_MASTER);
+ Status s = cluster_->master_proxy(leader_master_idx)->AddMaster(req, &resp, &rpc);
+ boost::optional<MasterErrorPB::Code> err_code(resp.has_error(), resp.error().code());
+ return std::make_pair(s, err_code);
+ };
+
+ RETURN_NOT_OK(RunLeaderMasterRPC(add_master));
+ return cluster_->AddMaster(new_master_);
+ }
+
+ // Verify one of the 'expected_roles' and 'expected_member_type' of the new master by
+ // making RPC to it directly.
+ void VerifyNewMasterDirectly(const set<consensus::RaftPeerPB::Role>& expected_roles,
+ consensus::RaftPeerPB::MemberType expected_member_type) {
+ GetMasterRegistrationRequestPB req;
+ GetMasterRegistrationResponsePB resp;
+ RpcController rpc;
+
+ ASSERT_OK(new_master_proxy_->GetMasterRegistration(req, &resp, &rpc));
+ ASSERT_FALSE(resp.has_error());
+ ASSERT_TRUE(ContainsKey(expected_roles, resp.role()));
+ ASSERT_EQ(expected_member_type, resp.member_type());
+ }
+
+ protected:
+ // Initial number of masters in the cluster, before any new master is added
+ int orig_num_masters_;
+ ExternalMiniClusterOptions opts_;
+ unique_ptr<ExternalMiniCluster> cluster_;
+
+ // Socket, HostPort, proxy etc. for the new master to be added
+ unique_ptr<Socket> reserved_socket_;
+ Sockaddr reserved_addr_;
+ HostPort reserved_hp_;
+ string reserved_hp_str_;
+ unique_ptr<MasterServiceProxy> new_master_proxy_;
+ scoped_refptr<ExternalMaster> new_master_;
+};
+
+// Parameterized DynamicMultiMasterTest class that works with different initial number of masters.
+class ParameterizedDynamicMultiMasterTest : public DynamicMultiMasterTest,
+ public ::testing::WithParamInterface<int> {
+ public:
+ void SetUp() override {
+ NO_FATALS(SetUpWithNumMasters(GetParam()));
+ }
+};
+
+INSTANTIATE_TEST_CASE_P(, ParameterizedDynamicMultiMasterTest,
+ // Initial number of masters in the cluster before adding a new master
+ ::testing::Values(1, 2));
+
+// This test starts a cluster, creates a table and then adds a new master.
+// For a system catalog with little data, the new master can be caught up from WAL and
+// promoted to a VOTER without requiring tablet copy.
+TEST_P(ParameterizedDynamicMultiMasterTest, TestAddMasterCatchupFromWAL) {
+ SKIP_IF_SLOW_NOT_ALLOWED();
+
+ NO_FATALS(StartCluster({"--master_support_change_config"}));
+
+ // Verify that masters are running as VOTERs and collect their addresses to be used
+ // for starting the new master.
+ vector<HostPort> master_hps;
+ NO_FATALS(VerifyNumMastersAndGetAddresses(orig_num_masters_, &master_hps));
+
+ const string kTableName = "first_table";
+ ASSERT_OK(CreateTable(cluster_.get(), kTableName));
+
+ // Bring up the new master and add to the cluster.
+ master_hps.emplace_back(reserved_hp_);
+ NO_FATALS(StartNewMaster(master_hps));
+ ASSERT_OK(AddMasterToCluster(reserved_hp_));
+
+ // Newly added master will be caught up from WAL itself without requiring tablet copy
+ // since the system catalog is fresh with a single table.
+ // Catching up from WAL and promotion to VOTER won't happen instantly after adding the
+ // new master, hence the use of ASSERT_EVENTUALLY.
+ ASSERT_EVENTUALLY([&] {
+ ListMastersResponsePB resp;
+ NO_FATALS(RunListMasters(&resp));
+ ASSERT_EQ(orig_num_masters_ + 1, resp.masters_size());
+
+ int num_leaders = 0;
+ for (const auto& master : resp.masters()) {
+ ASSERT_EQ(consensus::RaftPeerPB::VOTER, master.member_type());
+ ASSERT_TRUE(master.role() == consensus::RaftPeerPB::LEADER ||
+ master.role() == consensus::RaftPeerPB::FOLLOWER);
+ if (master.role() == consensus::RaftPeerPB::LEADER) {
+ num_leaders++;
+ }
+ }
+ ASSERT_EQ(1, num_leaders);
+ });
+
+ // Double check by directly contacting the new master.
+ VerifyNewMasterDirectly({ consensus::RaftPeerPB::FOLLOWER, consensus::RaftPeerPB::LEADER },
+ consensus::RaftPeerPB::VOTER);
+
+ // Adding the same master again should return an error.
+ {
+ Status s = AddMasterToCluster(reserved_hp_);
+ ASSERT_TRUE(s.IsRemoteError());
+ ASSERT_STR_CONTAINS(s.message().ToString(), "Master already present");
+ }
+
+ // Adding one of the former masters should return an error.
+ {
+ Status s = AddMasterToCluster(master_hps[0]);
+ ASSERT_TRUE(s.IsRemoteError());
+ ASSERT_STR_CONTAINS(s.message().ToString(), "Master already present");
+ }
+
+ // Shutdown the cluster and the new master daemon process.
+ // This allows ExternalMiniCluster to manage the newly added master and allows
+ // client to connect to the new master if it's elected the leader.
+ new_master_->Shutdown();
+ cluster_.reset();
+
+ opts_.num_masters = orig_num_masters_ + 1;
+ opts_.master_rpc_addresses = master_hps;
+ ExternalMiniCluster migrated_cluster(opts_);
+ ASSERT_OK(migrated_cluster.Start());
+ ASSERT_OK(migrated_cluster.WaitForTabletServerCount(migrated_cluster.num_tablet_servers(),
+ MonoDelta::FromSeconds(5)));
+
+ // Verify the cluster still has the same masters: the original ones plus the new master.
+ {
+ ListMastersResponsePB resp;
+ NO_FATALS(RunListMasters(&resp, &migrated_cluster));
+ ASSERT_EQ(orig_num_masters_ + 1, resp.masters_size());
+
+ for (const auto& master : resp.masters()) {
+ ASSERT_EQ(consensus::RaftPeerPB::VOTER, master.member_type());
+ ASSERT_TRUE(master.role() == consensus::RaftPeerPB::LEADER ||
+ master.role() == consensus::RaftPeerPB::FOLLOWER);
+ ASSERT_EQ(1, master.registration().rpc_addresses_size());
+ HostPort actual_hp = HostPortFromPB(master.registration().rpc_addresses(0));
+ ASSERT_TRUE(std::find(master_hps.begin(), master_hps.end(), actual_hp) != master_hps.end());
+ }
+ }
+
+ shared_ptr<KuduClient> client;
+ ASSERT_OK(migrated_cluster.CreateClient(nullptr, &client));
+
+ shared_ptr<KuduTable> table;
+ ASSERT_OK(client->OpenTable(kTableName, &table));
+ ASSERT_EQ(0, CountTableRows(table.get()));
+
+ // Perform an operation that requires replication to masters.
+ ASSERT_OK(CreateTable(&migrated_cluster, "second_table"));
+ ASSERT_OK(client->OpenTable("second_table", &table));
+ ASSERT_EQ(0, CountTableRows(table.get()));
+
+ // Pause master one at a time and create table at the same time which will allow
+ // new leader to be elected if the paused master is a leader.
+ // Need at least 3 masters to form consensus and elect a new leader.
+ if (orig_num_masters_ + 1 >= 3) {
+ for (int i = 0; i < orig_num_masters_ + 1; i++) {
+ ASSERT_OK(migrated_cluster.master(i)->Pause());
+ cluster::ScopedResumeExternalDaemon resume_daemon(migrated_cluster.master(i));
+ ASSERT_OK(client->OpenTable(kTableName, &table));
+ ASSERT_EQ(0, CountTableRows(table.get()));
+
+ // See MasterFailoverTest.TestCreateTableSync to understand why we must
+ // check for IsAlreadyPresent as well.
+ Status s = CreateTable(&migrated_cluster, Substitute("table-$0", i));
+ ASSERT_TRUE(s.ok() || s.IsAlreadyPresent());
+ }
+ }
+}
+
+// This test starts a cluster with low values for log flush and segment size to force GC
+// of the system catalog WAL. When a new master is added, test verifies that the new master
+// can't be caught up from WAL and as a result the new master, though added to the master Raft
+// config, remains a NON_VOTER.
+TEST_P(ParameterizedDynamicMultiMasterTest, TestAddMasterCatchupFromWALNotPossible) {
+ SKIP_IF_SLOW_NOT_ALLOWED();
+
+ // Using low values of log flush threshold and segment size to trigger GC of the sys catalog WAL.
+ NO_FATALS(StartCluster({"--master_support_change_config", "--flush_threshold_secs=1",
+ "--log_segment_size_mb=1"}));
+
+ // Verify that masters are running as VOTERs and collect their addresses to be used
+ // for starting the new master.
+ vector<HostPort> master_hps;
+ NO_FATALS(VerifyNumMastersAndGetAddresses(orig_num_masters_, &master_hps));
+
+ // Reads the "total_count" of the log GC duration metric for the system catalog tablet
+ // from master 0. A change in this count is used below as the signal that a WAL GC ran.
+ auto get_sys_catalog_wal_gc_count = [&] {
+ int64_t sys_catalog_wal_gc_count = 0;
+ CHECK_OK(itest::GetInt64Metric(cluster_->master(0)->bound_http_hostport(),
+ &METRIC_ENTITY_tablet,
+ master::SysCatalogTable::kSysCatalogTabletId,
+ &METRIC_log_gc_duration,
+ "total_count",
+ &sys_catalog_wal_gc_count));
+ return sys_catalog_wal_gc_count;
+ };
+ auto orig_gc_count = get_sys_catalog_wal_gc_count();
+
+ // Create a bunch of tables to ensure sys catalog WAL gets GC'ed.
+ // Need to create around 1k tables even with lowest flush threshold and log segment size.
+ for (int i = 1; i < 1000; i++) {
+ string table_name = "Table.TestAddMasterCatchupFromWALNotPossible." + std::to_string(i);
+ ASSERT_OK(CreateTable(cluster_.get(), table_name));
+ }
+
+ // Poll for up to 2 seconds for the GC count to change. The verdict is taken from a fresh
+ // read of the metric rather than from the remaining time budget, so a GC that completes
+ // during the final sleep interval is not misreported as a timeout.
+ int64_t time_left_to_sleep_msecs = 2000;
+ while (time_left_to_sleep_msecs > 0 && orig_gc_count == get_sys_catalog_wal_gc_count()) {
+ static const MonoDelta kSleepDuration = MonoDelta::FromMilliseconds(100);
+ SleepFor(kSleepDuration);
+ time_left_to_sleep_msecs -= kSleepDuration.ToMilliseconds();
+ }
+ ASSERT_NE(orig_gc_count, get_sys_catalog_wal_gc_count())
+ << "Timed out waiting for system catalog WAL to be GC'ed";
+
+ // Bring up the new master and add to the cluster.
+ master_hps.emplace_back(reserved_hp_);
+ NO_FATALS(StartNewMaster(master_hps));
+ ASSERT_OK(AddMasterToCluster(reserved_hp_));
+
+ // Newly added master will be added to the master Raft config but won't be caught up
+ // from the WAL and hence remain as a NON_VOTER.
+ ListMastersResponsePB list_resp;
+ NO_FATALS(RunListMasters(&list_resp));
+ ASSERT_EQ(orig_num_masters_ + 1, list_resp.masters_size());
+
+ for (const auto& master : list_resp.masters()) {
+ ASSERT_EQ(1, master.registration().rpc_addresses_size());
+ auto hp = HostPortFromPB(master.registration().rpc_addresses(0));
+ if (hp == reserved_hp_) {
+ ASSERT_EQ(consensus::RaftPeerPB::NON_VOTER, master.member_type());
+ ASSERT_TRUE(master.role() == consensus::RaftPeerPB::LEARNER);
+ }
+ }
+
+ // Double check by directly contacting the new master. Wrapped in NO_FATALS like the other
+ // helper calls in this test so that a fatal failure raised inside the helper stops the test.
+ NO_FATALS(VerifyNewMasterDirectly({ consensus::RaftPeerPB::LEARNER },
+ consensus::RaftPeerPB::NON_VOTER));
+
+ // Verify FAILED_UNRECOVERABLE health error about the new master that can't be caught up
+ // from WAL. This health state update may take some round trips between the masters and
+ // hence wrapping it under ASSERT_EVENTUALLY.
+ ASSERT_EVENTUALLY([&] {
+ // GetConsensusState() RPC can be made to any master and not necessarily the leader master.
+ int leader_master_idx;
+ ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_master_idx));
+ auto leader_master_addr = cluster_->master(leader_master_idx)->bound_rpc_addr();
+ consensus::ConsensusServiceProxy consensus_proxy(cluster_->messenger(), leader_master_addr,
+ leader_master_addr.host());
+ consensus::GetConsensusStateRequestPB req;
+ consensus::GetConsensusStateResponsePB resp;
+ RpcController rpc;
+ req.set_dest_uuid(cluster_->master(leader_master_idx)->uuid());
+ req.set_report_health(consensus::INCLUDE_HEALTH_REPORT);
+ ASSERT_OK(consensus_proxy.GetConsensusState(req, &resp, &rpc));
+ ASSERT_FALSE(resp.has_error());
+ ASSERT_EQ(1, resp.tablets_size());
+
+ // Lookup the new_master from the consensus state of the system catalog.
+ const auto& sys_catalog = resp.tablets(0);
+ ASSERT_EQ(master::SysCatalogTable::kSysCatalogTabletId, sys_catalog.tablet_id());
+ const auto& cstate = sys_catalog.cstate();
+ // Prefer the pending config when one exists; otherwise use the committed config.
+ const auto& config = cstate.has_pending_config() ?
+ cstate.pending_config() : cstate.committed_config();
+ ASSERT_EQ(orig_num_masters_ + 1, config.peers_size());
+ int num_new_masters_found = 0;
+ for (const auto& peer : config.peers()) {
+ if (peer.permanent_uuid() == new_master_->uuid()) {
+ ASSERT_EQ(consensus::HealthReportPB::FAILED_UNRECOVERABLE,
+ peer.health_report().overall_health());
+ num_new_masters_found++;
+ }
+ }
+ ASSERT_EQ(1, num_new_masters_found);
+ });
+}
+
+// Test that brings up a single master cluster with 'last_known_addr' not populated by
+// not specifying '--master_addresses' and then attempts to add a new master which is
+// expected to fail due to invalid Raft config.
+TEST_F(DynamicMultiMasterTest, TestAddMasterWithNoLastKnownAddr) {
+ // Single master cluster started with the second StartCluster() argument
+ // (supply_single_master_addr) set to false.
+ NO_FATALS(SetUpWithNumMasters(1));
+ NO_FATALS(
+ StartCluster({"--master_support_change_config"}, false /* supply_single_master_addr */));
+
+ // Verify that existing masters are running as VOTERs and collect their addresses to be used
+ // for starting the new master.
+ vector<HostPort> master_hps;
+ NO_FATALS(VerifyNumMastersAndGetAddresses(orig_num_masters_, &master_hps));
+
+ // Bring up the new master and add to the cluster.
+ master_hps.emplace_back(reserved_hp_);
+ NO_FATALS(StartNewMaster(master_hps));
+
+ // The add request is expected to fail with a RemoteError whose message reports a peer
+ // without an address in the config that was about to be set as pending.
+ Status actual = AddMasterToCluster(reserved_hp_);
+ ASSERT_TRUE(actual.IsRemoteError());
+ ASSERT_STR_MATCHES(actual.ToString(),
+ "Invalid config to set as pending: Peer:.* has no address");
+
+ // Verify no change in number of masters.
+ NO_FATALS(VerifyNumMastersAndGetAddresses(orig_num_masters_, &master_hps));
+}
+
+// Test that attempts to add a new master without enabling the feature flag for master Raft
+// change config.
+TEST_F(DynamicMultiMasterTest, TestAddMasterFeatureFlagNotSpecified) {
+ // Single master cluster started without any extra master flags.
+ NO_FATALS(SetUpWithNumMasters(1));
+ NO_FATALS(StartCluster({ /* Omitting "--master_support_change_config" */ }));
+
+ // Verify that existing masters are running as VOTERs and collect their addresses to be used
+ // for starting the new master.
+ vector<HostPort> master_hps;
+ NO_FATALS(VerifyNumMastersAndGetAddresses(orig_num_masters_, &master_hps));
+
+ // Bring up the new master and add to the cluster.
+ master_hps.emplace_back(reserved_hp_);
+ NO_FATALS(StartNewMaster(master_hps, false /* master_supports_change_config */));
+
+ // The add request is expected to be rejected with a RemoteError that mentions
+ // unsupported feature flags.
+ Status actual = AddMasterToCluster(reserved_hp_);
+ ASSERT_TRUE(actual.IsRemoteError());
+ ASSERT_STR_MATCHES(actual.ToString(), "unsupported feature flags");
+
+ // Verify no change in number of masters.
+ NO_FATALS(VerifyNumMastersAndGetAddresses(orig_num_masters_, &master_hps));
+}
+
+// Test that attempts to request a non-leader master to add a new master.
+TEST_F(DynamicMultiMasterTest, TestAddMasterToNonLeader) {
+ // Two masters, so that exactly one of them is a non-leader at any time.
+ NO_FATALS(SetUpWithNumMasters(2));
+ NO_FATALS(StartCluster({"--master_support_change_config"}));
+
+ // Verify that existing masters are running as VOTERs and collect their addresses to be used
+ // for starting the new master.
+ vector<HostPort> master_hps;
+ NO_FATALS(VerifyNumMastersAndGetAddresses(orig_num_masters_, &master_hps));
+
+ // Bring up the new master and add to the cluster.
+ master_hps.emplace_back(reserved_hp_);
+ NO_FATALS(StartNewMaster(master_hps));
+
+ // Verify sending add master request to a non-leader master returns NOT_THE_LEADER error.
+ // It's possible there is a leadership change between querying for leader master and
+ // sending the add master request to non-leader master and hence using ASSERT_EVENTUALLY.
+ ASSERT_EVENTUALLY([&] {
+ // Request, response and controller are created inside the lambda, so every retry
+ // starts with fresh objects.
+ AddMasterRequestPB req;
+ AddMasterResponsePB resp;
+ RpcController rpc;
+ *req.mutable_rpc_addr() = HostPortToPB(reserved_hp_);
+ rpc.RequireServerFeature(MasterFeatures::DYNAMIC_MULTI_MASTER);
+
+ int leader_master_idx;
+ ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_master_idx));
+ ASSERT_TRUE(leader_master_idx == 0 || leader_master_idx == 1);
+ // The index is 0 or 1 (asserted above), so logical negation yields the other index.
+ int non_leader_master_idx = !leader_master_idx;
+ // The RPC itself is expected to succeed; the failure is reported in the response error.
+ ASSERT_OK(cluster_->master_proxy(non_leader_master_idx)->AddMaster(req, &resp, &rpc));
+ ASSERT_TRUE(resp.has_error());
+ ASSERT_EQ(MasterErrorPB::NOT_THE_LEADER, resp.error().code());
+ });
+
+ // Verify no change in number of masters.
+ NO_FATALS(VerifyNumMastersAndGetAddresses(orig_num_masters_, &master_hps));
+}
+
+// Test that attempts to add a master with missing master address and a non-routable incorrect
+// address.
+TEST_F(DynamicMultiMasterTest, TestAddMasterMissingAndIncorrectAddress) {
+ NO_FATALS(SetUpWithNumMasters(1));
+ NO_FATALS(StartCluster({"--master_support_change_config"}));
+
+ // Verify that existing masters are running as VOTERs and collect their addresses to be used
+ // for starting the new master.
+ vector<HostPort> master_hps;
+ NO_FATALS(VerifyNumMastersAndGetAddresses(orig_num_masters_, &master_hps));
+
+ // Bring up the new master and add to the cluster.
+ master_hps.emplace_back(reserved_hp_);
+ NO_FATALS(StartNewMaster(master_hps));
+
+ // Empty HostPort
+ // Expected to be rejected with a RemoteError saying the RPC address was not supplied.
+ Status actual = AddMasterToCluster(HostPort());
+ ASSERT_TRUE(actual.IsRemoteError());
+ ASSERT_STR_CONTAINS(actual.ToString(), "RPC address of master to be added not supplied");
+
+ // Non-routable incorrect hostname.
+ // Expected to be rejected with a RemoteError that carries the address resolution failure.
+ actual = AddMasterToCluster(HostPort("foo", Master::kDefaultPort));
+ ASSERT_TRUE(actual.IsRemoteError());
+ ASSERT_STR_CONTAINS(actual.ToString(),
+ "Network error: unable to resolve address for foo");
+
+ // Verify no change in number of masters.
+ NO_FATALS(VerifyNumMastersAndGetAddresses(orig_num_masters_, &master_hps));
+}
+
+} // namespace master
+} // namespace kudu
diff --git a/src/kudu/master/master.cc b/src/kudu/master/master.cc
index 6efe084..764550c 100644
--- a/src/kudu/master/master.cc
+++ b/src/kudu/master/master.cc
@@ -106,6 +106,12 @@ using std::vector;
using strings::Substitute;
namespace kudu {
+namespace rpc {
+class RpcContext;
+} // namespace rpc
+} // namespace kudu
+
+namespace kudu {
namespace master {
namespace {
@@ -147,6 +153,9 @@ Status GetMasterEntryForHost(const shared_ptr<rpc::Messenger>& messenger,
if (resp.has_cluster_id()) {
e->set_cluster_id(resp.cluster_id());
}
+ if (resp.has_member_type()) {
+ e->set_member_type(resp.member_type());
+ }
return Status::OK();
}
} // anonymous namespace
@@ -336,23 +345,27 @@ void Master::ShutdownImpl() {
}
Status Master::ListMasters(vector<ServerEntryPB>* masters) const {
- if (!opts_.IsDistributed()) {
+ auto consensus = catalog_manager_->master_consensus();
+ if (!consensus) {
+ return Status::IllegalState("consensus not running");
+ }
+ const auto config = consensus->CommittedConfig();
+ masters->clear();
+ DCHECK_GE(config.peers_size(), 1);
+ // Optimized code path that doesn't involve reaching out to other
+ // masters over network for single master configuration.
+ if (config.peers_size() == 1) {
ServerEntryPB local_entry;
local_entry.mutable_instance_id()->CopyFrom(catalog_manager_->NodeInstance());
RETURN_NOT_OK(GetMasterRegistration(local_entry.mutable_registration()));
local_entry.set_role(RaftPeerPB::LEADER);
local_entry.set_cluster_id(cluster_id_);
+ local_entry.set_member_type(RaftPeerPB::VOTER);
masters->emplace_back(std::move(local_entry));
return Status::OK();
}
- auto consensus = catalog_manager_->master_consensus();
- if (!consensus) {
- return Status::IllegalState("consensus not running");
- }
- const auto config = consensus->CommittedConfig();
-
- masters->clear();
+ // For distributed master configuration.
for (const auto& peer : config.peers()) {
HostPort hp = HostPortFromPB(peer.last_known_addr());
ServerEntryPB peer_entry;
@@ -403,5 +416,25 @@ Status Master::GetMasterHostPorts(vector<HostPort>* hostports) const {
return Status::OK();
}
+// Adds the master at 'hp' to the cluster. Rejects the request with AlreadyPresent if 'hp' is
+// already returned by GetMasterHostPorts(), contacts 'hp' to obtain its permanent uuid, and
+// then hands the request, together with 'rpc', to CatalogManager::InitiateMasterChangeConfig().
+// Returns the first non-OK status produced by any of these steps.
+Status Master::AddMaster(const HostPort& hp, rpc::RpcContext* rpc) {
+ // Ensure requested master to be added is not already part of list of masters.
+ vector<HostPort> masters;
+ // Here the check is made against committed config with voters only.
+ RETURN_NOT_OK(GetMasterHostPorts(&masters));
+ if (std::find(masters.begin(), masters.end(), hp) != masters.end()) {
+ return Status::AlreadyPresent("Master already present");
+ }
+
+ // Check whether the master to be added is reachable and fetch its uuid.
+ ServerEntryPB peer_entry;
+ RETURN_NOT_OK(GetMasterEntryForHost(messenger_, hp, &peer_entry));
+ const auto& peer_uuid = peer_entry.instance_id().permanent_uuid();
+
+ // No early validation for whether a config change is in progress.
+ // If it's in progress, on initiating config change Raft will return error.
+ return catalog_manager()->InitiateMasterChangeConfig(CatalogManager::kAddMaster, hp, peer_uuid,
+ rpc);
+}
+
} // namespace master
} // namespace kudu
diff --git a/src/kudu/master/master.h b/src/kudu/master/master.h
index 6a59435..e922e9d 100644
--- a/src/kudu/master/master.h
+++ b/src/kudu/master/master.h
@@ -36,6 +36,9 @@ class HostPort;
class MaintenanceManager;
class MonoDelta;
class ThreadPool;
+namespace rpc {
+class RpcContext;
+} // namespace rpc
namespace master {
class LocationCache;
@@ -104,7 +107,7 @@ class Master : public kserver::KuduServer {
// request.
Status ListMasters(std::vector<ServerEntryPB>* masters) const;
- // Gets the HostPorts for all of the masters in the cluster.
+ // Gets the HostPorts for all of the VOTER masters in the cluster.
// This is not as complete as ListMasters() above, but operates just
// based on local state.
Status GetMasterHostPorts(std::vector<HostPort>* hostports) const;
@@ -113,6 +116,11 @@ class Master : public kserver::KuduServer {
return state_ == kStopped;
}
+ // Adds the master specified by 'hp' by initiating change config request.
+ // RpcContext 'rpc' will be used to respond back to the client asynchronously.
+ // Returns the status of the master addition request.
+ Status AddMaster(const HostPort& hp, rpc::RpcContext* rpc);
+
MaintenanceManager* maintenance_manager() {
return maintenance_manager_.get();
}
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index edd36d1..b569686 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -909,6 +909,14 @@ message ChangeTServerStateResponsePB {
optional MasterErrorPB error = 1;
}
+// AddMasterRequest/Response: add a new master to an existing cluster.
+message AddMasterRequestPB {
+ // RPC address of the master to be added.
+ optional HostPortPB rpc_addr = 1;
+}
+
+message AddMasterResponsePB {
+ // Set if the request could not be processed, e.g. NOT_THE_LEADER.
+ optional MasterErrorPB error = 1;
+}
+
// GetMasterRegistrationRequest/Response: get the instance id and
// HTTP/RPC addresses for this Master server.
message GetMasterRegistrationRequestPB {
@@ -930,6 +938,9 @@ message GetMasterRegistrationResponsePB {
// The unique cluster ID of the cluster this server belongs too.
optional string cluster_id = 5;
+
+ // This server's membership type in the consensus configuration.
+ optional consensus.RaftPeerPB.MemberType member_type = 6;
}
// ListMastersRequest/Response: get information about all of the known
@@ -986,6 +997,8 @@ enum MasterFeatures {
REPLICA_MANAGEMENT = 4;
// The master supports generating and dispensing authz tokens.
GENERATE_AUTHZ_TOKEN = 5;
+ // The master supports dynamic addition/removal of masters
+ DYNAMIC_MULTI_MASTER = 6;
}
service MasterService {
@@ -1066,6 +1079,11 @@ service MasterService {
option (kudu.rpc.authz_method) = "AuthorizeSuperUser";
}
+ // Add a new master to existing cluster.
+ rpc AddMaster(AddMasterRequestPB) returns (AddMasterResponsePB) {
+ option (kudu.rpc.authz_method) = "AuthorizeSuperUser";
+ }
+
// Master->Master RPCs
// ------------------------------------------------------------
diff --git a/src/kudu/master/master_options.cc b/src/kudu/master/master_options.cc
index 378e49e..e07dfbb 100644
--- a/src/kudu/master/master_options.cc
+++ b/src/kudu/master/master_options.cc
@@ -44,13 +44,13 @@ MasterOptions::MasterOptions() {
if (!FLAGS_master_addresses.empty()) {
Status s = HostPort::ParseStrings(FLAGS_master_addresses, Master::kDefaultPort,
- &master_addresses);
+ &master_addresses_);
if (!s.ok()) {
LOG(FATAL) << "Couldn't parse the master_addresses flag('" << FLAGS_master_addresses << "'): "
<< s.ToString();
}
// TODO(wdberkeley): Un-actionable warning. Link to docs, once they exist.
- if (master_addresses.size() == 2) {
+ if (master_addresses_.size() == 2) {
LOG(WARNING) << "Only 2 masters are specified by master_addresses_flag ('" <<
FLAGS_master_addresses << "'), but minimum of 3 are required to tolerate failures"
" of any one master. It is recommended to use at least 3 masters.";
@@ -58,18 +58,14 @@ MasterOptions::MasterOptions() {
}
}
-bool MasterOptions::IsDistributed() const {
- return master_addresses.size() > 1;
-}
-
Status MasterOptions::GetTheOnlyMasterAddress(HostPort* hp) const {
if (IsDistributed()) {
return Status::IllegalState("Not a single master configuration");
}
- if (master_addresses.empty()) {
+ if (master_addresses_.empty()) {
return Status::NotFound("Master address not specified");
}
- *hp = master_addresses.front();
+ *hp = master_addresses_.front();
return Status::OK();
}
diff --git a/src/kudu/master/master_options.h b/src/kudu/master/master_options.h
index dad9072..09b7b0b 100644
--- a/src/kudu/master/master_options.h
+++ b/src/kudu/master/master_options.h
@@ -16,6 +16,7 @@
// under the License.
#pragma once
+#include <utility>
#include <vector>
#include "kudu/kserver/kserver_options.h"
@@ -23,6 +24,7 @@
#include "kudu/util/status.h"
namespace kudu {
+
namespace master {
// Options for constructing the master.
@@ -31,13 +33,30 @@ namespace master {
struct MasterOptions : public kserver::KuduServerOptions {
MasterOptions();
- std::vector<HostPort> master_addresses;
+ // Fetch master addresses from the user supplied gflags which may be empty for single
+ // master configuration.
+ // Note: Only to be used during master init time as masters can be added/removed dynamically.
+ // Use Master::GetMasterHostPorts() instead after initializing the master at runtime.
+ const std::vector<HostPort>& master_addresses() const {
+ return master_addresses_;
+ }
- bool IsDistributed() const;
+ // Only to be used during init time as masters can be added/removed dynamically.
+ bool IsDistributed() const {
+ return master_addresses_.size() > 1;
+ }
// For a single master configuration output the only master address in 'hp', if available.
// Otherwise NotFound error or IllegalState for distributed master config.
Status GetTheOnlyMasterAddress(HostPort* hp) const;
+
+ // Allows setting/overwriting list of masters. Only to be used by tests.
+ void SetMasterAddressesForTests(std::vector<HostPort> addresses) {
+ master_addresses_ = std::move(addresses);
+ }
+
+ private:
+ std::vector<HostPort> master_addresses_;
};
} // namespace master
diff --git a/src/kudu/master/master_path_handlers.cc b/src/kudu/master/master_path_handlers.cc
index fd281aa..32bafdf 100644
--- a/src/kudu/master/master_path_handlers.cc
+++ b/src/kudu/master/master_path_handlers.cc
@@ -53,7 +53,6 @@
#include "kudu/master/catalog_manager.h"
#include "kudu/master/master.h"
#include "kudu/master/master.pb.h"
-#include "kudu/master/master_options.h"
#include "kudu/master/sys_catalog.h"
#include "kudu/master/table_metrics.h"
#include "kudu/master/ts_descriptor.h"
@@ -831,17 +830,23 @@ pair<string, string> MasterPathHandlers::TSDescToLinkPair(const TSDescriptor& de
}
string MasterPathHandlers::MasterAddrsToCsv() const {
- if (!master_->opts().master_addresses.empty()) {
+ vector<HostPort> master_addresses;
+ Status s = master_->GetMasterHostPorts(&master_addresses);
+ if (!s.ok()) {
+ LOG(WARNING) << "Unable to fetch master addresses: " << s.ToString();
+ return string();
+ }
+ if (!master_addresses.empty()) {
vector<string> all_addresses;
- all_addresses.reserve(master_->opts().master_addresses.size());
- for (const HostPort& hp : master_->opts().master_addresses) {
+ all_addresses.reserve(master_addresses.size());
+ for (const HostPort& hp : master_addresses) {
all_addresses.push_back(hp.ToString());
}
return JoinElements(all_addresses, ",");
}
Sockaddr addr = master_->first_rpc_address();
HostPort hp;
- Status s = HostPortFromSockaddrReplaceWildcard(addr, &hp);
+ s = HostPortFromSockaddrReplaceWildcard(addr, &hp);
if (s.ok()) {
return hp.ToString();
}
diff --git a/src/kudu/master/master_service.cc b/src/kudu/master/master_service.cc
index 3eea33d..c8533a8 100644
--- a/src/kudu/master/master_service.cc
+++ b/src/kudu/master/master_service.cc
@@ -55,6 +55,7 @@
#include "kudu/util/flag_tags.h"
#include "kudu/util/logging.h"
#include "kudu/util/monotime.h"
+#include "kudu/util/net/net_util.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/scoped_cleanup.h"
@@ -95,6 +96,12 @@ DEFINE_bool(master_support_authz_tokens, true,
"testing version compatibility in the client.");
TAG_FLAG(master_support_authz_tokens, hidden);
+DEFINE_bool(master_support_change_config, false,
+ "Whether the master supports adding/removing master servers dynamically.");
+TAG_FLAG(master_support_change_config, hidden);
+TAG_FLAG(master_support_change_config, unsafe);
+
+
using google::protobuf::Message;
using kudu::consensus::ReplicaManagementInfoPB;
using kudu::pb_util::SecureDebugString;
@@ -229,6 +236,36 @@ void MasterServiceImpl::ChangeTServerState(const ChangeTServerStateRequestPB* re
rpc->RespondSuccess();
}
+// Handles the AddMaster RPC. Fails the RPC if the --master_support_change_config flag is off
+// or the request carries no RPC address. Otherwise, once the leader check has passed,
+// forwards the request to Master::AddMaster(). When that call succeeds this function does not
+// respond itself; the response is sent later by the change config completion callback.
+void MasterServiceImpl::AddMaster(const AddMasterRequestPB* req,
+ AddMasterResponsePB* resp,
+ rpc::RpcContext* rpc) {
+ if (!FLAGS_master_support_change_config) {
+ rpc->RespondFailure(Status::NotSupported("Adding master is not supported"));
+ return;
+ }
+
+ if (!req->has_rpc_addr()) {
+ rpc->RespondFailure(Status::InvalidArgument("RPC address of master to be added not supplied"));
+ return;
+ }
+
+ // If the check below fails it has already responded to the RPC, so just return.
+ CatalogManager::ScopedLeaderSharedLock l(server_->catalog_manager());
+ if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, rpc)) {
+ return;
+ }
+
+ Status s = server_->AddMaster(HostPortFromPB(req->rpc_addr()), rpc);
+ if (!s.ok()) {
+ LOG(ERROR) << Substitute("Failed adding master $0:$1. $2", req->rpc_addr().host(),
+ req->rpc_addr().port(), s.ToString());
+ rpc->RespondFailure(s);
+ return;
+ }
+ // ChangeConfig request successfully submitted. Once the ChangeConfig request is complete
+ // the completion callback will respond back with the result to the RPC client.
+ // See completion_cb in CatalogManager::InitiateMasterChangeConfig().
+}
+
void MasterServiceImpl::TSHeartbeat(const TSHeartbeatRequestPB* req,
TSHeartbeatResponsePB* resp,
rpc::RpcContext* rpc) {
@@ -618,7 +655,9 @@ void MasterServiceImpl::GetMasterRegistration(const GetMasterRegistrationRequest
Status s = server_->GetMasterRegistration(resp->mutable_registration());
CheckRespErrorOrSetUnknown(s, resp);
- resp->set_role(server_->catalog_manager()->Role());
+ const auto& role_and_member = server_->catalog_manager()->GetRoleAndMemberType();
+ resp->set_role(role_and_member.first);
+ resp->set_member_type(role_and_member.second);
resp->set_cluster_id(server_->cluster_id());
rpc->RespondSuccess();
}
@@ -633,7 +672,14 @@ void MasterServiceImpl::ConnectToMaster(const ConnectToMasterRequestPB* /*req*/,
// Set the info about the other masters, so that the client can verify
// it has the full set of info.
- const auto& addresses = server_->catalog_manager()->master_addresses();
+ vector<HostPort> addresses;
+ Status s = server_->GetMasterHostPorts(&addresses);
+ if (!s.ok()) {
+ StatusToPB(s, resp->mutable_error()->mutable_status());
+ resp->mutable_error()->set_code(MasterErrorPB::UNKNOWN_ERROR);
+ rpc->RespondSuccess();
+ return;
+ }
resp->mutable_master_addrs()->Reserve(addresses.size());
for (const auto& hp : addresses) {
*resp->add_master_addrs() = HostPortToPB(hp);
@@ -746,6 +792,8 @@ bool MasterServiceImpl::SupportsFeature(uint32_t feature) const {
return FLAGS_master_support_authz_tokens;
case MasterFeatures::CONNECT_TO_MASTER:
return FLAGS_master_support_connect_to_master_rpc;
+ case MasterFeatures::DYNAMIC_MULTI_MASTER:
+ return FLAGS_master_support_change_config;
default:
return false;
}
diff --git a/src/kudu/master/master_service.h b/src/kudu/master/master_service.h
index e5ba019..9a066ba 100644
--- a/src/kudu/master/master_service.h
+++ b/src/kudu/master/master_service.h
@@ -36,6 +36,8 @@ class RpcContext;
namespace master {
+class AddMasterRequestPB;
+class AddMasterResponsePB;
class AlterTableRequestPB;
class AlterTableResponsePB;
class ChangeTServerStateRequestPB;
@@ -69,10 +71,10 @@ class ListTabletServersResponsePB;
class Master;
class PingRequestPB;
class PingResponsePB;
-class ReplaceTabletRequestPB;
-class ReplaceTabletResponsePB;
class RefreshAuthzCacheRequestPB;
class RefreshAuthzCacheResponsePB;
+class ReplaceTabletRequestPB;
+class ReplaceTabletResponsePB;
class TSHeartbeatRequestPB;
class TSHeartbeatResponsePB;
@@ -104,6 +106,9 @@ class MasterServiceImpl : public MasterServiceIf {
ChangeTServerStateResponsePB* resp,
rpc::RpcContext* rpc) override;
+ void AddMaster(const AddMasterRequestPB* req,
+ AddMasterResponsePB* resp, rpc::RpcContext* rpc) override;
+
void Ping(const PingRequestPB* req,
PingResponsePB* resp,
rpc::RpcContext* rpc) override;
diff --git a/src/kudu/master/mini_master.cc b/src/kudu/master/mini_master.cc
index c6cc092..19c2681 100644
--- a/src/kudu/master/mini_master.cc
+++ b/src/kudu/master/mini_master.cc
@@ -78,7 +78,7 @@ MiniMaster::~MiniMaster() {
void MiniMaster::SetMasterAddresses(vector<HostPort> master_addrs) {
CHECK(!master_);
- opts_.master_addresses = std::move(master_addrs);
+ opts_.SetMasterAddressesForTests(std::move(master_addrs));
}
Status MiniMaster::Start() {
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index 1ee8b20..fa73b26 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -50,7 +50,6 @@
#include "kudu/consensus/consensus_peers.h"
#include "kudu/consensus/log.h"
#include "kudu/consensus/log_anchor_registry.h"
-#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/consensus/quorum_util.h"
#include "kudu/consensus/raft_consensus.h"
@@ -89,6 +88,12 @@ DEFINE_double(sys_catalog_fail_during_write, 0.0,
"Fraction of the time when system table writes will fail");
TAG_FLAG(sys_catalog_fail_during_write, hidden);
+DEFINE_string(master_address_add_new_master, "",
+ "Address of master to add as a NON_VOTER on creating a distributed master config.");
+TAG_FLAG(master_address_add_new_master, unsafe);
+TAG_FLAG(master_address_add_new_master, hidden);
+
+DECLARE_bool(master_support_change_config);
DECLARE_int64(rpc_max_message_size);
METRIC_DEFINE_counter(server, sys_catalog_oversized_write_requests,
@@ -266,10 +271,11 @@ Status SysCatalogTable::Load(FsManager *fs_manager) {
// Make sure the set of masters passed in at start time matches the set in
// the on-disk cmeta.
set<string> peer_addrs_from_opts;
- for (const auto& hp : master_->opts().master_addresses) {
+ const auto& master_addresses = master_->opts().master_addresses();
+ for (const auto& hp : master_addresses) {
peer_addrs_from_opts.insert(hp.ToString());
}
- if (peer_addrs_from_opts.size() < master_->opts().master_addresses.size()) {
+ if (peer_addrs_from_opts.size() < master_addresses.size()) {
LOG(WARNING) << Substitute("Found duplicates in --master_addresses: "
"the unique set of addresses is $0",
JoinStrings(peer_addrs_from_opts, ", "));
@@ -358,11 +364,18 @@ Status SysCatalogTable::CreateDistributedConfig(const MasterOptions& options,
new_config.set_opid_index(consensus::kInvalidOpIdIndex);
// Build the set of followers from our server options.
- for (const HostPort& host_port : options.master_addresses) {
+ for (const HostPort& host_port : options.master_addresses()) {
RaftPeerPB peer;
HostPortPB peer_host_port_pb = HostPortToPB(host_port);
peer.mutable_last_known_addr()->CopyFrom(peer_host_port_pb);
- peer.set_member_type(RaftPeerPB::VOTER);
+ // Adding new master as a NON_VOTER to ensure it doesn't become a leader on creating
+ // distributed config and also helps with replacing a dead master at the same hostport.
+ if (FLAGS_master_support_change_config &&
+ FLAGS_master_address_add_new_master == host_port.ToString()) {
+ peer.set_member_type(RaftPeerPB::NON_VOTER);
+ } else {
+ peer.set_member_type(RaftPeerPB::VOTER);
+ }
new_config.add_peers()->CopyFrom(peer);
}
diff --git a/src/kudu/mini-cluster/external_mini_cluster.cc b/src/kudu/mini-cluster/external_mini_cluster.cc
index c6167e4..3da89a7 100644
--- a/src/kudu/mini-cluster/external_mini_cluster.cc
+++ b/src/kudu/mini-cluster/external_mini_cluster.cc
@@ -112,6 +112,7 @@ static double kMasterCatalogManagerTimeoutSeconds = 60.0;
ExternalMiniClusterOptions::ExternalMiniClusterOptions()
: num_masters(1),
+ supply_single_master_addr(false),
num_tablet_servers(1),
bind_mode(kDefaultBindMode),
num_data_dirs(1),
@@ -498,7 +499,7 @@ Status ExternalMiniCluster::StartMasters() {
// Setting --master_addresses flag for a single master configuration is now supported but not
// mandatory. Not setting the flag helps test existing kudu deployments that don't specify
// the --master_addresses flag for single master configuration.
- if (num_masters > 1) {
+ if (num_masters > 1 || opts_.supply_single_master_addr) {
flags.emplace_back(Substitute("--master_addresses=$0",
HostPort::ToCommaSeparatedString(master_rpc_addrs)));
}
@@ -923,6 +924,20 @@ string ExternalMiniCluster::UuidForTS(int ts_idx) const {
return tablet_server(ts_idx)->uuid();
}
+// Appends 'new_master' to the list of masters tracked by this cluster.
+// Returns AlreadyPresent, without modifying the list, if a tracked master already has the
+// same permanent uuid.
+Status ExternalMiniCluster::AddMaster(const scoped_refptr<ExternalMaster>& new_master) {
+ const auto& new_master_uuid = new_master->instance_id().permanent_uuid();
+ for (const auto& m : masters_) {
+ if (m->instance_id().permanent_uuid() == new_master_uuid) {
+ // Two entries with the same uuid must also be bound to the same RPC hostport;
+ // anything else crashes via CHECK.
+ CHECK(m->bound_rpc_hostport() == new_master->bound_rpc_hostport());
+ return Status::AlreadyPresent(Substitute(
+ "Master $0, uuid: $1 already present in ExternalMiniCluster",
+ m->bound_rpc_hostport().ToString(), new_master_uuid));
+ }
+ }
+ masters_.emplace_back(new_master);
+ return Status::OK();
+}
+
//------------------------------------------------------------
// ExternalDaemon
//------------------------------------------------------------
diff --git a/src/kudu/mini-cluster/external_mini_cluster.h b/src/kudu/mini-cluster/external_mini_cluster.h
index d4a1da2..6d16f3c 100644
--- a/src/kudu/mini-cluster/external_mini_cluster.h
+++ b/src/kudu/mini-cluster/external_mini_cluster.h
@@ -130,6 +130,10 @@ struct ExternalMiniClusterOptions {
// Default: 1.
int num_masters;
+ // Whether to supply 'master_addresses' field for single master configuration.
+ // Default: False
+ bool supply_single_master_addr;
+
// Number of TS to start.
//
// Default: 1.
@@ -460,6 +464,10 @@ class ExternalMiniCluster : public MiniCluster {
// files that reside in the log dir.
std::string GetLogPath(const std::string& daemon_id) const;
+ // Adds a master to the ExternalMiniCluster when the new master has been added
+ // dynamically after bringing up the ExternalMiniCluster.
+ Status AddMaster(const scoped_refptr<ExternalMaster>& master);
+
private:
Status StartMasters();
diff --git a/src/kudu/tools/tool_action_master.cc b/src/kudu/tools/tool_action_master.cc
index 3a0a24f..05a2d75 100644
--- a/src/kudu/tools/tool_action_master.cc
+++ b/src/kudu/tools/tool_action_master.cc
@@ -189,6 +189,10 @@ Status ListMasters(const RunnerContext& context) {
for (const auto& master : masters) {
values.emplace_back(RaftPeerPB::Role_Name(master.role()));
}
+ } else if (boost::iequals(column, "member_type")) {
+ for (const auto& master : masters) {
+ values.emplace_back(RaftPeerPB::MemberType_Name(master.member_type()));
+ }
} else {
return Status::InvalidArgument("unknown column (--columns)", column);
}