You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2017/10/06 22:50:32 UTC
kudu git commit: tool: new action for running mini-clusters
Repository: kudu
Updated Branches:
refs/heads/master 0e713aadb -> 761ce10b7
tool: new action for running mini-clusters
Maintaining Kudu clients across various languages has been an ongoing
maintenance burden. Even when the client is just a thin wrapper around
another client (e.g. Kudu Python bindings), a great deal of work goes into
client testability. In practice, this has meant a bespoke mini cluster
implementation for each language. On the surface this doesn't seem that bad;
we just need to spawn some masters and tservers, right? Well, the work
quickly adds up:
o While the C++ mini cluster is heavily used and has seen many improvements,
the Java mini cluster has not received the same kind of love, and is less
robust as a result. KUDU-1976 is a great example of this deficiency.
o With the inclusion of authn came the addition of a "mini KDC", a special
daemon for Kerberized mini clusters. It was originally implemented in C++
and ported to Java, but has yet to be ported to the Python client; this is
one of the obstacles towards porting full authn support to Python.
o Dan has been prototyping Hive Metastore and Sentry integration for Kudu,
the testing of which will require "mini HMS" and possibly "mini Sentry"
testing implementations in C++, Java, and eventually, Python.
In sum, good support for non-C++ mini clusters is an ongoing commitment and
requires a great deal of work. This work hasn't always been forthcoming, and
the non-C++ clusters are deficient as a result. But it doesn't have to be
this way! Here's a thought: what if we reused the C++ mini cluster for tests
written in these other languages? We could write a "proxy" application whose
job it is to manage the C++ mini cluster and expose a rudimentary API that's
easily programmable from Java and Python.
This patch attempts to do just that. It adds a "mini_cluster" action to the
Kudu CLI which provides a rudimentary control shell that can be used to spin
up an ExternalMiniCluster. The shell is controlled via a wire protocol over
stdin/stdout. The protocol is protobuf-based with optional JSON encoding.
I should add that I like the idea of shipping "mini_cluster" into production
as part of the CLI, as it helps realize the vision of a single Kudu artifact
that can provide Kudu testability for any integrating product.
Change-Id: I0e693921ef780dc4a06e536c6b7408f7f0b252f6
Reviewed-on: http://gerrit.cloudera.org:8080/7853
Reviewed-by: Dan Burkert <da...@apache.org>
Tested-by: Kudu Jenkins
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/761ce10b
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/761ce10b
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/761ce10b
Branch: refs/heads/master
Commit: 761ce10b797c4f6345f0b84cfe30bcc92e346893
Parents: 0e713aa
Author: Adar Dembo <ad...@cloudera.com>
Authored: Sun Aug 27 22:44:34 2017 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Fri Oct 6 22:50:18 2017 +0000
----------------------------------------------------------------------
src/kudu/mini-cluster/external_mini_cluster.h | 16 +-
src/kudu/security/test/mini_kdc.cc | 4 +-
src/kudu/tools/CMakeLists.txt | 56 ++-
src/kudu/tools/kudu-tool-test.cc | 326 +++++++++++++++++
src/kudu/tools/tool.proto | 173 +++++++++
src/kudu/tools/tool_action.h | 1 +
src/kudu/tools/tool_action_common.cc | 188 +++++++++-
src/kudu/tools/tool_action_common.h | 66 ++++
src/kudu/tools/tool_action_test.cc | 392 +++++++++++++++++++++
src/kudu/tools/tool_main.cc | 1 +
10 files changed, 1209 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/761ce10b/src/kudu/mini-cluster/external_mini_cluster.h
----------------------------------------------------------------------
diff --git a/src/kudu/mini-cluster/external_mini_cluster.h b/src/kudu/mini-cluster/external_mini_cluster.h
index 05c8106..29c209e 100644
--- a/src/kudu/mini-cluster/external_mini_cluster.h
+++ b/src/kudu/mini-cluster/external_mini_cluster.h
@@ -228,7 +228,11 @@ class ExternalMiniCluster : public MiniCluster {
std::vector<ExternalDaemon*> daemons() const;
MiniKdc* kdc() const {
- return CHECK_NOTNULL(kdc_.get());
+ return kdc_.get();
+ }
+
+ const std::string& data_root() const {
+ return opts_.data_root;
}
int num_tablet_servers() const override {
@@ -424,6 +428,8 @@ class ExternalDaemon : public RefCountedThreadSafe<ExternalDaemon> {
// crash from SIGABRT.
Status WaitForFatal(const MonoDelta& timeout) const;
+ virtual Status Start() = 0;
+ virtual Status Restart() = 0;
virtual void Shutdown();
// Delete files specified by 'wal_dir_' and 'data_dirs_'.
@@ -539,11 +545,11 @@ class ExternalMaster : public ExternalDaemon {
public:
explicit ExternalMaster(ExternalDaemonOptions opts);
- Status Start();
+ virtual Status Start() override;
// Restarts the daemon.
// Requires that it has previously been shutdown.
- Status Restart() WARN_UNUSED_RESULT;
+ virtual Status Restart() override WARN_UNUSED_RESULT;
// Blocks until the master's catalog manager is initialized and responding to
// RPCs.
@@ -561,11 +567,11 @@ class ExternalTabletServer : public ExternalDaemon {
ExternalTabletServer(ExternalDaemonOptions opts,
std::vector<HostPort> master_addrs);
- Status Start();
+ virtual Status Start() override;
// Restarts the daemon.
// Requires that it has previously been shutdown.
- Status Restart() WARN_UNUSED_RESULT;
+ virtual Status Restart() override WARN_UNUSED_RESULT;
private:
const std::vector<HostPort> master_addrs_;
http://git-wip-us.apache.org/repos/asf/kudu/blob/761ce10b/src/kudu/security/test/mini_kdc.cc
----------------------------------------------------------------------
diff --git a/src/kudu/security/test/mini_kdc.cc b/src/kudu/security/test/mini_kdc.cc
index 95213f5..7830a2b 100644
--- a/src/kudu/security/test/mini_kdc.cc
+++ b/src/kudu/security/test/mini_kdc.cc
@@ -162,7 +162,9 @@ Status MiniKdc::Start() {
}
Status MiniKdc::Stop() {
- CHECK(kdc_process_);
+ if (!kdc_process_) {
+ return Status::OK();
+ }
VLOG(1) << "Stopping KDC";
unique_ptr<Subprocess> proc(kdc_process_.release());
RETURN_NOT_OK(proc->Kill(SIGKILL));
http://git-wip-us.apache.org/repos/asf/kudu/blob/761ce10b/src/kudu/tools/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/tools/CMakeLists.txt b/src/kudu/tools/CMakeLists.txt
index c7ebb41..bcfc9c2 100644
--- a/src/kudu/tools/CMakeLists.txt
+++ b/src/kudu/tools/CMakeLists.txt
@@ -16,19 +16,41 @@
# under the License.
set(LINK_LIBS
- kudu_client
- log
+ cfile
consensus
- tserver
+ gutil
+ kudu_client
kudu_common
kudu_fs
kudu_util
- gutil
- cfile
+ log
tablet
+ tserver
${KUDU_BASE_LIBS}
)
+#######################################
+# tool_proto
+#######################################
+
+PROTOBUF_GENERATE_CPP(
+ TOOL_PROTO_SRCS TOOL_PROTO_HDRS TOOL_PROTO_TGTS
+ SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../..
+ BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../..
+ PROTO_FILES tool.proto)
+
+add_library(tool_proto
+ ${TOOL_PROTO_SRCS}
+ ${TOOL_PROTO_HDRS})
+target_link_libraries(tool_proto
+ kudu_common_proto
+ protobuf
+ wire_protocol_proto)
+
+#######################################
+# kudu_tools_util
+#######################################
+
add_library(kudu_tools_util
color.cc
data_gen_util.cc
@@ -36,12 +58,21 @@ add_library(kudu_tools_util
tool_action_common.cc
)
target_link_libraries(kudu_tools_util
+ tool_proto
${LINK_LIBS})
+#######################################
+# create-demo-table
+#######################################
+
add_executable(create-demo-table create-demo-table.cc)
target_link_libraries(create-demo-table
${LINK_LIBS})
+#######################################
+# ksck
+#######################################
+
add_library(ksck
ksck.cc
ksck_remote.cc
@@ -56,6 +87,10 @@ target_link_libraries(ksck
${KUDU_BASE_LIBS}
)
+#######################################
+# kudu
+#######################################
+
add_executable(kudu
tool_action_cluster.cc
tool_action_fs.cc
@@ -66,6 +101,7 @@ add_executable(kudu
tool_action_remote_replica.cc
tool_action_table.cc
tool_action_tablet.cc
+ tool_action_test.cc
tool_action_tserver.cc
tool_action_wal.cc
tool_main.cc
@@ -83,11 +119,17 @@ target_link_libraries(kudu
kudu_util
log
master
+ mini_cluster
tablet
+ tool_proto
tserver
${KUDU_BASE_LIBS}
)
+#######################################
+# kudu_tools_test_util
+#######################################
+
add_library(kudu_tools_test_util
tool_test_util.cc
)
@@ -95,6 +137,10 @@ target_link_libraries(kudu_tools_test_util
kudu_util
)
+#######################################
+# Unit tests
+#######################################
+
set(KUDU_TEST_LINK_LIBS
ksck
kudu_tools_util
http://git-wip-us.apache.org/repos/asf/kudu/blob/761ce10b/src/kudu/tools/kudu-tool-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 1779cef..7590d32 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -25,6 +25,7 @@
#include <memory>
#include <sstream>
#include <string>
+#include <tuple> // IWYU pragma: keep
#include <unordered_map>
#include <utility>
#include <vector>
@@ -85,6 +86,8 @@
#include "kudu/tablet/tablet.pb.h"
#include "kudu/tablet/tablet_metadata.h"
#include "kudu/tablet/tablet_replica.h"
+#include "kudu/tools/tool.pb.h"
+#include "kudu/tools/tool_action_common.h"
#include "kudu/tools/tool_test_util.h"
#include "kudu/tserver/mini_tablet_server.h"
#include "kudu/tserver/tablet_server.h"
@@ -381,6 +384,7 @@ TEST_F(ToolTest, TestHelpXML) {
"remote_replica",
"table",
"tablet",
+ "test",
"tserver",
"wal",
"dump",
@@ -404,6 +408,7 @@ TEST_F(ToolTest, TestTopLevelHelp) {
"remote_replica.*tablet replicas on a Kudu Tablet Server",
"table.*Kudu tables",
"tablet.*Kudu tablets",
+ "test.*test actions",
"tserver.*Kudu Tablet Server",
"wal.*write-ahead log"
};
@@ -526,6 +531,12 @@ TEST_F(ToolTest, TestModeHelp) {
NO_FATALS(RunTestHelp("tablet", kTabletModeRegexes));
}
{
+ const vector<string> kTestModeRegexes = {
+ "mini_cluster.*Spawn a control shell"
+ };
+ NO_FATALS(RunTestHelp("test", kTestModeRegexes));
+ }
+ {
const vector<string> kChangeConfigModeRegexes = {
"add_replica.*Add a new replica",
"change_replica_type.*Change the type of an existing replica",
@@ -1741,5 +1752,320 @@ TEST_F(ToolTest, TestMasterList) {
ASSERT_STR_CONTAINS(out, master->bound_rpc_hostport().ToString());
}
+// This test is parameterized on the serialization mode and Kerberos.
+class ControlShellToolTest :
+ public ToolTest,
+ public ::testing::WithParamInterface<std::tuple<ControlShellProtocol::SerializationMode,
+ bool>> {
+ public:
+ virtual void SetUp() override {
+ ToolTest::SetUp();
+
+ // Start the control shell.
+ string mode;
+ switch (serde_mode()) {
+ case ControlShellProtocol::SerializationMode::JSON: mode = "json"; break;
+ case ControlShellProtocol::SerializationMode::PB: mode = "pb"; break;
+ default: LOG(FATAL) << "Unknown serialization mode";
+ }
+ shell_.reset(new Subprocess({
+ tool_path_,
+ "test",
+ "mini_cluster",
+ Substitute("--serialization=$0", mode)
+ }));
+ shell_->ShareParentStdin(false);
+ shell_->ShareParentStdout(false);
+ ASSERT_OK(shell_->Start());
+
+ // Start the protocol interface.
+ proto_.reset(new ControlShellProtocol(serde_mode(),
+ ControlShellProtocol::CloseMode::CLOSE_ON_DESTROY,
+ shell_->ReleaseChildStdoutFd(),
+ shell_->ReleaseChildStdinFd()));
+ }
+
+ virtual void TearDown() override {
+ if (proto_) {
+ // Stopping the protocol interface will close the fds, causing the shell
+ // to exit on its own.
+ proto_.reset();
+ ASSERT_OK(shell_->Wait());
+ int exit_status;
+ ASSERT_OK(shell_->GetExitStatus(&exit_status));
+ ASSERT_EQ(0, exit_status);
+ }
+ ToolTest::TearDown();
+ }
+
+ protected:
+ // Send a control message to the shell and receive its response.
+ Status SendReceive(const ControlShellRequestPB& req, ControlShellResponsePB* resp) {
+ RETURN_NOT_OK(proto_->SendMessage(req));
+ RETURN_NOT_OK(proto_->ReceiveMessage(resp));
+ if (resp->has_error()) {
+ return StatusFromPB(resp->error());
+ }
+ return Status::OK();
+ }
+
+ ControlShellProtocol::SerializationMode serde_mode() const {
+ return ::testing::get<0>(GetParam());
+ }
+
+ bool enable_kerberos() const {
+ return ::testing::get<1>(GetParam());
+ }
+
+ unique_ptr<Subprocess> shell_;
+ unique_ptr<ControlShellProtocol> proto_;
+};
+
+INSTANTIATE_TEST_CASE_P(SerializationModes, ControlShellToolTest,
+ ::testing::Combine(::testing::Values(
+ ControlShellProtocol::SerializationMode::PB,
+ ControlShellProtocol::SerializationMode::JSON),
+ ::testing::Bool()));
+
+TEST_P(ControlShellToolTest, TestControlShell) {
+ const int kNumMasters = 1;
+ const int kNumTservers = 3;
+
+ // Create an illegal cluster first, to make sure that the shell continues to
+ // work in the event of an error.
+ {
+ ControlShellRequestPB req;
+ ControlShellResponsePB resp;
+ req.mutable_create_cluster()->set_num_masters(4);
+ ASSERT_OK(proto_->SendMessage(req));
+ ASSERT_OK(proto_->ReceiveMessage(&resp));
+ ASSERT_TRUE(resp.has_error());
+ Status s = StatusFromPB(resp.error());
+ ASSERT_TRUE(s.IsInvalidArgument());
+ ASSERT_STR_CONTAINS(s.ToString(), "only one or three masters are supported");
+ }
+
+ // Create a cluster.
+ {
+ ControlShellRequestPB req;
+ ControlShellResponsePB resp;
+ req.mutable_create_cluster()->set_data_root(JoinPathSegments(
+ test_dir_, "minicluster-data"));
+ req.mutable_create_cluster()->set_num_masters(kNumMasters);
+ req.mutable_create_cluster()->set_num_tservers(kNumTservers);
+ req.mutable_create_cluster()->set_enable_kerberos(enable_kerberos());
+ ASSERT_OK(SendReceive(req, &resp));
+ }
+
+ // Try creating a second cluster. It should fail.
+ {
+ ControlShellRequestPB req;
+ ControlShellResponsePB resp;
+ req.mutable_create_cluster()->set_data_root(JoinPathSegments(
+ test_dir_, "minicluster-data"));
+ ASSERT_OK(proto_->SendMessage(req));
+ ASSERT_OK(proto_->ReceiveMessage(&resp));
+ ASSERT_TRUE(resp.has_error());
+ Status s = StatusFromPB(resp.error());
+ ASSERT_TRUE(s.IsInvalidArgument());
+ ASSERT_STR_CONTAINS(s.ToString(), "cluster already created");
+ }
+
+ // Start it.
+ {
+ ControlShellRequestPB req;
+ ControlShellResponsePB resp;
+ req.mutable_start_cluster();
+ ASSERT_OK(SendReceive(req, &resp));
+ }
+
+ // Get the masters.
+ vector<DaemonInfoPB> masters;
+ {
+ ControlShellRequestPB req;
+ ControlShellResponsePB resp;
+ req.mutable_get_masters();
+ ASSERT_OK(SendReceive(req, &resp));
+ ASSERT_TRUE(resp.has_get_masters());
+ ASSERT_EQ(kNumMasters, resp.get_masters().masters_size());
+ masters.assign(resp.get_masters().masters().begin(),
+ resp.get_masters().masters().end());
+ }
+
+ if (enable_kerberos()) {
+ // Set up the KDC environment variables so that a client can authenticate
+ // to this cluster.
+ //
+ // Normally this is handled automatically by the cluster's MiniKdc, but
+ // since the cluster is running in a subprocess, we have to do it manually.
+ unordered_map<string, string> kdc_env_vars;
+ ControlShellRequestPB req;
+ ControlShellResponsePB resp;
+ req.mutable_get_kdc_env_vars();
+ ASSERT_OK(SendReceive(req, &resp));
+ ASSERT_TRUE(resp.has_get_kdc_env_vars());
+ for (const auto& e : resp.get_kdc_env_vars().env_vars()) {
+ PCHECK(setenv(e.first.c_str(), e.second.c_str(), 1) == 0);
+ }
+ } else {
+ // get_kdc_env_vars should fail on a non-Kerberized cluster.
+ ControlShellRequestPB req;
+ ControlShellResponsePB resp;
+ req.mutable_get_kdc_env_vars();
+ ASSERT_OK(proto_->SendMessage(req));
+ ASSERT_OK(proto_->ReceiveMessage(&resp));
+ ASSERT_TRUE(resp.has_error());
+ Status s = StatusFromPB(resp.error());
+ ASSERT_TRUE(s.IsNotFound());
+ ASSERT_STR_CONTAINS(s.ToString(), "kdc not found");
+ }
+
+ // Create a table.
+ {
+ client::KuduClientBuilder client_builder;
+ for (const auto& e : masters) {
+ HostPort hp;
+ ASSERT_OK(HostPortFromPB(e.bound_rpc_address(), &hp));
+ client_builder.add_master_server_addr(hp.ToString());
+ }
+ shared_ptr<client::KuduClient> client;
+ ASSERT_OK(client_builder.Build(&client));
+ client::KuduSchemaBuilder schema_builder;
+ schema_builder.AddColumn("foo")
+ ->Type(client::KuduColumnSchema::INT32)
+ ->NotNull()
+ ->PrimaryKey();
+ client::KuduSchema schema;
+ ASSERT_OK(schema_builder.Build(&schema));
+ unique_ptr<client::KuduTableCreator> table_creator(
+ client->NewTableCreator());
+ ASSERT_OK(table_creator->table_name("test")
+ .schema(&schema)
+ .set_range_partition_columns({ "foo" })
+ .Create());
+ }
+
+ // Get the tservers.
+ vector<DaemonInfoPB> tservers;
+ {
+ ControlShellRequestPB req;
+ ControlShellResponsePB resp;
+ req.mutable_get_tservers();
+ ASSERT_OK(SendReceive(req, &resp));
+ ASSERT_TRUE(resp.has_get_tservers());
+ ASSERT_EQ(kNumTservers, resp.get_tservers().tservers_size());
+ tservers.assign(resp.get_tservers().tservers().begin(),
+ resp.get_tservers().tservers().end());
+ }
+
+ // Stop a tserver.
+ {
+ ControlShellRequestPB req;
+ ControlShellResponsePB resp;
+ *req.mutable_stop_daemon()->mutable_id() = tservers[0].id();
+ ASSERT_OK(SendReceive(req, &resp));
+ }
+
+ // Restart it.
+ {
+ ControlShellRequestPB req;
+ ControlShellResponsePB resp;
+ *req.mutable_start_daemon()->mutable_id() = tservers[0].id();
+ ASSERT_OK(SendReceive(req, &resp));
+ }
+
+ // Stop a master.
+ {
+ ControlShellRequestPB req;
+ ControlShellResponsePB resp;
+ *req.mutable_stop_daemon()->mutable_id() = masters[0].id();
+ ASSERT_OK(SendReceive(req, &resp));
+ }
+
+ // Restart it.
+ {
+ ControlShellRequestPB req;
+ ControlShellResponsePB resp;
+ *req.mutable_start_daemon()->mutable_id() = masters[0].id();
+ ASSERT_OK(SendReceive(req, &resp));
+ }
+
+ // Restart some non-existent daemons.
+ vector<DaemonIdentifierPB> daemons_to_restart;
+ {
+ // Unknown daemon type.
+ DaemonIdentifierPB id;
+ id.set_type(UNKNOWN_DAEMON);
+ daemons_to_restart.emplace_back(std::move(id));
+ }
+ {
+ // Tablet server #5.
+ DaemonIdentifierPB id;
+ id.set_type(TSERVER);
+ id.set_index(5);
+ daemons_to_restart.emplace_back(std::move(id));
+ }
+ {
+ // Master without an index.
+ DaemonIdentifierPB id;
+ id.set_type(MASTER);
+ daemons_to_restart.emplace_back(std::move(id));
+ }
+ if (!enable_kerberos()) {
+ // KDC for a non-Kerberized cluster.
+ DaemonIdentifierPB id;
+ id.set_type(KDC);
+ daemons_to_restart.emplace_back(std::move(id));
+ }
+ for (const auto& daemon : daemons_to_restart) {
+ ControlShellRequestPB req;
+ ControlShellResponsePB resp;
+ *req.mutable_start_daemon()->mutable_id() = daemon;
+ ASSERT_OK(proto_->SendMessage(req));
+ ASSERT_OK(proto_->ReceiveMessage(&resp));
+ ASSERT_TRUE(resp.has_error());
+ }
+
+ // Stop the cluster.
+ {
+ ControlShellRequestPB req;
+ ControlShellResponsePB resp;
+ req.mutable_stop_cluster();
+ ASSERT_OK(SendReceive(req, &resp));
+ }
+
+ // Restart it.
+ {
+ ControlShellRequestPB req;
+ ControlShellResponsePB resp;
+ req.mutable_start_cluster();
+ ASSERT_OK(SendReceive(req, &resp));
+ }
+
+ if (enable_kerberos()) {
+ // Restart the KDC.
+ {
+ ControlShellRequestPB req;
+ ControlShellResponsePB resp;
+ req.mutable_stop_daemon()->mutable_id()->set_type(KDC);
+ ASSERT_OK(SendReceive(req, &resp));
+ }
+ {
+ ControlShellRequestPB req;
+ ControlShellResponsePB resp;
+ req.mutable_start_daemon()->mutable_id()->set_type(KDC);
+ ASSERT_OK(SendReceive(req, &resp));
+ }
+ }
+
+ // Destroy the cluster.
+ {
+ ControlShellRequestPB req;
+ ControlShellResponsePB resp;
+ req.mutable_destroy_cluster();
+ ASSERT_OK(SendReceive(req, &resp));
+ }
+}
+
} // namespace tools
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/761ce10b/src/kudu/tools/tool.proto
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool.proto b/src/kudu/tools/tool.proto
new file mode 100644
index 0000000..ff0b0db
--- /dev/null
+++ b/src/kudu/tools/tool.proto
@@ -0,0 +1,173 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto2";
+package kudu.tools;
+
+option java_package = "org.apache.kudu.tools";
+
+import "kudu/common/common.proto";
+import "kudu/common/wire_protocol.proto";
+
+// Creates a new ExternalMiniCluster.
+//
+// The below fields generally map to options from ExternalMiniClusterOptions.
+// If not provided, the defaults from that class will be used instead.
+//
+// Only one cluster may be created at a time.
+message CreateClusterRequestPB {
+ // The desired number of masters.
+ //
+ // Currently only one or three masters are supported.
+ optional int32 num_masters = 1;
+
+ // The desired number of tablet servers.
+ optional int32 num_tservers = 2;
+
+ // Whether or not the cluster should be Kerberized.
+ optional bool enable_kerberos = 3;
+
+ // The directory where the cluster's data and logs should be placed.
+ optional string data_root = 4;
+
+ // Any additional gflags for masters or tablet servers. Each should be in a
+ // a format that's expected by gflag (i.e. "--foo=bar").
+ repeated string extra_master_flags = 5;
+ repeated string extra_tserver_flags = 6;
+}
+
+// Destroys a cluster created via 'create_cluster'.
+message DestroyClusterRequestPB {}
+
+// Starts all daemons in a newly created cluster, or restart all daemons
+// in a stopped cluster.
+//
+// No-op for already started clusters.
+message StartClusterRequestPB {}
+
+// Stops a cluster.
+//
+// No-op for already stopped clusters.
+message StopClusterRequestPB {}
+
+// Type of daemon managed in a cluster.
+enum DaemonType {
+ UNKNOWN_DAEMON = 0;
+ MASTER = 1;
+ TSERVER = 2;
+ KDC = 3;
+}
+
+// Identifier for a cluster daemon, unique to the cluster.
+message DaemonIdentifierPB {
+ // Whether the daemon is amaster, tserver, or whatever.
+ optional DaemonType type = 1;
+
+ // Index of the daemon in the cluster, if the cluster has multiple daemons
+ // of this type.
+ optional uint32 index = 2;
+}
+
+// Restart a stopped daemon.
+message StartDaemonRequestPB {
+ // The identifier of the daemon to be restarted. This identifier is unique
+ // and immutable for the lifetime of the cluster.
+ optional DaemonIdentifierPB id = 1;
+}
+
+// Stops a started daemon.
+//
+// No-op for already stopped daemons.
+message StopDaemonRequestPB {
+ // The identifier for the daemon to be stopped. This identifier is unique
+ // and immutable for the lifetime of the cluster.
+ optional DaemonIdentifierPB id = 1;
+}
+
+// Daemon information.
+message DaemonInfoPB {
+ // Unique identifier of the daemon.
+ optional DaemonIdentifierPB id = 1;
+
+ // Daemon's bound RPC address.
+ optional HostPortPB bound_rpc_address = 2;
+}
+
+// Response to a GetMastersRequestPB.
+message GetMastersResponsePB {
+ // List of masters.
+ repeated DaemonInfoPB masters = 1;
+}
+
+// Gets information on each started master.
+message GetMastersRequestPB {}
+
+// Response to a GetTServersRequestPB.
+message GetTServersResponsePB {
+ // List of tablet servers.
+ repeated DaemonInfoPB tservers = 1;
+}
+
+// Gets information on each started tablet server.
+message GetTServersRequestPB {}
+
+// Response to a GetKDCEnvVarsRequestPB.
+message GetKDCEnvVarsResponsePB {
+
+ // Environment variables, mapped from key to value.
+ map<string, string> env_vars = 1;
+}
+
+// Gets all environment variables another process may need in order to
+// communicate with this cluster's KDC.
+//
+// It is an error to call this on a non-Kerberized cluster.
+message GetKDCEnvVarsRequestPB {}
+
+// Sent by the control shell in response to a control shell command request.
+message ControlShellResponsePB {
+
+ // Only set if there was some kind of shell-side error.
+ optional AppStatusPB error = 1;
+
+ // The command response. Only set for commands that actually expect a response.
+ oneof response {
+ GetMastersResponsePB get_masters = 2;
+ GetTServersResponsePB get_tservers = 3;
+ GetKDCEnvVarsResponsePB get_kdc_env_vars = 4;
+ }
+}
+
+// Command sent to the control shell.
+//
+// Because the control shell communicates via pipe and not krpc, we can't make
+// use of service dispatch and must instead multiplex all command requests and
+// responses via ControlShellRequestPB and ControlShellResponsePB respectively.
+message ControlShellRequestPB {
+
+ // The command request.
+ oneof request {
+ CreateClusterRequestPB create_cluster = 1;
+ DestroyClusterRequestPB destroy_cluster = 2;
+ StartClusterRequestPB start_cluster = 3;
+ StopClusterRequestPB stop_cluster = 4;
+ StartDaemonRequestPB start_daemon = 5;
+ StopDaemonRequestPB stop_daemon = 6;
+ GetMastersRequestPB get_masters = 7;
+ GetTServersRequestPB get_tservers = 8;
+ GetKDCEnvVarsRequestPB get_kdc_env_vars = 9;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kudu/blob/761ce10b/src/kudu/tools/tool_action.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action.h b/src/kudu/tools/tool_action.h
index cdb6d79..8a86e36 100644
--- a/src/kudu/tools/tool_action.h
+++ b/src/kudu/tools/tool_action.h
@@ -316,6 +316,7 @@ std::unique_ptr<Mode> BuildPerfMode();
std::unique_ptr<Mode> BuildRemoteReplicaMode();
std::unique_ptr<Mode> BuildTableMode();
std::unique_ptr<Mode> BuildTabletMode();
+std::unique_ptr<Mode> BuildTestMode();
std::unique_ptr<Mode> BuildTServerMode();
std::unique_ptr<Mode> BuildWalMode();
http://git-wip-us.apache.org/repos/asf/kudu/blob/761ce10b/src/kudu/tools/tool_action_common.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_common.cc b/src/kudu/tools/tool_action_common.cc
index fea476d..35471ee 100644
--- a/src/kudu/tools/tool_action_common.cc
+++ b/src/kudu/tools/tool_action_common.cc
@@ -17,7 +17,10 @@
#include "kudu/tools/tool_action_common.h"
+#include <unistd.h>
+
#include <algorithm>
+#include <cerrno>
#include <cstddef>
#include <iomanip>
#include <iostream>
@@ -30,10 +33,10 @@
#include <boost/algorithm/string/predicate.hpp>
#include <gflags/gflags.h>
#include <glog/logging.h>
+#include <google/protobuf/util/json_util.h>
-#include "kudu/client/client.h"
#include "kudu/client/client-internal.h" // IWYU pragma: keep
-#include "kudu/gutil/map-util.h"
+#include "kudu/client/client.h"
#include "kudu/client/shared_ptr.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/row_operations.h"
@@ -44,19 +47,24 @@
#include "kudu/consensus/log.pb.h"
#include "kudu/consensus/log_util.h"
#include "kudu/consensus/opid.pb.h"
+#include "kudu/gutil/endian.h"
+#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/numbers.h"
#include "kudu/gutil/strings/split.h"
+#include "kudu/gutil/strings/substitute.h"
#include "kudu/master/master.proxy.h" // IWYU pragma: keep
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/server/server_base.pb.h"
#include "kudu/server/server_base.proxy.h"
+#include "kudu/tools/tool.pb.h" // IWYU pragma: keep
#include "kudu/tools/tool_action.h"
#include "kudu/tserver/tserver.pb.h"
-#include "kudu/tserver/tserver_service.proxy.h" // IWYU pragma: keep
#include "kudu/tserver/tserver_admin.proxy.h" // IWYU pragma: keep
+#include "kudu/tserver/tserver_service.proxy.h" // IWYU pragma: keep
+#include "kudu/util/faststring.h"
#include "kudu/util/jsonwriter.h"
#include "kudu/util/memory/arena.h"
#include "kudu/util/monotime.h"
@@ -136,6 +144,7 @@ using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
+using strings::Substitute;
using tserver::TabletServerAdminServiceProxy;
using tserver::TabletServerServiceProxy;
using tserver::WriteRequestPB;
@@ -578,5 +587,178 @@ Status LeaderMasterProxy::SyncRpc(const ListMastersRequestPB& req,
ListMastersResponsePB*,
RpcController*)>& func);
+const int ControlShellProtocol::kMaxMessageBytes = 1024 * 1024;
+
+ControlShellProtocol::ControlShellProtocol(SerializationMode serialization_mode,
+ CloseMode close_mode,
+ int read_fd,
+ int write_fd)
+ : serialization_mode_(serialization_mode),
+ close_mode_(close_mode),
+ read_fd_(read_fd),
+ write_fd_(write_fd) {
+}
+
+ControlShellProtocol::~ControlShellProtocol() {
+ if (close_mode_ == CloseMode::CLOSE_ON_DESTROY) {
+ close(read_fd_);
+ close(write_fd_);
+ }
+}
+
+template <class M>
+Status ControlShellProtocol::ReceiveMessage(M* message) {
+ switch (serialization_mode_) {
+ case SerializationMode::JSON:
+ {
+ // Read and accumulate one byte at a time, looking for the newline.
+ //
+ // TODO(adar): it would be more efficient to read a chunk of data, look
+ // for a newline, and if found, store the remainder for the next message.
+ faststring buf;
+ faststring one_byte;
+ one_byte.resize(1);
+ while (true) {
+ RETURN_NOT_OK_PREPEND(DoRead(&one_byte), "unable to receive message byte");
+ if (one_byte[0] == '\n') {
+ break;
+ }
+ buf.push_back(one_byte[0]);
+ }
+
+ // Parse the JSON-encoded message.
+ const auto& google_status =
+ google::protobuf::util::JsonStringToMessage(buf.ToString(), message);
+ if (!google_status.ok()) {
+ return Status::InvalidArgument(
+ Substitute("unable to parse JSON: $0", buf.ToString()),
+ google_status.error_message().ToString());
+ }
+ break;
+ }
+ case SerializationMode::PB:
+ {
+ // Read four bytes of size (big-endian).
+ faststring size_buf;
+ size_buf.resize(sizeof(uint32_t));
+ RETURN_NOT_OK_PREPEND(DoRead(&size_buf), "unable to receive message size");
+ uint32_t body_size = NetworkByteOrder::Load32(size_buf.data());
+
+ if (body_size > kMaxMessageBytes) {
+ return Status::IOError(
+ Substitute("message size ($0) exceeds maximum message size ($1)",
+ body_size, kMaxMessageBytes));
+ }
+
+ // Read the variable size body.
+ faststring body_buf;
+ body_buf.resize(body_size);
+ RETURN_NOT_OK_PREPEND(DoRead(&body_buf), "unable to receive message body");
+
+ // Parse the body into a PB request.
+ RETURN_NOT_OK_PREPEND(pb_util::ParseFromArray(
+ message, body_buf.data(), body_buf.length()),
+ Substitute("unable to parse PB: $0", body_buf.ToString()));
+ break;
+ }
+ default: LOG(FATAL) << "Unknown mode";
+ }
+
+ VLOG(1) << "Received message: " << pb_util::SecureDebugString(*message);
+ return Status::OK();
+}
+
+template <class M>
+Status ControlShellProtocol::SendMessage(const M& message) {
+ VLOG(1) << "Sending message: " << pb_util::SecureDebugString(message);
+
+ faststring buf;
+ switch (serialization_mode_) {
+ case SerializationMode::JSON:
+ {
+ string serialized;
+ const auto& google_status =
+ google::protobuf::util::MessageToJsonString(message, &serialized);
+ if (!google_status.ok()) {
+ return Status::InvalidArgument(Substitute(
+ "unable to serialize JSON: $0", pb_util::SecureDebugString(message)),
+ google_status.error_message().ToString());
+ }
+
+ buf.append(serialized);
+ buf.append("\n");
+ break;
+ }
+ case SerializationMode::PB:
+ {
+ size_t msg_size = message.ByteSizeLong();
+ buf.resize(sizeof(uint32_t) + msg_size);
+ NetworkByteOrder::Store32(buf.data(), msg_size);
+ if (!message.SerializeWithCachedSizesToArray(buf.data() + sizeof(uint32_t))) {
+ return Status::Corruption("failed to serialize PB to array");
+ }
+ break;
+ }
+ default:
+ break;
+ }
+ RETURN_NOT_OK_PREPEND(DoWrite(buf), "unable to send message");
+ return Status::OK();
+}
+
+Status ControlShellProtocol::DoRead(faststring* buf) {
+ uint8_t* pos = buf->data();
+ size_t rem = buf->length();
+ while (rem > 0) {
+ ssize_t r = read(read_fd_, pos, rem);
+ if (r == -1) {
+ if (errno == EINTR) {
+ // Interrupted by a signal, retry.
+ continue;
+ }
+ return Status::IOError("Error reading from pipe", "", errno);
+ }
+ if (r == 0) {
+ return Status::EndOfFile("Other end of pipe was closed");
+ }
+ DCHECK_GE(rem, r);
+ rem -= r;
+ pos += r;
+ }
+ return Status::OK();
+}
+
+Status ControlShellProtocol::DoWrite(const faststring& buf) {
+ const uint8_t* pos = buf.data();
+ size_t rem = buf.length();
+ while (rem > 0) {
+ ssize_t r = write(write_fd_, pos, rem);
+ if (r == -1) {
+ if (errno == EINTR) {
+ // Interrupted by a signal, retry.
+ continue;
+ }
+ if (errno == EPIPE) {
+ return Status::EndOfFile("Other end of pipe was closed");
+ }
+ return Status::IOError("Error writing to pipe", "", errno);
+ }
+ DCHECK_GE(rem, r);
+ rem -= r;
+ pos += r;
+ }
+ return Status::OK();
+}
+
+// Explicit specialization for callers outside this compilation unit.
+template
+Status ControlShellProtocol::ReceiveMessage(ControlShellRequestPB* message);
+template
+Status ControlShellProtocol::ReceiveMessage(ControlShellResponsePB* message);
+template
+Status ControlShellProtocol::SendMessage(const ControlShellRequestPB& message);
+template
+Status ControlShellProtocol::SendMessage(const ControlShellResponsePB& message);
+
} // namespace tools
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/761ce10b/src/kudu/tools/tool_action_common.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_common.h b/src/kudu/tools/tool_action_common.h
index d9698ed..ea08643 100644
--- a/src/kudu/tools/tool_action_common.h
+++ b/src/kudu/tools/tool_action_common.h
@@ -24,6 +24,7 @@
#include <vector>
#include "kudu/client/shared_ptr.h"
+#include "kudu/gutil/macros.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/util/status.h"
@@ -35,6 +36,8 @@ class function;
namespace kudu {
+class faststring;
+
namespace client {
class KuduClient;
} // namespace client
@@ -167,5 +170,68 @@ class LeaderMasterProxy {
client::sp::shared_ptr<client::KuduClient> client_;
};
+// Facilitates sending and receiving messages with the tool control shell.
+//
+// May be used by a subprocess communicating with the shell via pipe, or by the
+// shell itself to read/write messages via stdin/stdout respectively.
+class ControlShellProtocol {
+ public:
+ enum class SerializationMode {
+ // Each message is serialized as a four byte little-endian size followed by
+ // the protobuf-encoded message itself.
+ PB,
+
+ // Each message is serialized into a protobuf-like JSON representation
+ // terminated with a newline character.
+ JSON,
+ };
+
+ // Whether the provided fds are closed at class destruction time.
+ enum class CloseMode {
+ CLOSE_ON_DESTROY,
+ NO_CLOSE_ON_DESTROY,
+ };
+
+ // Constructs a new protocol instance.
+ //
+ // If 'close_mode' is CLOSE_ON_DESTROY, the instance has effectively taken
+ // control of 'read_fd' and 'write_fd' and the caller shouldn't use them.
+ ControlShellProtocol(SerializationMode serialization_mode,
+ CloseMode close_mode,
+ int read_fd,
+ int write_fd);
+
+ ~ControlShellProtocol();
+
+ // Receives a protobuf message, blocking if the pipe is empty.
+ //
+ // Returns EndOfFile if the writer on the other end of the pipe was closed.
+ //
+ // Returns an error if serialization_mode_ is PB and the received message
+ // sizes exceeds kMaxMessageBytes.
+ template <class M>
+ Status ReceiveMessage(M* message);
+
+ // Sends a protobuf message, blocking if the pipe is full.
+ //
+ // Returns EndOfFile if the reader on the other end of the pipe was closed.
+ template <class M>
+ Status SendMessage(const M& message);
+
+ private:
+ // Private helpers to drive actual pipe reading and writing.
+ Status DoRead(faststring* buf);
+ Status DoWrite(const faststring& buf);
+
+ static const int kMaxMessageBytes;
+
+ const SerializationMode serialization_mode_;
+ const CloseMode close_mode_;
+ const int read_fd_;
+ const int write_fd_;
+
+ DISALLOW_COPY_AND_ASSIGN(ControlShellProtocol);
+};
+
} // namespace tools
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/761ce10b/src/kudu/tools/tool_action_test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_test.cc b/src/kudu/tools/tool_action_test.cc
new file mode 100644
index 0000000..0373577
--- /dev/null
+++ b/src/kudu/tools/tool_action_test.cc
@@ -0,0 +1,392 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/tools/tool_action.h"
+
+#include <unistd.h>
+
+#include <cstdlib>
+#include <map>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include <boost/algorithm/string/predicate.hpp>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <google/protobuf/stubs/status.h>
+#include <google/protobuf/stubs/stringpiece.h>
+#include <google/protobuf/util/json_util.h>
+
+#include "kudu/common/common.pb.h"
+#include "kudu/common/wire_protocol.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/mini-cluster/external_mini_cluster.h"
+#include "kudu/security/test/mini_kdc.h"
+#include "kudu/tools/tool.pb.h"
+#include "kudu/tools/tool_action_common.h"
+#include "kudu/util/env.h"
+#include "kudu/util/env_util.h"
+#include "kudu/util/path_util.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/status.h"
+
+DEFINE_string(serialization, "json", "Serialization method to be used by the "
+ "control shell. Valid values are 'json' (protobuf serialized "
+ "into JSON and terminated with a newline character) or 'pb' "
+ "(four byte protobuf message length in little endian followed by "
+ "the protobuf message itself).");
+DEFINE_validator(serialization, [](const char* /*n*/, const std::string& v) {
+ return boost::iequals(v, "pb") ||
+ boost::iequals(v, "json");
+});
+
+namespace kudu {
+
+namespace tools {
+
+using cluster::ExternalDaemon;
+using cluster::ExternalMiniCluster;
+using cluster::ExternalMiniClusterOptions;
+using std::string;
+using std::unique_ptr;
+using strings::Substitute;
+
+namespace {
+
+Status MakeDataRoot(string* data_root) {
+ // The ExternalMiniCluster can't generate the data root on our behalf because
+ // we're not running inside a gtest. So we'll use this approach instead,
+ // which is what the Java external mini cluster used for a long time.
+ const char* tmpdir = getenv("TEST_TMPDIR");
+ string tmpdir_str = tmpdir ? tmpdir : Substitute("/tmp/kudutest-$0", getuid());
+ string root = JoinPathSegments(tmpdir_str, "minicluster-data");
+ RETURN_NOT_OK(env_util::CreateDirsRecursively(Env::Default(), root));
+
+ *data_root = root;
+ return Status::OK();
+}
+
+Status CheckClusterExists(const unique_ptr<ExternalMiniCluster>& cluster) {
+ if (!cluster) {
+ return Status::NotFound("cluster not found");
+ }
+ return Status::OK();
+}
+
+Status FindDaemon(const unique_ptr<ExternalMiniCluster>& cluster,
+ const DaemonIdentifierPB& id,
+ ExternalDaemon** daemon,
+ MiniKdc** kdc) {
+ RETURN_NOT_OK(CheckClusterExists(cluster));
+
+ if (!id.has_type()) {
+ return Status::InvalidArgument("request is missing daemon type");
+ }
+
+ switch (id.type()) {
+ case MASTER:
+ if (!id.has_index()) {
+ return Status::InvalidArgument("request is missing daemon index");
+ }
+ if (id.index() >= cluster->num_masters()) {
+ return Status::NotFound(Substitute("no master with index $0",
+ id.index()));
+ }
+ *daemon = cluster->master(id.index());
+ *kdc = nullptr;
+ break;
+ case TSERVER:
+ if (!id.has_index()) {
+ return Status::InvalidArgument("request is missing daemon index");
+ }
+ if (id.index() >= cluster->num_tablet_servers()) {
+ return Status::NotFound(Substitute("no tserver with index $0",
+ id.index()));
+ }
+ *daemon = cluster->tablet_server(id.index());
+ *kdc = nullptr;
+ break;
+ case KDC:
+ if (!cluster->kdc()) {
+ return Status::NotFound("kdc not found");
+ }
+ *daemon = nullptr;
+ *kdc = cluster->kdc();
+ break;
+ default:
+ return Status::InvalidArgument(
+ Substitute("unknown daemon type: $0", DaemonType_Name(id.type())));
+ }
+ return Status::OK();
+}
+
+Status ProcessRequest(const ControlShellRequestPB& req,
+ ControlShellResponsePB* resp,
+ unique_ptr<ExternalMiniCluster>* cluster) {
+ switch (req.request_case()) {
+ case ControlShellRequestPB::kCreateCluster:
+ {
+ if (*cluster) {
+ RETURN_NOT_OK(Status::InvalidArgument("cluster already created"));
+ }
+ const CreateClusterRequestPB& cc = req.create_cluster();
+ ExternalMiniClusterOptions opts;
+ if (cc.has_num_masters()) {
+ if (cc.num_masters() != 1 && cc.num_masters() != 3) {
+ RETURN_NOT_OK(Status::InvalidArgument(
+ "only one or three masters are supported"));
+ }
+ opts.num_masters = cc.num_masters();
+ }
+ if (cc.has_num_tservers()) {
+ opts.num_tablet_servers = cc.num_tservers();
+ }
+ opts.enable_kerberos = cc.enable_kerberos();
+ if (cc.has_data_root()) {
+ opts.data_root = cc.data_root();
+ } else {
+ RETURN_NOT_OK(MakeDataRoot(&opts.data_root));
+ }
+ opts.extra_master_flags.assign(cc.extra_master_flags().begin(),
+ cc.extra_master_flags().end());
+ opts.extra_tserver_flags.assign(cc.extra_tserver_flags().begin(),
+ cc.extra_tserver_flags().end());
+ if (opts.num_masters > 1) {
+ opts.master_rpc_ports = { 11030, 11031, 11032 };
+ }
+ if (opts.enable_kerberos) {
+ opts.mini_kdc_options.data_root = JoinPathSegments(opts.data_root, "krb5kdc");
+ }
+
+ cluster->reset(new ExternalMiniCluster(std::move(opts)));
+ break;
+ }
+ case ControlShellRequestPB::kDestroyCluster:
+ {
+ RETURN_NOT_OK(CheckClusterExists(*cluster));
+ cluster->reset();
+ break;
+ }
+ case ControlShellRequestPB::kStartCluster:
+ {
+ RETURN_NOT_OK(CheckClusterExists(*cluster));
+ if ((*cluster)->num_masters() != 0) {
+ DCHECK_GT((*cluster)->num_tablet_servers(), 0);
+ RETURN_NOT_OK((*cluster)->Restart());
+ } else {
+ RETURN_NOT_OK((*cluster)->Start());
+ }
+ break;
+ }
+ case ControlShellRequestPB::kStopCluster:
+ {
+ RETURN_NOT_OK(CheckClusterExists(*cluster));
+ (*cluster)->Shutdown();
+ break;
+ }
+ case ControlShellRequestPB::kStartDaemon:
+ {
+ if (!req.start_daemon().has_id()) {
+ RETURN_NOT_OK(Status::InvalidArgument("missing process id"));
+ }
+ ExternalDaemon* daemon;
+ MiniKdc* kdc;
+ RETURN_NOT_OK(FindDaemon(*cluster, req.start_daemon().id(), &daemon, &kdc));
+ if (daemon) {
+ DCHECK(!kdc);
+ RETURN_NOT_OK(daemon->Restart());
+ } else {
+ DCHECK(kdc);
+ RETURN_NOT_OK(kdc->Start());
+ }
+ break;
+ }
+ case ControlShellRequestPB::kStopDaemon:
+ {
+ if (!req.stop_daemon().has_id()) {
+ RETURN_NOT_OK(Status::InvalidArgument("missing process id"));
+ }
+ ExternalDaemon* daemon;
+ MiniKdc* kdc;
+ RETURN_NOT_OK(FindDaemon(*cluster, req.stop_daemon().id(), &daemon, &kdc));
+ if (daemon) {
+ DCHECK(!kdc);
+ daemon->Shutdown();
+ } else {
+ DCHECK(kdc);
+ RETURN_NOT_OK(kdc->Stop());
+ }
+ break;
+ }
+ case ControlShellRequestPB::kGetMasters:
+ {
+ RETURN_NOT_OK(CheckClusterExists(*cluster));
+ for (int i = 0; i < (*cluster)->num_masters(); i++) {
+ HostPortPB pb;
+ RETURN_NOT_OK(HostPortToPB((*cluster)->master(i)->bound_rpc_hostport(), &pb));
+ DaemonInfoPB* info = resp->mutable_get_masters()->mutable_masters()->Add();
+ info->mutable_id()->set_type(MASTER);
+ info->mutable_id()->set_index(i);
+ *info->mutable_bound_rpc_address() = std::move(pb);
+ }
+ break;
+ }
+ case ControlShellRequestPB::kGetTservers:
+ {
+ RETURN_NOT_OK(CheckClusterExists(*cluster));
+ for (int i = 0; i < (*cluster)->num_tablet_servers(); i++) {
+ HostPortPB pb;
+ RETURN_NOT_OK(HostPortToPB((*cluster)->tablet_server(i)->bound_rpc_hostport(), &pb));
+ DaemonInfoPB* info = resp->mutable_get_tservers()->mutable_tservers()->Add();
+ info->mutable_id()->set_type(TSERVER);
+ info->mutable_id()->set_index(i);
+ *info->mutable_bound_rpc_address() = std::move(pb);
+ }
+ break;
+ }
+ case ControlShellRequestPB::kGetKdcEnvVars:
+ {
+ if (!(*cluster)->kdc()) {
+ RETURN_NOT_OK(Status::NotFound("kdc not found"));
+ }
+ auto env_vars = (*cluster)->kdc()->GetEnvVars();
+ resp->mutable_get_kdc_env_vars()->mutable_env_vars()->insert(
+ env_vars.begin(), env_vars.end());
+ break;
+ }
+ default:
+ RETURN_NOT_OK(Status::InvalidArgument("unknown cluster control request"));
+ }
+
+ return Status::OK();
+}
+
+Status RunControlShell(const RunnerContext& /*context*/) {
+ // Set up the protocol.
+ //
+ // Because we use stdin and stdout to communicate with the shell's parent,
+ // it's critical that none of our subprocesses write to stdout. To that end,
+ // the protocol will use stdout via another fd, and we'll redirect fd 1 to stderr.
+ int new_stdout = dup(STDOUT_FILENO);
+ PCHECK(new_stdout != -1);
+ PCHECK(dup2(STDERR_FILENO, STDOUT_FILENO) == STDOUT_FILENO);
+ ControlShellProtocol::SerializationMode serde_mode;
+ if (boost::iequals(FLAGS_serialization, "json")) {
+ serde_mode = ControlShellProtocol::SerializationMode::JSON;
+ } else {
+ DCHECK(boost::iequals(FLAGS_serialization, "pb"));
+ serde_mode = ControlShellProtocol::SerializationMode::PB;
+ }
+ ControlShellProtocol protocol(serde_mode,
+ ControlShellProtocol::CloseMode::NO_CLOSE_ON_DESTROY,
+ STDIN_FILENO,
+ new_stdout);
+
+ // Run the shell loop, processing each message as it is received.
+ unique_ptr<ExternalMiniCluster> cluster;
+ while (true) {
+ ControlShellRequestPB req;
+ ControlShellResponsePB resp;
+
+ // Receive a new request, blocking until one is received.
+ //
+ // IO errors are fatal while others will result in an error response.
+ Status s = protocol.ReceiveMessage(&req);
+ if (s.IsEndOfFile()) {
+ break;
+ }
+ if (s.IsIOError()) {
+ return s;
+ }
+
+ // If we've made it here, we're definitely going to respond.
+
+ if (s.ok()) {
+ // We've successfully received a message. Try to process it.
+ s = ProcessRequest(req, &resp, &cluster);
+ }
+
+ if (!s.ok()) {
+ // This may be the result of ReceiveMessage() or ProcessRequest(),
+ // whichever failed first.
+ StatusToPB(s, resp.mutable_error());
+ }
+
+ // Send the response. All errors are fatal.
+ s = protocol.SendMessage(resp);
+ if (s.IsEndOfFile()) {
+ break;
+ }
+ RETURN_NOT_OK(s);
+ }
+
+ // Normal exit, clean up data root.
+ if (cluster) {
+ cluster->Shutdown();
+ WARN_NOT_OK(Env::Default()->DeleteRecursively(cluster->data_root()),
+ "Could not delete data root");
+ }
+ return Status::OK();
+}
+
+string SerializeRequest(const ControlShellRequestPB& req) {
+ string serialized;
+ auto google_status = google::protobuf::util::MessageToJsonString(
+ req, &serialized);
+ CHECK(google_status.ok()) << Substitute(
+ "unable to serialize JSON ($0): $1",
+ google_status.error_message().ToString(), pb_util::SecureDebugString(req));
+ return serialized;
+}
+
+} // anonymous namespace
+
+unique_ptr<Mode> BuildTestMode() {
+
+ ControlShellRequestPB create;
+ create.mutable_create_cluster()->set_num_tservers(3);
+ ControlShellRequestPB start;
+ start.mutable_start_cluster();
+
+ string extra = Substitute(
+ "The protocol for the control shell is protobuf-based and is documented "
+ "in src/kudu/tools/tool.proto. It is currently considered to be highly "
+ "experimental and subject to change.\n"
+ "\n"
+ "Example JSON input to create and start a cluster:\n"
+ " $0\n"
+ " $1\n",
+ SerializeRequest(create),
+ SerializeRequest(start));
+
+ unique_ptr<Action> control_shell =
+ ActionBuilder("mini_cluster", &RunControlShell)
+ .Description("Spawn a control shell for running a mini-cluster")
+ .ExtraDescription(extra)
+ .AddOptionalParameter("serialization")
+ .Build();
+
+ return ModeBuilder("test")
+ .Description("Various test actions")
+ .AddAction(std::move(control_shell))
+ .Build();
+}
+
+} // namespace tools
+} // namespace kudu
+
http://git-wip-us.apache.org/repos/asf/kudu/blob/761ce10b/src/kudu/tools/tool_main.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_main.cc b/src/kudu/tools/tool_main.cc
index 31f722e..4622074 100644
--- a/src/kudu/tools/tool_main.cc
+++ b/src/kudu/tools/tool_main.cc
@@ -70,6 +70,7 @@ unique_ptr<Mode> RootMode(const string& name) {
.AddMode(BuildRemoteReplicaMode())
.AddMode(BuildTableMode())
.AddMode(BuildTabletMode())
+ .AddMode(BuildTestMode())
.AddMode(BuildTServerMode())
.AddMode(BuildWalMode())
.Build();