You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ba...@apache.org on 2021/04/20 19:08:14 UTC

[kudu] branch master updated: [master][tool] KUDU-2181 Tool to orchestrate adding a master

This is an automated email from the ASF dual-hosted git repository.

bankim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new d5d8fad  [master][tool] KUDU-2181 Tool to orchestrate adding a master
d5d8fad is described below

commit d5d8fadadac750bd9a71c4bc928dc7de05168a9a
Author: Bankim Bhavsar <ba...@cloudera.com>
AuthorDate: Wed Mar 31 10:40:06 2021 -0700

    [master][tool] KUDU-2181 Tool to orchestrate adding a master
    
    This tool orchestrates adding a master to an existing Kudu cluster. It
    builds on the changes made previously, including the add master Raft
    change config.
    
    This tool must be run from the master being added.
    
    Outline of steps:
    - Bring up master from supplied flags
    - Invoke add master Raft change config RPC
    - Determine whether the new master has caught up from WAL
      and ksck is healthy.
    - If yes, we are done; otherwise continue with the steps below
    - Delete system catalog on the new master
    - Copy system catalog from one of the existing masters
    - Verify ksck is healthy.
    - Shutdown the new master
    
    The tool now manages bringing up the new master process and copying
    the system catalog, so a bunch of functionality in the
    dynamic_multi_master test is no longer needed and has been deleted.
    
    Because Subprocess is used, the new master needs to be shut down, and
    hence we can't verify certain state, which included contacting the new
    master directly immediately after adding it. Function
    VerifyClusterAfterMasterAddition() covers comprehensive verification
    steps, so nothing is lost by removing those steps.
    
    This change also includes some refactoring in ExternalDaemon
    to fetch Kerberos related configuration to be used outside
    ExternalMiniCluster.
    
    Change-Id: I8f99cf2b3f1738c4127c7e7288beab61daf42e7b
    Reviewed-on: http://gerrit.cloudera.org:8080/17267
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 src/kudu/master/CMakeLists.txt                 |   2 +-
 src/kudu/master/dynamic_multi_master-test.cc   | 480 +++++++++++--------------
 src/kudu/mini-cluster/external_mini_cluster.cc |  51 ++-
 src/kudu/mini-cluster/external_mini_cluster.h  |  13 +-
 src/kudu/tools/kudu-tool-test.cc               |  11 +-
 src/kudu/tools/tool_action_common.cc           |  14 +
 src/kudu/tools/tool_action_common.h            |   4 +
 src/kudu/tools/tool_action_master.cc           | 381 ++++++++++++++++----
 src/kudu/tools/tool_test_util.cc               |  21 +-
 src/kudu/tools/tool_test_util.h                |   6 +-
 10 files changed, 610 insertions(+), 373 deletions(-)

diff --git a/src/kudu/master/CMakeLists.txt b/src/kudu/master/CMakeLists.txt
index 2706e47..dafeea6 100644
--- a/src/kudu/master/CMakeLists.txt
+++ b/src/kudu/master/CMakeLists.txt
@@ -108,7 +108,7 @@ SET_KUDU_TEST_LINK_LIBS(
 
 ADD_KUDU_TEST(auto_rebalancer-test)
 ADD_KUDU_TEST(catalog_manager-test)
-ADD_KUDU_TEST(dynamic_multi_master-test)
+ADD_KUDU_TEST(dynamic_multi_master-test NUM_SHARDS 4)
 ADD_KUDU_TEST(hms_notification_log_listener-test)
 ADD_KUDU_TEST(location_cache-test DATA_FILES ../scripts/first_argument.sh)
 ADD_KUDU_TEST(master_options-test)
diff --git a/src/kudu/master/dynamic_multi_master-test.cc b/src/kudu/master/dynamic_multi_master-test.cc
index 09a1d7b..9c3ac29 100644
--- a/src/kudu/master/dynamic_multi_master-test.cc
+++ b/src/kudu/master/dynamic_multi_master-test.cc
@@ -18,9 +18,10 @@
 #include <algorithm>
 #include <cstdint>
 #include <functional>
+#include <iterator>
+#include <map>
 #include <memory>
 #include <ostream>
-#include <set>
 #include <string>
 #include <tuple>
 #include <unordered_set>
@@ -28,6 +29,7 @@
 #include <vector>
 
 #include <boost/optional/optional.hpp>
+#include <gflags/gflags.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
@@ -39,6 +41,8 @@
 #include "kudu/consensus/consensus.pb.h"
 #include "kudu/consensus/consensus.proxy.h"
 #include "kudu/consensus/metadata.pb.h"
+#include "kudu/fs/dir_manager.h"
+#include "kudu/fs/fs_manager.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/join.h"
@@ -54,6 +58,8 @@
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/tools/tool_test_util.h"
 #include "kudu/tserver/tserver.pb.h"
+#include "kudu/util/env.h"
+#include "kudu/util/env_util.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
@@ -64,6 +70,9 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
+DECLARE_string(fs_wal_dir);
+DECLARE_string(fs_data_dirs);
+
 METRIC_DECLARE_histogram(log_gc_duration);
 METRIC_DECLARE_entity(tablet);
 
@@ -85,7 +94,7 @@ using kudu::consensus::LeaderStepDownResponsePB;
 using kudu::consensus::RaftConfigPB;
 using kudu::consensus::RaftPeerPB;
 using kudu::rpc::RpcController;
-using std::set;
+using std::map;
 using std::string;
 using std::tuple;
 using std::unique_ptr;
@@ -115,6 +124,8 @@ static Status CreateTable(ExternalMiniCluster* cluster,
 // Test class for testing addition/removal of masters to a Kudu cluster.
 class DynamicMultiMasterTest : public KuduTest {
  protected:
+  typedef map<string, string> EnvVars;
+
   void SetUpWithNumMasters(int num_masters) {
     // Initial number of masters in the cluster before adding a master.
     orig_num_masters_ = num_masters;
@@ -294,51 +305,38 @@ class DynamicMultiMasterTest : public KuduTest {
   void StartNewMaster(const vector<HostPort>& master_hps,
                       const HostPort& new_master_hp,
                       int new_master_idx,
-                      bool master_supports_change_config = true) {
+                      scoped_refptr<ExternalMaster>* new_master_out) {
     vector<string> master_addresses;
     master_addresses.reserve(master_hps.size());
     for (const auto& hp : master_hps) {
       master_addresses.emplace_back(hp.ToString());
     }
 
-    // Start with an existing master daemon's options, but modify them for use in a new master
-    ExternalDaemonOptions new_master_opts = cluster_->master(0)->opts();
-    const string new_master_id = Substitute("master-$0", new_master_idx);
-    new_master_opts.wal_dir = cluster_->GetWalPath(new_master_id);
-    new_master_opts.data_dirs = cluster_->GetDataPaths(new_master_id);
-    new_master_opts.log_dir = cluster_->GetLogPath(new_master_id);
-    new_master_opts.rpc_bind_address = new_master_hp;
+    ExternalDaemonOptions new_master_opts;
+    ASSERT_OK(BuildMasterOpts(new_master_idx, new_master_hp, &new_master_opts));
     auto& flags = new_master_opts.extra_flags;
     flags.insert(flags.end(),
                  {"--master_addresses=" + JoinStrings(master_addresses, ","),
                   "--master_address_add_new_master=" + new_master_hp.ToString()});
 
     LOG(INFO) << "Bringing up the new master at: " << new_master_hp.ToString();
-    new_master_.reset(new ExternalMaster(new_master_opts));
-    ASSERT_OK(new_master_->Start());
-    ASSERT_OK(new_master_->WaitForCatalogManager());
+    scoped_refptr<ExternalMaster> master = new ExternalMaster(new_master_opts);
+    ASSERT_OK(master->Start());
+    ASSERT_OK(master->WaitForCatalogManager());
 
     Sockaddr new_master_addr;
     ASSERT_OK(SockaddrFromHostPort(new_master_hp, &new_master_addr));
-    new_master_proxy_.reset(
-        new MasterServiceProxy(new_master_opts.messenger, new_master_addr, new_master_hp.host()));
-    {
-      GetMasterRegistrationRequestPB req;
-      GetMasterRegistrationResponsePB resp;
-      RpcController rpc;
+    MasterServiceProxy new_master_proxy(new_master_opts.messenger, new_master_addr,
+                                        new_master_hp.host());
 
-      ASSERT_OK(new_master_proxy_->GetMasterRegistration(req, &resp, &rpc));
-      ASSERT_FALSE(resp.has_error());
-      if (master_supports_change_config) {
-        ASSERT_EQ(RaftPeerPB::NON_VOTER, resp.member_type());
-        ASSERT_EQ(RaftPeerPB::LEARNER, resp.role());
-      } else {
-        // For a new master brought that doesn't support change config, it'll be started
-        // as a VOTER and become FOLLOWER if the other masters are reachable.
-        ASSERT_EQ(RaftPeerPB::VOTER, resp.member_type());
-        ASSERT_EQ(RaftPeerPB::FOLLOWER, resp.role());
-      }
-    }
+    GetMasterRegistrationRequestPB req;
+    GetMasterRegistrationResponsePB resp;
+    RpcController rpc;
+    ASSERT_OK(new_master_proxy.GetMasterRegistration(req, &resp, &rpc));
+    ASSERT_FALSE(resp.has_error());
+    ASSERT_EQ(RaftPeerPB::NON_VOTER, resp.member_type());
+    ASSERT_EQ(RaftPeerPB::LEARNER, resp.role());
+    *new_master_out = std::move(master);
   }
 
   // Fetch a follower (non-leader) master index from the cluster.
@@ -359,13 +357,44 @@ class DynamicMultiMasterTest : public KuduTest {
     return Status::OK();
   }
 
+  // Builds and returns ExternalDaemonOptions used to add master with address 'rpc_addr' and
+  // 'master_idx' which helps with naming master's directories.
+  // Output 'kerberos_env_vars' parameter must be supplied for a secure cluster i.e. when Kerberos
+  // is enabled and returns Kerberos related environment variables.
+  Status BuildMasterOpts(int master_idx, const HostPort& rpc_addr,
+                         ExternalDaemonOptions* master_opts,
+                         EnvVars* kerberos_env_vars = nullptr) {
+    // Start with an existing master daemon's options, but modify them for use in a new master.
+    const string new_master_id = Substitute("master-$0", master_idx);
+    ExternalDaemonOptions opts = cluster_->master(0)->opts();
+    opts.rpc_bind_address = rpc_addr;
+    opts.wal_dir = cluster_->GetWalPath(new_master_id);
+    opts.data_dirs = cluster_->GetDataPaths(new_master_id);
+    opts.log_dir = cluster_->GetLogPath(new_master_id);
+    if (opts_.enable_kerberos) {
+      CHECK(kerberos_env_vars);
+      vector<string> flags;
+      RETURN_NOT_OK(cluster::ExternalDaemon::CreateKerberosConfig(
+          cluster_->kdc(), opts_.principal, rpc_addr.host(), &flags, kerberos_env_vars));
+      // Inserting the Kerberos related flags at the end will override flags from the master
+      // which was used as a basis for the new master.
+      opts.extra_flags.insert(opts.extra_flags.end(), std::make_move_iterator(flags.begin()),
+                              std::make_move_iterator(flags.end()));
+    }
+    *master_opts = std::move(opts);
+    return Status::OK();
+  }
+
   // Adds the specified master to the cluster using the CLI tool.
-  // Unset 'master' can be used to indicate to not supply master address.
+  // Unset 'rpc_bind_address' in 'opts' can be used to indicate to not supply master address.
+  // Optional 'env_vars' can be used to set environment variables while running kudu tool.
   // Optional 'wait_secs' can be used to supply wait timeout to the master add CLI tool.
+  // Optional 'kudu_binary' can be used to supply the path to the kudu binary.
   // Returns generic RuntimeError() on failure with the actual error in the optional 'err'
   // output parameter.
-  Status AddMasterToClusterUsingCLITool(const HostPort& master, string* err = nullptr,
-                                        int wait_secs = 0) {
+  Status AddMasterToClusterUsingCLITool(const ExternalDaemonOptions& opts, string* err = nullptr,
+                                        EnvVars env_vars = {}, int wait_secs = 0,
+                                        const string& kudu_binary = "") {
     auto hps = cluster_->master_rpc_addrs();
     vector<string> addresses;
     addresses.reserve(hps.size());
@@ -374,19 +403,29 @@ class DynamicMultiMasterTest : public KuduTest {
     }
 
     vector<string> cmd = {"master", "add", JoinStrings(addresses, ",")};
-    if (master.Initialized()) {
-      cmd.emplace_back(master.ToString());
+    if (opts.rpc_bind_address.Initialized()) {
+      cmd.emplace_back(opts.rpc_bind_address.ToString());
     }
+
+    vector<string> new_master_flags = ExternalMaster::GetMasterFlags(opts);
+    cmd.insert(cmd.end(), std::make_move_iterator(new_master_flags.begin()),
+               std::make_move_iterator(new_master_flags.end()));
+    cmd.insert(cmd.end(), opts.extra_flags.begin(), opts.extra_flags.end());
+
     if (wait_secs != 0) {
       cmd.emplace_back("-wait_secs=" + std::to_string(wait_secs));
     }
-    RETURN_NOT_OK(tools::RunKuduTool(cmd, nullptr, err));
-    // master add CLI doesn't return an error if the master is already present.
-    // So don't try adding to the ExternalMiniCluster.
-    if (err != nullptr && err->find("Master already present") != string::npos)  {
-      return Status::OK();
+    if (!kudu_binary.empty()) {
+      cmd.emplace_back("-kudu_abs_path=" + kudu_binary);
+    }
+
+    RETURN_NOT_OK(env_util::CreateDirsRecursively(Env::Default(), opts.log_dir));
+    Status s = tools::RunKuduTool(cmd, nullptr, err, "", std::move(env_vars));
+    if (!s.ok() && err != nullptr) {
+      LOG(INFO) << "Add master stderr: " << *err;
     }
-    return cluster_->AddMaster(new_master_);
+    RETURN_NOT_OK(s);
+    return Status::OK();
   }
 
   // Removes the specified master from the cluster using the CLI tool.
@@ -414,20 +453,6 @@ class DynamicMultiMasterTest : public KuduTest {
     return cluster_->RemoveMaster(master_to_remove);
   }
 
-  // Verify one of the 'expected_roles' and 'expected_member_type' of the new master by
-  // making RPC to it directly.
-  void VerifyNewMasterDirectly(const set<RaftPeerPB::Role>& expected_roles,
-                               RaftPeerPB::MemberType expected_member_type) {
-    GetMasterRegistrationRequestPB req;
-    GetMasterRegistrationResponsePB resp;
-    RpcController rpc;
-
-    ASSERT_OK(new_master_proxy_->GetMasterRegistration(req, &resp, &rpc));
-    ASSERT_FALSE(resp.has_error());
-    ASSERT_TRUE(ContainsKey(expected_roles, resp.role()));
-    ASSERT_EQ(expected_member_type, resp.member_type());
-  }
-
   // Fetch consensus state of the leader master.
   void GetLeaderMasterConsensusState(RaftConfigPB* consensus_config) {
     int leader_master_idx;
@@ -452,22 +477,6 @@ class DynamicMultiMasterTest : public KuduTest {
                         cstate.pending_config() : cstate.committed_config();
   }
 
-  // Verify the newly added master is in FAILED_UNRECOVERABLE state and can't be caught up
-  // from WAL.
-  void VerifyNewMasterInFailedUnrecoverableState(int expected_num_masters) {
-    RaftConfigPB config;
-    NO_FATALS(GetLeaderMasterConsensusState(&config));
-    ASSERT_EQ(expected_num_masters, config.peers_size());
-    int num_new_masters_found = 0;
-    for (const auto& peer : config.peers()) {
-      if (peer.permanent_uuid() == new_master_->uuid()) {
-        ASSERT_EQ(HealthReportPB::FAILED_UNRECOVERABLE, peer.health_report().overall_health());
-        num_new_masters_found++;
-      }
-    }
-    ASSERT_EQ(1, num_new_masters_found);
-  }
-
   void VerifyDeadMasterInSpecifiedState(const string& dead_master_uuid,
                                         HealthReportPB::HealthStatus expected_state) {
     RaftConfigPB config;
@@ -522,6 +531,21 @@ class DynamicMultiMasterTest : public KuduTest {
     });
   }
 
+  // Fetch uuid of the specified 'fs_wal_dir' and 'fs_data_dirs'.
+  static Status GetFsUuid(const string& fs_wal_dir, const vector<string>& fs_data_dirs,
+                          string* uuid) {
+    google::FlagSaver saver;
+    FLAGS_fs_wal_dir = fs_wal_dir;
+    FLAGS_fs_data_dirs = JoinStrings(fs_data_dirs, ",");
+    FsManagerOpts fs_opts;
+    fs_opts.read_only = true;
+    fs_opts.update_instances = fs::UpdateInstanceBehavior::DONT_UPDATE;
+    FsManager fs_manager(Env::Default(), std::move(fs_opts));
+    RETURN_NOT_OK(fs_manager.PartialOpen());
+    *uuid = fs_manager.uuid();
+    return Status::OK();
+  }
+
   // Verification steps after the new master has been added successfully and it's promoted
   // as VOTER. The supplied 'master_hps' includes the new_master as well.
   void VerifyClusterAfterMasterAddition(const vector<HostPort>& master_hps,
@@ -535,13 +559,14 @@ class DynamicMultiMasterTest : public KuduTest {
     for (int i = 0; i < cluster_->num_masters(); i++) {
       master_uuids.emplace(cluster_->master(i)->uuid());
     }
-    master_uuids.emplace(new_master_->uuid());
+    string new_master_uuid;
+    ASSERT_OK(GetFsUuid(new_master_opts_.wal_dir, new_master_opts_.data_dirs, &new_master_uuid));
+    master_uuids.emplace(new_master_uuid);
 
     // Shutdown the cluster and the new master daemon process.
     // This allows ExternalMiniCluster to manage the newly added master and allows
     // client to connect to the new master if it's elected the leader.
     LOG(INFO) << "Shutting down the old cluster";
-    new_master_->Shutdown();
     cluster_.reset();
 
     LOG(INFO) << "Bringing up the migrated cluster";
@@ -576,7 +601,7 @@ class DynamicMultiMasterTest : public KuduTest {
     }
 
     // Transfer leadership to the new master.
-    NO_FATALS(TransferMasterLeadership(&migrated_cluster, new_master_->uuid()));
+    NO_FATALS(TransferMasterLeadership(&migrated_cluster, new_master_uuid));
 
     shared_ptr<KuduClient> client;
     ASSERT_OK(migrated_cluster.CreateClient(nullptr, &client));
@@ -694,61 +719,6 @@ class DynamicMultiMasterTest : public KuduTest {
     *dead_master_hp = master_to_recover_hp;
   }
 
-  // Helper function that verifies that the newly added master can't be caught up from WAL
-  // and remains as NON_VOTER.
-  void VerifyNewNonVoterMaster(const HostPort& new_master_hp,
-                               int expected_num_masters) {
-    // Newly added master will be added to the master Raft config but won't be caught up
-    // from the WAL and hence remain as a NON_VOTER.
-    ListMastersResponsePB list_resp;
-    NO_FATALS(RunListMasters(&list_resp));
-    ASSERT_EQ(expected_num_masters, list_resp.masters_size());
-
-    for (const auto& master : list_resp.masters()) {
-      ASSERT_EQ(1, master.registration().rpc_addresses_size());
-      auto hp = HostPortFromPB(master.registration().rpc_addresses(0));
-      if (hp == new_master_hp) {
-        ASSERT_EQ(RaftPeerPB::NON_VOTER, master.member_type());
-        ASSERT_TRUE(master.role() == RaftPeerPB::LEARNER);
-      }
-    }
-
-    // Double check by directly contacting the new master.
-    NO_FATALS(VerifyNewMasterDirectly({ RaftPeerPB::LEARNER }, RaftPeerPB::NON_VOTER));
-
-    // Verify new master is in FAILED_UNRECOVERABLE state.
-    // This health state update may take some round trips between the masters and
-    // hence wrapping it under ASSERT_EVENTUALLY.
-    ASSERT_EVENTUALLY([&] {
-      NO_FATALS(VerifyNewMasterInFailedUnrecoverableState(expected_num_masters));
-    });
-  }
-
-  // Helper function to copy system catalog from 'src_master_hp' master.
-  void CopySystemCatalog(const HostPort& src_master_hp) {
-    LOG(INFO) << "Shutting down the new master";
-    new_master_->Shutdown();
-
-    LOG(INFO) << "Deleting the system catalog";
-    // Delete sys catalog on local master
-    ASSERT_OK(tools::RunKuduTool({"local_replica", "delete",
-                                  master::SysCatalogTable::kSysCatalogTabletId,
-                                  "-fs_data_dirs=" + JoinStrings(new_master_->data_dirs(), ","),
-                                  "-fs_wal_dir=" + new_master_->wal_dir(),
-                                  "-clean_unsafe"}));
-
-    // Copy from remote system catalog from specified master
-    LOG(INFO) << "Copying from remote master: " << src_master_hp.ToString();
-    ASSERT_OK(tools::RunKuduTool({"local_replica", "copy_from_remote",
-                                  master::SysCatalogTable::kSysCatalogTabletId,
-                                  src_master_hp.ToString(),
-                                  "-fs_data_dirs=" + JoinStrings(new_master_->data_dirs(), ","),
-                                  "-fs_wal_dir=" + new_master_->wal_dir()}));
-
-    LOG(INFO) << "Restarting the new master";
-    ASSERT_OK(new_master_->Restart());
-  }
-
   // Tracks the current number of masters in the cluster
   int orig_num_masters_;
   ExternalMiniClusterOptions opts_;
@@ -758,26 +728,31 @@ class DynamicMultiMasterTest : public KuduTest {
   unique_ptr<Socket> reserved_socket_;
   Sockaddr reserved_addr_;
   HostPort reserved_hp_;
-  unique_ptr<MasterServiceProxy> new_master_proxy_;
-  scoped_refptr<ExternalMaster> new_master_;
+  ExternalDaemonOptions new_master_opts_;
 
   static const char* const kTableName;
 };
 
 const char* const DynamicMultiMasterTest::kTableName = "first_table";
 
-// Parameterized DynamicMultiMasterTest class that works with different initial number of masters.
+// Parameterized DynamicMultiMasterTest class that works with different initial number of masters
+// and secure/unsecure clusters.
 class ParameterizedAddMasterTest : public DynamicMultiMasterTest,
-                                   public ::testing::WithParamInterface<int> {
+                                   public ::testing::WithParamInterface<tuple<int, bool>> {
  public:
   void SetUp() override {
-    NO_FATALS(SetUpWithNumMasters(GetParam()));
+    NO_FATALS(SetUpWithNumMasters(std::get<0>(GetParam())));
+    opts_.enable_kerberos = std::get<1>(GetParam());
   }
 };
 
 INSTANTIATE_TEST_SUITE_P(, ParameterizedAddMasterTest,
-                         // Initial number of masters in the cluster before adding a new master
-                         ::testing::Values(1, 2));
+                           ::testing::Combine(
+                               // Initial number of masters in the cluster before adding a new
+                               // master
+                               ::testing::Values(1, 2),
+                               // Whether Kerberos is enabled
+                               ::testing::Bool()));
 
 // This test starts a cluster, creates a table and then adds a new master.
 // For a system catalog with little data, the new master can be caught up from WAL and
@@ -794,38 +769,40 @@ TEST_P(ParameterizedAddMasterTest, TestAddMasterCatchupFromWAL) {
 
   ASSERT_OK(CreateTable(cluster_.get(), kTableName));
 
-  // Bring up the new master and add to the cluster.
-  master_hps.emplace_back(reserved_hp_);
-  NO_FATALS(StartNewMaster(master_hps, reserved_hp_, orig_num_masters_));
-  NO_FATALS(VerifyVoterMasters(orig_num_masters_));
-  ASSERT_OK(AddMasterToClusterUsingCLITool(reserved_hp_, nullptr, 4));
-
-  // Newly added master will be caught up from WAL itself without requiring tablet copy
-  // since the system catalog is fresh with a single table.
-  // Catching up from WAL and promotion to VOTER will not be instant after adding the
-  // new master. Hence using ASSERT_EVENTUALLY.
-  ASSERT_EVENTUALLY([&] {
-    VerifyVoterMasters(orig_num_masters_ + 1);
-  });
-
-  // Double check by directly contacting the new master.
-  NO_FATALS(VerifyNewMasterDirectly(
-      { RaftPeerPB::FOLLOWER, RaftPeerPB::LEADER }, RaftPeerPB::VOTER));
+  // Add new master to the cluster.
+  {
+    string err;
+    EnvVars env_vars;
+    ASSERT_OK(BuildMasterOpts(orig_num_masters_, reserved_hp_, &new_master_opts_, &env_vars));
+    ASSERT_OK(AddMasterToClusterUsingCLITool(new_master_opts_, &err, std::move(env_vars),
+                                             4 /* wait_secs */, tools::GetKuduToolAbsolutePath()));
+    ASSERT_STR_CONTAINS(err, Substitute("Master $0 successfully caught up from WAL.",
+                                        new_master_opts_.rpc_bind_address.ToString()));
+  }
 
   // Adding the same master again should print a message but not throw an error.
   {
     string err;
-    ASSERT_OK(AddMasterToClusterUsingCLITool(reserved_hp_, &err));
+    ExternalDaemonOptions opts;
+    EnvVars env_vars;
+    ASSERT_OK(BuildMasterOpts(orig_num_masters_, reserved_hp_, &opts, &env_vars));
+    ASSERT_OK(AddMasterToClusterUsingCLITool(opts, &err, std::move(env_vars)));
     ASSERT_STR_CONTAINS(err, "Master already present");
   }
 
-  // Adding one of the former masters should print a message but not throw an error.
+  // Adding one of the running former masters will throw an error.
   {
     string err;
-    ASSERT_OK(AddMasterToClusterUsingCLITool(master_hps[0], &err));
-    ASSERT_STR_CONTAINS(err, "Master already present");
+    const auto& hp = master_hps[0];
+    ExternalDaemonOptions opts;
+    EnvVars env_vars;
+    ASSERT_OK(BuildMasterOpts(0, hp, &opts, &env_vars));
+    Status s = AddMasterToClusterUsingCLITool(opts, &err, std::move(env_vars));
+    ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();
+    ASSERT_STR_CONTAINS(err, Substitute("Master $0 already running", hp.ToString()));
   }
 
+  master_hps.emplace_back(reserved_hp_);
   NO_FATALS(VerifyClusterAfterMasterAddition(master_hps, orig_num_masters_ + 1));
 }
 
@@ -837,52 +814,46 @@ TEST_P(ParameterizedAddMasterTest, TestAddMasterSysCatalogCopy) {
   NO_FATALS(StartClusterWithSysCatalogGCed(&master_hps));
 
   ASSERT_OK(CreateTable(cluster_.get(), kTableName));
-  // Bring up the new master and add to the cluster.
-  master_hps.emplace_back(reserved_hp_);
-  NO_FATALS(StartNewMaster(master_hps, reserved_hp_, orig_num_masters_));
-  NO_FATALS(VerifyVoterMasters(orig_num_masters_));
-  string err;
-  ASSERT_OK(AddMasterToClusterUsingCLITool(reserved_hp_, &err));
-  ASSERT_STR_MATCHES(err, Substitute("New master $0 part of the Raft configuration.*"
-                                     "Please follow the next steps which includes system catalog "
-                                     "tablet copy", reserved_hp_.ToString()));
 
-  // Newly added master will be added to the master Raft config but won't be caught up
-  // from the WAL and hence remain as a NON_VOTER and transition to FAILED_UNRECOVERABLE state.
-  NO_FATALS(VerifyNewNonVoterMaster(reserved_hp_, orig_num_masters_ + 1));
+  // Add new master to the cluster.
+  {
+    string err;
+    EnvVars env_vars;
+    ASSERT_OK(BuildMasterOpts(orig_num_masters_, reserved_hp_, &new_master_opts_, &env_vars));
+    ASSERT_OK(AddMasterToClusterUsingCLITool(new_master_opts_, &err, std::move(env_vars),
+                                             4 /* wait_secs */));
+    ASSERT_STR_CONTAINS(err, Substitute("Master $0 could not be caught up from WAL.",
+                                        reserved_hp_.ToString()));
+    ASSERT_STR_CONTAINS(err, "Successfully copied system catalog and new master is healthy");
+  }
 
   // Adding the same master again should print a message but not throw an error.
   {
     string err;
-    ASSERT_OK(AddMasterToClusterUsingCLITool(reserved_hp_, &err));
+    ExternalDaemonOptions opts;
+    EnvVars env_vars;
+    ASSERT_OK(BuildMasterOpts(orig_num_masters_, reserved_hp_, &opts, &env_vars));
+    ASSERT_OK(AddMasterToClusterUsingCLITool(opts, &err, std::move(env_vars)));
     ASSERT_STR_CONTAINS(err, "Master already present");
   }
 
-  // Adding one of the former masters should print a message but not throw an error.
+  // Adding one of the running former masters will throw an error.
   {
     string err;
-    ASSERT_OK(AddMasterToClusterUsingCLITool(master_hps[0], &err));
-    ASSERT_STR_CONTAINS(err, "Master already present");
+    const auto& hp = master_hps[0];
+    ExternalDaemonOptions opts;
+    EnvVars env_vars;
+    ASSERT_OK(BuildMasterOpts(0, hp, &opts, &env_vars));
+    Status s = AddMasterToClusterUsingCLITool(opts, &err, std::move(env_vars));
+    ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();
+    ASSERT_STR_CONTAINS(err, Substitute("Master $0 already running", hp.ToString()));
   }
 
-  // Without system catalog copy, the new master will remain in the FAILED_UNRECOVERABLE state.
-  // So lets proceed with the tablet copy process for system catalog.
-  NO_FATALS(CopySystemCatalog(cluster_->master(0)->bound_rpc_hostport()));
-
-  // Wait for the new master to be up and running and the leader master to send status only Raft
-  // message to allow the new master to be considered caught up and promoted to be being a VOTER.
-  ASSERT_EVENTUALLY([&] {
-    VerifyVoterMasters(orig_num_masters_ + 1);
-  });
-
-  // Verify the same state from the new master directly.
-  // Double check by directly contacting the new master.
-  NO_FATALS(VerifyNewMasterDirectly(
-      { RaftPeerPB::FOLLOWER, RaftPeerPB::LEADER }, RaftPeerPB::VOTER));
-
+  master_hps.emplace_back(reserved_hp_);
   NO_FATALS(VerifyClusterAfterMasterAddition(master_hps, orig_num_masters_ + 1));
 }
 
+
 class ParameterizedRemoveMasterTest : public DynamicMultiMasterTest,
                                       public ::testing::WithParamInterface<tuple<int, bool>> {
  public:
@@ -1000,6 +971,7 @@ TEST_P(ParameterizedRemoveMasterTest, TestRemoveMaster) {
   NO_FATALS(mcv.CheckRowCount(kTableName, ClusterVerifier::EXACTLY, 0));
 }
 
+
 class ParameterizedRecoverMasterTest : public DynamicMultiMasterTest,
                                        public ::testing::WithParamInterface<int> {
  public:
@@ -1034,23 +1006,16 @@ TEST_P(ParameterizedRecoverMasterTest, TestRecoverDeadMasterCatchupfromWAL) {
   HostPort src_master_hp;
   NO_FATALS(FailAndRemoveFollowerMaster(master_hps, &master_to_recover_idx, &master_to_recover_hp,
                                         &src_master_hp));
-
-  // Add new master at the same HostPort
-  NO_FATALS(StartNewMaster(master_hps, master_to_recover_hp, master_to_recover_idx));
   NO_FATALS(VerifyVoterMasters(orig_num_masters_ - 1));
-  ASSERT_OK(AddMasterToClusterUsingCLITool(master_to_recover_hp));
-
-  // Newly added master will be caught up from WAL itself without requiring tablet copy
-  // since the system catalog is fresh with a single table.
-  // Catching up from WAL and promotion to VOTER will not be instant after adding the
-  // new master. Hence using ASSERT_EVENTUALLY.
-  ASSERT_EVENTUALLY([&] {
-    VerifyVoterMasters(orig_num_masters_);
-  });
 
-  // Double check by directly contacting the new master.
-  NO_FATALS(VerifyNewMasterDirectly(
-      { RaftPeerPB::FOLLOWER, RaftPeerPB::LEADER }, RaftPeerPB::VOTER));
+  // Add new master at the same HostPort
+  {
+    string err;
+    ASSERT_OK(BuildMasterOpts(master_to_recover_idx, master_to_recover_hp, &new_master_opts_));
+    ASSERT_OK(AddMasterToClusterUsingCLITool(new_master_opts_, &err));
+    ASSERT_STR_CONTAINS(err, Substitute("Master $0 successfully caught up from WAL.",
+                                        new_master_opts_.rpc_bind_address.ToString()));
+  }
 
   NO_FATALS(VerifyClusterAfterMasterAddition(master_hps, orig_num_masters_));
 }
@@ -1078,30 +1043,15 @@ TEST_P(ParameterizedRecoverMasterTest, TestRecoverDeadMasterSysCatalogCopy) {
   HostPort src_master_hp;
   NO_FATALS(FailAndRemoveFollowerMaster(master_hps, &master_to_recover_idx, &master_to_recover_hp,
                                         &src_master_hp));
+  NO_FATALS(VerifyVoterMasters(orig_num_masters_ - 1));
 
   // Add new master at the same HostPort
-  NO_FATALS(StartNewMaster(master_hps, master_to_recover_hp, master_to_recover_idx));
-  NO_FATALS(VerifyVoterMasters(orig_num_masters_ - 1));
   string err;
-  ASSERT_OK(AddMasterToClusterUsingCLITool(master_to_recover_hp, &err));
-  ASSERT_STR_MATCHES(err, Substitute("New master $0 part of the Raft configuration.*"
-                                     "Please follow the next steps which includes system catalog "
-                                     "tablet copy", master_to_recover_hp.ToString()));
-
-  // Verify the new master will remain as NON_VOTER and transition to FAILED_UNRECOVERABLE state.
-  NO_FATALS(VerifyNewNonVoterMaster(master_to_recover_hp, orig_num_masters_));
-
-  // Without system catalog copy, the new master will remain in the FAILED_UNRECOVERABLE state.
-  // So lets proceed with the tablet copy process for system catalog.
-  NO_FATALS(CopySystemCatalog(src_master_hp));
-
-  // Wait for the new master to be up and running and the leader master to send status only Raft
-  // message to allow the new master to be considered caught up and promoted to be being a VOTER.
-  ASSERT_EVENTUALLY([&] {
-    VerifyVoterMasters(orig_num_masters_);
-  });
-
-  VerifyNewMasterDirectly({ RaftPeerPB::FOLLOWER, RaftPeerPB::LEADER }, RaftPeerPB::VOTER);
+  ASSERT_OK(BuildMasterOpts(master_to_recover_idx, master_to_recover_hp, &new_master_opts_));
+  ASSERT_OK(AddMasterToClusterUsingCLITool(new_master_opts_, &err));
+  ASSERT_STR_CONTAINS(err, Substitute("Master $0 could not be caught up from WAL.",
+                                      master_to_recover_hp.ToString()));
+  ASSERT_STR_CONTAINS(err, "Successfully copied system catalog and new master is healthy");
 
   NO_FATALS(VerifyClusterAfterMasterAddition(master_hps, orig_num_masters_));
 }
@@ -1111,21 +1061,16 @@ TEST_P(ParameterizedRecoverMasterTest, TestRecoverDeadMasterSysCatalogCopy) {
 // expected to fail due to invalid Raft config.
 TEST_F(DynamicMultiMasterTest, TestAddMasterWithNoLastKnownAddr) {
   NO_FATALS(SetUpWithNumMasters(1));
-  NO_FATALS(
-      StartCluster({"--master_support_change_config"}, false /* supply_single_master_addr */));
-
-  // Verify that existing masters are running as VOTERs and collect their addresses to be used
-  // for starting the new master.
-  vector<HostPort> master_hps;
-  NO_FATALS(VerifyVoterMasters(orig_num_masters_, &master_hps));
+  NO_FATALS(StartCluster({"--master_support_change_config"}, false/* supply_single_master_addr */));
 
-  // Bring up the new master and add to the cluster.
-  master_hps.emplace_back(reserved_hp_);
-  NO_FATALS(StartNewMaster(master_hps, reserved_hp_, orig_num_masters_));
+  // Verify that existing masters are running as VOTERs.
   NO_FATALS(VerifyVoterMasters(orig_num_masters_));
 
+  // Add master to the cluster.
   string err;
-  Status actual = AddMasterToClusterUsingCLITool(reserved_hp_, &err);
+  ExternalDaemonOptions opts;
+  ASSERT_OK(BuildMasterOpts(orig_num_masters_, reserved_hp_, &opts));
+  Status actual = AddMasterToClusterUsingCLITool(opts, &err);
   ASSERT_TRUE(actual.IsRuntimeError()) << actual.ToString();
   ASSERT_STR_MATCHES(err, "'last_known_addr' field in single master Raft configuration not set. "
                           "Please restart master with --master_addresses flag");
@@ -1140,19 +1085,14 @@ TEST_F(DynamicMultiMasterTest, TestAddMasterFeatureFlagNotSpecified) {
   NO_FATALS(SetUpWithNumMasters(1));
   NO_FATALS(StartCluster({ "--master_support_change_config=false" }));
 
-  // Verify that existing masters are running as VOTERs and collect their addresses to be used
-  // for starting the new master.
-  vector<HostPort> master_hps;
-  NO_FATALS(VerifyVoterMasters(orig_num_masters_, &master_hps));
-
-  // Bring up the new master and add to the cluster.
-  master_hps.emplace_back(reserved_hp_);
-  NO_FATALS(StartNewMaster(master_hps, reserved_hp_, orig_num_masters_,
-                           false /* master_supports_change_config */));
+  // Verify that existing masters are running as VOTERs.
   NO_FATALS(VerifyVoterMasters(orig_num_masters_));
 
+  // Add master to the cluster.
   string err;
-  Status actual = AddMasterToClusterUsingCLITool(reserved_hp_, &err);
+  ExternalDaemonOptions opts;
+  ASSERT_OK(BuildMasterOpts(orig_num_masters_, reserved_hp_, &opts));
+  Status actual = AddMasterToClusterUsingCLITool(opts, &err);
   ASSERT_TRUE(actual.IsRuntimeError()) << actual.ToString();
   ASSERT_STR_MATCHES(err, "Cluster does not support AddMaster");
 
@@ -1208,7 +1148,8 @@ TEST_F(DynamicMultiMasterTest, TestAddMasterToNonLeader) {
 
   // Bring up the new master and add to the cluster.
   master_hps.emplace_back(reserved_hp_);
-  NO_FATALS(StartNewMaster(master_hps, reserved_hp_, orig_num_masters_));
+  scoped_refptr<ExternalMaster> master;
+  NO_FATALS(StartNewMaster(master_hps, reserved_hp_, orig_num_masters_, &master));
   NO_FATALS(VerifyVoterMasters(orig_num_masters_));
 
   // Verify sending add master request to a non-leader master returns NOT_THE_LEADER error.
@@ -1275,20 +1216,15 @@ TEST_F(DynamicMultiMasterTest, TestAddMasterMissingAndIncorrectAddress) {
   NO_FATALS(SetUpWithNumMasters(1));
   NO_FATALS(StartCluster({"--master_support_change_config"}));
 
-  // Verify that existing masters are running as VOTERs and collect their addresses to be used
-  // for starting the new master.
-  vector<HostPort> master_hps;
-  NO_FATALS(VerifyVoterMasters(orig_num_masters_, &master_hps));
-
-  // Bring up the new master and add to the cluster.
-  master_hps.emplace_back(reserved_hp_);
-  NO_FATALS(StartNewMaster(master_hps, reserved_hp_, orig_num_masters_));
+  // Verify that existing masters are running as VOTERs.
   NO_FATALS(VerifyVoterMasters(orig_num_masters_));
 
   // Empty HostPort
   {
     string err;
-    Status actual = AddMasterToClusterUsingCLITool(HostPort(), &err);
+    ExternalDaemonOptions opts;
+    ASSERT_OK(BuildMasterOpts(orig_num_masters_, HostPort(), &opts));
+    Status actual = AddMasterToClusterUsingCLITool(opts, &err);
     ASSERT_TRUE(actual.IsRuntimeError()) << actual.ToString();
     ASSERT_STR_CONTAINS(err, "must provide positional argument master_address");
   }
@@ -1296,15 +1232,16 @@ TEST_F(DynamicMultiMasterTest, TestAddMasterMissingAndIncorrectAddress) {
   // Non-routable incorrect hostname.
   {
     string err;
-    Status actual = AddMasterToClusterUsingCLITool(
-        HostPort("non-existent-path.local", Master::kDefaultPort), &err);
+    ExternalDaemonOptions opts;
+    ASSERT_OK(BuildMasterOpts(orig_num_masters_,
+                              HostPort("non-existent-path.local", Master::kDefaultPort), &opts));
+    Status actual = AddMasterToClusterUsingCLITool(opts, &err);
     ASSERT_TRUE(actual.IsRuntimeError()) << actual.ToString();
-    ASSERT_STR_CONTAINS(err,
-                        "Network error: unable to resolve address for non-existent-path.local");
+    ASSERT_STR_CONTAINS(err, "unable to resolve address for non-existent-path.local");
   }
 
   // Verify no change in number of masters.
-  NO_FATALS(VerifyVoterMasters(orig_num_masters_, &master_hps));
+  NO_FATALS(VerifyVoterMasters(orig_num_masters_));
 }
 
 // Test that attempts to remove a master with missing master address and a non-existent
@@ -1369,6 +1306,27 @@ TEST_F(DynamicMultiMasterTest, TestRemoveMasterMismatchHostnameAndUuid) {
   NO_FATALS(VerifyVoterMasters(orig_num_masters_, &master_hps));
 }
 
+// Test that attempts to add a master with a non-existent kudu executable path.
+TEST_F(DynamicMultiMasterTest, TestAddMasterIncorrectKuduBinary) {
+  NO_FATALS(SetUpWithNumMasters(1));
+  NO_FATALS(StartCluster({ "--master_support_change_config" }));
+
+  // Verify that existing masters are running as VOTERs.
+  NO_FATALS(VerifyVoterMasters(orig_num_masters_));
+
+  // Add master to the cluster.
+  string err;
+  string kudu_abs_path = "/tmp/path/to/nowhere";
+  ExternalDaemonOptions opts;
+  ASSERT_OK(BuildMasterOpts(orig_num_masters_, reserved_hp_, &opts));
+  Status actual = AddMasterToClusterUsingCLITool(opts, &err, {} /* env_vars */, 4, kudu_abs_path);
+  ASSERT_TRUE(actual.IsRuntimeError()) << actual.ToString();
+  ASSERT_STR_CONTAINS(err, Substitute("kudu binary not found at $0", kudu_abs_path));
+
+  // Verify no change in number of masters.
+  NO_FATALS(VerifyVoterMasters(orig_num_masters_));
+}
+
 // Test that attempts removing a leader master itself from a cluster with
 // 1 or 2 masters.
 class ParameterizedRemoveLeaderMasterTest : public DynamicMultiMasterTest,
diff --git a/src/kudu/mini-cluster/external_mini_cluster.cc b/src/kudu/mini-cluster/external_mini_cluster.cc
index bca138b..74e56a5 100644
--- a/src/kudu/mini-cluster/external_mini_cluster.cc
+++ b/src/kudu/mini-cluster/external_mini_cluster.cc
@@ -90,6 +90,7 @@ using kudu::tserver::ListTabletsResponsePB;
 using kudu::tserver::TabletServerAdminServiceProxy;
 using kudu::tserver::TabletServerServiceProxy;
 using std::copy;
+using std::map;
 using std::pair;
 using std::string;
 using std::unique_ptr;
@@ -949,20 +950,6 @@ string ExternalMiniCluster::UuidForTS(int ts_idx) const {
   return tablet_server(ts_idx)->uuid();
 }
 
-Status ExternalMiniCluster::AddMaster(const scoped_refptr<ExternalMaster>& new_master) {
-  const auto& new_master_uuid = new_master->instance_id().permanent_uuid();
-  for (const auto& m : masters_) {
-    if (m->instance_id().permanent_uuid() == new_master_uuid) {
-      CHECK(m->bound_rpc_hostport() == new_master->bound_rpc_hostport());
-      return Status::AlreadyPresent(Substitute(
-          "Master $0, uuid: $1 already present in ExternalMiniCluster",
-          m->bound_rpc_hostport().ToString(), new_master_uuid));
-    }
-  }
-  masters_.emplace_back(new_master);
-  return Status::OK();
-}
-
 Status ExternalMiniCluster::RemoveMaster(const HostPort& hp) {
   for (auto it = masters_.begin(); it != masters_.end(); ++it) {
     if ((*it)->bound_rpc_hostport() == hp) {
@@ -1165,25 +1152,35 @@ void ExternalDaemon::SetMetastoreIntegration(const string& hms_uris,
   opts_.extra_flags.emplace_back(Substitute("--hive_metastore_sasl_enabled=$0", enable_kerberos));
 }
 
-Status ExternalDaemon::EnableKerberos(MiniKdc* kdc,
-                                      const string& principal,
-                                      const string& bind_host) {
-  string spn = principal + "/" + bind_host;
+Status ExternalDaemon::CreateKerberosConfig(MiniKdc* kdc,
+                                            const string& principal_base,
+                                            const string& bind_host,
+                                            vector<string>* flags,
+                                            map<string, string>* env_vars) {
+  string spn = principal_base + "/" + bind_host;
   string ktpath;
   RETURN_NOT_OK_PREPEND(kdc->CreateServiceKeytab(spn, &ktpath),
                         "could not create keytab");
-  extra_env_ = kdc->GetEnvVars();
+  *env_vars = kdc->GetEnvVars();
+  *flags =  {
+      Substitute("--keytab_file=$0", ktpath),
+      Substitute("--principal=$0", spn),
+      "--rpc_authentication=required",
+      "--superuser_acl=test-admin",
+      "--user_acl=test-user",
+  };
 
+  return Status::OK();
+}
+
+Status ExternalDaemon::EnableKerberos(MiniKdc* kdc, const string& principal_base,
+                                      const string& bind_host) {
+  vector<string> flags;
+  RETURN_NOT_OK(CreateKerberosConfig(kdc, principal_base, bind_host, &flags, &extra_env_));
   // Insert Kerberos flags at the front of extra_flags, so that user specified
   // flags will override them.
-  opts_.extra_flags.insert(opts_.extra_flags.begin(), {
-    Substitute("--keytab_file=$0", ktpath),
-    Substitute("--principal=$0", spn),
-    "--rpc_authentication=required",
-    "--superuser_acl=test-admin",
-    "--user_acl=test-user",
-  });
-
+  opts_.extra_flags.insert(opts_.extra_flags.begin(), std::make_move_iterator(flags.begin()),
+                           std::make_move_iterator(flags.end()));
   return Status::OK();
 }
 
diff --git a/src/kudu/mini-cluster/external_mini_cluster.h b/src/kudu/mini-cluster/external_mini_cluster.h
index 0823384..f4bd5fc 100644
--- a/src/kudu/mini-cluster/external_mini_cluster.h
+++ b/src/kudu/mini-cluster/external_mini_cluster.h
@@ -480,10 +480,6 @@ class ExternalMiniCluster : public MiniCluster {
   // files that reside in the log dir.
   std::string GetLogPath(const std::string& daemon_id) const;
 
-  // Adds a master to the ExternalMiniCluster when the new master has been added
-  // dynamically after bringing up the ExternalMiniCluster.
-  Status AddMaster(const scoped_refptr<ExternalMaster>& master);
-
   // Removes any bookkeeping of the master specified by 'hp' from the ExternalMiniCluster
   // after already having run through a successful master Raft change config to remove it.
   // This helps keep the state of the actual cluster in sync with the state in ExternalMiniCluster.
@@ -566,6 +562,15 @@ class ExternalDaemon : public RefCountedThreadSafe<ExternalDaemon> {
   void SetMetastoreIntegration(const std::string& hms_uris,
                                bool enable_kerberos);
 
+  // Create a Kerberos service principal of the form <principal_base>/<bind_host> and its keytab.
+  // On success, 'flags' is overwritten with the daemon flags enabling Kerberos (keytab file,
+  // principal, required RPC authentication and test ACLs) and 'env_vars' is overwritten with
+  // the environment variables reported by 'kdc'.
+  static Status CreateKerberosConfig(MiniKdc* kdc, const std::string& principal_base,
+                                     const std::string& bind_host,
+                                     std::vector<std::string>* flags,
+                                     std::map<std::string, std::string>* env_vars);
+
   // Enable Kerberos for this daemon. This creates a Kerberos principal
   // and keytab, and sets the appropriate environment variables in the
   // subprocess such that the server will use Kerberos authentication.
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 326a4f4..c475511 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -54,11 +54,11 @@
 #include "kudu/common/common.pb.h"
 #include "kudu/common/partial_row.h"
 #include "kudu/common/partition.h"
+#include "kudu/common/row_operations.pb.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/types.h"
 #include "kudu/common/wire_protocol-test-util.h"
 #include "kudu/common/wire_protocol.h"
-#include "kudu/common/wire_protocol.pb.h"
 #include "kudu/consensus/consensus.pb.h"
 #include "kudu/consensus/log.h"
 #include "kudu/consensus/log_util.h"
@@ -1107,8 +1107,8 @@ TEST_F(ToolTest, TestModeHelp) {
         "status.*Get the status",
         "timestamp.*Get the current timestamp",
         "list.*List masters in a Kudu cluster",
-        "add.*Add a master to the Raft configuration",
-        "remove.*Remove a master from the Raft configuration"
+        "add.*Add a master to the Kudu cluster",
+        "remove.*Remove a master from the Kudu cluster"
     };
     NO_FATALS(RunTestHelp(kCmd, kMasterModeRegexes));
     NO_FATALS(RunTestHelpRpcFlags(kCmd,
@@ -1130,9 +1130,10 @@ TEST_F(ToolTest, TestModeHelp) {
   }
   {
     NO_FATALS(RunTestHelp("master add --help",
-                          {"-wait_secs \\(Timeout in seconds to wait for the newly added master"}));
+                          {"-wait_secs.*\\(Timeout in seconds to wait while retrying operations",
+                           "-kudu_abs_path.*\\(Absolute file path of the 'kudu' executable used"}));
     NO_FATALS(RunTestHelp("master remove --help",
-                          {"-master_uuid \\(Permanent UUID of the master"}));
+                          {"-master_uuid.*\\(Permanent UUID of the master"}));
   }
   {
     const vector<string> kPbcModeRegexes = {
diff --git a/src/kudu/tools/tool_action_common.cc b/src/kudu/tools/tool_action_common.cc
index 626b737..4d9fdfd 100644
--- a/src/kudu/tools/tool_action_common.cc
+++ b/src/kudu/tools/tool_action_common.cc
@@ -767,6 +767,20 @@ Status DumpMemTrackers(const string& address, uint16_t default_port) {
   return Status::OK();
 }
 
+Status GetKuduToolAbsolutePathSafe(string* path) {
+  static const char* const kKuduCtlFileName = "kudu";
+  string exe;
+  RETURN_NOT_OK(Env::Default()->GetExecutablePath(&exe));
+  const string binroot = DirName(exe);
+  string tool_abs_path = JoinPathSegments(binroot, kKuduCtlFileName);
+  if (!Env::Default()->FileExists(tool_abs_path)) {
+    return Status::NotFound(Substitute(
+        "$0 binary not found at $1", kKuduCtlFileName, tool_abs_path));
+  }
+  *path = std::move(tool_abs_path);
+  return Status::OK();
+}
+
 namespace {
 
 // Pretty print a table using the psql format. For example:
diff --git a/src/kudu/tools/tool_action_common.h b/src/kudu/tools/tool_action_common.h
index c97ea4e..45f7330 100644
--- a/src/kudu/tools/tool_action_common.h
+++ b/src/kudu/tools/tool_action_common.h
@@ -216,6 +216,10 @@ Status ParseMasterAddresses(
     const RunnerContext& context,
     std::vector<std::string>* master_addresses);
 
+// Set 'path' to the full path of the 'kudu' tool binary in the current executable's directory.
+// Returns NotFound if the binary does not exist there.
+Status GetKuduToolAbsolutePathSafe(std::string* path);
+
 // Parses a comma separated list of "host:port" pairs into an unordered set of
 // HostPort objects. If no port is specified for an entry in the comma separated list,
 // the default master port is used for that entry's pair.
diff --git a/src/kudu/tools/tool_action_master.cc b/src/kudu/tools/tool_action_master.cc
index 7826fa5..4b61e4f 100644
--- a/src/kudu/tools/tool_action_master.cc
+++ b/src/kudu/tools/tool_action_master.cc
@@ -16,7 +16,9 @@
 // under the License.
 
 #include <algorithm>
+#include <csignal>
 #include <cstdint>
+#include <fstream> // IWYU pragma: keep
 #include <functional>
 #include <iostream>
 #include <iterator>
@@ -45,24 +47,37 @@
 #include "kudu/master/master.pb.h"
 #include "kudu/master/master.proxy.h"
 #include "kudu/master/master_runner.h"
+#include "kudu/master/sys_catalog.h"
 #include "kudu/rpc/response_callback.h"
 #include "kudu/rpc/rpc_controller.h"
+#include "kudu/tools/ksck.h"
+#include "kudu/tools/ksck_remote.h"
 #include "kudu/tools/tool_action.h"
 #include "kudu/tools/tool_action_common.h"
+#include "kudu/util/env.h"
+#include "kudu/util/flags.h"
 #include "kudu/util/init.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/net_util.h"
+#include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/status.h"
 #include "kudu/util/string_case.h"
+#include "kudu/util/subprocess.h"
 
 DECLARE_bool(force);
+DECLARE_int64(negotiation_timeout_ms);
 DECLARE_int64(timeout_ms);
 DECLARE_string(columns);
+DECLARE_string(fs_wal_dir);
+DECLARE_string(fs_data_dirs);
 
 DEFINE_string(master_uuid, "", "Permanent UUID of the master. Only needed to disambiguate in case "
                                "of multiple masters with same RPC address");
 DEFINE_int64(wait_secs, 8,
-             "Timeout in seconds to wait for the newly added master to be promoted as VOTER.");
+             "Timeout in seconds to wait while retrying operations like bringing up new master, "
+             "running ksck, waiting for the new master to be promoted as VOTER, etc.");
+DEFINE_string(kudu_abs_path, "", "Absolute file path of the 'kudu' executable used to bring up "
+                                 "new master and other workflow steps.");
 
 using kudu::master::AddMasterRequestPB;
 using kudu::master::AddMasterResponsePB;
@@ -127,49 +142,90 @@ Status MasterTimestamp(const RunnerContext& context) {
   return PrintServerTimestamp(address, Master::kDefaultPort);
 }
 
-Status AddMasterChangeConfig(const RunnerContext& context) {
-  const string& new_master_address = FindOrDie(context.required_args, kMasterAddressArg);
+// Bring up a new master at the specified HostPort 'hp' with user-supplied 'flags' using the kudu
+// executable 'kudu_abs_path'.
+// 'master_addresses' is the list of masters in the existing cluster, including the new
+// master.
+Status BringUpNewMaster(const string& kudu_abs_path,
+                        vector<string> flags,
+                        const HostPort& hp,
+                        const vector<string>& master_addresses,
+                        unique_ptr<Subprocess>* new_master_out) {
+  // Ensure the new master is not already running at the specified RPC address.
+  unique_ptr<MasterServiceProxy> proxy;
+  RETURN_NOT_OK_PREPEND(BuildProxy(hp.ToString(), Master::kDefaultPort, &proxy),
+                        "Failed building proxy for new master");
+
+  auto is_catalog_mngr_running = [](MasterServiceProxy* master_proxy) {
+    master::GetMasterRegistrationRequestPB req;
+    master::GetMasterRegistrationResponsePB resp;
+    RpcController rpc;
+    Status s = master_proxy->GetMasterRegistration(req, &resp, &rpc);
+    return s.ok() && !resp.has_error();
+  };
 
-  LeaderMasterProxy proxy;
-  RETURN_NOT_OK(proxy.Init(context));
+  if (is_catalog_mngr_running(proxy.get())) {
+    return Status::IllegalState(Substitute("Master $0 already running", hp.ToString()));
+  }
 
-  HostPort hp;
-  RETURN_NOT_OK(hp.ParseString(new_master_address, Master::kDefaultPort));
-  {
-    AddMasterRequestPB req;
-    AddMasterResponsePB resp;
-    *req.mutable_rpc_addr() = HostPortToPB(hp);
-
-    Status s = proxy.SyncRpc<AddMasterRequestPB, AddMasterResponsePB>(
-        req, &resp, "AddMaster", &MasterServiceProxy::AddMasterAsync,
-        {master::MasterFeatures::DYNAMIC_MULTI_MASTER});
-    // It's possible this is a retry request in which case instead of returning
-    // the master is already present in the Raft config error we make further checks
-    // whether the master has been promoted to a VOTER.
-    bool master_already_present =
-        resp.has_error() && resp.error().code() == master::MasterErrorPB::MASTER_ALREADY_PRESENT;
-    if (!s.ok() && !master_already_present) {
-      return s;
+  flags.emplace_back("--master_addresses=" + JoinStrings(master_addresses, ","));
+  flags.emplace_back("--master_address_add_new_master=" + hp.ToString());
+  flags.emplace_back("--master_support_change_config");
+  vector<string> argv = { kudu_abs_path, "master", "run" };
+  argv.insert(argv.end(), std::make_move_iterator(flags.begin()),
+              std::make_move_iterator(flags.end()));
+  auto new_master = std::make_unique<Subprocess>(argv);
+  RETURN_NOT_OK_PREPEND(new_master->Start(), "Failed starting new master");
+  auto stop_master = MakeScopedCleanup([&] {
+    WARN_NOT_OK(new_master->KillAndWait(SIGKILL), "Failed stopping new master");
+  });
+  MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(FLAGS_wait_secs);
+  do {
+    Status wait_status = new_master->WaitNoBlock();
+    if (!wait_status.IsTimedOut()) {
+      return Status::RuntimeError("Failed to bring up new master");
     }
-    if (master_already_present) {
-      LOG(INFO) << "Master already present. Checking for promotion to VOTER...";
+    if (is_catalog_mngr_running(proxy.get())) {
+      stop_master.cancel();
+      *new_master_out = std::move(new_master);
+      return Status::OK();
     }
-  }
+    SleepFor(MonoDelta::FromMilliseconds(100));
+  } while (MonoTime::Now() < deadline);
 
-  // If the system catalog of the new master can be caught up from the WAL then the new master will
-  // be promoted to a VOTER and become a FOLLOWER. However this can take some time, so we'll
-  // try for a few seconds. It's perfectly normal for the new master to be not caught up from
-  // the WAL in which case subsequent steps of system catalog tablet copy need to be carried out
-  // as outlined in the documentation for adding a new master to Kudu cluster.
+  return Status::TimedOut("Timed out waiting for the new master to come up");
+}
+
+// Check health and consensus status of masters using ksck.
+Status CheckMastersHealthy(const vector<string>& master_addresses) {
+  std::shared_ptr<KsckCluster> cluster;
+  RETURN_NOT_OK(RemoteKsckCluster::Build(master_addresses, &cluster));
+
+  // Print to an unopened ofstream to discard ksck output.
+  // See https://stackoverflow.com/questions/8243743.
+  std::ofstream null_stream;
+  Ksck ksck(cluster, &null_stream);
+  RETURN_NOT_OK(ksck.CheckMasterHealth());
+  RETURN_NOT_OK(ksck.CheckMasterConsensus());
+  return Status::OK();
+}
+
+// Check new master 'new_master_hp' is promoted to a VOTER and all masters are healthy.
+// Returns the last Raft role and member_type in output parameters 'master_role' and 'master_type'
+// respectively.
+Status CheckMasterVoterAndHealthy(LeaderMasterProxy* proxy,
+                                  const vector<string>& master_addresses,
+                                  const HostPort& new_master_hp,
+                                  RaftPeerPB::Role* master_role,
+                                  RaftPeerPB::MemberType* master_type) {
+  *master_role = RaftPeerPB::UNKNOWN_ROLE;
+  *master_type = RaftPeerPB::UNKNOWN_MEMBER_TYPE;
   MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(FLAGS_wait_secs);
-  RaftPeerPB::Role master_role = RaftPeerPB::UNKNOWN_ROLE;
-  RaftPeerPB::MemberType master_type = RaftPeerPB::UNKNOWN_MEMBER_TYPE;
   do {
     ListMastersRequestPB req;
     ListMastersResponsePB resp;
-    RETURN_NOT_OK((proxy.SyncRpc<ListMastersRequestPB, ListMastersResponsePB>(
-                   req, &resp, "ListMasters", &MasterServiceProxy::ListMastersAsync)));
-
+    RETURN_NOT_OK((proxy->SyncRpc<ListMastersRequestPB, ListMastersResponsePB>(
+        req, &resp, "ListMasters", &MasterServiceProxy::ListMastersAsync)));
     if (resp.has_error()) {
       return StatusFromPB(resp.error().status());
     }
@@ -184,7 +240,7 @@ Status AddMasterChangeConfig(const RunnerContext& context) {
         continue;
       }
       for (const auto& master_hp : master.registration().rpc_addresses()) {
-        if (hp == HostPortFromPB(master_hp)) {
+        if (new_master_hp == HostPortFromPB(master_hp)) {
           // Found the newly added master
           new_master_found = true;
           break;
@@ -196,29 +252,212 @@ Status AddMasterChangeConfig(const RunnerContext& context) {
     }
     if (!new_master_found) {
       return Status::NotFound(Substitute("New master $0 not found. Retry adding the master",
-                                         hp.ToString()));
+                                         new_master_hp.ToString()));
     }
     CHECK_LT(i, resp.masters_size());
     const auto& master = resp.masters(i);
-    master_role = master.role();
-    master_type = master.member_type();
-    if (master_type == RaftPeerPB::VOTER &&
-        (master_role == RaftPeerPB::FOLLOWER || master_role == RaftPeerPB::LEADER)) {
-      LOG(INFO) << Substitute("Successfully added master $0 to the cluster. Please follow the "
-                              "next steps which includes updating master addresses, updating "
-                              "client configuration etc. from the Kudu administration "
-                              "documentation on \"Migrating to Multiple Kudu Masters\".",
-                              hp.ToString());
-      return Status::OK();
+    *master_role = master.role();
+    *master_type = master.member_type();
+    if (*master_type == RaftPeerPB::VOTER &&
+        (*master_role == RaftPeerPB::FOLLOWER || *master_role == RaftPeerPB::LEADER)) {
+      // Check the master ksck state as well.
+      if (CheckMastersHealthy(master_addresses).ok()) {
+        return Status::OK();
+      }
     }
     SleepFor(MonoDelta::FromMilliseconds(100));
   } while (MonoTime::Now() < deadline);
 
-  LOG(INFO) << Substitute("New master $0 part of the Raft configuration; role: $1, member_type: "
-                          "$2. Please follow the next steps which includes system catalog tablet "
-                          "copy, updating master addresses etc. from the Kudu administration "
-                          "documentation on \"Migrating to Multiple Kudu Masters\".",
-                          hp.ToString(), master_role, master_type);
+  return Status::TimedOut("Timed out waiting for master to catch up from WAL");
+}
+
+// Replaces the local system catalog of 'dst_master' with a copy taken from another master.
+// The source is the first entry of 'master_addresses' (which includes the dst_master) that
+// differs from dst_master. Both steps run the kudu executable 'kudu_abs_path'.
+// Returns RuntimeError if no source master is found, or the error of the step that failed.
+Status CopyRemoteSystemCatalog(const string& kudu_abs_path,
+                               const HostPort& dst_master,
+                               const vector<string>& master_addresses) {
+  const string dst_addr = dst_master.ToString();
+  // Any listed master other than the destination can serve as the copy source.
+  const auto src_it = std::find_if(master_addresses.begin(), master_addresses.end(),
+                                   [&](const string& addr) { return addr != dst_addr; });
+  if (src_it == master_addresses.end() || src_it->empty()) {
+    return Status::RuntimeError("Failed to find source master to copy system catalog");
+  }
+  const string& src_addr = *src_it;
+
+  // Both sub-commands get the WAL and data directories supplied to this tool.
+  const string wal_dir_arg = "--fs_wal_dir=" + FLAGS_fs_wal_dir;
+  const string data_dirs_arg = "--fs_data_dirs=" + FLAGS_fs_data_dirs;
+
+  // Step 1: delete the local replica of the system catalog tablet.
+  LOG(INFO) << Substitute("Deleting system catalog on $0", dst_addr);
+  const vector<string> delete_cmd = {
+      kudu_abs_path, "local_replica", "delete", master::SysCatalogTable::kSysCatalogTabletId,
+      wal_dir_arg, data_dirs_arg, "-clean_unsafe" };
+  RETURN_NOT_OK_PREPEND(Subprocess::Call(delete_cmd), "Failed to delete system catalog");
+
+  // Step 2: copy the system catalog tablet from the source master.
+  LOG(INFO) << Substitute("Copying system catalog from master $0", src_addr);
+  const vector<string> copy_cmd = {
+      kudu_abs_path, "local_replica", "copy_from_remote",
+      master::SysCatalogTable::kSysCatalogTabletId, src_addr, wal_dir_arg, data_dirs_arg };
+  RETURN_NOT_OK_PREPEND(Subprocess::Call(copy_cmd), "Failed to copy system catalog");
+
+  return Status::OK();
+}
+
+// Invoke the add master Raft change config RPC to add the supplied master 'hp'.
+// Returns Status::OK if the master was added, or if the response reports that it is already
+// present (e.g. this is a retry); a distinct log message is printed for each outcome.
+Status AddMasterRaftConfig(const RunnerContext& context, const HostPort& hp) {
+  LeaderMasterProxy proxy;
+  RETURN_NOT_OK(proxy.Init(context));
+  AddMasterRequestPB req;
+  AddMasterResponsePB resp;
+  *req.mutable_rpc_addr() = HostPortToPB(hp);
+
+  Status s = proxy.SyncRpc<AddMasterRequestPB, AddMasterResponsePB>(
+      req, &resp, "AddMaster", &MasterServiceProxy::AddMasterAsync,
+      {master::MasterFeatures::DYNAMIC_MULTI_MASTER});
+  // It's possible this is a retry request in which case instead of returning
+  // the master is already present in the Raft config error we report success and
+  // leave it to the caller to check whether the master has been promoted to a VOTER.
+  bool master_already_present =
+      resp.has_error() && resp.error().code() == master::MasterErrorPB::MASTER_ALREADY_PRESENT;
+  if (master_already_present) {
+    // Nothing was added by this call, so don't also claim a successful addition.
+    LOG(INFO) << "Master already present.";
+    return Status::OK();
+  }
+  RETURN_NOT_OK(s);
+  LOG(INFO) << Substitute("Successfully added master $0 to the Raft configuration.",
+                          hp.ToString());
+  return Status::OK();
+}
+
+// Implements the master 'add' action: brings up the new master, adds it to the Raft config and,
+// if it can't catch up from the WAL, copies the system catalog; finally stops the new master.
+Status AddMaster(const RunnerContext& context) {
+  static const char* const kPostSuccessMsg =
+      "Please follow the next steps which includes updating master addresses, updating client "
+      "configuration etc. from the Kudu administration documentation on "
+      "\"Migrating to Multiple Kudu Masters\".";
+  static const char* const kFailedStopMsg =
+      "Failed to stop new master after successfully adding to the cluster. Stop the master before "
+      "proceeding further.";
+
+  // Parse input arguments.
+  vector<string> master_addresses;
+  RETURN_NOT_OK(ParseMasterAddresses(context, &master_addresses));
+  const string& new_master_address = FindOrDie(context.required_args, kMasterAddressArg);
+  HostPort hp;
+  RETURN_NOT_OK(hp.ParseString(new_master_address, Master::kDefaultPort));
+
+  // Check for the 'kudu' executable if the user has supplied one.
+  if (!FLAGS_kudu_abs_path.empty() && !Env::Default()->FileExists(FLAGS_kudu_abs_path)) {
+    return Status::NotFound(Substitute("kudu binary not found at $0", FLAGS_kudu_abs_path));
+  }
+  string kudu_abs_path = FLAGS_kudu_abs_path;
+  if (kudu_abs_path.empty()) {
+    RETURN_NOT_OK(GetKuduToolAbsolutePathSafe(&kudu_abs_path));
+  }
+
+  // Get the flags that'll be needed to bring up new master and for system catalog copy.
+  if (FLAGS_fs_wal_dir.empty()) {
+    return Status::InvalidArgument("Flag -fs_wal_dir not supplied");
+  }
+  if (FLAGS_fs_data_dirs.empty()) {
+    return Status::InvalidArgument("Flag -fs_data_dirs not supplied");
+  }
+  GFlagsMap flags_map = GetFlagsMap();
+  // Drop the flags meant only for this command; the remaining ones are passed to the new master.
+  flags_map.erase("wait_secs");
+  flags_map.erase("kudu_abs_path");
+
+  // Render the remaining flags as --name=value arguments for the new master.
+  vector<string> new_master_flags;
+  new_master_flags.reserve(flags_map.size());
+  for (const auto& name_flag_pair : flags_map)  {
+    const auto& flag = name_flag_pair.second;
+    new_master_flags.emplace_back(Substitute("--$0=$1", flag.name, flag.current_value));
+  }
+
+  // The new master is started with the master addresses of the cluster plus its own. On a retry
+  // the new master may already be part of master_addresses, so it's appended only if missing.
+  if (std::find(master_addresses.begin(), master_addresses.end(), hp.ToString()) ==
+      master_addresses.end()) {
+    master_addresses.emplace_back(hp.ToString());
+  }
+
+  unique_ptr<Subprocess> new_master;
+  RETURN_NOT_OK_PREPEND(BringUpNewMaster(kudu_abs_path, new_master_flags, hp, master_addresses,
+                                         &new_master),
+                        "Failed bringing up new master. See Kudu Master log for details.");
+  // Kill the new master on any early return below unless the cleanup is cancelled first.
+  auto stop_master = MakeScopedCleanup([&] {
+    auto* master_ptr = new_master.get();
+    if (master_ptr != nullptr) {
+      WARN_NOT_OK(master_ptr->KillAndWait(SIGKILL), "Failed stopping new master");
+    }
+  });
+  // Call the add master Raft change config RPC.
+  RETURN_NOT_OK(AddMasterRaftConfig(context, hp));
+
+  // A new master that can be caught up from the WAL is expected to become a healthy VOTER (not
+  // FAILED_UNRECOVERABLE as stated here before - TODO confirm); wait up to --wait_secs for that.
+  // Timing out is perfectly normal and means the system catalog tablet copy must be carried out.
+  RaftPeerPB::Role master_role;
+  RaftPeerPB::MemberType master_type;
+  LeaderMasterProxy proxy;
+  // Since new master is now part of the Raft configuration, use updated 'master_addresses'.
+  RETURN_NOT_OK(proxy.Init(
+      master_addresses, MonoDelta::FromMilliseconds(FLAGS_timeout_ms),
+      MonoDelta::FromMilliseconds(FLAGS_negotiation_timeout_ms)));
+
+  LOG(INFO) << "Checking for master consensus and health status...";
+  Status wal_catchup_status = CheckMasterVoterAndHealthy(&proxy, master_addresses, hp,
+                                                         &master_role, &master_type);
+  if (wal_catchup_status.ok()) {
+    stop_master.cancel();
+    RETURN_NOT_OK_PREPEND(new_master->KillAndWait(SIGTERM), kFailedStopMsg);
+    LOG(INFO) << Substitute("Master $0 successfully caught up from WAL.", hp.ToString());
+    LOG(INFO) << kPostSuccessMsg;
+    return Status::OK();
+  }
+  if (!wal_catchup_status.IsTimedOut()) {
+    // The status is non-OK here, so the macro returns; the return after it looks unreachable.
+    RETURN_NOT_OK_PREPEND(wal_catchup_status,
+                          "Unexpected error waiting for master to catchup from WAL.");
+    return wal_catchup_status;
+  }
+
+  // The new master could not be caught up from the WAL, so attempt copying the system catalog.
+  LOG(INFO) << Substitute("Master $0 status; role: $1, member_type: $2",
+                          hp.ToString(), RaftPeerPB::Role_Name(master_role),
+                          RaftPeerPB::MemberType_Name(master_type));
+  LOG(INFO) << Substitute("Master $0 could not be caught up from WAL.", hp.ToString());
+
+  RETURN_NOT_OK_PREPEND(new_master->KillAndWait(SIGTERM),
+                        "Unable to stop master to proceed with system catalog copy");
+  new_master.reset();
+
+  RETURN_NOT_OK(CopyRemoteSystemCatalog(kudu_abs_path, hp, master_addresses));
+
+  RETURN_NOT_OK_PREPEND(BringUpNewMaster(kudu_abs_path, new_master_flags, hp, master_addresses,
+                                         &new_master),
+                        "Failed bringing up new master after system catalog copy. "
+                        "See Kudu Master log for details.");
+
+  RETURN_NOT_OK_PREPEND(CheckMasterVoterAndHealthy(&proxy, master_addresses, hp, &master_role,
+                                                   &master_type),
+                        "Failed waiting for new master to be healthy after system catalog copy");
+
+  stop_master.cancel();
+  RETURN_NOT_OK_PREPEND(new_master->KillAndWait(SIGTERM), kFailedStopMsg);
+  LOG(INFO) << "Successfully copied system catalog and new master is healthy.";
+  LOG(INFO) << kPostSuccessMsg;
   return Status::OK();
 }
 
@@ -560,24 +799,44 @@ unique_ptr<Mode> BuildMasterMode() {
   }
   {
     unique_ptr<Action> add_master =
-        ActionBuilder("add", &AddMasterChangeConfig)
-        .Description("Add a master to the Raft configuration of the Kudu cluster. "
-                     "Please refer to the Kudu administration documentation on "
-                     "\"Migrating to Multiple Kudu Masters\" for the complete steps.")
+        ActionBuilder("add", &AddMaster)
+        .Description("Add a master to the Kudu cluster")
+        .ExtraDescription(
+            "This is an advanced command that orchestrates the workflow to bring up and add a "
+            "new master to the Kudu cluster. It must be run locally on the new master being added "
+            "and not on the leader master. This tool shuts down the new master after completion "
+            "of the command regardless of whether the new master addition is successful. "
+            "After the command completes successfully, user is expected to explicitly "
+            "start the new master using the same flags as supplied to this tool.\n\n"
+            "Supply all the necessary flags to bring up the new master. "
+            "The most common configuration flags used to bring up a master are described "
+            "below. See \"Kudu Configuration Reference\" for all the configuration options that "
+            "apply to a Kudu master.\n\n"
+            "Please refer to the Kudu administration documentation on "
+            "\"Migrating to Multiple Kudu Masters\" for the complete steps.")
         .AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc })
         .AddRequiredParameter({ kMasterAddressArg, kMasterAddressDesc })
         .AddOptionalParameter("wait_secs")
+        .AddOptionalParameter("kudu_abs_path")
+        .AddOptionalParameter("fs_wal_dir")
+        .AddOptionalParameter("fs_data_dirs")
+        .AddOptionalParameter("fs_metadata_dir")
+        .AddOptionalParameter("log_dir")
+        // Unlike most tools we don't log to stderr by default to match the
+        // kudu-master binary as closely as possible.
+        .AddOptionalParameter("logtostderr", string("false"))
         .Build();
     builder.AddAction(std::move(add_master));
   }
   {
     unique_ptr<Action> remove_master =
         ActionBuilder("remove", &RemoveMasterChangeConfig)
-        .Description("Remove a master from the Raft configuration of the Kudu cluster. "
-                     "Please refer to the Kudu administration documentation on "
-                     "\"Removing Kudu Masters from a Multi-Master Deployment\" or "
-                     "\"Recovering from a dead Kudu Master in a Multi-Master Deployment\" "
-                     "as appropriate.")
+        .Description("Remove a master from the Kudu cluster")
+        .ExtraDescription(
+            "Removes a master from the Raft configuration of the Kudu cluster.\n\n"
+            "Please refer to the Kudu administration documentation on "
+            "\"Removing Kudu Masters from a Multi-Master Deployment\" or "
+            "\"Recovering from a dead Kudu Master in a Multi-Master Deployment\" as appropriate.")
         .AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc })
         .AddRequiredParameter({ kMasterAddressArg, kMasterAddressDesc })
         .AddOptionalParameter("master_uuid")
diff --git a/src/kudu/tools/tool_test_util.cc b/src/kudu/tools/tool_test_util.cc
index 22abba5..6d4f117 100644
--- a/src/kudu/tools/tool_test_util.cc
+++ b/src/kudu/tools/tool_test_util.cc
@@ -20,18 +20,18 @@
 #include "kudu/tools/tool_test_util.h"
 
 #include <cstdio>
-#include <ostream>
+#include <utility>
 #include <vector>
 
 #include <glog/logging.h>
 
 #include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/substitute.h"
-#include "kudu/util/env.h"
-#include "kudu/util/path_util.h"
+#include "kudu/tools/tool_action_common.h"
 #include "kudu/util/status.h"
 #include "kudu/util/subprocess.h"
 
+using std::map;
 using std::string;
 using std::vector;
 using strings::Split;
@@ -41,18 +41,13 @@ namespace kudu {
 namespace tools {
 
 string GetKuduToolAbsolutePath() {
-  static const string kKuduCtlFileName = "kudu";
-  string exe;
-  CHECK_OK(Env::Default()->GetExecutablePath(&exe));
-  const string binroot = DirName(exe);
-  const string tool_abs_path = JoinPathSegments(binroot, kKuduCtlFileName);
-  CHECK(Env::Default()->FileExists(tool_abs_path))
-      << kKuduCtlFileName << " binary not found at " << tool_abs_path;
-  return tool_abs_path;
+  string path;
+  CHECK_OK(GetKuduToolAbsolutePathSafe(&path));
+  return path;
 }
 
 Status RunKuduTool(const vector<string>& args, string* out, string* err,
-                   const std::string& in) {
+                   const string& in, map<string, string> env_vars) {
   vector<string> total_args = { GetKuduToolAbsolutePath() };
 
   // Some scenarios might add unsafe flags for testing purposes.
@@ -74,7 +69,7 @@ Status RunKuduTool(const vector<string>& args, string* out, string* err,
   total_args.emplace_back("--openssl_security_level_override=0");
 
   total_args.insert(total_args.end(), args.begin(), args.end());
-  return Subprocess::Call(total_args, in, out, err);
+  return Subprocess::Call(total_args, in, out, err, std::move(env_vars));
 }
 
 Status RunActionPrependStdoutStderr(const string& arg_str) {
diff --git a/src/kudu/tools/tool_test_util.h b/src/kudu/tools/tool_test_util.h
index b511c60..7aa41e3 100644
--- a/src/kudu/tools/tool_test_util.h
+++ b/src/kudu/tools/tool_test_util.h
@@ -19,6 +19,7 @@
 
 #pragma once
 
+#include <map>
 #include <string>
 #include <vector>
 
@@ -39,10 +40,13 @@ std::string GetKuduToolAbsolutePath();
 //
 // If 'out' or 'err' is set, the tool's stdout or stderr output will be
 // written to each respectively.
+// If supplied, the environment variables in 'env_vars' are set for the
+// kudu tool process.
 Status RunKuduTool(const std::vector<std::string>& args,
                    std::string* out = nullptr,
                    std::string* err = nullptr,
-                   const std::string& in = "");
+                   const std::string& in = "",
+                   std::map<std::string, std::string> env_vars = {});
 
 // Runs the 'kudu' tool binary with the given argument string, returning an
 // error prepended with stdout and stderr if the run was unsuccessful.