You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2017/10/02 19:25:20 UTC

[3/6] kudu git commit: mini-cluster: new module for the mini cluster implementations

http://git-wip-us.apache.org/repos/asf/kudu/blob/350c8a79/src/kudu/mini-cluster/external_mini_cluster.cc
----------------------------------------------------------------------
diff --git a/src/kudu/mini-cluster/external_mini_cluster.cc b/src/kudu/mini-cluster/external_mini_cluster.cc
new file mode 100644
index 0000000..896e063
--- /dev/null
+++ b/src/kudu/mini-cluster/external_mini_cluster.cc
@@ -0,0 +1,1281 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/mini-cluster/external_mini_cluster.h"
+
+#include <algorithm>
+#include <csignal>
+#include <cstdint>
+#include <cstdlib>
+#include <functional>
+#include <memory>
+#include <string>
+#include <unordered_set>
+#include <utility>
+
+#include <gflags/gflags.h>
+#include <gtest/gtest.h>
+#include <rapidjson/document.h>
+
+#include "kudu/client/client.h"
+#include "kudu/client/master_rpc.h"
+#include "kudu/common/wire_protocol.h"
+#include "kudu/common/wire_protocol.pb.h"
+#include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/strings/util.h"
+#include "kudu/master/master.pb.h"
+#include "kudu/master/master.proxy.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/security/test/mini_kdc.h"
+#include "kudu/server/server_base.pb.h"
+#include "kudu/server/server_base.proxy.h"
+#include "kudu/tablet/metadata.pb.h"
+#include "kudu/tablet/tablet.pb.h"
+#include "kudu/tserver/tserver.pb.h"
+#include "kudu/tserver/tserver_service.proxy.h"
+#include "kudu/util/async_util.h"
+#include "kudu/util/curl_util.h"
+#include "kudu/util/env.h"
+#include "kudu/util/env_util.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/fault_injection.h"
+#include "kudu/util/jsonreader.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/status.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/subprocess.h"
+#include "kudu/util/test_util.h"
+
+using kudu::client::internal::ConnectToClusterRpc;
+using kudu::master::ListTablesRequestPB;
+using kudu::master::ListTablesResponsePB;
+using kudu::master::MasterServiceProxy;
+using kudu::pb_util::SecureDebugString;
+using kudu::pb_util::SecureShortDebugString;
+using kudu::rpc::RpcController;
+using kudu::server::ServerStatusPB;
+using kudu::tserver::ListTabletsRequestPB;
+using kudu::tserver::ListTabletsResponsePB;
+using kudu::tserver::TabletServerServiceProxy;
+using rapidjson::Value;
+using std::pair;
+using std::string;
+using std::unique_ptr;
+using std::unordered_set;
+using std::vector;
+using strings::Substitute;
+
+typedef ListTabletsResponsePB::StatusAndSchemaPB StatusAndSchemaPB;
+
+// When set, the master/tablet server start-up paths in this file give each
+// daemon a "perf-<daemon id>.data" output file under its log directory, and
+// ExternalDaemon::StartProcess() attaches a "perf record" subprocess to the
+// daemon's pid.
+DEFINE_bool(perf_record, false,
+            "Whether to run \"perf record --call-graph fp\" on each daemon in the cluster");
+
+namespace kudu {
+namespace cluster {
+
+// File names of the daemon binaries; GetBinaryPath() joins them with the
+// daemon binary directory.
+static const char* const kMasterBinaryName = "kudu-master";
+static const char* const kTabletServerBinaryName = "kudu-tserver";
+// How long Start() and Restart() wait for the tablet servers to register with
+// the masters.
+static double kTabletServerRegistrationTimeoutSeconds = 15.0;
+// NOTE(review): not referenced in the portion of the file reviewed here;
+// presumably bounds a wait on the master's catalog manager -- confirm.
+static double kMasterCatalogManagerTimeoutSeconds = 60.0;
+
+// Default options: one master, one tablet server, the default bind mode, a
+// single data directory, Kerberos disabled, daemon logging to stderr enabled
+// and a 30 second limit on daemon start-up.
+ExternalMiniClusterOptions::ExternalMiniClusterOptions()
+    : num_masters(1),
+      num_tablet_servers(1),
+      bind_mode(MiniCluster::kDefaultBindMode),
+      num_data_dirs(1),
+      enable_kerberos(false),
+      logtostderr(true),
+      start_process_timeout(MonoDelta::FromSeconds(30)) {
+}
+
+// Creates a cluster configured with default-constructed options.
+ExternalMiniCluster::ExternalMiniCluster()
+  : ExternalMiniCluster(ExternalMiniClusterOptions()) {
+}
+
+// Creates a cluster configured with 'opts'. Nothing is started here; the
+// options are only stored (moved into opts_).
+ExternalMiniCluster::ExternalMiniCluster(ExternalMiniClusterOptions opts)
+  : opts_(std::move(opts)) {
+}
+
+// Calls Shutdown() so that destroying the cluster object also tears the
+// cluster down.
+ExternalMiniCluster::~ExternalMiniCluster() {
+  Shutdown();
+}
+
+// Stores in '*ret' the directory containing the currently running executable.
+// Returns the error from Env::GetExecutablePath() if the path cannot be
+// determined, in which case '*ret' is left untouched.
+Status ExternalMiniCluster::DeduceBinRoot(std::string* ret) {
+  string self_path;
+  RETURN_NOT_OK(Env::Default()->GetExecutablePath(&self_path));
+  const string self_dir = DirName(self_path);
+  *ret = self_dir;
+  return Status::OK();
+}
+
+// Resolves the effective daemon binary directory and data root from opts_,
+// filling in a default for whichever one was left empty.
+Status ExternalMiniCluster::HandleOptions() {
+  // Default the binary directory to wherever this executable lives.
+  daemon_bin_path_ = opts_.daemon_bin_path;
+  if (daemon_bin_path_.empty()) {
+    RETURN_NOT_OK(DeduceBinRoot(&daemon_bin_path_));
+  }
+
+  // Default the data root to a subdirectory of the current gtest directory.
+  const bool has_data_root = !opts_.data_root.empty();
+  data_root_ = has_data_root
+      ? opts_.data_root
+      : JoinPathSegments(GetTestDataDirectory(), "minicluster-data");
+
+  return Status::OK();
+}
+
+// Brings up the whole cluster: resolves options, builds the messenger, creates
+// the data root, optionally starts a KDC, starts the master(s) and then the
+// tablet servers, and finally waits for the tablet servers to register.
+//
+// Must only be called while the cluster has no masters and no tablet servers
+// (enforced with CHECKs); use Restart() to bring stopped daemons back.
+Status ExternalMiniCluster::Start() {
+  CHECK(masters_.empty()) << "Masters are not empty (size: " << masters_.size()
+      << "). Maybe you meant Restart()?";
+  CHECK(tablet_servers_.empty()) << "Tablet servers are not empty (size: "
+      << tablet_servers_.size() << "). Maybe you meant Restart()?";
+  RETURN_NOT_OK(HandleOptions());
+
+  rpc::MessengerBuilder messenger_builder("minicluster-messenger");
+  messenger_builder.set_num_reactors(1).set_max_negotiation_threads(1);
+  RETURN_NOT_OK_PREPEND(messenger_builder.Build(&messenger_),
+                        "Failed to start Messenger for minicluster");
+
+  // A pre-existing data root is fine; any other failure is fatal for Start().
+  const Status mkdir_status = Env::Default()->CreateDir(data_root_);
+  if (!mkdir_status.IsAlreadyPresent()) {
+    RETURN_NOT_OK_PREPEND(mkdir_status, "Could not create root dir " + data_root_);
+  }
+
+  if (opts_.enable_kerberos) {
+    kdc_.reset(new MiniKdc(opts_.mini_kdc_options));
+    RETURN_NOT_OK(kdc_->Start());
+
+    // Principals referenced by the ACL flags set in EnableKerberos(), plus one
+    // principal that appears in neither ACL.
+    RETURN_NOT_OK_PREPEND(kdc_->CreateUserPrincipal("test-admin"),
+                          "could not create admin principal");
+    RETURN_NOT_OK_PREPEND(kdc_->CreateUserPrincipal("test-user"),
+                          "could not create user principal");
+    RETURN_NOT_OK_PREPEND(kdc_->CreateUserPrincipal("joe-interloper"),
+                          "could not create unauthorized principal");
+
+    RETURN_NOT_OK_PREPEND(kdc_->Kinit("test-admin"),
+                          "could not kinit as admin");
+    RETURN_NOT_OK_PREPEND(kdc_->SetKrb5Environment(),
+                          "could not set krb5 client env");
+  }
+
+  if (opts_.num_masters == 1) {
+    RETURN_NOT_OK_PREPEND(StartSingleMaster(),
+                          "Failed to start a single Master");
+  } else {
+    RETURN_NOT_OK_PREPEND(StartDistributedMasters(),
+                          "Failed to add distributed masters");
+  }
+
+  // The error message numbers the tablet servers starting from 1.
+  for (int started = 0; started < opts_.num_tablet_servers; started++) {
+    RETURN_NOT_OK_PREPEND(AddTabletServer(),
+                          Substitute("Failed starting tablet server $0", started + 1));
+  }
+
+  const MonoDelta registration_timeout =
+      MonoDelta::FromSeconds(kTabletServerRegistrationTimeoutSeconds);
+  RETURN_NOT_OK(WaitForTabletServerCount(opts_.num_tablet_servers,
+                                         registration_timeout));
+
+  return Status::OK();
+}
+
+
+// Shuts down the tablet servers, the masters, or both, as selected by 'nodes'.
+// Tablet servers are stopped before masters. Empty master slots are skipped.
+void ExternalMiniCluster::ShutdownNodes(ClusterNodes nodes) {
+  const bool stop_tservers =
+      nodes == ClusterNodes::ALL || nodes == ClusterNodes::TS_ONLY;
+  const bool stop_masters =
+      nodes == ClusterNodes::ALL || nodes == ClusterNodes::MASTERS_ONLY;
+
+  if (stop_tservers) {
+    for (const scoped_refptr<ExternalTabletServer>& tserver : tablet_servers_) {
+      tserver->Shutdown();
+    }
+  }
+  if (stop_masters) {
+    for (const scoped_refptr<ExternalMaster>& m : masters_) {
+      if (!m) continue;
+      m->Shutdown();
+    }
+  }
+}
+
+// Restarts every master and tablet server that is currently shut down
+// (masters first), then waits for all tablet servers to register with the
+// masters. Daemons that are still running are left alone.
+Status ExternalMiniCluster::Restart() {
+  for (const scoped_refptr<ExternalMaster>& m : masters_) {
+    if (!m || !m->IsShutdown()) continue;
+    RETURN_NOT_OK_PREPEND(m->Restart(), "Cannot restart master bound at: " +
+                                        m->bound_rpc_hostport().ToString());
+  }
+
+  for (const scoped_refptr<ExternalTabletServer>& tserver : tablet_servers_) {
+    if (!tserver->IsShutdown()) continue;
+    RETURN_NOT_OK_PREPEND(tserver->Restart(), "Cannot restart tablet server bound at: " +
+                                              tserver->bound_rpc_hostport().ToString());
+  }
+
+  const MonoDelta registration_timeout =
+      MonoDelta::FromSeconds(kTabletServerRegistrationTimeoutSeconds);
+  RETURN_NOT_OK(WaitForTabletServerCount(tablet_servers_.size(),
+                                         registration_timeout));
+
+  return Status::OK();
+}
+
+// Points the cluster at a different directory of daemon binaries and updates
+// the executable path of every existing master and tablet server to match.
+// ExternalDaemon::SetExePath() CHECKs that the daemon is shut down.
+void ExternalMiniCluster::SetDaemonBinPath(string daemon_bin_path) {
+  daemon_bin_path_ = std::move(daemon_bin_path);
+  for (const scoped_refptr<ExternalMaster>& m : masters_) {
+    m->SetExePath(GetBinaryPath(kMasterBinaryName));
+  }
+  for (const scoped_refptr<ExternalTabletServer>& tserver : tablet_servers_) {
+    tserver->SetExePath(GetBinaryPath(kTabletServerBinaryName));
+  }
+}
+
+// Returns the path of 'binary' inside the daemon binary directory. The
+// directory must already have been resolved (CHECK-enforced).
+string ExternalMiniCluster::GetBinaryPath(const string& binary) const {
+  CHECK(!daemon_bin_path_.empty());
+  string full_path = JoinPathSegments(daemon_bin_path_, binary);
+  return full_path;
+}
+
+// Returns "<data root>/<daemon_id>/logs". The data root must already have been
+// resolved (CHECK-enforced).
+string ExternalMiniCluster::GetLogPath(const string& daemon_id) const {
+  CHECK(!data_root_.empty());
+  const string daemon_root = JoinPathSegments(data_root_, daemon_id);
+  return JoinPathSegments(daemon_root, "logs");
+}
+
+// Returns the path of one data directory of the daemon 'daemon_id'.
+//
+// Without 'dir_index' the result is "<data root>/<daemon_id>/data", which is
+// only valid when the cluster is configured with exactly one data directory.
+// With 'dir_index' the result is "<data root>/<daemon_id>/data-<dir_index>",
+// and the index must be below the configured number of data directories.
+// Both conditions are CHECK-enforced.
+string ExternalMiniCluster::GetDataPath(const string& daemon_id,
+                                        boost::optional<uint32_t> dir_index) const {
+  CHECK(!data_root_.empty());
+  string leaf;
+  if (!dir_index) {
+    CHECK_EQ(1, opts_.num_data_dirs);
+    leaf = "data";
+  } else {
+    CHECK_LT(*dir_index, opts_.num_data_dirs);
+    leaf = Substitute("data-$0", *dir_index);
+  }
+  const string daemon_root = JoinPathSegments(data_root_, daemon_id);
+  return JoinPathSegments(daemon_root, leaf);
+}
+
+// Returns every data directory of the daemon 'daemon_id': the single unindexed
+// "data" directory when one data directory is configured, otherwise one
+// indexed directory per configured data directory.
+vector<string> ExternalMiniCluster::GetDataPaths(const string& daemon_id) const {
+  vector<string> result;
+  if (opts_.num_data_dirs == 1) {
+    result.push_back(GetDataPath(daemon_id));
+    return result;
+  }
+  for (uint32_t i = 0; i < opts_.num_data_dirs; i++) {
+    result.push_back(GetDataPath(daemon_id, i));
+  }
+  return result;
+}
+
+// Returns "<data root>/<daemon_id>/wal". The data root must already have been
+// resolved (CHECK-enforced).
+string ExternalMiniCluster::GetWalPath(const string& daemon_id) const {
+  CHECK(!data_root_.empty());
+  const string daemon_root = JoinPathSegments(data_root_, daemon_id);
+  return JoinPathSegments(daemon_root, "wal");
+}
+
+namespace {
+// Returns a copy of 'orig_flags' in which every occurrence of the literal
+// text "${index}" has been replaced by the decimal form of 'index'.
+vector<string> SubstituteInFlags(const vector<string>& orig_flags,
+                                 int index) {
+  const string index_text = Substitute("$0", index);
+  vector<string> substituted;
+  for (const string& flag : orig_flags) {
+    substituted.emplace_back(StringReplace(flag, "${index}", index_text, true));
+  }
+  return substituted;
+}
+
+} // anonymous namespace
+
+// Starts the cluster's only master ("master-0") on an ephemeral RPC port and
+// appends it to masters_ once it is up. Nothing is appended on failure.
+Status ExternalMiniCluster::StartSingleMaster() {
+  const string master_id = "master-0";
+
+  ExternalDaemonOptions daemon_opts(opts_.logtostderr);
+  daemon_opts.messenger = messenger_;
+  daemon_opts.start_process_timeout = opts_.start_process_timeout;
+  daemon_opts.exe = GetBinaryPath(kMasterBinaryName);
+  daemon_opts.wal_dir = GetWalPath(master_id);
+  daemon_opts.data_dirs = GetDataPaths(master_id);
+  daemon_opts.log_dir = GetLogPath(master_id);
+  if (FLAGS_perf_record) {
+    daemon_opts.perf_record_filename =
+        Substitute("$0/perf-$1.data", daemon_opts.log_dir, master_id);
+  }
+  daemon_opts.extra_flags = SubstituteInFlags(opts_.extra_master_flags, 0);
+  // Port 0: let the master pick its RPC port.
+  daemon_opts.rpc_bind_address = HostPort(GetBindIpForMaster(0), 0);
+
+  scoped_refptr<ExternalMaster> new_master(new ExternalMaster(daemon_opts));
+  if (opts_.enable_kerberos) {
+    // The Kerberos principal is derived from the bind host, so it must be the
+    // same host the master's RPC endpoint binds to.
+    RETURN_NOT_OK_PREPEND(
+        new_master->EnableKerberos(kdc_.get(), daemon_opts.rpc_bind_address.host()),
+        "could not enable Kerberos");
+  }
+
+  RETURN_NOT_OK(new_master->Start());
+  masters_.push_back(new_master);
+  return Status::OK();
+}
+
+// Starts opts_.num_masters masters on the RPC ports listed in
+// opts_.master_rpc_ports and appends each to masters_ as it comes up. Every
+// master is passed the full peer list via --master_addresses.
+//
+// Aborts with a FATAL log if the number of configured ports differs from the
+// number of masters. Stops at the first master that fails to start; masters
+// started before it stay in masters_.
+Status ExternalMiniCluster::StartDistributedMasters() {
+  const int master_count = opts_.num_masters;
+
+  if (opts_.master_rpc_ports.size() != master_count) {
+    LOG(FATAL) << master_count << " masters requested, but only " <<
+        opts_.master_rpc_ports.size() << " ports specified in 'master_rpc_ports'";
+  }
+
+  const vector<HostPort> peers = master_rpc_addrs();
+  vector<string> common_flags = opts_.extra_master_flags;
+  common_flags.push_back(Substitute("--master_addresses=$0",
+                                    HostPort::ToCommaSeparatedString(peers)));
+  const string master_exe = GetBinaryPath(kMasterBinaryName);
+
+  for (int idx = 0; idx < master_count; idx++) {
+    const string master_id = Substitute("master-$0", idx);
+
+    ExternalDaemonOptions daemon_opts(opts_.logtostderr);
+    daemon_opts.messenger = messenger_;
+    daemon_opts.exe = master_exe;
+    daemon_opts.start_process_timeout = opts_.start_process_timeout;
+    daemon_opts.rpc_bind_address = peers[idx];
+    daemon_opts.wal_dir = GetWalPath(master_id);
+    daemon_opts.data_dirs = GetDataPaths(master_id);
+    daemon_opts.log_dir = GetLogPath(master_id);
+    if (FLAGS_perf_record) {
+      daemon_opts.perf_record_filename =
+          Substitute("$0/perf-$1.data", daemon_opts.log_dir, master_id);
+    }
+    daemon_opts.extra_flags = SubstituteInFlags(common_flags, idx);
+
+    scoped_refptr<ExternalMaster> new_master(new ExternalMaster(daemon_opts));
+    if (opts_.enable_kerberos) {
+      RETURN_NOT_OK_PREPEND(new_master->EnableKerberos(kdc_.get(), peers[idx].host()),
+                            "could not enable Kerberos");
+    }
+    RETURN_NOT_OK_PREPEND(new_master->Start(),
+                          Substitute("Unable to start Master at index $0", idx));
+    masters_.push_back(new_master);
+  }
+
+  return Status::OK();
+}
+
+// Returns the IP address the tablet server at 'index' should bind to, as
+// chosen by MiniCluster::GetBindIpForDaemon() for the cluster's bind mode.
+string ExternalMiniCluster::GetBindIpForTabletServer(int index) const {
+  return MiniCluster::GetBindIpForDaemon(MiniCluster::TSERVER, index, opts_.bind_mode);
+}
+
+// Returns the IP address the master at 'index' should bind to, as chosen by
+// MiniCluster::GetBindIpForDaemon() for the cluster's bind mode.
+string ExternalMiniCluster::GetBindIpForMaster(int index) const {
+  return MiniCluster::GetBindIpForDaemon(MiniCluster::MASTER, index, opts_.bind_mode);
+}
+
+// Starts one more tablet server ("ts-<n>", where n is the current number of
+// tablet servers) on an ephemeral RPC port, configured with the bound RPC
+// addresses of all masters, and appends it to tablet_servers_ once it is up.
+// At least one master must have been started already (CHECK-enforced).
+Status ExternalMiniCluster::AddTabletServer() {
+  CHECK(leader_master() != nullptr)
+      << "Must have started at least 1 master before adding tablet servers";
+
+  const int ts_index = tablet_servers_.size();
+  const string ts_id = Substitute("ts-$0", ts_index);
+
+  vector<HostPort> master_addrs;
+  for (int m = 0; m < num_masters(); m++) {
+    master_addrs.push_back(DCHECK_NOTNULL(master(m))->bound_rpc_hostport());
+  }
+  const string bind_ip = GetBindIpForTabletServer(ts_index);
+
+  ExternalDaemonOptions daemon_opts(opts_.logtostderr);
+  daemon_opts.messenger = messenger_;
+  daemon_opts.start_process_timeout = opts_.start_process_timeout;
+  daemon_opts.exe = GetBinaryPath(kTabletServerBinaryName);
+  daemon_opts.wal_dir = GetWalPath(ts_id);
+  daemon_opts.data_dirs = GetDataPaths(ts_id);
+  daemon_opts.log_dir = GetLogPath(ts_id);
+  if (FLAGS_perf_record) {
+    daemon_opts.perf_record_filename =
+        Substitute("$0/perf-$1.data", daemon_opts.log_dir, ts_id);
+  }
+  daemon_opts.extra_flags = SubstituteInFlags(opts_.extra_tserver_flags, ts_index);
+  // Port 0: let the tablet server pick its RPC port.
+  daemon_opts.rpc_bind_address = HostPort(bind_ip, 0);
+
+  scoped_refptr<ExternalTabletServer> new_ts(
+      new ExternalTabletServer(daemon_opts, master_addrs));
+  if (opts_.enable_kerberos) {
+    RETURN_NOT_OK_PREPEND(new_ts->EnableKerberos(kdc_.get(), bind_ip),
+                          "could not enable Kerberos");
+  }
+
+  RETURN_NOT_OK(new_ts->Start());
+  tablet_servers_.push_back(new_ts);
+  return Status::OK();
+}
+
+// Waits until every running master reports exactly 'count' of this cluster's
+// tablet servers as registered.
+//
+// Only masters whose slot in masters_ is populated and which are not shut down
+// at the time of the call are polled. A tablet server reported by a master is
+// counted only if both its permanent UUID and its instance sequence number
+// match one of the servers in tablet_servers_.
+//
+// Returns OK once all polled masters agree, TimedOut if 'timeout' elapses
+// first, or the ListTabletServers RPC error if a call fails.
+Status ExternalMiniCluster::WaitForTabletServerCount(int count, const MonoDelta& timeout) {
+  MonoTime deadline = MonoTime::Now() + timeout;
+
+  unordered_set<int> masters_to_search;
+  for (int i = 0; i < masters_.size(); i++) {
+    // Skip empty slots as well as stopped masters: ShutdownNodes() and
+    // Restart() both tolerate null entries in masters_, so do the same here
+    // rather than dereferencing a null master.
+    if (masters_[i] && !masters_[i]->IsShutdown()) {
+      masters_to_search.insert(i);
+    }
+  }
+
+  while (true) {
+    MonoDelta remaining = deadline - MonoTime::Now();
+    if (remaining.ToSeconds() < 0) {
+      return Status::TimedOut(Substitute(
+          "Timed out waiting for $0 TS(s) to register with all masters", count));
+    }
+
+    for (auto iter = masters_to_search.begin(); iter != masters_to_search.end();) {
+      master::ListTabletServersRequestPB req;
+      master::ListTabletServersResponsePB resp;
+      rpc::RpcController rpc;
+      rpc.set_timeout(remaining);
+      RETURN_NOT_OK_PREPEND(master_proxy(*iter)->ListTabletServers(req, &resp, &rpc),
+                            "ListTabletServers RPC failed");
+      // ListTabletServers() may return servers that are no longer online.
+      // Do a second step of verification to verify that the descs that we got
+      // are aligned (same uuid/seqno) with the TSs that we have in the cluster.
+      int match_count = 0;
+      for (const master::ListTabletServersResponsePB_Entry& e : resp.servers()) {
+        for (const scoped_refptr<ExternalTabletServer>& ets : tablet_servers_) {
+          if (ets->instance_id().permanent_uuid() == e.instance_id().permanent_uuid() &&
+              ets->instance_id().instance_seqno() == e.instance_id().instance_seqno()) {
+            match_count++;
+            break;
+          }
+        }
+      }
+      if (match_count == count) {
+        // This master has returned the correct set of tservers; stop polling it.
+        iter = masters_to_search.erase(iter);
+      } else {
+        ++iter;
+      }
+    }
+
+    if (masters_to_search.empty()) {
+      // All masters have returned the correct set of tservers.
+      LOG(INFO) << count << " TS(s) registered with all masters";
+      return Status::OK();
+    }
+    SleepFor(MonoDelta::FromMilliseconds(1));
+  }
+}
+
+// Fails the current gtest assertion if any daemon that has not been shut down
+// is no longer alive. Each such daemon is logged at ERROR level first.
+void ExternalMiniCluster::AssertNoCrashes() {
+  int num_crashes = 0;
+  for (ExternalDaemon* daemon : daemons()) {
+    // Daemons that were shut down on purpose are not crashes.
+    const bool crashed = !daemon->IsShutdown() && !daemon->IsProcessAlive();
+    if (!crashed) continue;
+    LOG(ERROR) << "Process with UUID " << daemon->uuid() << " has crashed";
+    num_crashes++;
+  }
+  ASSERT_EQ(0, num_crashes) << "At least one process crashed";
+}
+
+// Polls 'ts' with ListTablets until it reports at least 'min_tablet_count'
+// tablets and every reported tablet is in the RUNNING state.
+//
+// Returns OK on success, the RPC error or the error carried in the response if
+// a poll fails, and TimedOut (with the last response as the message) once
+// 'timeout' has elapsed. Each individual RPC uses a fixed 10 second timeout.
+Status ExternalMiniCluster::WaitForTabletsRunning(ExternalTabletServer* ts,
+                                                  int min_tablet_count,
+                                                  const MonoDelta& timeout) {
+  const auto& ts_addr = ts->bound_rpc_addr();
+  TabletServerServiceProxy proxy(messenger_, ts_addr, ts_addr.host());
+  ListTabletsRequestPB req;
+  ListTabletsResponsePB resp;
+
+  const MonoTime deadline = MonoTime::Now() + timeout;
+  while (MonoTime::Now() < deadline) {
+    rpc::RpcController controller;
+    controller.set_timeout(MonoDelta::FromSeconds(10));
+    RETURN_NOT_OK(proxy.ListTablets(req, &resp, &controller));
+    if (resp.has_error()) {
+      return StatusFromPB(resp.error().status());
+    }
+
+    int num_not_running = 0;
+    for (const StatusAndSchemaPB& entry : resp.status_and_schema()) {
+      if (entry.tablet_status().state() != tablet::RUNNING) {
+        num_not_running++;
+      }
+    }
+
+    // Done once enough tablets have been observed and none of them is still
+    // in a non-running state.
+    const bool enough_tablets = resp.status_and_schema_size() >= min_tablet_count;
+    if (num_not_running == 0 && enough_tablets) {
+      return Status::OK();
+    }
+
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+
+  return Status::TimedOut(SecureDebugString(resp));
+}
+
+// Finds the current leader master and stores its position in masters_ in
+// '*idx'.
+//
+// Sends a ConnectToClusterRpc to the bound RPC addresses of all masters with a
+// 5 second deadline and waits for it to complete. Returns the RPC's error if
+// no leader could be determined; '*idx' is left untouched in that case.
+// Aborts with a FATAL log if the reported leader is not one of masters_.
+Status ExternalMiniCluster::GetLeaderMasterIndex(int* idx) {
+  scoped_refptr<ConnectToClusterRpc> rpc;
+  Synchronizer sync;
+  vector<pair<Sockaddr, string>> addrs_with_names;
+  Sockaddr leader_master_addr;
+  MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(5);
+
+  for (const scoped_refptr<ExternalMaster>& master : masters_) {
+    addrs_with_names.emplace_back(master->bound_rpc_addr(), master->bound_rpc_addr().host());
+  }
+  // The callback captures locals by reference; that is safe because this
+  // function blocks on 'sync' until the callback has run.
+  const auto& cb = [&](const Status& status,
+                       const pair<Sockaddr, string>& leader_master,
+                       const master::ConnectToMasterResponsePB& resp) {
+    if (status.ok()) {
+      leader_master_addr = leader_master.first;
+    }
+    sync.StatusCB(status);
+  };
+  rpc.reset(new ConnectToClusterRpc(cb,
+                                    std::move(addrs_with_names),
+                                    deadline,
+                                    MonoDelta::FromSeconds(5),
+                                    messenger_));
+  rpc->SendRpc();
+  RETURN_NOT_OK(sync.Wait());
+  bool found = false;
+  for (int i = 0; i < masters_.size(); i++) {
+    // Match on host as well as port: masters bound to different IP addresses
+    // may use the same port number, so the port alone does not identify a
+    // master. The leader address is one of the bound_rpc_addr() values that
+    // were passed to the RPC above, so compare against the same accessor.
+    const auto& candidate = masters_[i]->bound_rpc_addr();
+    if (candidate.port() == leader_master_addr.port() &&
+        candidate.host() == leader_master_addr.host()) {
+      found = true;
+      *idx = i;
+      break;
+    }
+  }
+  if (!found) {
+    // There is never a situation where this should happen, so it's
+    // better to exit with a FATAL log message right away vs. return a
+    // Status::IllegalState().
+    LOG(FATAL) << "Leader master is not in masters_";
+  }
+  return Status::OK();
+}
+
+// Returns the tablet server whose instance id carries the permanent UUID
+// 'uuid', or nullptr if the cluster has no such tablet server.
+ExternalTabletServer* ExternalMiniCluster::tablet_server_by_uuid(const std::string& uuid) const {
+  ExternalTabletServer* match = nullptr;
+  for (const scoped_refptr<ExternalTabletServer>& candidate : tablet_servers_) {
+    if (candidate->instance_id().permanent_uuid() == uuid) {
+      match = candidate.get();
+      break;
+    }
+  }
+  return match;
+}
+
+// Returns the position in tablet_servers_ of the tablet server whose uuid()
+// equals 'uuid', or -1 if there is none.
+int ExternalMiniCluster::tablet_server_index_by_uuid(const std::string& uuid) const {
+  int position = -1;
+  for (int i = 0; i < tablet_servers_.size(); i++) {
+    if (tablet_servers_[i]->uuid() == uuid) {
+      position = i;
+      break;
+    }
+  }
+  return position;
+}
+
+// Returns non-owning pointers to all daemons of the cluster: the tablet
+// servers first, followed by the masters.
+vector<ExternalDaemon*> ExternalMiniCluster::daemons() const {
+  vector<ExternalDaemon*> all;
+  for (const scoped_refptr<ExternalTabletServer>& tserver : tablet_servers_) {
+    all.push_back(tserver.get());
+  }
+  for (const scoped_refptr<ExternalMaster>& m : masters_) {
+    all.push_back(m.get());
+  }
+  return all;
+}
+
+// Returns the configured RPC address of every master: the i-th entry pairs the
+// bind IP for master i with the i-th port in opts_.master_rpc_ports.
+vector<HostPort> ExternalMiniCluster::master_rpc_addrs() const {
+  vector<HostPort> addrs;
+  for (int i = 0; i < opts_.master_rpc_ports.size(); i++) {
+    const string bind_ip =
+        GetBindIpForDaemon(MiniCluster::MASTER, i, opts_.bind_mode);
+    addrs.emplace_back(bind_ip, opts_.master_rpc_ports[i]);
+  }
+  return addrs;
+}
+
+// Returns the messenger built in Start() and shared with the daemons' options.
+std::shared_ptr<rpc::Messenger> ExternalMiniCluster::messenger() const {
+  return messenger_;
+}
+
+// Returns a proxy to the cluster's only master. May only be used when the
+// cluster has exactly one master (CHECK-enforced).
+std::shared_ptr<MasterServiceProxy> ExternalMiniCluster::master_proxy() const {
+  CHECK_EQ(masters_.size(), 1);
+  return master_proxy(0);
+}
+
+// Returns a newly created proxy to the master at position 'idx', connected to
+// that master's bound RPC address. 'idx' must be a valid position in masters_
+// and the master at that position must be non-null (both CHECK-enforced).
+std::shared_ptr<MasterServiceProxy> ExternalMiniCluster::master_proxy(int idx) const {
+  CHECK_LT(idx, masters_.size());
+  const auto& addr = CHECK_NOTNULL(master(idx))->bound_rpc_addr();
+  return std::make_shared<MasterServiceProxy>(messenger_, addr, addr.host());
+}
+
+// Builds a client connected to this cluster and stores it in '*client'.
+//
+// 'builder' may be nullptr, in which case a default-constructed builder is
+// used. Any master addresses already set on the builder are replaced with the
+// bound RPC addresses of this cluster's masters. The cluster must have at
+// least one master (CHECK-enforced).
+Status ExternalMiniCluster::CreateClient(client::KuduClientBuilder* builder,
+                                         client::sp::shared_ptr<client::KuduClient>* client) const {
+  client::KuduClientBuilder defaults;
+  client::KuduClientBuilder* effective = (builder != nullptr) ? builder : &defaults;
+
+  CHECK(!masters_.empty());
+  effective->clear_master_server_addrs();
+  for (const scoped_refptr<ExternalMaster>& m : masters_) {
+    const string master_addr = m->bound_rpc_hostport().ToString();
+    effective->add_master_server_addr(master_addr);
+  }
+  return effective->Build(client);
+}
+
+// Sets the flag 'flag' to 'value' on the running 'daemon' through its generic
+// service SetFlag RPC, with the 'force' option set and a 30 second timeout.
+//
+// Returns the RPC error (prefixed with "rpc failed") if the call itself fails,
+// or RemoteError if the daemon answers with anything other than SUCCESS.
+Status ExternalMiniCluster::SetFlag(ExternalDaemon* daemon,
+                                    const string& flag,
+                                    const string& value) {
+  server::SetFlagRequestPB req;
+  req.set_flag(flag);
+  req.set_value(value);
+  req.set_force(true);
+
+  const auto& daemon_addr = daemon->bound_rpc_addr();
+  server::GenericServiceProxy proxy(messenger_, daemon_addr, daemon_addr.host());
+
+  rpc::RpcController rpc;
+  rpc.set_timeout(MonoDelta::FromSeconds(30));
+  server::SetFlagResponsePB resp;
+  RETURN_NOT_OK_PREPEND(proxy.SetFlag(req, &resp, &rpc),
+                        "rpc failed");
+
+  if (resp.result() == server::SetFlagResponsePB::SUCCESS) {
+    return Status::OK();
+  }
+  return Status::RemoteError("failed to set flag",
+                             SecureShortDebugString(resp));
+}
+
+//------------------------------------------------------------
+// ExternalDaemon
+//------------------------------------------------------------
+
+// Captures the daemon's configuration from 'opts' (most fields are moved out
+// of it). No process is started here. The RPC bind address in 'opts' must be
+// initialized (CHECK-enforced).
+ExternalDaemon::ExternalDaemon(ExternalDaemonOptions opts)
+    : messenger_(std::move(opts.messenger)),
+      wal_dir_(std::move(opts.wal_dir)),
+      data_dirs_(std::move(opts.data_dirs)),
+      log_dir_(std::move(opts.log_dir)),
+      perf_record_filename_(std::move(opts.perf_record_filename)),
+      start_process_timeout_(opts.start_process_timeout),
+      logtostderr_(opts.logtostderr),
+      rpc_bind_address_(std::move(opts.rpc_bind_address)),
+      exe_(std::move(opts.exe)),
+      extra_flags_(std::move(opts.extra_flags)) {
+  CHECK(rpc_bind_address_.Initialized());
+}
+
+// Nothing to do explicitly; members clean up after themselves.
+ExternalDaemon::~ExternalDaemon() {
+}
+
+// Configures this daemon to run with Kerberos authentication required.
+//
+// Creates a keytab in 'kdc' for the service principal "kudu/<bind_host>",
+// adopts the KDC's environment variables for the daemon process, and appends
+// the keytab, principal, authentication and ACL flags to the daemon's extra
+// flags. Returns an error, without changing the daemon, if the keytab cannot
+// be created.
+Status ExternalDaemon::EnableKerberos(MiniKdc* kdc, const string& bind_host) {
+  const string service_principal = "kudu/" + bind_host;
+  string keytab_path;
+  RETURN_NOT_OK_PREPEND(kdc->CreateServiceKeytab(service_principal, &keytab_path),
+                        "could not create keytab");
+  extra_env_ = kdc->GetEnvVars();
+
+  extra_flags_.emplace_back("--keytab_file=" + keytab_path);
+  extra_flags_.emplace_back("--principal=" + service_principal);
+  extra_flags_.emplace_back("--rpc_authentication=required");
+  extra_flags_.emplace_back("--superuser_acl=test-admin");
+  extra_flags_.emplace_back("--user_acl=test-user");
+  return Status::OK();
+}
+
+// Launches the daemon binary as a subprocess and waits for it to come up.
+//
+// The command line is exe_, then 'user_flags', then a set of test-friendly
+// defaults, and finally extra_flags_ (last, so they can override what precedes
+// them). The daemon is told to dump its server information to "info.pb" in the
+// first data directory; the appearance of that file is the signal that the
+// daemon has started. Must not be called while a process is already attached
+// (CHECK-enforced).
+//
+// Returns:
+// - OK once the info file has been read into status_. process_ (and
+//   perf_record_process_, if perf recording was requested) then hold the
+//   started subprocesses.
+// - OK as well if the daemon exits with fault_injection::kExitStatus before
+//   the info file shows up; the subprocesses are still handed over, but
+//   status_ is not updated.
+// - RuntimeError carrying the exit description if the daemon exits in any
+//   other way while being waited for.
+// - TimedOut if the info file does not appear within start_process_timeout_;
+//   the daemon is sent SIGKILL first.
+// - The underlying error if the log directory cannot be created, a subprocess
+//   cannot be started, or the info file cannot be read.
+Status ExternalDaemon::StartProcess(const vector<string>& user_flags) {
+  CHECK(!process_);
+
+  vector<string> argv;
+
+  // First the exe for argv[0].
+  argv.push_back(exe_);
+
+  // Then all the flags coming from the minicluster framework.
+  argv.insert(argv.end(), user_flags.begin(), user_flags.end());
+
+  // Disable fsync to dramatically speed up runtime. This is safe as no tests
+  // rely on forcefully cutting power to a machine or equivalent.
+  argv.emplace_back("--never_fsync");
+
+  // Generate smaller RSA keys -- generating a 1024-bit key is faster
+  // than generating the default 2048-bit key, and we don't care about
+  // strong encryption in tests. Setting it lower (e.g. 512 bits) results
+  // in OpenSSL errors RSA_sign:digest too big for rsa key:rsa_sign.c:122
+  // since we are using strong/high TLS v1.2 cipher suites, so the minimum
+  // size of TLS-related RSA key is 768 bits (due to the usage of
+  // the ECDHE-RSA-AES256-GCM-SHA384 suite). However, to work with Java
+  // client it's necessary to have at least 1024 bits for certificate RSA key
+  // due to Java security policies.
+  argv.emplace_back("--ipki_server_key_size=1024");
+
+  // Disable minidumps by default since many tests purposely inject faults.
+  argv.emplace_back("--enable_minidumps=false");
+
+  // Disable log redaction.
+  argv.emplace_back("--redact=flag");
+
+  // Enable metrics logging.
+  argv.emplace_back("--metrics_log_interval_ms=1000");
+
+  if (logtostderr_) {
+    // Ensure that logging goes to the test output and doesn't get buffered.
+    argv.emplace_back("--logtostderr");
+    argv.emplace_back("--logbuflevel=-1");
+  }
+
+  // Even if we are logging to stderr, metrics logs and minidumps end up being
+  // written based on -log_dir. So, we have to set that too.
+  argv.push_back("--log_dir=" + log_dir_);
+  RETURN_NOT_OK(env_util::CreateDirsRecursively(Env::Default(), log_dir_));
+
+  // Tell the server to dump its port information so we can pick it up.
+  string info_path = JoinPathSegments(data_dirs_[0], "info.pb");
+  argv.push_back("--server_dump_info_path=" + info_path);
+  argv.emplace_back("--server_dump_info_format=pb");
+
+  // We use ephemeral ports in many tests. They don't work for production, but are OK
+  // in unit tests.
+  argv.emplace_back("--rpc_server_allow_ephemeral_ports");
+
+  // Allow unsafe and experimental flags from tests, since we often use
+  // fault injection, etc.
+  argv.emplace_back("--unlock_experimental_flags");
+  argv.emplace_back("--unlock_unsafe_flags");
+
+  // Then the "extra flags" passed into the ctor (from the ExternalMiniCluster
+  // options struct). These come at the end so they can override things like
+  // web port or RPC bind address if necessary.
+  argv.insert(argv.end(), extra_flags_.begin(), extra_flags_.end());
+
+  // A previous instance of the daemon may have run in the same directory. So, remove
+  // the previous info file if it's there.
+  ignore_result(Env::Default()->DeleteFile(info_path));
+
+  // Start the daemon.
+  unique_ptr<Subprocess> p(new Subprocess(argv));
+  p->ShareParentStdout(false);
+  p->SetEnvVars(extra_env_);
+  string env_str;
+  JoinMapKeysAndValues(extra_env_, "=", ",", &env_str);
+  LOG(INFO) << "Running " << exe_ << "\n" << JoinStrings(argv, "\n")
+            << " with env {" << env_str << "}";
+  RETURN_NOT_OK_PREPEND(p->Start(),
+                        Substitute("Failed to start subprocess $0", exe_));
+
+  // If requested, start a monitoring subprocess.
+  unique_ptr<Subprocess> perf_record;
+  if (!perf_record_filename_.empty()) {
+    perf_record.reset(new Subprocess({
+      "perf",
+      "record",
+      "--call-graph",
+      "fp",
+      "-o",
+      perf_record_filename_,
+      Substitute("--pid=$0", p->pid())
+    }, SIGINT));
+    RETURN_NOT_OK_PREPEND(perf_record->Start(),
+                          "Could not start perf record subprocess");
+  }
+
+  // The process is now starting -- wait for the bound port info to show up.
+  Stopwatch sw;
+  sw.start();
+  bool success = false;
+  while (sw.elapsed().wall_seconds() < start_process_timeout_.ToSeconds()) {
+    if (Env::Default()->FileExists(info_path)) {
+      success = true;
+      break;
+    }
+    SleepFor(MonoDelta::FromMilliseconds(10));
+    int wait_status;
+    Status s = p->WaitNoBlock(&wait_status);
+    if (s.IsTimedOut()) {
+      // The process is still running.
+      continue;
+    }
+
+    // If the process exited with expected exit status we need to still swap() the process
+    // and exit as if it had succeeded.
+    if (WIFEXITED(wait_status) && WEXITSTATUS(wait_status) == fault_injection::kExitStatus) {
+      process_.swap(p);
+      perf_record_process_.swap(perf_record);
+      return Status::OK();
+    }
+
+    RETURN_NOT_OK_PREPEND(s, Substitute("Failed waiting on $0", exe_));
+    string exit_info;
+    RETURN_NOT_OK(p->GetExitStatus(nullptr, &exit_info));
+    return Status::RuntimeError(exit_info);
+  }
+
+  if (!success) {
+    ignore_result(p->Kill(SIGKILL));
+    // MonoDelta::ToString() already carries the unit, so no "s" is appended
+    // after the placeholder.
+    return Status::TimedOut(
+        Substitute("Timed out after $0 waiting for process ($1) to write info file ($2)",
+                   start_process_timeout_.ToString(), exe_, info_path));
+  }
+
+  status_.reset(new ServerStatusPB());
+  RETURN_NOT_OK_PREPEND(pb_util::ReadPBFromPath(Env::Default(), info_path, status_.get()),
+                        "Failed to read info file from " + info_path);
+  LOG(INFO) << "Started " << exe_ << " as pid " << p->pid();
+  VLOG(1) << exe_ << " instance information:\n" << SecureDebugString(*status_);
+
+  process_.swap(p);
+  perf_record_process_.swap(perf_record);
+  return Status::OK();
+}
+
+// Replaces the path of the binary used the next time the daemon is started.
+// The daemon must be shut down when this is called (CHECK-enforced).
+void ExternalDaemon::SetExePath(string exe) {
+  CHECK(IsShutdown()) << "Call Shutdown() before changing the executable path";
+  exe_ = std::move(exe);
+}
+
+// Suspends the daemon process by sending it SIGSTOP and records it as paused.
+// Returns IllegalState if no process is attached, or the error from
+// delivering the signal (in which case the paused flag is left unchanged).
+Status ExternalDaemon::Pause() {
+  if (!process_) {
+    return Status::IllegalState(Substitute(
+        "Request to pause '$0' but the process is not there", exe_));
+  }
+  VLOG(1) << "Pausing " << exe_ << " with pid " << process_->pid();
+  RETURN_NOT_OK(process_->Kill(SIGSTOP));
+  paused_ = true;
+  return Status::OK();
+}
+
+// Resumes the daemon process by sending it SIGCONT and clears the paused flag.
+// Returns IllegalState if no process is attached, or the error from
+// delivering the signal (in which case the paused flag is left unchanged).
+Status ExternalDaemon::Resume() {
+  if (!process_) {
+    return Status::IllegalState(Substitute(
+        "Request to resume '$0' but the process is not there", exe_));
+  }
+  VLOG(1) << "Resuming " << exe_ << " with pid " << process_->pid();
+  RETURN_NOT_OK(process_->Kill(SIGCONT));
+  paused_ = false;
+  return Status::OK();
+}
+
+// True when no Subprocess object is attached to this daemon.
+bool ExternalDaemon::IsShutdown() const {
+  return process_ == nullptr;
+}
+
+// Reports whether the daemon's process is still running.
+bool ExternalDaemon::IsProcessAlive() const {
+  // Without a process there is nothing to probe. Otherwise a non-blocking
+  // wait that "times out" means the child has not exited yet.
+  return !IsShutdown() && process_->WaitNoBlock().IsTimedOut();
+}
+
+// Waits up to 'timeout' for the process to exit with the fault-injection
+// exit status; see WaitForCrash() for the possible results.
+Status ExternalDaemon::WaitForInjectedCrash(const MonoDelta& timeout) const {
+  const auto exited_by_fault_injection = [](int wait_status) {
+    return WIFEXITED(wait_status) && WEXITSTATUS(wait_status) == fault_injection::kExitStatus;
+  };
+  return WaitForCrash(timeout, exited_by_fault_injection, "fault injection");
+}
+
+// Waits up to 'timeout' for the process to be terminated by SIGABRT;
+// see WaitForCrash() for the possible results.
+Status ExternalDaemon::WaitForFatal(const MonoDelta& timeout) const {
+  const auto killed_by_sigabrt = [](int wait_status) {
+    return WIFSIGNALED(wait_status) && WTERMSIG(wait_status) == SIGABRT;
+  };
+  return WaitForCrash(timeout, killed_by_sigabrt, "FATAL crash");
+}
+
+
+// Polls until the process has exited or 'timeout' has elapsed, then applies
+// 'wait_status_predicate' to the waitpid(2)-style status of the process.
+//
+// Returns OK if the predicate accepts the status, TimedOut if the process is
+// still alive after the deadline, and Aborted (mentioning 'crash_type_str')
+// if the process exited in some other way. CHECK-fails if there is no process.
+Status ExternalDaemon::WaitForCrash(const MonoDelta& timeout,
+                                    const std::function<bool(int)>& wait_status_predicate,
+                                    const char* crash_type_str) const {
+  CHECK(process_) << "process not started";
+  const MonoTime deadline = MonoTime::Now() + timeout;
+
+  // Back off linearly between probes: 10ms, 20ms, ... capped at 200ms.
+  for (int attempt = 1; IsProcessAlive() && MonoTime::Now() < deadline; ++attempt) {
+    SleepFor(MonoDelta::FromMilliseconds(std::min(attempt * 10, 200)));
+  }
+
+  if (IsProcessAlive()) {
+    return Status::TimedOut(Substitute("Process did not crash within $0",
+                                       timeout.ToString()));
+  }
+
+  // The process is gone: check that it went away in the expected manner.
+  int wait_status;
+  RETURN_NOT_OK_PREPEND(process_->WaitNoBlock(&wait_status),
+                        "could not get wait status");
+  if (wait_status_predicate(wait_status)) {
+    return Status::OK();
+  }
+
+  string exit_description;
+  RETURN_NOT_OK_PREPEND(process_->GetExitStatus(nullptr, &exit_description),
+                        "could not get description of exit");
+  return Status::Aborted(
+      Substitute("process exited, but not due to a $0: $1", crash_type_str, exit_description));
+}
+
+// Returns the pid of the daemon's process.
+//
+// CHECK-fails if no process is attached. The class declaration documents a
+// CHECK failure for this case, so fail loudly rather than dereference a null
+// 'process_'.
+pid_t ExternalDaemon::pid() const {
+  CHECK(process_) << "process not started";
+  return process_->pid();
+}
+
+// Non-owning access to the Subprocess; null when no process is attached.
+Subprocess* ExternalDaemon::process() const {
+  Subprocess* const raw = process_.get();
+  return raw;
+}
+
+// Stops the daemon's process, if there is one, and detaches from it.
+//
+// The bound RPC/HTTP addresses are recorded first so that a later restart can
+// reuse them; if the daemon was bound to the wildcard address only the ports
+// are kept. A live process that is not paused is asked to flush coverage data
+// and to run a leak check before it is killed with SIGKILL.
+void ExternalDaemon::Shutdown() {
+  if (IsShutdown()) return;
+
+  const bool uses_wildcard = rpc_bind_address().host() == MiniCluster::kWildcardIpAddr;
+  if (uses_wildcard) {
+    // Keep the wildcard host and remember just the ports that were picked.
+    bound_rpc_.set_host(MiniCluster::kWildcardIpAddr);
+    bound_rpc_.set_port(bound_rpc_hostport().port());
+    bound_http_.set_host(MiniCluster::kWildcardIpAddr);
+    bound_http_.set_port(bound_http_hostport().port());
+  } else {
+    bound_rpc_ = bound_rpc_hostport();
+    bound_http_ = bound_http_hostport();
+  }
+
+  if (IsProcessAlive()) {
+    if (!paused_) {
+      // In coverage builds, ask the process nicely to flush coverage info
+      // before the kill -9; otherwise external clusters never yield coverage.
+      FlushCoverage();
+      // Likewise, LSAN builds get to report leaks before the process dies.
+      CheckForLeaks();
+    }
+
+    LOG(INFO) << "Killing " << exe_ << " with pid " << process_->pid();
+    ignore_result(process_->Kill(SIGKILL));
+  }
+  WARN_NOT_OK(process_->Wait(), "Waiting on " + exe_);
+
+  process_.reset();
+  perf_record_process_.reset();
+  paused_ = false;
+}
+
+// Recursively deletes every data directory and then the WAL directory,
+// stopping at the first failure and returning its status.
+Status ExternalDaemon::DeleteFromDisk() const {
+  Env* const env = Env::Default();
+  for (const string& dir : data_dirs()) {
+    RETURN_NOT_OK(env->DeleteRecursively(dir));
+  }
+  return env->DeleteRecursively(wal_dir());
+}
+
+// In a COVERAGE_BUILD, asks the daemon over RPC (1s timeout) to flush its
+// coverage data; a failure is only logged as a warning. Otherwise a no-op.
+void ExternalDaemon::FlushCoverage() {
+#ifdef COVERAGE_BUILD
+  LOG(INFO) << "Attempting to flush coverage for " << exe_ << " pid " << process_->pid();
+  server::GenericServiceProxy proxy(
+      messenger_, bound_rpc_addr(), bound_rpc_addr().host());
+
+  server::FlushCoverageRequestPB request;
+  server::FlushCoverageResponsePB response;
+  rpc::RpcController controller;
+  controller.set_timeout(MonoDelta::FromMilliseconds(1000));
+
+  Status flush_status = proxy.FlushCoverage(request, &response, &controller);
+  if (flush_status.ok() && !response.success()) {
+    flush_status = Status::RemoteError("Server does not appear to be running a coverage build");
+  }
+  WARN_NOT_OK(flush_status,
+              Substitute("Unable to flush coverage on $0 pid $1", exe_, process_->pid()));
+#endif
+}
+
+// When compiled with AddressSanitizer, asks the daemon over RPC (1s timeout)
+// to check for leaked memory. CHECK-fails if the daemon reports leaks; RPC
+// problems are only logged as a warning. Otherwise a no-op.
+void ExternalDaemon::CheckForLeaks() {
+#if defined(__has_feature)
+#  if __has_feature(address_sanitizer)
+  LOG(INFO) << "Attempting to check leaks for " << exe_ << " pid " << process_->pid();
+  server::GenericServiceProxy proxy(messenger_, bound_rpc_addr(), bound_rpc_addr().host());
+
+  server::CheckLeaksRequestPB request;
+  server::CheckLeaksResponsePB resp;
+  rpc::RpcController controller;
+  controller.set_timeout(MonoDelta::FromMilliseconds(1000));
+
+  Status s = proxy.CheckLeaks(request, &resp, &controller);
+  if (s.ok() && !resp.success()) {
+    s = Status::RemoteError("Server does not appear to be running an LSAN build");
+  } else if (s.ok()) {
+    CHECK(!resp.found_leaks()) << "Found leaks in " << exe_ << " pid " << process_->pid();
+  }
+  WARN_NOT_OK(s, Substitute("Unable to check leaks on $0 pid $1", exe_, process_->pid()));
+#  endif
+#endif
+}
+
+// Returns the first RPC address listed in the daemon's status PB.
+// CHECK-fails if there is no status, no RPC address, or it cannot be converted.
+HostPort ExternalDaemon::bound_rpc_hostport() const {
+  CHECK(status_);
+  CHECK_GE(status_->bound_rpc_addresses_size(), 1);
+
+  HostPort first_rpc_hostport;
+  CHECK_OK(HostPortFromPB(status_->bound_rpc_addresses(0), &first_rpc_hostport));
+  return first_rpc_hostport;
+}
+
+// Resolves bound_rpc_hostport() and returns the first resulting address.
+// CHECK-fails if resolution fails or yields no address.
+Sockaddr ExternalDaemon::bound_rpc_addr() const {
+  const HostPort rpc_hostport = bound_rpc_hostport();
+
+  vector<Sockaddr> addrs;
+  CHECK_OK(rpc_hostport.ResolveAddresses(&addrs));
+  CHECK(!addrs.empty());
+  return addrs.front();
+}
+
+// Returns the first HTTP address listed in the daemon's status PB, or a
+// default-constructed HostPort if the status lists none.
+// CHECK-fails if there is no status or the address cannot be converted.
+HostPort ExternalDaemon::bound_http_hostport() const {
+  CHECK(status_);
+
+  HostPort http_hostport;
+  if (status_->bound_http_addresses_size() != 0) {
+    CHECK_OK(HostPortFromPB(status_->bound_http_addresses(0), &http_hostport));
+  }
+  return http_hostport;
+}
+
+// Returns the node instance recorded in the daemon's status PB.
+// CHECK-fails if no status is available.
+const NodeInstancePB& ExternalDaemon::instance_id() const {
+  CHECK(status_);
+  const NodeInstancePB& node_instance = status_->node_instance();
+  return node_instance;
+}
+
+// Returns the permanent UUID from the daemon's node instance.
+// CHECK-fails if no status is available (checked inside instance_id()).
+const string& ExternalDaemon::uuid() const {
+  return instance_id().permanent_uuid();
+}
+
+// Fetches this daemon's metrics over HTTP and extracts one int64 field of a
+// single metric.
+//
+// The entities returned by the server are scanned in order. An entity matches
+// if its "type" equals 'entity_proto->name()' and, when 'entity_id' is
+// non-null, its "id" equals 'entity_id'. For the first matching entity that
+// has a metric named 'metric_proto->name()', 'value_field' of that metric is
+// stored in '*value' and OK is returned.
+//
+// Returns NotFound if nothing matches, or the error from fetching or parsing
+// the page. CHECK-fails if the daemon has no initialized bound HTTP address.
+Status ExternalDaemon::GetInt64Metric(const MetricEntityPrototype* entity_proto,
+                                      const char* entity_id,
+                                      const MetricPrototype* metric_proto,
+                                      const char* value_field,
+                                      int64_t* value) const {
+  // Look the address up once: each bound_http_hostport() call converts the
+  // status PB entry again.
+  const HostPort http_hostport = bound_http_hostport();
+  CHECK(http_hostport.Initialized());
+
+  // Fetch metrics whose name matches the given prototype.
+  const string url = Substitute(
+      "http://$0/jsonmetricz?metrics=$1",
+      http_hostport.ToString(),
+      metric_proto->name());
+  EasyCurl curl;
+  faststring dst;
+  RETURN_NOT_OK(curl.FetchURL(url, &dst));
+
+  // Parse the results, beginning with the top-level entity array.
+  JsonReader r(dst.ToString());
+  RETURN_NOT_OK(r.Init());
+  vector<const Value*> entities;
+  RETURN_NOT_OK(r.ExtractObjectArray(r.root(), nullptr, &entities));
+  for (const Value* entity : entities) {
+    // Find the desired entity.
+    string type;
+    RETURN_NOT_OK(r.ExtractString(entity, "type", &type));
+    if (type != entity_proto->name()) {
+      continue;
+    }
+    if (entity_id) {
+      string id;
+      RETURN_NOT_OK(r.ExtractString(entity, "id", &id));
+      if (id != entity_id) {
+        continue;
+      }
+    }
+
+    // Find the desired metric within the entity.
+    vector<const Value*> metrics;
+    RETURN_NOT_OK(r.ExtractObjectArray(entity, "metrics", &metrics));
+    for (const Value* metric : metrics) {
+      string name;
+      RETURN_NOT_OK(r.ExtractString(metric, "name", &name));
+      if (name != metric_proto->name()) {
+        continue;
+      }
+      return r.ExtractInt64(metric, value_field, value);
+    }
+  }
+
+  // Nothing matched: describe what was being looked for.
+  string msg;
+  if (entity_id) {
+    msg = Substitute("Could not find metric $0.$1 for entity $2",
+                     entity_proto->name(), metric_proto->name(),
+                     entity_id);
+  } else {
+    msg = Substitute("Could not find metric $0.$1",
+                     entity_proto->name(), metric_proto->name());
+  }
+  return Status::NotFound(msg);
+}
+
+//------------------------------------------------------------
+// ScopedResumeExternalDaemon
+//------------------------------------------------------------
+
+// Remembers 'daemon', which must not be null, for resumption in the destructor.
+ScopedResumeExternalDaemon::ScopedResumeExternalDaemon(ExternalDaemon* daemon)
+    : daemon_(CHECK_NOTNULL(daemon)) {}
+
+// Resumes the daemon; a failure to do so is only logged as a warning.
+ScopedResumeExternalDaemon::~ScopedResumeExternalDaemon() {
+  const Status resume_status = daemon_->Resume();
+  WARN_NOT_OK(resume_status, "Could not resume external daemon");
+}
+
+//------------------------------------------------------------
+// ExternalMaster
+//------------------------------------------------------------
+
+// Hands all options over to the ExternalDaemon base class.
+ExternalMaster::ExternalMaster(ExternalDaemonOptions opts)
+    : ExternalDaemon(std::move(opts)) {}
+
+// Nothing master-specific to release.
+ExternalMaster::~ExternalMaster() = default;
+
+// Launches the master process: RPC is bound to rpc_bind_address(), and the
+// web server uses the same host with port 0.
+Status ExternalMaster::Start() {
+  const HostPort& bind_addr = rpc_bind_address();
+
+  vector<string> flags = GetCommonFlags();
+  flags.emplace_back(Substitute("--rpc_bind_addresses=$0", bind_addr.ToString()));
+  flags.emplace_back(Substitute("--webserver_interface=$0", bind_addr.host()));
+  flags.emplace_back("--webserver_port=0");
+  return StartProcess(flags);
+}
+
+// Launches the master process again on the addresses recorded by Shutdown().
+//
+// Returns IllegalState if no RPC port has been recorded. The recorded HTTP
+// address is reused when there is one; otherwise the web server uses the
+// recorded RPC host with port 0.
+Status ExternalMaster::Restart() {
+  // The addresses are stored on shutdown, so that must have happened first.
+  if (bound_rpc_.port() == 0) {
+    return Status::IllegalState("Master cannot be restarted. Must call Shutdown() first.");
+  }
+
+  const bool reuse_http = bound_http_.Initialized();
+  const string& web_host = reuse_http ? bound_http_.host() : bound_rpc_.host();
+
+  vector<string> flags = GetCommonFlags();
+  flags.emplace_back(Substitute("--rpc_bind_addresses=$0", bound_rpc_.ToString()));
+  flags.emplace_back(Substitute("--webserver_interface=$0", web_host));
+  if (reuse_http) {
+    flags.emplace_back(Substitute("--webserver_port=$0", bound_http_.port()));
+  } else {
+    flags.emplace_back("--webserver_port=0");
+  }
+  return StartProcess(flags);
+}
+
+// Polls the master with ListTables RPCs until its catalog manager answers.
+//
+// Returns OK as soon as the master responds without an error (it is the
+// leader) or with an IllegalState error (it is up, but not the leader).
+// ServiceUnavailable responses and TimedOut/NetworkError RPC failures are
+// retried every 50ms; any other error is returned immediately. Returns
+// TimedOut if no such answer was obtained while the elapsed time was below
+// kMasterCatalogManagerTimeoutSeconds.
+Status ExternalMaster::WaitForCatalogManager() {
+  unique_ptr<MasterServiceProxy> proxy(new MasterServiceProxy(
+      messenger_, bound_rpc_addr(), bound_rpc_addr().host()));
+  Stopwatch sw;
+  sw.start();
+  while (sw.elapsed().wall_seconds() < kMasterCatalogManagerTimeoutSeconds) {
+    ListTablesRequestPB req;
+    ListTablesResponsePB resp;
+    RpcController rpc;
+    Status s = proxy->ListTables(req, &resp, &rpc);
+    if (s.ok()) {
+      if (!resp.has_error()) {
+        // This master is the leader and is up and running.
+        return Status::OK();
+      }
+      s = StatusFromPB(resp.error().status());
+      if (s.IsIllegalState()) {
+        // This master is not the leader but is otherwise up and running.
+        return Status::OK();
+      }
+      if (!s.IsServiceUnavailable()) {
+        // Unexpected error from master.
+        return s;
+      }
+    } else if (!s.IsTimedOut() && !s.IsNetworkError()) {
+      // Unexpected error from proxy.
+      return s;
+    }
+
+    // There was some kind of transient network error or the master isn't yet
+    // ready. Sleep and retry.
+    SleepFor(MonoDelta::FromMilliseconds(50));
+  }
+
+  // Success returns from inside the loop, so getting here always means the
+  // time budget ran out. Deciding this by re-reading the stopwatch could
+  // misreport a success that arrived just past the deadline, or report OK for
+  // a loop that ended exactly at the deadline.
+  return Status::TimedOut(
+      Substitute("Timed out after $0s waiting for master ($1) startup",
+                 kMasterCatalogManagerTimeoutSeconds,
+                 bound_rpc_addr().ToString()));
+}
+
+// Builds the flags that both Start() and Restart() pass to the master.
+vector<string> ExternalMaster::GetCommonFlags() const {
+  vector<string> common;
+  common.emplace_back("--fs_wal_dir=" + wal_dir_);
+  common.emplace_back("--fs_data_dirs=" + JoinStrings(data_dirs_, ","));
+  common.emplace_back("--webserver_interface=localhost");
+
+  // See the in-line comment for "--ipki_server_key_size" flag in
+  // ExternalDaemon::StartProcess() method.
+  common.emplace_back("--ipki_ca_key_size=1024");
+
+  // As for the TSK keys, 512 bits is the minimum since we are using the SHA256
+  // digest for token signing/verification.
+  common.emplace_back("--tsk_num_rsa_bits=512");
+  return common;
+}
+
+
+//------------------------------------------------------------
+// ExternalTabletServer
+//------------------------------------------------------------
+
+// Hands 'opts' over to the ExternalDaemon base class and keeps the master
+// addresses, which are expected (DCHECK) to be non-empty.
+ExternalTabletServer::ExternalTabletServer(ExternalDaemonOptions opts,
+                                           vector<HostPort> master_addrs)
+    : ExternalDaemon(std::move(opts)), master_addrs_(std::move(master_addrs)) {
+  DCHECK(!master_addrs_.empty());
+}
+
+// Nothing tablet-server-specific to release.
+ExternalTabletServer::~ExternalTabletServer() = default;
+
+// Launches the tablet server process: RPC is bound to rpc_bind_address(),
+// whose host is also used for outbound sockets and for the web server
+// (port 0), and the server is pointed at 'master_addrs_'.
+Status ExternalTabletServer::Start() {
+  const HostPort& bind_addr = rpc_bind_address();
+  const vector<string> flags = {
+    "--fs_wal_dir=" + wal_dir_,
+    "--fs_data_dirs=" + JoinStrings(data_dirs_, ","),
+    Substitute("--rpc_bind_addresses=$0", bind_addr.ToString()),
+    Substitute("--local_ip_for_outbound_sockets=$0", bind_addr.host()),
+    Substitute("--webserver_interface=$0", bind_addr.host()),
+    "--webserver_port=0",
+    Substitute("--tserver_master_addrs=$0",
+               HostPort::ToCommaSeparatedString(master_addrs_)),
+  };
+  return StartProcess(flags);
+}
+
+// Launches the tablet server process again on the RPC address recorded by
+// Shutdown().
+//
+// Returns IllegalState if no RPC port has been recorded. Web server flags are
+// only passed when an HTTP address was recorded.
+Status ExternalTabletServer::Restart() {
+  // The addresses are stored on shutdown, so that must have happened first.
+  if (bound_rpc_.port() == 0) {
+    return Status::IllegalState("Tablet server cannot be restarted. Must call Shutdown() first.");
+  }
+
+  vector<string> flags = {
+    "--fs_wal_dir=" + wal_dir_,
+    "--fs_data_dirs=" + JoinStrings(data_dirs_, ","),
+    Substitute("--rpc_bind_addresses=$0", bound_rpc_.ToString()),
+    Substitute("--local_ip_for_outbound_sockets=$0", rpc_bind_address().host()),
+  };
+  if (bound_http_.Initialized()) {
+    flags.emplace_back(Substitute("--webserver_port=$0", bound_http_.port()));
+    flags.emplace_back(Substitute("--webserver_interface=$0", bound_http_.host()));
+  }
+  flags.emplace_back(Substitute("--tserver_master_addrs=$0",
+                                HostPort::ToCommaSeparatedString(master_addrs_)));
+  return StartProcess(flags);
+}
+
+} // namespace cluster
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/350c8a79/src/kudu/mini-cluster/external_mini_cluster.h
----------------------------------------------------------------------
diff --git a/src/kudu/mini-cluster/external_mini_cluster.h b/src/kudu/mini-cluster/external_mini_cluster.h
new file mode 100644
index 0000000..4ed96de
--- /dev/null
+++ b/src/kudu/mini-cluster/external_mini_cluster.h
@@ -0,0 +1,568 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <sys/types.h>
+
+#include <cstdint>
+#include <functional>
+#include <map>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <glog/logging.h>
+#include <gtest/gtest_prod.h>
+
+#include "kudu/client/shared_ptr.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/mini-cluster/mini_cluster.h"
+#include "kudu/security/test/mini_kdc.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class MetricEntityPrototype;
+class MetricPrototype;
+class NodeInstancePB;
+class Sockaddr;
+class Subprocess;
+
+namespace client {
+class KuduClient;
+class KuduClientBuilder;
+} // namespace client
+
+namespace master {
+class MasterServiceProxy;
+} // namespace master
+
+namespace rpc {
+class Messenger;
+} // namespace rpc
+
+namespace server {
+class ServerStatusPB;
+} // namespace server
+
+namespace cluster {
+
+class ExternalDaemon;
+class ExternalMaster;
+class ExternalTabletServer;
+
+struct ExternalMiniClusterOptions {
+  ExternalMiniClusterOptions();
+
+  // Number of masters to start.
+  // Default: 1
+  int num_masters;
+
+  // Number of tablet servers (TS) to start.
+  // Default: 1
+  int num_tablet_servers;
+
+  // Directory in which to store data.
+  // Default: "", which auto-generates a unique path for this cluster.
+  std::string data_root;
+
+  MiniCluster::BindMode bind_mode;  // Reported by ExternalMiniCluster::bind_mode().
+
+  // The path where the kudu daemons should be run from.
+  // Default: "", which uses the same path as the currently running executable.
+  // This works for unit tests, since they all end up in build/latest/bin.
+  std::string daemon_bin_path;
+
+  // Number of data directories to be created for each daemon.
+  // Default: 1
+  int num_data_dirs;
+
+  // Extra flags for tablet servers and masters respectively.
+  //
+  // In these flags, you may use the special string '${index}' which will
+  // be substituted with the index of the tablet server or master.
+  std::vector<std::string> extra_tserver_flags;
+  std::vector<std::string> extra_master_flags;
+
+  // If more than one master is specified, list of ports for the
+  // masters in a consensus configuration. Port at index 0 is used for the leader
+  // master.
+  std::vector<uint16_t> master_rpc_ports;
+
+  // Options to configure the MiniKdc before starting it up.
+  // Only used when 'enable_kerberos' is 'true'.
+  MiniKdcOptions mini_kdc_options;
+
+  // If true, set up a KDC as part of this ExternalMiniCluster, generate keytabs for
+  // the servers, and require Kerberos authentication from clients.
+  //
+  // Additionally, when the cluster is started, the environment of the
+  // test process will be modified to include Kerberos credentials for
+  // a principal named 'testuser'.
+  bool enable_kerberos;
+
+  // If true, sends logging output to stderr instead of a log file. Defaults to
+  // true.
+  bool logtostderr;
+
+  // Amount of time that may elapse between the creation of a daemon process
+  // and the process writing out its info file. Defaults to 30 seconds.
+  MonoDelta start_process_timeout;
+};
+
+// A mini-cluster made up of subprocesses running each of the daemons
+// separately. This is useful for black-box or grey-box failure testing
+// purposes -- it provides the ability to forcibly kill or stop particular
+// cluster participants, which isn't feasible in the normal InternalMiniCluster.
+// On the other hand, there is little access to inspect the internal state
+// of the daemons.
+class ExternalMiniCluster : public MiniCluster {
+ public:
+  // Constructs a cluster with the default options.
+  ExternalMiniCluster();
+
+  // Constructs a cluster with options specified in 'opts'.
+  explicit ExternalMiniCluster(ExternalMiniClusterOptions opts);
+
+  // Destroys a cluster.
+  virtual ~ExternalMiniCluster();
+
+  // Start the cluster.
+  Status Start() override;
+
+  // Restarts the cluster. Requires that it has been Shutdown() first.
+  Status Restart();
+
+  // Add a new TS to the cluster. The new TS is started.
+  // Requires that the master is already running.
+  Status AddTabletServer();
+
+  // Currently, this uses SIGKILL on each daemon for a non-graceful shutdown.
+  void ShutdownNodes(ClusterNodes nodes) override;
+
+  // Return the IP address that the tablet server with the given index will bind to.
+  // Per the configured bind mode (presumably 'opts_.bind_mode' -- TODO confirm), this
+  // is either 127.0.0.1 or another IP in the local netblock.
+  std::string GetBindIpForTabletServer(int index) const;
+
+  // Same as above but for a master.
+  std::string GetBindIpForMaster(int index) const;
+
+  // Return a pointer to the running leader master. This may be NULL
+  // if the cluster is not started.
+  //
+  // TODO(unknown): Use the appropriate RPC here to return the leader master,
+  // to allow some of the existing tests (e.g., raft_consensus-itest)
+  // to use multiple masters.
+  ExternalMaster* leader_master() { return master(0); }
+
+  // Perform an RPC to determine the leader of the external mini
+  // cluster.  Set 'idx' to the leader master's index (for calls
+  // to master() below).
+  //
+  // NOTE: if a leader election occurs after this method is executed,
+  // the last result may not be valid.
+  Status GetLeaderMasterIndex(int* idx);
+
+  // If this cluster is configured for a single non-distributed
+  // master, return the single master or NULL if the master is not
+  // started. Exits with a CHECK failure if there are multiple
+  // masters.
+  ExternalMaster* master() const {
+    CHECK_EQ(masters_.size(), 1)
+        << "master() should not be used with multiple masters, use leader_master() instead.";
+    return master(0);
+  }
+
+  // Return master at 'idx' or NULL if the master at 'idx' has not
+  // been started.
+  ExternalMaster* master(int idx) const {
+    CHECK_LT(idx, masters_.size());
+    return masters_[idx].get();
+  }
+
+  ExternalTabletServer* tablet_server(int idx) const {
+    CHECK_LT(idx, tablet_servers_.size());
+    return tablet_servers_[idx].get();
+  }
+
+  // Return ExternalTabletServer given its UUID. If not found, returns NULL.
+  ExternalTabletServer* tablet_server_by_uuid(const std::string& uuid) const;
+
+  // Return the index of the ExternalTabletServer that has the given 'uuid', or
+  // -1 if no such UUID can be found.
+  int tablet_server_index_by_uuid(const std::string& uuid) const;
+
+  // Return all tablet servers and masters.
+  std::vector<ExternalDaemon*> daemons() const;
+
+  MiniKdc* kdc() const {
+    return CHECK_NOTNULL(kdc_.get());
+  }
+
+  int num_tablet_servers() const override {
+    return tablet_servers_.size();
+  }
+
+  int num_masters() const override {
+    return masters_.size();
+  }
+
+  BindMode bind_mode() const override {
+    return opts_.bind_mode;
+  }
+
+  std::vector<uint16_t> master_rpc_ports() const override {
+    return opts_.master_rpc_ports;
+  }
+
+  std::vector<HostPort> master_rpc_addrs() const override;
+
+  std::shared_ptr<rpc::Messenger> messenger() const override;
+  std::shared_ptr<master::MasterServiceProxy> master_proxy() const override;
+  std::shared_ptr<master::MasterServiceProxy> master_proxy(int idx) const override;
+
+  // Wait until the number of registered tablet servers reaches the given count
+  // on all of the running masters. Returns Status::TimedOut if the desired
+  // count is not achieved with the given timeout.
+  Status WaitForTabletServerCount(int count, const MonoDelta& timeout);
+
+  // Runs gtest assertions that no servers have crashed.
+  void AssertNoCrashes();
+
+  // Wait until all tablets on the given tablet server are in the RUNNING
+  // state. Returns Status::TimedOut if 'timeout' elapses and at least one
+  // tablet is not yet RUNNING.
+  //
+  // If 'min_tablet_count' is not -1, will also wait for at least that many
+  // RUNNING tablets to appear before returning (potentially timing out if that
+  // number is never reached).
+  Status WaitForTabletsRunning(ExternalTabletServer* ts, int min_tablet_count,
+                               const MonoDelta& timeout);
+
+  // Create a client configured to talk to this cluster.
+  // Builder may contain override options for the client. The master address will
+  // be overridden to talk to the running master.
+  //
+  // REQUIRES: the cluster must have already been Start()ed.
+  Status CreateClient(client::KuduClientBuilder* builder,
+                      client::sp::shared_ptr<client::KuduClient>* client) const override;
+
+  // Sets the given flag on the given daemon, which must be running.
+  //
+  // This uses the 'force' flag on the RPC so that, even if the flag
+  // is considered unsafe to change at runtime, it is changed.
+  Status SetFlag(ExternalDaemon* daemon,
+                 const std::string& flag,
+                 const std::string& value) WARN_UNUSED_RESULT;
+
+  // Set the path where daemon binaries can be found.
+  // Overrides 'daemon_bin_path' set by ExternalMiniClusterOptions.
+  // The cluster must be shut down before calling this method.
+  void SetDaemonBinPath(std::string daemon_bin_path);
+
+  // Returns the path where 'binary' is expected to live, based on
+  // ExternalMiniClusterOptions.daemon_bin_path if it was provided, or on the
+  // path of the currently running executable otherwise.
+  std::string GetBinaryPath(const std::string& binary) const;
+
+  // Returns the path where 'daemon_id' is expected to store its data, based on
+  // ExternalMiniClusterOptions.data_root if it was provided, or on the
+  // standard Kudu test directory otherwise.
+  // 'dir_index' is an optional numeric suffix to be added to the default path.
+  // If it is not specified, the cluster must be configured to use a single data dir.
+  std::string GetDataPath(const std::string& daemon_id,
+                          boost::optional<uint32_t> dir_index = boost::none) const;
+
+  // Returns paths where 'daemon_id' is expected to store its data, each with a
+  // numeric suffix appropriate for 'opts_.num_data_dirs'.
+  std::vector<std::string> GetDataPaths(const std::string& daemon_id) const;
+
+  // Returns the path where 'daemon_id' is expected to store its wal, or other
+  // files that reside in the wal dir.
+  std::string GetWalPath(const std::string& daemon_id) const;
+
+  // Returns the path where 'daemon_id' is expected to store its logs, or other
+  // files that reside in the log dir.
+  std::string GetLogPath(const std::string& daemon_id) const;
+
+ private:
+  FRIEND_TEST(MasterFailoverTest, TestKillAnyMaster);
+
+  Status StartSingleMaster();
+
+  Status StartDistributedMasters();
+
+  Status DeduceBinRoot(std::string* ret);
+  Status HandleOptions();
+
+  const ExternalMiniClusterOptions opts_;
+
+  // The root for binaries.
+  std::string daemon_bin_path_;
+
+  std::string data_root_;
+
+  std::vector<scoped_refptr<ExternalMaster> > masters_;
+  std::vector<scoped_refptr<ExternalTabletServer> > tablet_servers_;
+  std::unique_ptr<MiniKdc> kdc_;
+
+  std::shared_ptr<rpc::Messenger> messenger_;
+
+  DISALLOW_COPY_AND_ASSIGN(ExternalMiniCluster);
+};
+
+struct ExternalDaemonOptions {  // Parameters taken by the ExternalDaemon constructor.
+  explicit ExternalDaemonOptions(bool logtostderr)
+      : logtostderr(logtostderr) {
+  }
+
+  bool logtostderr;
+  std::shared_ptr<rpc::Messenger> messenger;
+  std::string exe;  // Presumably the path of the daemon executable -- TODO confirm.
+  HostPort rpc_bind_address;
+  std::string wal_dir;
+  std::vector<std::string> data_dirs;
+  std::string log_dir;
+  std::string perf_record_filename;
+  std::vector<std::string> extra_flags;
+  MonoDelta start_process_timeout;  // Presumably the info-file wait limit -- TODO confirm.
+};
+
+class ExternalDaemon : public RefCountedThreadSafe<ExternalDaemon> {
+ public:
+  explicit ExternalDaemon(ExternalDaemonOptions opts);
+
+  HostPort bound_rpc_hostport() const;
+  Sockaddr bound_rpc_addr() const;
+
+  // Return the host/port that this daemon is bound to for HTTP.
+  // May return an uninitialized HostPort if HTTP is disabled.
+  HostPort bound_http_hostport() const;
+
+  const NodeInstancePB& instance_id() const;
+  const std::string& uuid() const;
+
+  // Return the pid of the running process.
+  // Causes a CHECK failure if the process is not running.
+  pid_t pid() const;
+
+  // Return the pointer to the underlying Subprocess if it is set.
+  // Otherwise, returns nullptr.
+  Subprocess* process() const;
+
+  // Set the path of the executable to run as a daemon.
+  // Overrides the exe path specified in the constructor.
+  // The daemon must be shut down before calling this method.
+  void SetExePath(std::string exe);
+
+  // Enable Kerberos for this daemon. This creates a Kerberos principal
+  // and keytab, and sets the appropriate environment variables in the
+  // subprocess such that the server will use Kerberos authentication.
+  //
+  // 'bind_host' is the hostname that will be used to generate the Kerberos
+  // service principal.
+  //
+  // Must be called before 'StartProcess()'.
+  Status EnableKerberos(MiniKdc* kdc, const std::string& bind_host);
+
+  // Sends a SIGSTOP signal to the daemon.
+  Status Pause() WARN_UNUSED_RESULT;
+
+  // Sends a SIGCONT signal to the daemon.
+  Status Resume() WARN_UNUSED_RESULT;
+
+  // Return true if no process is attached: never started, or explicitly shut down.
+  bool IsShutdown() const;
+
+  // Return true if the process is still running.
+  // This may return false if the process crashed, even if we didn't
+  // explicitly call Shutdown().
+  bool IsProcessAlive() const;
+
+  // Wait for this process to crash due to a configured fault
+  // injection, or the given timeout to elapse. If the process
+  // crashes for some reason other than an injected fault, returns
+  // Status::Aborted.
+  //
+  // If the process is already crashed, returns immediately.
+  Status WaitForInjectedCrash(const MonoDelta& timeout) const;
+
+  // Same as the above, but expects the process to crash due to a
+  // LOG(FATAL) or CHECK failure. In other words, waits for it to
+  // crash from SIGABRT.
+  Status WaitForFatal(const MonoDelta& timeout) const;
+
+  virtual void Shutdown();  // Kills (SIGKILL) and reaps the process, if there is one.
+
+  // Delete files specified by 'wal_dir_' and 'data_dirs_'.
+  Status DeleteFromDisk() const WARN_UNUSED_RESULT;
+
+  const std::string& wal_dir() const { return wal_dir_; }
+
+  const std::string& data_dir() const {
+    CHECK_EQ(1, data_dirs_.size());
+    return data_dirs_[0];
+  }
+
+  const std::vector<std::string>& data_dirs() const { return data_dirs_; }
+
+  // Returns the log dir of the external daemon.
+  const std::string& log_dir() const { return log_dir_; }
+
+  // Return a pointer to the flags used for this server on restart.
+  // Modifying these flags will only take effect on the next restart.
+  std::vector<std::string>* mutable_flags() { return &extra_flags_; }
+
+  // Retrieve the value of a given metric from this server. The metric must
+  // be of int64_t type.
+  //
+  // 'value_field' represents the particular field of the metric to be read.
+  // For example, for a counter or gauge, this should be 'value'. For a
+  // histogram, it might be 'total_count' or 'mean'.
+  //
+  // 'entity_id' may be NULL, in which case the first entity of the same type
+  // as 'entity_proto' will be matched.
+  Status GetInt64Metric(const MetricEntityPrototype* entity_proto,
+                        const char* entity_id,
+                        const MetricPrototype* metric_proto,
+                        const char* value_field,
+                        int64_t* value) const;
+
+ protected:
+  friend class RefCountedThreadSafe<ExternalDaemon>;
+  virtual ~ExternalDaemon();
+
+  // Starts a process with the given flags.
+  Status StartProcess(const std::vector<std::string>& user_flags);
+
+  // Wait for the process to exit, and then call 'wait_status_predicate'
+  // on the resulting exit status. NOTE: this is not the return code, but
+  // rather the value provided by waitpid(2): use WEXITSTATUS, etc.
+  //
+  // If the predicate matches, returns OK. Otherwise, returns an error.
+  // 'crash_type_str' should be a descriptive name for the type of crash,
+  // used in formatting the error message.
+  Status WaitForCrash(const MonoDelta& timeout,
+                      const std::function<bool(int)>& wait_status_predicate,
+                      const char* crash_type_str) const;
+
+  // In a code-coverage build, try to flush the coverage data to disk.
+  // In a non-coverage build, this does nothing.
+  void FlushCoverage();
+
+  // In an LSAN build, ask the daemon to check for leaked memory, and
+  // LOG(FATAL) if there are any leaks.
+  void CheckForLeaks();
+
+  // Get RPC bind address for daemon.
+  const HostPort& rpc_bind_address() const {
+    return rpc_bind_address_;
+  }
+
+  const std::shared_ptr<rpc::Messenger> messenger_;
+  const std::string wal_dir_;
+  std::vector<std::string> data_dirs_;
+  const std::string log_dir_;
+  const std::string perf_record_filename_;
+  const MonoDelta start_process_timeout_;
+  const bool logtostderr_;
+  const HostPort rpc_bind_address_;
+  std::string exe_;
+  std::vector<std::string> extra_flags_;
+  std::map<std::string, std::string> extra_env_;
+
+  std::unique_ptr<Subprocess> process_;
+  bool paused_ = false;
+
+  std::unique_ptr<Subprocess> perf_record_process_;
+
+  std::unique_ptr<server::ServerStatusPB> status_;
+
+  // These capture the daemon's parameters and running ports and
+  // are used to Restart() the daemon with the same parameters.
+  HostPort bound_rpc_;
+  HostPort bound_http_;
+
+  DISALLOW_COPY_AND_ASSIGN(ExternalDaemon);
+};
+
+// Resumes a daemon that was stopped with ExternalDaemon::Pause() upon
+// exiting a scope.
+class ScopedResumeExternalDaemon {
+ public:
+  // 'daemon' must not be null and must remain valid for the lifetime of a
+  // ScopedResumeExternalDaemon object.
+  explicit ScopedResumeExternalDaemon(ExternalDaemon* daemon);
+
+  // Resumes 'daemon_'; a failure to resume is only logged as a warning.
+  ~ScopedResumeExternalDaemon();
+
+ private:
+  ExternalDaemon* daemon_;
+
+  DISALLOW_COPY_AND_ASSIGN(ScopedResumeExternalDaemon);
+};
+
+class ExternalMaster : public ExternalDaemon {
+ public:
+  explicit ExternalMaster(ExternalDaemonOptions opts);
+
+  Status Start();  // Starts the master process bound to rpc_bind_address().
+
+  // Restarts the daemon on the addresses it was bound to before.
+  // Requires that it has previously been shutdown.
+  Status Restart() WARN_UNUSED_RESULT;
+
+  // Blocks until the master's catalog manager is initialized and responding to
+  // RPCs, or returns Status::TimedOut if that takes too long.
+  Status WaitForCatalogManager() WARN_UNUSED_RESULT;
+
+ private:
+  std::vector<std::string> GetCommonFlags() const;  // Flags shared by Start() and Restart().
+
+  friend class RefCountedThreadSafe<ExternalMaster>;
+  virtual ~ExternalMaster();
+};
+
+class ExternalTabletServer : public ExternalDaemon {
+ public:
+  ExternalTabletServer(ExternalDaemonOptions opts,
+                       std::vector<HostPort> master_addrs);
+
+  Status Start();  // Starts the tablet server process bound to rpc_bind_address().
+
+  // Restarts the daemon on the RPC address it was bound to before.
+  // Requires that it has previously been shutdown.
+  Status Restart() WARN_UNUSED_RESULT;
+
+ private:
+  const std::vector<HostPort> master_addrs_;  // Passed as --tserver_master_addrs.
+
+  friend class RefCountedThreadSafe<ExternalTabletServer>;
+  virtual ~ExternalTabletServer();
+};
+
+} // namespace cluster
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/350c8a79/src/kudu/mini-cluster/internal_mini_cluster.cc
----------------------------------------------------------------------
diff --git a/src/kudu/mini-cluster/internal_mini_cluster.cc b/src/kudu/mini-cluster/internal_mini_cluster.cc
new file mode 100644
index 0000000..baa9330
--- /dev/null
+++ b/src/kudu/mini-cluster/internal_mini_cluster.cc
@@ -0,0 +1,377 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/mini-cluster/internal_mini_cluster.h"
+
+#include <cstdint>
+#include <ostream>
+#include <unordered_set>
+#include <utility>
+
+#include "kudu/client/client.h"
+#include "kudu/common/wire_protocol.pb.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/master/catalog_manager.h"
+#include "kudu/master/master.h"
+#include "kudu/master/master.proxy.h"
+#include "kudu/master/mini_master.h"
+#include "kudu/master/ts_descriptor.h"
+#include "kudu/master/ts_manager.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/tserver/mini_tablet_server.h"
+#include "kudu/tserver/tablet_server.h"
+#include "kudu/tserver/tablet_server_options.h"
+#include "kudu/util/env.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/status.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/test_util.h"
+
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace cluster {
+
+using client::KuduClient;
+using client::KuduClientBuilder;
+using master::CatalogManager;
+using master::MasterServiceProxy;
+using master::MiniMaster;
+using master::TSDescriptor;
+using std::shared_ptr;
+using tserver::MiniTabletServer;
+using tserver::TabletServer;
+
+// Sets the defaults: one master, one tablet server, one data directory and
+// the default bind mode. All other members are default-constructed.
+InternalMiniClusterOptions::InternalMiniClusterOptions()
+    : num_masters(1),
+      num_tablet_servers(1),
+      num_data_dirs(1),
+      bind_mode(MiniCluster::kDefaultBindMode) {}
+
+// Stores 'env' and takes over 'options'. If the options carry no data root,
+// one named "minicluster-data" under the test data directory is used instead.
+InternalMiniCluster::InternalMiniCluster(Env* env, InternalMiniClusterOptions options)
+    : env_(env),
+      opts_(std::move(options)),
+      running_(false) {
+  if (!opts_.data_root.empty()) {
+    return;
+  }
+  opts_.data_root = JoinPathSegments(GetTestDataDirectory(), "minicluster-data");
+}
+
+// Calls Shutdown() so that the cluster is stopped when this object goes away.
+InternalMiniCluster::~InternalMiniCluster() { Shutdown(); }
+
+// Brings up the whole cluster: creates the data root if it does not exist,
+// starts the master(s), adds 'opts_.num_tablet_servers' tablet servers, waits
+// for them to register with the master(s) and then builds the messenger.
+//
+// CHECK-fails if no data root is set, if the cluster is already running, or
+// if a multi-master cluster has fewer master RPC ports than masters.
+// Returns the first error encountered, prefixed with the failing step.
+Status InternalMiniCluster::Start() {
+  CHECK(!opts_.data_root.empty()) << "No Fs root was provided";
+  CHECK(!running_);
+
+  const bool multi_master = opts_.num_masters > 1;
+  if (multi_master) {
+    CHECK_GE(opts_.master_rpc_ports.size(), opts_.num_masters);
+  }
+
+  const bool root_exists = env_->FileExists(opts_.data_root);
+  if (!root_exists) {
+    RETURN_NOT_OK(env_->CreateDir(opts_.data_root));
+  }
+
+  // Masters go first: tablet servers are handed the masters' addresses.
+  if (multi_master) {
+    RETURN_NOT_OK_PREPEND(StartDistributedMasters(),
+                          "Couldn't start distributed masters");
+  } else {
+    RETURN_NOT_OK_PREPEND(StartSingleMaster(), "Couldn't start the single master");
+  }
+
+  int ts_idx = 0;
+  while (ts_idx < opts_.num_tablet_servers) {
+    RETURN_NOT_OK_PREPEND(AddTabletServer(),
+                          Substitute("Error adding TS $0", ts_idx));
+    ++ts_idx;
+  }
+
+  RETURN_NOT_OK_PREPEND(WaitForTabletServerCount(opts_.num_tablet_servers),
+                        "Waiting for tablet servers to start");
+
+  rpc::MessengerBuilder messenger_builder("minicluster-messenger");
+  messenger_builder.set_num_reactors(1);
+  messenger_builder.set_max_negotiation_threads(1);
+  RETURN_NOT_OK_PREPEND(messenger_builder.Build(&messenger_),
+                        "Failed to start Messenger for minicluster");
+
+  running_ = true;
+  return Status::OK();
+}
+
+// Starts 'opts_.num_masters' masters that share one list of master addresses,
+// then waits for the catalog manager of every master to initialize.
+//
+// CHECK-fails unless 'opts_.num_data_dirs' is positive, more than one master
+// RPC port is configured, and there are at least as many ports as masters.
+// Returns the first error hit while starting a master or while waiting for
+// its catalog manager; the message names the index of the failing master.
+Status InternalMiniCluster::StartDistributedMasters() {
+  CHECK_GT(opts_.num_data_dirs, 0);
+  CHECK_GE(opts_.master_rpc_ports.size(), opts_.num_masters);
+  CHECK_GT(opts_.master_rpc_ports.size(), 1);
+
+  vector<HostPort> master_rpc_addrs = this->master_rpc_addrs();
+  LOG(INFO) << "Creating distributed mini masters. Addrs: "
+            << HostPort::ToCommaSeparatedString(master_rpc_addrs);
+
+  for (int i = 0; i < opts_.num_masters; i++) {
+    // Honor 'num_data_dirs' exactly as StartSingleMaster() does; the option is
+    // validated above and documented as applying to every daemon.
+    shared_ptr<MiniMaster> mini_master(new MiniMaster(
+        GetMasterFsRoot(i), master_rpc_addrs[i], opts_.num_data_dirs));
+    mini_master->SetMasterAddresses(master_rpc_addrs);
+    RETURN_NOT_OK_PREPEND(mini_master->Start(), Substitute("Couldn't start follower $0", i));
+    VLOG(1) << "Started MiniMaster with UUID " << mini_master->permanent_uuid()
+            << " at index " << i;
+    mini_masters_.push_back(std::move(mini_master));
+  }
+
+  int i = 0;
+  for (const shared_ptr<MiniMaster>& master : mini_masters_) {
+    LOG(INFO) << "Waiting to initialize catalog manager on master " << i;
+    RETURN_NOT_OK_PREPEND(master->WaitForCatalogManagerInit(),
+                          Substitute("Could not initialize catalog manager on master $0", i));
+    // Advance only after both messages were produced, so that the log line and
+    // the error text refer to the same master.
+    i++;
+  }
+  return Status::OK();
+}
+
+// Runs Start() and then waits, in index order, for every tablet server to
+// report that it has started. Returns the first failure; a failed wait is
+// prefixed with the index of the offending tablet server.
+Status InternalMiniCluster::StartSync() {
+  RETURN_NOT_OK(Start());
+  const int num_ts = mini_tablet_servers_.size();
+  for (int ts_idx = 0; ts_idx < num_ts; ++ts_idx) {
+    RETURN_NOT_OK_PREPEND(mini_tablet_servers_[ts_idx]->WaitStarted(),
+                          Substitute("TabletServer $0 failed to start.", ts_idx));
+  }
+  return Status::OK();
+}
+
+// Starts the one and only master and waits (up to
+// kMasterStartupWaitTimeSeconds) for its catalog manager to be leader and
+// ready. The master is added to 'mini_masters_' only after both steps succeed.
+//
+// CHECK-fails unless 'opts_.num_data_dirs' is positive, exactly one master is
+// configured and at most one master RPC port is given. Without a configured
+// port, port 0 is used.
+Status InternalMiniCluster::StartSingleMaster() {
+  CHECK_GT(opts_.num_data_dirs, 0);
+  CHECK_EQ(1, opts_.num_masters);
+  CHECK_LE(opts_.master_rpc_ports.size(), 1);
+
+  // The list holds zero or one entry at this point.
+  const uint16_t master_rpc_port =
+      opts_.master_rpc_ports.empty() ? 0 : opts_.master_rpc_ports[0];
+
+  // start the master (we need the port to set on the servers).
+  string bind_ip = GetBindIpForDaemon(MiniCluster::MASTER, /*index=*/ 0, opts_.bind_mode);
+  shared_ptr<MiniMaster> single_master(new MiniMaster(
+      GetMasterFsRoot(0), HostPort(std::move(bind_ip), master_rpc_port), opts_.num_data_dirs));
+  RETURN_NOT_OK_PREPEND(single_master->Start(), "Couldn't start master");
+
+  const MonoDelta startup_timeout = MonoDelta::FromSeconds(kMasterStartupWaitTimeSeconds);
+  RETURN_NOT_OK(
+      single_master->master()->WaitUntilCatalogManagerIsLeaderAndReadyForTests(startup_timeout));
+
+  mini_masters_.push_back(std::move(single_master));
+  return Status::OK();
+}
+
+// Creates, configures and starts one more tablet server, then appends it to
+// 'mini_tablet_servers_'.
+//
+// The index of the new server is the current number of tablet servers. Its RPC
+// port is taken from 'opts_.tserver_rpc_ports' when that list has an entry at
+// the index; otherwise port 0 is used. Its master address list is replaced
+// with the bound RPC addresses of all masters of this cluster.
+//
+// Returns IllegalState if no master has been started yet, or the error from
+// starting the tablet server, in which case the server is not added.
+Status InternalMiniCluster::AddTabletServer() {
+  if (mini_masters_.empty()) {
+    return Status::IllegalState("Master not yet initialized");
+  }
+  const int new_idx = mini_tablet_servers_.size();
+
+  // Compare as 'int' to avoid a signed/unsigned comparison.
+  uint16_t ts_rpc_port = 0;
+  if (new_idx < static_cast<int>(opts_.tserver_rpc_ports.size())) {
+    ts_rpc_port = opts_.tserver_rpc_ports[new_idx];
+  }
+
+  const string bind_ip = GetBindIpForDaemon(MiniCluster::TSERVER, new_idx, opts_.bind_mode);
+  // Hold the server in its final owner type right away instead of releasing
+  // it from a scoped pointer later on.
+  shared_ptr<MiniTabletServer> tablet_server(new MiniTabletServer(
+      GetTabletServerFsRoot(new_idx), HostPort(bind_ip, ts_rpc_port), opts_.num_data_dirs));
+
+  // set the master addresses
+  tablet_server->options()->master_addresses.clear();
+  for (const shared_ptr<MiniMaster>& master : mini_masters_) {
+    tablet_server->options()->master_addresses.emplace_back(master->bound_rpc_addr());
+  }
+  RETURN_NOT_OK(tablet_server->Start());
+  mini_tablet_servers_.push_back(std::move(tablet_server));
+  return Status::OK();
+}
+
+// Shuts down and forgets the requested subset of daemons: tablet servers for
+// ALL and TS_ONLY, masters for ALL and MASTERS_ONLY. Tablet servers are
+// stopped before masters. 'running_' is cleared whichever subset is given.
+void InternalMiniCluster::ShutdownNodes(ClusterNodes nodes) {
+  const bool stop_tservers =
+      nodes == ClusterNodes::ALL || nodes == ClusterNodes::TS_ONLY;
+  const bool stop_masters =
+      nodes == ClusterNodes::ALL || nodes == ClusterNodes::MASTERS_ONLY;
+
+  if (stop_tservers) {
+    for (const auto& ts : mini_tablet_servers_) {
+      ts->Shutdown();
+    }
+    mini_tablet_servers_.clear();
+  }
+  if (stop_masters) {
+    for (const auto& master : mini_masters_) {
+      master->Shutdown();
+    }
+    mini_masters_.clear();
+  }
+  running_ = false;
+}
+
+// Returns the master stored at position 'idx'. CHECK-fails when 'idx' is
+// negative or not below the number of masters currently held.
+MiniMaster* InternalMiniCluster::mini_master(int idx) const {
+  CHECK_GE(idx, 0) << "Master idx must be >= 0";
+  CHECK_LT(idx, mini_masters_.size()) << "Master idx must be < num masters started";
+  const auto& master = mini_masters_[idx];
+  return master.get();
+}
+
+// Returns the tablet server stored at position 'idx'. CHECK-fails when 'idx'
+// is negative or not below the number of tablet servers currently held.
+MiniTabletServer* InternalMiniCluster::mini_tablet_server(int idx) const {
+  CHECK_GE(idx, 0) << "TabletServer idx must be >= 0";
+  CHECK_LT(idx, mini_tablet_servers_.size()) << "TabletServer idx must be < 'num_ts_started_'";
+  const auto& tablet_server = mini_tablet_servers_[idx];
+  return tablet_server.get();
+}
+
+// Builds one address per entry of 'opts_.master_rpc_ports': the bind IP that
+// GetBindIpForDaemon() yields for a master with that position, paired with
+// the configured port. The result is empty when no ports are configured.
+vector<HostPort> InternalMiniCluster::master_rpc_addrs() const {
+  vector<HostPort> addrs;
+  int master_idx = 0;
+  for (const uint16_t& port : opts_.master_rpc_ports) {
+    addrs.emplace_back(
+        GetBindIpForDaemon(MiniCluster::MASTER, master_idx, opts_.bind_mode),
+        port);
+    ++master_idx;
+  }
+  return addrs;
+}
+
+// Returns "<data_root>/master-<idx>-root", the filesystem root of master 'idx'.
+string InternalMiniCluster::GetMasterFsRoot(int idx) const {
+  const string dir_name = Substitute("master-$0-root", idx);
+  return JoinPathSegments(opts_.data_root, dir_name);
+}
+
+// Returns "<data_root>/ts-<idx>-root", the filesystem root of tablet server
+// 'idx'.
+string InternalMiniCluster::GetTabletServerFsRoot(int idx) const {
+  const string dir_name = Substitute("ts-$0-root", idx);
+  return JoinPathSegments(opts_.data_root, dir_name);
+}
+
+// Convenience overload: waits for 'count' tablet servers in
+// MatchMode::MATCH_TSERVERS and throws the collected descriptors away.
+Status InternalMiniCluster::WaitForTabletServerCount(int count) const {
+  vector<shared_ptr<TSDescriptor>> unused_descs;
+  return WaitForTabletServerCount(count, MatchMode::MATCH_TSERVERS, &unused_descs);
+}
+
+// Polls every master that is not shut down until each of them reports exactly
+// 'count' tablet servers, for at most kRegistrationWaitTimeSeconds.
+//
+// 'mode' selects what is counted: with MATCH_TSERVERS only descriptors whose
+// uuid and sequence number equal those of a tablet server of this cluster,
+// with DO_NOT_MATCH_TSERVERS every descriptor that was returned. Any other
+// mode is fatal. 'descs' is passed to GetAllDescriptors() on every poll.
+//
+// Returns OK once all polled masters reported 'count', TimedOut otherwise.
+Status InternalMiniCluster::WaitForTabletServerCount(int count,
+                                             MatchMode mode,
+                                             vector<shared_ptr<TSDescriptor>>* descs) const {
+  // Indexes of the masters that have not reported the expected count yet.
+  std::unordered_set<int> pending_masters;
+  for (int i = 0; i < num_masters(); i++) {
+    if (mini_master(i)->master()->IsShutdown()) {
+      continue;
+    }
+    pending_masters.insert(i);
+  }
+
+  // Counts the entries of '*descs' according to 'mode'.
+  const auto count_descs = [this, mode, descs]() -> int {
+    if (mode == MatchMode::DO_NOT_MATCH_TSERVERS) {
+      return descs->size();
+    }
+    if (mode != MatchMode::MATCH_TSERVERS) {
+      LOG(FATAL) << "Invalid match mode";
+    }
+    // GetAllDescriptors() may return servers that are no longer online.
+    // Do a second step of verification to verify that the descs that we got
+    // are aligned (same uuid/seqno) with the TSs that we have in the cluster.
+    int matched = 0;
+    for (const shared_ptr<TSDescriptor>& desc : *descs) {
+      for (const auto& candidate : mini_tablet_servers_) {
+        const TabletServer* ts = candidate->server();
+        const bool same_uuid = ts->instance_pb().permanent_uuid() == desc->permanent_uuid();
+        if (same_uuid && ts->instance_pb().instance_seqno() == desc->latest_seqno()) {
+          matched++;
+          break;
+        }
+      }
+    }
+    return matched;
+  };
+
+  Stopwatch timer;
+  timer.start();
+  while (timer.elapsed().wall_seconds() < kRegistrationWaitTimeSeconds) {
+    auto it = pending_masters.begin();
+    while (it != pending_masters.end()) {
+      mini_master(*it)->master()->ts_manager()->GetAllDescriptors(descs);
+      if (count_descs() == count) {
+        // This master has returned the correct set of tservers.
+        it = pending_masters.erase(it);
+      } else {
+        ++it;
+      }
+    }
+    if (pending_masters.empty()) {
+      // All masters have returned the correct set of tservers.
+      LOG(INFO) << Substitute("$0 TS(s) registered with all masters after $1s",
+                              count, timer.elapsed().wall_seconds());
+      return Status::OK();
+    }
+    SleepFor(MonoDelta::FromMilliseconds(1));
+  }
+  return Status::TimedOut(Substitute(
+      "Timed out waiting for $0 TS(s) to register with all masters", count));
+}
+
+// Builds a client that talks to the masters of this cluster and stores it in
+// 'client'. 'builder' may be null, in which case a default-constructed builder
+// is used. Master addresses already set on the builder are cleared and
+// replaced with the bound RPC address of every master. Returns the status of
+// the builder's Build() call.
+Status InternalMiniCluster::CreateClient(KuduClientBuilder* builder,
+                                 client::sp::shared_ptr<KuduClient>* client) const {
+  KuduClientBuilder fallback_builder;
+  KuduClientBuilder* const effective = (builder != nullptr) ? builder : &fallback_builder;
+
+  effective->clear_master_server_addrs();
+  for (const auto& master : mini_masters_) {
+    CHECK(master);
+    effective->add_master_server_addr(master->bound_rpc_addr_str());
+  }
+  return effective->Build(client);
+}
+
+// Looks for the leader master, retrying every 100 ms for up to
+// kMasterStartupWaitTimeSeconds.
+//
+// A master is taken to be the leader when a ScopedLeaderSharedLock on its
+// catalog manager reports an OK first_failed_status(). Masters that are not
+// started or are shut down are skipped. On success the index is written to
+// '*idx' unless 'idx' is null. Returns NotFound if the deadline passes first.
+Status InternalMiniCluster::GetLeaderMasterIndex(int* idx) const {
+  const MonoTime deadline = MonoTime::Now() +
+      MonoDelta::FromSeconds(kMasterStartupWaitTimeSeconds);
+  const int kNoLeader = -1;
+
+  while (MonoTime::Now() < deadline) {
+    int found = kNoLeader;
+    for (int i = 0; i < num_masters(); i++) {
+      MiniMaster* candidate = mini_master(i);
+      const bool usable = candidate->is_started() && !candidate->master()->IsShutdown();
+      if (!usable) {
+        continue;
+      }
+      CatalogManager::ScopedLeaderSharedLock leader_lock(
+          candidate->master()->catalog_manager());
+      if (!leader_lock.first_failed_status().ok()) {
+        continue;
+      }
+      found = i;
+      break;
+    }
+    if (found != kNoLeader) {
+      if (idx) {
+        *idx = found;
+      }
+      return Status::OK();
+    }
+    SleepFor(MonoDelta::FromMilliseconds(100));
+  }
+  return Status::NotFound("Leader master was not found within deadline");
+}
+
+// Returns the messenger held in 'messenger_', which Start() builds as its
+// last step.
+std::shared_ptr<rpc::Messenger> InternalMiniCluster::messenger() const { return messenger_; }
+
+// Returns a proxy to the only master. CHECK-fails unless the cluster holds
+// exactly one master.
+std::shared_ptr<MasterServiceProxy> InternalMiniCluster::master_proxy() const {
+  CHECK_EQ(1, mini_masters_.size());
+  const int kOnlyMasterIdx = 0;
+  return master_proxy(kOnlyMasterIdx);
+}
+
+// Creates a new proxy to the master at position 'idx'. The proxy uses this
+// cluster's messenger and the master's bound RPC address, and is given the
+// host part of that address as its third constructor argument.
+std::shared_ptr<MasterServiceProxy> InternalMiniCluster::master_proxy(int idx) const {
+  MiniMaster* master = CHECK_NOTNULL(mini_master(idx));
+  const auto& addr = master->bound_rpc_addr();
+  return std::make_shared<MasterServiceProxy>(messenger_, addr, addr.host());
+}
+
+} // namespace cluster
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/350c8a79/src/kudu/mini-cluster/internal_mini_cluster.h
----------------------------------------------------------------------
diff --git a/src/kudu/mini-cluster/internal_mini_cluster.h b/src/kudu/mini-cluster/internal_mini_cluster.h
new file mode 100644
index 0000000..b44717c
--- /dev/null
+++ b/src/kudu/mini-cluster/internal_mini_cluster.h
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/client/shared_ptr.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/mini-cluster/mini_cluster.h"
+
+namespace kudu {
+
+class Env;
+class HostPort;
+class Status;
+
+namespace client {
+class KuduClient;
+class KuduClientBuilder;
+}
+
+namespace master {
+class MasterServiceProxy;
+class MiniMaster;
+class TSDescriptor;
+}
+
+namespace rpc {
+class Messenger;
+}
+
+namespace tserver {
+class MiniTabletServer;
+}
+
+namespace cluster {
+
+struct InternalMiniClusterOptions {  // Configuration consumed by InternalMiniCluster.
+  InternalMiniClusterOptions();
+
+  // Number of master servers.
+  // Default: 1
+  int num_masters;
+
+  // Number of TS to start.
+  // Default: 1
+  int num_tablet_servers;
+
+  // Number of data dirs for each daemon.
+  // Default: 1 (this will place the wals in the same dir)
+  int num_data_dirs;
+
+  // Directory in which to store data.
+  // Default: "", which auto-generates a unique path for this cluster.
+  // The default may only be used from a gtest unit test.
+  std::string data_root;
+
+  MiniCluster::BindMode bind_mode;  // Default: MiniCluster::kDefaultBindMode.
+
+  // List of RPC ports for the master to run on.
+  // Defaults to an empty list.
+  // In single-master mode, an empty list implies port 0 (transient port).
+  // In multi-master mode, an empty list is illegal and will result in a CHECK failure.
+  std::vector<uint16_t> master_rpc_ports;
+
+  // List of RPC ports for the tservers to run on.
+  // Defaults to an empty list.
+  // When adding a tablet server to the cluster via AddTabletServer(), if the
+  // index of that tablet server in the cluster is not a valid index into this
+  // list (index >= number of elements), a transient port (port 0) will be used.
+  std::vector<uint16_t> tserver_rpc_ports;
+};
+
+// An in-process cluster with one or more MiniMasters and a configurable
+// number of MiniTabletServers for use in tests.
+class InternalMiniCluster : public MiniCluster {
+ public:
+  InternalMiniCluster(Env* env, InternalMiniClusterOptions options);
+  virtual ~InternalMiniCluster();
+
+  // Start the master(s) and 'num_tablet_servers' TabletServers from the options.
+  // Servers use transient ports unless RPC ports are listed in the options.
+  Status Start() override;
+
+  // Like the previous method but performs initialization synchronously, i.e.
+  // this will wait for all TS's to be started and initialized. Tests should
+  // use this if they interact with tablets immediately after Start().
+  Status StartSync();
+
+  void ShutdownNodes(ClusterNodes nodes) override;
+
+  // Setup a consensus configuration of distributed masters, with count specified in
+  // 'options'. Requires that a reserved RPC port is specified in
+  // 'options' for each master.
+  Status StartDistributedMasters();
+
+  // Add a new standalone master to the cluster. The new master is started.
+  Status StartSingleMaster();
+
+  // Add a new TS to the cluster. The new TS is started.
+  // Requires that the master is already running.
+  Status AddTabletServer();
+
+  // If this cluster is configured for a single non-distributed
+  // master, return the single master. Exits with a CHECK failure if
+  // there are multiple masters.
+  master::MiniMaster* mini_master() const {
+    CHECK_EQ(mini_masters_.size(), 1);
+    return mini_master(0);
+  }
+
+  // Returns the Master at index 'idx' for this InternalMiniCluster.
+  master::MiniMaster* mini_master(int idx) const;
+
+  // Return number of mini masters.
+  int num_masters() const override {
+    return mini_masters_.size();
+  }
+
+  // Returns the TabletServer at index 'idx' of this InternalMiniCluster.
+  // 'idx' must be between 0 and 'num_tablet_servers()' - 1.
+  tserver::MiniTabletServer* mini_tablet_server(int idx) const;
+
+  int num_tablet_servers() const override {
+    return mini_tablet_servers_.size();
+  }
+
+  BindMode bind_mode() const override {
+    return opts_.bind_mode;
+  }
+
+  std::vector<uint16_t> master_rpc_ports() const override {
+    return opts_.master_rpc_ports;
+  }
+
+  std::vector<HostPort> master_rpc_addrs() const override;
+
+  std::string GetMasterFsRoot(int idx) const;
+
+  std::string GetTabletServerFsRoot(int idx) const;
+
+  // Matching policy for WaitForTabletServerCount() below, which waits until
+  // 'count' tablet servers are registered on all masters and returns
+  // Status::TimedOut if that takes longer than kRegistrationWaitTimeSeconds.
+  enum class MatchMode {
+    // Ensure that the tservers retrieved from each master match up against the
+    // tservers defined in this cluster. The matching is done via
+    // NodeInstancePBs comparisons. If even one match fails, the retrieved
+    // response is considered to be malformed and is retried.
+    //
+    // Note: tservers participate in matching even if they are shut down.
+    MATCH_TSERVERS,
+
+    // Do not perform any matching on the retrieved tservers.
+    DO_NOT_MATCH_TSERVERS,
+  };
+  Status WaitForTabletServerCount(int count) const;  // Uses MatchMode::MATCH_TSERVERS.
+  Status WaitForTabletServerCount(int count, MatchMode mode,
+                                  std::vector<std::shared_ptr<master::TSDescriptor>>* descs) const;
+
+  Status CreateClient(client::KuduClientBuilder* builder,
+                      client::sp::shared_ptr<client::KuduClient>* client) const override;
+
+  // Determine the leader master of the cluster. Upon successful completion,
+  // sets 'idx' to the leader master's index. The resulting index can be used
+  // as an argument for calls to mini_master().
+  //
+  // It's possible to use 'nullptr' instead of providing a valid placeholder
+  // for the result master index. That's for use cases when it's enough
+  // to determine if the cluster has established leader master
+  // without intent to get the actual index.
+  //
+  // Note: if a leader election occurs after this method is executed, the
+  // last result may not be valid.
+  Status GetLeaderMasterIndex(int* idx) const;
+
+  std::shared_ptr<rpc::Messenger> messenger() const override;
+  std::shared_ptr<master::MasterServiceProxy> master_proxy() const override;
+  std::shared_ptr<master::MasterServiceProxy> master_proxy(int idx) const override;
+
+ private:
+  enum {
+    kRegistrationWaitTimeSeconds = 15,
+    kMasterStartupWaitTimeSeconds = 30,
+  };
+
+  Env* const env_;
+
+  InternalMiniClusterOptions opts_;
+
+  bool running_;  // Set by Start(), cleared by ShutdownNodes().
+
+  std::vector<std::shared_ptr<master::MiniMaster> > mini_masters_;
+  std::vector<std::shared_ptr<tserver::MiniTabletServer> > mini_tablet_servers_;
+
+  std::shared_ptr<rpc::Messenger> messenger_;
+
+  DISALLOW_COPY_AND_ASSIGN(InternalMiniCluster);
+};
+
+} // namespace cluster
+} // namespace kudu