You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2017/10/02 19:25:22 UTC
[5/6] kudu git commit: mini-cluster: new module for the mini cluster
implementations
http://git-wip-us.apache.org/repos/asf/kudu/blob/350c8a79/src/kudu/integration-tests/external_mini_cluster.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/external_mini_cluster.cc b/src/kudu/integration-tests/external_mini_cluster.cc
deleted file mode 100644
index da7e81f..0000000
--- a/src/kudu/integration-tests/external_mini_cluster.cc
+++ /dev/null
@@ -1,1280 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "kudu/integration-tests/external_mini_cluster.h"
-
-#include <algorithm>
-#include <csignal>
-#include <cstdint>
-#include <cstdlib>
-#include <functional>
-#include <memory>
-#include <string>
-#include <unordered_set>
-#include <utility>
-
-#include <gflags/gflags.h>
-#include <gtest/gtest.h>
-#include <rapidjson/document.h>
-
-#include "kudu/client/client.h"
-#include "kudu/client/master_rpc.h"
-#include "kudu/common/wire_protocol.h"
-#include "kudu/common/wire_protocol.pb.h"
-#include "kudu/gutil/basictypes.h"
-#include "kudu/gutil/strings/join.h"
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/gutil/strings/util.h"
-#include "kudu/master/master.pb.h"
-#include "kudu/master/master.proxy.h"
-#include "kudu/rpc/messenger.h"
-#include "kudu/rpc/rpc_controller.h"
-#include "kudu/security/test/mini_kdc.h"
-#include "kudu/server/server_base.pb.h"
-#include "kudu/server/server_base.proxy.h"
-#include "kudu/tablet/metadata.pb.h"
-#include "kudu/tablet/tablet.pb.h"
-#include "kudu/tserver/tserver.pb.h"
-#include "kudu/tserver/tserver_service.proxy.h"
-#include "kudu/util/async_util.h"
-#include "kudu/util/curl_util.h"
-#include "kudu/util/env.h"
-#include "kudu/util/env_util.h"
-#include "kudu/util/faststring.h"
-#include "kudu/util/fault_injection.h"
-#include "kudu/util/jsonreader.h"
-#include "kudu/util/metrics.h"
-#include "kudu/util/monotime.h"
-#include "kudu/util/net/sockaddr.h"
-#include "kudu/util/path_util.h"
-#include "kudu/util/pb_util.h"
-#include "kudu/util/status.h"
-#include "kudu/util/stopwatch.h"
-#include "kudu/util/subprocess.h"
-#include "kudu/util/test_util.h"
-
-using kudu::client::internal::ConnectToClusterRpc;
-using kudu::master::ListTablesRequestPB;
-using kudu::master::ListTablesResponsePB;
-using kudu::master::MasterServiceProxy;
-using kudu::pb_util::SecureDebugString;
-using kudu::pb_util::SecureShortDebugString;
-using kudu::rpc::RpcController;
-using kudu::server::ServerStatusPB;
-using kudu::tserver::ListTabletsRequestPB;
-using kudu::tserver::ListTabletsResponsePB;
-using kudu::tserver::TabletServerServiceProxy;
-using rapidjson::Value;
-using std::pair;
-using std::string;
-using std::unique_ptr;
-using std::unordered_set;
-using std::vector;
-using strings::Substitute;
-
-// Shorthand for the per-tablet status/schema entries in ListTablets responses.
-using StatusAndSchemaPB = ListTabletsResponsePB::StatusAndSchemaPB;
-
-// When set, each daemon started by the cluster gets a companion
-// "perf record" process attached to its pid (see ExternalDaemon::StartProcess()).
-DEFINE_bool(perf_record, false,
-            "Whether to run \"perf record --call-graph fp\" on each daemon in the cluster");
-
-namespace kudu {
-
-// File names of the daemon executables inside the daemon binary directory.
-static const char* const kMasterBinaryName = "kudu-master";
-static const char* const kTabletServerBinaryName = "kudu-tserver";
-// Fixed timeouts; const so they cannot be modified by accident at runtime.
-// kTabletServerRegistrationTimeoutSeconds bounds how long Start()/Restart()
-// wait for tablet servers to register with the masters.
-static const double kTabletServerRegistrationTimeoutSeconds = 15.0;
-static const double kMasterCatalogManagerTimeoutSeconds = 60.0;
-
-// Default options: one master, one tablet server, one data directory per
-// daemon, Kerberos off, daemon logs to stderr, and a 30 second budget for
-// each daemon process to finish starting.
-ExternalMiniClusterOptions::ExternalMiniClusterOptions()
-    : num_masters(1),
-      num_tablet_servers(1),
-      bind_mode(MiniCluster::kDefaultBindMode),
-      num_data_dirs(1),
-      enable_kerberos(false),
-      logtostderr(true),
-      start_process_timeout(MonoDelta::FromSeconds(30)) {
-}
-
-// A cluster configured with all-default options.
-ExternalMiniCluster::ExternalMiniCluster()
-    : ExternalMiniCluster(ExternalMiniClusterOptions()) {
-}
-
-ExternalMiniCluster::ExternalMiniCluster(ExternalMiniClusterOptions opts)
-    : opts_(std::move(opts)) {
-}
-
-// Teardown is delegated to Shutdown().
-ExternalMiniCluster::~ExternalMiniCluster() {
-  Shutdown();
-}
-
-// Stores in '*ret' the directory that contains the running executable.
-// HandleOptions() uses it as the default daemon binary directory.
-Status ExternalMiniCluster::DeduceBinRoot(std::string* ret) {
-  string exe_path;
-  RETURN_NOT_OK(Env::Default()->GetExecutablePath(&exe_path));
-  *ret = DirName(exe_path);
-  return Status::OK();
-}
-
-// Resolves the effective daemon binary directory and data root from opts_.
-// An empty daemon_bin_path falls back to the test executable's directory;
-// an empty data_root falls back to "minicluster-data" under the test data dir.
-Status ExternalMiniCluster::HandleOptions() {
-  daemon_bin_path_ = opts_.daemon_bin_path;
-  if (daemon_bin_path_.empty()) {
-    RETURN_NOT_OK(DeduceBinRoot(&daemon_bin_path_));
-  }
-
-  data_root_ = opts_.data_root.empty()
-      ? JoinPathSegments(GetTestDataDirectory(), "minicluster-data")
-      : opts_.data_root;
-  return Status::OK();
-}
-
-// Starts the whole cluster. The steps are:
-//  1. Creates the RPC messenger and the data root.
-//  2. Starts a MiniKdc with the test principals, when Kerberos is enabled.
-//  3. Starts the masters.
-//  4. Starts opts_.num_tablet_servers tablet servers.
-//  5. Waits until they are registered with every running master.
-//
-// Must be called on a cluster with no daemons yet; use Restart() to bring
-// back daemons that were shut down.
-Status ExternalMiniCluster::Start() {
-  CHECK(masters_.empty()) << "Masters are not empty (size: " << masters_.size()
-                          << "). Maybe you meant Restart()?";
-  CHECK(tablet_servers_.empty()) << "Tablet servers are not empty (size: "
-                                 << tablet_servers_.size() << "). Maybe you meant Restart()?";
-  RETURN_NOT_OK(HandleOptions());
-
-  RETURN_NOT_OK_PREPEND(rpc::MessengerBuilder("minicluster-messenger")
-                            .set_num_reactors(1)
-                            .set_max_negotiation_threads(1)
-                            .Build(&messenger_),
-                        "Failed to start Messenger for minicluster");
-
-  // An already-existing data root is fine.
-  Status s = Env::Default()->CreateDir(data_root_);
-  if (!s.ok() && !s.IsAlreadyPresent()) {
-    RETURN_NOT_OK_PREPEND(s, "Could not create root dir " + data_root_);
-  }
-
-  if (opts_.enable_kerberos) {
-    kdc_.reset(new MiniKdc(opts_.mini_kdc_options));
-    RETURN_NOT_OK(kdc_->Start());
-    RETURN_NOT_OK_PREPEND(kdc_->CreateUserPrincipal("test-admin"),
-                          "could not create admin principal");
-    RETURN_NOT_OK_PREPEND(kdc_->CreateUserPrincipal("test-user"),
-                          "could not create user principal");
-    RETURN_NOT_OK_PREPEND(kdc_->CreateUserPrincipal("joe-interloper"),
-                          "could not create unauthorized principal");
-
-    // The test process itself authenticates as the admin principal.
-    RETURN_NOT_OK_PREPEND(kdc_->Kinit("test-admin"),
-                          "could not kinit as admin");
-    RETURN_NOT_OK_PREPEND(kdc_->SetKrb5Environment(),
-                          "could not set krb5 client env");
-  }
-
-  if (opts_.num_masters != 1) {
-    RETURN_NOT_OK_PREPEND(StartDistributedMasters(),
-                          "Failed to add distributed masters");
-  } else {
-    // Nothing to format, so a plain literal is enough.
-    RETURN_NOT_OK_PREPEND(StartSingleMaster(),
-                          "Failed to start a single Master");
-  }
-
-  for (int i = 1; i <= opts_.num_tablet_servers; i++) {
-    RETURN_NOT_OK_PREPEND(AddTabletServer(),
-                          Substitute("Failed starting tablet server $0", i));
-  }
-  RETURN_NOT_OK(WaitForTabletServerCount(
-      opts_.num_tablet_servers,
-      MonoDelta::FromSeconds(kTabletServerRegistrationTimeoutSeconds)));
-
-  return Status::OK();
-}
-
-
-// Shuts down the tablet servers, the masters, or both, depending on 'nodes'.
-// Null entries in masters_ are skipped.
-void ExternalMiniCluster::ShutdownNodes(ClusterNodes nodes) {
-  const bool stop_tservers =
-      nodes == ClusterNodes::ALL || nodes == ClusterNodes::TS_ONLY;
-  const bool stop_masters =
-      nodes == ClusterNodes::ALL || nodes == ClusterNodes::MASTERS_ONLY;
-
-  if (stop_tservers) {
-    for (const auto& tserver : tablet_servers_) {
-      tserver->Shutdown();
-    }
-  }
-  if (stop_masters) {
-    for (const auto& m : masters_) {
-      if (!m) continue;
-      m->Shutdown();
-    }
-  }
-}
-
-// Restarts every daemon that is currently shut down (masters first, then
-// tablet servers). Then waits until all tablet servers have registered with
-// the running masters.
-Status ExternalMiniCluster::Restart() {
-  for (const auto& m : masters_) {
-    if (!m || !m->IsShutdown()) {
-      continue;
-    }
-    RETURN_NOT_OK_PREPEND(m->Restart(), "Cannot restart master bound at: " +
-                          m->bound_rpc_hostport().ToString());
-  }
-
-  for (const auto& tserver : tablet_servers_) {
-    if (!tserver->IsShutdown()) {
-      continue;
-    }
-    RETURN_NOT_OK_PREPEND(tserver->Restart(), "Cannot restart tablet server bound at: " +
-                          tserver->bound_rpc_hostport().ToString());
-  }
-
-  return WaitForTabletServerCount(
-      tablet_servers_.size(),
-      MonoDelta::FromSeconds(kTabletServerRegistrationTimeoutSeconds));
-}
-
-// Switches the directory the daemon binaries are taken from and updates the
-// executable path of every existing daemon. ExternalDaemon::SetExePath()
-// CHECKs that each daemon is shut down.
-void ExternalMiniCluster::SetDaemonBinPath(string daemon_bin_path) {
-  daemon_bin_path_ = std::move(daemon_bin_path);
-  for (const auto& master : masters_) {
-    // masters_ may hold null entries; ShutdownNodes() and Restart() skip
-    // them as well.
-    if (master) {
-      master->SetExePath(GetBinaryPath(kMasterBinaryName));
-    }
-  }
-  for (const auto& ts : tablet_servers_) {
-    ts->SetExePath(GetBinaryPath(kTabletServerBinaryName));
-  }
-}
-
-// "<daemon bin dir>/<binary>". The bin dir must already be resolved.
-string ExternalMiniCluster::GetBinaryPath(const string& binary) const {
-  CHECK(!daemon_bin_path_.empty());
-  return JoinPathSegments(daemon_bin_path_, binary);
-}
-
-// "<data root>/<daemon_id>/logs". The data root must already be resolved.
-string ExternalMiniCluster::GetLogPath(const string& daemon_id) const {
-  CHECK(!data_root_.empty());
-  const string daemon_root = JoinPathSegments(data_root_, daemon_id);
-  return JoinPathSegments(daemon_root, "logs");
-}
-
-// Data directory of 'daemon_id'. The layout depends on 'dir_index':
-//  - With a dir_index (which must be < num_data_dirs), the path is
-//    "<root>/<daemon_id>/data-<index>".
-//  - Without one, the cluster must use exactly one data dir, and the path is
-//    "<root>/<daemon_id>/data".
-string ExternalMiniCluster::GetDataPath(const string& daemon_id,
-                                        boost::optional<uint32_t> dir_index) const {
-  CHECK(!data_root_.empty());
-  const string daemon_root = JoinPathSegments(data_root_, daemon_id);
-  if (!dir_index) {
-    CHECK_EQ(1, opts_.num_data_dirs);
-    return JoinPathSegments(daemon_root, "data");
-  }
-  CHECK_LT(*dir_index, opts_.num_data_dirs);
-  return JoinPathSegments(daemon_root, Substitute("data-$0", *dir_index));
-}
-
-// Every data directory of 'daemon_id', in index order.
-vector<string> ExternalMiniCluster::GetDataPaths(const string& daemon_id) const {
-  // The single-directory layout uses the un-suffixed "data" path.
-  if (opts_.num_data_dirs == 1) {
-    return { GetDataPath(daemon_id) };
-  }
-  vector<string> result;
-  for (uint32_t idx = 0; idx < opts_.num_data_dirs; ++idx) {
-    result.push_back(GetDataPath(daemon_id, idx));
-  }
-  return result;
-}
-
-// WAL directory of 'daemon_id': "<data root>/<daemon_id>/wal".
-string ExternalMiniCluster::GetWalPath(const string& daemon_id) const {
-  CHECK(!data_root_.empty());
-  const string daemon_root = JoinPathSegments(data_root_, daemon_id);
-  return JoinPathSegments(daemon_root, "wal");
-}
-
-namespace {
-
-// Copy of 'orig_flags' in which every "${index}" placeholder is replaced by
-// the decimal value of 'index'.
-vector<string> SubstituteInFlags(const vector<string>& orig_flags,
-                                 int index) {
-  const string index_str = strings::Substitute("$0", index);
-  vector<string> substituted;
-  substituted.reserve(orig_flags.size());
-  for (const auto& flag : orig_flags) {
-    substituted.emplace_back(StringReplace(flag, "${index}", index_str, true));
-  }
-  return substituted;
-}
-
-} // anonymous namespace
-
-// Starts the cluster's only master, "master-0", bound to port 0 on the master
-// bind IP. Kerberos is enabled when requested.
-Status ExternalMiniCluster::StartSingleMaster() {
-  const string daemon_id = "master-0";
-
-  ExternalDaemonOptions opts(opts_.logtostderr);
-  opts.messenger = messenger_;
-  opts.exe = GetBinaryPath(kMasterBinaryName);
-  opts.wal_dir = GetWalPath(daemon_id);
-  opts.data_dirs = GetDataPaths(daemon_id);
-  opts.log_dir = GetLogPath(daemon_id);
-  if (FLAGS_perf_record) {
-    opts.perf_record_filename = Substitute("$0/perf-$1.data", opts.log_dir, daemon_id);
-  }
-  opts.extra_flags = SubstituteInFlags(opts_.extra_master_flags, 0);
-  opts.start_process_timeout = opts_.start_process_timeout;
-  opts.rpc_bind_address = HostPort(GetBindIpForMaster(0), 0);
-
-  scoped_refptr<ExternalMaster> new_master = new ExternalMaster(opts);
-  if (opts_.enable_kerberos) {
-    // The Kerberos principal is generated from this host name, so it has to
-    // match the host of the master's RPC bind address.
-    RETURN_NOT_OK_PREPEND(new_master->EnableKerberos(kdc_.get(), opts.rpc_bind_address.host()),
-                          "could not enable Kerberos");
-  }
-
-  RETURN_NOT_OK(new_master->Start());
-  masters_.push_back(new_master);
-  return Status::OK();
-}
-
-// Starts opts_.num_masters masters at the fixed RPC ports listed in
-// opts_.master_rpc_ports; exactly one port per master is required (FATAL
-// otherwise). Every master is told about all peers via --master_addresses.
-Status ExternalMiniCluster::StartDistributedMasters() {
-  const int num_masters = opts_.num_masters;
-
-  if (opts_.master_rpc_ports.size() != static_cast<size_t>(num_masters)) {
-    // Report both counts: the mismatch may be too few *or* too many ports.
-    LOG(FATAL) << num_masters << " masters requested, but "
-               << opts_.master_rpc_ports.size()
-               << " ports specified in 'master_rpc_ports'";
-  }
-
-  const vector<HostPort> peer_hostports = master_rpc_addrs();
-  vector<string> flags = opts_.extra_master_flags;
-  flags.push_back(Substitute("--master_addresses=$0",
-                             HostPort::ToCommaSeparatedString(peer_hostports)));
-  const string exe = GetBinaryPath(kMasterBinaryName);
-
-  // Start the masters.
-  for (int i = 0; i < num_masters; i++) {
-    const string daemon_id = Substitute("master-$0", i);
-
-    ExternalDaemonOptions opts(opts_.logtostderr);
-    opts.messenger = messenger_;
-    opts.exe = exe;
-    opts.wal_dir = GetWalPath(daemon_id);
-    opts.data_dirs = GetDataPaths(daemon_id);
-    opts.log_dir = GetLogPath(daemon_id);
-    if (FLAGS_perf_record) {
-      opts.perf_record_filename =
-          Substitute("$0/perf-$1.data", opts.log_dir, daemon_id);
-    }
-    opts.extra_flags = SubstituteInFlags(flags, i);
-    opts.start_process_timeout = opts_.start_process_timeout;
-    opts.rpc_bind_address = peer_hostports[i];
-
-    scoped_refptr<ExternalMaster> peer = new ExternalMaster(opts);
-    if (opts_.enable_kerberos) {
-      RETURN_NOT_OK_PREPEND(peer->EnableKerberos(kdc_.get(), peer_hostports[i].host()),
-                            "could not enable Kerberos");
-    }
-    RETURN_NOT_OK_PREPEND(peer->Start(),
-                          Substitute("Unable to start Master at index $0", i));
-    masters_.push_back(peer);
-  }
-
-  return Status::OK();
-}
-
-// Bind IP for the tablet server with the given index, per opts_.bind_mode.
-string ExternalMiniCluster::GetBindIpForTabletServer(int index) const {
-  const auto daemon_type = MiniCluster::TSERVER;
-  return MiniCluster::GetBindIpForDaemon(daemon_type, index, opts_.bind_mode);
-}
-
-// Bind IP for the master with the given index, per opts_.bind_mode.
-string ExternalMiniCluster::GetBindIpForMaster(int index) const {
-  const auto daemon_type = MiniCluster::MASTER;
-  return MiniCluster::GetBindIpForDaemon(daemon_type, index, opts_.bind_mode);
-}
-
-// Starts one more tablet server, named "ts-<N>" where N is the number of
-// tablet servers before the call. It is given the bound RPC address of every
-// master. At least one master must have been started first.
-Status ExternalMiniCluster::AddTabletServer() {
-  CHECK(leader_master() != nullptr)
-      << "Must have started at least 1 master before adding tablet servers";
-
-  const int idx = tablet_servers_.size();
-  const string daemon_id = Substitute("ts-$0", idx);
-
-  vector<HostPort> master_hostports;
-  for (int i = 0; i < num_masters(); i++) {
-    master_hostports.push_back(DCHECK_NOTNULL(master(i))->bound_rpc_hostport());
-  }
-  const string bind_host = GetBindIpForTabletServer(idx);
-
-  ExternalDaemonOptions opts(opts_.logtostderr);
-  opts.messenger = messenger_;
-  opts.exe = GetBinaryPath(kTabletServerBinaryName);
-  opts.wal_dir = GetWalPath(daemon_id);
-  opts.data_dirs = GetDataPaths(daemon_id);
-  opts.log_dir = GetLogPath(daemon_id);
-  if (FLAGS_perf_record) {
-    opts.perf_record_filename = Substitute("$0/perf-$1.data", opts.log_dir, daemon_id);
-  }
-  opts.extra_flags = SubstituteInFlags(opts_.extra_tserver_flags, idx);
-  opts.start_process_timeout = opts_.start_process_timeout;
-  opts.rpc_bind_address = HostPort(bind_host, 0);
-
-  scoped_refptr<ExternalTabletServer> new_ts = new ExternalTabletServer(opts, master_hostports);
-  if (opts_.enable_kerberos) {
-    RETURN_NOT_OK_PREPEND(new_ts->EnableKerberos(kdc_.get(), bind_host),
-                          "could not enable Kerberos");
-  }
-
-  RETURN_NOT_OK(new_ts->Start());
-  tablet_servers_.push_back(new_ts);
-  return Status::OK();
-}
-
-// Waits until every running master reports exactly 'count' of this cluster's
-// current tablet server instances, or until 'timeout' elapses. Instances are
-// matched by permanent UUID and instance seqno.
-//
-// Returns TimedOut when the deadline passes, or the error of a failed
-// ListTabletServers RPC.
-Status ExternalMiniCluster::WaitForTabletServerCount(int count, const MonoDelta& timeout) {
-  const MonoTime deadline = MonoTime::Now() + timeout;
-
-  // Only running masters are queried. masters_ may hold null entries
-  // (ShutdownNodes() and Restart() skip them), so skip those here as well.
-  unordered_set<int> masters_to_search;
-  for (int i = 0; i < static_cast<int>(masters_.size()); i++) {
-    if (masters_[i] && !masters_[i]->IsShutdown()) {
-      masters_to_search.insert(i);
-    }
-  }
-
-  while (true) {
-    const MonoDelta remaining = deadline - MonoTime::Now();
-    if (remaining.ToSeconds() < 0) {
-      return Status::TimedOut(Substitute(
-          "Timed out waiting for $0 TS(s) to register with all masters", count));
-    }
-
-    for (auto iter = masters_to_search.begin(); iter != masters_to_search.end();) {
-      master::ListTabletServersRequestPB req;
-      master::ListTabletServersResponsePB resp;
-      rpc::RpcController rpc;
-      rpc.set_timeout(remaining);
-      RETURN_NOT_OK_PREPEND(master_proxy(*iter)->ListTabletServers(req, &resp, &rpc),
-                            "ListTabletServers RPC failed");
-      // ListTabletServers() may return servers that are no longer online, so
-      // only count entries whose uuid/seqno match a tablet server this
-      // cluster currently owns.
-      int match_count = 0;
-      for (const master::ListTabletServersResponsePB_Entry& e : resp.servers()) {
-        for (const scoped_refptr<ExternalTabletServer>& ets : tablet_servers_) {
-          if (ets->instance_id().permanent_uuid() == e.instance_id().permanent_uuid() &&
-              ets->instance_id().instance_seqno() == e.instance_id().instance_seqno()) {
-            match_count++;
-            break;
-          }
-        }
-      }
-      if (match_count == count) {
-        // This master has returned the correct set of tservers.
-        iter = masters_to_search.erase(iter);
-      } else {
-        ++iter;
-      }
-    }
-
-    if (masters_to_search.empty()) {
-      // All masters have returned the correct set of tservers.
-      LOG(INFO) << count << " TS(s) registered with all masters";
-      return Status::OK();
-    }
-    SleepFor(MonoDelta::FromMilliseconds(1));
-  }
-}
-
-// gtest-asserts that no daemon which was not deliberately shut down has died.
-// Each crashed daemon is logged by UUID.
-void ExternalMiniCluster::AssertNoCrashes() {
-  int num_crashes = 0;
-  for (ExternalDaemon* d : daemons()) {
-    // daemons() forwards masters_ entries unfiltered, and masters_ may hold
-    // nulls; skip them instead of dereferencing.
-    if (d == nullptr || d->IsShutdown()) continue;
-    if (!d->IsProcessAlive()) {
-      LOG(ERROR) << "Process with UUID " << d->uuid() << " has crashed";
-      num_crashes++;
-    }
-  }
-  ASSERT_EQ(0, num_crashes) << "At least one process crashed";
-}
-
-// Polls 'ts' until it reports at least 'min_tablet_count' tablets and all of
-// them are RUNNING. Returns an RPC or server error as soon as one occurs.
-// Otherwise returns TimedOut, carrying the last ListTablets response, when
-// 'timeout' passes.
-Status ExternalMiniCluster::WaitForTabletsRunning(ExternalTabletServer* ts,
-                                                  int min_tablet_count,
-                                                  const MonoDelta& timeout) {
-  TabletServerServiceProxy proxy(messenger_, ts->bound_rpc_addr(), ts->bound_rpc_addr().host());
-  ListTabletsRequestPB req;
-  ListTabletsResponsePB resp;
-
-  const MonoTime deadline = MonoTime::Now() + timeout;
-  while (MonoTime::Now() < deadline) {
-    rpc::RpcController rpc;
-    rpc.set_timeout(MonoDelta::FromSeconds(10));
-    RETURN_NOT_OK(proxy.ListTablets(req, &resp, &rpc));
-    if (resp.has_error()) {
-      return StatusFromPB(resp.error().status());
-    }
-
-    const bool enough_tablets = resp.status_and_schema_size() >= min_tablet_count;
-    const bool all_running = std::all_of(
-        resp.status_and_schema().begin(), resp.status_and_schema().end(),
-        [](const StatusAndSchemaPB& entry) {
-          return entry.tablet_status().state() == tablet::RUNNING;
-        });
-    if (all_running && enough_tablets) {
-      return Status::OK();
-    }
-
-    SleepFor(MonoDelta::FromMilliseconds(10));
-  }
-
-  return Status::TimedOut(SecureDebugString(resp));
-}
-
-// Finds the leader master and stores its position in masters_ into '*idx'.
-// It issues a ConnectToClusterRpc to all masters with a 5 second deadline.
-// The leader is matched to masters_ by RPC port only.
-Status ExternalMiniCluster::GetLeaderMasterIndex(int* idx) {
-  const MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(5);
-
-  vector<pair<Sockaddr, string>> addrs_with_names;
-  for (const auto& m : masters_) {
-    addrs_with_names.emplace_back(m->bound_rpc_addr(), m->bound_rpc_addr().host());
-  }
-
-  Synchronizer sync;
-  Sockaddr leader_master_addr;
-  const auto& cb = [&](const Status& status,
-                       const pair<Sockaddr, string>& leader_master,
-                       const master::ConnectToMasterResponsePB& /* resp */) {
-    if (status.ok()) {
-      leader_master_addr = leader_master.first;
-    }
-    sync.StatusCB(status);
-  };
-  scoped_refptr<ConnectToClusterRpc> rpc(new ConnectToClusterRpc(cb,
-                                                                 std::move(addrs_with_names),
-                                                                 deadline,
-                                                                 MonoDelta::FromSeconds(5),
-                                                                 messenger_));
-  rpc->SendRpc();
-  RETURN_NOT_OK(sync.Wait());
-
-  for (int i = 0; i < masters_.size(); i++) {
-    if (masters_[i]->bound_rpc_hostport().port() == leader_master_addr.port()) {
-      *idx = i;
-      return Status::OK();
-    }
-  }
-  // There is never a situation where this should happen, so it's better to
-  // exit with a FATAL log message right away vs. return a
-  // Status::IllegalState().
-  LOG(FATAL) << "Leader master is not in masters_";
-  return Status::OK();
-}
-
-// The tablet server whose permanent UUID equals 'uuid', or nullptr.
-ExternalTabletServer* ExternalMiniCluster::tablet_server_by_uuid(const std::string& uuid) const {
-  const auto it = std::find_if(tablet_servers_.begin(), tablet_servers_.end(),
-                               [&](const scoped_refptr<ExternalTabletServer>& ts) {
-                                 return ts->instance_id().permanent_uuid() == uuid;
-                               });
-  return it == tablet_servers_.end() ? nullptr : it->get();
-}
-
-// Position in tablet_servers_ of the server whose uuid() is 'uuid', or -1.
-int ExternalMiniCluster::tablet_server_index_by_uuid(const std::string& uuid) const {
-  const int n = tablet_servers_.size();
-  for (int i = 0; i < n; ++i) {
-    if (tablet_servers_[i]->uuid() == uuid) return i;
-  }
-  return -1;
-}
-
-// Raw pointers to all daemons: tablet servers first, then masters. Entries
-// are taken from tablet_servers_/masters_ as-is, without filtering.
-vector<ExternalDaemon*> ExternalMiniCluster::daemons() const {
-  vector<ExternalDaemon*> results;
-  results.reserve(tablet_servers_.size() + masters_.size());
-  for (const auto& ts : tablet_servers_) results.push_back(ts.get());
-  for (const auto& m : masters_) results.push_back(m.get());
-  return results;
-}
-
-// Configured master RPC addresses: one per entry of opts_.master_rpc_ports,
-// on the bind IP assigned to the master with that index.
-vector<HostPort> ExternalMiniCluster::master_rpc_addrs() const {
-  vector<HostPort> addrs;
-  addrs.reserve(opts_.master_rpc_ports.size());
-  int i = 0;
-  for (const auto& port : opts_.master_rpc_ports) {
-    addrs.emplace_back(GetBindIpForDaemon(MiniCluster::MASTER, i++, opts_.bind_mode), port);
-  }
-  return addrs;
-}
-
-std::shared_ptr<rpc::Messenger> ExternalMiniCluster::messenger() const {
-  return messenger_;
-}
-
-// Proxy to the only master; CHECK-fails unless there is exactly one.
-std::shared_ptr<MasterServiceProxy> ExternalMiniCluster::master_proxy() const {
-  CHECK_EQ(masters_.size(), 1);
-  return master_proxy(0);
-}
-
-// New proxy to the master at 'idx', which must be in range and non-null.
-std::shared_ptr<MasterServiceProxy> ExternalMiniCluster::master_proxy(int idx) const {
-  CHECK_LT(idx, masters_.size());
-  const auto& addr = CHECK_NOTNULL(master(idx))->bound_rpc_addr();
-  auto proxy = std::make_shared<MasterServiceProxy>(messenger_, addr, addr.host());
-  return proxy;
-}
-
-// Builds a client that knows about every master of the cluster.
-// 'builder' may be null, in which case a default builder is used. Any master
-// addresses already set on it are replaced.
-Status ExternalMiniCluster::CreateClient(client::KuduClientBuilder* builder,
-                                         client::sp::shared_ptr<client::KuduClient>* client) const {
-  client::KuduClientBuilder defaults;
-  client::KuduClientBuilder* const b = (builder != nullptr) ? builder : &defaults;
-
-  CHECK(!masters_.empty());
-  b->clear_master_server_addrs();
-  for (const auto& m : masters_) {
-    b->add_master_server_addr(m->bound_rpc_hostport().ToString());
-  }
-  return b->Build(client);
-}
-
-// Sets gflag 'flag' to 'value' on 'daemon' through the generic SetFlag RPC
-// (force=true, 30 second timeout). Fails if the RPC fails or the daemon
-// does not report SUCCESS.
-Status ExternalMiniCluster::SetFlag(ExternalDaemon* daemon,
-                                    const string& flag,
-                                    const string& value) {
-  server::SetFlagRequestPB req;
-  req.set_flag(flag);
-  req.set_value(value);
-  req.set_force(true);
-
-  const auto& addr = daemon->bound_rpc_addr();
-  server::GenericServiceProxy proxy(messenger_, addr, addr.host());
-  rpc::RpcController controller;
-  controller.set_timeout(MonoDelta::FromSeconds(30));
-  server::SetFlagResponsePB resp;
-  RETURN_NOT_OK_PREPEND(proxy.SetFlag(req, &resp, &controller), "rpc failed");
-
-  if (resp.result() == server::SetFlagResponsePB::SUCCESS) {
-    return Status::OK();
-  }
-  return Status::RemoteError("failed to set flag", SecureShortDebugString(resp));
-}
-
-//------------------------------------------------------------
-// ExternalDaemon
-//------------------------------------------------------------
-
-// Moves the configuration out of 'opts'. The RPC bind address must be set.
-ExternalDaemon::ExternalDaemon(ExternalDaemonOptions opts)
-    : messenger_(std::move(opts.messenger)),
-      wal_dir_(std::move(opts.wal_dir)),
-      data_dirs_(std::move(opts.data_dirs)),
-      log_dir_(std::move(opts.log_dir)),
-      perf_record_filename_(std::move(opts.perf_record_filename)),
-      start_process_timeout_(opts.start_process_timeout),
-      logtostderr_(opts.logtostderr),
-      rpc_bind_address_(std::move(opts.rpc_bind_address)),
-      exe_(std::move(opts.exe)),
-      extra_flags_(std::move(opts.extra_flags)) {
-  CHECK(rpc_bind_address_.Initialized());
-}
-
-ExternalDaemon::~ExternalDaemon() = default;
-
-// Configures the daemon for Kerberos, in three steps:
-//  - Creates a keytab in 'kdc' for the service principal "kudu/<bind_host>".
-//  - Passes the KDC's environment variables to the daemon.
-//  - Adds flags that require RPC authentication with fixed superuser/user ACLs.
-Status ExternalDaemon::EnableKerberos(MiniKdc* kdc, const string& bind_host) {
-  const string spn = "kudu/" + bind_host;
-  string ktpath;
-  RETURN_NOT_OK_PREPEND(kdc->CreateServiceKeytab(spn, &ktpath),
-                        "could not create keytab");
-  extra_env_ = kdc->GetEnvVars();
-
-  const vector<string> kerberos_flags = {
-    Substitute("--keytab_file=$0", ktpath),
-    Substitute("--principal=$0", spn),
-    "--rpc_authentication=required",
-    "--superuser_acl=test-admin",
-    "--user_acl=test-user",
-  };
-  extra_flags_.insert(extra_flags_.end(), kerberos_flags.begin(), kerberos_flags.end());
-  return Status::OK();
-}
-
-// Launches the daemon and waits for it to come up.
-//
-// The command line consists of, in order:
-//  - the executable;
-//  - 'user_flags';
-//  - the fixed test-friendly defaults below;
-//  - extra_flags_, last so that they can override anything earlier.
-// If a perf record file name was configured, a "perf record" process is
-// attached to the new daemon's pid.
-//
-// Startup is complete once the daemon writes its info file
-// (<first data dir>/info.pb), which is then parsed into status_. Returns:
-//  - OK once the info file appears. Also OK if the daemon exits during
-//    startup with fault_injection::kExitStatus; the process handle is still
-//    adopted in that case.
-//  - RuntimeError describing the exit, if the daemon exits any other way.
-//  - TimedOut, after SIGKILLing the daemon, if the info file does not appear
-//    within start_process_timeout_.
-//  - Otherwise, the error of whichever setup step failed.
-Status ExternalDaemon::StartProcess(const vector<string>& user_flags) {
-  CHECK(!process_);
-
-  vector<string> argv;
-
-  // First the exe for argv[0].
-  argv.push_back(exe_);
-
-  // Then all the flags coming from the minicluster framework.
-  argv.insert(argv.end(), user_flags.begin(), user_flags.end());
-
-  // Disable fsync to dramatically speed up runtime. This is safe as no tests
-  // rely on forcefully cutting power to a machine or equivalent.
-  argv.emplace_back("--never_fsync");
-
-  // Generate smaller RSA keys -- generating a 1024-bit key is faster
-  // than generating the default 2048-bit key, and we don't care about
-  // strong encryption in tests. Setting it lower (e.g. 512 bits) results
-  // in OpenSSL errors RSA_sign:digest too big for rsa key:rsa_sign.c:122
-  // since we are using strong/high TLS v1.2 cipher suites, so the minimum
-  // size of TLS-related RSA key is 768 bits (due to the usage of
-  // the ECDHE-RSA-AES256-GCM-SHA384 suite). However, to work with Java
-  // client it's necessary to have at least 1024 bits for certificate RSA key
-  // due to Java security policies.
-  argv.emplace_back("--ipki_server_key_size=1024");
-
-  // Disable minidumps by default since many tests purposely inject faults.
-  argv.emplace_back("--enable_minidumps=false");
-
-  // Disable log redaction.
-  argv.emplace_back("--redact=flag");
-
-  // Enable metrics logging.
-  argv.emplace_back("--metrics_log_interval_ms=1000");
-
-  if (logtostderr_) {
-    // Ensure that logging goes to the test output and doesn't get buffered.
-    argv.emplace_back("--logtostderr");
-    argv.emplace_back("--logbuflevel=-1");
-  }
-
-  // Even if we are logging to stderr, metrics logs and minidumps end up being
-  // written based on -log_dir. So, we have to set that too.
-  argv.push_back("--log_dir=" + log_dir_);
-  RETURN_NOT_OK(env_util::CreateDirsRecursively(Env::Default(), log_dir_));
-
-  // Tell the server to dump its port information so we can pick it up.
-  string info_path = JoinPathSegments(data_dirs_[0], "info.pb");
-  argv.push_back("--server_dump_info_path=" + info_path);
-  argv.emplace_back("--server_dump_info_format=pb");
-
-  // We use ephemeral ports in many tests. They don't work for production, but are OK
-  // in unit tests.
-  argv.emplace_back("--rpc_server_allow_ephemeral_ports");
-
-  // Allow unsafe and experimental flags from tests, since we often use
-  // fault injection, etc.
-  argv.emplace_back("--unlock_experimental_flags");
-  argv.emplace_back("--unlock_unsafe_flags");
-
-  // Then the "extra flags" passed into the ctor (from the ExternalMiniCluster
-  // options struct). These come at the end so they can override things like
-  // web port or RPC bind address if necessary.
-  argv.insert(argv.end(), extra_flags_.begin(), extra_flags_.end());
-
-  // A previous instance of the daemon may have run in the same directory. So, remove
-  // the previous info file if it's there.
-  ignore_result(Env::Default()->DeleteFile(info_path));
-
-  // Start the daemon.
-  unique_ptr<Subprocess> p(new Subprocess(argv));
-  p->ShareParentStdout(false);
-  p->SetEnvVars(extra_env_);
-  string env_str;
-  JoinMapKeysAndValues(extra_env_, "=", ",", &env_str);
-  LOG(INFO) << "Running " << exe_ << "\n" << JoinStrings(argv, "\n")
-            << " with env {" << env_str << "}";
-  RETURN_NOT_OK_PREPEND(p->Start(),
-                        Substitute("Failed to start subprocess $0", exe_));
-
-  // If requested, start a monitoring subprocess.
-  unique_ptr<Subprocess> perf_record;
-  if (!perf_record_filename_.empty()) {
-    perf_record.reset(new Subprocess({
-          "perf",
-          "record",
-          "--call-graph",
-          "fp",
-          "-o",
-          perf_record_filename_,
-          Substitute("--pid=$0", p->pid())
-        }, SIGINT));
-    RETURN_NOT_OK_PREPEND(perf_record->Start(),
-                          "Could not start perf record subprocess");
-  }
-
-  // The process is now starting -- wait for the bound port info to show up.
-  Stopwatch sw;
-  sw.start();
-  bool success = false;
-  while (sw.elapsed().wall_seconds() < start_process_timeout_.ToSeconds()) {
-    if (Env::Default()->FileExists(info_path)) {
-      success = true;
-      break;
-    }
-    SleepFor(MonoDelta::FromMilliseconds(10));
-    int wait_status = 0;
-    Status s = p->WaitNoBlock(&wait_status);
-    if (s.IsTimedOut()) {
-      // The process is still running.
-      continue;
-    }
-
-    // 'wait_status' is only meaningful if WaitNoBlock() succeeded, so bail
-    // out on a real wait error before interpreting it.
-    RETURN_NOT_OK_PREPEND(s, Substitute("Failed waiting on $0", exe_));
-
-    // If the process exited with expected exit status we need to still swap() the process
-    // and exit as if it had succeeded.
-    if (WIFEXITED(wait_status) && WEXITSTATUS(wait_status) == fault_injection::kExitStatus) {
-      process_.swap(p);
-      perf_record_process_.swap(perf_record);
-      return Status::OK();
-    }
-
-    string exit_info;
-    RETURN_NOT_OK(p->GetExitStatus(nullptr, &exit_info));
-    return Status::RuntimeError(exit_info);
-  }
-
-  if (!success) {
-    ignore_result(p->Kill(SIGKILL));
-    // MonoDelta::ToString() already includes the unit.
-    return Status::TimedOut(
-        Substitute("Timed out after $0 waiting for process ($1) to write info file ($2)",
-                   start_process_timeout_.ToString(), exe_, info_path));
-  }
-
-  status_.reset(new ServerStatusPB());
-  RETURN_NOT_OK_PREPEND(pb_util::ReadPBFromPath(Env::Default(), info_path, status_.get()),
-                        "Failed to read info file from " + info_path);
-  LOG(INFO) << "Started " << exe_ << " as pid " << p->pid();
-  VLOG(1) << exe_ << " instance information:\n" << SecureDebugString(*status_);
-
-  process_.swap(p);
-  perf_record_process_.swap(perf_record);
-  return Status::OK();
-}
-
-// Replaces the executable used the next time the daemon is started.
-// CHECK-fails unless the daemon is currently shut down.
-void ExternalDaemon::SetExePath(string exe) {
-  CHECK(IsShutdown()) << "Call Shutdown() before changing the executable path";
-  exe_ = std::move(exe);
-}
-
-// Suspends the daemon with SIGSTOP. IllegalState if there is no process.
-Status ExternalDaemon::Pause() {
-  if (process_ == nullptr) {
-    return Status::IllegalState(Substitute(
-        "Request to pause '$0' but the process is not there", exe_));
-  }
-  VLOG(1) << "Pausing " << exe_ << " with pid " << process_->pid();
-  RETURN_NOT_OK(process_->Kill(SIGSTOP));
-  paused_ = true;
-  return Status::OK();
-}
-
-// Continues a paused daemon with SIGCONT. IllegalState if there is no process.
-Status ExternalDaemon::Resume() {
-  if (process_ == nullptr) {
-    return Status::IllegalState(Substitute(
-        "Request to resume '$0' but the process is not there", exe_));
-  }
-  VLOG(1) << "Resuming " << exe_ << " with pid " << process_->pid();
-  RETURN_NOT_OK(process_->Kill(SIGCONT));
-  paused_ = false;
-  return Status::OK();
-}
-
-// A daemon is considered shut down when it holds no process handle.
-bool ExternalDaemon::IsShutdown() const {
-  return process_ == nullptr;
-}
-
-// True iff there is a process handle and the process has not exited; a
-// non-blocking wait reports TimedOut while the child is still running.
-bool ExternalDaemon::IsProcessAlive() const {
-  return !IsShutdown() && process_->WaitNoBlock().IsTimedOut();
-}
-
-// Waits up to 'timeout' for the daemon to exit with the fault-injection
-// exit status (fault_injection::kExitStatus).
-Status ExternalDaemon::WaitForInjectedCrash(const MonoDelta& timeout) const {
-  const auto exited_by_injection = [](int status) {
-    return WIFEXITED(status) && WEXITSTATUS(status) == fault_injection::kExitStatus;
-  };
-  return WaitForCrash(timeout, exited_by_injection, "fault injection");
-}
-
-// Waits up to 'timeout' for the daemon to be killed by SIGABRT.
-Status ExternalDaemon::WaitForFatal(const MonoDelta& timeout) const {
-  const auto aborted = [](int status) {
-    return WIFSIGNALED(status) && WTERMSIG(status) == SIGABRT;
-  };
-  return WaitForCrash(timeout, aborted, "FATAL crash");
-}
-
-
-// Polls until the process exits or 'timeout' passes. The sleep between
-// polls grows by 10 ms per attempt, capped at 200 ms.
-//
-// Returns:
-//  - TimedOut if the process is still alive at the deadline;
-//  - Aborted if it exited with a status that 'wait_status_predicate' rejects;
-//  - OK otherwise.
-Status ExternalDaemon::WaitForCrash(const MonoDelta& timeout,
-                                    const std::function<bool(int)>& wait_status_predicate,
-                                    const char* crash_type_str) const {
-  CHECK(process_) << "process not started";
-  const MonoTime deadline = MonoTime::Now() + timeout;
-
-  for (int attempt = 1; IsProcessAlive() && MonoTime::Now() < deadline; ++attempt) {
-    SleepFor(MonoDelta::FromMilliseconds(std::min(attempt * 10, 200)));
-  }
-
-  if (IsProcessAlive()) {
-    return Status::TimedOut(Substitute("Process did not crash within $0",
-                                       timeout.ToString()));
-  }
-
-  // The process is gone; check that it went away in the expected manner.
-  int wait_status;
-  RETURN_NOT_OK_PREPEND(process_->WaitNoBlock(&wait_status),
-                        "could not get wait status");
-  if (wait_status_predicate(wait_status)) {
-    return Status::OK();
-  }
-
-  string info_str;
-  RETURN_NOT_OK_PREPEND(process_->GetExitStatus(nullptr, &info_str),
-                        "could not get description of exit");
-  return Status::Aborted(
-      Substitute("process exited, but not due to a $0: $1", crash_type_str, info_str));
-}
-
-// PID of the daemon process. CHECK-fails with a clear message, rather than
-// dereferencing a null handle, if the daemon was never started or has been
-// shut down.
-pid_t ExternalDaemon::pid() const {
-  CHECK(process_) << "process not started";
-  return process_->pid();
-}
-
-// Underlying subprocess handle, or nullptr when the daemon is shut down.
-Subprocess* ExternalDaemon::process() const {
-  return process_.get();
-}
-
-// Kills the daemon with SIGKILL and reaps it. No-op if it is not running.
-// The bound RPC/HTTP addresses are saved first so a later start can reuse
-// them; for wildcard-bound daemons, only the ports are kept.
-void ExternalDaemon::Shutdown() {
-  if (process_ == nullptr) {
-    return;
-  }
-
-  const bool wildcard_bind = rpc_bind_address().host() == MiniCluster::kWildcardIpAddr;
-  if (wildcard_bind) {
-    bound_rpc_.set_host(MiniCluster::kWildcardIpAddr);
-    bound_rpc_.set_port(bound_rpc_hostport().port());
-    bound_http_.set_host(MiniCluster::kWildcardIpAddr);
-    bound_http_.set_port(bound_http_hostport().port());
-  } else {
-    bound_rpc_ = bound_rpc_hostport();
-    bound_http_ = bound_http_hostport();
-  }
-
-  if (IsProcessAlive()) {
-    // Before the hard kill, ask the process to flush coverage data
-    // (coverage builds) and check for leaks (LSAN builds). This is skipped
-    // for paused (SIGSTOP'ed) daemons, which could not answer.
-    if (!paused_) {
-      FlushCoverage();
-      CheckForLeaks();
-    }
-    LOG(INFO) << "Killing " << exe_ << " with pid " << process_->pid();
-    ignore_result(process_->Kill(SIGKILL));
-  }
-  WARN_NOT_OK(process_->Wait(), "Waiting on " + exe_);
-  paused_ = false;
-  process_.reset();
-  perf_record_process_.reset();
-}
-
-// Recursively removes every data directory, then the WAL directory.
-// Stops at, and returns, the first failure.
-Status ExternalDaemon::DeleteFromDisk() const {
- Env* env = Env::Default();
- for (const auto& dir : data_dirs()) {
- RETURN_NOT_OK(env->DeleteRecursively(dir));
- }
- return env->DeleteRecursively(wal_dir());
-}
-
-// In a COVERAGE_BUILD, asks the running daemon over RPC (1 second timeout) to
-// flush its coverage data. Failures, including a daemon that reports it is not
-// a coverage build, are only logged as warnings. In other builds this is a no-op.
-void ExternalDaemon::FlushCoverage() {
-#ifndef COVERAGE_BUILD
- return; // NOLINT(*)
-#else
- LOG(INFO) << "Attempting to flush coverage for " << exe_ << " pid " << process_->pid();
- server::GenericServiceProxy proxy(
- messenger_, bound_rpc_addr(), bound_rpc_addr().host());
-
- server::FlushCoverageRequestPB req;
- server::FlushCoverageResponsePB resp;
- rpc::RpcController rpc;
-
- rpc.set_timeout(MonoDelta::FromMilliseconds(1000));
- Status s = proxy.FlushCoverage(req, &resp, &rpc);
- // The RPC can succeed while the server declines; treat that as an error too.
- if (s.ok() && !resp.success()) {
- s = Status::RemoteError("Server does not appear to be running a coverage build");
- }
- WARN_NOT_OK(s, Substitute("Unable to flush coverage on $0 pid $1", exe_, process_->pid()));
-#endif
-}
-
-// When compiled with AddressSanitizer (whose runtime includes LSAN), asks the
-// running daemon over RPC (1 second timeout) to run a leak check.
-// CHECK-fails this process if the daemon reports leaks. Other failures are
-// only logged as warnings. Compiled to a no-op otherwise.
-void ExternalDaemon::CheckForLeaks() {
-// Nested #if: __has_feature(...) is only evaluated on compilers that define it.
-#if defined(__has_feature)
-# if __has_feature(address_sanitizer)
- LOG(INFO) << "Attempting to check leaks for " << exe_ << " pid " << process_->pid();
- server::GenericServiceProxy proxy(messenger_, bound_rpc_addr(), bound_rpc_addr().host());
-
- server::CheckLeaksRequestPB req;
- server::CheckLeaksResponsePB resp;
- rpc::RpcController rpc;
-
- rpc.set_timeout(MonoDelta::FromMilliseconds(1000));
- Status s = proxy.CheckLeaks(req, &resp, &rpc);
- if (s.ok()) {
- if (!resp.success()) {
- s = Status::RemoteError("Server does not appear to be running an LSAN build");
- } else {
- CHECK(!resp.found_leaks()) << "Found leaks in " << exe_ << " pid " << process_->pid();
- }
- }
- WARN_NOT_OK(s, Substitute("Unable to check leaks on $0 pid $1", exe_, process_->pid()));
-# endif
-#endif
-}
-
-// Returns the first RPC address listed in the daemon's recorded status.
-// CHECK-fails if no status is recorded, it lists no RPC address, or the
-// address cannot be converted.
-HostPort ExternalDaemon::bound_rpc_hostport() const {
- CHECK(status_);
- CHECK_GE(status_->bound_rpc_addresses_size(), 1);
- HostPort first_rpc_hp;
- CHECK_OK(HostPortFromPB(status_->bound_rpc_addresses(0), &first_rpc_hp));
- return first_rpc_hp;
-}
-
-// Resolves bound_rpc_hostport() and returns the first resulting socket
-// address. CHECK-fails if resolution fails or yields nothing.
-Sockaddr ExternalDaemon::bound_rpc_addr() const {
- vector<Sockaddr> addrs;
- CHECK_OK(bound_rpc_hostport().ResolveAddresses(&addrs));
- CHECK(!addrs.empty());
- return addrs.front();
-}
-
-// Returns the first HTTP address listed in the daemon's recorded status.
-// Returns a default-constructed (uninitialized) HostPort when none is listed.
-// CHECK-fails if no status is recorded.
-HostPort ExternalDaemon::bound_http_hostport() const {
- CHECK(status_);
- HostPort http_hp;
- if (status_->bound_http_addresses_size() > 0) {
- CHECK_OK(HostPortFromPB(status_->bound_http_addresses(0), &http_hp));
- }
- return http_hp;
-}
-
-// Returns the node instance record from the daemon's recorded status.
-// CHECK-fails if no status is recorded.
-const NodeInstancePB& ExternalDaemon::instance_id() const {
- CHECK(status_);
- const NodeInstancePB& instance = status_->node_instance();
- return instance;
-}
-
-// Returns the daemon's permanent UUID, taken from its node instance record.
-// CHECK-fails (via instance_id()) if no status is recorded.
-const string& ExternalDaemon::uuid() const {
- return instance_id().permanent_uuid();
-}
-
-// Fetches '/jsonmetricz?metrics=<metric name>' from the daemon's webserver and
-// reads the int64 field 'value_field' of the metric 'metric_proto' into
-// '*value'.
-//
-// The first entity whose "type" equals 'entity_proto->name()' (and whose "id"
-// equals 'entity_id', when 'entity_id' is non-null) and that carries the
-// metric wins. Returns NotFound if no entity/metric pair matches; HTTP and
-// JSON errors are propagated. CHECK-fails if the daemon has no HTTP address.
-Status ExternalDaemon::GetInt64Metric(const MetricEntityPrototype* entity_proto,
- const char* entity_id,
- const MetricPrototype* metric_proto,
- const char* value_field,
- int64_t* value) const {
- // Look the HTTP endpoint up once instead of re-reading it for the URL.
- const HostPort http_hp = bound_http_hostport();
- CHECK(http_hp.Initialized());
- // Fetch metrics whose name matches the given prototype.
- string url = Substitute(
- "http://$0/jsonmetricz?metrics=$1",
- http_hp.ToString(),
- metric_proto->name());
- EasyCurl curl;
- faststring dst;
- RETURN_NOT_OK(curl.FetchURL(url, &dst));
-
- // Parse the results, beginning with the top-level entity array.
- JsonReader r(dst.ToString());
- RETURN_NOT_OK(r.Init());
- vector<const Value*> entities;
- RETURN_NOT_OK(r.ExtractObjectArray(r.root(), nullptr, &entities));
- for (const Value* entity : entities) {
- // Find the desired entity.
- string type;
- RETURN_NOT_OK(r.ExtractString(entity, "type", &type));
- if (type != entity_proto->name()) {
- continue;
- }
- if (entity_id != nullptr) {
- string id;
- RETURN_NOT_OK(r.ExtractString(entity, "id", &id));
- if (id != entity_id) {
- continue;
- }
- }
-
- // Find the desired metric within the entity.
- vector<const Value*> metrics;
- RETURN_NOT_OK(r.ExtractObjectArray(entity, "metrics", &metrics));
- for (const Value* metric : metrics) {
- string name;
- RETURN_NOT_OK(r.ExtractString(metric, "name", &name));
- if (name != metric_proto->name()) {
- continue;
- }
- RETURN_NOT_OK(r.ExtractInt64(metric, value_field, value));
- return Status::OK();
- }
- }
- string msg;
- if (entity_id != nullptr) {
- msg = Substitute("Could not find metric $0.$1 for entity $2",
- entity_proto->name(), metric_proto->name(),
- entity_id);
- } else {
- msg = Substitute("Could not find metric $0.$1",
- entity_proto->name(), metric_proto->name());
- }
- return Status::NotFound(msg);
-}
-
-//------------------------------------------------------------
-// ScopedResumeExternalDaemon
-//------------------------------------------------------------
-
-// Borrows 'daemon' (not owned); it must be non-null and outlive this object.
-ScopedResumeExternalDaemon::ScopedResumeExternalDaemon(ExternalDaemon* daemon)
- : daemon_(CHECK_NOTNULL(daemon)) {}
-
-// Resumes the daemon on scope exit. A failed resume is only logged as a
-// warning; destructors must not propagate errors.
-ScopedResumeExternalDaemon::~ScopedResumeExternalDaemon() {
- const Status resume_status = daemon_->Resume();
- WARN_NOT_OK(resume_status, "Could not resume external daemon");
-}
-
-//------------------------------------------------------------
-// ExternalMaster
-//------------------------------------------------------------
-
-// All configuration is handed straight to the ExternalDaemon base.
-ExternalMaster::ExternalMaster(ExternalDaemonOptions opts)
- : ExternalDaemon(std::move(opts)) {}
-
-ExternalMaster::~ExternalMaster() = default;
-
-// Launches the master bound to its configured RPC address. The webserver
-// listens on the same host with an ephemeral port.
-Status ExternalMaster::Start() {
- const HostPort& bind_addr = rpc_bind_address();
- vector<string> flags = GetCommonFlags();
- flags.emplace_back(Substitute("--rpc_bind_addresses=$0", bind_addr.ToString()));
- flags.emplace_back(Substitute("--webserver_interface=$0", bind_addr.host()));
- flags.emplace_back("--webserver_port=0");
- return StartProcess(flags);
-}
-
-// Relaunches the master on the RPC (and, if known, HTTP) endpoints recorded
-// by Shutdown(). Returns IllegalState if Shutdown() has not recorded them.
-Status ExternalMaster::Restart() {
- // A zero port means Shutdown() never captured the bound addresses.
- if (bound_rpc_.port() == 0) {
- return Status::IllegalState("Master cannot be restarted. Must call Shutdown() first.");
- }
-
- vector<string> flags = GetCommonFlags();
- flags.emplace_back(Substitute("--rpc_bind_addresses=$0", bound_rpc_.ToString()));
-
- // Reuse the previous HTTP endpoint when known; otherwise use the RPC host
- // with an ephemeral webserver port.
- const bool reuse_http = bound_http_.Initialized();
- const string& web_host = reuse_http ? bound_http_.host() : bound_rpc_.host();
- flags.emplace_back(Substitute("--webserver_interface=$0", web_host));
- if (reuse_http) {
- flags.emplace_back(Substitute("--webserver_port=$0", bound_http_.port()));
- } else {
- flags.emplace_back("--webserver_port=0");
- }
-
- return StartProcess(flags);
-}
-
-// Blocks until this master answers ListTables, polling every 50ms for up to
-// kMasterCatalogManagerTimeoutSeconds.
-//
-// Returns OK as soon as the master either answers successfully (it is the
-// leader) or answers IllegalState (it is up but not the leader).
-// ServiceUnavailable, TimedOut and NetworkError are retried. Any other error
-// is returned immediately. Returns TimedOut if the master never becomes ready.
-//
-// Success is returned from inside the loop. This way a response that arrives
-// after the deadline is not misreported as a timeout, and the elapsed time
-// landing exactly on the limit cannot turn a timeout into OK.
-Status ExternalMaster::WaitForCatalogManager() {
- // Resolve the master address once rather than on every use.
- const Sockaddr master_addr = bound_rpc_addr();
- MasterServiceProxy proxy(messenger_, master_addr, master_addr.host());
- Stopwatch sw;
- sw.start();
- while (sw.elapsed().wall_seconds() < kMasterCatalogManagerTimeoutSeconds) {
- ListTablesRequestPB req;
- ListTablesResponsePB resp;
- RpcController rpc;
- Status s = proxy.ListTables(req, &resp, &rpc);
- if (s.ok()) {
- if (!resp.has_error()) {
- // This master is the leader and is up and running.
- return Status::OK();
- }
- s = StatusFromPB(resp.error().status());
- if (s.IsIllegalState()) {
- // This master is not the leader but is otherwise up and running.
- return Status::OK();
- }
- if (!s.IsServiceUnavailable()) {
- // Unexpected error from master.
- return s;
- }
- } else if (!s.IsTimedOut() && !s.IsNetworkError()) {
- // Unexpected error from proxy.
- return s;
- }
-
- // There was some kind of transient network error or the master isn't yet
- // ready. Sleep and retry.
- SleepFor(MonoDelta::FromMilliseconds(50));
- }
- return Status::TimedOut(
- Substitute("Timed out after $0s waiting for master ($1) startup",
- kMasterCatalogManagerTimeoutSeconds,
- master_addr.ToString()));
-}
-
-// Flags shared by Start() and Restart(): storage locations, a default
-// webserver interface, and reduced RSA key sizes.
-vector<string> ExternalMaster::GetCommonFlags() const {
- vector<string> common;
- common.push_back("--fs_wal_dir=" + wal_dir_);
- common.push_back("--fs_data_dirs=" + JoinStrings(data_dirs_, ","));
- common.emplace_back("--webserver_interface=localhost");
- // See the in-line comment for "--ipki_server_key_size" flag in
- // ExternalDaemon::StartProcess() method.
- common.emplace_back("--ipki_ca_key_size=1024");
- // As for the TSK keys, 512 bits is the minimum since we are using the
- // SHA256 digest for token signing/verification.
- common.emplace_back("--tsk_num_rsa_bits=512");
- return common;
-}
-
-
-//------------------------------------------------------------
-// ExternalTabletServer
-//------------------------------------------------------------
-
-// 'master_addrs' lists the masters this tablet server registers with; at
-// least one is required (checked in debug builds only).
-ExternalTabletServer::ExternalTabletServer(ExternalDaemonOptions opts,
- vector<HostPort> master_addrs)
- : ExternalDaemon(std::move(opts)),
- master_addrs_(std::move(master_addrs)) {
- DCHECK(!master_addrs_.empty());
-}
-
-ExternalTabletServer::~ExternalTabletServer() = default;
-
-// Launches the tablet server. RPC and outbound traffic use the configured bind
-// host, and the webserver uses the same host with an ephemeral port.
-Status ExternalTabletServer::Start() {
- const HostPort& bind_addr = rpc_bind_address();
- const string& bind_host = bind_addr.host();
- const vector<string> flags = {
- "--fs_wal_dir=" + wal_dir_,
- "--fs_data_dirs=" + JoinStrings(data_dirs_, ","),
- Substitute("--rpc_bind_addresses=$0", bind_addr.ToString()),
- Substitute("--local_ip_for_outbound_sockets=$0", bind_host),
- Substitute("--webserver_interface=$0", bind_host),
- "--webserver_port=0",
- Substitute("--tserver_master_addrs=$0",
- HostPort::ToCommaSeparatedString(master_addrs_)),
- };
- return StartProcess(flags);
-}
-
-// Relaunches the tablet server on the RPC (and, if known, HTTP) endpoints
-// recorded by Shutdown(). Returns IllegalState if Shutdown() has not recorded
-// them.
-Status ExternalTabletServer::Restart() {
- // We store the addresses on shutdown so make sure we did that first.
- if (bound_rpc_.port() == 0) {
- return Status::IllegalState("Tablet server cannot be restarted. Must call Shutdown() first.");
- }
- vector<string> flags;
- flags.push_back("--fs_wal_dir=" + wal_dir_);
- flags.push_back("--fs_data_dirs=" + JoinStrings(data_dirs_, ","));
- flags.push_back(Substitute("--rpc_bind_addresses=$0", bound_rpc_.ToString()));
- flags.push_back(Substitute("--local_ip_for_outbound_sockets=$0",
- rpc_bind_address().host()));
- if (bound_http_.Initialized()) {
- flags.push_back(Substitute("--webserver_port=$0", bound_http_.port()));
- flags.push_back(Substitute("--webserver_interface=$0",
- bound_http_.host()));
- } else {
- // Mirror Start() and ExternalMaster::Restart(): with no recorded HTTP
- // endpoint, bind the webserver to the RPC host on an ephemeral port.
- // Otherwise the daemon would fall back to its built-in webserver defaults.
- flags.push_back(Substitute("--webserver_interface=$0", bound_rpc_.host()));
- flags.emplace_back("--webserver_port=0");
- }
- flags.push_back(Substitute("--tserver_master_addrs=$0",
- HostPort::ToCommaSeparatedString(master_addrs_)));
- return StartProcess(flags);
-}
-
-
-} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/350c8a79/src/kudu/integration-tests/external_mini_cluster.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/external_mini_cluster.h b/src/kudu/integration-tests/external_mini_cluster.h
deleted file mode 100644
index 0a23d9f..0000000
--- a/src/kudu/integration-tests/external_mini_cluster.h
+++ /dev/null
@@ -1,564 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <sys/types.h>
-
-#include <cstdint>
-#include <functional>
-#include <map>
-#include <memory>
-#include <ostream>
-#include <string>
-#include <vector>
-
-#include <boost/optional/optional.hpp>
-#include <glog/logging.h>
-#include <gtest/gtest_prod.h>
-
-#include "kudu/client/shared_ptr.h"
-#include "kudu/gutil/macros.h"
-#include "kudu/gutil/port.h"
-#include "kudu/gutil/ref_counted.h"
-#include "kudu/integration-tests/mini_cluster.h"
-#include "kudu/security/test/mini_kdc.h"
-#include "kudu/util/monotime.h"
-#include "kudu/util/net/net_util.h"
-#include "kudu/util/status.h"
-
-namespace kudu {
-
-class ExternalDaemon;
-class ExternalMaster;
-class ExternalTabletServer;
-class MetricEntityPrototype;
-class MetricPrototype;
-class NodeInstancePB;
-class Sockaddr;
-class Subprocess;
-
-namespace client {
-class KuduClient;
-class KuduClientBuilder;
-} // namespace client
-
-namespace master {
-class MasterServiceProxy;
-} // namespace master
-
-namespace rpc {
-class Messenger;
-} // namespace rpc
-
-namespace server {
-class ServerStatusPB;
-} // namespace server
-
-// Configuration for an ExternalMiniCluster. Field defaults are assigned by
-// the out-of-line constructor; the "Default:" notes below describe them.
-struct ExternalMiniClusterOptions {
- ExternalMiniClusterOptions();
-
- // Number of masters to start.
- // Default: 1
- int num_masters;
-
- // Number of TS to start.
- // Default: 1
- int num_tablet_servers;
-
- // Directory in which to store data.
- // Default: "", which auto-generates a unique path for this cluster.
- std::string data_root;
-
- // How the daemons pick the addresses they bind to (see MiniCluster::BindMode).
- // NOTE(review): the default is set in the constructor, which is not shown here.
- MiniCluster::BindMode bind_mode;
-
- // The path where the kudu daemons should be run from.
- // Default: "", which uses the same path as the currently running executable.
- // This works for unit tests, since they all end up in build/latest/bin.
- std::string daemon_bin_path;
-
- // Number of data directories to be created for each daemon.
- // Default: 1
- int num_data_dirs;
-
- // Extra flags for tablet servers and masters respectively.
- //
- // In these flags, you may use the special string '${index}' which will
- // be substituted with the index of the tablet server or master.
- std::vector<std::string> extra_tserver_flags;
- std::vector<std::string> extra_master_flags;
-
- // If more than one master is specified, list of ports for the
- // masters in a consensus configuration. Port at index 0 is used for the leader
- // master.
- std::vector<uint16_t> master_rpc_ports;
-
- // Options to configure the MiniKdc before starting it up.
- // Only used when 'enable_kerberos' is 'true'.
- MiniKdcOptions mini_kdc_options;
-
- // If true, set up a KDC as part of this ExternalMiniCluster, generate keytabs for
- // the servers, and require Kerberos authentication from clients.
- //
- // Additionally, when the cluster is started, the environment of the
- // test process will be modified to include Kerberos credentials for
- // a principal named 'testuser'.
- bool enable_kerberos;
-
- // If true, sends logging output to stderr instead of a log file. Defaults to
- // true.
- bool logtostderr;
-
- // Amount of time that may elapse between the creation of a daemon process
- // and the process writing out its info file. Defaults to 30 seconds.
- MonoDelta start_process_timeout;
-};
-
-// A mini-cluster made up of subprocesses running each of the daemons
-// separately. This is useful for black-box or grey-box failure testing
-// purposes -- it provides the ability to forcibly kill or stop particular
-// cluster participants, which isn't feasible in the normal InternalMiniCluster.
-// On the other hand, there is little access to inspect the internal state
-// of the daemons.
-class ExternalMiniCluster : public MiniCluster {
- public:
- // Constructs a cluster with the default options.
- ExternalMiniCluster();
-
- // Constructs a cluster with options specified in 'opts'.
- explicit ExternalMiniCluster(ExternalMiniClusterOptions opts);
-
- // Destroys a cluster.
- virtual ~ExternalMiniCluster();
-
- // Start the cluster.
- Status Start() override;
-
- // Restarts the cluster. Requires that it has been Shutdown() first.
- Status Restart();
-
- // Add a new TS to the cluster. The new TS is started.
- // Requires that the master is already running.
- Status AddTabletServer();
-
- // Currently, this uses SIGKILL on each daemon for a non-graceful shutdown.
- void ShutdownNodes(ClusterNodes nodes) override;
-
- // Return the IP address that the tablet server with the given index will bind to.
- // The result depends on the cluster's bind mode ('opts_.bind_mode'; see
- // MiniCluster::BindMode), e.g. 127.0.0.1 or another loopback IP.
- // NOTE(review): this comment used to refer to an
- // 'options.bind_to_unique_loopback_addresses' field, which no longer exists
- // in ExternalMiniClusterOptions.
- std::string GetBindIpForTabletServer(int index) const;
-
- // Same as above but for a master.
- std::string GetBindIpForMaster(int index) const;
-
- // Return a pointer to the master at index 0, treated as the leader.
- // NOTE(review): this delegates to master(0), which CHECK-fails if no
- // masters exist; the element itself may presumably be NULL if the cluster
- // is not started.
- //
- // TODO: Use the appropriate RPC here to return the leader master,
- // to allow some of the existing tests (e.g., raft_consensus-itest)
- // to use multiple masters.
- ExternalMaster* leader_master() { return master(0); }
-
- // Perform an RPC to determine the leader of the external mini
- // cluster. Set '*idx' to the leader master's index (for calls to
- // master() below).
- //
- // NOTE: if a leader election occurs after this method is executed,
- // the last result may not be valid.
- Status GetLeaderMasterIndex(int* idx);
-
- // If this cluster is configured for a single non-distributed
- // master, return the single master (possibly NULL if it has not been
- // started). Exits with a CHECK failure unless there is exactly one master.
- ExternalMaster* master() const {
- CHECK_EQ(masters_.size(), 1)
- << "master() should not be used with multiple masters, use leader_master() instead.";
- return master(0);
- }
-
- // Return master at 'idx' (possibly NULL if the master at 'idx' has not
- // been started). CHECK-fails if 'idx' is not less than the number of masters.
- ExternalMaster* master(int idx) const {
- CHECK_LT(idx, masters_.size());
- return masters_[idx].get();
- }
-
- // Return the tablet server at 'idx'. CHECK-fails if 'idx' is not less than
- // the number of tablet servers.
- ExternalTabletServer* tablet_server(int idx) const {
- CHECK_LT(idx, tablet_servers_.size());
- return tablet_servers_[idx].get();
- }
-
- // Return ExternalTabletServer given its UUID. If not found, returns NULL.
- ExternalTabletServer* tablet_server_by_uuid(const std::string& uuid) const;
-
- // Return the index of the ExternalTabletServer that has the given 'uuid', or
- // -1 if no such UUID can be found.
- int tablet_server_index_by_uuid(const std::string& uuid) const;
-
- // Return all tablet servers and masters.
- std::vector<ExternalDaemon*> daemons() const;
-
- // Return the cluster's MiniKdc (not owned by the caller). CHECK-fails if no
- // KDC was created.
- MiniKdc* kdc() const {
- return CHECK_NOTNULL(kdc_.get());
- }
-
- int num_tablet_servers() const override {
- return tablet_servers_.size();
- }
-
- int num_masters() const override {
- return masters_.size();
- }
-
- BindMode bind_mode() const override {
- return opts_.bind_mode;
- }
-
- std::vector<uint16_t> master_rpc_ports() const override {
- return opts_.master_rpc_ports;
- }
-
- std::vector<HostPort> master_rpc_addrs() const override;
-
- std::shared_ptr<rpc::Messenger> messenger() const override;
- std::shared_ptr<master::MasterServiceProxy> master_proxy() const override;
- std::shared_ptr<master::MasterServiceProxy> master_proxy(int idx) const override;
-
- // Wait until the number of registered tablet servers reaches the given count
- // on all of the running masters. Returns Status::TimedOut if the desired
- // count is not achieved with the given timeout.
- Status WaitForTabletServerCount(int count, const MonoDelta& timeout);
-
- // Runs gtest assertions that no servers have crashed.
- void AssertNoCrashes();
-
- // Wait until all tablets on the given tablet server are in the RUNNING
- // state. Returns Status::TimedOut if 'timeout' elapses and at least one
- // tablet is not yet RUNNING.
- //
- // If 'min_tablet_count' is not -1, will also wait for at least that many
- // RUNNING tablets to appear before returning (potentially timing out if that
- // number is never reached).
- Status WaitForTabletsRunning(ExternalTabletServer* ts, int min_tablet_count,
- const MonoDelta& timeout);
-
- // Create a client configured to talk to this cluster.
- // Builder may contain override options for the client. The master address will
- // be overridden to talk to the running master.
- //
- // REQUIRES: the cluster must have already been Start()ed.
- Status CreateClient(client::KuduClientBuilder* builder,
- client::sp::shared_ptr<client::KuduClient>* client) const override;
-
- // Sets the given flag on the given daemon, which must be running.
- //
- // This uses the 'force' flag on the RPC so that, even if the flag
- // is considered unsafe to change at runtime, it is changed.
- Status SetFlag(ExternalDaemon* daemon,
- const std::string& flag,
- const std::string& value) WARN_UNUSED_RESULT;
-
- // Set the path where daemon binaries can be found.
- // Overrides 'daemon_bin_path' set by ExternalMiniClusterOptions.
- // The cluster must be shut down before calling this method.
- void SetDaemonBinPath(std::string daemon_bin_path);
-
- // Returns the path where 'binary' is expected to live, based on
- // ExternalMiniClusterOptions.daemon_bin_path if it was provided, or on the
- // path of the currently running executable otherwise.
- std::string GetBinaryPath(const std::string& binary) const;
-
- // Returns the path where 'daemon_id' is expected to store its data, based on
- // ExternalMiniClusterOptions.data_root if it was provided, or on the
- // standard Kudu test directory otherwise.
- // 'dir_index' is an optional numeric suffix to be added to the default path.
- // If it is not specified, the cluster must be configured to use a single data dir.
- std::string GetDataPath(const std::string& daemon_id,
- boost::optional<uint32_t> dir_index = boost::none) const;
-
- // Returns paths where 'daemon_id' is expected to store its data, each with a
- // numeric suffix appropriate for 'opts_.num_data_dirs'
- std::vector<std::string> GetDataPaths(const std::string& daemon_id) const;
-
- // Returns the path where 'daemon_id' is expected to store its wal, or other
- // files that reside in the wal dir.
- std::string GetWalPath(const std::string& daemon_id) const;
-
- // Returns the path where 'daemon_id' is expected to store its logs, or other
- // files that reside in the log dir.
- std::string GetLogPath(const std::string& daemon_id) const;
-
- private:
- FRIEND_TEST(MasterFailoverTest, TestKillAnyMaster);
-
- Status StartSingleMaster();
-
- Status StartDistributedMasters();
-
- Status DeduceBinRoot(std::string* ret);
- Status HandleOptions();
-
- // Options this cluster was constructed with.
- const ExternalMiniClusterOptions opts_;
-
- // The root for binaries.
- std::string daemon_bin_path_;
-
- std::string data_root_;
-
- // Daemons owned by the cluster, indexed as in master() / tablet_server().
- std::vector<scoped_refptr<ExternalMaster> > masters_;
- std::vector<scoped_refptr<ExternalTabletServer> > tablet_servers_;
- // Set only when Kerberos is in use (see kdc()).
- std::unique_ptr<MiniKdc> kdc_;
-
- std::shared_ptr<rpc::Messenger> messenger_;
-
- DISALLOW_COPY_AND_ASSIGN(ExternalMiniCluster);
-};
-
-// Per-daemon settings used to construct an ExternalDaemon (and thus an
-// ExternalMaster or ExternalTabletServer).
-struct ExternalDaemonOptions {
- explicit ExternalDaemonOptions(bool logtostderr)
- : logtostderr(logtostderr) {
- }
-
- // Whether the daemon logs to stderr rather than to files in 'log_dir'.
- bool logtostderr;
- // Messenger used for RPCs from the test process to the daemon.
- std::shared_ptr<rpc::Messenger> messenger;
- // Path of the executable to launch.
- std::string exe;
- // Address the daemon's RPC server is configured to bind to.
- HostPort rpc_bind_address;
- std::string wal_dir;
- std::vector<std::string> data_dirs;
- std::string log_dir;
- // NOTE(review): presumably the output file for an optional 'perf record'
- // subprocess attached to the daemon; confirm in StartProcess().
- std::string perf_record_filename;
- // Additional command-line flags passed to the daemon.
- std::vector<std::string> extra_flags;
- // NOTE(review): presumably mirrors
- // ExternalMiniClusterOptions::start_process_timeout.
- MonoDelta start_process_timeout;
-};
-
-// A Kudu server process (master or tablet server) launched as a subprocess of
-// the test and controlled from outside: start, pause/resume, kill, wait for
-// crashes, and read metrics over HTTP. Reference-counted; see ExternalMaster
-// and ExternalTabletServer for the concrete daemons.
-class ExternalDaemon : public RefCountedThreadSafe<ExternalDaemon> {
- public:
- explicit ExternalDaemon(ExternalDaemonOptions opts);
-
- // Return the first RPC host/port the running daemon reported.
- // CHECK-fails if the daemon's status has not been recorded.
- HostPort bound_rpc_hostport() const;
- // Return the first resolved socket address of bound_rpc_hostport().
- Sockaddr bound_rpc_addr() const;
-
- // Return the host/port that this daemon is bound to for HTTP.
- // May return an uninitialized HostPort if HTTP is disabled.
- HostPort bound_http_hostport() const;
-
- // Node instance record / permanent UUID reported by the daemon.
- // CHECK-fail if the daemon's status has not been recorded.
- const NodeInstancePB& instance_id() const;
- const std::string& uuid() const;
-
- // Return the pid of the started process.
- // The caller must ensure a process has been started and not yet
- // Shutdown().
- pid_t pid() const;
-
- // Return the pointer to the underlying Subprocess if it is set.
- // Otherwise, returns nullptr.
- Subprocess* process() const;
-
- // Set the path of the executable to run as a daemon.
- // Overrides the exe path specified in the constructor.
- // The daemon must be shut down before calling this method.
- void SetExePath(std::string exe);
-
- // Enable Kerberos for this daemon. This creates a Kerberos principal
- // and keytab, and sets the appropriate environment variables in the
- // subprocess such that the server will use Kerberos authentication.
- //
- // 'bind_host' is the hostname that will be used to generate the Kerberos
- // service principal.
- //
- // Must be called before 'StartProcess()'.
- Status EnableKerberos(MiniKdc* kdc, const std::string& bind_host);
-
- // Sends a SIGSTOP signal to the daemon.
- Status Pause() WARN_UNUSED_RESULT;
-
- // Sends a SIGCONT signal to the daemon.
- Status Resume() WARN_UNUSED_RESULT;
-
- // Return true if we have explicitly shut down the process.
- bool IsShutdown() const;
-
- // Return true if the process is still running.
- // This may return false if the process crashed, even if we didn't
- // explicitly call Shutdown().
- bool IsProcessAlive() const;
-
- // Wait for this process to crash due to a configured fault
- // injection, or the given timeout to elapse. If the process
- // crashes for some reason other than an injected fault, returns
- // Status::Aborted.
- //
- // If the process is already crashed, returns immediately.
- Status WaitForInjectedCrash(const MonoDelta& timeout) const;
-
- // Same as the above, but expects the process to crash due to a
- // LOG(FATAL) or CHECK failure. In other words, waits for it to
- // crash from SIGABRT.
- Status WaitForFatal(const MonoDelta& timeout) const;
-
- // Kills the process with SIGKILL (after recording its bound addresses for a
- // later Restart()) and reaps it. No-op if no process was started.
- virtual void Shutdown();
-
- // Delete files specified by 'wal_dir_' and 'data_dirs_'.
- Status DeleteFromDisk() const WARN_UNUSED_RESULT;
-
- const std::string& wal_dir() const { return wal_dir_; }
-
- // The single data dir. CHECK-fails unless exactly one data dir is configured.
- const std::string& data_dir() const {
- CHECK_EQ(1, data_dirs_.size());
- return data_dirs_[0];
- }
-
- const std::vector<std::string>& data_dirs() const { return data_dirs_; }
-
- // Returns the log dir of the external daemon.
- const std::string& log_dir() const { return log_dir_; }
-
- // Return a pointer to the flags used for this server on restart.
- // Modifying these flags will only take effect on the next restart.
- std::vector<std::string>* mutable_flags() { return &extra_flags_; }
-
- // Retrieve the value of a given metric from this server. The metric must
- // be of int64_t type.
- //
- // 'value_field' represents the particular field of the metric to be read.
- // For example, for a counter or gauge, this should be 'value'. For a
- // histogram, it might be 'total_count' or 'mean'.
- //
- // 'entity_id' may be NULL, in which case the first entity of the same type
- // as 'entity_proto' will be matched.
- Status GetInt64Metric(const MetricEntityPrototype* entity_proto,
- const char* entity_id,
- const MetricPrototype* metric_proto,
- const char* value_field,
- int64_t* value) const;
-
- protected:
- friend class RefCountedThreadSafe<ExternalDaemon>;
- virtual ~ExternalDaemon();
-
- // Starts a process with the given flags.
- Status StartProcess(const std::vector<std::string>& user_flags);
-
- // Wait for the process to exit, and then call 'wait_status_predicate'
- // on the resulting exit status. NOTE: this is not the return code, but
- // rather the value provided by waitpid(2): use WEXITSTATUS, etc.
- //
- // If the predicate matches, returns OK. Otherwise, returns an error.
- // 'crash_type_str' should be a descriptive name for the type of crash,
- // used in formatting the error message.
- Status WaitForCrash(const MonoDelta& timeout,
- const std::function<bool(int)>& wait_status_predicate,
- const char* crash_type_str) const;
-
- // In a code-coverage build, try to flush the coverage data to disk.
- // In a non-coverage build, this does nothing.
- void FlushCoverage();
-
- // In an LSAN build, ask the daemon to check for leaked memory, and
- // LOG(FATAL) if there are any leaks.
- void CheckForLeaks();
-
- // Get RPC bind address for daemon.
- const HostPort& rpc_bind_address() const {
- return rpc_bind_address_;
- }
-
- const std::shared_ptr<rpc::Messenger> messenger_;
- const std::string wal_dir_;
- std::vector<std::string> data_dirs_;
- const std::string log_dir_;
- const std::string perf_record_filename_;
- const MonoDelta start_process_timeout_;
- const bool logtostderr_;
- const HostPort rpc_bind_address_;
- std::string exe_;
- std::vector<std::string> extra_flags_;
- std::map<std::string, std::string> extra_env_;
-
- // The running daemon; null before start and after Shutdown().
- std::unique_ptr<Subprocess> process_;
- // True while the daemon is paused; Shutdown() clears it and skips the
- // coverage/leak RPCs while it is set.
- bool paused_ = false;
-
- std::unique_ptr<Subprocess> perf_record_process_;
-
- // Server status reported by the daemon; the source of the bound addresses
- // and node instance returned by the accessors above.
- std::unique_ptr<server::ServerStatusPB> status_;
-
- // These capture the daemons parameters and running ports and
- // are used to Restart() the daemon with the same parameters.
- HostPort bound_rpc_;
- HostPort bound_http_;
-
- DISALLOW_COPY_AND_ASSIGN(ExternalDaemon);
-};
-
-// Resumes a daemon that was stopped with ExternalDaemon::Pause() upon
-// exiting a scope. A failed resume is logged as a warning, not propagated.
-class ScopedResumeExternalDaemon {
- public:
- // 'daemon' must be non-null and remain valid for the lifetime of a
- // ScopedResumeExternalDaemon object. It is not owned.
- explicit ScopedResumeExternalDaemon(ExternalDaemon* daemon);
-
- // Resume 'daemon_'.
- ~ScopedResumeExternalDaemon();
-
- private:
- ExternalDaemon* daemon_;
-
- DISALLOW_COPY_AND_ASSIGN(ScopedResumeExternalDaemon);
-};
-
-// An ExternalDaemon running a Kudu master.
-class ExternalMaster : public ExternalDaemon {
- public:
- explicit ExternalMaster(ExternalDaemonOptions opts);
-
- // Launches the master bound to its configured RPC address.
- Status Start();
-
- // Restarts the daemon.
- // Requires that it has previously been shutdown.
- Status Restart() WARN_UNUSED_RESULT;
-
- // Blocks until the master's catalog manager is initialized and responding to
- // RPCs.
- Status WaitForCatalogManager() WARN_UNUSED_RESULT;
-
- private:
- // Flags shared by Start() and Restart().
- std::vector<std::string> GetCommonFlags() const;
-
- friend class RefCountedThreadSafe<ExternalMaster>;
- virtual ~ExternalMaster();
-};
-
-// An ExternalDaemon running a Kudu tablet server that reports to the masters
-// at 'master_addrs'.
-class ExternalTabletServer : public ExternalDaemon {
- public:
- ExternalTabletServer(ExternalDaemonOptions opts,
- std::vector<HostPort> master_addrs);
-
- // Launches the tablet server bound to its configured RPC address.
- Status Start();
-
- // Restarts the daemon.
- // Requires that it has previously been shutdown.
- Status Restart() WARN_UNUSED_RESULT;
-
- private:
- // Masters passed to the daemon via --tserver_master_addrs.
- const std::vector<HostPort> master_addrs_;
-
- friend class RefCountedThreadSafe<ExternalTabletServer>;
- virtual ~ExternalTabletServer();
-};
-
-} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/350c8a79/src/kudu/integration-tests/external_mini_cluster_fs_inspector.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/external_mini_cluster_fs_inspector.cc b/src/kudu/integration-tests/external_mini_cluster_fs_inspector.cc
index 3873d43..f22f12f 100644
--- a/src/kudu/integration-tests/external_mini_cluster_fs_inspector.cc
+++ b/src/kudu/integration-tests/external_mini_cluster_fs_inspector.cc
@@ -27,7 +27,7 @@
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/strings/util.h"
#include "kudu/fs/fs_manager.h"
-#include "kudu/integration-tests/external_mini_cluster.h"
+#include "kudu/mini-cluster/external_mini_cluster.h"
#include "kudu/util/env.h"
#include "kudu/util/monotime.h"
#include "kudu/util/path_util.h"
@@ -40,7 +40,7 @@ namespace itest {
using std::set;
using std::string;
using std::vector;
-
+using cluster::ExternalMiniCluster;
using consensus::ConsensusMetadataPB;
using strings::Substitute;
using tablet::TabletDataState;
http://git-wip-us.apache.org/repos/asf/kudu/blob/350c8a79/src/kudu/integration-tests/external_mini_cluster_fs_inspector.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/external_mini_cluster_fs_inspector.h b/src/kudu/integration-tests/external_mini_cluster_fs_inspector.h
index 3341bfe..3fde5f9 100644
--- a/src/kudu/integration-tests/external_mini_cluster_fs_inspector.h
+++ b/src/kudu/integration-tests/external_mini_cluster_fs_inspector.h
@@ -14,9 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-
-#ifndef KUDU_INTEGRATION_TESTS_CLUSTER_EXTERNAL_MINI_CLUSTER_FS_INSPECTOR_H_
-#define KUDU_INTEGRATION_TESTS_CLUSTER_EXTERNAL_MINI_CLUSTER_FS_INSPECTOR_H_
+#pragma once
#include <cstdint>
#include <string>
@@ -28,13 +26,17 @@
#include "kudu/util/monotime.h"
namespace kudu {
+
class Env;
-class ExternalMiniCluster;
class Status;
+namespace cluster {
+class ExternalMiniCluster;
+} // namespace cluster
+
namespace consensus {
class ConsensusMetadataPB;
-}
+} // namespace consensus
namespace itest {
@@ -44,7 +46,7 @@ namespace itest {
class ExternalMiniClusterFsInspector {
public:
// Does not take ownership of the ExternalMiniCluster pointer.
- explicit ExternalMiniClusterFsInspector(ExternalMiniCluster* cluster);
+ explicit ExternalMiniClusterFsInspector(cluster::ExternalMiniCluster* cluster);
~ExternalMiniClusterFsInspector();
Status ListFilesInDir(const std::string& path, std::vector<std::string>* entries);
@@ -130,12 +132,10 @@ class ExternalMiniClusterFsInspector {
const std::string& tablet_id) const;
Env* const env_;
- ExternalMiniCluster* const cluster_;
+ cluster::ExternalMiniCluster* const cluster_;
DISALLOW_COPY_AND_ASSIGN(ExternalMiniClusterFsInspector);
};
} // namespace itest
} // namespace kudu
-
-#endif // KUDU_INTEGRATION_TESTS_CLUSTER_EXTERNAL_MINI_CLUSTER_FS_INSPECTOR_H_
http://git-wip-us.apache.org/repos/asf/kudu/blob/350c8a79/src/kudu/integration-tests/flex_partitioning-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/flex_partitioning-itest.cc b/src/kudu/integration-tests/flex_partitioning-itest.cc
index c965bb1..8863331 100644
--- a/src/kudu/integration-tests/flex_partitioning-itest.cc
+++ b/src/kudu/integration-tests/flex_partitioning-itest.cc
@@ -43,9 +43,9 @@
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/integration-tests/cluster_itest_util.h"
-#include "kudu/integration-tests/external_mini_cluster.h"
#include "kudu/master/master.pb.h"
#include "kudu/master/master.proxy.h"
+#include "kudu/mini-cluster/external_mini_cluster.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/tools/data_gen_util.h"
#include "kudu/util/monotime.h"
@@ -71,6 +71,8 @@ using kudu::client::KuduTable;
using kudu::client::KuduTableCreator;
using kudu::client::KuduValue;
using kudu::client::sp::shared_ptr;
+using kudu::cluster::ExternalMiniCluster;
+using kudu::cluster::ExternalMiniClusterOptions;
using kudu::master::GetTableLocationsRequestPB;
using kudu::master::GetTableLocationsResponsePB;
using kudu::master::MasterErrorPB;
http://git-wip-us.apache.org/repos/asf/kudu/blob/350c8a79/src/kudu/integration-tests/full_stack-insert-scan-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/full_stack-insert-scan-test.cc b/src/kudu/integration-tests/full_stack-insert-scan-test.cc
index b23cc42..9ca82a4 100644
--- a/src/kudu/integration-tests/full_stack-insert-scan-test.cc
+++ b/src/kudu/integration-tests/full_stack-insert-scan-test.cc
@@ -44,8 +44,8 @@
#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/strcat.h"
#include "kudu/gutil/strings/substitute.h"
-#include "kudu/integration-tests/internal_mini_cluster.h"
#include "kudu/master/mini_master.h"
+#include "kudu/mini-cluster/internal_mini_cluster.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_replica.h"
#include "kudu/tserver/mini_tablet_server.h"
@@ -101,6 +101,8 @@ using client::KuduSession;
using client::KuduStatusMemberCallback;
using client::KuduTable;
using client::KuduTableCreator;
+using cluster::InternalMiniCluster;
+using cluster::InternalMiniClusterOptions;
using strings::Split;
using strings::Substitute;
http://git-wip-us.apache.org/repos/asf/kudu/blob/350c8a79/src/kudu/integration-tests/fuzz-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/fuzz-itest.cc b/src/kudu/integration-tests/fuzz-itest.cc
index 81173e2..d393bdf 100644
--- a/src/kudu/integration-tests/fuzz-itest.cc
+++ b/src/kudu/integration-tests/fuzz-itest.cc
@@ -52,8 +52,8 @@
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/substitute.h"
-#include "kudu/integration-tests/internal_mini_cluster.h"
#include "kudu/master/mini_master.h"
+#include "kudu/mini-cluster/internal_mini_cluster.h"
#include "kudu/tablet/key_value_test_schema.h"
#include "kudu/tablet/rowset.h"
#include "kudu/tablet/tablet.h"
@@ -113,6 +113,8 @@ using client::KuduUpsert;
using client::KuduValue;
using client::KuduWriteOperation;
using client::sp::shared_ptr;
+using cluster::InternalMiniCluster;
+using cluster::InternalMiniClusterOptions;
using std::list;
using std::map;
using std::string;
http://git-wip-us.apache.org/repos/asf/kudu/blob/350c8a79/src/kudu/integration-tests/internal_mini_cluster-itest-base.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/internal_mini_cluster-itest-base.cc b/src/kudu/integration-tests/internal_mini_cluster-itest-base.cc
index 80f171e..69bfc4c 100644
--- a/src/kudu/integration-tests/internal_mini_cluster-itest-base.cc
+++ b/src/kudu/integration-tests/internal_mini_cluster-itest-base.cc
@@ -26,6 +26,9 @@
namespace kudu {
+using cluster::InternalMiniCluster;
+using cluster::InternalMiniClusterOptions;
+
void MiniClusterITestBase::TearDown() {
StopCluster();
KuduTest::TearDown();
http://git-wip-us.apache.org/repos/asf/kudu/blob/350c8a79/src/kudu/integration-tests/internal_mini_cluster-itest-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/internal_mini_cluster-itest-base.h b/src/kudu/integration-tests/internal_mini_cluster-itest-base.h
index 4d189c4..ca20a42 100644
--- a/src/kudu/integration-tests/internal_mini_cluster-itest-base.h
+++ b/src/kudu/integration-tests/internal_mini_cluster-itest-base.h
@@ -22,18 +22,18 @@
#include <unordered_map>
#include "kudu/client/shared_ptr.h"
-#include "kudu/integration-tests/internal_mini_cluster.h"
+#include "kudu/mini-cluster/internal_mini_cluster.h"
#include "kudu/util/test_util.h"
namespace kudu {
namespace client {
class KuduClient;
-}
+} // namespace client
namespace itest {
struct TServerDetails;
-}
+} // namespace itest
// Simple base utility class to provide a mini cluster with common setup
// routines useful for integration tests.
@@ -45,7 +45,7 @@ class MiniClusterITestBase : public KuduTest {
void StartCluster(int num_tablet_servers = 3);
void StopCluster();
- std::unique_ptr<InternalMiniCluster> cluster_;
+ std::unique_ptr<cluster::InternalMiniCluster> cluster_;
client::sp::shared_ptr<client::KuduClient> client_;
std::unordered_map<std::string, itest::TServerDetails*> ts_map_;
};
http://git-wip-us.apache.org/repos/asf/kudu/blob/350c8a79/src/kudu/integration-tests/internal_mini_cluster.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/internal_mini_cluster.cc b/src/kudu/integration-tests/internal_mini_cluster.cc
deleted file mode 100644
index 17153a0..0000000
--- a/src/kudu/integration-tests/internal_mini_cluster.cc
+++ /dev/null
@@ -1,375 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "kudu/integration-tests/internal_mini_cluster.h"
-
-#include <cstdint>
-#include <ostream>
-#include <unordered_set>
-#include <utility>
-
-#include "kudu/client/client.h"
-#include "kudu/common/wire_protocol.pb.h"
-#include "kudu/gutil/gscoped_ptr.h"
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/master/catalog_manager.h"
-#include "kudu/master/master.h"
-#include "kudu/master/master.proxy.h"
-#include "kudu/master/mini_master.h"
-#include "kudu/master/ts_descriptor.h"
-#include "kudu/master/ts_manager.h"
-#include "kudu/rpc/messenger.h"
-#include "kudu/tserver/mini_tablet_server.h"
-#include "kudu/tserver/tablet_server.h"
-#include "kudu/tserver/tablet_server_options.h"
-#include "kudu/util/env.h"
-#include "kudu/util/monotime.h"
-#include "kudu/util/net/net_util.h"
-#include "kudu/util/net/sockaddr.h"
-#include "kudu/util/path_util.h"
-#include "kudu/util/status.h"
-#include "kudu/util/stopwatch.h"
-#include "kudu/util/test_util.h"
-
-using std::string;
-using std::vector;
-using strings::Substitute;
-
-namespace kudu {
-
-using client::KuduClient;
-using client::KuduClientBuilder;
-using master::CatalogManager;
-using master::MasterServiceProxy;
-using master::MiniMaster;
-using master::TSDescriptor;
-using std::shared_ptr;
-using tserver::MiniTabletServer;
-using tserver::TabletServer;
-
-// Default options: one master, one tablet server, one data directory per
-// server, and MiniCluster's default bind mode. 'data_root' is left empty and
-// filled in by the InternalMiniCluster constructor.
-InternalMiniClusterOptions::InternalMiniClusterOptions()
- : num_masters(1),
- num_tablet_servers(1),
- num_data_dirs(1),
- bind_mode(MiniCluster::kDefaultBindMode) {
-}
-
-// Records the environment and options; no servers are started here (see
-// Start()). When no data root was supplied, a "minicluster-data" directory
-// under the test data directory is used.
-InternalMiniCluster::InternalMiniCluster(Env* env, InternalMiniClusterOptions options)
- : env_(env),
- opts_(std::move(options)),
- running_(false) {
- if (opts_.data_root.empty()) {
- opts_.data_root = JoinPathSegments(GetTestDataDirectory(), "minicluster-data");
- }
-}
-
-// Calls Shutdown() so servers do not outlive the cluster object (presumably
-// equivalent to ShutdownNodes(ClusterNodes::ALL) — confirm in the header).
-InternalMiniCluster::~InternalMiniCluster() {
- Shutdown();
-}
-
-// Brings up the whole cluster: creates the data root, starts the master(s),
-// adds opts_.num_tablet_servers tablet servers, waits for them to register,
-// and builds the client-side messenger. Must not be called while running.
-Status InternalMiniCluster::Start() {
-  CHECK(!opts_.data_root.empty()) << "No Fs root was provided";
-  CHECK(!running_);
-
-  // More than one master means a multi-master deployment, which requires an
-  // explicit RPC port for every master.
-  const bool distributed_masters = opts_.num_masters > 1;
-  if (distributed_masters) {
-    CHECK_GE(opts_.master_rpc_ports.size(), opts_.num_masters);
-  }
-
-  // Every per-server filesystem root lives under data_root.
-  if (!env_->FileExists(opts_.data_root)) {
-    RETURN_NOT_OK(env_->CreateDir(opts_.data_root));
-  }
-
-  // Masters go first: each tablet server is configured with their addresses.
-  if (distributed_masters) {
-    RETURN_NOT_OK_PREPEND(StartDistributedMasters(),
-                          "Couldn't start distributed masters");
-  } else {
-    RETURN_NOT_OK_PREPEND(StartSingleMaster(), "Couldn't start the single master");
-  }
-
-  for (int ts_idx = 0; ts_idx < opts_.num_tablet_servers; ++ts_idx) {
-    RETURN_NOT_OK_PREPEND(AddTabletServer(),
-                          Substitute("Error adding TS $0", ts_idx));
-  }
-  RETURN_NOT_OK_PREPEND(WaitForTabletServerCount(opts_.num_tablet_servers),
-                        "Waiting for tablet servers to start");
-
-  // Messenger later used by master_proxy() to build master RPC proxies.
-  rpc::MessengerBuilder messenger_builder("minicluster-messenger");
-  messenger_builder.set_num_reactors(1);
-  messenger_builder.set_max_negotiation_threads(1);
-  RETURN_NOT_OK_PREPEND(messenger_builder.Build(&messenger_),
-                        "Failed to start Messenger for minicluster");
-
-  running_ = true;
-  return Status::OK();
-}
-
-// Starts opts_.num_masters masters, each bound to its configured RPC port
-// and told about all of its peers, then waits for every master's catalog
-// manager to finish initializing.
-Status InternalMiniCluster::StartDistributedMasters() {
-  CHECK_GT(opts_.num_data_dirs, 0);
-  CHECK_GE(opts_.master_rpc_ports.size(), opts_.num_masters);
-  CHECK_GT(opts_.master_rpc_ports.size(), 1);
-
-  vector<HostPort> master_rpc_addrs = this->master_rpc_addrs();
-  LOG(INFO) << "Creating distributed mini masters. Addrs: "
-            << HostPort::ToCommaSeparatedString(master_rpc_addrs);
-
-  for (int i = 0; i < opts_.num_masters; i++) {
-    // Honor num_data_dirs for every master, exactly as StartSingleMaster() does.
-    shared_ptr<MiniMaster> mini_master(new MiniMaster(GetMasterFsRoot(i),
-                                                      master_rpc_addrs[i],
-                                                      opts_.num_data_dirs));
-    mini_master->SetMasterAddresses(master_rpc_addrs);
-    RETURN_NOT_OK_PREPEND(mini_master->Start(), Substitute("Couldn't start follower $0", i));
-    VLOG(1) << "Started MiniMaster with UUID " << mini_master->permanent_uuid()
-            << " at index " << i;
-    mini_masters_.push_back(std::move(mini_master));
-  }
-
-  int i = 0;
-  for (const shared_ptr<MiniMaster>& master : mini_masters_) {
-    LOG(INFO) << "Waiting to initialize catalog manager on master " << i;
-    // Increment only after the error message is built so it names the
-    // master that actually failed (previously it was off by one).
-    RETURN_NOT_OK_PREPEND(master->WaitForCatalogManagerInit(),
-                          Substitute("Could not initialize catalog manager on master $0", i));
-    ++i;
-  }
-  return Status::OK();
-}
-
-// Like Start(), but additionally blocks until every tablet server reports
-// that it has finished starting.
-Status InternalMiniCluster::StartSync() {
-  RETURN_NOT_OK(Start());
-  for (size_t ts_idx = 0; ts_idx < mini_tablet_servers_.size(); ++ts_idx) {
-    RETURN_NOT_OK_PREPEND(mini_tablet_servers_[ts_idx]->WaitStarted(),
-                          Substitute("TabletServer $0 failed to start.", ts_idx));
-  }
-  return Status::OK();
-}
-
-// Starts the cluster's only master and waits until its catalog manager is
-// leader and ready, bounded by kMasterStartupWaitTimeSeconds.
-Status InternalMiniCluster::StartSingleMaster() {
-  CHECK_GT(opts_.num_data_dirs, 0);
-  CHECK_EQ(1, opts_.num_masters);
-  CHECK_LE(opts_.master_rpc_ports.size(), 1);
-
-  // Use the configured port if one was given; otherwise port 0.
-  const uint16_t master_rpc_port =
-      opts_.master_rpc_ports.empty() ? 0 : opts_.master_rpc_ports[0];
-
-  // The master starts before any tablet server, which needs its address.
-  HostPort master_addr(GetBindIpForDaemon(MiniCluster::MASTER, /*index=*/ 0, opts_.bind_mode),
-                       master_rpc_port);
-  shared_ptr<MiniMaster> mini_master(
-      new MiniMaster(GetMasterFsRoot(0), std::move(master_addr), opts_.num_data_dirs));
-  RETURN_NOT_OK_PREPEND(mini_master->Start(), "Couldn't start master");
-
-  const MonoDelta startup_timeout = MonoDelta::FromSeconds(kMasterStartupWaitTimeSeconds);
-  RETURN_NOT_OK(mini_master->master()->WaitUntilCatalogManagerIsLeaderAndReadyForTests(
-      startup_timeout));
-  mini_masters_.push_back(std::move(mini_master));
-  return Status::OK();
-}
-
-// Creates, configures and starts one more tablet server, pointing it at every
-// master currently in the cluster. Fails with IllegalState if no master has
-// been started yet.
-Status InternalMiniCluster::AddTabletServer() {
-  if (mini_masters_.empty()) {
-    return Status::IllegalState("Master not yet initialized");
-  }
-  const int new_idx = mini_tablet_servers_.size();
-
-  // Use the configured RPC port for this index when present, else port 0.
-  uint16_t ts_rpc_port = 0;
-  if (static_cast<size_t>(new_idx) < opts_.tserver_rpc_ports.size()) {
-    ts_rpc_port = opts_.tserver_rpc_ports[new_idx];
-  }
-
-  string bind_ip = GetBindIpForDaemon(MiniCluster::TSERVER, new_idx, opts_.bind_mode);
-  // Owned by a shared_ptr from the start: no gscoped_ptr/release() handoff,
-  // and the server is still destroyed if Start() below fails.
-  shared_ptr<MiniTabletServer> tablet_server(
-      new MiniTabletServer(GetTabletServerFsRoot(new_idx),
-                           HostPort(std::move(bind_ip), ts_rpc_port),
-                           opts_.num_data_dirs));
-
-  // Replace any default master addresses with the cluster's masters.
-  tablet_server->options()->master_addresses.clear();
-  for (const shared_ptr<MiniMaster>& master : mini_masters_) {
-    tablet_server->options()->master_addresses.emplace_back(master->bound_rpc_addr());
-  }
-  RETURN_NOT_OK(tablet_server->Start());
-  mini_tablet_servers_.push_back(std::move(tablet_server));
-  return Status::OK();
-}
-
-// Shuts down and forgets the selected kinds of servers (tablet servers
-// before masters). Any call, even a partial one, marks the cluster as not
-// running.
-void InternalMiniCluster::ShutdownNodes(ClusterNodes nodes) {
-  const bool stop_tservers = nodes == ClusterNodes::ALL || nodes == ClusterNodes::TS_ONLY;
-  const bool stop_masters = nodes == ClusterNodes::ALL || nodes == ClusterNodes::MASTERS_ONLY;
-
-  if (stop_tservers) {
-    for (const auto& ts : mini_tablet_servers_) {
-      ts->Shutdown();
-    }
-    mini_tablet_servers_.clear();
-  }
-  if (stop_masters) {
-    for (const auto& master : mini_masters_) {
-      master->Shutdown();
-    }
-    mini_masters_.clear();
-  }
-  running_ = false;
-}
-
-// Returns a non-owning pointer to the idx-th started master; CHECK-fails if
-// idx is out of range. The pointer is invalidated when ShutdownNodes()
-// clears mini_masters_.
-MiniMaster* InternalMiniCluster::mini_master(int idx) const {
- CHECK_GE(idx, 0) << "Master idx must be >= 0";
- CHECK_LT(idx, mini_masters_.size()) << "Master idx must be < num masters started";
- return mini_masters_[idx].get();
-}
-
-// Returns a non-owning pointer to the idx-th started tablet server; CHECK-fails
-// if idx is out of range. Invalidated when ShutdownNodes() clears the list.
-MiniTabletServer* InternalMiniCluster::mini_tablet_server(int idx) const {
- CHECK_GE(idx, 0) << "TabletServer idx must be >= 0";
- CHECK_LT(idx, mini_tablet_servers_.size()) << "TabletServer idx must be < 'num_ts_started_'";
- return mini_tablet_servers_[idx].get();
-}
-
-// Returns one address per configured master RPC port, pairing each port with
-// the bind IP chosen for that master index. Empty if no ports are configured.
-Status-free: this is derived purely from opts_, not from running masters.
-
-// Filesystem root for master 'idx': <data_root>/master-<idx>-root.
-string InternalMiniCluster::GetMasterFsRoot(int idx) const {
- return JoinPathSegments(opts_.data_root, Substitute("master-$0-root", idx));
-}
-
-// Filesystem root for tablet server 'idx': <data_root>/ts-<idx>-root.
-string InternalMiniCluster::GetTabletServerFsRoot(int idx) const {
- return JoinPathSegments(opts_.data_root, Substitute("ts-$0-root", idx));
-}
-
-// Convenience overload: waits in MATCH_TSERVERS mode and discards the
-// descriptors.
-Status InternalMiniCluster::WaitForTabletServerCount(int count) const {
- vector<shared_ptr<master::TSDescriptor>> descs;
- return WaitForTabletServerCount(count, MatchMode::MATCH_TSERVERS, &descs);
-}
-
-// Polls every master that is not shut down (roughly every 1ms, for up to
-// kRegistrationWaitTimeSeconds) until each one reports exactly 'count'
-// tablet servers. Returns TimedOut otherwise.
-//
-// In MATCH_TSERVERS mode a descriptor is counted only if its uuid and seqno
-// match a tablet server currently in this cluster. In DO_NOT_MATCH_TSERVERS
-// mode every descriptor counts.
-//
-// On return, '*descs' holds the descriptors from the last master queried, not
-// a union across masters.
-// NOTE(review): if no master is running, this returns OK immediately.
-Status InternalMiniCluster::WaitForTabletServerCount(int count,
- MatchMode mode,
- vector<shared_ptr<TSDescriptor>>* descs) const {
- // Indexes of live masters that have not yet reported 'count' servers.
- std::unordered_set<int> masters_to_search;
- for (int i = 0; i < num_masters(); i++) {
- if (!mini_master(i)->master()->IsShutdown()) {
- masters_to_search.insert(i);
- }
- }
-
- Stopwatch sw;
- sw.start();
- while (sw.elapsed().wall_seconds() < kRegistrationWaitTimeSeconds) {
- // Erase-while-iterating: erase() returns the next valid iterator.
- for (auto iter = masters_to_search.begin(); iter != masters_to_search.end();) {
- mini_master(*iter)->master()->ts_manager()->GetAllDescriptors(descs);
- int match_count = 0;
- switch (mode) {
- case MatchMode::MATCH_TSERVERS:
- // GetAllDescriptors() may return servers that are no longer online.
- // Do a second step of verification to verify that the descs that we got
- // are aligned (same uuid/seqno) with the TSs that we have in the cluster.
- for (const shared_ptr<TSDescriptor>& desc : *descs) {
- for (const auto& mini_tablet_server : mini_tablet_servers_) {
- const TabletServer* ts = mini_tablet_server->server();
- if (ts->instance_pb().permanent_uuid() == desc->permanent_uuid() &&
- ts->instance_pb().instance_seqno() == desc->latest_seqno()) {
- match_count++;
- break;
- }
- }
- }
- break;
- case MatchMode::DO_NOT_MATCH_TSERVERS:
- match_count = descs->size();
- break;
- default:
- LOG(FATAL) << "Invalid match mode";
- }
-
- if (match_count == count) {
- // This master has returned the correct set of tservers.
- iter = masters_to_search.erase(iter);
- } else {
- iter++;
- }
- }
- if (masters_to_search.empty()) {
- // All masters have returned the correct set of tservers.
- LOG(INFO) << Substitute("$0 TS(s) registered with all masters after $1s",
- count, sw.elapsed().wall_seconds());
- return Status::OK();
- }
- SleepFor(MonoDelta::FromMilliseconds(1));
- }
- return Status::TimedOut(Substitute(
- "Timed out waiting for $0 TS(s) to register with all masters", count));
-}
-
-// Builds a client connected to this cluster's masters. 'builder' may be null,
-// in which case a default builder is used. Any master addresses already set
-// on 'builder' are replaced.
-Status InternalMiniCluster::CreateClient(KuduClientBuilder* builder,
-                                         client::sp::shared_ptr<KuduClient>* client) const {
-  client::KuduClientBuilder defaults;
-  KuduClientBuilder* const effective_builder = builder != nullptr ? builder : &defaults;
-
-  effective_builder->clear_master_server_addrs();
-  for (const auto& master : mini_masters_) {
-    CHECK(master);
-    effective_builder->add_master_server_addr(master->bound_rpc_addr_str());
-  }
-  return effective_builder->Build(client);
-}
-
-// Finds the index of the master whose catalog manager is currently leader.
-// Retries every 100ms for up to kMasterStartupWaitTimeSeconds, then returns
-// NotFound. 'idx' may be null, in which case only the status is reported.
-Status InternalMiniCluster::GetLeaderMasterIndex(int* idx) const {
-  const MonoTime deadline = MonoTime::Now() +
-      MonoDelta::FromSeconds(kMasterStartupWaitTimeSeconds);
-
-  // One scan over the masters: returns the first started, non-shut-down
-  // master whose leader lock acquires cleanly, or -1 if there is none.
-  auto find_leader = [this]() -> int {
-    for (int i = 0; i < num_masters(); i++) {
-      master::MiniMaster* mm = mini_master(i);
-      if (!mm->is_started() || mm->master()->IsShutdown()) {
-        continue;
-      }
-      master::CatalogManager::ScopedLeaderSharedLock l(mm->master()->catalog_manager());
-      if (l.first_failed_status().ok()) {
-        return i;
-      }
-    }
-    return -1;
-  };
-
-  int leader_idx = -1;
-  while (MonoTime::Now() < deadline) {
-    leader_idx = find_leader();
-    if (leader_idx != -1) {
-      break;
-    }
-    SleepFor(MonoDelta::FromMilliseconds(100));
-  }
-
-  if (leader_idx == -1) {
-    return Status::NotFound("Leader master was not found within deadline");
-  }
-  if (idx != nullptr) {
-    *idx = leader_idx;
-  }
-  return Status::OK();
-}
-
-// The cluster's client-side messenger. It is only assigned in Start(), so
-// it is null before the cluster has started.
-std::shared_ptr<rpc::Messenger> InternalMiniCluster::messenger() const {
- return messenger_;
-}
-
-// Proxy to the sole master; CHECK-fails unless exactly one master exists.
-std::shared_ptr<MasterServiceProxy> InternalMiniCluster::master_proxy() const {
- CHECK_EQ(1, mini_masters_.size());
- return master_proxy(0);
-}
-
-// Returns a fresh proxy to master 'idx' over the cluster messenger.
-// mini_master() CHECK-fails if 'idx' is out of range.
-// NOTE(review): this relies on messenger_, which is set only by Start().
-std::shared_ptr<MasterServiceProxy> InternalMiniCluster::master_proxy(int idx) const {
- const auto& addr = CHECK_NOTNULL(mini_master(idx))->bound_rpc_addr();
- return std::make_shared<MasterServiceProxy>(messenger_, addr, addr.host());
-}
-
-} // namespace kudu