Posted to commits@kudu.apache.org by ba...@apache.org on 2020/09/24 18:07:34 UTC

[kudu] branch master updated: [master] KUDU-2181 Raft ChangeConfig request to add a master

This is an automated email from the ASF dual-hosted git repository.

bankim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 9fc6c77  [master] KUDU-2181 Raft ChangeConfig request to add a master
9fc6c77 is described below

commit 9fc6c772aff2bbfa008f52da8e7ba6e2c2d9012f
Author: Bankim Bhavsar <ba...@cloudera.com>
AuthorDate: Fri Jul 24 16:33:25 2020 -0700

    [master] KUDU-2181 Raft ChangeConfig request to add a master
    
    This change:
    - Adds hidden feature flag "--master_support_change_config" off by
      default.
    - RPC changes to add a master, which initiate a Raft config change and
      respond asynchronously.
    - RPC changes to report back the member type (VOTER/NON_VOTER) of masters.
    - Removes the cached master_addresses in catalog manager as it's no
      longer static.
    - Updates and adds comments in MasterOptions so that it's used to
      fetch master addresses only during master init time, as masters can
      be added/removed dynamically with this change.
    - Updates ListMasters() to look at the local Raft config instead of
      MasterOptions, as masters can be added/removed dynamically.
    
    If the new master can be caught up from the WAL, it is promoted to
    VOTER; otherwise it remains a NON_VOTER, as master tablet copying is
    not yet supported.
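    
    For illustration, a minimal sketch of how a client could drive the new
    AddMaster RPC, mirroring the proxy call in the test added below
    ('leader_master_proxy' and 'new_master_hp' are hypothetical names):
    
      AddMasterRequestPB req;
      AddMasterResponsePB resp;
      rpc::RpcController rpc;
      // Address of the master to be added ('new_master_hp' is hypothetical).
      *req.mutable_rpc_addr() = HostPortToPB(new_master_hp);
      // The RPC requires the DYNAMIC_MULTI_MASTER feature, which is gated
      // behind --master_support_change_config on the leader master.
      rpc.RequireServerFeature(MasterFeatures::DYNAMIC_MULTI_MASTER);
      RETURN_NOT_OK(leader_master_proxy->AddMaster(req, &resp, &rpc));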
    
    Change-Id: I0ac7e6e55220bcb01cad0fa386daaf656258088c
    Reviewed-on: http://gerrit.cloudera.org:8080/16321
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Tested-by: Bankim Bhavsar <ba...@cloudera.com>
---
 src/kudu/common/wire_protocol.proto            |   6 +-
 src/kudu/consensus/raft_consensus.cc           |  17 +
 src/kudu/consensus/raft_consensus.h            |   6 +
 src/kudu/master/CMakeLists.txt                 |   1 +
 src/kudu/master/catalog_manager.cc             |  75 ++-
 src/kudu/master/catalog_manager.h              |  33 +-
 src/kudu/master/dynamic_multi_master-test.cc   | 657 +++++++++++++++++++++++++
 src/kudu/master/master.cc                      |  49 +-
 src/kudu/master/master.h                       |  10 +-
 src/kudu/master/master.proto                   |  18 +
 src/kudu/master/master_options.cc              |  12 +-
 src/kudu/master/master_options.h               |  23 +-
 src/kudu/master/master_path_handlers.cc        |  15 +-
 src/kudu/master/master_service.cc              |  52 +-
 src/kudu/master/master_service.h               |   9 +-
 src/kudu/master/mini_master.cc                 |   2 +-
 src/kudu/master/sys_catalog.cc                 |  23 +-
 src/kudu/mini-cluster/external_mini_cluster.cc |  17 +-
 src/kudu/mini-cluster/external_mini_cluster.h  |   8 +
 src/kudu/tools/tool_action_master.cc           |   4 +
 20 files changed, 984 insertions(+), 53 deletions(-)

diff --git a/src/kudu/common/wire_protocol.proto b/src/kudu/common/wire_protocol.proto
index 59682ba..837acc0 100644
--- a/src/kudu/common/wire_protocol.proto
+++ b/src/kudu/common/wire_protocol.proto
@@ -115,13 +115,17 @@ message ServerEntryPB {
   optional NodeInstancePB instance_id = 2;
   optional ServerRegistrationPB registration = 3;
 
-  // If an error has occured earlier in the RPC call, the role
+  // If an error has occurred earlier in the RPC call, the role
   // may not be set.
   optional consensus.RaftPeerPB.Role role = 4;
 
   // The unique cluster ID of the cluster this server belongs to.
   // TODO(ghenke): Currently only set for masters.
   optional string cluster_id = 5;
+
+  // If an error has occurred earlier in the RPC call, the
+  // member_type may not be set.
+  optional consensus.RaftPeerPB.MemberType member_type = 6;
 }
 
 // A row block in which each row is stored contiguously.
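
The new member_type field can be consumed the same way the test below does; a
minimal sketch, assuming a populated ListMastersResponsePB 'resp':

  for (const auto& master : resp.masters()) {
    // member_type may be unset if an error occurred earlier in the RPC call.
    if (master.has_member_type() &&
        master.member_type() == consensus::RaftPeerPB::VOTER) {
      // This master is a full voting member of the masters' Raft config.
    }
  }
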
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index c16cc80..50fcd0d 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -2460,6 +2460,23 @@ RaftPeerPB::Role RaftConsensus::role() const {
   return cmeta_->active_role();
 }
 
+RaftConsensus::RoleAndMemberType RaftConsensus::GetRoleAndMemberType() const {
+  ThreadRestrictions::AssertWaitAllowed();
+
+  auto member_type = RaftPeerPB::UNKNOWN_MEMBER_TYPE;
+  const auto& local_peer_uuid = peer_uuid();
+
+  LockGuard l(lock_);
+  for (const auto& peer : cmeta_->ActiveConfig().peers()) {
+    if (peer.permanent_uuid() == local_peer_uuid) {
+      member_type = peer.member_type();
+      break;
+    }
+  }
+
+  return std::make_pair(cmeta_->active_role(), member_type);
+}
+
 int64_t RaftConsensus::CurrentTerm() const {
   LockGuard l(lock_);
   return CurrentTermUnlocked();
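
A minimal usage sketch of the new accessor ('consensus' stands in for a
RaftConsensus handle and is a hypothetical name):

  const RaftConsensus::RoleAndMemberType rt = consensus->GetRoleAndMemberType();
  // .first is the Raft role; .second is the member type, which falls back to
  // UNKNOWN_MEMBER_TYPE when the local peer isn't found in the active config.
  if (rt.second == RaftPeerPB::NON_VOTER) {
    // E.g. a newly added master that hasn't yet been promoted to VOTER.
  }
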
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index 5ad4f93..d418b44 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -142,6 +142,8 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
     EXTERNAL_REQUEST
   };
 
+  typedef std::pair<RaftPeerPB::Role, RaftPeerPB::MemberType> RoleAndMemberType;
+
   ~RaftConsensus();
 
   // Factory method to construct and initialize a RaftConsensus instance.
@@ -317,6 +319,10 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
   // Returns the current Raft role of this instance.
   RaftPeerPB::Role role() const;
 
+  // Returns the current Raft role and member type of this instance.
+  // May return <UNKNOWN_ROLE, UNKNOWN_MEMBER_TYPE> if the information is not available.
+  RoleAndMemberType GetRoleAndMemberType() const;
+
   // Returns the current term.
   int64_t CurrentTerm() const;
 
diff --git a/src/kudu/master/CMakeLists.txt b/src/kudu/master/CMakeLists.txt
index 26059bd..1bcc6de 100644
--- a/src/kudu/master/CMakeLists.txt
+++ b/src/kudu/master/CMakeLists.txt
@@ -89,6 +89,7 @@ SET_KUDU_TEST_LINK_LIBS(
 
 ADD_KUDU_TEST(auto_rebalancer-test)
 ADD_KUDU_TEST(catalog_manager-test)
+ADD_KUDU_TEST(dynamic_multi_master-test)
 ADD_KUDU_TEST(hms_notification_log_listener-test)
 ADD_KUDU_TEST(location_cache-test DATA_FILES ../scripts/first_argument.sh)
 ADD_KUDU_TEST(master_options-test)
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 3798ec6..ea122f9 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -80,7 +80,6 @@
 #include "kudu/consensus/consensus.proxy.h"
 #include "kudu/consensus/opid_util.h"
 #include "kudu/consensus/quorum_util.h"
-#include "kudu/consensus/raft_consensus.h"
 #include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/atomicops.h"
 #include "kudu/gutil/basictypes.h"
@@ -139,6 +138,7 @@
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/mutex.h"
+#include "kudu/util/net/net_util.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/random_util.h"
 #include "kudu/util/scoped_cleanup.h"
@@ -828,10 +828,11 @@ Status CatalogManager::Init(bool is_first_run) {
     auto_rebalancer_ = std::move(task);
   }
 
-  RETURN_NOT_OK(master_->GetMasterHostPorts(&master_addresses_));
+  vector<HostPort> master_addresses;
+  RETURN_NOT_OK(master_->GetMasterHostPorts(&master_addresses));
   if (hms::HmsCatalog::IsEnabled()) {
-    string master_addresses = JoinMapped(
-      master_addresses_,
+    string master_addresses_str = JoinMapped(
+      master_addresses,
       [] (const HostPort& hostport) {
         return Substitute("$0:$1", hostport.host(), hostport.port());
       },
@@ -844,7 +845,7 @@ Status CatalogManager::Init(bool is_first_run) {
     // holding leader_lock_, so this is the path of least resistance.
     std::lock_guard<RWMutex> leader_lock_guard(leader_lock_);
 
-    hms_catalog_.reset(new hms::HmsCatalog(std::move(master_addresses)));
+    hms_catalog_.reset(new hms::HmsCatalog(std::move(master_addresses_str)));
     RETURN_NOT_OK_PREPEND(hms_catalog_->Start(HmsClientVerifyKuduSyncConfig::VERIFY),
                           "failed to start Hive Metastore catalog");
 
@@ -1414,6 +1415,12 @@ RaftPeerPB::Role CatalogManager::Role() const {
   return consensus ? consensus->role() : RaftPeerPB::UNKNOWN_ROLE;
 }
 
+RaftConsensus::RoleAndMemberType CatalogManager::GetRoleAndMemberType() const {
+  return IsInitialized() ?
+      sys_catalog_->tablet_replica()->shared_consensus()->GetRoleAndMemberType() :
+      std::make_pair(RaftPeerPB::UNKNOWN_ROLE, RaftPeerPB::UNKNOWN_MEMBER_TYPE);
+}
+
 void CatalogManager::Shutdown() {
   {
     std::lock_guard<simple_spinlock> l(state_lock_);
@@ -5511,6 +5518,13 @@ const char* CatalogManager::StateToString(State state) {
   __builtin_unreachable();
 }
 
+const char* CatalogManager::ChangeConfigOpToString(ChangeConfigOp type) {
+  switch (type) {
+    case CatalogManager::kAddMaster: return "add";
+  }
+  __builtin_unreachable();
+}
+
 void CatalogManager::ResetTableLocationsCache() {
   const auto cache_capacity_bytes =
       FLAGS_table_locations_cache_capacity_mb * 1024 * 1024;
@@ -5527,6 +5541,56 @@ void CatalogManager::ResetTableLocationsCache() {
   VLOG(1) << "table locations cache has been reset";
 }
 
+Status CatalogManager::InitiateMasterChangeConfig(ChangeConfigOp op, const HostPort& hp,
+                                                  const std::string& uuid, rpc::RpcContext* rpc) {
+  auto consensus = master_consensus();
+  if (!consensus) {
+    return Status::IllegalState("Consensus not running");
+  }
+
+  consensus::ChangeConfigRequestPB req;
+  // Request is targeted to itself, the leader master.
+  req.set_dest_uuid(master_->fs_manager()->uuid());
+  req.set_tablet_id(sys_catalog()->tablet_id());
+  req.set_cas_config_opid_index(consensus->CommittedConfig().opid_index());
+  RaftPeerPB* peer = req.mutable_server();
+  peer->set_permanent_uuid(uuid);
+
+  switch (op) {
+    case CatalogManager::kAddMaster:
+      req.set_type(consensus::ADD_PEER);
+      *peer->mutable_last_known_addr() = HostPortToPB(hp);
+      // Add the new master as a NON_VOTER; it'll be promoted to VOTER once
+      // it's sufficiently caught up.
+      peer->set_member_type(RaftPeerPB::NON_VOTER);
+      peer->mutable_attrs()->set_promote(true);
+      break;
+    default:
+      LOG(FATAL) << "Unsupported ChangeConfig operation: " << op;
+  }
+
+  const string op_str = ChangeConfigOpToString(op);
+  LOG(INFO) << Substitute("Initiating ChangeConfig request to $0 master $1: $2",
+                          op_str, hp.ToString(), SecureDebugString(req));
+  auto completion_cb = [op_str, hp, rpc] (const Status& completion_status) {
+    if (completion_status.ok()) {
+      LOG(INFO) << Substitute("Successfully completed master ChangeConfig request to $0 master $1",
+                              op_str, hp.ToString());
+      rpc->RespondSuccess();
+    } else {
+      LOG(WARNING) << Substitute("ChangeConfig request failed to $0 master $1: $2 ",
+                                 op_str, hp.ToString(), completion_status.ToString());
+      rpc->RespondFailure(completion_status);
+    }
+  };
+  boost::optional<TabletServerErrorPB::Code> err_code;
+  RETURN_NOT_OK_PREPEND(
+      consensus->ChangeConfig(req, completion_cb, &err_code),
+      Substitute("Failed initiating master Raft ChangeConfig request, error: $0",
+                 err_code ? TabletServerErrorPB::Code_Name(*err_code) : "unknown"));
+  return Status::OK();
+}
+
 ////////////////////////////////////////////////////////////
 // CatalogManager::TSInfosDict
 ////////////////////////////////////////////////////////////
@@ -5647,6 +5711,7 @@ bool CatalogManager::ScopedLeaderSharedLock::CheckIsInitializedAndIsLeaderOrResp
 INITTED_OR_RESPOND(ConnectToMasterResponsePB);
 INITTED_OR_RESPOND(GetMasterRegistrationResponsePB);
 INITTED_OR_RESPOND(TSHeartbeatResponsePB);
+INITTED_AND_LEADER_OR_RESPOND(AddMasterResponsePB);
 INITTED_AND_LEADER_OR_RESPOND(AlterTableResponsePB);
 INITTED_AND_LEADER_OR_RESPOND(ChangeTServerStateResponsePB);
 INITTED_AND_LEADER_OR_RESPOND(CreateTableResponsePB);
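
The asynchronous contract above: InitiateMasterChangeConfig() only submits the
request, and the pending RPC is answered later from the completion callback. A
reduced sketch of that shape (same names as in the patch):

  consensus::ChangeConfigRequestPB req;
  // ... populate dest_uuid, tablet_id, cas_config_opid_index and the peer ...
  boost::optional<TabletServerErrorPB::Code> err_code;
  // ChangeConfig() returns as soon as the request is accepted; the callback
  // fires once the config change commits or fails and answers the RPC.
  Status s = consensus->ChangeConfig(
      req,
      [rpc](const Status& status) {
        if (status.ok()) {
          rpc->RespondSuccess();
        } else {
          rpc->RespondFailure(status);
        }
      },
      &err_code);
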
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index 091aa70..9d19224 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -37,6 +37,7 @@
 #include <sparsehash/dense_hash_map>
 
 #include "kudu/consensus/metadata.pb.h"
+#include "kudu/consensus/raft_consensus.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
@@ -48,7 +49,6 @@
 #include "kudu/util/cow_object.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/monotime.h"
-#include "kudu/util/net/net_util.h"
 #include "kudu/util/oid_generator.h"
 #include "kudu/util/random.h"
 #include "kudu/util/rw_mutex.h"
@@ -65,6 +65,7 @@ namespace kudu {
 
 class AuthzTokenTest_TestSingleMasterUnavailable_Test;
 class CreateTableStressTest_TestConcurrentCreateTableAndReloadMetadata_Test;
+class HostPort;
 class MetricEntity;
 class MetricRegistry;
 class MonitoredTask;
@@ -82,7 +83,6 @@ class ServiceUnavailableRetryClientTest_CreateTable_Test;
 } // namespace client
 
 namespace consensus {
-class RaftConsensus;
 class StartTabletCopyRequestPB;
 } // namespace consensus
 
@@ -773,10 +773,16 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
       const consensus::StartTabletCopyRequestPB* req,
       std::function<void(const Status&, tserver::TabletServerErrorPB::Code)> cb) override;
 
-  // Returns this CatalogManager's role in a consensus configuration. CatalogManager
-  // must be initialized before calling this method.
+  // Returns this CatalogManager's role in a consensus configuration.
+  // CatalogManager is expected to be initialized before calling this method;
+  // otherwise UNKNOWN_ROLE is returned.
   consensus::RaftPeerPB::Role Role() const;
 
+  // Returns this CatalogManager's role and member type in a consensus configuration.
+  // CatalogManager is expected to be initialized before calling this method;
+  // otherwise <UNKNOWN_ROLE, UNKNOWN_MEMBER_TYPE> is returned.
+  consensus::RaftConsensus::RoleAndMemberType GetRoleAndMemberType() const;
+
   hms::HmsCatalog* hms_catalog() const {
     return hms_catalog_.get();
   }
@@ -799,9 +805,15 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   // name is returned.
   static std::string NormalizeTableName(const std::string& table_name);
 
-  const std::vector<HostPort>& master_addresses() const {
-    return master_addresses_;
-  }
+  enum ChangeConfigOp {
+    kAddMaster
+  };
+
+  // Add/remove a master specified by 'hp' and 'uuid' by initiating a change config request.
+  // RpcContext 'rpc' will be used to respond back to the client asynchronously.
+  // Returns the status of the submission of the request.
+  Status InitiateMasterChangeConfig(ChangeConfigOp op, const HostPort& hp,
+                                    const std::string& uuid, rpc::RpcContext* rpc);
 
  private:
   // These tests call ElectedAsLeaderCb() directly.
@@ -1166,6 +1178,8 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   mutable simple_spinlock state_lock_;
   State state_;
 
+  static const char* ChangeConfigOpToString(ChangeConfigOp type);
+
   // Singleton pool that serializes invocations of ElectedAsLeaderCb().
   std::unique_ptr<ThreadPool> leader_election_pool_;
 
@@ -1195,11 +1209,6 @@ class CatalogManager : public tserver::TabletReplicaLookupIf {
   // Always acquire this lock before state_lock_.
   RWMutex leader_lock_;
 
-  // Cached information on master addresses. It's populated in Init() since the
-  // membership of masters' Raft consensus is static (i.e. no new members are
-  // added or any existing removed).
-  std::vector<HostPort> master_addresses_;
-
   // Async operations are accessing some private methods
   // (TODO: this stuff should be deferred and done in the background thread)
   friend class AsyncAlterTable;
diff --git a/src/kudu/master/dynamic_multi_master-test.cc b/src/kudu/master/dynamic_multi_master-test.cc
new file mode 100644
index 0000000..b8db27f
--- /dev/null
+++ b/src/kudu/master/dynamic_multi_master-test.cc
@@ -0,0 +1,657 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <ostream>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/client/client-test-util.h"
+#include "kudu/client/client.h"
+#include "kudu/client/schema.h"
+#include "kudu/common/common.pb.h"
+#include "kudu/common/wire_protocol.h"
+#include "kudu/common/wire_protocol.pb.h"
+#include "kudu/consensus/consensus.pb.h"
+#include "kudu/consensus/consensus.proxy.h"
+#include "kudu/consensus/metadata.pb.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
+#include "kudu/master/master.h"
+#include "kudu/master/master.pb.h"
+#include "kudu/master/master.proxy.h"
+#include "kudu/master/sys_catalog.h"
+#include "kudu/mini-cluster/external_mini_cluster.h"
+#include "kudu/mini-cluster/mini_cluster.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+METRIC_DECLARE_histogram(log_gc_duration);
+METRIC_DECLARE_entity(tablet);
+
+using kudu::client::KuduClient;
+using kudu::client::KuduColumnSchema;
+using kudu::client::KuduSchema;
+using kudu::client::KuduSchemaBuilder;
+using kudu::client::KuduTable;
+using kudu::client::KuduTableCreator;
+using kudu::client::sp::shared_ptr;
+using kudu::cluster::ExternalDaemonOptions;
+using kudu::cluster::ExternalMaster;
+using kudu::cluster::ExternalMiniCluster;
+using kudu::cluster::ExternalMiniClusterOptions;
+using kudu::cluster::MiniCluster;
+using kudu::rpc::RpcController;
+using std::set;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace master {
+
+static Status CreateTable(ExternalMiniCluster* cluster,
+                          const std::string& table_name) {
+  shared_ptr<KuduClient> client;
+  RETURN_NOT_OK(cluster->CreateClient(nullptr, &client));
+  KuduSchema schema;
+  KuduSchemaBuilder b;
+  b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
+  RETURN_NOT_OK(b.Build(&schema));
+  unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
+  return table_creator->table_name(table_name)
+      .schema(&schema)
+      .set_range_partition_columns({ "key" })
+      .num_replicas(1)
+      .Create();
+}
+
+// Test class for testing addition/removal of masters to a Kudu cluster.
+class DynamicMultiMasterTest : public KuduTest {
+ public:
+  void SetUpWithNumMasters(int num_masters) {
+    // Initial number of masters in the cluster before adding a master.
+    orig_num_masters_ = num_masters;
+
+    // Reserving a port upfront for the new master that'll be added to the cluster.
+    ASSERT_OK(MiniCluster::ReserveDaemonSocket(MiniCluster::MASTER, orig_num_masters_ /* index */,
+                                               kDefaultBindMode, &reserved_socket_));
+
+    ASSERT_OK(reserved_socket_->GetSocketAddress(&reserved_addr_));
+    reserved_hp_ = HostPort(reserved_addr_);
+    reserved_hp_str_ = reserved_hp_.ToString();
+  }
+
+  void StartCluster(const vector<string>& extra_master_flags,
+                    bool supply_single_master_addr = true) {
+    opts_.num_masters = orig_num_masters_;
+    opts_.supply_single_master_addr = supply_single_master_addr;
+    opts_.extra_master_flags = extra_master_flags;
+
+    cluster_.reset(new ExternalMiniCluster(opts_));
+    ASSERT_OK(cluster_->Start());
+    ASSERT_OK(cluster_->WaitForTabletServerCount(cluster_->num_tablet_servers(),
+                                                 MonoDelta::FromSeconds(5)));
+  }
+
+  // Functor that takes a leader_master_idx and runs the desired master RPC against
+  // the leader master, returning the RPC status and an optional MasterErrorPB::Code.
+  typedef std::function<
+      std::pair<Status, boost::optional<MasterErrorPB::Code>>(int leader_master_idx)> MasterRPC;
+
+  // Helper function that runs the master RPC against the leader master and retries the RPC
+  // if the expected leader master returns NOT_THE_LEADER error due to leadership change.
+  // Returns a single combined Status:
+  //   - RPC return status if not OK.
+  //   - IllegalState for a master response error other than NOT_THE_LEADER error.
+  //   - TimedOut if all attempts to run the RPC against leader master are exhausted.
+  //   - OK if the master RPC is successful.
+  Status RunLeaderMasterRPC(const MasterRPC& master_rpc, ExternalMiniCluster* cluster = nullptr) {
+    if (cluster == nullptr) {
+      cluster = cluster_.get();
+    }
+
+    int64_t time_left_to_sleep_msecs = 2000;
+    while (time_left_to_sleep_msecs > 0) {
+      int leader_master_idx;
+      RETURN_NOT_OK(cluster->GetLeaderMasterIndex(&leader_master_idx));
+      const auto& rpc_result = master_rpc(leader_master_idx);
+      RETURN_NOT_OK(rpc_result.first);
+      const auto& master_error = rpc_result.second;
+      if (!master_error) {
+        return Status::OK();
+      }
+      if (master_error != MasterErrorPB::NOT_THE_LEADER) {
+        // Some other master error.
+        return Status::IllegalState(Substitute("Master error: $0",
+                                    MasterErrorPB_Code_Name(*master_error)));
+      }
+      // NOT_THE_LEADER error, so retry after some duration.
+      static const MonoDelta kSleepDuration = MonoDelta::FromMilliseconds(100);
+      SleepFor(kSleepDuration);
+      time_left_to_sleep_msecs -= kSleepDuration.ToMilliseconds();
+    }
+    return Status::TimedOut("Failed contacting the right leader master after multiple attempts");
+  }
+
+  // Run ListMasters RPC, retrying on leadership change, returning the response in 'resp'.
+  void RunListMasters(ListMastersResponsePB* resp, ExternalMiniCluster* cluster = nullptr) {
+    if (cluster == nullptr) {
+      cluster = cluster_.get();
+    }
+    auto list_masters = [&] (int leader_master_idx) {
+      ListMastersRequestPB req;
+      RpcController rpc;
+      Status s = cluster->master_proxy(leader_master_idx)->ListMasters(req, resp, &rpc);
+      boost::optional<MasterErrorPB::Code> err_code(resp->has_error(), resp->error().code());
+      return std::make_pair(s, err_code);
+    };
+    ASSERT_OK(RunLeaderMasterRPC(list_masters, cluster));
+  }
+
+  // Verifies the cluster contains 'num_masters' masters and returns their addresses in 'master_hps'.
+  void VerifyNumMastersAndGetAddresses(int num_masters, vector<HostPort>* master_hps) {
+    ListMastersResponsePB resp;
+    NO_FATALS(RunListMasters(&resp));
+    ASSERT_EQ(num_masters, resp.masters_size());
+    master_hps->clear();
+    for (const auto& master : resp.masters()) {
+      ASSERT_TRUE(master.role() == consensus::RaftPeerPB::LEADER ||
+                  master.role() == consensus::RaftPeerPB::FOLLOWER);
+      ASSERT_EQ(consensus::RaftPeerPB::VOTER, master.member_type());
+      ASSERT_EQ(1, master.registration().rpc_addresses_size());
+      master_hps->emplace_back(HostPortFromPB(master.registration().rpc_addresses(0)));
+    }
+  }
+
+  // Brings up a new master where 'master_hps' contains master addresses including
+  // the new master to be added.
+  void StartNewMaster(const vector<HostPort>& master_hps,
+                      bool master_supports_change_config = true) {
+    vector<string> master_addresses;
+    master_addresses.reserve(master_hps.size());
+    for (const auto& hp : master_hps) {
+      master_addresses.emplace_back(hp.ToString());
+    }
+
+    // Start with an existing master daemon's options, but modify them for use in a new master.
+    ExternalDaemonOptions new_master_opts = cluster_->master(0)->opts();
+    const string new_master_id = Substitute("master-$0", orig_num_masters_);
+    new_master_opts.wal_dir = cluster_->GetWalPath(new_master_id);
+    new_master_opts.data_dirs = cluster_->GetDataPaths(new_master_id);
+    new_master_opts.log_dir = cluster_->GetLogPath(new_master_id);
+    new_master_opts.rpc_bind_address = reserved_hp_;
+    auto& flags = new_master_opts.extra_flags;
+    flags.insert(flags.end(),
+                 {"--master_addresses=" + JoinStrings(master_addresses, ","),
+                  "--master_address_add_new_master=" + reserved_hp_str_});
+
+    LOG(INFO) << "Bringing up the new master at: " << reserved_hp_str_;
+    new_master_.reset(new ExternalMaster(new_master_opts));
+    ASSERT_OK(new_master_->Start());
+    ASSERT_OK(new_master_->WaitForCatalogManager());
+
+    new_master_proxy_.reset(
+        new MasterServiceProxy(new_master_opts.messenger, reserved_addr_, reserved_addr_.host()));
+    {
+      GetMasterRegistrationRequestPB req;
+      GetMasterRegistrationResponsePB resp;
+      RpcController rpc;
+
+      ASSERT_OK(new_master_proxy_->GetMasterRegistration(req, &resp, &rpc));
+      ASSERT_FALSE(resp.has_error());
+      if (master_supports_change_config) {
+        ASSERT_EQ(consensus::RaftPeerPB::NON_VOTER, resp.member_type());
+        ASSERT_EQ(consensus::RaftPeerPB::LEARNER, resp.role());
+      } else {
+        // A new master that doesn't support change config will be started
+        // as a VOTER and become a FOLLOWER if the other masters are reachable.
+        ASSERT_EQ(consensus::RaftPeerPB::VOTER, resp.member_type());
+        ASSERT_EQ(consensus::RaftPeerPB::FOLLOWER, resp.role());
+      }
+    }
+
+    // Verify the cluster still has the same number of masters.
+    ListMastersResponsePB resp;
+    NO_FATALS(RunListMasters(&resp));
+    ASSERT_EQ(orig_num_masters_, resp.masters_size());
+  }
+
+// Adds the specified master to the cluster, returning the appropriate error Status for
+// negative test cases.
+  Status AddMasterToCluster(const HostPort& master) {
+    auto add_master = [&] (int leader_master_idx) {
+      AddMasterRequestPB req;
+      AddMasterResponsePB resp;
+      RpcController rpc;
+      if (master != HostPort()) {
+        *req.mutable_rpc_addr() = HostPortToPB(master);
+      }
+      rpc.RequireServerFeature(MasterFeatures::DYNAMIC_MULTI_MASTER);
+      Status s = cluster_->master_proxy(leader_master_idx)->AddMaster(req, &resp, &rpc);
+      boost::optional<MasterErrorPB::Code> err_code(resp.has_error(), resp.error().code());
+      return std::make_pair(s, err_code);
+    };
+
+    RETURN_NOT_OK(RunLeaderMasterRPC(add_master));
+    return cluster_->AddMaster(new_master_);
+  }
+
+  // Verifies the new master has one of the 'expected_roles' and the 'expected_member_type'
+  // by making an RPC to it directly.
+  void VerifyNewMasterDirectly(const set<consensus::RaftPeerPB::Role>& expected_roles,
+                               consensus::RaftPeerPB::MemberType expected_member_type) {
+    GetMasterRegistrationRequestPB req;
+    GetMasterRegistrationResponsePB resp;
+    RpcController rpc;
+
+    ASSERT_OK(new_master_proxy_->GetMasterRegistration(req, &resp, &rpc));
+    ASSERT_FALSE(resp.has_error());
+    ASSERT_TRUE(ContainsKey(expected_roles, resp.role()));
+    ASSERT_EQ(expected_member_type, resp.member_type());
+  }
+
+ protected:
+  // Initial number of masters in the cluster before adding a new master.
+  int orig_num_masters_;
+  ExternalMiniClusterOptions opts_;
+  unique_ptr<ExternalMiniCluster> cluster_;
+
+  // Socket, HostPort, proxy, etc. for the new master to be added.
+  unique_ptr<Socket> reserved_socket_;
+  Sockaddr reserved_addr_;
+  HostPort reserved_hp_;
+  string reserved_hp_str_;
+  unique_ptr<MasterServiceProxy> new_master_proxy_;
+  scoped_refptr<ExternalMaster> new_master_;
+};
+
+// Parameterized DynamicMultiMasterTest class that works with different initial number of masters.
+class ParameterizedDynamicMultiMasterTest : public DynamicMultiMasterTest,
+                                            public ::testing::WithParamInterface<int> {
+ public:
+  void SetUp() override {
+    NO_FATALS(SetUpWithNumMasters(GetParam()));
+  }
+};
+
+INSTANTIATE_TEST_CASE_P(, ParameterizedDynamicMultiMasterTest,
+                        // Initial number of masters in the cluster before adding a new master
+                        ::testing::Values(1, 2));
+
+// This test starts a cluster, creates a table and then adds a new master.
+// For a system catalog with little data, the new master can be caught up from WAL and
+// promoted to a VOTER without requiring tablet copy.
+TEST_P(ParameterizedDynamicMultiMasterTest, TestAddMasterCatchupFromWAL) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  NO_FATALS(StartCluster({"--master_support_change_config"}));
+
+  // Verify that masters are running as VOTERs and collect their addresses to be used
+  // for starting the new master.
+  vector<HostPort> master_hps;
+  NO_FATALS(VerifyNumMastersAndGetAddresses(orig_num_masters_, &master_hps));
+
+  const string kTableName = "first_table";
+  ASSERT_OK(CreateTable(cluster_.get(), kTableName));
+
+  // Bring up the new master and add to the cluster.
+  master_hps.emplace_back(reserved_hp_);
+  NO_FATALS(StartNewMaster(master_hps));
+  ASSERT_OK(AddMasterToCluster(reserved_hp_));
+
+  // The newly added master will be caught up from the WAL itself without requiring
+  // tablet copy since the system catalog is fresh with a single table.
+  // Catching up from the WAL and promotion to VOTER won't happen instantly after
+  // adding the new master, hence the use of ASSERT_EVENTUALLY.
+  ASSERT_EVENTUALLY([&] {
+    ListMastersResponsePB resp;
+    NO_FATALS(RunListMasters(&resp));
+    ASSERT_EQ(orig_num_masters_ + 1, resp.masters_size());
+
+    int num_leaders = 0;
+    for (const auto& master : resp.masters()) {
+      ASSERT_EQ(consensus::RaftPeerPB::VOTER, master.member_type());
+      ASSERT_TRUE(master.role() == consensus::RaftPeerPB::LEADER ||
+                  master.role() == consensus::RaftPeerPB::FOLLOWER);
+      if (master.role() == consensus::RaftPeerPB::LEADER) {
+        num_leaders++;
+      }
+    }
+    ASSERT_EQ(1, num_leaders);
+  });
+
+  // Double check by directly contacting the new master.
+  VerifyNewMasterDirectly({ consensus::RaftPeerPB::FOLLOWER, consensus::RaftPeerPB::LEADER },
+                          consensus::RaftPeerPB::VOTER);
+
+  // Adding the same master again should return an error.
+  {
+    Status s = AddMasterToCluster(reserved_hp_);
+    ASSERT_TRUE(s.IsRemoteError());
+    ASSERT_STR_CONTAINS(s.message().ToString(), "Master already present");
+  }
+
+  // Adding one of the former masters should return an error.
+  {
+    Status s = AddMasterToCluster(master_hps[0]);
+    ASSERT_TRUE(s.IsRemoteError());
+    ASSERT_STR_CONTAINS(s.message().ToString(), "Master already present");
+  }
+
+  // Shut down the cluster and the new master daemon process.
+  // This allows ExternalMiniCluster to manage the newly added master and allows
+  // the client to connect to the new master if it's elected the leader.
+  new_master_->Shutdown();
+  cluster_.reset();
+
+  opts_.num_masters = orig_num_masters_ + 1;
+  opts_.master_rpc_addresses = master_hps;
+  ExternalMiniCluster migrated_cluster(opts_);
+  ASSERT_OK(migrated_cluster.Start());
+  ASSERT_OK(migrated_cluster.WaitForTabletServerCount(migrated_cluster.num_tablet_servers(),
+                                                      MonoDelta::FromSeconds(5)));
+
+  // Verify the cluster still has the same set of masters.
+  {
+    ListMastersResponsePB resp;
+    NO_FATALS(RunListMasters(&resp, &migrated_cluster));
+    ASSERT_EQ(orig_num_masters_ + 1, resp.masters_size());
+
+    for (const auto& master : resp.masters()) {
+      ASSERT_EQ(consensus::RaftPeerPB::VOTER, master.member_type());
+      ASSERT_TRUE(master.role() == consensus::RaftPeerPB::LEADER ||
+                  master.role() == consensus::RaftPeerPB::FOLLOWER);
+      ASSERT_EQ(1, master.registration().rpc_addresses_size());
+      HostPort actual_hp = HostPortFromPB(master.registration().rpc_addresses(0));
+      ASSERT_TRUE(std::find(master_hps.begin(), master_hps.end(), actual_hp) != master_hps.end());
+    }
+  }
+
+  shared_ptr<KuduClient> client;
+  ASSERT_OK(migrated_cluster.CreateClient(nullptr, &client));
+
+  shared_ptr<KuduTable> table;
+  ASSERT_OK(client->OpenTable(kTableName, &table));
+  ASSERT_EQ(0, CountTableRows(table.get()));
+
+  // Perform an operation that requires replication to masters.
+  ASSERT_OK(CreateTable(&migrated_cluster, "second_table"));
+  ASSERT_OK(client->OpenTable("second_table", &table));
+  ASSERT_EQ(0, CountTableRows(table.get()));
+
+  // Pause masters one at a time while creating a table, which allows a
+  // new leader to be elected if the paused master is the leader.
+  // At least 3 masters are needed to form consensus and elect a new leader.
+  if (orig_num_masters_ + 1 >= 3) {
+    for (int i = 0; i < orig_num_masters_ + 1; i++) {
+      ASSERT_OK(migrated_cluster.master(i)->Pause());
+      cluster::ScopedResumeExternalDaemon resume_daemon(migrated_cluster.master(i));
+      ASSERT_OK(client->OpenTable(kTableName, &table));
+      ASSERT_EQ(0, CountTableRows(table.get()));
+
+      // See MasterFailoverTest.TestCreateTableSync to understand why we must
+      // check for IsAlreadyPresent as well.
+      Status s = CreateTable(&migrated_cluster, Substitute("table-$0", i));
+      ASSERT_TRUE(s.ok() || s.IsAlreadyPresent());
+    }
+  }
+}
+
+// This test starts a cluster with low values for log flush and segment size to force GC
+// of the system catalog WAL. When a new master is added, the test verifies that it can't
+// be caught up from the WAL and, as a result, though added to the master Raft config,
+// it remains a NON_VOTER.
+TEST_P(ParameterizedDynamicMultiMasterTest, TestAddMasterCatchupFromWALNotPossible) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  // Using low values of log flush threshold and segment size to trigger GC of the sys catalog WAL.
+  NO_FATALS(StartCluster({"--master_support_change_config", "--flush_threshold_secs=1",
+                          "--log_segment_size_mb=1"}));
+
+  // Verify that masters are running as VOTERs and collect their addresses to be used
+  // for starting the new master.
+  vector<HostPort> master_hps;
+  NO_FATALS(VerifyNumMastersAndGetAddresses(orig_num_masters_, &master_hps));
+
+  auto get_sys_catalog_wal_gc_count = [&] {
+    int64_t sys_catalog_wal_gc_count = 0;
+    CHECK_OK(itest::GetInt64Metric(cluster_->master(0)->bound_http_hostport(),
+                                   &METRIC_ENTITY_tablet,
+                                   master::SysCatalogTable::kSysCatalogTabletId,
+                                   &METRIC_log_gc_duration,
+                                   "total_count",
+                                   &sys_catalog_wal_gc_count));
+    return sys_catalog_wal_gc_count;
+  };
+  auto orig_gc_count = get_sys_catalog_wal_gc_count();
+
+  // Create a bunch of tables to ensure the sys catalog WAL gets GC'ed.
+  // Around 1k tables are needed even with the lowest flush threshold and log segment size.
+  for (int i = 1; i < 1000; i++) {
+    string table_name = "Table.TestAddMasterCatchupFromWALNotPossible." + std::to_string(i);
+    ASSERT_OK(CreateTable(cluster_.get(), table_name));
+  }
+
+  int64_t time_left_to_sleep_msecs = 2000;
+  while (time_left_to_sleep_msecs > 0 && orig_gc_count == get_sys_catalog_wal_gc_count()) {
+    static const MonoDelta kSleepDuration = MonoDelta::FromMilliseconds(100);
+    SleepFor(kSleepDuration);
+    time_left_to_sleep_msecs -= kSleepDuration.ToMilliseconds();
+  }
+  ASSERT_GT(time_left_to_sleep_msecs, 0) << "Timed out waiting for system catalog WAL to be GC'ed";
+
+  // Bring up the new master and add to the cluster.
+  master_hps.emplace_back(reserved_hp_);
+  NO_FATALS(StartNewMaster(master_hps));
+  ASSERT_OK(AddMasterToCluster(reserved_hp_));
+
+  // The newly added master will be added to the master Raft config but won't be caught up
+  // from the WAL, and hence remains a NON_VOTER.
+  ListMastersResponsePB list_resp;
+  NO_FATALS(RunListMasters(&list_resp));
+  ASSERT_EQ(orig_num_masters_ + 1, list_resp.masters_size());
+
+  for (const auto& master : list_resp.masters()) {
+    ASSERT_EQ(1, master.registration().rpc_addresses_size());
+    auto hp = HostPortFromPB(master.registration().rpc_addresses(0));
+    if (hp == reserved_hp_) {
+      ASSERT_EQ(consensus::RaftPeerPB::NON_VOTER, master.member_type());
+      ASSERT_EQ(consensus::RaftPeerPB::LEARNER, master.role());
+    }
+  }
+
+  // Double check by directly contacting the new master.
+  VerifyNewMasterDirectly({ consensus::RaftPeerPB::LEARNER }, consensus::RaftPeerPB::NON_VOTER);
+
+  // Verify the FAILED_UNRECOVERABLE health error for the new master that can't be caught
+  // up from the WAL. This health state update may take some round trips between the
+  // masters, hence the ASSERT_EVENTUALLY wrapper.
+  ASSERT_EVENTUALLY([&] {
+    // GetConsensusState() RPC can be made to any master and not necessarily the leader master.
+    int leader_master_idx;
+    ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_master_idx));
+    auto leader_master_addr = cluster_->master(leader_master_idx)->bound_rpc_addr();
+    consensus::ConsensusServiceProxy consensus_proxy(cluster_->messenger(), leader_master_addr,
+                                                     leader_master_addr.host());
+    consensus::GetConsensusStateRequestPB req;
+    consensus::GetConsensusStateResponsePB resp;
+    RpcController rpc;
+    req.set_dest_uuid(cluster_->master(leader_master_idx)->uuid());
+    req.set_report_health(consensus::INCLUDE_HEALTH_REPORT);
+    ASSERT_OK(consensus_proxy.GetConsensusState(req, &resp, &rpc));
+    ASSERT_FALSE(resp.has_error());
+    ASSERT_EQ(1, resp.tablets_size());
+
+    // Lookup the new_master from the consensus state of the system catalog.
+    const auto& sys_catalog = resp.tablets(0);
+    ASSERT_EQ(master::SysCatalogTable::kSysCatalogTabletId, sys_catalog.tablet_id());
+    const auto& cstate = sys_catalog.cstate();
+    const auto& config = cstate.has_pending_config() ?
+        cstate.pending_config() : cstate.committed_config();
+    ASSERT_EQ(orig_num_masters_ + 1, config.peers_size());
+    int num_new_masters_found = 0;
+    for (const auto& peer : config.peers()) {
+      if (peer.permanent_uuid() == new_master_->uuid()) {
+        ASSERT_EQ(consensus::HealthReportPB::FAILED_UNRECOVERABLE,
+                  peer.health_report().overall_health());
+        num_new_masters_found++;
+      }
+    }
+    ASSERT_EQ(1, num_new_masters_found);
+  });
+}
+
+// Test that brings up a single-master cluster with 'last_known_addr' not populated (by
+// omitting '--master_addresses') and then attempts to add a new master, which is
+// expected to fail due to an invalid Raft config.
+TEST_F(DynamicMultiMasterTest, TestAddMasterWithNoLastKnownAddr) {
+  NO_FATALS(SetUpWithNumMasters(1));
+  NO_FATALS(
+      StartCluster({"--master_support_change_config"}, false /* supply_single_master_addr */));
+
+  // Verify that existing masters are running as VOTERs and collect their addresses to be used
+  // for starting the new master.
+  vector<HostPort> master_hps;
+  NO_FATALS(VerifyNumMastersAndGetAddresses(orig_num_masters_, &master_hps));
+
+  // Bring up the new master and add to the cluster.
+  master_hps.emplace_back(reserved_hp_);
+  NO_FATALS(StartNewMaster(master_hps));
+
+  Status actual = AddMasterToCluster(reserved_hp_);
+  ASSERT_TRUE(actual.IsRemoteError());
+  ASSERT_STR_MATCHES(actual.ToString(),
+                     "Invalid config to set as pending: Peer:.* has no address");
+
+  // Verify no change in number of masters.
+  NO_FATALS(VerifyNumMastersAndGetAddresses(orig_num_masters_, &master_hps));
+}
+
+// Test that attempts to add a new master without enabling the feature flag for master Raft
+// change config.
+TEST_F(DynamicMultiMasterTest, TestAddMasterFeatureFlagNotSpecified) {
+  NO_FATALS(SetUpWithNumMasters(1));
+  NO_FATALS(StartCluster({ /* Omitting "--master_support_change_config" */ }));
+
+  // Verify that existing masters are running as VOTERs and collect their addresses to be used
+  // for starting the new master.
+  vector<HostPort> master_hps;
+  NO_FATALS(VerifyNumMastersAndGetAddresses(orig_num_masters_, &master_hps));
+
+  // Bring up the new master and add to the cluster.
+  master_hps.emplace_back(reserved_hp_);
+  NO_FATALS(StartNewMaster(master_hps, false /* master_supports_change_config */));
+
+  Status actual = AddMasterToCluster(reserved_hp_);
+  ASSERT_TRUE(actual.IsRemoteError());
+  ASSERT_STR_MATCHES(actual.ToString(), "unsupported feature flags");
+
+  // Verify no change in number of masters.
+  NO_FATALS(VerifyNumMastersAndGetAddresses(orig_num_masters_, &master_hps));
+}
+
+// Test that attempts to request a non-leader master to add a new master.
+TEST_F(DynamicMultiMasterTest, TestAddMasterToNonLeader) {
+  NO_FATALS(SetUpWithNumMasters(2));
+  NO_FATALS(StartCluster({"--master_support_change_config"}));
+
+  // Verify that existing masters are running as VOTERs and collect their addresses to be used
+  // for starting the new master.
+  vector<HostPort> master_hps;
+  NO_FATALS(VerifyNumMastersAndGetAddresses(orig_num_masters_, &master_hps));
+
+  // Bring up the new master and add to the cluster.
+  master_hps.emplace_back(reserved_hp_);
+  NO_FATALS(StartNewMaster(master_hps));
+
+  // Verify that sending an add master request to a non-leader master returns the
+  // NOT_THE_LEADER error. It's possible there is a leadership change between querying
+  // for the leader master and sending the add master request, hence ASSERT_EVENTUALLY.
+  ASSERT_EVENTUALLY([&] {
+    AddMasterRequestPB req;
+    AddMasterResponsePB resp;
+    RpcController rpc;
+    *req.mutable_rpc_addr() = HostPortToPB(reserved_hp_);
+    rpc.RequireServerFeature(MasterFeatures::DYNAMIC_MULTI_MASTER);
+
+    int leader_master_idx;
+    ASSERT_OK(cluster_->GetLeaderMasterIndex(&leader_master_idx));
+    ASSERT_TRUE(leader_master_idx == 0 || leader_master_idx == 1);
+    int non_leader_master_idx = !leader_master_idx;
+    ASSERT_OK(cluster_->master_proxy(non_leader_master_idx)->AddMaster(req, &resp, &rpc));
+    ASSERT_TRUE(resp.has_error());
+    ASSERT_EQ(MasterErrorPB::NOT_THE_LEADER, resp.error().code());
+  });
+
+  // Verify no change in number of masters.
+  NO_FATALS(VerifyNumMastersAndGetAddresses(orig_num_masters_, &master_hps));
+}
+
+// Test that attempts to add a master with a missing master address and with a
+// non-routable, incorrect address.
+TEST_F(DynamicMultiMasterTest, TestAddMasterMissingAndIncorrectAddress) {
+  NO_FATALS(SetUpWithNumMasters(1));
+  NO_FATALS(StartCluster({"--master_support_change_config"}));
+
+  // Verify that existing masters are running as VOTERs and collect their addresses to be used
+  // for starting the new master.
+  vector<HostPort> master_hps;
+  NO_FATALS(VerifyNumMastersAndGetAddresses(orig_num_masters_, &master_hps));
+
+  // Bring up the new master and add to the cluster.
+  master_hps.emplace_back(reserved_hp_);
+  NO_FATALS(StartNewMaster(master_hps));
+
+  // Empty HostPort
+  Status actual = AddMasterToCluster(HostPort());
+  ASSERT_TRUE(actual.IsRemoteError());
+  ASSERT_STR_CONTAINS(actual.ToString(), "RPC address of master to be added not supplied");
+
+  // Non-routable incorrect hostname.
+  actual = AddMasterToCluster(HostPort("foo", Master::kDefaultPort));
+  ASSERT_TRUE(actual.IsRemoteError());
+  ASSERT_STR_CONTAINS(actual.ToString(),
+                      "Network error: unable to resolve address for foo");
+
+  // Verify no change in number of masters.
+  NO_FATALS(VerifyNumMastersAndGetAddresses(orig_num_masters_, &master_hps));
+}
+
+} // namespace master
+} // namespace kudu
diff --git a/src/kudu/master/master.cc b/src/kudu/master/master.cc
index 6efe084..764550c 100644
--- a/src/kudu/master/master.cc
+++ b/src/kudu/master/master.cc
@@ -106,6 +106,12 @@ using std::vector;
 using strings::Substitute;
 
 namespace kudu {
+namespace rpc {
+class RpcContext;
+}  // namespace rpc
+}  // namespace kudu
+
+namespace kudu {
 namespace master {
 
 namespace {
@@ -147,6 +153,9 @@ Status GetMasterEntryForHost(const shared_ptr<rpc::Messenger>& messenger,
   if (resp.has_cluster_id()) {
     e->set_cluster_id(resp.cluster_id());
   }
+  if (resp.has_member_type()) {
+    e->set_member_type(resp.member_type());
+  }
   return Status::OK();
 }
 } // anonymous namespace
@@ -336,23 +345,27 @@ void Master::ShutdownImpl() {
 }
 
 Status Master::ListMasters(vector<ServerEntryPB>* masters) const {
-  if (!opts_.IsDistributed()) {
+  auto consensus = catalog_manager_->master_consensus();
+  if (!consensus) {
+    return Status::IllegalState("consensus not running");
+  }
+  const auto config = consensus->CommittedConfig();
+  masters->clear();
+  DCHECK_GE(config.peers_size(), 1);
+  // Optimized code path that doesn't involve reaching out to other
+  // masters over the network for a single master configuration.
+  if (config.peers_size() == 1) {
     ServerEntryPB local_entry;
     local_entry.mutable_instance_id()->CopyFrom(catalog_manager_->NodeInstance());
     RETURN_NOT_OK(GetMasterRegistration(local_entry.mutable_registration()));
     local_entry.set_role(RaftPeerPB::LEADER);
     local_entry.set_cluster_id(cluster_id_);
+    local_entry.set_member_type(RaftPeerPB::VOTER);
     masters->emplace_back(std::move(local_entry));
     return Status::OK();
   }
 
-  auto consensus = catalog_manager_->master_consensus();
-  if (!consensus) {
-    return Status::IllegalState("consensus not running");
-  }
-  const auto config = consensus->CommittedConfig();
-
-  masters->clear();
+  // For a distributed master configuration.
   for (const auto& peer : config.peers()) {
     HostPort hp = HostPortFromPB(peer.last_known_addr());
     ServerEntryPB peer_entry;
@@ -403,5 +416,25 @@ Status Master::GetMasterHostPorts(vector<HostPort>* hostports) const {
   return Status::OK();
 }
 
+Status Master::AddMaster(const HostPort& hp, rpc::RpcContext* rpc) {
+  // Ensure the master to be added is not already part of the list of masters.
+  vector<HostPort> masters;
+  // The check is made against the committed config, which contains voters only.
+  RETURN_NOT_OK(GetMasterHostPorts(&masters));
+  if (std::find(masters.begin(), masters.end(), hp) != masters.end()) {
+    return Status::AlreadyPresent("Master already present");
+  }
+
+  // Check whether the master to be added is reachable and fetch its uuid.
+  ServerEntryPB peer_entry;
+  RETURN_NOT_OK(GetMasterEntryForHost(messenger_, hp, &peer_entry));
+  const auto& peer_uuid = peer_entry.instance_id().permanent_uuid();
+
+  // No early validation is done for whether a config change is already in progress;
+  // if one is, Raft will return an error when the config change is initiated.
+  return catalog_manager()->InitiateMasterChangeConfig(CatalogManager::kAddMaster, hp, peer_uuid,
+                                                       rpc);
+}
+
 } // namespace master
 } // namespace kudu
diff --git a/src/kudu/master/master.h b/src/kudu/master/master.h
index 6a59435..e922e9d 100644
--- a/src/kudu/master/master.h
+++ b/src/kudu/master/master.h
@@ -36,6 +36,9 @@ class HostPort;
 class MaintenanceManager;
 class MonoDelta;
 class ThreadPool;
+namespace rpc {
+class RpcContext;
+}  // namespace rpc
 
 namespace master {
 class LocationCache;
@@ -104,7 +107,7 @@ class Master : public kserver::KuduServer {
   // request.
   Status ListMasters(std::vector<ServerEntryPB>* masters) const;
 
-  // Gets the HostPorts for all of the masters in the cluster.
+  // Gets the HostPorts for all of the VOTER masters in the cluster.
   // This is not as complete as ListMasters() above, but operates just
   // based on local state.
   Status GetMasterHostPorts(std::vector<HostPort>* hostports) const;
@@ -113,6 +116,11 @@ class Master : public kserver::KuduServer {
     return state_ == kStopped;
   }
 
+  // Adds the master specified by 'hp' by initiating a change config request.
+  // RpcContext 'rpc' will be used to respond back to the client asynchronously.
+  // Returns the status of the submission of the request.
+  Status AddMaster(const HostPort& hp, rpc::RpcContext* rpc);
+
   MaintenanceManager* maintenance_manager() {
     return maintenance_manager_.get();
   }
diff --git a/src/kudu/master/master.proto b/src/kudu/master/master.proto
index edd36d1..b569686 100644
--- a/src/kudu/master/master.proto
+++ b/src/kudu/master/master.proto
@@ -909,6 +909,14 @@ message ChangeTServerStateResponsePB {
   optional MasterErrorPB error = 1;
 }
 
+message AddMasterRequestPB {
+  optional HostPortPB rpc_addr = 1;
+}
+
+message AddMasterResponsePB {
+  optional MasterErrorPB error = 1;
+}
+
 // GetMasterRegistrationRequest/Response: get the instance id and
 // HTTP/RPC addresses for this Master server.
 message GetMasterRegistrationRequestPB {
@@ -930,6 +938,9 @@ message GetMasterRegistrationResponsePB {
 
   // The unique cluster ID of the cluster this server belongs to.
   optional string cluster_id = 5;
+
+  // This server's membership type in the consensus configuration.
+  optional consensus.RaftPeerPB.MemberType member_type = 6;
 }
 
 // ListMastersRequest/Response: get information about all of the known
@@ -986,6 +997,8 @@ enum MasterFeatures {
   REPLICA_MANAGEMENT = 4;
   // The master supports generating and dispensing authz tokens.
   GENERATE_AUTHZ_TOKEN = 5;
+  // The master supports dynamic addition/removal of masters.
+  DYNAMIC_MULTI_MASTER = 6;
 }
 
 service MasterService {
@@ -1066,6 +1079,11 @@ service MasterService {
     option (kudu.rpc.authz_method) = "AuthorizeSuperUser";
   }
 
+  // Add a new master to an existing cluster.
+  rpc AddMaster(AddMasterRequestPB) returns (AddMasterResponsePB) {
+    option (kudu.rpc.authz_method) = "AuthorizeSuperUser";
+  }
+
   // Master->Master RPCs
   // ------------------------------------------------------------
 
diff --git a/src/kudu/master/master_options.cc b/src/kudu/master/master_options.cc
index 378e49e..e07dfbb 100644
--- a/src/kudu/master/master_options.cc
+++ b/src/kudu/master/master_options.cc
@@ -44,13 +44,13 @@ MasterOptions::MasterOptions() {
 
   if (!FLAGS_master_addresses.empty()) {
     Status s = HostPort::ParseStrings(FLAGS_master_addresses, Master::kDefaultPort,
-                                      &master_addresses);
+                                      &master_addresses_);
     if (!s.ok()) {
       LOG(FATAL) << "Couldn't parse the master_addresses flag('" << FLAGS_master_addresses << "'): "
                  << s.ToString();
     }
     // TODO(wdberkeley): Un-actionable warning. Link to docs, once they exist.
-    if (master_addresses.size() == 2) {
+    if (master_addresses_.size() == 2) {
       LOG(WARNING) << "Only 2 masters are specified by master_addresses_flag ('" <<
           FLAGS_master_addresses << "'), but minimum of 3 are required to tolerate failures"
           " of any one master. It is recommended to use at least 3 masters.";
@@ -58,18 +58,14 @@ MasterOptions::MasterOptions() {
   }
 }
 
-bool MasterOptions::IsDistributed() const {
-  return master_addresses.size() > 1;
-}
-
 Status MasterOptions::GetTheOnlyMasterAddress(HostPort* hp) const {
   if (IsDistributed()) {
     return Status::IllegalState("Not a single master configuration");
   }
-  if (master_addresses.empty()) {
+  if (master_addresses_.empty()) {
     return Status::NotFound("Master address not specified");
   }
-  *hp = master_addresses.front();
+  *hp = master_addresses_.front();
   return Status::OK();
 }
 
diff --git a/src/kudu/master/master_options.h b/src/kudu/master/master_options.h
index dad9072..09b7b0b 100644
--- a/src/kudu/master/master_options.h
+++ b/src/kudu/master/master_options.h
@@ -16,6 +16,7 @@
 // under the License.
 #pragma once
 
+#include <utility>
 #include <vector>
 
 #include "kudu/kserver/kserver_options.h"
@@ -23,6 +24,7 @@
 #include "kudu/util/status.h"
 
 namespace kudu {
+
 namespace master {
 
 // Options for constructing the master.
@@ -31,13 +33,30 @@ namespace master {
 struct MasterOptions : public kserver::KuduServerOptions {
   MasterOptions();
 
-  std::vector<HostPort> master_addresses;
+  // Fetches master addresses from the user-supplied gflags; the list may be empty for
+  // a single master configuration.
+  // Note: only to be used during master init time, as masters can be added/removed
+  // dynamically. Use Master::GetMasterHostPorts() instead once the master is running.
+  const std::vector<HostPort>& master_addresses() const {
+    return master_addresses_;
+  }
 
-  bool IsDistributed() const;
+  // Only to be used during init time, as masters can be added/removed dynamically.
+  bool IsDistributed() const {
+    return master_addresses_.size() > 1;
+  }
 
   // For a single master configuration output the only master address in 'hp', if available.
   // Otherwise NotFound error or IllegalState for distributed master config.
   Status GetTheOnlyMasterAddress(HostPort* hp) const;
+
+  // Allows setting/overwriting the list of masters. Only to be used by tests.
+  void SetMasterAddressesForTests(std::vector<HostPort> addresses) {
+    master_addresses_ = std::move(addresses);
+  }
+
+ private:
+  std::vector<HostPort> master_addresses_;
 };
 
 } // namespace master
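
The comments above draw an init-time vs. runtime distinction; a short sketch of
the intended usage (the call sites are hypothetical):

  // At init time, before Raft consensus is up, the gflag-derived list is the
  // only source of master addresses:
  const std::vector<HostPort>& boot_addrs = opts.master_addresses();

  // At runtime, masters may have been added or removed, so consult the live
  // Raft state instead:
  std::vector<HostPort> live_addrs;
  RETURN_NOT_OK(master->GetMasterHostPorts(&live_addrs));
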
diff --git a/src/kudu/master/master_path_handlers.cc b/src/kudu/master/master_path_handlers.cc
index fd281aa..32bafdf 100644
--- a/src/kudu/master/master_path_handlers.cc
+++ b/src/kudu/master/master_path_handlers.cc
@@ -53,7 +53,6 @@
 #include "kudu/master/catalog_manager.h"
 #include "kudu/master/master.h"
 #include "kudu/master/master.pb.h"
-#include "kudu/master/master_options.h"
 #include "kudu/master/sys_catalog.h"
 #include "kudu/master/table_metrics.h"
 #include "kudu/master/ts_descriptor.h"
@@ -831,17 +830,23 @@ pair<string, string> MasterPathHandlers::TSDescToLinkPair(const TSDescriptor& de
 }
 
 string MasterPathHandlers::MasterAddrsToCsv() const {
-  if (!master_->opts().master_addresses.empty()) {
+  vector<HostPort> master_addresses;
+  Status s = master_->GetMasterHostPorts(&master_addresses);
+  if (!s.ok()) {
+    LOG(WARNING) << "Unable to fetch master addresses: " << s.ToString();
+    return string();
+  }
+  if (!master_addresses.empty()) {
     vector<string> all_addresses;
-    all_addresses.reserve(master_->opts().master_addresses.size());
-    for (const HostPort& hp : master_->opts().master_addresses) {
+    all_addresses.reserve(master_addresses.size());
+    for (const HostPort& hp : master_addresses) {
       all_addresses.push_back(hp.ToString());
     }
     return JoinElements(all_addresses, ",");
   }
   Sockaddr addr = master_->first_rpc_address();
   HostPort hp;
-  Status s = HostPortFromSockaddrReplaceWildcard(addr, &hp);
+  s = HostPortFromSockaddrReplaceWildcard(addr, &hp);
   if (s.ok()) {
     return hp.ToString();
   }
diff --git a/src/kudu/master/master_service.cc b/src/kudu/master/master_service.cc
index 3eea33d..c8533a8 100644
--- a/src/kudu/master/master_service.cc
+++ b/src/kudu/master/master_service.cc
@@ -55,6 +55,7 @@
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/scoped_cleanup.h"
@@ -95,6 +96,12 @@ DEFINE_bool(master_support_authz_tokens, true,
             "testing version compatibility in the client.");
 TAG_FLAG(master_support_authz_tokens, hidden);
 
+DEFINE_bool(master_support_change_config, false,
+            "Whether the master supports adding/removing master servers dynamically.");
+TAG_FLAG(master_support_change_config, hidden);
+TAG_FLAG(master_support_change_config, unsafe);
+
 using google::protobuf::Message;
 using kudu::consensus::ReplicaManagementInfoPB;
 using kudu::pb_util::SecureDebugString;
@@ -229,6 +236,36 @@ void MasterServiceImpl::ChangeTServerState(const ChangeTServerStateRequestPB* re
   rpc->RespondSuccess();
 }
 
+void MasterServiceImpl::AddMaster(const AddMasterRequestPB* req,
+                                  AddMasterResponsePB* resp,
+                                  rpc::RpcContext* rpc) {
+  if (!FLAGS_master_support_change_config) {
+    rpc->RespondFailure(Status::NotSupported("Adding a master is not supported"));
+    return;
+  }
+
+  if (!req->has_rpc_addr()) {
+    rpc->RespondFailure(Status::InvalidArgument("RPC address of master to be added not supplied"));
+    return;
+  }
+
+  CatalogManager::ScopedLeaderSharedLock l(server_->catalog_manager());
+  if (!l.CheckIsInitializedAndIsLeaderOrRespond(resp, rpc)) {
+    return;
+  }
+
+  Status s = server_->AddMaster(HostPortFromPB(req->rpc_addr()), rpc);
+  if (!s.ok()) {
+    LOG(ERROR) << Substitute("Failed adding master $0:$1. $2", req->rpc_addr().host(),
+                             req->rpc_addr().port(), s.ToString());
+    rpc->RespondFailure(s);
+    return;
+  }
+  // The ChangeConfig request was submitted successfully. Once it completes, the
+  // completion callback will respond to the RPC client with the result.
+  // See completion_cb in CatalogManager::InitiateMasterChangeConfig().
+}
+
 void MasterServiceImpl::TSHeartbeat(const TSHeartbeatRequestPB* req,
                                     TSHeartbeatResponsePB* resp,
                                     rpc::RpcContext* rpc) {
@@ -618,7 +655,9 @@ void MasterServiceImpl::GetMasterRegistration(const GetMasterRegistrationRequest
 
   Status s = server_->GetMasterRegistration(resp->mutable_registration());
   CheckRespErrorOrSetUnknown(s, resp);
-  resp->set_role(server_->catalog_manager()->Role());
+  const auto& role_and_member = server_->catalog_manager()->GetRoleAndMemberType();
+  resp->set_role(role_and_member.first);
+  resp->set_member_type(role_and_member.second);
   resp->set_cluster_id(server_->cluster_id());
   rpc->RespondSuccess();
 }
@@ -633,7 +672,14 @@ void MasterServiceImpl::ConnectToMaster(const ConnectToMasterRequestPB* /*req*/,
 
   // Set the info about the other masters, so that the client can verify
   // it has the full set of info.
-  const auto& addresses = server_->catalog_manager()->master_addresses();
+  vector<HostPort> addresses;
+  Status s = server_->GetMasterHostPorts(&addresses);
+  if (!s.ok()) {
+    StatusToPB(s, resp->mutable_error()->mutable_status());
+    resp->mutable_error()->set_code(MasterErrorPB::UNKNOWN_ERROR);
+    rpc->RespondSuccess();
+    return;
+  }
   resp->mutable_master_addrs()->Reserve(addresses.size());
   for (const auto& hp : addresses) {
     *resp->add_master_addrs() = HostPortToPB(hp);
@@ -746,6 +792,8 @@ bool MasterServiceImpl::SupportsFeature(uint32_t feature) const {
       return FLAGS_master_support_authz_tokens;
     case MasterFeatures::CONNECT_TO_MASTER:
       return FLAGS_master_support_connect_to_master_rpc;
+    case MasterFeatures::DYNAMIC_MULTI_MASTER:
+      return FLAGS_master_support_change_config;
     default:
       return false;
   }
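
Since AddMaster() responds asynchronously via the ChangeConfig completion
callback, callers should budget a generous RPC timeout. A rough sketch of
driving the new RPC through the KRPC-generated MasterServiceProxy against the
leader master (the helper itself is hypothetical; leader discovery, retries,
and namespace qualifiers are omitted):

    // Ask the leader master behind 'proxy' to add the master at 'new_hp'.
    Status AddMasterToCluster(MasterServiceProxy* proxy, const HostPort& new_hp) {
      AddMasterRequestPB req;
      AddMasterResponsePB resp;
      rpc::RpcController ctl;
      // The response only arrives once the Raft ChangeConfig completes.
      ctl.set_timeout(MonoDelta::FromSeconds(30));
      *req.mutable_rpc_addr() = HostPortToPB(new_hp);
      RETURN_NOT_OK(proxy->AddMaster(req, &resp, &ctl));
      return resp.has_error() ? StatusFromPB(resp.error().status()) : Status::OK();
    }

The DYNAMIC_MULTI_MASTER feature is gated on the same flag, so a client can
detect a master that doesn't support the RPC up front rather than relying on
the NotSupported failure.
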
diff --git a/src/kudu/master/master_service.h b/src/kudu/master/master_service.h
index e5ba019..9a066ba 100644
--- a/src/kudu/master/master_service.h
+++ b/src/kudu/master/master_service.h
@@ -36,6 +36,8 @@ class RpcContext;
 
 namespace master {
 
+class AddMasterRequestPB;
+class AddMasterResponsePB;
 class AlterTableRequestPB;
 class AlterTableResponsePB;
 class ChangeTServerStateRequestPB;
@@ -69,10 +71,10 @@ class ListTabletServersResponsePB;
 class Master;
 class PingRequestPB;
 class PingResponsePB;
-class ReplaceTabletRequestPB;
-class ReplaceTabletResponsePB;
 class RefreshAuthzCacheRequestPB;
 class RefreshAuthzCacheResponsePB;
+class ReplaceTabletRequestPB;
+class ReplaceTabletResponsePB;
 class TSHeartbeatRequestPB;
 class TSHeartbeatResponsePB;
 
@@ -104,6 +106,9 @@ class MasterServiceImpl : public MasterServiceIf {
                           ChangeTServerStateResponsePB* resp,
                           rpc::RpcContext* rpc) override;
 
+  void AddMaster(const AddMasterRequestPB* req,
+                 AddMasterResponsePB* resp, rpc::RpcContext* rpc) override;
+
   void Ping(const PingRequestPB* req,
             PingResponsePB* resp,
             rpc::RpcContext* rpc) override;
diff --git a/src/kudu/master/mini_master.cc b/src/kudu/master/mini_master.cc
index c6cc092..19c2681 100644
--- a/src/kudu/master/mini_master.cc
+++ b/src/kudu/master/mini_master.cc
@@ -78,7 +78,7 @@ MiniMaster::~MiniMaster() {
 
 void MiniMaster::SetMasterAddresses(vector<HostPort> master_addrs) {
   CHECK(!master_);
-  opts_.master_addresses = std::move(master_addrs);
+  opts_.SetMasterAddressesForTests(std::move(master_addrs));
 }
 
 Status MiniMaster::Start() {
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index 1ee8b20..fa73b26 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -50,7 +50,6 @@
 #include "kudu/consensus/consensus_peers.h"
 #include "kudu/consensus/log.h"
 #include "kudu/consensus/log_anchor_registry.h"
-#include "kudu/consensus/opid.pb.h"
 #include "kudu/consensus/opid_util.h"
 #include "kudu/consensus/quorum_util.h"
 #include "kudu/consensus/raft_consensus.h"
@@ -89,6 +88,12 @@ DEFINE_double(sys_catalog_fail_during_write, 0.0,
               "Fraction of the time when system table writes will fail");
 TAG_FLAG(sys_catalog_fail_during_write, hidden);
 
+DEFINE_string(master_address_add_new_master, "",
+              "Address of master to add as a NON_VOTER when creating a distributed master config.");
+TAG_FLAG(master_address_add_new_master, unsafe);
+TAG_FLAG(master_address_add_new_master, hidden);
+
+DECLARE_bool(master_support_change_config);
 DECLARE_int64(rpc_max_message_size);
 
 METRIC_DEFINE_counter(server, sys_catalog_oversized_write_requests,
@@ -266,10 +271,11 @@ Status SysCatalogTable::Load(FsManager *fs_manager) {
     // Make sure the set of masters passed in at start time matches the set in
     // the on-disk cmeta.
     set<string> peer_addrs_from_opts;
-    for (const auto& hp : master_->opts().master_addresses) {
+    const auto& master_addresses = master_->opts().master_addresses();
+    for (const auto& hp : master_addresses) {
       peer_addrs_from_opts.insert(hp.ToString());
     }
-    if (peer_addrs_from_opts.size() < master_->opts().master_addresses.size()) {
+    if (peer_addrs_from_opts.size() < master_addresses.size()) {
       LOG(WARNING) << Substitute("Found duplicates in --master_addresses: "
                                  "the unique set of addresses is $0",
                                  JoinStrings(peer_addrs_from_opts, ", "));
@@ -358,11 +364,18 @@ Status SysCatalogTable::CreateDistributedConfig(const MasterOptions& options,
   new_config.set_opid_index(consensus::kInvalidOpIdIndex);
 
   // Build the set of followers from our server options.
-  for (const HostPort& host_port : options.master_addresses) {
+  for (const HostPort& host_port : options.master_addresses()) {
     RaftPeerPB peer;
     HostPortPB peer_host_port_pb = HostPortToPB(host_port);
     peer.mutable_last_known_addr()->CopyFrom(peer_host_port_pb);
-    peer.set_member_type(RaftPeerPB::VOTER);
+    // Add the new master as a NON_VOTER so it can't become leader while the distributed
+    // config is created; this also helps when replacing a dead master at the same HostPort.
+    if (FLAGS_master_support_change_config &&
+        FLAGS_master_address_add_new_master == host_port.ToString()) {
+      peer.set_member_type(RaftPeerPB::NON_VOTER);
+    } else {
+      peer.set_member_type(RaftPeerPB::VOTER);
+    }
     new_config.add_peers()->CopyFrom(peer);
   }
 
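
Taken together, the two flags describe the bootstrap path for the master being
added: it is started with the full --master_addresses list plus its own address
in --master_address_add_new_master, so that the distributed config it creates
records itself as a NON_VOTER. A hypothetical invocation for a new fourth
master (addresses and directories are placeholders; both flags are tagged
hidden/unsafe and are meant for this workflow only):

    kudu-master \
        --fs_wal_dir=/data/kudu/master/wal \
        --fs_data_dirs=/data/kudu/master/data \
        --master_addresses=m1:7051,m2:7051,m3:7051,m4:7051 \
        --master_support_change_config=true \
        --master_address_add_new_master=m4:7051
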
diff --git a/src/kudu/mini-cluster/external_mini_cluster.cc b/src/kudu/mini-cluster/external_mini_cluster.cc
index c6167e4..3da89a7 100644
--- a/src/kudu/mini-cluster/external_mini_cluster.cc
+++ b/src/kudu/mini-cluster/external_mini_cluster.cc
@@ -112,6 +112,7 @@ static double kMasterCatalogManagerTimeoutSeconds = 60.0;
 
 ExternalMiniClusterOptions::ExternalMiniClusterOptions()
     : num_masters(1),
+      supply_single_master_addr(false),
       num_tablet_servers(1),
       bind_mode(kDefaultBindMode),
       num_data_dirs(1),
@@ -498,7 +499,7 @@ Status ExternalMiniCluster::StartMasters() {
   // Setting the --master_addresses flag for a single master configuration is now supported but
   // not mandatory. Not setting the flag helps test existing Kudu deployments that don't specify
   // the --master_addresses flag for a single master configuration.
-  if (num_masters > 1) {
+  if (num_masters > 1 || opts_.supply_single_master_addr) {
     flags.emplace_back(Substitute("--master_addresses=$0",
                                   HostPort::ToCommaSeparatedString(master_rpc_addrs)));
   }
@@ -923,6 +924,20 @@ string ExternalMiniCluster::UuidForTS(int ts_idx) const {
   return tablet_server(ts_idx)->uuid();
 }
 
+Status ExternalMiniCluster::AddMaster(const scoped_refptr<ExternalMaster>& new_master) {
+  const auto& new_master_uuid = new_master->instance_id().permanent_uuid();
+  for (const auto& m : masters_) {
+    if (m->instance_id().permanent_uuid() == new_master_uuid) {
+      CHECK(m->bound_rpc_hostport() == new_master->bound_rpc_hostport());
+      return Status::AlreadyPresent(Substitute(
+          "Master $0, uuid: $1 already present in ExternalMiniCluster",
+          m->bound_rpc_hostport().ToString(), new_master_uuid));
+    }
+  }
+  masters_.emplace_back(new_master);
+  return Status::OK();
+}
+
 //------------------------------------------------------------
 // ExternalDaemon
 //------------------------------------------------------------
diff --git a/src/kudu/mini-cluster/external_mini_cluster.h b/src/kudu/mini-cluster/external_mini_cluster.h
index d4a1da2..6d16f3c 100644
--- a/src/kudu/mini-cluster/external_mini_cluster.h
+++ b/src/kudu/mini-cluster/external_mini_cluster.h
@@ -130,6 +130,10 @@ struct ExternalMiniClusterOptions {
   // Default: 1.
   int num_masters;
 
+  // Whether to supply the --master_addresses flag for a single master configuration.
+  // Default: false.
+  bool supply_single_master_addr;
+
   // Number of TS to start.
   //
   // Default: 1.
@@ -460,6 +464,10 @@ class ExternalMiniCluster : public MiniCluster {
   // files that reside in the log dir.
   std::string GetLogPath(const std::string& daemon_id) const;
 
+  // Adds an already-running master to the set tracked by the ExternalMiniCluster;
+  // used when a new master has been added dynamically after cluster startup.
+  Status AddMaster(const scoped_refptr<ExternalMaster>& master);
+
  private:
   Status StartMasters();
 
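
A rough sketch of how a test might combine the new option with the AddMaster()
hook (starting the extra ExternalMaster is elided, and StartNewMasterSomehow()
is a hypothetical helper):

    // Start a single-master cluster that still passes --master_addresses.
    ExternalMiniClusterOptions opts;
    opts.num_masters = 1;
    opts.supply_single_master_addr = true;
    ExternalMiniCluster cluster(std::move(opts));
    ASSERT_OK(cluster.Start());

    // After dynamically adding a master out of band, register it so the
    // cluster object manages it alongside the original masters.
    scoped_refptr<ExternalMaster> new_master = StartNewMasterSomehow();
    ASSERT_OK(cluster.AddMaster(new_master));
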
diff --git a/src/kudu/tools/tool_action_master.cc b/src/kudu/tools/tool_action_master.cc
index 3a0a24f..05a2d75 100644
--- a/src/kudu/tools/tool_action_master.cc
+++ b/src/kudu/tools/tool_action_master.cc
@@ -189,6 +189,10 @@ Status ListMasters(const RunnerContext& context) {
       for (const auto& master : masters) {
         values.emplace_back(RaftPeerPB::Role_Name(master.role()));
       }
+    } else if (boost::iequals(column, "member_type")) {
+      for (const auto& master : masters) {
+        values.emplace_back(RaftPeerPB::MemberType_Name(master.member_type()));
+      }
     } else {
       return Status::InvalidArgument("unknown column (--columns)", column);
     }
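
With the new column wired into 'kudu master list', the member type of each
master becomes visible from the CLI; a hypothetical invocation (addresses are
placeholders, other column names follow the tool's existing conventions):

    $ kudu master list m1:7051,m2:7051,m3:7051 --columns=uuid,rpc-addresses,role,member_type

The member_type column reports VOTER or NON_VOTER for each master, so a newly
added master still in the NON_VOTER state is easy to spot.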