You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2017/10/02 19:25:20 UTC
[3/6] kudu git commit: mini-cluster: new module for the mini cluster
implementations
http://git-wip-us.apache.org/repos/asf/kudu/blob/350c8a79/src/kudu/mini-cluster/external_mini_cluster.cc
----------------------------------------------------------------------
diff --git a/src/kudu/mini-cluster/external_mini_cluster.cc b/src/kudu/mini-cluster/external_mini_cluster.cc
new file mode 100644
index 0000000..896e063
--- /dev/null
+++ b/src/kudu/mini-cluster/external_mini_cluster.cc
@@ -0,0 +1,1281 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/mini-cluster/external_mini_cluster.h"
+
+#include <algorithm>
+#include <csignal>
+#include <cstdint>
+#include <cstdlib>
+#include <functional>
+#include <memory>
+#include <string>
+#include <unordered_set>
+#include <utility>
+
+#include <gflags/gflags.h>
+#include <gtest/gtest.h>
+#include <rapidjson/document.h>
+
+#include "kudu/client/client.h"
+#include "kudu/client/master_rpc.h"
+#include "kudu/common/wire_protocol.h"
+#include "kudu/common/wire_protocol.pb.h"
+#include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/strings/util.h"
+#include "kudu/master/master.pb.h"
+#include "kudu/master/master.proxy.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/security/test/mini_kdc.h"
+#include "kudu/server/server_base.pb.h"
+#include "kudu/server/server_base.proxy.h"
+#include "kudu/tablet/metadata.pb.h"
+#include "kudu/tablet/tablet.pb.h"
+#include "kudu/tserver/tserver.pb.h"
+#include "kudu/tserver/tserver_service.proxy.h"
+#include "kudu/util/async_util.h"
+#include "kudu/util/curl_util.h"
+#include "kudu/util/env.h"
+#include "kudu/util/env_util.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/fault_injection.h"
+#include "kudu/util/jsonreader.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/status.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/subprocess.h"
+#include "kudu/util/test_util.h"
+
+using kudu::client::internal::ConnectToClusterRpc;
+using kudu::master::ListTablesRequestPB;
+using kudu::master::ListTablesResponsePB;
+using kudu::master::MasterServiceProxy;
+using kudu::pb_util::SecureDebugString;
+using kudu::pb_util::SecureShortDebugString;
+using kudu::rpc::RpcController;
+using kudu::server::ServerStatusPB;
+using kudu::tserver::ListTabletsRequestPB;
+using kudu::tserver::ListTabletsResponsePB;
+using kudu::tserver::TabletServerServiceProxy;
+using rapidjson::Value;
+using std::pair;
+using std::string;
+using std::unique_ptr;
+using std::unordered_set;
+using std::vector;
+using strings::Substitute;
+
+// Shorthand for the per-tablet entry type carried in a ListTablets() response.
+using StatusAndSchemaPB = ListTabletsResponsePB::StatusAndSchemaPB;
+
+// When true, every daemon the cluster starts gets a companion
+// "perf record --call-graph fp" subprocess attached to its pid; the profile
+// is written to "<log_dir>/perf-<daemon_id>.data".
+DEFINE_bool(perf_record, false,
+ "Whether to run \"perf record --call-graph fp\" on each daemon in the cluster");
+
+namespace kudu {
+namespace cluster {
+
+// Executable names of the daemons, resolved relative to the daemon bin path.
+static const char* const kMasterBinaryName = "kudu-master";
+static const char* const kTabletServerBinaryName = "kudu-tserver";
+// Fixed wait limits, in seconds. They are tuning constants, not state, so
+// they are declared const to rule out accidental modification of file-level
+// globals. The registration timeout bounds how long Start()/Restart() wait
+// for tablet servers to register; the catalog-manager timeout is consumed by
+// code outside this view (presumably the wait for the master's catalog
+// manager — confirm at its use site).
+static const double kTabletServerRegistrationTimeoutSeconds = 15.0;
+static const double kMasterCatalogManagerTimeoutSeconds = 60.0;
+
+// Default cluster options: one master, one tablet server, the default bind
+// mode, one data directory per daemon, Kerberos disabled, daemons logging to
+// stderr, and 30 seconds for each daemon process to write its startup info
+// file (this becomes each daemon's start_process_timeout).
+ExternalMiniClusterOptions::ExternalMiniClusterOptions()
+ : num_masters(1),
+ num_tablet_servers(1),
+ bind_mode(MiniCluster::kDefaultBindMode),
+ num_data_dirs(1),
+ enable_kerberos(false),
+ logtostderr(true),
+ start_process_timeout(MonoDelta::FromSeconds(30)) {
+}
+
+// Creates a cluster configured with default ExternalMiniClusterOptions.
+// No daemons are launched until Start() is called.
+ExternalMiniCluster::ExternalMiniCluster()
+    : ExternalMiniCluster(ExternalMiniClusterOptions()) {
+}
+
+ExternalMiniCluster::ExternalMiniCluster(ExternalMiniClusterOptions opts)
+ : opts_(std::move(opts)) {
+}
+
+// Destroying the cluster calls Shutdown(), so callers need not do it
+// explicitly.
+ExternalMiniCluster::~ExternalMiniCluster() {
+  this->Shutdown();
+}
+
+// Stores in '*ret' the directory that contains the currently running
+// executable. Returns the error from Env if that path cannot be determined.
+Status ExternalMiniCluster::DeduceBinRoot(std::string* ret) {
+  string self_exe;
+  RETURN_NOT_OK(Env::Default()->GetExecutablePath(&self_exe));
+  *ret = DirName(self_exe);
+  return Status::OK();
+}
+
+// Resolves the daemon bin path and data root from 'opts_', filling in
+// defaults for whichever of them were left empty.
+Status ExternalMiniCluster::HandleOptions() {
+  // With no explicit bin path, look next to the current executable.
+  daemon_bin_path_ = opts_.daemon_bin_path;
+  if (daemon_bin_path_.empty()) {
+    RETURN_NOT_OK(DeduceBinRoot(&daemon_bin_path_));
+  }
+
+  // With no explicit data root, use a subdirectory of the gtest data dir.
+  data_root_ = opts_.data_root.empty()
+      ? JoinPathSegments(GetTestDataDirectory(), "minicluster-data")
+      : opts_.data_root;
+
+  return Status::OK();
+}
+
+// Brings up the whole cluster, in this order:
+//  1. resolve options and build the client-side messenger,
+//  2. create the data root directory (an existing one is accepted),
+//  3. when Kerberos is enabled, start a MiniKdc, create the test principals
+//     and kinit as "test-admin",
+//  4. start the master(s), then 'num_tablet_servers' tablet servers,
+//  5. wait for every tablet server to register with the masters.
+// Returns the first error encountered.
+//
+// CHECK-fails if the cluster already has masters or tablet servers; use
+// Restart() to bring back daemons of a cluster that was already started.
+Status ExternalMiniCluster::Start() {
+  CHECK(masters_.empty()) << "Masters are not empty (size: " << masters_.size()
+                          << "). Maybe you meant Restart()?";
+  CHECK(tablet_servers_.empty()) << "Tablet servers are not empty (size: "
+                                 << tablet_servers_.size() << "). Maybe you meant Restart()?";
+  RETURN_NOT_OK(HandleOptions());
+
+  RETURN_NOT_OK_PREPEND(rpc::MessengerBuilder("minicluster-messenger")
+                            .set_num_reactors(1)
+                            .set_max_negotiation_threads(1)
+                            .Build(&messenger_),
+                        "Failed to start Messenger for minicluster");
+
+  // An already-existing data root is fine; any other failure is not.
+  Status s = Env::Default()->CreateDir(data_root_);
+  if (!s.ok() && !s.IsAlreadyPresent()) {
+    RETURN_NOT_OK_PREPEND(s, "Could not create root dir " + data_root_);
+  }
+
+  if (opts_.enable_kerberos) {
+    kdc_.reset(new MiniKdc(opts_.mini_kdc_options));
+    RETURN_NOT_OK(kdc_->Start());
+    RETURN_NOT_OK_PREPEND(kdc_->CreateUserPrincipal("test-admin"),
+                          "could not create admin principal");
+    RETURN_NOT_OK_PREPEND(kdc_->CreateUserPrincipal("test-user"),
+                          "could not create user principal");
+    RETURN_NOT_OK_PREPEND(kdc_->CreateUserPrincipal("joe-interloper"),
+                          "could not create unauthorized principal");
+
+    RETURN_NOT_OK_PREPEND(kdc_->Kinit("test-admin"),
+                          "could not kinit as admin");
+    RETURN_NOT_OK_PREPEND(kdc_->SetKrb5Environment(),
+                          "could not set krb5 client env");
+  }
+
+  if (opts_.num_masters != 1) {
+    RETURN_NOT_OK_PREPEND(StartDistributedMasters(),
+                          "Failed to add distributed masters");
+  } else {
+    // The message has no placeholders, so it is passed as-is rather than
+    // being routed through Substitute().
+    RETURN_NOT_OK_PREPEND(StartSingleMaster(),
+                          "Failed to start a single Master");
+  }
+
+  for (int i = 1; i <= opts_.num_tablet_servers; i++) {
+    RETURN_NOT_OK_PREPEND(AddTabletServer(),
+                          Substitute("Failed starting tablet server $0", i));
+  }
+  RETURN_NOT_OK(WaitForTabletServerCount(
+      opts_.num_tablet_servers,
+      MonoDelta::FromSeconds(kTabletServerRegistrationTimeoutSeconds)));
+
+  return Status::OK();
+}
+
+
+// Shuts down the tablet servers, the masters, or both, depending on 'nodes'.
+// Tablet servers are stopped before masters. Null master entries are skipped.
+void ExternalMiniCluster::ShutdownNodes(ClusterNodes nodes) {
+  const bool stop_tservers =
+      nodes == ClusterNodes::ALL || nodes == ClusterNodes::TS_ONLY;
+  const bool stop_masters =
+      nodes == ClusterNodes::ALL || nodes == ClusterNodes::MASTERS_ONLY;
+
+  if (stop_tservers) {
+    for (const auto& tserver : tablet_servers_) {
+      tserver->Shutdown();
+    }
+  }
+  if (!stop_masters) {
+    return;
+  }
+  for (const auto& m : masters_) {
+    if (!m) {
+      continue;
+    }
+    m->Shutdown();
+  }
+}
+
+// Restarts every master and tablet server that is currently shut down (null
+// master entries are ignored), then waits for all tablet servers to register
+// with the masters. Returns the first restart or registration error.
+Status ExternalMiniCluster::Restart() {
+  for (const auto& m : masters_) {
+    if (!m || !m->IsShutdown()) {
+      continue;
+    }
+    RETURN_NOT_OK_PREPEND(m->Restart(), "Cannot restart master bound at: " +
+                          m->bound_rpc_hostport().ToString());
+  }
+
+  for (const auto& tserver : tablet_servers_) {
+    if (!tserver->IsShutdown()) {
+      continue;
+    }
+    RETURN_NOT_OK_PREPEND(tserver->Restart(), "Cannot restart tablet server bound at: " +
+                          tserver->bound_rpc_hostport().ToString());
+  }
+
+  const MonoDelta registration_timeout =
+      MonoDelta::FromSeconds(kTabletServerRegistrationTimeoutSeconds);
+  RETURN_NOT_OK(WaitForTabletServerCount(tablet_servers_.size(), registration_timeout));
+  return Status::OK();
+}
+
+// Switches the directory from which daemon executables are taken and updates
+// the executable path of every existing master and tablet server to match.
+// Each daemon's SetExePath() CHECKs that the daemon is shut down.
+void ExternalMiniCluster::SetDaemonBinPath(string daemon_bin_path) {
+  daemon_bin_path_ = std::move(daemon_bin_path);
+  for (auto& master : masters_) {
+    // Entries of 'masters_' may be null (ShutdownNodes() and Restart() guard
+    // against that as well), so skip them instead of dereferencing.
+    if (master) {
+      master->SetExePath(GetBinaryPath(kMasterBinaryName));
+    }
+  }
+  for (auto& ts : tablet_servers_) {
+    ts->SetExePath(GetBinaryPath(kTabletServerBinaryName));
+  }
+}
+
+// Returns the full path of executable 'binary' under the daemon bin path.
+// CHECK-fails if the bin path is still unset.
+string ExternalMiniCluster::GetBinaryPath(const string& binary) const {
+  CHECK(!daemon_bin_path_.empty());
+  const string full_path = JoinPathSegments(daemon_bin_path_, binary);
+  return full_path;
+}
+
+// Returns "<data_root>/<daemon_id>/logs". CHECK-fails if the data root is unset.
+string ExternalMiniCluster::GetLogPath(const string& daemon_id) const {
+  CHECK(!data_root_.empty());
+  const string daemon_root = JoinPathSegments(data_root_, daemon_id);
+  return JoinPathSegments(daemon_root, "logs");
+}
+
+// Returns one data directory of 'daemon_id':
+//  - without 'dir_index': "<data_root>/<daemon_id>/data", which is only
+//    allowed when the cluster uses exactly one data dir;
+//  - with 'dir_index': "<data_root>/<daemon_id>/data-<dir_index>", where the
+//    index must be below the configured number of data dirs.
+string ExternalMiniCluster::GetDataPath(const string& daemon_id,
+                                        boost::optional<uint32_t> dir_index) const {
+  CHECK(!data_root_.empty());
+  if (dir_index) {
+    CHECK_LT(*dir_index, opts_.num_data_dirs);
+  } else {
+    CHECK_EQ(1, opts_.num_data_dirs);
+  }
+  const string leaf = dir_index ? Substitute("data-$0", *dir_index) : string("data");
+  return JoinPathSegments(JoinPathSegments(data_root_, daemon_id), leaf);
+}
+
+// Returns all data directories of 'daemon_id': the single unindexed "data"
+// path when one data dir is configured, otherwise "data-0" .. "data-<N-1>".
+vector<string> ExternalMiniCluster::GetDataPaths(const string& daemon_id) const {
+  vector<string> dirs;
+  if (opts_.num_data_dirs == 1) {
+    dirs.push_back(GetDataPath(daemon_id));
+    return dirs;
+  }
+  for (uint32_t i = 0; i < opts_.num_data_dirs; ++i) {
+    dirs.push_back(GetDataPath(daemon_id, i));
+  }
+  return dirs;
+}
+
+// Returns "<data_root>/<daemon_id>/wal". CHECK-fails if the data root is unset.
+string ExternalMiniCluster::GetWalPath(const string& daemon_id) const {
+  CHECK(!data_root_.empty());
+  const string daemon_root = JoinPathSegments(data_root_, daemon_id);
+  return JoinPathSegments(daemon_root, "wal");
+}
+
+namespace {
+// Returns a copy of 'orig_flags' where the literal placeholder "${index}" is
+// replaced by the decimal form of 'index' (StringReplace with
+// replace_all=true, so every occurrence in a flag is substituted).
+vector<string> SubstituteInFlags(const vector<string>& orig_flags,
+                                 int index) {
+  const string index_str = strings::Substitute("$0", index);
+  vector<string> substituted;
+  substituted.reserve(orig_flags.size());
+  for (const string& flag : orig_flags) {
+    substituted.emplace_back(StringReplace(flag, "${index}", index_str, true));
+  }
+  return substituted;
+}
+
+} // anonymous namespace
+
+// Starts the only master of a single-master cluster as daemon "master-0".
+// It binds to the bind IP for master 0 with port 0, so the port is chosen at
+// startup. The master is appended to 'masters_' only once it has started.
+Status ExternalMiniCluster::StartSingleMaster() {
+  const string daemon_id = "master-0";
+
+  ExternalDaemonOptions daemon_opts(opts_.logtostderr);
+  daemon_opts.messenger = messenger_;
+  daemon_opts.exe = GetBinaryPath(kMasterBinaryName);
+  daemon_opts.wal_dir = GetWalPath(daemon_id);
+  daemon_opts.data_dirs = GetDataPaths(daemon_id);
+  daemon_opts.log_dir = GetLogPath(daemon_id);
+  if (FLAGS_perf_record) {
+    daemon_opts.perf_record_filename =
+        Substitute("$0/perf-$1.data", daemon_opts.log_dir, daemon_id);
+  }
+  daemon_opts.extra_flags = SubstituteInFlags(opts_.extra_master_flags, 0);
+  daemon_opts.start_process_timeout = opts_.start_process_timeout;
+  const string bind_ip = GetBindIpForMaster(0);
+  daemon_opts.rpc_bind_address = HostPort(bind_ip, 0);
+
+  scoped_refptr<ExternalMaster> master = new ExternalMaster(daemon_opts);
+  if (opts_.enable_kerberos) {
+    // The Kerberos principal is generated from this host name, so it must be
+    // the same host the master's RPC endpoint binds to.
+    RETURN_NOT_OK_PREPEND(master->EnableKerberos(kdc_.get(), bind_ip),
+                          "could not enable Kerberos");
+  }
+
+  RETURN_NOT_OK(master->Start());
+  masters_.push_back(master);
+  return Status::OK();
+}
+
+// Starts 'opts_.num_masters' masters, "master-0" .. "master-<N-1>". Each one
+// binds to its entry of master_rpc_addrs(), and all of them receive a
+// --master_addresses flag listing every peer. LOG(FATAL)s when the number of
+// configured 'master_rpc_ports' differs from the number of masters. Masters
+// are appended to 'masters_' one by one as they start.
+Status ExternalMiniCluster::StartDistributedMasters() {
+  const int num_masters = opts_.num_masters;
+  if (opts_.master_rpc_ports.size() != num_masters) {
+    LOG(FATAL) << num_masters << " masters requested, but only " <<
+        opts_.master_rpc_ports.size() << " ports specified in 'master_rpc_ports'";
+  }
+
+  vector<HostPort> peer_hostports = master_rpc_addrs();
+  vector<string> master_flags = opts_.extra_master_flags;
+  master_flags.push_back(Substitute("--master_addresses=$0",
+                                    HostPort::ToCommaSeparatedString(peer_hostports)));
+  const string master_exe = GetBinaryPath(kMasterBinaryName);
+
+  for (int idx = 0; idx < num_masters; ++idx) {
+    const string daemon_id = Substitute("master-$0", idx);
+    const HostPort& bind_hp = peer_hostports[idx];
+
+    ExternalDaemonOptions daemon_opts(opts_.logtostderr);
+    daemon_opts.messenger = messenger_;
+    daemon_opts.exe = master_exe;
+    daemon_opts.wal_dir = GetWalPath(daemon_id);
+    daemon_opts.data_dirs = GetDataPaths(daemon_id);
+    daemon_opts.log_dir = GetLogPath(daemon_id);
+    if (FLAGS_perf_record) {
+      daemon_opts.perf_record_filename =
+          Substitute("$0/perf-$1.data", daemon_opts.log_dir, daemon_id);
+    }
+    daemon_opts.extra_flags = SubstituteInFlags(master_flags, idx);
+    daemon_opts.start_process_timeout = opts_.start_process_timeout;
+    daemon_opts.rpc_bind_address = bind_hp;
+
+    scoped_refptr<ExternalMaster> peer = new ExternalMaster(daemon_opts);
+    if (opts_.enable_kerberos) {
+      RETURN_NOT_OK_PREPEND(peer->EnableKerberos(kdc_.get(), bind_hp.host()),
+                            "could not enable Kerberos");
+    }
+    RETURN_NOT_OK_PREPEND(peer->Start(),
+                          Substitute("Unable to start Master at index $0", idx));
+    masters_.push_back(peer);
+  }
+
+  return Status::OK();
+}
+
+// Returns the IP that tablet server 'index' binds to under the configured
+// bind mode.
+string ExternalMiniCluster::GetBindIpForTabletServer(int index) const {
+  const auto mode = opts_.bind_mode;
+  return MiniCluster::GetBindIpForDaemon(MiniCluster::TSERVER, index, mode);
+}
+
+// Returns the IP that master 'index' binds to under the configured bind mode.
+string ExternalMiniCluster::GetBindIpForMaster(int index) const {
+  const auto mode = opts_.bind_mode;
+  return MiniCluster::GetBindIpForDaemon(MiniCluster::MASTER, index, mode);
+}
+
+// Starts one more tablet server, "ts-<N>", where N is its position in
+// 'tablet_servers_'. It receives the RPC addresses of all masters and binds
+// to the tablet-server bind IP for N with port 0. CHECK-fails when
+// leader_master() is null. The server is appended to 'tablet_servers_' only
+// after it has started.
+Status ExternalMiniCluster::AddTabletServer() {
+  CHECK(leader_master() != nullptr)
+      << "Must have started at least 1 master before adding tablet servers";
+
+  const int idx = tablet_servers_.size();
+  const string daemon_id = Substitute("ts-$0", idx);
+
+  vector<HostPort> master_hostports;
+  for (int m = 0; m < num_masters(); ++m) {
+    master_hostports.push_back(DCHECK_NOTNULL(master(m))->bound_rpc_hostport());
+  }
+  const string bind_host = GetBindIpForTabletServer(idx);
+
+  ExternalDaemonOptions ts_opts(opts_.logtostderr);
+  ts_opts.messenger = messenger_;
+  ts_opts.exe = GetBinaryPath(kTabletServerBinaryName);
+  ts_opts.wal_dir = GetWalPath(daemon_id);
+  ts_opts.data_dirs = GetDataPaths(daemon_id);
+  ts_opts.log_dir = GetLogPath(daemon_id);
+  if (FLAGS_perf_record) {
+    ts_opts.perf_record_filename =
+        Substitute("$0/perf-$1.data", ts_opts.log_dir, daemon_id);
+  }
+  ts_opts.extra_flags = SubstituteInFlags(opts_.extra_tserver_flags, idx);
+  ts_opts.start_process_timeout = opts_.start_process_timeout;
+  ts_opts.rpc_bind_address = HostPort(bind_host, 0);
+
+  scoped_refptr<ExternalTabletServer> new_ts =
+      new ExternalTabletServer(ts_opts, master_hostports);
+  if (opts_.enable_kerberos) {
+    RETURN_NOT_OK_PREPEND(new_ts->EnableKerberos(kdc_.get(), bind_host),
+                          "could not enable Kerberos");
+  }
+
+  RETURN_NOT_OK(new_ts->Start());
+  tablet_servers_.push_back(new_ts);
+  return Status::OK();
+}
+
+// Waits until every running master reports exactly 'count' registered tablet
+// servers whose uuid and instance seqno match daemons in 'tablet_servers_'.
+// Polls every 1ms. Returns the RPC error if a ListTabletServers call fails,
+// or Status::TimedOut if 'timeout' elapses first.
+Status ExternalMiniCluster::WaitForTabletServerCount(int count, const MonoDelta& timeout) {
+  MonoTime deadline = MonoTime::Now() + timeout;
+
+  // Only query masters that exist and are running. Entries of 'masters_' may
+  // be null (ShutdownNodes() and Restart() check for that too), so null
+  // entries are skipped here instead of being dereferenced.
+  unordered_set<int> masters_to_search;
+  for (int i = 0; i < masters_.size(); i++) {
+    if (masters_[i] && !masters_[i]->IsShutdown()) {
+      masters_to_search.insert(i);
+    }
+  }
+
+  while (true) {
+    MonoDelta remaining = deadline - MonoTime::Now();
+    if (remaining.ToSeconds() < 0) {
+      return Status::TimedOut(Substitute(
+          "Timed out waiting for $0 TS(s) to register with all masters", count));
+    }
+
+    for (auto iter = masters_to_search.begin(); iter != masters_to_search.end();) {
+      master::ListTabletServersRequestPB req;
+      master::ListTabletServersResponsePB resp;
+      rpc::RpcController rpc;
+      rpc.set_timeout(remaining);
+      RETURN_NOT_OK_PREPEND(master_proxy(*iter)->ListTabletServers(req, &resp, &rpc),
+                            "ListTabletServers RPC failed");
+      // ListTabletServers() may return servers that are no longer online.
+      // Do a second step of verification to verify that the descs that we got
+      // are aligned (same uuid/seqno) with the TSs that we have in the cluster.
+      int match_count = 0;
+      for (const master::ListTabletServersResponsePB_Entry& e : resp.servers()) {
+        for (const scoped_refptr<ExternalTabletServer>& ets : tablet_servers_) {
+          if (ets->instance_id().permanent_uuid() == e.instance_id().permanent_uuid() &&
+              ets->instance_id().instance_seqno() == e.instance_id().instance_seqno()) {
+            match_count++;
+            break;
+          }
+        }
+      }
+      if (match_count == count) {
+        // This master has returned the correct set of tservers.
+        iter = masters_to_search.erase(iter);
+      } else {
+        ++iter;
+      }
+    }
+
+    if (masters_to_search.empty()) {
+      // All masters have returned the correct set of tservers.
+      LOG(INFO) << count << " TS(s) registered with all masters";
+      return Status::OK();
+    }
+    SleepFor(MonoDelta::FromMilliseconds(1));
+  }
+}
+
+// Raises a gtest assertion failure if any daemon that has not been shut down
+// on purpose is no longer running. Each crashed daemon is logged by UUID.
+void ExternalMiniCluster::AssertNoCrashes() {
+  int num_crashes = 0;
+  for (ExternalDaemon* daemon : daemons()) {
+    const bool crashed = !daemon->IsShutdown() && !daemon->IsProcessAlive();
+    if (!crashed) {
+      continue;
+    }
+    LOG(ERROR) << "Process with UUID " << daemon->uuid() << " has crashed";
+    ++num_crashes;
+  }
+  ASSERT_EQ(0, num_crashes) << "At least one process crashed";
+}
+
+// Polls 'ts' with ListTablets roughly every 10ms. Returns OK once all of its
+// tablets are RUNNING and it reports at least 'min_tablet_count' of them.
+// An RPC failure or an error in the response is returned immediately. If
+// 'timeout' expires first, returns Status::TimedOut whose message is the
+// last response received.
+Status ExternalMiniCluster::WaitForTabletsRunning(ExternalTabletServer* ts,
+                                                  int min_tablet_count,
+                                                  const MonoDelta& timeout) {
+  TabletServerServiceProxy proxy(messenger_, ts->bound_rpc_addr(), ts->bound_rpc_addr().host());
+  ListTabletsRequestPB req;
+  ListTabletsResponsePB resp;
+
+  const MonoTime deadline = MonoTime::Now() + timeout;
+  while (MonoTime::Now() < deadline) {
+    rpc::RpcController controller;
+    controller.set_timeout(MonoDelta::FromSeconds(10));
+    RETURN_NOT_OK(proxy.ListTablets(req, &resp, &controller));
+    if (resp.has_error()) {
+      return StatusFromPB(resp.error().status());
+    }
+
+    const auto& tablets = resp.status_and_schema();
+    const bool all_running =
+        std::all_of(tablets.begin(), tablets.end(), [](const StatusAndSchemaPB& t) {
+          return t.tablet_status().state() == tablet::RUNNING;
+        });
+    // Finished once every tablet is running and at least the expected number
+    // of tablets has been observed.
+    if (all_running && resp.status_and_schema_size() >= min_tablet_count) {
+      return Status::OK();
+    }
+
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+
+  return Status::TimedOut(SecureDebugString(resp));
+}
+
+// Sets '*idx' to the position in 'masters_' of the current leader master.
+// The leader is found with a ConnectToClusterRpc sent to every master's bound
+// RPC address, using a 5-second deadline. Returns the RPC's error on failure.
+// Aborts via LOG(FATAL) if the reported leader matches none of 'masters_'.
+Status ExternalMiniCluster::GetLeaderMasterIndex(int* idx) {
+  scoped_refptr<ConnectToClusterRpc> rpc;
+  Synchronizer sync;
+  vector<pair<Sockaddr, string>> addrs_with_names;
+  Sockaddr leader_master_addr;
+  MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(5);
+
+  for (const scoped_refptr<ExternalMaster>& master : masters_) {
+    addrs_with_names.emplace_back(master->bound_rpc_addr(), master->bound_rpc_addr().host());
+  }
+  const auto& cb = [&](const Status& status,
+                       const pair<Sockaddr, string>& leader_master,
+                       const master::ConnectToMasterResponsePB& resp) {
+    if (status.ok()) {
+      leader_master_addr = leader_master.first;
+    }
+    sync.StatusCB(status);
+  };
+  rpc.reset(new ConnectToClusterRpc(cb,
+                                    std::move(addrs_with_names),
+                                    deadline,
+                                    MonoDelta::FromSeconds(5),
+                                    messenger_));
+  rpc->SendRpc();
+  RETURN_NOT_OK(sync.Wait());
+
+  // Compare host as well as port. Masters bound to different IPs (e.g. one
+  // loopback address per daemon) may share a port, and a port-only match
+  // could then pick the wrong master. The leader address is expected to be
+  // one of the bound_rpc_addr() values passed to the RPC above.
+  bool found = false;
+  for (int i = 0; i < masters_.size(); i++) {
+    const auto& addr = masters_[i]->bound_rpc_addr();
+    if (addr.port() == leader_master_addr.port() &&
+        addr.host() == leader_master_addr.host()) {
+      found = true;
+      *idx = i;
+      break;
+    }
+  }
+  if (!found) {
+    // There is never a situation where this should happen, so it's
+    // better to exit with a FATAL log message right away vs. return a
+    // Status::IllegalState().
+    LOG(FATAL) << "Leader master is not in masters_";
+  }
+  return Status::OK();
+}
+
+// Returns the tablet server whose permanent UUID equals 'uuid', or nullptr if
+// there is none. The cluster keeps ownership of the returned server.
+ExternalTabletServer* ExternalMiniCluster::tablet_server_by_uuid(const std::string& uuid) const {
+  const auto it = std::find_if(
+      tablet_servers_.begin(), tablet_servers_.end(),
+      [&uuid](const scoped_refptr<ExternalTabletServer>& candidate) {
+        return candidate->instance_id().permanent_uuid() == uuid;
+      });
+  return it == tablet_servers_.end() ? nullptr : it->get();
+}
+
+// Returns the position in 'tablet_servers_' of the server whose uuid() equals
+// 'uuid', or -1 if there is none.
+int ExternalMiniCluster::tablet_server_index_by_uuid(const std::string& uuid) const {
+  int position = 0;
+  for (const auto& tserver : tablet_servers_) {
+    if (tserver->uuid() == uuid) {
+      return position;
+    }
+    ++position;
+  }
+  return -1;
+}
+
+// Returns non-owning pointers to every daemon: first all tablet servers, then
+// all masters, each group in index order.
+vector<ExternalDaemon*> ExternalMiniCluster::daemons() const {
+  vector<ExternalDaemon*> all;
+  all.reserve(tablet_servers_.size() + masters_.size());
+  for (const auto& tserver : tablet_servers_) {
+    all.push_back(tserver.get());
+  }
+  for (const auto& m : masters_) {
+    all.push_back(m.get());
+  }
+  return all;
+}
+
+// Returns the configured RPC address of each master: the master bind IP for
+// index i paired with opts_.master_rpc_ports[i].
+vector<HostPort> ExternalMiniCluster::master_rpc_addrs() const {
+  vector<HostPort> addrs;
+  addrs.reserve(opts_.master_rpc_ports.size());
+  for (int i = 0; i < opts_.master_rpc_ports.size(); ++i) {
+    addrs.emplace_back(GetBindIpForMaster(i), opts_.master_rpc_ports[i]);
+  }
+  return addrs;
+}
+
+// Returns the messenger built by Start(). The same messenger is handed to
+// each daemon's options and used for the proxies this class creates.
+std::shared_ptr<rpc::Messenger> ExternalMiniCluster::messenger() const {
+  std::shared_ptr<rpc::Messenger> shared = messenger_;
+  return shared;
+}
+
+// Returns a proxy to the cluster's sole master. CHECK-fails unless there is
+// exactly one master; use master_proxy(int) for multi-master clusters.
+std::shared_ptr<MasterServiceProxy> ExternalMiniCluster::master_proxy() const {
+  CHECK_EQ(masters_.size(), 1);
+  const int only_master = 0;
+  return master_proxy(only_master);
+}
+
+// Returns a new proxy to master 'idx', connected to its bound RPC address.
+// CHECK-fails if 'idx' is out of range or that master is null.
+std::shared_ptr<MasterServiceProxy> ExternalMiniCluster::master_proxy(int idx) const {
+  CHECK_LT(idx, masters_.size());
+  const auto& rpc_addr = CHECK_NOTNULL(master(idx))->bound_rpc_addr();
+  return std::make_shared<MasterServiceProxy>(messenger_, rpc_addr, rpc_addr.host());
+}
+
+// Builds a client connected to this cluster. If 'builder' is null, a default
+// builder is used. Any master addresses already in the builder are replaced
+// by the bound RPC addresses of all masters. CHECK-fails with no masters.
+Status ExternalMiniCluster::CreateClient(client::KuduClientBuilder* builder,
+                                         client::sp::shared_ptr<client::KuduClient>* client) const {
+  client::KuduClientBuilder default_builder;
+  client::KuduClientBuilder* const b = (builder != nullptr) ? builder : &default_builder;
+
+  CHECK(!masters_.empty());
+  b->clear_master_server_addrs();
+  for (const auto& m : masters_) {
+    b->add_master_server_addr(m->bound_rpc_hostport().ToString());
+  }
+  return b->Build(client);
+}
+
+// Sets gflag 'flag' to 'value' on the running 'daemon' through its
+// GenericService, with force=true and a 30-second RPC timeout. Returns the
+// RPC error if the call fails, or RemoteError if the server rejects the flag.
+Status ExternalMiniCluster::SetFlag(ExternalDaemon* daemon,
+                                   const string& flag,
+                                   const string& value) {
+  server::SetFlagRequestPB req;
+  req.set_flag(flag);
+  req.set_value(value);
+  req.set_force(true);
+
+  const auto& addr = daemon->bound_rpc_addr();
+  server::GenericServiceProxy proxy(messenger_, addr, addr.host());
+  rpc::RpcController controller;
+  controller.set_timeout(MonoDelta::FromSeconds(30));
+  server::SetFlagResponsePB resp;
+  RETURN_NOT_OK_PREPEND(proxy.SetFlag(req, &resp, &controller), "rpc failed");
+
+  if (resp.result() == server::SetFlagResponsePB::SUCCESS) {
+    return Status::OK();
+  }
+  return Status::RemoteError("failed to set flag", SecureShortDebugString(resp));
+}
+
+//------------------------------------------------------------
+// ExternalDaemon
+//------------------------------------------------------------
+
+// Captures the daemon's configuration from 'opts' by moving its fields into
+// members. No process is launched here; StartProcess() does that. The RPC
+// bind address is mandatory and is CHECKed to be initialized.
+ExternalDaemon::ExternalDaemon(ExternalDaemonOptions opts)
+ : messenger_(std::move(opts.messenger)),
+ wal_dir_(std::move(opts.wal_dir)),
+ data_dirs_(std::move(opts.data_dirs)),
+ log_dir_(std::move(opts.log_dir)),
+ perf_record_filename_(std::move(opts.perf_record_filename)),
+ start_process_timeout_(opts.start_process_timeout),
+ logtostderr_(opts.logtostderr),
+ rpc_bind_address_(std::move(opts.rpc_bind_address)),
+ exe_(std::move(opts.exe)),
+ extra_flags_(std::move(opts.extra_flags)) {
+ CHECK(rpc_bind_address_.Initialized());
+}
+
+// Nothing to release explicitly; members clean up after themselves.
+ExternalDaemon::~ExternalDaemon() = default;
+
+// Configures this daemon to require Kerberos authentication. It creates a
+// keytab in 'kdc' for principal "kudu/<bind_host>", adopts the KDC's
+// environment variables, and appends the keytab, principal and ACL flags to
+// 'extra_flags_'. Must be called before the process is started.
+Status ExternalDaemon::EnableKerberos(MiniKdc* kdc, const string& bind_host) {
+  const string principal = "kudu/" + bind_host;
+  string keytab_path;
+  RETURN_NOT_OK_PREPEND(kdc->CreateServiceKeytab(principal, &keytab_path),
+                        "could not create keytab");
+  extra_env_ = kdc->GetEnvVars();
+  extra_flags_.insert(extra_flags_.end(), {
+      Substitute("--keytab_file=$0", keytab_path),
+      Substitute("--principal=$0", principal),
+      "--rpc_authentication=required",
+      "--superuser_acl=test-admin",
+      "--user_acl=test-user",
+  });
+  return Status::OK();
+}
+
+// Launches the daemon executable as a subprocess. It then waits until the
+// daemon writes a ServerStatusPB to "info.pb" in its first data directory, or
+// until 'start_process_timeout_' elapses.
+//
+// argv is built as: exe, 'user_flags', this minicluster's standard test
+// flags, then 'extra_flags_'. The extra flags come last so they can override
+// the earlier ones. If a perf record filename is set, a "perf record"
+// subprocess is attached to the new pid.
+//
+// Outcomes while waiting:
+//  - The process exits with fault_injection::kExitStatus: treated as a
+//    successful start.
+//  - Any other early exit: returns an error describing the exit.
+//  - Timeout: the process is killed and Status::TimedOut is returned.
+Status ExternalDaemon::StartProcess(const vector<string>& user_flags) {
+  CHECK(!process_);
+
+  vector<string> argv;
+
+  // First the exe for argv[0].
+  argv.push_back(exe_);
+
+  // Then all the flags coming from the minicluster framework.
+  argv.insert(argv.end(), user_flags.begin(), user_flags.end());
+
+  // Disable fsync to dramatically speed up runtime. This is safe as no tests
+  // rely on forcefully cutting power to a machine or equivalent.
+  argv.emplace_back("--never_fsync");
+
+  // Generate smaller RSA keys -- generating a 1024-bit key is faster
+  // than generating the default 2048-bit key, and we don't care about
+  // strong encryption in tests. Setting it lower (e.g. 512 bits) results
+  // in OpenSSL errors RSA_sign:digest too big for rsa key:rsa_sign.c:122
+  // since we are using strong/high TLS v1.2 cipher suites, so the minimum
+  // size of TLS-related RSA key is 768 bits (due to the usage of
+  // the ECDHE-RSA-AES256-GCM-SHA384 suite). However, to work with Java
+  // client it's necessary to have at least 1024 bits for certificate RSA key
+  // due to Java security policies.
+  argv.emplace_back("--ipki_server_key_size=1024");
+
+  // Disable minidumps by default since many tests purposely inject faults.
+  argv.emplace_back("--enable_minidumps=false");
+
+  // Disable log redaction.
+  argv.emplace_back("--redact=flag");
+
+  // Enable metrics logging.
+  argv.emplace_back("--metrics_log_interval_ms=1000");
+
+  if (logtostderr_) {
+    // Ensure that logging goes to the test output and doesn't get buffered.
+    argv.emplace_back("--logtostderr");
+    argv.emplace_back("--logbuflevel=-1");
+  }
+
+  // Even if we are logging to stderr, metrics logs and minidumps end up being
+  // written based on -log_dir. So, we have to set that too.
+  argv.push_back("--log_dir=" + log_dir_);
+  RETURN_NOT_OK(env_util::CreateDirsRecursively(Env::Default(), log_dir_));
+
+  // Tell the server to dump its port information so we can pick it up.
+  string info_path = JoinPathSegments(data_dirs_[0], "info.pb");
+  argv.push_back("--server_dump_info_path=" + info_path);
+  argv.emplace_back("--server_dump_info_format=pb");
+
+  // We use ephemeral ports in many tests. They don't work for production, but are OK
+  // in unit tests.
+  argv.emplace_back("--rpc_server_allow_ephemeral_ports");
+
+  // Allow unsafe and experimental flags from tests, since we often use
+  // fault injection, etc.
+  argv.emplace_back("--unlock_experimental_flags");
+  argv.emplace_back("--unlock_unsafe_flags");
+
+  // Then the "extra flags" passed into the ctor (from the ExternalMiniCluster
+  // options struct). These come at the end so they can override things like
+  // web port or RPC bind address if necessary.
+  argv.insert(argv.end(), extra_flags_.begin(), extra_flags_.end());
+
+  // A previous instance of the daemon may have run in the same directory. So, remove
+  // the previous info file if it's there.
+  ignore_result(Env::Default()->DeleteFile(info_path));
+
+  // Start the daemon.
+  unique_ptr<Subprocess> p(new Subprocess(argv));
+  p->ShareParentStdout(false);
+  p->SetEnvVars(extra_env_);
+  string env_str;
+  JoinMapKeysAndValues(extra_env_, "=", ",", &env_str);
+  LOG(INFO) << "Running " << exe_ << "\n" << JoinStrings(argv, "\n")
+            << " with env {" << env_str << "}";
+  RETURN_NOT_OK_PREPEND(p->Start(),
+                        Substitute("Failed to start subprocess $0", exe_));
+
+  // If requested, start a monitoring subprocess.
+  unique_ptr<Subprocess> perf_record;
+  if (!perf_record_filename_.empty()) {
+    perf_record.reset(new Subprocess({
+      "perf",
+      "record",
+      "--call-graph",
+      "fp",
+      "-o",
+      perf_record_filename_,
+      Substitute("--pid=$0", p->pid())
+    }, SIGINT));
+    RETURN_NOT_OK_PREPEND(perf_record->Start(),
+                          "Could not start perf record subprocess");
+  }
+
+  // The process is now starting -- wait for the bound port info to show up.
+  Stopwatch sw;
+  sw.start();
+  bool success = false;
+  while (sw.elapsed().wall_seconds() < start_process_timeout_.ToSeconds()) {
+    if (Env::Default()->FileExists(info_path)) {
+      success = true;
+      break;
+    }
+    SleepFor(MonoDelta::FromMilliseconds(10));
+    int wait_status = 0;
+    Status s = p->WaitNoBlock(&wait_status);
+    if (s.IsTimedOut()) {
+      // The process is still running.
+      continue;
+    }
+
+    // Report a failed wait before looking at 'wait_status'. A non-OK status
+    // carries no guarantee that 'wait_status' describes the child's exit, so
+    // inspecting it first could misread garbage as an expected exit.
+    RETURN_NOT_OK_PREPEND(s, Substitute("Failed waiting on $0", exe_));
+
+    // If the process exited with expected exit status we need to still swap() the process
+    // and exit as if it had succeeded.
+    if (WIFEXITED(wait_status) && WEXITSTATUS(wait_status) == fault_injection::kExitStatus) {
+      process_.swap(p);
+      perf_record_process_.swap(perf_record);
+      return Status::OK();
+    }
+
+    string exit_info;
+    RETURN_NOT_OK(p->GetExitStatus(nullptr, &exit_info));
+    return Status::RuntimeError(exit_info);
+  }
+
+  if (!success) {
+    ignore_result(p->Kill(SIGKILL));
+    // MonoDelta::ToString() already includes its unit suffix (compare the
+    // "Process did not crash within $0" message in WaitForCrash()), so the
+    // format string must not append another "s".
+    return Status::TimedOut(
+        Substitute("Timed out after $0 waiting for process ($1) to write info file ($2)",
+                   start_process_timeout_.ToString(), exe_, info_path));
+  }
+
+  status_.reset(new ServerStatusPB());
+  RETURN_NOT_OK_PREPEND(pb_util::ReadPBFromPath(Env::Default(), info_path, status_.get()),
+                        "Failed to read info file from " + info_path);
+  LOG(INFO) << "Started " << exe_ << " as pid " << p->pid();
+  VLOG(1) << exe_ << " instance information:\n" << SecureDebugString(*status_);
+
+  process_.swap(p);
+  perf_record_process_.swap(perf_record);
+  return Status::OK();
+}
+
+// Replaces the executable used the next time this daemon is started.
+// CHECK-fails unless the daemon is currently shut down.
+void ExternalDaemon::SetExePath(string exe) {
+  CHECK(IsShutdown()) << "Call Shutdown() before changing the executable path";
+  exe_.swap(exe);
+}
+
+// Suspends the daemon process with SIGSTOP and records that it is paused.
+// Returns IllegalState if there is no process, or the error from sending the
+// signal.
+Status ExternalDaemon::Pause() {
+  if (process_ == nullptr) {
+    return Status::IllegalState(Substitute(
+        "Request to pause '$0' but the process is not there", exe_));
+  }
+  VLOG(1) << "Pausing " << exe_ << " with pid " << process_->pid();
+  RETURN_NOT_OK(process_->Kill(SIGSTOP));
+  paused_ = true;
+  return Status::OK();
+}
+
+// Resumes a paused daemon process with SIGCONT and clears the paused flag.
+// Returns IllegalState if there is no process, or the error from sending the
+// signal.
+Status ExternalDaemon::Resume() {
+  if (process_ == nullptr) {
+    return Status::IllegalState(Substitute(
+        "Request to resume '$0' but the process is not there", exe_));
+  }
+  VLOG(1) << "Resuming " << exe_ << " with pid " << process_->pid();
+  RETURN_NOT_OK(process_->Kill(SIGCONT));
+  paused_ = false;
+  return Status::OK();
+}
+
+// A daemon counts as shut down whenever it has no subprocess handle.
+bool ExternalDaemon::IsShutdown() const {
+  return process_ == nullptr;
+}
+
+// Returns true if the daemon has a process that has not exited yet. A
+// non-blocking wait that reports TimedOut means the child is still running.
+bool ExternalDaemon::IsProcessAlive() const {
+  return !IsShutdown() && process_->WaitNoBlock().IsTimedOut();
+}
+
+// Waits up to 'timeout' for the process to exit with the fault-injection exit
+// status (fault_injection::kExitStatus).
+Status ExternalDaemon::WaitForInjectedCrash(const MonoDelta& timeout) const {
+  const auto exited_via_fault_injection = [](int wait_status) {
+    return WIFEXITED(wait_status) && WEXITSTATUS(wait_status) == fault_injection::kExitStatus;
+  };
+  return WaitForCrash(timeout, exited_via_fault_injection, "fault injection");
+}
+
+// Waits up to 'timeout' for the process to be terminated by SIGABRT, which is
+// how a FATAL crash shows up in the wait status.
+Status ExternalDaemon::WaitForFatal(const MonoDelta& timeout) const {
+  const auto killed_by_abort = [](int wait_status) {
+    return WIFSIGNALED(wait_status) && WTERMSIG(wait_status) == SIGABRT;
+  };
+  return WaitForCrash(timeout, killed_by_abort, "FATAL crash");
+}
+
+
+// Waits up to 'timeout' for the process to exit, then checks its wait status
+// with 'wait_status_predicate'. Returns:
+//  - TimedOut if the process is still alive,
+//  - Aborted (naming 'crash_type_str') if it exited some other way,
+//  - OK if the predicate accepts the exit.
+// CHECK-fails if the process was never started.
+Status ExternalDaemon::WaitForCrash(const MonoDelta& timeout,
+                                    const std::function<bool(int)>& wait_status_predicate,
+                                    const char* crash_type_str) const {
+  CHECK(process_) << "process not started";
+  const MonoTime deadline = MonoTime::Now() + timeout;
+
+  // Poll with a linearly growing back-off: 10ms, 20ms, ... capped at 200ms.
+  for (int attempt = 1; IsProcessAlive() && MonoTime::Now() < deadline; ++attempt) {
+    SleepFor(MonoDelta::FromMilliseconds(std::min(attempt * 10, 200)));
+  }
+
+  if (IsProcessAlive()) {
+    return Status::TimedOut(Substitute("Process did not crash within $0",
+                                       timeout.ToString()));
+  }
+
+  // The process has exited; confirm it exited the expected way.
+  int wait_status;
+  RETURN_NOT_OK_PREPEND(process_->WaitNoBlock(&wait_status),
+                        "could not get wait status");
+  if (wait_status_predicate(wait_status)) {
+    return Status::OK();
+  }
+
+  string exit_description;
+  RETURN_NOT_OK_PREPEND(process_->GetExitStatus(nullptr, &exit_description),
+                        "could not get description of exit");
+  return Status::Aborted(
+      Substitute("process exited, but not due to a $0: $1", crash_type_str, exit_description));
+}
+
+// Returns the pid of the daemon process. The process must exist: 'process_'
+// is dereferenced without a null check.
+pid_t ExternalDaemon::pid() const {
+  Subprocess* const proc = process_.get();
+  return proc->pid();
+}
+
+// Returns the daemon's subprocess handle without transferring ownership, or
+// nullptr when the daemon is shut down.
+Subprocess* ExternalDaemon::process() const {
+  Subprocess* const handle = process_.get();
+  return handle;
+}
+
+// Stops the daemon, if it has a process:
+//  1. Remember its bound RPC/HTTP addresses so a later restart can reuse them
+//     (only the ports, plus the wildcard host, when bound to the wildcard IP).
+//  2. If it is still alive, run the coverage-flush and leak-check hooks
+//     (skipped while paused), then SIGKILL it.
+//  3. Reap it and release the process and perf-record handles.
+void ExternalDaemon::Shutdown() {
+  if (process_ == nullptr) {
+    return;
+  }
+
+  const bool bound_to_wildcard =
+      rpc_bind_address().host() == MiniCluster::kWildcardIpAddr;
+  if (bound_to_wildcard) {
+    bound_rpc_.set_host(MiniCluster::kWildcardIpAddr);
+    bound_rpc_.set_port(bound_rpc_hostport().port());
+    bound_http_.set_host(MiniCluster::kWildcardIpAddr);
+    bound_http_.set_port(bound_http_hostport().port());
+  } else {
+    bound_rpc_ = bound_rpc_hostport();
+    bound_http_ = bound_http_hostport();
+  }
+
+  if (IsProcessAlive()) {
+    // These hooks talk to the live process (FlushCoverage() uses an RPC), so
+    // they are skipped for a process stopped with SIGSTOP by Pause(). They
+    // must run before the kill -9, or coverage/leak data would be lost.
+    if (!paused_) {
+      FlushCoverage();
+      CheckForLeaks();
+    }
+    LOG(INFO) << "Killing " << exe_ << " with pid " << process_->pid();
+    ignore_result(process_->Kill(SIGKILL));
+  }
+
+  WARN_NOT_OK(process_->Wait(), "Waiting on " + exe_);
+  paused_ = false;
+  process_.reset();
+  perf_record_process_.reset();
+}
+
+// Recursively deletes every data directory and then the WAL directory,
+// stopping at (and returning) the first failure.
+Status ExternalDaemon::DeleteFromDisk() const {
+  vector<string> doomed_dirs(data_dirs());
+  doomed_dirs.push_back(wal_dir());
+  Env* env = Env::Default();
+  for (const auto& dir : doomed_dirs) {
+    RETURN_NOT_OK(env->DeleteRecursively(dir));
+  }
+  return Status::OK();
+}
+
+// In coverage builds (COVERAGE_BUILD defined), asks the daemon over RPC to
+// write out its coverage data before it gets killed; failures are only logged
+// as warnings. In any other build this function does nothing.
+void ExternalDaemon::FlushCoverage() {
+#ifdef COVERAGE_BUILD
+  LOG(INFO) << "Attempting to flush coverage for " << exe_ << " pid " << process_->pid();
+  server::GenericServiceProxy proxy(messenger_, bound_rpc_addr(), bound_rpc_addr().host());
+
+  server::FlushCoverageRequestPB req;
+  server::FlushCoverageResponsePB resp;
+  rpc::RpcController rpc;
+  rpc.set_timeout(MonoDelta::FromMilliseconds(1000));
+
+  Status s = proxy.FlushCoverage(req, &resp, &rpc);
+  if (s.ok() && !resp.success()) {
+    // The RPC went through, but the server itself lacks coverage support.
+    s = Status::RemoteError("Server does not appear to be running a coverage build");
+  }
+  WARN_NOT_OK(s, Substitute("Unable to flush coverage on $0 pid $1", exe_, process_->pid()));
+#endif
+}
+
+// In AddressSanitizer builds, asks the daemon over RPC to run a leak check and
+// CHECK-fails this (test) process if the daemon reports leaks. RPC failures,
+// and servers that report they cannot run the check, only produce a warning.
+// In other builds this function does nothing. The two-level #if is required
+// because __has_feature cannot be invoked where it is not defined.
+void ExternalDaemon::CheckForLeaks() {
+#if defined(__has_feature)
+#  if __has_feature(address_sanitizer)
+  LOG(INFO) << "Attempting to check leaks for " << exe_ << " pid " << process_->pid();
+  server::GenericServiceProxy proxy(messenger_, bound_rpc_addr(), bound_rpc_addr().host());
+
+  server::CheckLeaksRequestPB req;
+  server::CheckLeaksResponsePB resp;
+  rpc::RpcController rpc;
+  rpc.set_timeout(MonoDelta::FromMilliseconds(1000));
+
+  Status s = proxy.CheckLeaks(req, &resp, &rpc);
+  if (s.ok() && !resp.success()) {
+    s = Status::RemoteError("Server does not appear to be running an LSAN build");
+  } else if (s.ok()) {
+    CHECK(!resp.found_leaks()) << "Found leaks in " << exe_ << " pid " << process_->pid();
+  }
+  WARN_NOT_OK(s, Substitute("Unable to check leaks on $0 pid $1", exe_, process_->pid()));
+#  endif
+#endif
+}
+
+// Returns the first RPC address listed in the daemon's recorded ServerStatusPB.
+// CHECK-fails if no status is recorded or it lists no RPC addresses.
+HostPort ExternalDaemon::bound_rpc_hostport() const {
+  CHECK(status_);
+  CHECK_GE(status_->bound_rpc_addresses_size(), 1);
+  HostPort hostport;
+  CHECK_OK(HostPortFromPB(status_->bound_rpc_addresses(0), &hostport));
+  return hostport;
+}
+
+// Resolves bound_rpc_hostport() and returns the first resulting socket address.
+Sockaddr ExternalDaemon::bound_rpc_addr() const {
+  HostPort hp = bound_rpc_hostport();
+  vector<Sockaddr> addrs;
+  CHECK_OK(hp.ResolveAddresses(&addrs));
+  CHECK(!addrs.empty());
+  return addrs.front();
+}
+
+// Returns the first HTTP address listed in the daemon's recorded status, or a
+// default-constructed (uninitialized) HostPort when none is listed.
+HostPort ExternalDaemon::bound_http_hostport() const {
+  CHECK(status_);
+  HostPort hostport;
+  if (status_->bound_http_addresses_size() > 0) {
+    CHECK_OK(HostPortFromPB(status_->bound_http_addresses(0), &hostport));
+  }
+  return hostport;
+}
+
+// Returns the node instance from the daemon's recorded status.
+// CHECK-fails if no status has been recorded.
+const NodeInstancePB& ExternalDaemon::instance_id() const {
+  CHECK(status_);
+  return status_->node_instance();
+}
+
+// Returns the permanent UUID of the daemon's node instance.
+const string& ExternalDaemon::uuid() const {
+  return instance_id().permanent_uuid();
+}
+
+// Fetches /jsonmetricz from the daemon's web server, filtered to metrics named
+// like 'metric_proto', and stores into '*value' the int64 field 'value_field'
+// of the first matching metric.
+//
+// An entity matches when its "type" equals entity_proto->name() and, if
+// 'entity_id' is non-null, its "id" equals 'entity_id'. Entities are scanned
+// in response order. Returns NotFound if no matching metric exists; errors
+// from fetching or parsing the JSON are returned as-is. CHECK-fails if the
+// daemon has no bound HTTP address.
+Status ExternalDaemon::GetInt64Metric(const MetricEntityPrototype* entity_proto,
+                                      const char* entity_id,
+                                      const MetricPrototype* metric_proto,
+                                      const char* value_field,
+                                      int64_t* value) const {
+  // Compute the HTTP address once; each call re-parses it from the status PB.
+  const HostPort http_hp = bound_http_hostport();
+  CHECK(http_hp.Initialized());
+
+  // Fetch metrics whose name matches the given prototype.
+  const string url = Substitute("http://$0/jsonmetricz?metrics=$1",
+                                http_hp.ToString(), metric_proto->name());
+  EasyCurl curl;
+  faststring dst;
+  RETURN_NOT_OK(curl.FetchURL(url, &dst));
+
+  // The top level of the response is an array of entity objects.
+  JsonReader r(dst.ToString());
+  RETURN_NOT_OK(r.Init());
+  vector<const Value*> entities;
+  RETURN_NOT_OK(r.ExtractObjectArray(r.root(), nullptr, &entities));
+  for (const Value* entity : entities) {
+    // Skip entities of the wrong type or (when requested) the wrong id.
+    string type;
+    RETURN_NOT_OK(r.ExtractString(entity, "type", &type));
+    if (type != entity_proto->name()) {
+      continue;
+    }
+    if (entity_id != nullptr) {
+      string id;
+      RETURN_NOT_OK(r.ExtractString(entity, "id", &id));
+      if (id != entity_id) {
+        continue;
+      }
+    }
+
+    // Look for the requested metric within this entity.
+    vector<const Value*> metrics;
+    RETURN_NOT_OK(r.ExtractObjectArray(entity, "metrics", &metrics));
+    for (const Value* metric : metrics) {
+      string name;
+      RETURN_NOT_OK(r.ExtractString(metric, "name", &name));
+      if (name == metric_proto->name()) {
+        return r.ExtractInt64(metric, value_field, value);
+      }
+    }
+  }
+
+  if (entity_id != nullptr) {
+    return Status::NotFound(Substitute("Could not find metric $0.$1 for entity $2",
+                                       entity_proto->name(), metric_proto->name(),
+                                       entity_id));
+  }
+  return Status::NotFound(Substitute("Could not find metric $0.$1",
+                                     entity_proto->name(), metric_proto->name()));
+}
+
+//------------------------------------------------------------
+// ScopedResumeExternalDaemon
+//------------------------------------------------------------
+
+// Remembers 'daemon' so that it is resumed when this object goes out of scope.
+// CHECK-fails on a null 'daemon'.
+ScopedResumeExternalDaemon::ScopedResumeExternalDaemon(ExternalDaemon* daemon)
+    : daemon_(CHECK_NOTNULL(daemon)) {}
+
+// Resumes the daemon. A destructor cannot report a Status, so a failure to
+// resume is logged as a warning.
+ScopedResumeExternalDaemon::~ScopedResumeExternalDaemon() {
+  const Status resume_status = daemon_->Resume();
+  WARN_NOT_OK(resume_status, "Could not resume external daemon");
+}
+
+//------------------------------------------------------------
+// ExternalMaster
+//------------------------------------------------------------
+
+// ExternalMaster holds no state of its own; all options go to ExternalDaemon.
+ExternalMaster::ExternalMaster(ExternalDaemonOptions opts)
+    : ExternalDaemon(std::move(opts)) {}
+
+ExternalMaster::~ExternalMaster() = default;
+
+// Launches the master with the common flags, RPC bound to the configured bind
+// address, and the web server on port 0 of that same host.
+Status ExternalMaster::Start() {
+  const HostPort& bind_hp = rpc_bind_address();
+  vector<string> flags = GetCommonFlags();
+  flags.emplace_back(Substitute("--rpc_bind_addresses=$0", bind_hp.ToString()));
+  flags.emplace_back(Substitute("--webserver_interface=$0", bind_hp.host()));
+  flags.emplace_back("--webserver_port=0");
+  return StartProcess(flags);
+}
+
+// Restarts a master previously stopped with Shutdown(), reusing the RPC
+// address saved there. The web server is re-bound to its saved address when
+// one was recorded; otherwise it uses port 0 on the saved RPC host.
+// Returns IllegalState if no RPC address has been saved yet.
+Status ExternalMaster::Restart() {
+  // Shutdown() records the bound addresses; port 0 means it has not done so.
+  if (bound_rpc_.port() == 0) {
+    return Status::IllegalState("Master cannot be restarted. Must call Shutdown() first.");
+  }
+
+  vector<string> flags = GetCommonFlags();
+  flags.push_back(Substitute("--rpc_bind_addresses=$0", bound_rpc_.ToString()));
+
+  const bool reuse_http = bound_http_.Initialized();
+  const string& http_host = reuse_http ? bound_http_.host() : bound_rpc_.host();
+  flags.push_back(Substitute("--webserver_interface=$0", http_host));
+  if (reuse_http) {
+    flags.push_back(Substitute("--webserver_port=$0", bound_http_.port()));
+  } else {
+    flags.emplace_back("--webserver_port=0");
+  }
+
+  return StartProcess(flags);
+}
+
+// Polls ListTables on this master until its catalog manager answers, either as
+// the leader (no error in the response) or as a non-leader (IllegalState).
+// Transient failures are retried every 50ms for up to
+// kMasterCatalogManagerTimeoutSeconds:
+//   - RPC-level TimedOut or NetworkError;
+//   - ServiceUnavailable from the master.
+// Any other error is returned immediately. Returns TimedOut if the catalog
+// manager never answers in time.
+//
+// Success is returned as soon as it is observed. Otherwise a response that
+// arrived just after the deadline would be misreported as a timeout by a
+// post-loop elapsed-time check.
+Status ExternalMaster::WaitForCatalogManager() {
+  MasterServiceProxy proxy(messenger_, bound_rpc_addr(), bound_rpc_addr().host());
+  Stopwatch sw;
+  sw.start();
+  while (sw.elapsed().wall_seconds() < kMasterCatalogManagerTimeoutSeconds) {
+    ListTablesRequestPB req;
+    ListTablesResponsePB resp;
+    RpcController rpc;
+    Status s = proxy.ListTables(req, &resp, &rpc);
+    if (s.ok()) {
+      if (!resp.has_error()) {
+        // This master is the leader and is up and running.
+        return Status::OK();
+      }
+      s = StatusFromPB(resp.error().status());
+      if (s.IsIllegalState()) {
+        // This master is not the leader but is otherwise up and running.
+        return Status::OK();
+      }
+      if (!s.IsServiceUnavailable()) {
+        // Unexpected error from master.
+        return s;
+      }
+    } else if (!s.IsTimedOut() && !s.IsNetworkError()) {
+      // Unexpected error from proxy.
+      return s;
+    }
+
+    // There was some kind of transient network error or the master isn't yet
+    // ready. Sleep and retry.
+    SleepFor(MonoDelta::FromMilliseconds(50));
+  }
+  return Status::TimedOut(
+      Substitute("Timed out after $0s waiting for master ($1) startup",
+                 kMasterCatalogManagerTimeoutSeconds,
+                 bound_rpc_addr().ToString()));
+}
+
+// Flags shared by Start() and Restart():
+//   - storage locations;
+//   - a default web server interface;
+//   - reduced cryptographic key sizes (see the comments below).
+vector<string> ExternalMaster::GetCommonFlags() const {
+  vector<string> flags;
+  flags.push_back("--fs_wal_dir=" + wal_dir_);
+  flags.push_back("--fs_data_dirs=" + JoinStrings(data_dirs_, ","));
+  // NOTE(review): Start() and Restart() both append their own
+  // --webserver_interface after these flags, so this entry looks redundant.
+  flags.emplace_back("--webserver_interface=localhost");
+
+  // See the in-line comment for "--ipki_server_key_size" flag in
+  // ExternalDaemon::StartProcess() method.
+  flags.emplace_back("--ipki_ca_key_size=1024");
+
+  // As for the TSK keys, 512 bits is the minimum since we are using the SHA256
+  // digest for token signing/verification.
+  flags.emplace_back("--tsk_num_rsa_bits=512");
+  return flags;
+}
+
+
+//------------------------------------------------------------
+// ExternalTabletServer
+//------------------------------------------------------------
+
+// Creates a tablet server whose masters are 'master_addrs' (later passed as
+// --tserver_master_addrs). 'master_addrs' must be non-empty (DCHECKed).
+ExternalTabletServer::ExternalTabletServer(ExternalDaemonOptions opts,
+                                           vector<HostPort> master_addrs)
+    : ExternalDaemon(std::move(opts)),
+      master_addrs_(std::move(master_addrs)) {
+  DCHECK(!master_addrs_.empty());
+}
+
+ExternalTabletServer::~ExternalTabletServer() = default;
+
+// Launches the tablet server with RPC bound to the configured bind address.
+// Outbound connections originate from that host, and the web server listens
+// on port 0 of that host.
+Status ExternalTabletServer::Start() {
+  const string& bind_host = rpc_bind_address().host();
+  const vector<string> flags = {
+    "--fs_wal_dir=" + wal_dir_,
+    "--fs_data_dirs=" + JoinStrings(data_dirs_, ","),
+    Substitute("--rpc_bind_addresses=$0", rpc_bind_address().ToString()),
+    Substitute("--local_ip_for_outbound_sockets=$0", bind_host),
+    Substitute("--webserver_interface=$0", bind_host),
+    "--webserver_port=0",
+    Substitute("--tserver_master_addrs=$0",
+               HostPort::ToCommaSeparatedString(master_addrs_)),
+  };
+  return StartProcess(flags);
+}
+
+// Restarts a tablet server previously stopped with Shutdown(), reusing the RPC
+// address saved there. The web server is re-bound to its saved address when
+// one was recorded.
+//
+// Otherwise, as in Start() and ExternalMaster::Restart(), the web server uses
+// port 0 on the saved RPC host. Previously no web server flags were passed at
+// all, so the binary fell back to its default web server port and interface.
+// Returns IllegalState if no RPC address has been saved yet.
+Status ExternalTabletServer::Restart() {
+  // We store the addresses on shutdown so make sure we did that first.
+  if (bound_rpc_.port() == 0) {
+    return Status::IllegalState("Tablet server cannot be restarted. Must call Shutdown() first.");
+  }
+  vector<string> flags;
+  flags.push_back("--fs_wal_dir=" + wal_dir_);
+  flags.push_back("--fs_data_dirs=" + JoinStrings(data_dirs_, ","));
+  flags.push_back(Substitute("--rpc_bind_addresses=$0", bound_rpc_.ToString()));
+  flags.push_back(Substitute("--local_ip_for_outbound_sockets=$0",
+                             rpc_bind_address().host()));
+  if (bound_http_.Initialized()) {
+    flags.push_back(Substitute("--webserver_port=$0", bound_http_.port()));
+    flags.push_back(Substitute("--webserver_interface=$0",
+                               bound_http_.host()));
+  } else {
+    flags.push_back(Substitute("--webserver_interface=$0", bound_rpc_.host()));
+    flags.emplace_back("--webserver_port=0");
+  }
+  flags.push_back(Substitute("--tserver_master_addrs=$0",
+                             HostPort::ToCommaSeparatedString(master_addrs_)));
+  return StartProcess(flags);
+}
+
+} // namespace cluster
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/350c8a79/src/kudu/mini-cluster/external_mini_cluster.h
----------------------------------------------------------------------
diff --git a/src/kudu/mini-cluster/external_mini_cluster.h b/src/kudu/mini-cluster/external_mini_cluster.h
new file mode 100644
index 0000000..4ed96de
--- /dev/null
+++ b/src/kudu/mini-cluster/external_mini_cluster.h
@@ -0,0 +1,568 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <sys/types.h>
+
+#include <cstdint>
+#include <functional>
+#include <map>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <glog/logging.h>
+#include <gtest/gtest_prod.h>
+
+#include "kudu/client/shared_ptr.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/mini-cluster/mini_cluster.h"
+#include "kudu/security/test/mini_kdc.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class MetricEntityPrototype;
+class MetricPrototype;
+class NodeInstancePB;
+class Sockaddr;
+class Subprocess;
+
+namespace client {
+class KuduClient;
+class KuduClientBuilder;
+} // namespace client
+
+namespace master {
+class MasterServiceProxy;
+} // namespace master
+
+namespace rpc {
+class Messenger;
+} // namespace rpc
+
+namespace server {
+class ServerStatusPB;
+} // namespace server
+
+namespace cluster {
+
+class ExternalDaemon;
+class ExternalMaster;
+class ExternalTabletServer;
+
+// Options used to configure an ExternalMiniCluster.
+struct ExternalMiniClusterOptions {
+ ExternalMiniClusterOptions();
+
+ // Number of masters to start.
+ // Default: 1
+ int num_masters;
+
+ // Number of TS to start.
+ // Default: 1
+ int num_tablet_servers;
+
+ // Directory in which to store data.
+ // Default: "", which auto-generates a unique path for this cluster.
+ std::string data_root;
+
+ // Address-binding mode for the cluster's daemons; see MiniCluster::BindMode
+ // for the available modes.
+ MiniCluster::BindMode bind_mode;
+
+ // The path where the kudu daemons should be run from.
+ // Default: "", which uses the same path as the currently running executable.
+ // This works for unit tests, since they all end up in build/latest/bin.
+ std::string daemon_bin_path;
+
+ // Number of data directories to be created for each daemon.
+ // Default: 1
+ int num_data_dirs;
+
+ // Extra flags for tablet servers and masters respectively.
+ //
+ // In these flags, you may use the special string '${index}' which will
+ // be substituted with the index of the tablet server or master.
+ std::vector<std::string> extra_tserver_flags;
+ std::vector<std::string> extra_master_flags;
+
+ // If more than one master is specified, list of ports for the
+ // masters in a consensus configuration. Port at index 0 is used for the leader
+ // master.
+ std::vector<uint16_t> master_rpc_ports;
+
+ // Options to configure the MiniKdc before starting it up.
+ // Only used when 'enable_kerberos' is 'true'.
+ MiniKdcOptions mini_kdc_options;
+
+ // If true, set up a KDC as part of this ExternalMiniCluster, generate keytabs for
+ // the servers, and require Kerberos authentication from clients.
+ //
+ // Additionally, when the cluster is started, the environment of the
+ // test process will be modified to include Kerberos credentials for
+ // a principal named 'testuser'.
+ bool enable_kerberos;
+
+ // If true, sends logging output to stderr instead of a log file. Defaults to
+ // true.
+ bool logtostderr;
+
+ // Amount of time that may elapse between the creation of a daemon process
+ // and the process writing out its info file. Defaults to 30 seconds.
+ MonoDelta start_process_timeout;
+};
+
+// A mini-cluster made up of subprocesses running each of the daemons
+// separately. This is useful for black-box or grey-box failure testing
+// purposes -- it provides the ability to forcibly kill or stop particular
+// cluster participants, which isn't feasible in the normal InternalMiniCluster.
+// On the other hand, there is little access to inspect the internal state
+// of the daemons.
+class ExternalMiniCluster : public MiniCluster {
+ public:
+ // Constructs a cluster with the default options.
+ ExternalMiniCluster();
+
+ // Constructs a cluster with options specified in 'opts'.
+ explicit ExternalMiniCluster(ExternalMiniClusterOptions opts);
+
+ // Destroys a cluster.
+ virtual ~ExternalMiniCluster();
+
+ // Start the cluster.
+ Status Start() override;
+
+ // Restarts the cluster. Requires that it has been Shutdown() first.
+ Status Restart();
+
+ // Add a new TS to the cluster. The new TS is started.
+ // Requires that the master is already running.
+ Status AddTabletServer();
+
+ // Currently, this uses SIGKILL on each daemon for a non-graceful shutdown.
+ void ShutdownNodes(ClusterNodes nodes) override;
+
+ // Return the IP address that the tablet server with the given index will bind to.
+ // The result depends on ExternalMiniClusterOptions::bind_mode: depending on the
+ // mode it is 127.0.0.1 or another IP in the local netblock.
+ // NOTE(review): confirm the exact per-mode mapping in the implementation.
+ std::string GetBindIpForTabletServer(int index) const;
+
+ // Same as above but for a master.
+ std::string GetBindIpForMaster(int index) const;
+
+ // Return a pointer to the running leader master. This may be NULL
+ // if the cluster is not started.
+ //
+ // NOTE: this currently always returns master(0), which CHECK-fails if there
+ // are no masters; it does not ask the cluster which master is the leader
+ // (GetLeaderMasterIndex() does that via an RPC).
+ //
+ // TODO(unknown): Use the appropriate RPC here to return the leader master,
+ // to allow some of the existing tests (e.g., raft_consensus-itest)
+ // to use multiple masters.
+ ExternalMaster* leader_master() { return master(0); }
+
+ // Perform an RPC to determine the leader of the external mini
+ // cluster. Set '*idx' to the leader master's index (for calls to
+ // master() below).
+ //
+ // NOTE: if a leader election occurs after this method is executed,
+ // the last result may not be valid.
+ Status GetLeaderMasterIndex(int* idx);
+
+ // If this cluster is configured for a single non-distributed
+ // master, return the single master or NULL if the master is not
+ // started. Exits with a CHECK failure unless there is exactly one
+ // master (i.e. also when there are none).
+ ExternalMaster* master() const {
+ CHECK_EQ(masters_.size(), 1)
+ << "master() should not be used with multiple masters, use leader_master() instead.";
+ return master(0);
+ }
+
+ // Return master at 'idx' or NULL if the master at 'idx' has not
+ // been started. CHECK-fails if 'idx' >= num_masters().
+ ExternalMaster* master(int idx) const {
+ CHECK_LT(idx, masters_.size());
+ return masters_[idx].get();
+ }
+
+ // Return the tablet server at 'idx'. CHECK-fails if
+ // 'idx' >= num_tablet_servers().
+ ExternalTabletServer* tablet_server(int idx) const {
+ CHECK_LT(idx, tablet_servers_.size());
+ return tablet_servers_[idx].get();
+ }
+
+ // Return ExternalTabletServer given its UUID. If not found, returns NULL.
+ ExternalTabletServer* tablet_server_by_uuid(const std::string& uuid) const;
+
+ // Return the index of the ExternalTabletServer that has the given 'uuid', or
+ // -1 if no such UUID can be found.
+ int tablet_server_index_by_uuid(const std::string& uuid) const;
+
+ // Return all tablet servers and masters.
+ std::vector<ExternalDaemon*> daemons() const;
+
+ // Return the cluster's MiniKdc. CHECK-fails if there is none (presumably the
+ // case unless 'enable_kerberos' was set in the options -- confirm).
+ MiniKdc* kdc() const {
+ return CHECK_NOTNULL(kdc_.get());
+ }
+
+ int num_tablet_servers() const override {
+ return tablet_servers_.size();
+ }
+
+ int num_masters() const override {
+ return masters_.size();
+ }
+
+ BindMode bind_mode() const override {
+ return opts_.bind_mode;
+ }
+
+ std::vector<uint16_t> master_rpc_ports() const override {
+ return opts_.master_rpc_ports;
+ }
+
+ std::vector<HostPort> master_rpc_addrs() const override;
+
+ std::shared_ptr<rpc::Messenger> messenger() const override;
+ std::shared_ptr<master::MasterServiceProxy> master_proxy() const override;
+ std::shared_ptr<master::MasterServiceProxy> master_proxy(int idx) const override;
+
+ // Wait until the number of registered tablet servers reaches the given count
+ // on all of the running masters. Returns Status::TimedOut if the desired
+ // count is not achieved with the given timeout.
+ Status WaitForTabletServerCount(int count, const MonoDelta& timeout);
+
+ // Runs gtest assertions that no servers have crashed.
+ void AssertNoCrashes();
+
+ // Wait until all tablets on the given tablet server are in the RUNNING
+ // state. Returns Status::TimedOut if 'timeout' elapses and at least one
+ // tablet is not yet RUNNING.
+ //
+ // If 'min_tablet_count' is not -1, will also wait for at least that many
+ // RUNNING tablets to appear before returning (potentially timing out if that
+ // number is never reached).
+ Status WaitForTabletsRunning(ExternalTabletServer* ts, int min_tablet_count,
+ const MonoDelta& timeout);
+
+ // Create a client configured to talk to this cluster.
+ // Builder may contain override options for the client. The master address will
+ // be overridden to talk to the running master.
+ //
+ // REQUIRES: the cluster must have already been Start()ed.
+ Status CreateClient(client::KuduClientBuilder* builder,
+ client::sp::shared_ptr<client::KuduClient>* client) const override;
+
+ // Sets the given flag on the given daemon, which must be running.
+ //
+ // This uses the 'force' flag on the RPC so that, even if the flag
+ // is considered unsafe to change at runtime, it is changed.
+ Status SetFlag(ExternalDaemon* daemon,
+ const std::string& flag,
+ const std::string& value) WARN_UNUSED_RESULT;
+
+ // Set the path where daemon binaries can be found.
+ // Overrides 'daemon_bin_path' set by ExternalMiniClusterOptions.
+ // The cluster must be shut down before calling this method.
+ void SetDaemonBinPath(std::string daemon_bin_path);
+
+ // Returns the path where 'binary' is expected to live, based on
+ // ExternalMiniClusterOptions.daemon_bin_path if it was provided, or on the
+ // path of the currently running executable otherwise.
+ std::string GetBinaryPath(const std::string& binary) const;
+
+ // Returns the path where 'daemon_id' is expected to store its data, based on
+ // ExternalMiniClusterOptions.data_root if it was provided, or on the
+ // standard Kudu test directory otherwise.
+ // 'dir_index' is an optional numeric suffix to be added to the default path.
+ // If it is not specified, the cluster must be configured to use a single data dir.
+ std::string GetDataPath(const std::string& daemon_id,
+ boost::optional<uint32_t> dir_index = boost::none) const;
+
+ // Returns paths where 'daemon_id' is expected to store its data, each with a
+ // numeric suffix appropriate for 'opts_.num_data_dirs'
+ std::vector<std::string> GetDataPaths(const std::string& daemon_id) const;
+
+ // Returns the path where 'daemon_id' is expected to store its wal, or other
+ // files that reside in the wal dir.
+ std::string GetWalPath(const std::string& daemon_id) const;
+
+ // Returns the path where 'daemon_id' is expected to store its logs, or other
+ // files that reside in the log dir.
+ std::string GetLogPath(const std::string& daemon_id) const;
+
+ private:
+ FRIEND_TEST(MasterFailoverTest, TestKillAnyMaster);
+
+ Status StartSingleMaster();
+
+ Status StartDistributedMasters();
+
+ Status DeduceBinRoot(std::string* ret);
+ Status HandleOptions();
+
+ // Options this cluster was constructed with.
+ const ExternalMiniClusterOptions opts_;
+
+ // The root for binaries.
+ std::string daemon_bin_path_;
+
+ // Root directory for daemon data. NOTE(review): presumably derived from
+ // opts_.data_root (or auto-generated when that is empty) -- confirm.
+ std::string data_root_;
+
+ // Daemon handles, indexed as in master(int) / tablet_server(int).
+ std::vector<scoped_refptr<ExternalMaster> > masters_;
+ std::vector<scoped_refptr<ExternalTabletServer> > tablet_servers_;
+ // KDC returned by kdc(); may be null (kdc() CHECKs this).
+ std::unique_ptr<MiniKdc> kdc_;
+
+ std::shared_ptr<rpc::Messenger> messenger_;
+
+ DISALLOW_COPY_AND_ASSIGN(ExternalMiniCluster);
+};
+
+// Options used to construct an ExternalDaemon (master or tablet server).
+struct ExternalDaemonOptions {
+ explicit ExternalDaemonOptions(bool logtostderr)
+ : logtostderr(logtostderr) {
+ }
+
+ // Whether the daemon logs to stderr instead of to log files
+ // (cf. ExternalMiniClusterOptions::logtostderr).
+ bool logtostderr;
+ // Messenger for RPCs the test process sends to the daemon.
+ std::shared_ptr<rpc::Messenger> messenger;
+ // Path of the daemon executable.
+ std::string exe;
+ // Address the daemon's RPC server is asked to bind to.
+ HostPort rpc_bind_address;
+ // WAL, data and log directories for the daemon.
+ std::string wal_dir;
+ std::vector<std::string> data_dirs;
+ std::string log_dir;
+ // NOTE(review): presumably the output file for a 'perf record' of the
+ // daemon (empty to disable) -- confirm in ExternalDaemon::StartProcess().
+ std::string perf_record_filename;
+ // Additional command-line flags passed to the daemon.
+ std::vector<std::string> extra_flags;
+ // Time allowed between spawning the daemon and it writing out its info
+ // file (cf. ExternalMiniClusterOptions::start_process_timeout).
+ MonoDelta start_process_timeout;
+};
+
+// Handle for a single Kudu daemon (master or tablet server) run as a
+// subprocess. Subclasses supply the daemon-specific command-line flags.
+class ExternalDaemon : public RefCountedThreadSafe<ExternalDaemon> {
+ public:
+ explicit ExternalDaemon(ExternalDaemonOptions opts);
+
+ // Return the first RPC host/port that the daemon reported being bound to,
+ // and that address resolved to a socket address. Both CHECK-fail if the
+ // daemon's status has not been recorded.
+ HostPort bound_rpc_hostport() const;
+ Sockaddr bound_rpc_addr() const;
+
+ // Return the host/port that this daemon is bound to for HTTP.
+ // May return an uninitialized HostPort if HTTP is disabled.
+ HostPort bound_http_hostport() const;
+
+ // Return the node instance / permanent UUID reported by the daemon.
+ // Both CHECK-fail if the daemon's status has not been recorded.
+ const NodeInstancePB& instance_id() const;
+ const std::string& uuid() const;
+
+ // Return the pid of the daemon's process. The daemon must have been
+ // started (and not shut down since) before calling this.
+ pid_t pid() const;
+
+ // Return the pointer to the underlying Subprocess if it is set.
+ // Otherwise, returns nullptr.
+ Subprocess* process() const;
+
+ // Set the path of the executable to run as a daemon.
+ // Overrides the exe path specified in the constructor.
+ // The daemon must be shut down before calling this method.
+ void SetExePath(std::string exe);
+
+ // Enable Kerberos for this daemon. This creates a Kerberos principal
+ // and keytab, and sets the appropriate environment variables in the
+ // subprocess such that the server will use Kerberos authentication.
+ //
+ // 'bind_host' is the hostname that will be used to generate the Kerberos
+ // service principal.
+ //
+ // Must be called before 'StartProcess()'.
+ Status EnableKerberos(MiniKdc* kdc, const std::string& bind_host);
+
+ // Sends a SIGSTOP signal to the daemon.
+ Status Pause() WARN_UNUSED_RESULT;
+
+ // Sends a SIGCONT signal to the daemon.
+ Status Resume() WARN_UNUSED_RESULT;
+
+ // Return true if we have explicitly shut down the process.
+ bool IsShutdown() const;
+
+ // Return true if the process is still running.
+ // This may return false if the process crashed, even if we didn't
+ // explicitly call Shutdown().
+ bool IsProcessAlive() const;
+
+ // Wait for this process to crash due to a configured fault
+ // injection, or the given timeout to elapse. If the process
+ // crashes for some reason other than an injected fault, returns
+ // Status::Aborted.
+ //
+ // If the process is already crashed, returns immediately.
+ Status WaitForInjectedCrash(const MonoDelta& timeout) const;
+
+ // Same as the above, but expects the process to crash due to a
+ // LOG(FATAL) or CHECK failure. In other words, waits for it to
+ // crash from SIGABRT.
+ Status WaitForFatal(const MonoDelta& timeout) const;
+
+ // Kill the daemon with SIGKILL if it is still running, and reap it. The
+ // bound RPC/HTTP addresses are saved first so that a subclass Restart() can
+ // reuse them. Unless the daemon is paused, coverage is flushed and a leak
+ // check is requested before the kill (no-ops in ordinary builds).
+ // Does nothing if no process is currently set.
+ virtual void Shutdown();
+
+ // Delete files specified by 'wal_dir_' and 'data_dirs_'.
+ Status DeleteFromDisk() const WARN_UNUSED_RESULT;
+
+ const std::string& wal_dir() const { return wal_dir_; }
+
+ // Return the only data dir. CHECK-fails unless there is exactly one.
+ const std::string& data_dir() const {
+ CHECK_EQ(1, data_dirs_.size());
+ return data_dirs_[0];
+ }
+
+ const std::vector<std::string>& data_dirs() const { return data_dirs_; }
+
+ // Returns the log dir of the external daemon.
+ const std::string& log_dir() const { return log_dir_; }
+
+ // Return a pointer to the flags used for this server on restart.
+ // Modifying these flags will only take effect on the next restart.
+ std::vector<std::string>* mutable_flags() { return &extra_flags_; }
+
+ // Retrieve the value of a given metric from this server. The metric must
+ // be of int64_t type.
+ //
+ // 'value_field' represents the particular field of the metric to be read.
+ // For example, for a counter or gauge, this should be 'value'. For a
+ // histogram, it might be 'total_count' or 'mean'.
+ //
+ // 'entity_id' may be NULL, in which case the first entity of the same type
+ // as 'entity_proto' will be matched.
+ Status GetInt64Metric(const MetricEntityPrototype* entity_proto,
+ const char* entity_id,
+ const MetricPrototype* metric_proto,
+ const char* value_field,
+ int64_t* value) const;
+
+ protected:
+ friend class RefCountedThreadSafe<ExternalDaemon>;
+ virtual ~ExternalDaemon();
+
+ // Starts a process with the given flags.
+ Status StartProcess(const std::vector<std::string>& user_flags);
+
+ // Wait up to 'timeout' for the process to exit (returning TimedOut if it
+ // is still alive), and then call 'wait_status_predicate'
+ // on the resulting exit status. NOTE: this is not the return code, but
+ // rather the value provided by waitpid(2): use WEXITSTATUS, etc.
+ //
+ // If the predicate matches, returns OK. Otherwise, returns an error.
+ // 'crash_type_str' should be a descriptive name for the type of crash,
+ // used in formatting the error message.
+ Status WaitForCrash(const MonoDelta& timeout,
+ const std::function<bool(int)>& wait_status_predicate,
+ const char* crash_type_str) const;
+
+ // In a code-coverage build, try to flush the coverage data to disk.
+ // In a non-coverage build, this does nothing.
+ void FlushCoverage();
+
+ // In an AddressSanitizer (LSAN) build, ask the daemon to check for leaked
+ // memory, and CHECK-fail this process if there are any leaks.
+ void CheckForLeaks();
+
+ // Get RPC bind address for daemon.
+ const HostPort& rpc_bind_address() const {
+ return rpc_bind_address_;
+ }
+
+ const std::shared_ptr<rpc::Messenger> messenger_;
+ const std::string wal_dir_;
+ std::vector<std::string> data_dirs_;
+ const std::string log_dir_;
+ const std::string perf_record_filename_;
+ const MonoDelta start_process_timeout_;
+ const bool logtostderr_;
+ const HostPort rpc_bind_address_;
+ std::string exe_;
+ std::vector<std::string> extra_flags_;
+ std::map<std::string, std::string> extra_env_;
+
+ // The running daemon process; null before start and after Shutdown().
+ std::unique_ptr<Subprocess> process_;
+ // Presumably set by Pause() and cleared by Resume(); Shutdown() resets it
+ // and skips the coverage/leak RPCs while it is set.
+ bool paused_ = false;
+
+ std::unique_ptr<Subprocess> perf_record_process_;
+
+ // Server status recorded for the daemon; source of its bound addresses and
+ // node instance.
+ std::unique_ptr<server::ServerStatusPB> status_;
+
+ // These capture the daemons parameters and running ports and
+ // are used to Restart() the daemon with the same parameters.
+ HostPort bound_rpc_;
+ HostPort bound_http_;
+
+ DISALLOW_COPY_AND_ASSIGN(ExternalDaemon);
+};
+
+// Resumes a daemon that was stopped with ExternalDaemon::Pause() upon
+// exiting a scope.
+class ScopedResumeExternalDaemon {
+ public:
+ // 'daemon' must be non-NULL (CHECKed) and must remain valid for the
+ // lifetime of a ScopedResumeExternalDaemon object.
+ explicit ScopedResumeExternalDaemon(ExternalDaemon* daemon);
+
+ // Resume 'daemon_'. A failure to resume is only logged as a warning.
+ ~ScopedResumeExternalDaemon();
+
+ private:
+ ExternalDaemon* daemon_;
+
+ DISALLOW_COPY_AND_ASSIGN(ScopedResumeExternalDaemon);
+};
+
+// An ExternalDaemon running a Kudu master.
+class ExternalMaster : public ExternalDaemon {
+ public:
+ explicit ExternalMaster(ExternalDaemonOptions opts);
+
+ // Starts the master process, binding RPC to the configured bind address
+ // and the web server to port 0 on the same host.
+ Status Start();
+
+ // Restarts the daemon.
+ // Requires that it has previously been shut down; returns IllegalState
+ // otherwise.
+ Status Restart() WARN_UNUSED_RESULT;
+
+ // Blocks until the master's catalog manager is initialized and responding to
+ // RPCs. Returns TimedOut if that does not happen within
+ // kMasterCatalogManagerTimeoutSeconds.
+ Status WaitForCatalogManager() WARN_UNUSED_RESULT;
+
+ private:
+ // Flags shared by Start() and Restart().
+ std::vector<std::string> GetCommonFlags() const;
+
+ friend class RefCountedThreadSafe<ExternalMaster>;
+ virtual ~ExternalMaster();
+};
+
+// An ExternalDaemon running a Kudu tablet server.
+class ExternalTabletServer : public ExternalDaemon {
+ public:
+ // 'master_addrs' must be non-empty; it is passed to the daemon as
+ // --tserver_master_addrs.
+ ExternalTabletServer(ExternalDaemonOptions opts,
+ std::vector<HostPort> master_addrs);
+
+ // Starts the tablet server process.
+ Status Start();
+
+ // Restarts the daemon.
+ // Requires that it has previously been shut down; returns IllegalState
+ // otherwise.
+ Status Restart() WARN_UNUSED_RESULT;
+
+ private:
+ // Addresses of the masters this tablet server is pointed at.
+ const std::vector<HostPort> master_addrs_;
+
+ friend class RefCountedThreadSafe<ExternalTabletServer>;
+ virtual ~ExternalTabletServer();
+};
+
+} // namespace cluster
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/350c8a79/src/kudu/mini-cluster/internal_mini_cluster.cc
----------------------------------------------------------------------
diff --git a/src/kudu/mini-cluster/internal_mini_cluster.cc b/src/kudu/mini-cluster/internal_mini_cluster.cc
new file mode 100644
index 0000000..baa9330
--- /dev/null
+++ b/src/kudu/mini-cluster/internal_mini_cluster.cc
@@ -0,0 +1,377 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/mini-cluster/internal_mini_cluster.h"
+
+#include <cstdint>
+#include <ostream>
+#include <unordered_set>
+#include <utility>
+
+#include "kudu/client/client.h"
+#include "kudu/common/wire_protocol.pb.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/master/catalog_manager.h"
+#include "kudu/master/master.h"
+#include "kudu/master/master.proxy.h"
+#include "kudu/master/mini_master.h"
+#include "kudu/master/ts_descriptor.h"
+#include "kudu/master/ts_manager.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/tserver/mini_tablet_server.h"
+#include "kudu/tserver/tablet_server.h"
+#include "kudu/tserver/tablet_server_options.h"
+#include "kudu/util/env.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/status.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/test_util.h"
+
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace cluster {
+
+using client::KuduClient;
+using client::KuduClientBuilder;
+using master::CatalogManager;
+using master::MasterServiceProxy;
+using master::MiniMaster;
+using master::TSDescriptor;
+using std::shared_ptr;
+using tserver::MiniTabletServer;
+using tserver::TabletServer;
+
+InternalMiniClusterOptions::InternalMiniClusterOptions()
+ : num_masters(1),
+ num_tablet_servers(1),
+ num_data_dirs(1),
+ bind_mode(MiniCluster::kDefaultBindMode) {
+}
+
+InternalMiniCluster::InternalMiniCluster(Env* env, InternalMiniClusterOptions options)
+ : env_(env),
+ opts_(std::move(options)),
+ running_(false) {
+ if (opts_.data_root.empty()) {
+ opts_.data_root = JoinPathSegments(GetTestDataDirectory(), "minicluster-data");
+ }
+}
+
+InternalMiniCluster::~InternalMiniCluster() {
+ Shutdown();
+}
+
+Status InternalMiniCluster::Start() {
+ CHECK(!opts_.data_root.empty()) << "No Fs root was provided";
+ CHECK(!running_);
+
+ if (opts_.num_masters > 1) {
+ CHECK_GE(opts_.master_rpc_ports.size(), opts_.num_masters);
+ }
+
+ if (!env_->FileExists(opts_.data_root)) {
+ RETURN_NOT_OK(env_->CreateDir(opts_.data_root));
+ }
+
+ // start the masters
+ if (opts_.num_masters > 1) {
+ RETURN_NOT_OK_PREPEND(StartDistributedMasters(),
+ "Couldn't start distributed masters");
+ } else {
+ RETURN_NOT_OK_PREPEND(StartSingleMaster(), "Couldn't start the single master");
+ }
+
+ for (int i = 0; i < opts_.num_tablet_servers; i++) {
+ RETURN_NOT_OK_PREPEND(AddTabletServer(),
+ Substitute("Error adding TS $0", i));
+ }
+
+ RETURN_NOT_OK_PREPEND(WaitForTabletServerCount(opts_.num_tablet_servers),
+ "Waiting for tablet servers to start");
+
+ RETURN_NOT_OK_PREPEND(rpc::MessengerBuilder("minicluster-messenger")
+ .set_num_reactors(1)
+ .set_max_negotiation_threads(1)
+ .Build(&messenger_),
+ "Failed to start Messenger for minicluster");
+
+ running_ = true;
+ return Status::OK();
+}
+
+Status InternalMiniCluster::StartDistributedMasters() {
+ CHECK_GT(opts_.num_data_dirs, 0);
+ CHECK_GE(opts_.master_rpc_ports.size(), opts_.num_masters);
+ CHECK_GT(opts_.master_rpc_ports.size(), 1);
+
+ vector<HostPort> master_rpc_addrs = this->master_rpc_addrs();
+ LOG(INFO) << "Creating distributed mini masters. Addrs: "
+ << HostPort::ToCommaSeparatedString(master_rpc_addrs);
+
+ for (int i = 0; i < opts_.num_masters; i++) {
+ shared_ptr<MiniMaster> mini_master(new MiniMaster(GetMasterFsRoot(i), master_rpc_addrs[i]));
+ mini_master->SetMasterAddresses(master_rpc_addrs);
+ RETURN_NOT_OK_PREPEND(mini_master->Start(), Substitute("Couldn't start follower $0", i));
+ VLOG(1) << "Started MiniMaster with UUID " << mini_master->permanent_uuid()
+ << " at index " << i;
+ mini_masters_.push_back(std::move(mini_master));
+ }
+ int i = 0;
+ for (const shared_ptr<MiniMaster>& master : mini_masters_) {
+ LOG(INFO) << "Waiting to initialize catalog manager on master " << i++;
+ RETURN_NOT_OK_PREPEND(master->WaitForCatalogManagerInit(),
+ Substitute("Could not initialize catalog manager on master $0", i));
+ }
+ return Status::OK();
+}
+
+Status InternalMiniCluster::StartSync() {
+ RETURN_NOT_OK(Start());
+ int count = 0;
+ for (const shared_ptr<MiniTabletServer>& tablet_server : mini_tablet_servers_) {
+ RETURN_NOT_OK_PREPEND(tablet_server->WaitStarted(),
+ Substitute("TabletServer $0 failed to start.", count));
+ count++;
+ }
+ return Status::OK();
+}
+
+Status InternalMiniCluster::StartSingleMaster() {
+ CHECK_GT(opts_.num_data_dirs, 0);
+ CHECK_EQ(1, opts_.num_masters);
+ CHECK_LE(opts_.master_rpc_ports.size(), 1);
+ uint16_t master_rpc_port = 0;
+ if (opts_.master_rpc_ports.size() == 1) {
+ master_rpc_port = opts_.master_rpc_ports[0];
+ }
+
+ // start the master (we need the port to set on the servers).
+ string bind_ip = GetBindIpForDaemon(MiniCluster::MASTER, /*index=*/ 0, opts_.bind_mode);
+ shared_ptr<MiniMaster> mini_master(new MiniMaster(GetMasterFsRoot(0),
+ HostPort(std::move(bind_ip), master_rpc_port), opts_.num_data_dirs));
+ RETURN_NOT_OK_PREPEND(mini_master->Start(), "Couldn't start master");
+ RETURN_NOT_OK(mini_master->master()->WaitUntilCatalogManagerIsLeaderAndReadyForTests(
+ MonoDelta::FromSeconds(kMasterStartupWaitTimeSeconds)));
+ mini_masters_.push_back(std::move(mini_master));
+ return Status::OK();
+}
+
+Status InternalMiniCluster::AddTabletServer() {
+ if (mini_masters_.empty()) {
+ return Status::IllegalState("Master not yet initialized");
+ }
+ int new_idx = mini_tablet_servers_.size();
+
+ uint16_t ts_rpc_port = 0;
+ if (opts_.tserver_rpc_ports.size() > new_idx) {
+ ts_rpc_port = opts_.tserver_rpc_ports[new_idx];
+ }
+
+ string bind_ip = GetBindIpForDaemon(MiniCluster::TSERVER, new_idx, opts_.bind_mode);
+ gscoped_ptr<MiniTabletServer> tablet_server(new MiniTabletServer(GetTabletServerFsRoot(new_idx),
+ HostPort(bind_ip, ts_rpc_port), opts_.num_data_dirs));
+
+ // set the master addresses
+ tablet_server->options()->master_addresses.clear();
+ for (const shared_ptr<MiniMaster>& master : mini_masters_) {
+ tablet_server->options()->master_addresses.emplace_back(master->bound_rpc_addr());
+ }
+ RETURN_NOT_OK(tablet_server->Start())
+ mini_tablet_servers_.push_back(shared_ptr<MiniTabletServer>(tablet_server.release()));
+ return Status::OK();
+}
+
+void InternalMiniCluster::ShutdownNodes(ClusterNodes nodes) {
+ if (nodes == ClusterNodes::ALL || nodes == ClusterNodes::TS_ONLY) {
+ for (const shared_ptr<MiniTabletServer>& tablet_server : mini_tablet_servers_) {
+ tablet_server->Shutdown();
+ }
+ mini_tablet_servers_.clear();
+ }
+ if (nodes == ClusterNodes::ALL || nodes == ClusterNodes::MASTERS_ONLY) {
+ for (const shared_ptr<MiniMaster>& master_server : mini_masters_) {
+ master_server->Shutdown();
+ }
+ mini_masters_.clear();
+ }
+ running_ = false;
+}
+
+MiniMaster* InternalMiniCluster::mini_master(int idx) const {
+ CHECK_GE(idx, 0) << "Master idx must be >= 0";
+ CHECK_LT(idx, mini_masters_.size()) << "Master idx must be < num masters started";
+ return mini_masters_[idx].get();
+}
+
+MiniTabletServer* InternalMiniCluster::mini_tablet_server(int idx) const {
+ CHECK_GE(idx, 0) << "TabletServer idx must be >= 0";
+ CHECK_LT(idx, mini_tablet_servers_.size()) << "TabletServer idx must be < 'num_ts_started_'";
+ return mini_tablet_servers_[idx].get();
+}
+
+vector<HostPort> InternalMiniCluster::master_rpc_addrs() const {
+ vector<HostPort> master_rpc_addrs;
+ for (int i = 0; i < opts_.master_rpc_ports.size(); i++) {
+ master_rpc_addrs.emplace_back(
+ GetBindIpForDaemon(MiniCluster::MASTER, i, opts_.bind_mode),
+ opts_.master_rpc_ports[i]);
+ }
+ return master_rpc_addrs;
+}
+
+string InternalMiniCluster::GetMasterFsRoot(int idx) const {
+ return JoinPathSegments(opts_.data_root, Substitute("master-$0-root", idx));
+}
+
+string InternalMiniCluster::GetTabletServerFsRoot(int idx) const {
+ return JoinPathSegments(opts_.data_root, Substitute("ts-$0-root", idx));
+}
+
+Status InternalMiniCluster::WaitForTabletServerCount(int count) const {
+ vector<shared_ptr<master::TSDescriptor>> descs;
+ return WaitForTabletServerCount(count, MatchMode::MATCH_TSERVERS, &descs);
+}
+
+Status InternalMiniCluster::WaitForTabletServerCount(int count,
+ MatchMode mode,
+ vector<shared_ptr<TSDescriptor>>* descs) const {
+ std::unordered_set<int> masters_to_search;
+ for (int i = 0; i < num_masters(); i++) {
+ if (!mini_master(i)->master()->IsShutdown()) {
+ masters_to_search.insert(i);
+ }
+ }
+
+ Stopwatch sw;
+ sw.start();
+ while (sw.elapsed().wall_seconds() < kRegistrationWaitTimeSeconds) {
+ for (auto iter = masters_to_search.begin(); iter != masters_to_search.end();) {
+ mini_master(*iter)->master()->ts_manager()->GetAllDescriptors(descs);
+ int match_count = 0;
+ switch (mode) {
+ case MatchMode::MATCH_TSERVERS:
+ // GetAllDescriptors() may return servers that are no longer online.
+ // Do a second step of verification to verify that the descs that we got
+ // are aligned (same uuid/seqno) with the TSs that we have in the cluster.
+ for (const shared_ptr<TSDescriptor>& desc : *descs) {
+ for (const auto& mini_tablet_server : mini_tablet_servers_) {
+ const TabletServer* ts = mini_tablet_server->server();
+ if (ts->instance_pb().permanent_uuid() == desc->permanent_uuid() &&
+ ts->instance_pb().instance_seqno() == desc->latest_seqno()) {
+ match_count++;
+ break;
+ }
+ }
+ }
+ break;
+ case MatchMode::DO_NOT_MATCH_TSERVERS:
+ match_count = descs->size();
+ break;
+ default:
+ LOG(FATAL) << "Invalid match mode";
+ }
+
+ if (match_count == count) {
+ // This master has returned the correct set of tservers.
+ iter = masters_to_search.erase(iter);
+ } else {
+ iter++;
+ }
+ }
+ if (masters_to_search.empty()) {
+ // All masters have returned the correct set of tservers.
+ LOG(INFO) << Substitute("$0 TS(s) registered with all masters after $1s",
+ count, sw.elapsed().wall_seconds());
+ return Status::OK();
+ }
+ SleepFor(MonoDelta::FromMilliseconds(1));
+ }
+ return Status::TimedOut(Substitute(
+ "Timed out waiting for $0 TS(s) to register with all masters", count));
+}
+
+Status InternalMiniCluster::CreateClient(KuduClientBuilder* builder,
+ client::sp::shared_ptr<KuduClient>* client) const {
+ client::KuduClientBuilder defaults;
+ if (builder == nullptr) {
+ builder = &defaults;
+ }
+
+ builder->clear_master_server_addrs();
+ for (const shared_ptr<MiniMaster>& master : mini_masters_) {
+ CHECK(master);
+ builder->add_master_server_addr(master->bound_rpc_addr_str());
+ }
+ return builder->Build(client);
+}
+
+Status InternalMiniCluster::GetLeaderMasterIndex(int* idx) const {
+ const MonoTime deadline = MonoTime::Now() +
+ MonoDelta::FromSeconds(kMasterStartupWaitTimeSeconds);
+
+ int leader_idx = -1;
+ while (MonoTime::Now() < deadline) {
+ for (int i = 0; i < num_masters(); i++) {
+ master::MiniMaster* mm = mini_master(i);
+ if (!mm->is_started() || mm->master()->IsShutdown()) {
+ continue;
+ }
+ master::CatalogManager* catalog = mm->master()->catalog_manager();
+ master::CatalogManager::ScopedLeaderSharedLock l(catalog);
+ if (l.first_failed_status().ok()) {
+ leader_idx = i;
+ break;
+ }
+ }
+ if (leader_idx != -1) {
+ break;
+ }
+ SleepFor(MonoDelta::FromMilliseconds(100));
+ }
+ if (leader_idx == -1) {
+ return Status::NotFound("Leader master was not found within deadline");
+ }
+
+ if (idx) {
+ *idx = leader_idx;
+ }
+ return Status::OK();
+}
+
+std::shared_ptr<rpc::Messenger> InternalMiniCluster::messenger() const {
+ return messenger_;
+}
+
+std::shared_ptr<MasterServiceProxy> InternalMiniCluster::master_proxy() const {
+ CHECK_EQ(1, mini_masters_.size());
+ return master_proxy(0);
+}
+
+std::shared_ptr<MasterServiceProxy> InternalMiniCluster::master_proxy(int idx) const {
+ const auto& addr = CHECK_NOTNULL(mini_master(idx))->bound_rpc_addr();
+ return std::make_shared<MasterServiceProxy>(messenger_, addr, addr.host());
+}
+
+} // namespace cluster
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/350c8a79/src/kudu/mini-cluster/internal_mini_cluster.h
----------------------------------------------------------------------
diff --git a/src/kudu/mini-cluster/internal_mini_cluster.h b/src/kudu/mini-cluster/internal_mini_cluster.h
new file mode 100644
index 0000000..b44717c
--- /dev/null
+++ b/src/kudu/mini-cluster/internal_mini_cluster.h
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/client/shared_ptr.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/mini-cluster/mini_cluster.h"
+
+namespace kudu {
+
+class Env;
+class HostPort;
+class Status;
+
+namespace client {
+class KuduClient;
+class KuduClientBuilder;
+}
+
+namespace master {
+class MasterServiceProxy;
+class MiniMaster;
+class TSDescriptor;
+}
+
+namespace rpc {
+class Messenger;
+}
+
+namespace tserver {
+class MiniTabletServer;
+}
+
+namespace cluster {
+
+struct InternalMiniClusterOptions {
+ InternalMiniClusterOptions();
+
+ // Number of master servers.
+ // Default: 1
+ int num_masters;
+
+ // Number of TS to start.
+ // Default: 1
+ int num_tablet_servers;
+
+ // Number of data dirs for each daemon.
+ // Default: 1 (this will place the wals in the same dir)
+ int num_data_dirs;
+
+ // Directory in which to store data.
+ // Default: "", which auto-generates a unique path for this cluster.
+ // The default may only be used from a gtest unit test.
+ std::string data_root;
+
+ MiniCluster::BindMode bind_mode;
+
+ // List of RPC ports for the master to run on.
+ // Defaults to an empty list.
+ // In single-master mode, an empty list implies port 0 (transient port).
+ // In multi-master mode, an empty list is illegal and will result in a CHECK failure.
+ std::vector<uint16_t> master_rpc_ports;
+
+ // List of RPC ports for the tservers to run on.
+ // Defaults to an empty list.
+ // When adding a tablet server to the cluster via AddTabletServer(), if the
+ // index of that tablet server in the cluster is greater than the number of
+ // elements in this list, a transient port (port 0) will be used.
+ std::vector<uint16_t> tserver_rpc_ports;
+};
+
+// An in-process cluster with a MiniMaster and a configurable
+// number of MiniTabletServers for use in tests.
+class InternalMiniCluster : public MiniCluster {
+ public:
+ InternalMiniCluster(Env* env, InternalMiniClusterOptions options);
+ virtual ~InternalMiniCluster();
+
+ // Start a cluster with a Master and 'num_tablet_servers' TabletServers.
+ // All servers run on the loopback interface with ephemeral ports.
+ Status Start() override;
+
+ // Like the previous method but performs initialization synchronously, i.e.
+ // this will wait for all TS's to be started and initialized. Tests should
+ // use this if they interact with tablets immediately after Start();
+ Status StartSync();
+
+ void ShutdownNodes(ClusterNodes nodes) override;
+
+ // Setup a consensus configuration of distributed masters, with count specified in
+ // 'options'. Requires that a reserve RPC port is specified in
+ // 'options' for each master.
+ Status StartDistributedMasters();
+
+ // Add a new standalone master to the cluster. The new master is started.
+ Status StartSingleMaster();
+
+ // Add a new TS to the cluster. The new TS is started.
+ // Requires that the master is already running.
+ Status AddTabletServer();
+
+ // If this cluster is configured for a single non-distributed
+ // master, return the single master. Exits with a CHECK failure if
+ // there are multiple masters.
+ master::MiniMaster* mini_master() const {
+ CHECK_EQ(mini_masters_.size(), 1);
+ return mini_master(0);
+ }
+
+ // Returns the Master at index 'idx' for this InternalMiniCluster.
+ master::MiniMaster* mini_master(int idx) const;
+
+ // Return number of mini masters.
+ int num_masters() const override {
+ return mini_masters_.size();
+ }
+
+ // Returns the TabletServer at index 'idx' of this InternalMiniCluster.
+ // 'idx' must be between 0 and 'num_tablet_servers' -1.
+ tserver::MiniTabletServer* mini_tablet_server(int idx) const;
+
+ int num_tablet_servers() const override {
+ return mini_tablet_servers_.size();
+ }
+
+ BindMode bind_mode() const override {
+ return opts_.bind_mode;
+ }
+
+ std::vector<uint16_t> master_rpc_ports() const override {
+ return opts_.master_rpc_ports;
+ }
+
+ std::vector<HostPort> master_rpc_addrs() const override;
+
+ std::string GetMasterFsRoot(int idx) const;
+
+ std::string GetTabletServerFsRoot(int idx) const;
+
+ // Wait until the number of registered tablet servers reaches the given
+ // count on all masters. Returns Status::TimedOut if the desired count is not
+ // achieved within kRegistrationWaitTimeSeconds.
+ enum class MatchMode {
+ // Ensure that the tservers retrieved from each master match up against the
+ // tservers defined in this cluster. The matching is done via
+ // NodeInstancePBs comparisons. If even one match fails, the retrieved
+ // response is considered to be malformed and is retried.
+ //
+ // Note: tservers participate in matching even if they are shut down.
+ MATCH_TSERVERS,
+
+ // Do not perform any matching on the retrieved tservers.
+ DO_NOT_MATCH_TSERVERS,
+ };
+ Status WaitForTabletServerCount(int count) const;
+ Status WaitForTabletServerCount(int count, MatchMode mode,
+ std::vector<std::shared_ptr<master::TSDescriptor>>* descs) const;
+
+ Status CreateClient(client::KuduClientBuilder* builder,
+ client::sp::shared_ptr<client::KuduClient>* client) const override;
+
+ // Determine the leader master of the cluster. Upon successful completion,
+ // sets 'idx' to the leader master's index. The result index index can be used
+ // as an argument for calls to mini_master().
+ //
+ // It's possible to use 'nullptr' instead of providing a valid placeholder
+ // for the result master index. That's for use cases when it's enough
+ // to determine if the cluster has established leader master
+ // without intent to get the actual index.
+ //
+ // Note: if a leader election occurs after this method is executed, the
+ // last result may not be valid.
+ Status GetLeaderMasterIndex(int* idx) const;
+
+ std::shared_ptr<rpc::Messenger> messenger() const override;
+ std::shared_ptr<master::MasterServiceProxy> master_proxy() const override;
+ std::shared_ptr<master::MasterServiceProxy> master_proxy(int idx) const override;
+
+ private:
+ enum {
+ kRegistrationWaitTimeSeconds = 15,
+ kMasterStartupWaitTimeSeconds = 30,
+ };
+
+ Env* const env_;
+
+ InternalMiniClusterOptions opts_;
+
+ bool running_;
+
+ std::vector<std::shared_ptr<master::MiniMaster> > mini_masters_;
+ std::vector<std::shared_ptr<tserver::MiniTabletServer> > mini_tablet_servers_;
+
+ std::shared_ptr<rpc::Messenger> messenger_;
+
+ DISALLOW_COPY_AND_ASSIGN(InternalMiniCluster);
+};
+
+} // namespace cluster
+} // namespace kudu