You are viewing a plain text version of this content. The canonical link for it is here.
Posted on 2017/10/02 19:25:22 UTC (list address and author omitted from this plain-text copy)

[5/6] kudu git commit: mini-cluster: new module for the mini cluster implementations
diff --git a/src/kudu/integration-tests/ b/src/kudu/integration-tests/
deleted file mode 100644
index da7e81f..0000000
--- a/src/kudu/integration-tests/
+++ /dev/null
@@ -1,1280 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//   http://www.apache.org/licenses/LICENSE-2.0
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#include "kudu/integration-tests/external_mini_cluster.h"
-#include <algorithm>
-#include <csignal>
-#include <cstdint>
-#include <cstdlib>
-#include <functional>
-#include <memory>
-#include <string>
-#include <unordered_set>
-#include <utility>
-#include <gflags/gflags.h>
-#include <gtest/gtest.h>
-#include <rapidjson/document.h>
-#include "kudu/client/client.h"
-#include "kudu/client/master_rpc.h"
-#include "kudu/common/wire_protocol.h"
-#include "kudu/common/wire_protocol.pb.h"
-#include "kudu/gutil/basictypes.h"
-#include "kudu/gutil/strings/join.h"
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/gutil/strings/util.h"
-#include "kudu/master/master.pb.h"
-#include "kudu/master/master.proxy.h"
-#include "kudu/rpc/messenger.h"
-#include "kudu/rpc/rpc_controller.h"
-#include "kudu/security/test/mini_kdc.h"
-#include "kudu/server/server_base.pb.h"
-#include "kudu/server/server_base.proxy.h"
-#include "kudu/tablet/metadata.pb.h"
-#include "kudu/tablet/tablet.pb.h"
-#include "kudu/tserver/tserver.pb.h"
-#include "kudu/tserver/tserver_service.proxy.h"
-#include "kudu/util/async_util.h"
-#include "kudu/util/curl_util.h"
-#include "kudu/util/env.h"
-#include "kudu/util/env_util.h"
-#include "kudu/util/faststring.h"
-#include "kudu/util/fault_injection.h"
-#include "kudu/util/jsonreader.h"
-#include "kudu/util/metrics.h"
-#include "kudu/util/monotime.h"
-#include "kudu/util/net/sockaddr.h"
-#include "kudu/util/path_util.h"
-#include "kudu/util/pb_util.h"
-#include "kudu/util/status.h"
-#include "kudu/util/stopwatch.h"
-#include "kudu/util/subprocess.h"
-#include "kudu/util/test_util.h"
-using kudu::client::internal::ConnectToClusterRpc;
-using kudu::master::ListTablesRequestPB;
-using kudu::master::ListTablesResponsePB;
-using kudu::master::MasterServiceProxy;
-using kudu::pb_util::SecureDebugString;
-using kudu::pb_util::SecureShortDebugString;
-using kudu::rpc::RpcController;
-using kudu::server::ServerStatusPB;
-using kudu::tserver::ListTabletsRequestPB;
-using kudu::tserver::ListTabletsResponsePB;
-using kudu::tserver::TabletServerServiceProxy;
-using rapidjson::Value;
-using std::pair;
-using std::string;
-using std::unique_ptr;
-using std::unordered_set;
-using std::vector;
-using strings::Substitute;
-typedef ListTabletsResponsePB::StatusAndSchemaPB StatusAndSchemaPB;
-DEFINE_bool(perf_record, false,
-            "Whether to run \"perf record --call-graph fp\" on each daemon in the cluster");
-namespace kudu {
-static const char* const kMasterBinaryName = "kudu-master";
-static const char* const kTabletServerBinaryName = "kudu-tserver";
-static double kTabletServerRegistrationTimeoutSeconds = 15.0;
-static double kMasterCatalogManagerTimeoutSeconds = 60.0;
-ExternalMiniClusterOptions::ExternalMiniClusterOptions()
-    : num_masters(1),
-      num_tablet_servers(1),
-      bind_mode(MiniCluster::kDefaultBindMode),
-      num_data_dirs(1),
-      enable_kerberos(false),
-      logtostderr(true),
-      start_process_timeout(MonoDelta::FromSeconds(30)) {
-ExternalMiniCluster::ExternalMiniCluster()
-  : opts_(ExternalMiniClusterOptions()) {
-ExternalMiniCluster::ExternalMiniCluster(ExternalMiniClusterOptions opts)
-  : opts_(std::move(opts)) {
-ExternalMiniCluster::~ExternalMiniCluster() {
-  Shutdown();
-Status ExternalMiniCluster::DeduceBinRoot(std::string* ret) {
-  string exe;
-  RETURN_NOT_OK(Env::Default()->GetExecutablePath(&exe));
-  *ret = DirName(exe);
-  return Status::OK();
-Status ExternalMiniCluster::HandleOptions() {
-  daemon_bin_path_ = opts_.daemon_bin_path;
-  if (daemon_bin_path_.empty()) {
-    RETURN_NOT_OK(DeduceBinRoot(&daemon_bin_path_));
-  }
-  data_root_ = opts_.data_root;
-  if (data_root_.empty()) {
-    // If they don't specify a data root, use the current gtest directory.
-    data_root_ = JoinPathSegments(GetTestDataDirectory(), "minicluster-data");
-  }
-  return Status::OK();
-Status ExternalMiniCluster::Start() {
-  CHECK(masters_.empty()) << "Masters are not empty (size: " << masters_.size()
-      << "). Maybe you meant Restart()?";
-  CHECK(tablet_servers_.empty()) << "Tablet servers are not empty (size: "
-      << tablet_servers_.size() << "). Maybe you meant Restart()?";
-  RETURN_NOT_OK(HandleOptions());
-  RETURN_NOT_OK_PREPEND(rpc::MessengerBuilder("minicluster-messenger")
-                        .set_num_reactors(1)
-                        .set_max_negotiation_threads(1)
-                        .Build(&messenger_),
-                        "Failed to start Messenger for minicluster");
-  Status s = Env::Default()->CreateDir(data_root_);
-  if (!s.ok() && !s.IsAlreadyPresent()) {
-    RETURN_NOT_OK_PREPEND(s, "Could not create root dir " + data_root_);
-  }
-  if (opts_.enable_kerberos) {
-    kdc_.reset(new MiniKdc(opts_.mini_kdc_options));
-    RETURN_NOT_OK(kdc_->Start());
-    RETURN_NOT_OK_PREPEND(kdc_->CreateUserPrincipal("test-admin"),
-                          "could not create admin principal");
-    RETURN_NOT_OK_PREPEND(kdc_->CreateUserPrincipal("test-user"),
-                          "could not create user principal");
-    RETURN_NOT_OK_PREPEND(kdc_->CreateUserPrincipal("joe-interloper"),
-                          "could not create unauthorized principal");
-    RETURN_NOT_OK_PREPEND(kdc_->Kinit("test-admin"),
-                          "could not kinit as admin");
-    RETURN_NOT_OK_PREPEND(kdc_->SetKrb5Environment(),
-                          "could not set krb5 client env");
-  }
-  if (opts_.num_masters != 1) {
-    RETURN_NOT_OK_PREPEND(StartDistributedMasters(),
-                          "Failed to add distributed masters");
-  } else {
-    RETURN_NOT_OK_PREPEND(StartSingleMaster(),
-                          Substitute("Failed to start a single Master"));
-  }
-  for (int i = 1; i <= opts_.num_tablet_servers; i++) {
-    RETURN_NOT_OK_PREPEND(AddTabletServer(),
-                          Substitute("Failed starting tablet server $0", i));
-  }
-  RETURN_NOT_OK(WaitForTabletServerCount(
-                  opts_.num_tablet_servers,
-                  MonoDelta::FromSeconds(kTabletServerRegistrationTimeoutSeconds)));
-  return Status::OK();
-void ExternalMiniCluster::ShutdownNodes(ClusterNodes nodes) {
-  if (nodes == ClusterNodes::ALL || nodes == ClusterNodes::TS_ONLY) {
-    for (const scoped_refptr<ExternalTabletServer>& ts : tablet_servers_) {
-      ts->Shutdown();
-    }
-  }
-  if (nodes == ClusterNodes::ALL || nodes == ClusterNodes::MASTERS_ONLY) {
-    for (const scoped_refptr<ExternalMaster>& master : masters_) {
-      if (master) {
-        master->Shutdown();
-      }
-    }
-  }
-Status ExternalMiniCluster::Restart() {
-  for (const scoped_refptr<ExternalMaster>& master : masters_) {
-    if (master && master->IsShutdown()) {
-      RETURN_NOT_OK_PREPEND(master->Restart(), "Cannot restart master bound at: " +
-                                               master->bound_rpc_hostport().ToString());
-    }
-  }
-  for (const scoped_refptr<ExternalTabletServer>& ts : tablet_servers_) {
-    if (ts->IsShutdown()) {
-      RETURN_NOT_OK_PREPEND(ts->Restart(), "Cannot restart tablet server bound at: " +
-                                           ts->bound_rpc_hostport().ToString());
-    }
-  }
-  RETURN_NOT_OK(WaitForTabletServerCount(
-      tablet_servers_.size(),
-      MonoDelta::FromSeconds(kTabletServerRegistrationTimeoutSeconds)));
-  return Status::OK();
-void ExternalMiniCluster::SetDaemonBinPath(string daemon_bin_path) {
-  daemon_bin_path_ = std::move(daemon_bin_path);
-  for (auto& master : masters_) {
-    master->SetExePath(GetBinaryPath(kMasterBinaryName));
-  }
-  for (auto& ts : tablet_servers_) {
-    ts->SetExePath(GetBinaryPath(kTabletServerBinaryName));
-  }
-string ExternalMiniCluster::GetBinaryPath(const string& binary) const {
-  CHECK(!daemon_bin_path_.empty());
-  return JoinPathSegments(daemon_bin_path_, binary);
-string ExternalMiniCluster::GetLogPath(const string& daemon_id) const {
-  CHECK(!data_root_.empty());
-  return JoinPathSegments(JoinPathSegments(data_root_, daemon_id), "logs");
-string ExternalMiniCluster::GetDataPath(const string& daemon_id,
-                                        boost::optional<uint32_t> dir_index) const {
-  CHECK(!data_root_.empty());
-  string data_path = "data";
-  if (dir_index) {
-    CHECK_LT(*dir_index, opts_.num_data_dirs);
-    data_path = Substitute("$0-$1", data_path, dir_index.get());
-  } else {
-    CHECK_EQ(1, opts_.num_data_dirs);
-  }
-  return JoinPathSegments(JoinPathSegments(data_root_, daemon_id), data_path);
-vector<string> ExternalMiniCluster::GetDataPaths(const string& daemon_id) const {
-  if (opts_.num_data_dirs == 1) {
-    return { GetDataPath(daemon_id) };
-  }
-  vector<string> paths;
-  for (uint32_t dir_index = 0; dir_index < opts_.num_data_dirs; dir_index++) {
-    paths.emplace_back(GetDataPath(daemon_id, dir_index));
-  }
-  return paths;
-string ExternalMiniCluster::GetWalPath(const string& daemon_id) const {
-  CHECK(!data_root_.empty());
-  return JoinPathSegments(JoinPathSegments(data_root_, daemon_id), "wal");
-namespace {
-vector<string> SubstituteInFlags(const vector<string>& orig_flags,
-                                 int index) {
-  string str_index = strings::Substitute("$0", index);
-  vector<string> ret;
-  for (const string& orig : orig_flags) {
-    ret.push_back(StringReplace(orig, "${index}", str_index, true));
-  }
-  return ret;
-} // anonymous namespace
-Status ExternalMiniCluster::StartSingleMaster() {
-  string daemon_id = "master-0";
-  ExternalDaemonOptions opts(opts_.logtostderr);
-  opts.messenger = messenger_;
-  opts.exe = GetBinaryPath(kMasterBinaryName);
-  opts.wal_dir = GetWalPath(daemon_id);
-  opts.data_dirs = GetDataPaths(daemon_id);
-  opts.log_dir = GetLogPath(daemon_id);
-  if (FLAGS_perf_record) {
-    opts.perf_record_filename =
-        Substitute("$0/perf-$1.data", opts.log_dir, daemon_id);
-  }
-  opts.extra_flags = SubstituteInFlags(opts_.extra_master_flags, 0);
-  opts.start_process_timeout = opts_.start_process_timeout;
-  opts.rpc_bind_address = HostPort(GetBindIpForMaster(0), 0);
-  scoped_refptr<ExternalMaster> master = new ExternalMaster(opts);
-  if (opts_.enable_kerberos) {
-    // The bind host here is the hostname that will be used to generate the
-    // Kerberos principal, so it has to match the bind address for the master
-    // rpc endpoint.
-    RETURN_NOT_OK_PREPEND(master->EnableKerberos(kdc_.get(), opts.rpc_bind_address.host()),
-                          "could not enable Kerberos");
-  }
-  RETURN_NOT_OK(master->Start());
-  masters_.push_back(master);
-  return Status::OK();
-Status ExternalMiniCluster::StartDistributedMasters() {
-  int num_masters = opts_.num_masters;
-  if (opts_.master_rpc_ports.size() != num_masters) {
-    LOG(FATAL) << num_masters << " masters requested, but only " <<
-        opts_.master_rpc_ports.size() << " ports specified in 'master_rpc_ports'";
-  }
-  vector<HostPort> peer_hostports = master_rpc_addrs();
-  vector<string> flags = opts_.extra_master_flags;
-  flags.push_back(Substitute("--master_addresses=$0",
-                             HostPort::ToCommaSeparatedString(peer_hostports)));
-  string exe = GetBinaryPath(kMasterBinaryName);
-  // Start the masters.
-  for (int i = 0; i < num_masters; i++) {
-    string daemon_id = Substitute("master-$0", i);
-    ExternalDaemonOptions opts(opts_.logtostderr);
-    opts.messenger = messenger_;
-    opts.exe = exe;
-    opts.wal_dir = GetWalPath(daemon_id);
-    opts.data_dirs = GetDataPaths(daemon_id);
-    opts.log_dir = GetLogPath(daemon_id);
-    if (FLAGS_perf_record) {
-      opts.perf_record_filename =
-          Substitute("$0/perf-$1.data", opts.log_dir, daemon_id);
-    }
-    opts.extra_flags = SubstituteInFlags(flags, i);
-    opts.start_process_timeout = opts_.start_process_timeout;
-    opts.rpc_bind_address = peer_hostports[i];
-    scoped_refptr<ExternalMaster> peer = new ExternalMaster(opts);
-    if (opts_.enable_kerberos) {
-      RETURN_NOT_OK_PREPEND(peer->EnableKerberos(kdc_.get(), peer_hostports[i].host()),
-                            "could not enable Kerberos");
-    }
-    RETURN_NOT_OK_PREPEND(peer->Start(),
-                          Substitute("Unable to start Master at index $0", i));
-    masters_.push_back(peer);
-  }
-  return Status::OK();
-string ExternalMiniCluster::GetBindIpForTabletServer(int index) const {
-  return MiniCluster::GetBindIpForDaemon(MiniCluster::TSERVER, index, opts_.bind_mode);
-string ExternalMiniCluster::GetBindIpForMaster(int index) const {
-  return MiniCluster::GetBindIpForDaemon(MiniCluster::MASTER, index, opts_.bind_mode);
-Status ExternalMiniCluster::AddTabletServer() {
-  CHECK(leader_master() != nullptr)
-      << "Must have started at least 1 master before adding tablet servers";
-  int idx = tablet_servers_.size();
-  string daemon_id = Substitute("ts-$0", idx);
-  vector<HostPort> master_hostports;
-  for (int i = 0; i < num_masters(); i++) {
-    master_hostports.push_back(DCHECK_NOTNULL(master(i))->bound_rpc_hostport());
-  }
-  string bind_host = GetBindIpForTabletServer(idx);
-  ExternalDaemonOptions opts(opts_.logtostderr);
-  opts.messenger = messenger_;
-  opts.exe = GetBinaryPath(kTabletServerBinaryName);
-  opts.wal_dir = GetWalPath(daemon_id);
-  opts.data_dirs = GetDataPaths(daemon_id);
-  opts.log_dir = GetLogPath(daemon_id);
-  if (FLAGS_perf_record) {
-    opts.perf_record_filename =
-        Substitute("$0/perf-$1.data", opts.log_dir, daemon_id);
-  }
-  opts.extra_flags = SubstituteInFlags(opts_.extra_tserver_flags, idx);
-  opts.start_process_timeout = opts_.start_process_timeout;
-  opts.rpc_bind_address = HostPort(bind_host, 0);
-  scoped_refptr<ExternalTabletServer> ts =
-      new ExternalTabletServer(opts, master_hostports);
-  if (opts_.enable_kerberos) {
-    RETURN_NOT_OK_PREPEND(ts->EnableKerberos(kdc_.get(), bind_host),
-                          "could not enable Kerberos");
-  }
-  RETURN_NOT_OK(ts->Start());
-  tablet_servers_.push_back(ts);
-  return Status::OK();
-Status ExternalMiniCluster::WaitForTabletServerCount(int count, const MonoDelta& timeout) {
-  MonoTime deadline = MonoTime::Now() + timeout;
-  unordered_set<int> masters_to_search;
-  for (int i = 0; i < masters_.size(); i++) {
-    if (!masters_[i]->IsShutdown()) {
-      masters_to_search.insert(i);
-    }
-  }
-  while (true) {
-    MonoDelta remaining = deadline - MonoTime::Now();
-    if (remaining.ToSeconds() < 0) {
-      return Status::TimedOut(Substitute(
-          "Timed out waiting for $0 TS(s) to register with all masters", count));
-    }
-    for (auto iter = masters_to_search.begin(); iter != masters_to_search.end();) {
-      master::ListTabletServersRequestPB req;
-      master::ListTabletServersResponsePB resp;
-      rpc::RpcController rpc;
-      rpc.set_timeout(remaining);
-      RETURN_NOT_OK_PREPEND(master_proxy(*iter)->ListTabletServers(req, &resp, &rpc),
-                            "ListTabletServers RPC failed");
-      // ListTabletServers() may return servers that are no longer online.
-      // Do a second step of verification to verify that the descs that we got
-      // are aligned (same uuid/seqno) with the TSs that we have in the cluster.
-      int match_count = 0;
-      for (const master::ListTabletServersResponsePB_Entry& e : resp.servers()) {
-        for (const scoped_refptr<ExternalTabletServer>& ets : tablet_servers_) {
-          if (ets->instance_id().permanent_uuid() == e.instance_id().permanent_uuid() &&
-              ets->instance_id().instance_seqno() == e.instance_id().instance_seqno()) {
-            match_count++;
-            break;
-          }
-        }
-      }
-      if (match_count == count) {
-        // This master has returned the correct set of tservers.
-        iter = masters_to_search.erase(iter);
-      } else {
-        iter++;
-      }
-    }
-    if (masters_to_search.empty()) {
-      // All masters have returned the correct set of tservers.
-      LOG(INFO) << count << " TS(s) registered with all masters";
-      return Status::OK();
-    }
-    SleepFor(MonoDelta::FromMilliseconds(1));
-  }
-void ExternalMiniCluster::AssertNoCrashes() {
-  vector<ExternalDaemon*> daemons = this->daemons();
-  int num_crashes = 0;
-  for (ExternalDaemon* d : daemons) {
-    if (d->IsShutdown()) continue;
-    if (!d->IsProcessAlive()) {
-      LOG(ERROR) << "Process with UUID " << d->uuid() << " has crashed";
-      num_crashes++;
-    }
-  }
-  ASSERT_EQ(0, num_crashes) << "At least one process crashed";
-Status ExternalMiniCluster::WaitForTabletsRunning(ExternalTabletServer* ts,
-                                                  int min_tablet_count,
-                                                  const MonoDelta& timeout) {
-  TabletServerServiceProxy proxy(messenger_, ts->bound_rpc_addr(), ts->bound_rpc_addr().host());
-  ListTabletsRequestPB req;
-  ListTabletsResponsePB resp;
-  MonoTime deadline = MonoTime::Now() + timeout;
-  while (MonoTime::Now() < deadline) {
-    rpc::RpcController rpc;
-    rpc.set_timeout(MonoDelta::FromSeconds(10));
-    RETURN_NOT_OK(proxy.ListTablets(req, &resp, &rpc));
-    if (resp.has_error()) {
-      return StatusFromPB(resp.error().status());
-    }
-    bool all_running = true;
-    for (const StatusAndSchemaPB& status : resp.status_and_schema()) {
-      if (status.tablet_status().state() != tablet::RUNNING) {
-        all_running = false;
-      }
-    }
-    // We're done if:
-    // 1. All the tablets are running, and
-    // 2. We've observed as many tablets as we had expected or more.
-    if (all_running && resp.status_and_schema_size() >= min_tablet_count) {
-      return Status::OK();
-    }
-    SleepFor(MonoDelta::FromMilliseconds(10));
-  }
-  return Status::TimedOut(SecureDebugString(resp));
-Status ExternalMiniCluster::GetLeaderMasterIndex(int* idx) {
-  scoped_refptr<ConnectToClusterRpc> rpc;
-  Synchronizer sync;
-  vector<pair<Sockaddr, string>> addrs_with_names;
-  Sockaddr leader_master_addr;
-  MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(5);
-  for (const scoped_refptr<ExternalMaster>& master : masters_) {
-    addrs_with_names.emplace_back(master->bound_rpc_addr(), master->bound_rpc_addr().host());
-  }
-  const auto& cb = [&](const Status& status,
-                       const pair<Sockaddr, string>& leader_master,
-                       const master::ConnectToMasterResponsePB& resp) {
-    if (status.ok()) {
-      leader_master_addr = leader_master.first;
-    }
-    sync.StatusCB(status);
-  };
-  rpc.reset(new ConnectToClusterRpc(cb,
-                                    std::move(addrs_with_names),
-                                    deadline,
-                                    MonoDelta::FromSeconds(5),
-                                    messenger_));
-  rpc->SendRpc();
-  RETURN_NOT_OK(sync.Wait());
-  bool found = false;
-  for (int i = 0; i < masters_.size(); i++) {
-    if (masters_[i]->bound_rpc_hostport().port() == leader_master_addr.port()) {
-      found = true;
-      *idx = i;
-      break;
-    }
-  }
-  if (!found) {
-    // There is never a situation where this should happen, so it's
-    // better to exit with a FATAL log message right away vs. return a
-    // Status::IllegalState().
-    LOG(FATAL) << "Leader master is not in masters_";
-  }
-  return Status::OK();
-ExternalTabletServer* ExternalMiniCluster::tablet_server_by_uuid(const std::string& uuid) const {
-  for (const scoped_refptr<ExternalTabletServer>& ts : tablet_servers_) {
-    if (ts->instance_id().permanent_uuid() == uuid) {
-      return ts.get();
-    }
-  }
-  return nullptr;
-int ExternalMiniCluster::tablet_server_index_by_uuid(const std::string& uuid) const {
-  for (int i = 0; i < tablet_servers_.size(); i++) {
-    if (tablet_servers_[i]->uuid() == uuid) {
-      return i;
-    }
-  }
-  return -1;
-vector<ExternalDaemon*> ExternalMiniCluster::daemons() const {
-  vector<ExternalDaemon*> results;
-  for (const scoped_refptr<ExternalTabletServer>& ts : tablet_servers_) {
-    results.push_back(ts.get());
-  }
-  for (const scoped_refptr<ExternalMaster>& master : masters_) {
-    results.push_back(master.get());
-  }
-  return results;
-vector<HostPort> ExternalMiniCluster::master_rpc_addrs() const {
-  vector<HostPort> master_rpc_addrs;
-  for (int i = 0; i < opts_.master_rpc_ports.size(); i++) {
-    master_rpc_addrs.emplace_back(
-        GetBindIpForDaemon(MiniCluster::MASTER, i, opts_.bind_mode),
-        opts_.master_rpc_ports[i]);
-  }
-  return master_rpc_addrs;
-std::shared_ptr<rpc::Messenger> ExternalMiniCluster::messenger() const {
-  return messenger_;
-std::shared_ptr<MasterServiceProxy> ExternalMiniCluster::master_proxy() const {
-  CHECK_EQ(masters_.size(), 1);
-  return master_proxy(0);
-std::shared_ptr<MasterServiceProxy> ExternalMiniCluster::master_proxy(int idx) const {
-  CHECK_LT(idx, masters_.size());
-  const auto& addr = CHECK_NOTNULL(master(idx))->bound_rpc_addr();
-  return std::make_shared<MasterServiceProxy>(messenger_, addr, addr.host());
-Status ExternalMiniCluster::CreateClient(client::KuduClientBuilder* builder,
-                                         client::sp::shared_ptr<client::KuduClient>* client) const {
-  client::KuduClientBuilder defaults;
-  if (builder == nullptr) {
-    builder = &defaults;
-  }
-  CHECK(!masters_.empty());
-  builder->clear_master_server_addrs();
-  for (const scoped_refptr<ExternalMaster>& master : masters_) {
-    builder->add_master_server_addr(master->bound_rpc_hostport().ToString());
-  }
-  return builder->Build(client);
-Status ExternalMiniCluster::SetFlag(ExternalDaemon* daemon,
-                                    const string& flag,
-                                    const string& value) {
-  const auto& addr = daemon->bound_rpc_addr();
-  server::GenericServiceProxy proxy(messenger_, addr, addr.host());
-  rpc::RpcController controller;
-  controller.set_timeout(MonoDelta::FromSeconds(30));
-  server::SetFlagRequestPB req;
-  server::SetFlagResponsePB resp;
-  req.set_flag(flag);
-  req.set_value(value);
-  req.set_force(true);
-  RETURN_NOT_OK_PREPEND(proxy.SetFlag(req, &resp, &controller),
-                        "rpc failed");
-  if (resp.result() != server::SetFlagResponsePB::SUCCESS) {
-    return Status::RemoteError("failed to set flag",
-                               SecureShortDebugString(resp));
-  }
-  return Status::OK();
-// ExternalDaemon
-ExternalDaemon::ExternalDaemon(ExternalDaemonOptions opts)
-    : messenger_(std::move(opts.messenger)),
-      wal_dir_(std::move(opts.wal_dir)),
-      data_dirs_(std::move(opts.data_dirs)),
-      log_dir_(std::move(opts.log_dir)),
-      perf_record_filename_(std::move(opts.perf_record_filename)),
-      start_process_timeout_(opts.start_process_timeout),
-      logtostderr_(opts.logtostderr),
-      rpc_bind_address_(std::move(opts.rpc_bind_address)),
-      exe_(std::move(opts.exe)),
-      extra_flags_(std::move(opts.extra_flags)) {
-  CHECK(rpc_bind_address_.Initialized());
-ExternalDaemon::~ExternalDaemon() {
-Status ExternalDaemon::EnableKerberos(MiniKdc* kdc, const string& bind_host) {
-  string spn = "kudu/" + bind_host;
-  string ktpath;
-  RETURN_NOT_OK_PREPEND(kdc->CreateServiceKeytab(spn, &ktpath),
-                        "could not create keytab");
-  extra_env_ = kdc->GetEnvVars();
-  extra_flags_.push_back(Substitute("--keytab_file=$0", ktpath));
-  extra_flags_.push_back(Substitute("--principal=$0", spn));
-  extra_flags_.emplace_back("--rpc_authentication=required");
-  extra_flags_.emplace_back("--superuser_acl=test-admin");
-  extra_flags_.emplace_back("--user_acl=test-user");
-  return Status::OK();
-Status ExternalDaemon::StartProcess(const vector<string>& user_flags) {
-  CHECK(!process_);
-  vector<string> argv;
-  // First the exe for argv[0].
-  argv.push_back(exe_);
-  // Then all the flags coming from the minicluster framework.
-  argv.insert(argv.end(), user_flags.begin(), user_flags.end());
-  // Disable fsync to dramatically speed up runtime. This is safe as no tests
-  // rely on forcefully cutting power to a machine or equivalent.
-  argv.emplace_back("--never_fsync");
-  // Generate smaller RSA keys -- generating a 1024-bit key is faster
-  // than generating the default 2048-bit key, and we don't care about
-  // strong encryption in tests. Setting it lower (e.g. 512 bits) results
-  // in OpenSSL errors RSA_sign:digest too big for rsa key:rsa_sign.c:122
-  // since we are using strong/high TLS v1.2 cipher suites, so the minimum
-  // size of TLS-related RSA key is 768 bits (due to the usage of
-  // the ECDHE-RSA-AES256-GCM-SHA384 suite). However, to work with Java
-  // client it's necessary to have at least 1024 bits for certificate RSA key
-  // due to Java security policies.
-  argv.emplace_back("--ipki_server_key_size=1024");
-  // Disable minidumps by default since many tests purposely inject faults.
-  argv.emplace_back("--enable_minidumps=false");
-  // Disable log redaction.
-  argv.emplace_back("--redact=flag");
-  // Enable metrics logging.
-  argv.emplace_back("--metrics_log_interval_ms=1000");
-  if (logtostderr_) {
-    // Ensure that logging goes to the test output and doesn't get buffered.
-    argv.emplace_back("--logtostderr");
-    argv.emplace_back("--logbuflevel=-1");
-  }
-  // Even if we are logging to stderr, metrics logs and minidumps end up being
-  // written based on -log_dir. So, we have to set that too.
-  argv.push_back("--log_dir=" + log_dir_);
-  RETURN_NOT_OK(env_util::CreateDirsRecursively(Env::Default(), log_dir_));
-  // Tell the server to dump its port information so we can pick it up.
-  string info_path = JoinPathSegments(data_dirs_[0], "info.pb");
-  argv.push_back("--server_dump_info_path=" + info_path);
-  argv.emplace_back("--server_dump_info_format=pb");
-  // We use ephemeral ports in many tests. They don't work for production, but are OK
-  // in unit tests.
-  argv.emplace_back("--rpc_server_allow_ephemeral_ports");
-  // Allow unsafe and experimental flags from tests, since we often use
-  // fault injection, etc.
-  argv.emplace_back("--unlock_experimental_flags");
-  argv.emplace_back("--unlock_unsafe_flags");
-  // Then the "extra flags" passed into the ctor (from the ExternalMiniCluster
-  // options struct). These come at the end so they can override things like
-  // web port or RPC bind address if necessary.
-  argv.insert(argv.end(), extra_flags_.begin(), extra_flags_.end());
-  // A previous instance of the daemon may have run in the same directory. So, remove
-  // the previous info file if it's there.
-  ignore_result(Env::Default()->DeleteFile(info_path));
-  // Start the daemon.
-  unique_ptr<Subprocess> p(new Subprocess(argv));
-  p->ShareParentStdout(false);
-  p->SetEnvVars(extra_env_);
-  string env_str;
-  JoinMapKeysAndValues(extra_env_, "=", ",", &env_str);
-  LOG(INFO) << "Running " << exe_ << "\n" << JoinStrings(argv, "\n")
-            << " with env {" << env_str << "}";
-  RETURN_NOT_OK_PREPEND(p->Start(),
-                        Substitute("Failed to start subprocess $0", exe_));
-  // If requested, start a monitoring subprocess.
-  unique_ptr<Subprocess> perf_record;
-  if (!perf_record_filename_.empty()) {
-    perf_record.reset(new Subprocess({
-      "perf",
-      "record",
-      "--call-graph",
-      "fp",
-      "-o",
-      perf_record_filename_,
-      Substitute("--pid=$0", p->pid())
-    }, SIGINT));
-    RETURN_NOT_OK_PREPEND(perf_record->Start(),
-                          "Could not start perf record subprocess");
-  }
-  // The process is now starting -- wait for the bound port info to show up.
-  Stopwatch sw;
-  sw.start();
-  bool success = false;
-  while (sw.elapsed().wall_seconds() < start_process_timeout_.ToSeconds()) {
-    if (Env::Default()->FileExists(info_path)) {
-      success = true;
-      break;
-    }
-    SleepFor(MonoDelta::FromMilliseconds(10));
-    int wait_status;
-    Status s = p->WaitNoBlock(&wait_status);
-    if (s.IsTimedOut()) {
-      // The process is still running.
-      continue;
-    }
-    // If the process exited with expected exit status we need to still swap() the process
-    // and exit as if it had succeeded.
-    if (WIFEXITED(wait_status) && WEXITSTATUS(wait_status) == fault_injection::kExitStatus) {
-      process_.swap(p);
-      perf_record_process_.swap(perf_record);
-      return Status::OK();
-    }
-    RETURN_NOT_OK_PREPEND(s, Substitute("Failed waiting on $0", exe_));
-    string exit_info;
-    RETURN_NOT_OK(p->GetExitStatus(nullptr, &exit_info));
-    return Status::RuntimeError(exit_info);
-  }
-  if (!success) {
-    ignore_result(p->Kill(SIGKILL));
-    return Status::TimedOut(
-        Substitute("Timed out after $0s waiting for process ($1) to write info file ($2)",
-                   start_process_timeout_.ToString(), exe_, info_path));
-  }
-  status_.reset(new ServerStatusPB());
-  RETURN_NOT_OK_PREPEND(pb_util::ReadPBFromPath(Env::Default(), info_path, status_.get()),
-                        "Failed to read info file from " + info_path);
-  LOG(INFO) << "Started " << exe_ << " as pid " << p->pid();
-  VLOG(1) << exe_ << " instance information:\n" << SecureDebugString(*status_);
-  process_.swap(p);
-  perf_record_process_.swap(perf_record);
-  return Status::OK();
-void ExternalDaemon::SetExePath(string exe) {
-  CHECK(IsShutdown()) << "Call Shutdown() before changing the executable path";
-  exe_ = std::move(exe);
-Status ExternalDaemon::Pause() {
-  if (!process_) {
-    return Status::IllegalState(Substitute(
-        "Request to pause '$0' but the process is not there", exe_));
-  }
-  VLOG(1) << "Pausing " << exe_ << " with pid " << process_->pid();
-  const Status s = process_->Kill(SIGSTOP);
-  paused_ = true;
-  return s;
-Status ExternalDaemon::Resume() {
-  if (!process_) {
-    return Status::IllegalState(Substitute(
-        "Request to resume '$0' but the process is not there", exe_));
-  }
-  VLOG(1) << "Resuming " << exe_ << " with pid " << process_->pid();
-  const Status s = process_->Kill(SIGCONT);
-  paused_ = false;
-  return s;
-bool ExternalDaemon::IsShutdown() const {
-  return !process_;
-bool ExternalDaemon::IsProcessAlive() const {
-  if (IsShutdown()) {
-    return false;
-  }
-  Status s = process_->WaitNoBlock();
-  // If the non-blocking Wait "times out", that means the process
-  // is running.
-  return s.IsTimedOut();
-Status ExternalDaemon::WaitForInjectedCrash(const MonoDelta& timeout) const {
-  return WaitForCrash(timeout, [](int status) {
-      return WIFEXITED(status) && WEXITSTATUS(status) == fault_injection::kExitStatus;
-    }, "fault injection");
-Status ExternalDaemon::WaitForFatal(const MonoDelta& timeout) const {
-  return WaitForCrash(timeout, [](int status) {
-      return WIFSIGNALED(status) && WTERMSIG(status) == SIGABRT;
-    }, "FATAL crash");
-// Polls until the process exits or 'timeout' elapses, then checks the raw
-// waitpid(2)-style status with 'wait_status_predicate'.
-//
-// Returns:
-//   - TimedOut if the process is still alive at the deadline;
-//   - an error (prefixed with context) if the exit status cannot be read;
-//   - Aborted if the process exited but the predicate rejects the status
-//     ('crash_type_str' names the expected kind of crash in the message);
-//   - OK otherwise.
-// CHECK-fails if the process was never started.
-Status ExternalDaemon::WaitForCrash(const MonoDelta& timeout,
-                                    const std::function<bool(int)>& wait_status_predicate,
-                                    const char* crash_type_str) const {
-  CHECK(process_) << "process not started";
-  const MonoTime deadline = MonoTime::Now() + timeout;
-  // Back off linearly between polls: 10ms, 20ms, ... capped at 200ms.
-  for (int attempt = 1; IsProcessAlive() && MonoTime::Now() < deadline; ++attempt) {
-    SleepFor(MonoDelta::FromMilliseconds(std::min(attempt * 10, 200)));
-  }
-  if (IsProcessAlive()) {
-    return Status::TimedOut(Substitute("Process did not crash within $0",
-                                       timeout.ToString()));
-  }
-  // The process is gone; make sure it went away the way the caller expects.
-  int wait_status;
-  RETURN_NOT_OK_PREPEND(process_->WaitNoBlock(&wait_status),
-                        "could not get wait status");
-  if (wait_status_predicate(wait_status)) {
-    return Status::OK();
-  }
-  string info_str;
-  RETURN_NOT_OK_PREPEND(process_->GetExitStatus(nullptr, &info_str),
-                        "could not get description of exit");
-  return Status::Aborted(
-      Substitute("process exited, but not due to a $0: $1", crash_type_str, info_str));
-}
-// Returns the pid of the daemon's process.
-//
-// The header promises a CHECK failure when there is no process. Check
-// explicitly instead of dereferencing a null 'process_' (undefined behavior).
-// A process that crashed but was not Shutdown() still reports its pid.
-pid_t ExternalDaemon::pid() const {
-  CHECK(process_) << "process not started";
-  return process_->pid();
-}
-// Non-owning access to the underlying Subprocess; nullptr when the daemon
-// has no process (never started, or shut down).
-Subprocess* ExternalDaemon::process() const {
-  return process_ ? process_.get() : nullptr;
-}
-// Stops the daemon's process, if there is one, and releases it.
-//
-// First records the currently bound RPC/HTTP addresses in 'bound_rpc_' /
-// 'bound_http_' so that a later Restart() can reuse them. If the process is
-// still alive it is killed with SIGKILL. Before the kill, unless the daemon
-// is paused, the process is asked (best-effort) to flush coverage data and
-// check for leaks. Finally the process is waited on, and the Subprocess and
-// any perf-record process are dropped.
-void ExternalDaemon::Shutdown() {
-  if (!process_) return;
-  // Before we kill the process, store the addresses. If we're told to
-  // start again we'll reuse these. Store only the port if the
-  // daemons were using wildcard address for binding.
-  if (rpc_bind_address().host() != MiniCluster::kWildcardIpAddr) {
-    bound_rpc_ = bound_rpc_hostport();
-    bound_http_ = bound_http_hostport();
-  } else {
-    bound_rpc_.set_host(MiniCluster::kWildcardIpAddr);
-    bound_rpc_.set_port(bound_rpc_hostport().port());
-    bound_http_.set_host(MiniCluster::kWildcardIpAddr);
-    bound_http_.set_port(bound_http_hostport().port());
-  }
-  if (IsProcessAlive()) {
-    // Skipped for a paused daemon -- presumably because a SIGSTOPped process
-    // could not answer these RPCs.
-    if (!paused_) {
-      // In coverage builds, ask the process nicely to flush coverage info
-      // before we kill -9 it. Otherwise, we never get any coverage from
-      // external clusters.
-      FlushCoverage();
-      // Similarly, check for leaks in LSAN builds before killing.
-      CheckForLeaks();
-    }
-    LOG(INFO) << "Killing " << exe_ << " with pid " << process_->pid();
-    ignore_result(process_->Kill(SIGKILL));
-  }
-  // Wait() runs whether or not the process was still alive above; a failure
-  // is only logged.
-  WARN_NOT_OK(process_->Wait(), "Waiting on " + exe_);
-  paused_ = false;
-  process_.reset();
-  perf_record_process_.reset();
-// Recursively deletes every data directory and then the WAL directory.
-// Stops at, and returns, the first deletion error.
-Status ExternalDaemon::DeleteFromDisk() const {
-  Env* env = Env::Default();
-  for (const auto& dir : data_dirs()) {
-    RETURN_NOT_OK(env->DeleteRecursively(dir));
-  }
-  return env->DeleteRecursively(wal_dir());
-}
-void ExternalDaemon::FlushCoverage() {
-  return; // NOLINT(*)
-  LOG(INFO) << "Attempting to flush coverage for " << exe_ << " pid " << process_->pid();
-  server::GenericServiceProxy proxy(
-      messenger_, bound_rpc_addr(), bound_rpc_addr().host());
-  server::FlushCoverageRequestPB req;
-  server::FlushCoverageResponsePB resp;
-  rpc::RpcController rpc;
-  rpc.set_timeout(MonoDelta::FromMilliseconds(1000));
-  Status s = proxy.FlushCoverage(req, &resp, &rpc);
-  if (s.ok() && !resp.success()) {
-    s = Status::RemoteError("Server does not appear to be running a coverage build");
-  }
-  WARN_NOT_OK(s, Substitute("Unable to flush coverage on $0 pid $1", exe_, process_->pid()));
-// In an AddressSanitizer/LSAN build, asks the daemon over RPC (1s timeout)
-// to run a leak check, and CHECK-fails the calling process if the daemon
-// reports leaks. RPC failures, or a daemon that is not an LSAN build, are
-// only logged as warnings. When the compiler lacks __has_feature or ASAN is
-// off, this is a no-op.
-//
-// Both conditional groups must be closed: the inner one by '#  endif', the
-// outer '#if defined(__has_feature)' by the final '#endif'.
-void ExternalDaemon::CheckForLeaks() {
-#if defined(__has_feature)
-#  if __has_feature(address_sanitizer)
-  LOG(INFO) << "Attempting to check leaks for " << exe_ << " pid " << process_->pid();
-  server::GenericServiceProxy proxy(messenger_, bound_rpc_addr(), bound_rpc_addr().host());
-  server::CheckLeaksRequestPB req;
-  server::CheckLeaksResponsePB resp;
-  rpc::RpcController rpc;
-  rpc.set_timeout(MonoDelta::FromMilliseconds(1000));
-  Status s = proxy.CheckLeaks(req, &resp, &rpc);
-  if (s.ok()) {
-    if (!resp.success()) {
-      s = Status::RemoteError("Server does not appear to be running an LSAN build");
-    } else {
-      CHECK(!resp.found_leaks()) << "Found leaks in " << exe_ << " pid " << process_->pid();
-    }
-  }
-  WARN_NOT_OK(s, Substitute("Unable to check leaks on $0 pid $1", exe_, process_->pid()));
-#  endif
-#endif
-}
-// Returns the first RPC address the daemon reported in its server status.
-// CHECK-fails if no status is recorded, if it lists no RPC address, or if
-// that address cannot be converted from its protobuf form.
-HostPort ExternalDaemon::bound_rpc_hostport() const {
-  CHECK(status_);
-  CHECK_GE(status_->bound_rpc_addresses_size(), 1);
-  const auto& first_addr_pb = status_->bound_rpc_addresses(0);
-  HostPort result;
-  CHECK_OK(HostPortFromPB(first_addr_pb, &result));
-  return result;
-}
-// Resolves bound_rpc_hostport() and returns the first resulting socket
-// address. CHECK-fails if resolution fails or yields nothing.
-Sockaddr ExternalDaemon::bound_rpc_addr() const {
-  vector<Sockaddr> addrs;
-  CHECK_OK(bound_rpc_hostport().ResolveAddresses(&addrs));
-  CHECK(!addrs.empty());
-  return addrs.front();
-}
-// Returns the first HTTP address the daemon reported in its server status,
-// or a default-constructed (uninitialized) HostPort if it reported none.
-// CHECK-fails if no status is recorded.
-HostPort ExternalDaemon::bound_http_hostport() const {
-  CHECK(status_);
-  HostPort result;
-  if (status_->bound_http_addresses_size() > 0) {
-    CHECK_OK(HostPortFromPB(status_->bound_http_addresses(0), &result));
-  }
-  return result;
-}
-// Returns the node instance recorded in the daemon's server status.
-// CHECK-fails if no status is recorded.
-const NodeInstancePB& ExternalDaemon::instance_id() const {
-  CHECK(status_);
-  const server::ServerStatusPB& server_status = *status_;
-  return server_status.node_instance();
-}
-// Returns the daemon's permanent UUID, taken from the node instance in its
-// server status. CHECK-fails if no status is recorded.
-const string& ExternalDaemon::uuid() const {
-  CHECK(status_);
-  const NodeInstancePB& node = status_->node_instance();
-  return node.permanent_uuid();
-}
-// Fetches '/jsonmetricz' from the daemon's webserver and stores in '*value'
-// the int64 field 'value_field' of metric 'metric_proto' on the first entity
-// whose type matches 'entity_proto' (and whose id equals 'entity_id', when
-// that is non-null).
-//
-// Returns NotFound if no matching entity/metric exists. Returns any
-// fetch/parse error as-is. CHECK-fails if the daemon has no HTTP address.
-Status ExternalDaemon::GetInt64Metric(const MetricEntityPrototype* entity_proto,
-                                      const char* entity_id,
-                                      const MetricPrototype* metric_proto,
-                                      const char* value_field,
-                                      int64_t* value) const {
-  CHECK(bound_http_hostport().Initialized());
-  // Only ask the server for metrics whose name matches the prototype.
-  const string url = Substitute("http://$0/jsonmetricz?metrics=$1",
-                                bound_http_hostport().ToString(),
-                                metric_proto->name());
-  EasyCurl curl;
-  faststring body;
-  RETURN_NOT_OK(curl.FetchURL(url, &body));
-  // The document root is an array of entities, each with its own metrics.
-  JsonReader reader(body.ToString());
-  RETURN_NOT_OK(reader.Init());
-  vector<const Value*> entities;
-  RETURN_NOT_OK(reader.ExtractObjectArray(reader.root(), nullptr, &entities));
-  for (const Value* entity : entities) {
-    string type;
-    RETURN_NOT_OK(reader.ExtractString(entity, "type", &type));
-    if (type != entity_proto->name()) continue;
-    if (entity_id != nullptr) {
-      string id;
-      RETURN_NOT_OK(reader.ExtractString(entity, "id", &id));
-      if (id != entity_id) continue;
-    }
-    vector<const Value*> metrics;
-    RETURN_NOT_OK(reader.ExtractObjectArray(entity, "metrics", &metrics));
-    for (const Value* metric : metrics) {
-      string name;
-      RETURN_NOT_OK(reader.ExtractString(metric, "name", &name));
-      if (name == metric_proto->name()) {
-        return reader.ExtractInt64(metric, value_field, value);
-      }
-    }
-  }
-  const string msg = entity_id
-      ? Substitute("Could not find metric $0.$1 for entity $2",
-                   entity_proto->name(), metric_proto->name(), entity_id)
-      : Substitute("Could not find metric $0.$1",
-                   entity_proto->name(), metric_proto->name());
-  return Status::NotFound(msg);
-}
-// ScopedResumeExternalDaemon
-// Remembers (without owning) the daemon to resume on scope exit. 'daemon'
-// must be non-null (CHECK-enforced) and must outlive this object.
-ScopedResumeExternalDaemon::ScopedResumeExternalDaemon(ExternalDaemon* daemon)
-    : daemon_(CHECK_NOTNULL(daemon)) {
-}
-// Resumes the daemon. A destructor cannot return a Status, so a failure to
-// resume is only logged as a warning.
-ScopedResumeExternalDaemon::~ScopedResumeExternalDaemon() {
-  const Status resume_status = daemon_->Resume();
-  WARN_NOT_OK(resume_status, "Could not resume external daemon");
-}
-// ExternalMaster
-// Constructs a master daemon; all configuration is handled by ExternalDaemon.
-ExternalMaster::ExternalMaster(ExternalDaemonOptions opts)
-    : ExternalDaemon(std::move(opts)) {
-}
-// Nothing master-specific to release; base-class cleanup suffices.
-ExternalMaster::~ExternalMaster() = default;
-// Launches the master bound to rpc_bind_address(). Its webserver listens on
-// the same host with an ephemeral port.
-Status ExternalMaster::Start() {
-  const HostPort& bind_hp = rpc_bind_address();
-  vector<string> flags = GetCommonFlags();
-  flags.push_back(Substitute("--rpc_bind_addresses=$0", bind_hp.ToString()));
-  flags.push_back(Substitute("--webserver_interface=$0", bind_hp.host()));
-  flags.emplace_back("--webserver_port=0");
-  return StartProcess(flags);
-}
-// Restarts a previously shut-down master on the RPC address (and, when one
-// was recorded, the HTTP address) that Shutdown() saved in 'bound_rpc_' /
-// 'bound_http_'.
-//
-// Returns IllegalState if Shutdown() has not recorded the addresses yet;
-// otherwise returns the result of StartProcess().
-Status ExternalMaster::Restart() {
-  // We store the addresses on shutdown so make sure we did that first.
-  if (bound_rpc_.port() == 0) {
-    return Status::IllegalState("Master cannot be restarted. Must call Shutdown() first.");
-  }
-  vector<string> flags(GetCommonFlags());
-  flags.push_back(Substitute("--rpc_bind_addresses=$0", bound_rpc_.ToString()));
-  if (bound_http_.Initialized()) {
-    // Reuse the exact webserver endpoint the master had before.
-    flags.push_back(Substitute("--webserver_interface=$0", bound_http_.host()));
-    flags.push_back(Substitute("--webserver_port=$0", bound_http_.port()));
-  } else {
-    // No HTTP address was recorded: bind the webserver next to the RPC
-    // endpoint on an ephemeral port.
-    flags.push_back(Substitute("--webserver_interface=$0", bound_rpc_.host()));
-    flags.emplace_back("--webserver_port=0");
-  }
-  return StartProcess(flags);
-}
-// Polls the master with ListTables RPCs (every 50ms) until its catalog
-// manager answers. Either a leader reply without error, or an IllegalState
-// error from a non-leader that is otherwise up, counts as ready.
-//
-// Returns OK once ready. Returns any unexpected master or RPC error
-// immediately. Returns TimedOut after kMasterCatalogManagerTimeoutSeconds.
-// A ready reply always yields OK, even if it arrives just after the deadline.
-Status ExternalMaster::WaitForCatalogManager() {
-  MasterServiceProxy proxy(messenger_, bound_rpc_addr(), bound_rpc_addr().host());
-  Stopwatch sw;
-  sw.start();
-  while (sw.elapsed().wall_seconds() < kMasterCatalogManagerTimeoutSeconds) {
-    ListTablesRequestPB req;
-    ListTablesResponsePB resp;
-    RpcController rpc;
-    Status s = proxy.ListTables(req, &resp, &rpc);
-    if (s.ok()) {
-      if (!resp.has_error()) {
-        // This master is the leader and is up and running.
-        return Status::OK();
-      }
-      s = StatusFromPB(resp.error().status());
-      if (s.IsIllegalState()) {
-        // This master is not the leader but is otherwise up and running.
-        return Status::OK();
-      }
-      if (!s.IsServiceUnavailable()) {
-        // Unexpected error from master.
-        return s;
-      }
-    } else if (!s.IsTimedOut() && !s.IsNetworkError()) {
-      // Unexpected error from proxy.
-      return s;
-    }
-    // There was some kind of transient network error or the master isn't yet
-    // ready. Sleep and retry.
-    SleepFor(MonoDelta::FromMilliseconds(50));
-  }
-  // The loop only exits without returning once the deadline has passed.
-  return Status::TimedOut(
-      Substitute("Timed out after $0s waiting for master ($1) startup",
-                 kMasterCatalogManagerTimeoutSeconds,
-                 bound_rpc_addr().ToString()));
-}
-// Command-line flags shared by ExternalMaster::Start() and Restart(); each
-// caller appends its own bind-address flags afterwards.
-// NOTE(review): both callers append another --webserver_interface after the
-// one below, which presumably overrides it -- confirm gflags' last-wins rule.
-vector<string> ExternalMaster::GetCommonFlags() const {
-  vector<string> common;
-  common.push_back("--fs_wal_dir=" + wal_dir_);
-  common.push_back("--fs_data_dirs=" + JoinStrings(data_dirs_, ","));
-  common.emplace_back("--webserver_interface=localhost");
-  // See the in-line comment for "--ipki_server_key_size" flag in
-  // ExternalDaemon::StartProcess() method.
-  common.emplace_back("--ipki_ca_key_size=1024");
-  // As for the TSK keys, 512 bits is the minimum since we are using the SHA256
-  // digest for token signing/verification.
-  common.emplace_back("--tsk_num_rsa_bits=512");
-  return common;
-}
-// ExternalTabletServer
-// Constructs a tablet server daemon that will be pointed at 'master_addrs'
-// (passed as --tserver_master_addrs when it starts).
-ExternalTabletServer::ExternalTabletServer(ExternalDaemonOptions opts,
-                                           vector<HostPort> master_addrs)
-    : ExternalDaemon(std::move(opts)),
-      master_addrs_(std::move(master_addrs)) {
-  // A tablet server needs at least one master to register with.
-  DCHECK(!master_addrs_.empty());
-}
-// Nothing tablet-server-specific to release; base-class cleanup suffices.
-ExternalTabletServer::~ExternalTabletServer() = default;
-// Launches the tablet server bound to rpc_bind_address(). Outbound sockets
-// and the webserver (on an ephemeral port) use the same host, and the server
-// is pointed at 'master_addrs_'.
-Status ExternalTabletServer::Start() {
-  const string& bind_host = rpc_bind_address().host();
-  const vector<string> flags = {
-    "--fs_wal_dir=" + wal_dir_,
-    "--fs_data_dirs=" + JoinStrings(data_dirs_, ","),
-    Substitute("--rpc_bind_addresses=$0", rpc_bind_address().ToString()),
-    Substitute("--local_ip_for_outbound_sockets=$0", bind_host),
-    Substitute("--webserver_interface=$0", bind_host),
-    "--webserver_port=0",
-    Substitute("--tserver_master_addrs=$0",
-               HostPort::ToCommaSeparatedString(master_addrs_)),
-  };
-  return StartProcess(flags);
-}
-// Restarts a previously shut-down tablet server on the RPC address (and,
-// when one was recorded, the HTTP address) that Shutdown() saved in
-// 'bound_rpc_' / 'bound_http_', pointing it at the same masters.
-//
-// Returns IllegalState if Shutdown() has not recorded the addresses yet;
-// otherwise returns the result of StartProcess().
-Status ExternalTabletServer::Restart() {
-  // We store the addresses on shutdown so make sure we did that first.
-  if (bound_rpc_.port() == 0) {
-    return Status::IllegalState("Tablet server cannot be restarted. Must call Shutdown() first.");
-  }
-  vector<string> flags;
-  flags.push_back("--fs_wal_dir=" + wal_dir_);
-  flags.push_back("--fs_data_dirs=" + JoinStrings(data_dirs_, ","));
-  flags.push_back(Substitute("--rpc_bind_addresses=$0", bound_rpc_.ToString()));
-  flags.push_back(Substitute("--local_ip_for_outbound_sockets=$0",
-                             rpc_bind_address().host()));
-  if (bound_http_.Initialized()) {
-    // Reuse the exact webserver endpoint the server had before.
-    flags.push_back(Substitute("--webserver_port=$0", bound_http_.port()));
-    flags.push_back(Substitute("--webserver_interface=$0", bound_http_.host()));
-  } else {
-    // No HTTP address was recorded: mirror Start() (and the master's Restart())
-    // by binding the webserver next to the RPC endpoint on an ephemeral port,
-    // rather than leaving both webserver flags unset.
-    flags.push_back(Substitute("--webserver_interface=$0", rpc_bind_address().host()));
-    flags.emplace_back("--webserver_port=0");
-  }
-  flags.push_back(Substitute("--tserver_master_addrs=$0",
-                             HostPort::ToCommaSeparatedString(master_addrs_)));
-  return StartProcess(flags);
-}
-} // namespace kudu
diff --git a/src/kudu/integration-tests/external_mini_cluster.h b/src/kudu/integration-tests/external_mini_cluster.h
deleted file mode 100644
index 0a23d9f..0000000
--- a/src/kudu/integration-tests/external_mini_cluster.h
+++ /dev/null
@@ -1,564 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#pragma once
-#include <sys/types.h>
-#include <cstdint>
-#include <functional>
-#include <map>
-#include <memory>
-#include <ostream>
-#include <string>
-#include <vector>
-#include <boost/optional/optional.hpp>
-#include <glog/logging.h>
-#include <gtest/gtest_prod.h>
-#include "kudu/client/shared_ptr.h"
-#include "kudu/gutil/macros.h"
-#include "kudu/gutil/port.h"
-#include "kudu/gutil/ref_counted.h"
-#include "kudu/integration-tests/mini_cluster.h"
-#include "kudu/security/test/mini_kdc.h"
-#include "kudu/util/monotime.h"
-#include "kudu/util/net/net_util.h"
-#include "kudu/util/status.h"
-namespace kudu {
-class ExternalDaemon;
-class ExternalMaster;
-class ExternalTabletServer;
-class MetricEntityPrototype;
-class MetricPrototype;
-class NodeInstancePB;
-class Sockaddr;
-class Subprocess;
-namespace client {
-class KuduClient;
-class KuduClientBuilder;
-} // namespace client
-namespace master {
-class MasterServiceProxy;
-} // namespace master
-namespace rpc {
-class Messenger;
-} // namespace rpc
-namespace server {
-class ServerStatusPB;
-} // namespace server
-// Configuration passed to ExternalMiniCluster's constructor. The defaults
-// quoted on the fields below are presumably applied by the out-of-line
-// default constructor (not visible here).
-struct ExternalMiniClusterOptions {
-  ExternalMiniClusterOptions();
-  // Number of masters to start.
-  // Default: 1
-  int num_masters;
-  // Number of TS to start.
-  // Default: 1
-  int num_tablet_servers;
-  // Directory in which to store data.
-  // Default: "", which auto-generates a unique path for this cluster.
-  std::string data_root;
-  // Address-binding mode for the cluster's daemons, reported by
-  // ExternalMiniCluster::bind_mode(). NOTE(review): presumably decides which
-  // IPs GetBindIpForMaster()/GetBindIpForTabletServer() hand out -- confirm
-  // against MiniCluster::BindMode.
-  MiniCluster::BindMode bind_mode;
-  // The path where the kudu daemons should be run from.
-  // Default: "", which uses the same path as the currently running executable.
-  // This works for unit tests, since they all end up in build/latest/bin.
-  std::string daemon_bin_path;
-  // Number of data directories to be created for each daemon.
-  // Default: 1
-  int num_data_dirs;
-  // Extra flags for tablet servers and masters respectively.
-  //
-  // In these flags, you may use the special string '${index}' which will
-  // be substituted with the index of the tablet server or master.
-  std::vector<std::string> extra_tserver_flags;
-  std::vector<std::string> extra_master_flags;
-  // If more than one master is specified, list of ports for the
-  // masters in a consensus configuration. Port at index 0 is used for the leader
-  // master.
-  std::vector<uint16_t> master_rpc_ports;
-  // Options to configure the MiniKdc before starting it up.
-  // Only used when 'enable_kerberos' is 'true'.
-  MiniKdcOptions mini_kdc_options;
-  // If true, set up a KDC as part of this ExternalMiniCluster, generate keytabs for
-  // the servers, and require Kerberos authentication from clients.
-  //
-  // Additionally, when the cluster is started, the environment of the
-  // test process will be modified to include Kerberos credentials for
-  // a principal named 'testuser'.
-  bool enable_kerberos;
-  // If true, sends logging output to stderr instead of a log file. Defaults to
-  // true.
-  bool logtostderr;
-  // Amount of time that may elapse between the creation of a daemon process
-  // and the process writing out its info file. Defaults to 30 seconds.
-  MonoDelta start_process_timeout;
-// A mini-cluster made up of subprocesses running each of the daemons
-// separately. This is useful for black-box or grey-box failure testing
-// purposes -- it provides the ability to forcibly kill or stop particular
-// cluster participants, which isn't feasible in the normal InternalMiniCluster.
-// On the other hand, there is little access to inspect the internal state
-// of the daemons.
-class ExternalMiniCluster : public MiniCluster {
- public:
-  // Constructs a cluster with the default options.
-  ExternalMiniCluster();
-  // Constructs a cluster with options specified in 'opts'.
-  explicit ExternalMiniCluster(ExternalMiniClusterOptions opts);
-  // Destroys a cluster.
-  virtual ~ExternalMiniCluster();
-  // Start the cluster.
-  Status Start() override;
-  // Restarts the cluster. Requires that it has been Shutdown() first.
-  Status Restart();
-  // Add a new TS to the cluster. The new TS is started.
-  // Requires that the master is already running.
-  Status AddTabletServer();
-  // Currently, this uses SIGKILL on each daemon for a non-graceful shutdown.
-  void ShutdownNodes(ClusterNodes nodes) override;
-  // Return the IP address that the tablet server with the given index will bind to.
-  // Depending on the cluster's bind mode ('opts_.bind_mode'), this is either the
-  // standard loopback address (presumably 127.0.0.1) shared by all daemons, or
-  // another IP in the local netblock unique to this server.
-  std::string GetBindIpForTabletServer(int index) const;
-  // Same as above but for a master.
-  std::string GetBindIpForMaster(int index) const;
-  // Return a pointer to the running leader master. This may be NULL
-  // if the cluster is not started.
-  // NOTE(review): this delegates to master(0), which CHECK-fails (rather than
-  // returning NULL) when 'masters_' is empty.
-  //
-  // TODO: Use the appropriate RPC here to return the leader master,
-  // to allow some of the existing tests (e.g., raft_consensus-itest)
-  // to use multiple masters.
-  ExternalMaster* leader_master() { return master(0); }
-  // Perform an RPC to determine the leader of the external mini
-  // cluster.  Set '*idx' to the leader master's index (for calls to
-  // master() below).
-  //
-  // NOTE: if a leader election occurs after this method is executed,
-  // the last result may not be valid.
-  Status GetLeaderMasterIndex(int* idx);
-  // If this cluster is configured for a single non-distributed
-  // master, return the single master or NULL if the master is not
-  // started. Exits with a CHECK failure if there are multiple
-  // masters.
-  ExternalMaster* master() const {
-    CHECK_EQ(masters_.size(), 1)
-        << "master() should not be used with multiple masters, use leader_master() instead.";
-    return master(0);
-  }
-  // Return master at 'idx' or NULL if the master at 'idx' has not
-  // been started. CHECK-fails if 'idx' is out of range.
-  ExternalMaster* master(int idx) const {
-    CHECK_LT(idx, masters_.size());
-    return masters_[idx].get();
-  }
-  // Return the tablet server at 'idx'. CHECK-fails if 'idx' is out of range.
-  ExternalTabletServer* tablet_server(int idx) const {
-    CHECK_LT(idx, tablet_servers_.size());
-    return tablet_servers_[idx].get();
-  }
-  // Return ExternalTabletServer given its UUID. If not found, returns NULL.
-  ExternalTabletServer* tablet_server_by_uuid(const std::string& uuid) const;
-  // Return the index of the ExternalTabletServer that has the given 'uuid', or
-  // -1 if no such UUID can be found.
-  int tablet_server_index_by_uuid(const std::string& uuid) const;
-  // Return all tablet servers and masters.
-  std::vector<ExternalDaemon*> daemons() const;
-  // Return the cluster's KDC. CHECK-fails if no KDC was created (presumably
-  // only when 'enable_kerberos' is set -- confirm).
-  MiniKdc* kdc() const {
-    return CHECK_NOTNULL(kdc_.get());
-  }
-  int num_tablet_servers() const override {
-    return tablet_servers_.size();
-  }
-  int num_masters() const override {
-    return masters_.size();
-  }
-  BindMode bind_mode() const override {
-    return opts_.bind_mode;
-  }
-  std::vector<uint16_t> master_rpc_ports() const override {
-    return opts_.master_rpc_ports;
-  }
-  std::vector<HostPort> master_rpc_addrs() const override;
-  std::shared_ptr<rpc::Messenger> messenger() const override;
-  std::shared_ptr<master::MasterServiceProxy> master_proxy() const override;
-  std::shared_ptr<master::MasterServiceProxy> master_proxy(int idx) const override;
-  // Wait until the number of registered tablet servers reaches the given count
-  // on all of the running masters. Returns Status::TimedOut if the desired
-  // count is not achieved with the given timeout.
-  Status WaitForTabletServerCount(int count, const MonoDelta& timeout);
-  // Runs gtest assertions that no servers have crashed.
-  void AssertNoCrashes();
-  // Wait until all tablets on the given tablet server are in the RUNNING
-  // state. Returns Status::TimedOut if 'timeout' elapses and at least one
-  // tablet is not yet RUNNING.
-  //
-  // If 'min_tablet_count' is not -1, will also wait for at least that many
-  // RUNNING tablets to appear before returning (potentially timing out if that
-  // number is never reached).
-  Status WaitForTabletsRunning(ExternalTabletServer* ts, int min_tablet_count,
-                               const MonoDelta& timeout);
-  // Create a client configured to talk to this cluster.
-  // Builder may contain override options for the client. The master address will
-  // be overridden to talk to the running master.
-  //
-  // REQUIRES: the cluster must have already been Start()ed.
-  Status CreateClient(client::KuduClientBuilder* builder,
-                      client::sp::shared_ptr<client::KuduClient>* client) const override;
-  // Sets the given flag on the given daemon, which must be running.
-  //
-  // This uses the 'force' flag on the RPC so that, even if the flag
-  // is considered unsafe to change at runtime, it is changed.
-  Status SetFlag(ExternalDaemon* daemon,
-                 const std::string& flag,
-                 const std::string& value) WARN_UNUSED_RESULT;
-  // Set the path where daemon binaries can be found.
-  // Overrides 'daemon_bin_path' set by ExternalMiniClusterOptions.
-  // The cluster must be shut down before calling this method.
-  void SetDaemonBinPath(std::string daemon_bin_path);
-  // Returns the path where 'binary' is expected to live, based on
-  // ExternalMiniClusterOptions.daemon_bin_path if it was provided, or on the
-  // path of the currently running executable otherwise.
-  std::string GetBinaryPath(const std::string& binary) const;
-  // Returns the path where 'daemon_id' is expected to store its data, based on
-  // ExternalMiniClusterOptions.data_root if it was provided, or on the
-  // standard Kudu test directory otherwise.
-  // 'dir_index' is an optional numeric suffix to be added to the default path.
-  // If it is not specified, the cluster must be configured to use a single data dir.
-  std::string GetDataPath(const std::string& daemon_id,
-                          boost::optional<uint32_t> dir_index = boost::none) const;
-  // Returns paths where 'daemon_id' is expected to store its data, each with a
-  // numeric suffix appropriate for 'opts_.num_data_dirs'
-  std::vector<std::string> GetDataPaths(const std::string& daemon_id) const;
-  // Returns the path where 'daemon_id' is expected to store its wal, or other
-  // files that reside in the wal dir.
-  std::string GetWalPath(const std::string& daemon_id) const;
-  // Returns the path where 'daemon_id' is expected to store its logs, or other
-  // files that reside in the log dir.
-  std::string GetLogPath(const std::string& daemon_id) const;
- private:
-  FRIEND_TEST(MasterFailoverTest, TestKillAnyMaster);
-  Status StartSingleMaster();
-  Status StartDistributedMasters();
-  Status DeduceBinRoot(std::string* ret);
-  Status HandleOptions();
-  // Options the cluster was constructed with.
-  const ExternalMiniClusterOptions opts_;
-  // The root for binaries.
-  std::string daemon_bin_path_;
-  std::string data_root_;
-  // Indexed by master/tablet-server index; see master(int) / tablet_server(int).
-  std::vector<scoped_refptr<ExternalMaster> > masters_;
-  std::vector<scoped_refptr<ExternalTabletServer> > tablet_servers_;
-  std::unique_ptr<MiniKdc> kdc_;
-  std::shared_ptr<rpc::Messenger> messenger_;
-  DISALLOW_COPY_AND_ASSIGN(ExternalMiniCluster);
-// Per-daemon settings consumed by the ExternalDaemon constructor.
-struct ExternalDaemonOptions {
-  explicit ExternalDaemonOptions(bool logtostderr)
-      : logtostderr(logtostderr) {
-  }
-  // Whether the daemon logs to stderr instead of a log file.
-  bool logtostderr;
-  // Messenger used for RPCs to the daemon (e.g. coverage/leak-check proxies).
-  std::shared_ptr<rpc::Messenger> messenger;
-  // Path of the executable to run; can be replaced later via SetExePath().
-  std::string exe;
-  // Address the daemon is asked to bind its RPC server to.
-  HostPort rpc_bind_address;
-  // WAL and data directories; removed by ExternalDaemon::DeleteFromDisk().
-  std::string wal_dir;
-  std::vector<std::string> data_dirs;
-  std::string log_dir;
-  // NOTE(review): presumably, when non-empty, the daemon is profiled with
-  // 'perf record' writing to this file -- confirm in StartProcess().
-  std::string perf_record_filename;
-  // Extra command-line flags; editable later via mutable_flags().
-  std::vector<std::string> extra_flags;
-  // Presumably the time allowed for the started process to write its info
-  // file (see ExternalMiniClusterOptions::start_process_timeout) -- confirm.
-  MonoDelta start_process_timeout;
-// A single Kudu server process (master or tablet server) managed by an
-// ExternalMiniCluster: started as a subprocess, and able to be paused,
-// resumed, shut down, or inspected over RPC/HTTP. Reference-counted.
-class ExternalDaemon : public RefCountedThreadSafe<ExternalDaemon> {
- public:
-  explicit ExternalDaemon(ExternalDaemonOptions opts);
-  // Return the host/port the daemon's RPC server is bound to, as reported in
-  // its server status. CHECK-fails if no status has been recorded.
-  HostPort bound_rpc_hostport() const;
-  // Same as above, resolved to a socket address.
-  Sockaddr bound_rpc_addr() const;
-  // Return the host/port that this daemon is bound to for HTTP.
-  // May return an uninitialized HostPort if HTTP is disabled.
-  HostPort bound_http_hostport() const;
-  // Node instance / permanent UUID from the daemon's server status.
-  // CHECK-fail if no status has been recorded.
-  const NodeInstancePB& instance_id() const;
-  const std::string& uuid() const;
-  // Return the pid of the running process.
-  // Causes a CHECK failure if the process is not running.
-  // A process that crashed but was not Shutdown() still reports its pid.
-  pid_t pid() const;
-  // Return the pointer to the underlying Subprocess if it is set.
-  // Otherwise, returns nullptr.
-  Subprocess* process() const;
-  // Set the path of the executable to run as a daemon.
-  // Overrides the exe path specified in the constructor.
-  // The daemon must be shut down before calling this method.
-  void SetExePath(std::string exe);
-  // Enable Kerberos for this daemon. This creates a Kerberos principal
-  // and keytab, and sets the appropriate environment variables in the
-  // subprocess such that the server will use Kerberos authentication.
-  //
-  // 'bind_host' is the hostname that will be used to generate the Kerberos
-  // service principal.
-  //
-  // Must be called before 'StartProcess()'.
-  Status EnableKerberos(MiniKdc* kdc, const std::string& bind_host);
-  // Sends a SIGSTOP signal to the daemon.
-  Status Pause() WARN_UNUSED_RESULT;
-  // Sends a SIGCONT signal to the daemon.
-  Status Resume() WARN_UNUSED_RESULT;
-  // Return true if we have explicitly shut down the process.
-  bool IsShutdown() const;
-  // Return true if the process is still running.
-  // This may return false if the process crashed, even if we didn't
-  // explicitly call Shutdown().
-  bool IsProcessAlive() const;
-  // Wait for this process to crash due to a configured fault
-  // injection, or the given timeout to elapse. If the process
-  // crashes for some reason other than an injected fault, returns
-  // Status::Aborted.
-  //
-  // If the process is already crashed, returns immediately.
-  Status WaitForInjectedCrash(const MonoDelta& timeout) const;
-  // Same as the above, but expects the process to crash due to a
-  // LOG(FATAL) or CHECK failure. In other words, waits for it to
-  // crash from SIGABRT.
-  Status WaitForFatal(const MonoDelta& timeout) const;
-  // Kills the process (SIGKILL) and remembers its bound addresses so a
-  // subclass Restart() can reuse them. No-op if there is no process.
-  virtual void Shutdown();
-  // Delete files specified by 'wal_dir_' and 'data_dirs_'.
-  Status DeleteFromDisk() const WARN_UNUSED_RESULT;
-  const std::string& wal_dir() const { return wal_dir_; }
-  // The single data directory; CHECK-fails unless exactly one is configured.
-  const std::string& data_dir() const {
-    CHECK_EQ(1, data_dirs_.size());
-    return data_dirs_[0];
-  }
-  const std::vector<std::string>& data_dirs() const { return data_dirs_; }
-  // Returns the log dir of the external daemon.
-  const std::string& log_dir() const { return log_dir_; }
-  // Return a pointer to the flags used for this server on restart.
-  // Modifying these flags will only take effect on the next restart.
-  std::vector<std::string>* mutable_flags() { return &extra_flags_; }
-  // Retrieve the value of a given metric from this server. The metric must
-  // be of int64_t type.
-  //
-  // 'value_field' represents the particular field of the metric to be read.
-  // For example, for a counter or gauge, this should be 'value'. For a
-  // histogram, it might be 'total_count' or 'mean'.
-  //
-  // 'entity_id' may be NULL, in which case the first entity of the same type
-  // as 'entity_proto' will be matched.
-  Status GetInt64Metric(const MetricEntityPrototype* entity_proto,
-                        const char* entity_id,
-                        const MetricPrototype* metric_proto,
-                        const char* value_field,
-                        int64_t* value) const;
- protected:
-  friend class RefCountedThreadSafe<ExternalDaemon>;
-  virtual ~ExternalDaemon();
-  // Starts a process with the given flags.
-  Status StartProcess(const std::vector<std::string>& user_flags);
-  // Wait for the process to exit, and then call 'wait_status_predicate'
-  // on the resulting exit status. NOTE: this is not the return code, but
-  // rather the value provided by waitpid(2): use WEXITSTATUS, etc.
-  //
-  // If the predicate matches, returns OK. Otherwise, returns an error.
-  // 'crash_type_str' should be a descriptive name for the type of crash,
-  // used in formatting the error message.
-  Status WaitForCrash(const MonoDelta& timeout,
-                      const std::function<bool(int)>& wait_status_predicate,
-                      const char* crash_type_str) const;
-  // In a code-coverage build, try to flush the coverage data to disk.
-  // In a non-coverage build, this does nothing.
-  void FlushCoverage();
-  // In an LSAN build, ask the daemon to check for leaked memory, and
-  // LOG(FATAL) if there are any leaks.
-  void CheckForLeaks();
-  // Get RPC bind address for daemon.
-  const HostPort& rpc_bind_address() const {
-    return rpc_bind_address_;
-  }
-  const std::shared_ptr<rpc::Messenger> messenger_;
-  const std::string wal_dir_;
-  std::vector<std::string> data_dirs_;
-  const std::string log_dir_;
-  const std::string perf_record_filename_;
-  const MonoDelta start_process_timeout_;
-  const bool logtostderr_;
-  const HostPort rpc_bind_address_;
-  std::string exe_;
-  std::vector<std::string> extra_flags_;
-  std::map<std::string, std::string> extra_env_;
-  // The running subprocess; null when never started or after Shutdown().
-  std::unique_ptr<Subprocess> process_;
-  // Set by Pause() and cleared by Resume()/Shutdown().
-  bool paused_ = false;
-  std::unique_ptr<Subprocess> perf_record_process_;
-  std::unique_ptr<server::ServerStatusPB> status_;
-  // These capture the daemons parameters and running ports and
-  // are used to Restart() the daemon with the same parameters.
-  HostPort bound_rpc_;
-  HostPort bound_http_;
-// Resumes a daemon that was stopped with ExternalDaemon::Pause() upon
-// exiting a scope.
-class ScopedResumeExternalDaemon {
- public:
-  // 'daemon' must remain valid for the lifetime of a
-  // ScopedResumeExternalDaemon object. It must be non-null (CHECK-enforced)
-  // and is not owned.
-  explicit ScopedResumeExternalDaemon(ExternalDaemon* daemon);
-  // Resume 'daemon_'. A failure to resume is logged as a warning, not reported.
-  ~ScopedResumeExternalDaemon();
- private:
-  ExternalDaemon* daemon_;
-  DISALLOW_COPY_AND_ASSIGN(ScopedResumeExternalDaemon);
-// An ExternalDaemon running a Kudu master.
-class ExternalMaster : public ExternalDaemon {
- public:
-  explicit ExternalMaster(ExternalDaemonOptions opts);
-  // Launches the master bound to the configured RPC address, with its
-  // webserver on the same host and an ephemeral port.
-  Status Start();
-  // Restarts the daemon.
-  // Requires that it has previously been shutdown.
-  Status Restart() WARN_UNUSED_RESULT;
-  // Blocks until the master's catalog manager is initialized and responding to
-  // RPCs.
-  Status WaitForCatalogManager() WARN_UNUSED_RESULT;
- private:
-  // Flags shared by Start() and Restart().
-  std::vector<std::string> GetCommonFlags() const;
-  friend class RefCountedThreadSafe<ExternalMaster>;
-  virtual ~ExternalMaster();
-// An ExternalDaemon running a Kudu tablet server.
-class ExternalTabletServer : public ExternalDaemon {
- public:
-  // 'master_addrs' must be non-empty (DCHECK-enforced); it is passed to the
-  // server as --tserver_master_addrs.
-  ExternalTabletServer(ExternalDaemonOptions opts,
-                       std::vector<HostPort> master_addrs);
-  // Launches the tablet server bound to the configured RPC address.
-  Status Start();
-  // Restarts the daemon.
-  // Requires that it has previously been shutdown.
-  Status Restart() WARN_UNUSED_RESULT;
- private:
-  const std::vector<HostPort> master_addrs_;
-  friend class RefCountedThreadSafe<ExternalTabletServer>;
-  virtual ~ExternalTabletServer();
-} // namespace kudu
diff --git a/src/kudu/integration-tests/ b/src/kudu/integration-tests/
index 3873d43..f22f12f 100644
--- a/src/kudu/integration-tests/
+++ b/src/kudu/integration-tests/
@@ -27,7 +27,7 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/strings/util.h"
 #include "kudu/fs/fs_manager.h"
-#include "kudu/integration-tests/external_mini_cluster.h"
+#include "kudu/mini-cluster/external_mini_cluster.h"
 #include "kudu/util/env.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/path_util.h"
@@ -40,7 +40,7 @@ namespace itest {
 using std::set;
 using std::string;
 using std::vector;
+using cluster::ExternalMiniCluster;
 using consensus::ConsensusMetadataPB;
 using strings::Substitute;
 using tablet::TabletDataState;
diff --git a/src/kudu/integration-tests/external_mini_cluster_fs_inspector.h b/src/kudu/integration-tests/external_mini_cluster_fs_inspector.h
index 3341bfe..3fde5f9 100644
--- a/src/kudu/integration-tests/external_mini_cluster_fs_inspector.h
+++ b/src/kudu/integration-tests/external_mini_cluster_fs_inspector.h
@@ -14,9 +14,7 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+#pragma once
 #include <cstdint>
 #include <string>
@@ -28,13 +26,17 @@
 #include "kudu/util/monotime.h"
 namespace kudu {
 class Env;
-class ExternalMiniCluster;
 class Status;
+namespace cluster {
+class ExternalMiniCluster;
+} // namespace cluster
 namespace consensus {
 class ConsensusMetadataPB;
+} // namespace consensus
 namespace itest {
@@ -44,7 +46,7 @@ namespace itest {
 class ExternalMiniClusterFsInspector {
   // Does not take ownership of the ExternalMiniCluster pointer.
-  explicit ExternalMiniClusterFsInspector(ExternalMiniCluster* cluster);
+  explicit ExternalMiniClusterFsInspector(cluster::ExternalMiniCluster* cluster);
   Status ListFilesInDir(const std::string& path, std::vector<std::string>* entries);
@@ -130,12 +132,10 @@ class ExternalMiniClusterFsInspector {
                                           const std::string& tablet_id) const;
   Env* const env_;
-  ExternalMiniCluster* const cluster_;
+  cluster::ExternalMiniCluster* const cluster_;
 } // namespace itest
 } // namespace kudu
diff --git a/src/kudu/integration-tests/ b/src/kudu/integration-tests/
index c965bb1..8863331 100644
--- a/src/kudu/integration-tests/
+++ b/src/kudu/integration-tests/
@@ -43,9 +43,9 @@
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/integration-tests/cluster_itest_util.h"
-#include "kudu/integration-tests/external_mini_cluster.h"
 #include "kudu/master/master.pb.h"
 #include "kudu/master/master.proxy.h"
+#include "kudu/mini-cluster/external_mini_cluster.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/tools/data_gen_util.h"
 #include "kudu/util/monotime.h"
@@ -71,6 +71,8 @@ using kudu::client::KuduTable;
 using kudu::client::KuduTableCreator;
 using kudu::client::KuduValue;
 using kudu::client::sp::shared_ptr;
+using kudu::cluster::ExternalMiniCluster;
+using kudu::cluster::ExternalMiniClusterOptions;
 using kudu::master::GetTableLocationsRequestPB;
 using kudu::master::GetTableLocationsResponsePB;
 using kudu::master::MasterErrorPB;
diff --git a/src/kudu/integration-tests/ b/src/kudu/integration-tests/
index b23cc42..9ca82a4 100644
--- a/src/kudu/integration-tests/
+++ b/src/kudu/integration-tests/
@@ -44,8 +44,8 @@
 #include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/strcat.h"
 #include "kudu/gutil/strings/substitute.h"
-#include "kudu/integration-tests/internal_mini_cluster.h"
 #include "kudu/master/mini_master.h"
+#include "kudu/mini-cluster/internal_mini_cluster.h"
 #include "kudu/tablet/tablet.h"
 #include "kudu/tablet/tablet_replica.h"
 #include "kudu/tserver/mini_tablet_server.h"
@@ -101,6 +101,8 @@ using client::KuduSession;
 using client::KuduStatusMemberCallback;
 using client::KuduTable;
 using client::KuduTableCreator;
+using cluster::InternalMiniCluster;
+using cluster::InternalMiniClusterOptions;
 using strings::Split;
 using strings::Substitute;
diff --git a/src/kudu/integration-tests/ b/src/kudu/integration-tests/
index 81173e2..d393bdf 100644
--- a/src/kudu/integration-tests/
+++ b/src/kudu/integration-tests/
@@ -52,8 +52,8 @@
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
-#include "kudu/integration-tests/internal_mini_cluster.h"
 #include "kudu/master/mini_master.h"
+#include "kudu/mini-cluster/internal_mini_cluster.h"
 #include "kudu/tablet/key_value_test_schema.h"
 #include "kudu/tablet/rowset.h"
 #include "kudu/tablet/tablet.h"
@@ -113,6 +113,8 @@ using client::KuduUpsert;
 using client::KuduValue;
 using client::KuduWriteOperation;
 using client::sp::shared_ptr;
+using cluster::InternalMiniCluster;
+using cluster::InternalMiniClusterOptions;
 using std::list;
 using std::map;
 using std::string;
diff --git a/src/kudu/integration-tests/ b/src/kudu/integration-tests/
index 80f171e..69bfc4c 100644
--- a/src/kudu/integration-tests/
+++ b/src/kudu/integration-tests/
@@ -26,6 +26,9 @@
 namespace kudu {
+using cluster::InternalMiniCluster;
+using cluster::InternalMiniClusterOptions;
 void MiniClusterITestBase::TearDown() {
diff --git a/src/kudu/integration-tests/internal_mini_cluster-itest-base.h b/src/kudu/integration-tests/internal_mini_cluster-itest-base.h
index 4d189c4..ca20a42 100644
--- a/src/kudu/integration-tests/internal_mini_cluster-itest-base.h
+++ b/src/kudu/integration-tests/internal_mini_cluster-itest-base.h
@@ -22,18 +22,18 @@
 #include <unordered_map>
 #include "kudu/client/shared_ptr.h"
-#include "kudu/integration-tests/internal_mini_cluster.h"
+#include "kudu/mini-cluster/internal_mini_cluster.h"
 #include "kudu/util/test_util.h"
 namespace kudu {
 namespace client {
 class KuduClient;
+} // namespace client
 namespace itest {
 struct TServerDetails;
+} // namespace itest
 // Simple base utility class to provide a mini cluster with common setup
 // routines useful for integration tests.
@@ -45,7 +45,7 @@ class MiniClusterITestBase : public KuduTest {
   void StartCluster(int num_tablet_servers = 3);
   void StopCluster();
-  std::unique_ptr<InternalMiniCluster> cluster_;
+  std::unique_ptr<cluster::InternalMiniCluster> cluster_;
   client::sp::shared_ptr<client::KuduClient> client_;
   std::unordered_map<std::string, itest::TServerDetails*> ts_map_;
diff --git a/src/kudu/integration-tests/ b/src/kudu/integration-tests/
deleted file mode 100644
index 17153a0..0000000
--- a/src/kudu/integration-tests/
+++ /dev/null
@@ -1,375 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#include "kudu/integration-tests/internal_mini_cluster.h"
-#include <cstdint>
-#include <ostream>
-#include <unordered_set>
-#include <utility>
-#include "kudu/client/client.h"
-#include "kudu/common/wire_protocol.pb.h"
-#include "kudu/gutil/gscoped_ptr.h"
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/master/catalog_manager.h"
-#include "kudu/master/master.h"
-#include "kudu/master/master.proxy.h"
-#include "kudu/master/mini_master.h"
-#include "kudu/master/ts_descriptor.h"
-#include "kudu/master/ts_manager.h"
-#include "kudu/rpc/messenger.h"
-#include "kudu/tserver/mini_tablet_server.h"
-#include "kudu/tserver/tablet_server.h"
-#include "kudu/tserver/tablet_server_options.h"
-#include "kudu/util/env.h"
-#include "kudu/util/monotime.h"
-#include "kudu/util/net/net_util.h"
-#include "kudu/util/net/sockaddr.h"
-#include "kudu/util/path_util.h"
-#include "kudu/util/status.h"
-#include "kudu/util/stopwatch.h"
-#include "kudu/util/test_util.h"
-using std::string;
-using std::vector;
-using strings::Substitute;
-namespace kudu {
-using client::KuduClient;
-using client::KuduClientBuilder;
-using master::CatalogManager;
-using master::MasterServiceProxy;
-using master::MiniMaster;
-using master::TSDescriptor;
-using std::shared_ptr;
-using tserver::MiniTabletServer;
-using tserver::TabletServer;
-  : num_masters(1),
-    num_tablet_servers(1),
-    num_data_dirs(1),
-    bind_mode(MiniCluster::kDefaultBindMode) {
-InternalMiniCluster::InternalMiniCluster(Env* env, InternalMiniClusterOptions options)
-  : env_(env),
-    opts_(std::move(options)),
-    running_(false) {
-  if (opts_.data_root.empty()) {
-    opts_.data_root = JoinPathSegments(GetTestDataDirectory(), "minicluster-data");
-  }
-InternalMiniCluster::~InternalMiniCluster() {
-  Shutdown();
-Status InternalMiniCluster::Start() {
-  CHECK(!opts_.data_root.empty()) << "No Fs root was provided";
-  CHECK(!running_);
-  if (opts_.num_masters > 1) {
-    CHECK_GE(opts_.master_rpc_ports.size(), opts_.num_masters);
-  }
-  if (!env_->FileExists(opts_.data_root)) {
-    RETURN_NOT_OK(env_->CreateDir(opts_.data_root));
-  }
-  // start the masters
-  if (opts_.num_masters > 1) {
-    RETURN_NOT_OK_PREPEND(StartDistributedMasters(),
-                          "Couldn't start distributed masters");
-  } else {
-    RETURN_NOT_OK_PREPEND(StartSingleMaster(), "Couldn't start the single master");
-  }
-  for (int i = 0; i < opts_.num_tablet_servers; i++) {
-    RETURN_NOT_OK_PREPEND(AddTabletServer(),
-                          Substitute("Error adding TS $0", i));
-  }
-  RETURN_NOT_OK_PREPEND(WaitForTabletServerCount(opts_.num_tablet_servers),
-                        "Waiting for tablet servers to start");
-  RETURN_NOT_OK_PREPEND(rpc::MessengerBuilder("minicluster-messenger")
-                        .set_num_reactors(1)
-                        .set_max_negotiation_threads(1)
-                        .Build(&messenger_),
-                        "Failed to start Messenger for minicluster");
-  running_ = true;
-  return Status::OK();
-Status InternalMiniCluster::StartDistributedMasters() {
-  CHECK_GT(opts_.num_data_dirs, 0);
-  CHECK_GE(opts_.master_rpc_ports.size(), opts_.num_masters);
-  CHECK_GT(opts_.master_rpc_ports.size(), 1);
-  vector<HostPort> master_rpc_addrs = this->master_rpc_addrs();
-  LOG(INFO) << "Creating distributed mini masters. Addrs: "
-            << HostPort::ToCommaSeparatedString(master_rpc_addrs);
-  for (int i = 0; i < opts_.num_masters; i++) {
-    shared_ptr<MiniMaster> mini_master(new MiniMaster(GetMasterFsRoot(i), master_rpc_addrs[i]));
-    mini_master->SetMasterAddresses(master_rpc_addrs);
-    RETURN_NOT_OK_PREPEND(mini_master->Start(), Substitute("Couldn't start follower $0", i));
-    VLOG(1) << "Started MiniMaster with UUID " << mini_master->permanent_uuid()
-            << " at index " << i;
-    mini_masters_.push_back(std::move(mini_master));
-  }
-  int i = 0;
-  for (const shared_ptr<MiniMaster>& master : mini_masters_) {
-    LOG(INFO) << "Waiting to initialize catalog manager on master " << i++;
-    RETURN_NOT_OK_PREPEND(master->WaitForCatalogManagerInit(),
-                          Substitute("Could not initialize catalog manager on master $0", i));
-  }
-  return Status::OK();
-Status InternalMiniCluster::StartSync() {
-  RETURN_NOT_OK(Start());
-  int count = 0;
-  for (const shared_ptr<MiniTabletServer>& tablet_server : mini_tablet_servers_) {
-    RETURN_NOT_OK_PREPEND(tablet_server->WaitStarted(),
-                          Substitute("TabletServer $0 failed to start.", count));
-    count++;
-  }
-  return Status::OK();
-Status InternalMiniCluster::StartSingleMaster() {
-  CHECK_GT(opts_.num_data_dirs, 0);
-  CHECK_EQ(1, opts_.num_masters);
-  CHECK_LE(opts_.master_rpc_ports.size(), 1);
-  uint16_t master_rpc_port = 0;
-  if (opts_.master_rpc_ports.size() == 1) {
-    master_rpc_port = opts_.master_rpc_ports[0];
-  }
-  // start the master (we need the port to set on the servers).
-  string bind_ip = GetBindIpForDaemon(MiniCluster::MASTER, /*index=*/ 0, opts_.bind_mode);
-  shared_ptr<MiniMaster> mini_master(new MiniMaster(GetMasterFsRoot(0),
-      HostPort(std::move(bind_ip), master_rpc_port), opts_.num_data_dirs));
-  RETURN_NOT_OK_PREPEND(mini_master->Start(), "Couldn't start master");
-  RETURN_NOT_OK(mini_master->master()->WaitUntilCatalogManagerIsLeaderAndReadyForTests(
-      MonoDelta::FromSeconds(kMasterStartupWaitTimeSeconds)));
-  mini_masters_.push_back(std::move(mini_master));
-  return Status::OK();
-Status InternalMiniCluster::AddTabletServer() {
-  if (mini_masters_.empty()) {
-    return Status::IllegalState("Master not yet initialized");
-  }
-  int new_idx = mini_tablet_servers_.size();
-  uint16_t ts_rpc_port = 0;
-  if (opts_.tserver_rpc_ports.size() > new_idx) {
-    ts_rpc_port = opts_.tserver_rpc_ports[new_idx];
-  }
-  string bind_ip = GetBindIpForDaemon(MiniCluster::TSERVER, new_idx, opts_.bind_mode);
-  gscoped_ptr<MiniTabletServer> tablet_server(new MiniTabletServer(GetTabletServerFsRoot(new_idx),
-      HostPort(bind_ip, ts_rpc_port), opts_.num_data_dirs));
-  // set the master addresses
-  tablet_server->options()->master_addresses.clear();
-  for (const shared_ptr<MiniMaster>& master : mini_masters_) {
-    tablet_server->options()->master_addresses.emplace_back(master->bound_rpc_addr());
-  }
-  RETURN_NOT_OK(tablet_server->Start())
-  mini_tablet_servers_.push_back(shared_ptr<MiniTabletServer>(tablet_server.release()));
-  return Status::OK();
-void InternalMiniCluster::ShutdownNodes(ClusterNodes nodes) {
-  if (nodes == ClusterNodes::ALL || nodes == ClusterNodes::TS_ONLY) {
-    for (const shared_ptr<MiniTabletServer>& tablet_server : mini_tablet_servers_) {
-      tablet_server->Shutdown();
-    }
-    mini_tablet_servers_.clear();
-  }
-  if (nodes == ClusterNodes::ALL || nodes == ClusterNodes::MASTERS_ONLY) {
-    for (const shared_ptr<MiniMaster>& master_server : mini_masters_) {
-      master_server->Shutdown();
-    }
-    mini_masters_.clear();
-  }
-  running_ = false;
-MiniMaster* InternalMiniCluster::mini_master(int idx) const {
-  CHECK_GE(idx, 0) << "Master idx must be >= 0";
-  CHECK_LT(idx, mini_masters_.size()) << "Master idx must be < num masters started";
-  return mini_masters_[idx].get();
-MiniTabletServer* InternalMiniCluster::mini_tablet_server(int idx) const {
-  CHECK_GE(idx, 0) << "TabletServer idx must be >= 0";
-  CHECK_LT(idx, mini_tablet_servers_.size()) << "TabletServer idx must be < 'num_ts_started_'";
-  return mini_tablet_servers_[idx].get();
-vector<HostPort> InternalMiniCluster::master_rpc_addrs() const {
-  vector<HostPort> master_rpc_addrs;
-  for (int i = 0; i < opts_.master_rpc_ports.size(); i++) {
-    master_rpc_addrs.emplace_back(
-        GetBindIpForDaemon(MiniCluster::MASTER, i, opts_.bind_mode),
-        opts_.master_rpc_ports[i]);
-  }
-  return master_rpc_addrs;
-string InternalMiniCluster::GetMasterFsRoot(int idx) const {
-  return JoinPathSegments(opts_.data_root, Substitute("master-$0-root", idx));
-string InternalMiniCluster::GetTabletServerFsRoot(int idx) const {
-  return JoinPathSegments(opts_.data_root, Substitute("ts-$0-root", idx));
-Status InternalMiniCluster::WaitForTabletServerCount(int count) const {
-  vector<shared_ptr<master::TSDescriptor>> descs;
-  return WaitForTabletServerCount(count, MatchMode::MATCH_TSERVERS, &descs);
-Status InternalMiniCluster::WaitForTabletServerCount(int count,
-                                             MatchMode mode,
-                                             vector<shared_ptr<TSDescriptor>>* descs) const {
-  std::unordered_set<int> masters_to_search;
-  for (int i = 0; i < num_masters(); i++) {
-    if (!mini_master(i)->master()->IsShutdown()) {
-      masters_to_search.insert(i);
-    }
-  }
-  Stopwatch sw;
-  sw.start();
-  while (sw.elapsed().wall_seconds() < kRegistrationWaitTimeSeconds) {
-    for (auto iter = masters_to_search.begin(); iter != masters_to_search.end();) {
-      mini_master(*iter)->master()->ts_manager()->GetAllDescriptors(descs);
-      int match_count = 0;
-      switch (mode) {
-        case MatchMode::MATCH_TSERVERS:
-          // GetAllDescriptors() may return servers that are no longer online.
-          // Do a second step of verification to verify that the descs that we got
-          // are aligned (same uuid/seqno) with the TSs that we have in the cluster.
-          for (const shared_ptr<TSDescriptor>& desc : *descs) {
-            for (const auto& mini_tablet_server : mini_tablet_servers_) {
-              const TabletServer* ts = mini_tablet_server->server();
-              if (ts->instance_pb().permanent_uuid() == desc->permanent_uuid() &&
-                  ts->instance_pb().instance_seqno() == desc->latest_seqno()) {
-                match_count++;
-                break;
-              }
-            }
-          }
-          break;
-        case MatchMode::DO_NOT_MATCH_TSERVERS:
-          match_count = descs->size();
-          break;
-        default:
-          LOG(FATAL) << "Invalid match mode";
-      }
-      if (match_count == count) {
-        // This master has returned the correct set of tservers.
-        iter = masters_to_search.erase(iter);
-      } else {
-        iter++;
-      }
-    }
-    if (masters_to_search.empty()) {
-      // All masters have returned the correct set of tservers.
-      LOG(INFO) << Substitute("$0 TS(s) registered with all masters after $1s",
-                              count, sw.elapsed().wall_seconds());
-      return Status::OK();
-    }
-    SleepFor(MonoDelta::FromMilliseconds(1));
-  }
-  return Status::TimedOut(Substitute(
-      "Timed out waiting for $0 TS(s) to register with all masters", count));
-Status InternalMiniCluster::CreateClient(KuduClientBuilder* builder,
-                                 client::sp::shared_ptr<KuduClient>* client) const {
-  client::KuduClientBuilder defaults;
-  if (builder == nullptr) {
-    builder = &defaults;
-  }
-  builder->clear_master_server_addrs();
-  for (const shared_ptr<MiniMaster>& master : mini_masters_) {
-    CHECK(master);
-    builder->add_master_server_addr(master->bound_rpc_addr_str());
-  }
-  return builder->Build(client);
-Status InternalMiniCluster::GetLeaderMasterIndex(int* idx) const {
-  const MonoTime deadline = MonoTime::Now() +
-      MonoDelta::FromSeconds(kMasterStartupWaitTimeSeconds);
-  int leader_idx = -1;
-  while (MonoTime::Now() < deadline) {
-    for (int i = 0; i < num_masters(); i++) {
-      master::MiniMaster* mm = mini_master(i);
-      if (!mm->is_started() || mm->master()->IsShutdown()) {
-        continue;
-      }
-      master::CatalogManager* catalog = mm->master()->catalog_manager();
-      master::CatalogManager::ScopedLeaderSharedLock l(catalog);
-      if (l.first_failed_status().ok()) {
-        leader_idx = i;
-        break;
-      }
-    }
-    if (leader_idx != -1) {
-      break;
-    }
-    SleepFor(MonoDelta::FromMilliseconds(100));
-  }
-  if (leader_idx == -1) {
-    return Status::NotFound("Leader master was not found within deadline");
-  }
-  if (idx) {
-    *idx = leader_idx;
-  }
-  return Status::OK();
-std::shared_ptr<rpc::Messenger> InternalMiniCluster::messenger() const {
-  return messenger_;
-std::shared_ptr<MasterServiceProxy> InternalMiniCluster::master_proxy() const {
-  CHECK_EQ(1, mini_masters_.size());
-  return master_proxy(0);
-std::shared_ptr<MasterServiceProxy> InternalMiniCluster::master_proxy(int idx) const {
-  const auto& addr = CHECK_NOTNULL(mini_master(idx))->bound_rpc_addr();
-  return std::make_shared<MasterServiceProxy>(messenger_, addr,;
-} // namespace kudu