You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2019/11/20 00:04:53 UTC

[kudu] 02/02: KUDU-2971 p1: add subprocess module

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 68f9fbc420a7ade895ffa639971978713fbbee4f
Author: Hao Hao <ha...@cloudera.com>
AuthorDate: Mon Oct 7 16:06:12 2019 -0700

    KUDU-2971 p1: add subprocess module
    
    Utility classes exist that allow for IPC over stdin/stdout via protobuf
    and JSON-encoded protobuf. This commit moves those classes into their
    own directory so it can be reused by other subprocesses.
    
    Following commits can then extend it to support concurrent communications
    with subprocess. There are no functional changes in this patch.
    
    Change-Id: If73e27772e1897a04f04229c4906a24c61e361f2
    Reviewed-on: http://gerrit.cloudera.org:8080/14425
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 CMakeLists.txt                             |   1 +
 src/kudu/subprocess/CMakeLists.txt         |  30 ++++
 src/kudu/subprocess/subprocess_protocol.cc | 217 +++++++++++++++++++++++++++++
 src/kudu/subprocess/subprocess_protocol.h  |  96 +++++++++++++
 src/kudu/tools/CMakeLists.txt              |   2 +
 src/kudu/tools/kudu-tool-test.cc           |  25 ++--
 src/kudu/tools/tool_action_common.cc       | 173 -----------------------
 src/kudu/tools/tool_action_common.h        |  65 ---------
 src/kudu/tools/tool_action_test.cc         |  30 ++--
 9 files changed, 374 insertions(+), 265 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 692dbc8..775e39a 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1490,6 +1490,7 @@ add_subdirectory(src/kudu/rpc)
 add_subdirectory(src/kudu/security)
 add_subdirectory(src/kudu/sentry)
 add_subdirectory(src/kudu/server)
+add_subdirectory(src/kudu/subprocess)
 add_subdirectory(src/kudu/tablet)
 add_subdirectory(src/kudu/thrift)
 add_subdirectory(src/kudu/tools)
diff --git a/src/kudu/subprocess/CMakeLists.txt b/src/kudu/subprocess/CMakeLists.txt
new file mode 100644
index 0000000..6637c82
--- /dev/null
+++ b/src/kudu/subprocess/CMakeLists.txt
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#######################################
+# kudu_subprocess
+#######################################
+
+add_library(kudu_subprocess
+  subprocess_protocol.cc
+)
+target_link_libraries(kudu_subprocess
+  gutil
+  kudu_util
+  tool_proto
+  ${KUDU_BASE_LIBS}
+)
diff --git a/src/kudu/subprocess/subprocess_protocol.cc b/src/kudu/subprocess/subprocess_protocol.cc
new file mode 100644
index 0000000..f4f8540
--- /dev/null
+++ b/src/kudu/subprocess/subprocess_protocol.cc
@@ -0,0 +1,217 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/subprocess/subprocess_protocol.h"
+
+#include <errno.h>
+#include <stddef.h>
+#include <stdint.h>
+#include <unistd.h>
+
+#include <ostream>
+#include <string>
+
+#include <glog/logging.h>
+#include <google/protobuf/util/json_util.h>
+
+#include "kudu/gutil/endian.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/tools/tool.pb.h"  // IWYU pragma: keep
+#include "kudu/util/faststring.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/status.h"
+
+using kudu::pb_util::SecureDebugString;
+using strings::Substitute;
+using std::string;
+
+namespace kudu {
+namespace subprocess {
+
+const int SubprocessProtocol::kMaxMessageBytes = 1024 * 1024;
+
+SubprocessProtocol::SubprocessProtocol(SerializationMode serialization_mode,
+                                       CloseMode close_mode,
+                                       int read_fd,
+                                       int write_fd,
+                                       int max_msg_bytes)
+    : serialization_mode_(serialization_mode),
+      close_mode_(close_mode),
+      read_fd_(read_fd),
+      write_fd_(write_fd),
+      max_msg_bytes_(max_msg_bytes) {
+}
+
+SubprocessProtocol::~SubprocessProtocol() {
+  if (close_mode_ == CloseMode::CLOSE_ON_DESTROY) {
+    int ret;
+    RETRY_ON_EINTR(ret, close(read_fd_));
+    RETRY_ON_EINTR(ret, close(write_fd_));
+  }
+}
+
+template <class M>
+Status SubprocessProtocol::ReceiveMessage(M* message) {
+  switch (serialization_mode_) {
+    case SerializationMode::JSON:
+    {
+      // Read and accumulate one byte at a time, looking for the newline.
+      //
+      // TODO(adar): it would be more efficient to read a chunk of data, look
+      // for a newline, and if found, store the remainder for the next message.
+      faststring buf;
+      faststring one_byte;
+      one_byte.resize(1);
+      while (true) {
+        RETURN_NOT_OK_PREPEND(DoRead(&one_byte), "unable to receive message byte");
+        if (one_byte[0] == '\n') {
+          break;
+        }
+        buf.push_back(one_byte[0]);
+      }
+
+      // Parse the JSON-encoded message.
+      const auto& google_status =
+          google::protobuf::util::JsonStringToMessage(buf.ToString(), message);
+      if (!google_status.ok()) {
+        return Status::InvalidArgument(
+            Substitute("unable to parse JSON: $0", buf.ToString()),
+            google_status.error_message().ToString());
+      }
+      break;
+    }
+    case SerializationMode::PB:
+    {
+      // Read four bytes of size (big-endian).
+      faststring size_buf;
+      size_buf.resize(sizeof(uint32_t));
+      RETURN_NOT_OK_PREPEND(DoRead(&size_buf), "unable to receive message size");
+      uint32_t body_size = NetworkByteOrder::Load32(size_buf.data());
+
+      if (body_size > max_msg_bytes_) {
+        return Status::IOError(
+            Substitute("message size ($0) exceeds maximum message size ($1)",
+                       body_size, max_msg_bytes_));
+      }
+
+      // Read the variable size body.
+      faststring body_buf;
+      body_buf.resize(body_size);
+      RETURN_NOT_OK_PREPEND(DoRead(&body_buf), "unable to receive message body");
+
+      // Parse the body into a PB request.
+      RETURN_NOT_OK_PREPEND(pb_util::ParseFromArray(
+          message, body_buf.data(), body_buf.length()),
+              Substitute("unable to parse PB: $0", body_buf.ToString()));
+      break;
+    }
+    default: LOG(FATAL) << "Unknown mode";
+  }
+
+  VLOG(1) << "Received message: " << pb_util::SecureDebugString(*message);
+  return Status::OK();
+}
+
+template <class M>
+Status SubprocessProtocol::SendMessage(const M& message) {
+  VLOG(1) << "Sending message: " << pb_util::SecureDebugString(message);
+
+  faststring buf;
+  switch (serialization_mode_) {
+    case SerializationMode::JSON:
+    {
+      string serialized;
+      const auto& google_status =
+          google::protobuf::util::MessageToJsonString(message, &serialized);
+      if (!google_status.ok()) {
+        return Status::InvalidArgument(Substitute(
+            "unable to serialize JSON: $0", pb_util::SecureDebugString(message)),
+                                       google_status.error_message().ToString());
+      }
+
+      buf.append(serialized);
+      buf.append("\n");
+      break;
+    }
+    case SerializationMode::PB:
+    {
+      size_t msg_size = message.ByteSizeLong();
+      buf.resize(sizeof(uint32_t) + msg_size);
+      NetworkByteOrder::Store32(buf.data(), msg_size);
+      if (!message.SerializeWithCachedSizesToArray(buf.data() + sizeof(uint32_t))) {
+        return Status::Corruption("failed to serialize PB to array");
+      }
+      break;
+    }
+    default:
+      break;
+  }
+  RETURN_NOT_OK_PREPEND(DoWrite(buf), "unable to send message");
+  return Status::OK();
+}
+
+Status SubprocessProtocol::DoRead(faststring* buf) {
+  uint8_t* pos = buf->data();
+  size_t rem = buf->length();
+  while (rem > 0) {
+    ssize_t r;
+    RETRY_ON_EINTR(r, read(read_fd_, pos, rem));
+    if (r == -1) {
+      return Status::IOError("Error reading from pipe", "", errno);
+    }
+    if (r == 0) {
+      return Status::EndOfFile("Other end of pipe was closed");
+    }
+    DCHECK_GE(rem, r);
+    rem -= r;
+    pos += r;
+  }
+  return Status::OK();
+}
+
+Status SubprocessProtocol::DoWrite(const faststring& buf) {
+  const uint8_t* pos = buf.data();
+  size_t rem = buf.length();
+  while (rem > 0) {
+    ssize_t r;
+    RETRY_ON_EINTR(r, write(write_fd_, pos, rem));
+    if (r == -1) {
+      if (errno == EPIPE) {
+        return Status::EndOfFile("Other end of pipe was closed");
+      }
+      return Status::IOError("Error writing to pipe", "", errno);
+    }
+    DCHECK_GE(rem, r);
+    rem -= r;
+    pos += r;
+  }
+  return Status::OK();
+}
+
+
+// Explicit specialization for callers outside this compilation unit.
+template
+Status SubprocessProtocol::ReceiveMessage(tools::ControlShellRequestPB* message);
+template
+Status SubprocessProtocol::ReceiveMessage(tools::ControlShellResponsePB* message);
+template
+Status SubprocessProtocol::SendMessage(const tools::ControlShellRequestPB& message);
+template
+Status SubprocessProtocol::SendMessage(const tools::ControlShellResponsePB& message);
+
+} // namespace subprocess
+} // namespace kudu
diff --git a/src/kudu/subprocess/subprocess_protocol.h b/src/kudu/subprocess/subprocess_protocol.h
new file mode 100644
index 0000000..22efce4
--- /dev/null
+++ b/src/kudu/subprocess/subprocess_protocol.h
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "kudu/gutil/macros.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class faststring; // NOLINT
+
+namespace subprocess {
+// Facilitates sending and receiving messages with a subprocess via protobuf-based
+// protocol.
+//
+// May be used by a subprocess communicating with the parent process via pipe, or
+// by the parent process itself to read/write messages via stdin/stdout respectively.
+class SubprocessProtocol {
+ public:
+  enum class SerializationMode {
+    // Each message is serialized as a four byte big-endian size followed by
+    // the protobuf-encoded message itself.
+    PB,
+
+    // Each message is serialized into a protobuf-like JSON representation
+    // terminated with a newline character.
+    JSON,
+  };
+
+  // Whether the provided fds are closed at class destruction time.
+  enum class CloseMode {
+    CLOSE_ON_DESTROY,
+    NO_CLOSE_ON_DESTROY,
+  };
+
+  // Constructs a new protocol instance.
+  //
+  // If 'close_mode' is CLOSE_ON_DESTROY, the instance has effectively taken
+  // control of 'read_fd' and 'write_fd' and the caller shouldn't use them.
+  // 'max_msg_bytes' represents the maximum number of bytes per message.
+  SubprocessProtocol(SerializationMode serialization_mode,
+                     CloseMode close_mode,
+                     int read_fd,
+                     int write_fd,
+                     int max_msg_bytes = kMaxMessageBytes);
+
+  ~SubprocessProtocol();
+
+  // Receives a protobuf message, blocking if the pipe is empty.
+  //
+  // Returns EndOfFile if the writer on the other end of the pipe was closed.
+  //
+  // Returns an error if serialization_mode_ is PB and the received message
+  // sizes exceeds kMaxMessageBytes.
+  template <class M>
+  Status ReceiveMessage(M* message);
+
+  // Sends a protobuf message, blocking if the pipe is full.
+  //
+  // Returns EndOfFile if the reader on the other end of the pipe was closed.
+  template <class M>
+  Status SendMessage(const M& message);
+
+ private:
+  // Private helpers to drive actual pipe reading and writing.
+  Status DoRead(faststring* buf);
+  Status DoWrite(const faststring& buf);
+
+  static const int kMaxMessageBytes;
+
+  const SerializationMode serialization_mode_;
+  const CloseMode close_mode_;
+  const int read_fd_;
+  const int write_fd_;
+  const int max_msg_bytes_;
+
+  DISALLOW_COPY_AND_ASSIGN(SubprocessProtocol);
+};
+
+} // namespace subprocess
+} // namespace kudu
diff --git a/src/kudu/tools/CMakeLists.txt b/src/kudu/tools/CMakeLists.txt
index face315..84215c2 100644
--- a/src/kudu/tools/CMakeLists.txt
+++ b/src/kudu/tools/CMakeLists.txt
@@ -129,6 +129,7 @@ target_link_libraries(kudu
   kudu_client
   kudu_common
   kudu_fs
+  kudu_subprocess
   kudu_tools_rebalance
   kudu_util
   log
@@ -165,6 +166,7 @@ SET_KUDU_TEST_LINK_LIBS(
   itest_util
   ksck
   kudu_hms
+  kudu_subprocess
   kudu_tools_rebalance
   kudu_tools_test_util
   kudu_tools_util
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index bd4ec0b..5a77955 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -99,6 +99,7 @@
 #include "kudu/mini-cluster/internal_mini_cluster.h"
 #include "kudu/mini-cluster/mini_cluster.h"
 #include "kudu/rpc/rpc_controller.h"
+#include "kudu/subprocess/subprocess_protocol.h"
 #include "kudu/tablet/local_tablet_writer.h"
 #include "kudu/tablet/metadata.pb.h"
 #include "kudu/tablet/tablet-harness.h"
@@ -108,7 +109,6 @@
 #include "kudu/tablet/tablet_replica.h"
 #include "kudu/thrift/client.h"
 #include "kudu/tools/tool.pb.h"
-#include "kudu/tools/tool_action_common.h"
 #include "kudu/tools/tool_replica_util.h"
 #include "kudu/tools/tool_test_util.h"
 #include "kudu/tserver/mini_tablet_server.h"
@@ -185,6 +185,7 @@ using kudu::master::MiniMaster;
 using kudu::master::TServerStatePB;
 using kudu::master::TSManager;
 using kudu::rpc::RpcController;
+using kudu::subprocess::SubprocessProtocol;
 using kudu::tablet::LocalTabletWriter;
 using kudu::tablet::Tablet;
 using kudu::tablet::TabletDataState;
@@ -4446,7 +4447,7 @@ TEST_F(ToolTest, TestHmsList) {
 // This test is parameterized on the serialization mode and Kerberos.
 class ControlShellToolTest :
     public ToolTest,
-    public ::testing::WithParamInterface<std::tuple<ControlShellProtocol::SerializationMode,
+    public ::testing::WithParamInterface<std::tuple<SubprocessProtocol::SerializationMode,
                                                     bool>> {
  public:
   virtual void SetUp() override {
@@ -4455,8 +4456,8 @@ class ControlShellToolTest :
     // Start the control shell.
     string mode;
     switch (serde_mode()) {
-      case ControlShellProtocol::SerializationMode::JSON: mode = "json"; break;
-      case ControlShellProtocol::SerializationMode::PB: mode = "pb"; break;
+      case SubprocessProtocol::SerializationMode::JSON: mode = "json"; break;
+      case SubprocessProtocol::SerializationMode::PB: mode = "pb"; break;
       default: LOG(FATAL) << "Unknown serialization mode";
     }
     shell_.reset(new Subprocess({
@@ -4470,10 +4471,10 @@ class ControlShellToolTest :
     ASSERT_OK(shell_->Start());
 
     // Start the protocol interface.
-    proto_.reset(new ControlShellProtocol(serde_mode(),
-                                          ControlShellProtocol::CloseMode::CLOSE_ON_DESTROY,
-                                          shell_->ReleaseChildStdoutFd(),
-                                          shell_->ReleaseChildStdinFd()));
+    proto_.reset(new SubprocessProtocol(serde_mode(),
+                                        SubprocessProtocol::CloseMode::CLOSE_ON_DESTROY,
+                                        shell_->ReleaseChildStdoutFd(),
+                                        shell_->ReleaseChildStdinFd()));
   }
 
   virtual void TearDown() override {
@@ -4500,7 +4501,7 @@ class ControlShellToolTest :
     return Status::OK();
   }
 
-  ControlShellProtocol::SerializationMode serde_mode() const {
+  SubprocessProtocol::SerializationMode serde_mode() const {
     return ::testing::get<0>(GetParam());
   }
 
@@ -4509,13 +4510,13 @@ class ControlShellToolTest :
   }
 
   unique_ptr<Subprocess> shell_;
-  unique_ptr<ControlShellProtocol> proto_;
+  unique_ptr<SubprocessProtocol> proto_;
 };
 
 INSTANTIATE_TEST_CASE_P(SerializationModes, ControlShellToolTest,
                         ::testing::Combine(::testing::Values(
-                            ControlShellProtocol::SerializationMode::PB,
-                            ControlShellProtocol::SerializationMode::JSON),
+                            SubprocessProtocol::SerializationMode::PB,
+                            SubprocessProtocol::SerializationMode::JSON),
                                            ::testing::Bool()));
 
 TEST_P(ControlShellToolTest, TestControlShell) {
diff --git a/src/kudu/tools/tool_action_common.cc b/src/kudu/tools/tool_action_common.cc
index bd9855d..6b17cc8 100644
--- a/src/kudu/tools/tool_action_common.cc
+++ b/src/kudu/tools/tool_action_common.cc
@@ -18,10 +18,8 @@
 #include "kudu/tools/tool_action_common.h"
 
 #include <stdlib.h>
-#include <unistd.h>
 
 #include <algorithm>
-#include <cerrno>
 #include <iomanip>
 #include <iostream>
 #include <iterator>
@@ -35,7 +33,6 @@
 #include <boost/algorithm/string/predicate.hpp>
 #include <gflags/gflags.h>
 #include <glog/logging.h>
-#include <google/protobuf/util/json_util.h>
 // IWYU pragma: no_include <yaml-cpp/node/impl.h>
 // IWYU pragma: no_include <yaml-cpp/node/node.h>
 
@@ -52,7 +49,6 @@
 #include "kudu/consensus/log.pb.h"
 #include "kudu/consensus/log_util.h"
 #include "kudu/consensus/opid.pb.h"
-#include "kudu/gutil/endian.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/join.h"
@@ -76,7 +72,6 @@
 #include "kudu/tserver/tserver_service.proxy.h" // IWYU pragma: keep
 #include "kudu/util/async_util.h"
 #include "kudu/util/env.h"
-#include "kudu/util/faststring.h"
 #include "kudu/util/jsonwriter.h"
 #include "kudu/util/mem_tracker.pb.h"
 #include "kudu/util/memory/arena.h"
@@ -859,173 +854,5 @@ Status LeaderMasterProxy::SyncRpc(
                                RpcController*,
                                const ResponseCallback&)>& func);
 
-const int ControlShellProtocol::kMaxMessageBytes = 1024 * 1024;
-
-ControlShellProtocol::ControlShellProtocol(SerializationMode serialization_mode,
-                                           CloseMode close_mode,
-                                           int read_fd,
-                                           int write_fd)
-    : serialization_mode_(serialization_mode),
-      close_mode_(close_mode),
-      read_fd_(read_fd),
-      write_fd_(write_fd) {
-}
-
-ControlShellProtocol::~ControlShellProtocol() {
-  if (close_mode_ == CloseMode::CLOSE_ON_DESTROY) {
-    int ret;
-    RETRY_ON_EINTR(ret, close(read_fd_));
-    RETRY_ON_EINTR(ret, close(write_fd_));
-  }
-}
-
-template <class M>
-Status ControlShellProtocol::ReceiveMessage(M* message) {
-  switch (serialization_mode_) {
-    case SerializationMode::JSON:
-    {
-      // Read and accumulate one byte at a time, looking for the newline.
-      //
-      // TODO(adar): it would be more efficient to read a chunk of data, look
-      // for a newline, and if found, store the remainder for the next message.
-      faststring buf;
-      faststring one_byte;
-      one_byte.resize(1);
-      while (true) {
-        RETURN_NOT_OK_PREPEND(DoRead(&one_byte), "unable to receive message byte");
-        if (one_byte[0] == '\n') {
-          break;
-        }
-        buf.push_back(one_byte[0]);
-      }
-
-      // Parse the JSON-encoded message.
-      const auto& google_status =
-          google::protobuf::util::JsonStringToMessage(buf.ToString(), message);
-      if (!google_status.ok()) {
-        return Status::InvalidArgument(
-            Substitute("unable to parse JSON: $0", buf.ToString()),
-            google_status.error_message().ToString());
-      }
-      break;
-    }
-    case SerializationMode::PB:
-    {
-      // Read four bytes of size (big-endian).
-      faststring size_buf;
-      size_buf.resize(sizeof(uint32_t));
-      RETURN_NOT_OK_PREPEND(DoRead(&size_buf), "unable to receive message size");
-      uint32_t body_size = NetworkByteOrder::Load32(size_buf.data());
-
-      if (body_size > kMaxMessageBytes) {
-        return Status::IOError(
-            Substitute("message size ($0) exceeds maximum message size ($1)",
-                       body_size, kMaxMessageBytes));
-      }
-
-      // Read the variable size body.
-      faststring body_buf;
-      body_buf.resize(body_size);
-      RETURN_NOT_OK_PREPEND(DoRead(&body_buf), "unable to receive message body");
-
-      // Parse the body into a PB request.
-      RETURN_NOT_OK_PREPEND(pb_util::ParseFromArray(
-          message, body_buf.data(), body_buf.length()),
-                            Substitute("unable to parse PB: $0", body_buf.ToString()));
-      break;
-    }
-    default: LOG(FATAL) << "Unknown mode";
-  }
-
-  VLOG(1) << "Received message: " << pb_util::SecureDebugString(*message);
-  return Status::OK();
-}
-
-template <class M>
-Status ControlShellProtocol::SendMessage(const M& message) {
-  VLOG(1) << "Sending message: " << pb_util::SecureDebugString(message);
-
-  faststring buf;
-  switch (serialization_mode_) {
-    case SerializationMode::JSON:
-    {
-      string serialized;
-      const auto& google_status =
-          google::protobuf::util::MessageToJsonString(message, &serialized);
-      if (!google_status.ok()) {
-        return Status::InvalidArgument(Substitute(
-            "unable to serialize JSON: $0", pb_util::SecureDebugString(message)),
-                                       google_status.error_message().ToString());
-      }
-
-      buf.append(serialized);
-      buf.append("\n");
-      break;
-    }
-    case SerializationMode::PB:
-    {
-      size_t msg_size = message.ByteSizeLong();
-      buf.resize(sizeof(uint32_t) + msg_size);
-      NetworkByteOrder::Store32(buf.data(), msg_size);
-      if (!message.SerializeWithCachedSizesToArray(buf.data() + sizeof(uint32_t))) {
-        return Status::Corruption("failed to serialize PB to array");
-      }
-      break;
-    }
-    default:
-      break;
-  }
-  RETURN_NOT_OK_PREPEND(DoWrite(buf), "unable to send message");
-  return Status::OK();
-}
-
-Status ControlShellProtocol::DoRead(faststring* buf) {
-  uint8_t* pos = buf->data();
-  size_t rem = buf->length();
-  while (rem > 0) {
-    ssize_t r;
-    RETRY_ON_EINTR(r, read(read_fd_, pos, rem));
-    if (r == -1) {
-      return Status::IOError("Error reading from pipe", "", errno);
-    }
-    if (r == 0) {
-      return Status::EndOfFile("Other end of pipe was closed");
-    }
-    DCHECK_GE(rem, r);
-    rem -= r;
-    pos += r;
-  }
-  return Status::OK();
-}
-
-Status ControlShellProtocol::DoWrite(const faststring& buf) {
-  const uint8_t* pos = buf.data();
-  size_t rem = buf.length();
-  while (rem > 0) {
-    ssize_t r;
-    RETRY_ON_EINTR(r, write(write_fd_, pos, rem));
-    if (r == -1) {
-      if (errno == EPIPE) {
-        return Status::EndOfFile("Other end of pipe was closed");
-      }
-      return Status::IOError("Error writing to pipe", "", errno);
-    }
-    DCHECK_GE(rem, r);
-    rem -= r;
-    pos += r;
-  }
-  return Status::OK();
-}
-
-// Explicit specialization for callers outside this compilation unit.
-template
-Status ControlShellProtocol::ReceiveMessage(ControlShellRequestPB* message);
-template
-Status ControlShellProtocol::ReceiveMessage(ControlShellResponsePB* message);
-template
-Status ControlShellProtocol::SendMessage(const ControlShellRequestPB& message);
-template
-Status ControlShellProtocol::SendMessage(const ControlShellResponsePB& message);
-
 } // namespace tools
 } // namespace kudu
diff --git a/src/kudu/tools/tool_action_common.h b/src/kudu/tools/tool_action_common.h
index 8c5ba9a..c68e885 100644
--- a/src/kudu/tools/tool_action_common.h
+++ b/src/kudu/tools/tool_action_common.h
@@ -24,7 +24,6 @@
 #include <vector>
 
 #include "kudu/client/shared_ptr.h"
-#include "kudu/gutil/macros.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/rpc/response_callback.h"
@@ -38,7 +37,6 @@ class function;
 namespace kudu {
 
 class MonoDelta;
-class faststring; // NOLINT
 
 namespace client {
 class KuduClient;
@@ -259,68 +257,5 @@ class LeaderMasterProxy {
   client::sp::shared_ptr<client::KuduClient> client_;
 };
 
-// Facilitates sending and receiving messages with the tool control shell.
-//
-// May be used by a subprocess communicating with the shell via pipe, or by the
-// shell itself to read/write messages via stdin/stdout respectively.
-class ControlShellProtocol {
- public:
-  enum class SerializationMode {
-    // Each message is serialized as a four byte big-endian size followed by
-    // the protobuf-encoded message itself.
-    PB,
-
-    // Each message is serialized into a protobuf-like JSON representation
-    // terminated with a newline character.
-    JSON,
-  };
-
-  // Whether the provided fds are closed at class destruction time.
-  enum class CloseMode {
-    CLOSE_ON_DESTROY,
-    NO_CLOSE_ON_DESTROY,
-  };
-
-  // Constructs a new protocol instance.
-  //
-  // If 'close_mode' is CLOSE_ON_DESTROY, the instance has effectively taken
-  // control of 'read_fd' and 'write_fd' and the caller shouldn't use them.
-  ControlShellProtocol(SerializationMode serialization_mode,
-                       CloseMode close_mode,
-                       int read_fd,
-                       int write_fd);
-
-  ~ControlShellProtocol();
-
-  // Receives a protobuf message, blocking if the pipe is empty.
-  //
-  // Returns EndOfFile if the writer on the other end of the pipe was closed.
-  //
-  // Returns an error if serialization_mode_ is PB and the received message
-  // sizes exceeds kMaxMessageBytes.
-  template <class M>
-  Status ReceiveMessage(M* message);
-
-  // Sends a protobuf message, blocking if the pipe is full.
-  //
-  // Returns EndOfFile if the reader on the other end of the pipe was closed.
-  template <class M>
-  Status SendMessage(const M& message);
-
- private:
-  // Private helpers to drive actual pipe reading and writing.
-  Status DoRead(faststring* buf);
-  Status DoWrite(const faststring& buf);
-
-  static const int kMaxMessageBytes;
-
-  const SerializationMode serialization_mode_;
-  const CloseMode close_mode_;
-  const int read_fd_;
-  const int write_fd_;
-
-  DISALLOW_COPY_AND_ASSIGN(ControlShellProtocol);
-};
-
 } // namespace tools
 } // namespace kudu
diff --git a/src/kudu/tools/tool_action_test.cc b/src/kudu/tools/tool_action_test.cc
index da0de8d..e775e9c 100644
--- a/src/kudu/tools/tool_action_test.cc
+++ b/src/kudu/tools/tool_action_test.cc
@@ -38,8 +38,8 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/mini-cluster/external_mini_cluster.h"
 #include "kudu/security/test/mini_kdc.h"
+#include "kudu/subprocess/subprocess_protocol.h"
 #include "kudu/tools/tool.pb.h"
-#include "kudu/tools/tool_action_common.h"
 #include "kudu/util/env.h"
 #include "kudu/util/env_util.h"
 #include "kudu/util/path_util.h"
@@ -56,17 +56,17 @@ DEFINE_validator(serialization, [](const char* /*n*/, const std::string& v) {
          boost::iequals(v, "json");
 });
 
-namespace kudu {
-
-namespace tools {
-
-using cluster::ExternalDaemon;
-using cluster::ExternalMiniCluster;
-using cluster::ExternalMiniClusterOptions;
+using kudu::cluster::ExternalDaemon;
+using kudu::cluster::ExternalMiniCluster;
+using kudu::cluster::ExternalMiniClusterOptions;
+using kudu::subprocess::SubprocessProtocol;
 using std::string;
 using std::unique_ptr;
 using strings::Substitute;
 
+namespace kudu {
+namespace tools {
+
 namespace {
 
 Status MakeClusterRoot(string* cluster_root) {
@@ -322,17 +322,17 @@ Status RunControlShell(const RunnerContext& /*context*/) {
   int ret;
   RETRY_ON_EINTR(ret, dup2(STDERR_FILENO, STDOUT_FILENO));
   PCHECK(ret == STDOUT_FILENO);
-  ControlShellProtocol::SerializationMode serde_mode;
+  SubprocessProtocol::SerializationMode serde_mode;
   if (boost::iequals(FLAGS_serialization, "json")) {
-    serde_mode = ControlShellProtocol::SerializationMode::JSON;
+    serde_mode = SubprocessProtocol::SerializationMode::JSON;
   } else {
     DCHECK(boost::iequals(FLAGS_serialization, "pb"));
-    serde_mode = ControlShellProtocol::SerializationMode::PB;
+    serde_mode = SubprocessProtocol::SerializationMode::PB;
   }
-  ControlShellProtocol protocol(serde_mode,
-                                ControlShellProtocol::CloseMode::NO_CLOSE_ON_DESTROY,
-                                STDIN_FILENO,
-                                new_stdout);
+  SubprocessProtocol protocol(serde_mode,
+                              SubprocessProtocol::CloseMode::NO_CLOSE_ON_DESTROY,
+                              STDIN_FILENO,
+                              new_stdout);
 
   // Run the shell loop, processing each message as it is received.
   unique_ptr<ExternalMiniCluster> cluster;