You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by la...@apache.org on 2023/02/28 23:16:45 UTC

[kudu] branch branch-1.17.x updated: KUDU-3450 handling of oversized messages in subprocess server

This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch branch-1.17.x
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/branch-1.17.x by this push:
     new 1d95cd23d KUDU-3450 handling of oversized messages in subprocess server
1d95cd23d is described below

commit 1d95cd23d1da57a50cb2b95bbf1ff9bd59c7f62d
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Tue Feb 14 17:11:33 2023 -0800

    KUDU-3450 handling of oversized messages in subprocess server
    
    This patch adds a new flag --subprocess_max_message_size_bytes to
    configure the maximum allowed size for the body of the response message
    received by the subprocess server, with default value of 8 MiB.
    Prior to this patch, the hard-coded limit was 1 MiB.
    
    I also updated the default values for the following flags, increasing
    them 4x:
      * --subprocess_request_queue_size_bytes       (4 --> 16 MiB)
      * --subprocess_response_queue_size_bytes      (4 --> 16 MiB)
    
    With this patch, the behavior of the subprocess server has changed
    when encountering an error while reading a response message from
    its subprocess.  Now, the server does not just bail upon a larger than
    expected message or in case of other error, but rather tries to read
    out and discard the message to clear the communication channel,
    so there is a chance to receive next messages from the subprocess.
    
    A new metric has been added for the subprocess server to report on
    the number of read and discarded messages because of various errors
    while reading and decoding the data from the communication channel:
      * server_dropped_messages
    
    In addition, this patch introduces a few test scenarios to cover the
    newly introduced functionality.
    
    I also sprayed the relevant code with syntactic sugar of structured
    bindings since the code requires a C++17-capable compiler anyway.
    
    A follow-up patch should take care of the corresponding client-side
    components that are used to run the Ranger client as a subprocess.
    
    Change-Id: I05b09e757f304b22e37438c2445ecc161ef412c9
    Reviewed-on: http://gerrit.cloudera.org:8080/19500
    Reviewed-by: Attila Bukor <ab...@apache.org>
    Tested-by: Alexey Serbin <al...@apache.org>
    (cherry picked from commit ae22d32ef5895a02a763665cfd2812aa61be5ab3)
    Reviewed-on: http://gerrit.cloudera.org:8080/19558
    Tested-by: Kudu Jenkins
    Reviewed-by: Yingchun Lai <la...@apache.org>
---
 src/kudu/ranger/ranger_client.cc              |  9 +++
 src/kudu/subprocess/echo_subprocess.cc        |  9 +++
 src/kudu/subprocess/server.cc                 | 97 ++++++++++++++++++---------
 src/kudu/subprocess/server.h                  |  6 ++
 src/kudu/subprocess/subprocess_protocol.cc    | 72 ++++++++++++++------
 src/kudu/subprocess/subprocess_protocol.h     | 24 ++++---
 src/kudu/subprocess/subprocess_proxy-test.cc  | 74 +++++++++++++++++---
 src/kudu/subprocess/subprocess_server-test.cc | 86 ++++++++++++++++++++++++
 8 files changed, 310 insertions(+), 67 deletions(-)

diff --git a/src/kudu/ranger/ranger_client.cc b/src/kudu/ranger/ranger_client.cc
index 54d8cc99e..ef3bd6d62 100644
--- a/src/kudu/ranger/ranger_client.cc
+++ b/src/kudu/ranger/ranger_client.cc
@@ -159,6 +159,12 @@ METRIC_DEFINE_histogram(server, ranger_server_outbound_queue_time_ms,
     "Duration of time in ms spent in the Ranger server's outbound request queue",
     kudu::MetricLevel::kInfo,
     60000LU, 1);
+METRIC_DEFINE_counter(server, ranger_server_dropped_messages,
+    "Number of messages dropped by the subprocess server",
+    kudu::MetricUnit::kMessages,
+    "Number of responses that the Ranger client had sent, but the subprocess "
+    "server failed to receive because they were oversized, corrupted, etc.",
+    kudu::MetricLevel::kWarn);
 
 DECLARE_string(keytab_file);
 DECLARE_string(principal);
@@ -366,6 +372,7 @@ Status BuildArgv(const string& fifo_path, const string& log_properties_path,
 
 } // anonymous namespace
 
+#define CINIT(member, x) member = METRIC_##x.Instantiate(entity)
 #define HISTINIT(member, x) member = METRIC_##x.Instantiate(entity)
 RangerSubprocessMetrics::RangerSubprocessMetrics(const scoped_refptr<MetricEntity>& entity) {
   HISTINIT(sp_inbound_queue_length, ranger_subprocess_inbound_queue_length);
@@ -377,8 +384,10 @@ RangerSubprocessMetrics::RangerSubprocessMetrics(const scoped_refptr<MetricEntit
   HISTINIT(server_inbound_queue_time_ms, ranger_server_inbound_queue_time_ms);
   HISTINIT(server_outbound_queue_size_bytes, ranger_server_outbound_queue_size_bytes);
   HISTINIT(server_outbound_queue_time_ms, ranger_server_outbound_queue_time_ms);
+  CINIT(server_dropped_messages, ranger_server_dropped_messages);
 }
 #undef HISTINIT
+#undef CINIT
 
 RangerClient::RangerClient(Env* env, const scoped_refptr<MetricEntity>& metric_entity)
     : env_(env), metric_entity_(metric_entity) {
diff --git a/src/kudu/subprocess/echo_subprocess.cc b/src/kudu/subprocess/echo_subprocess.cc
index df5634d38..751b512f3 100644
--- a/src/kudu/subprocess/echo_subprocess.cc
+++ b/src/kudu/subprocess/echo_subprocess.cc
@@ -76,16 +76,24 @@ METRIC_DEFINE_histogram(server, echo_server_outbound_queue_time_ms,
     "Duration of time in ms spent in the Echo server's outbound request queue",
     kudu::MetricLevel::kInfo,
     60000LU, 1);
+METRIC_DEFINE_counter(server, echo_server_dropped_messages,
+    "Number of messages dropped by the subprocess server",
+    kudu::MetricUnit::kMessages,
+    "Number of responses that the Echo server had sent, but the subprocess "
+    "server failed to receive because they were oversized, corrupted, etc.",
+    kudu::MetricLevel::kWarn);
 
 namespace kudu {
 namespace subprocess {
 
+#define CINIT(member, x) member = METRIC_##x.Instantiate(entity)
 #define HISTINIT(member, x) member = METRIC_##x.Instantiate(entity)
 EchoSubprocessMetrics::EchoSubprocessMetrics(const scoped_refptr<MetricEntity>& entity) {
   HISTINIT(server_inbound_queue_size_bytes, echo_server_inbound_queue_size_bytes);
   HISTINIT(server_inbound_queue_time_ms, echo_server_inbound_queue_time_ms);
   HISTINIT(server_outbound_queue_size_bytes, echo_server_outbound_queue_size_bytes);
   HISTINIT(server_outbound_queue_time_ms, echo_server_outbound_queue_time_ms);
+  CINIT(server_dropped_messages, echo_server_dropped_messages);
   HISTINIT(sp_execution_time_ms, echo_subprocess_execution_time_ms);
   HISTINIT(sp_inbound_queue_length, echo_subprocess_inbound_queue_length);
   HISTINIT(sp_inbound_queue_time_ms, echo_subprocess_inbound_queue_time_ms);
@@ -93,6 +101,7 @@ EchoSubprocessMetrics::EchoSubprocessMetrics(const scoped_refptr<MetricEntity>&
   HISTINIT(sp_outbound_queue_time_ms, echo_subprocess_outbound_queue_time_ms);
 }
 #undef HISTINIT
+#undef CINIT
 
 } // namespace subprocess
 } // namespace kudu
diff --git a/src/kudu/subprocess/server.cc b/src/kudu/subprocess/server.cc
index ca4111f78..94f2d9488 100644
--- a/src/kudu/subprocess/server.cc
+++ b/src/kudu/subprocess/server.cc
@@ -23,6 +23,7 @@
 #include <memory>
 #include <ostream>
 #include <string>
+#include <tuple>
 #include <type_traits>
 #include <utility>
 #include <vector>
@@ -43,18 +44,28 @@
 #include "kudu/util/subprocess.h"
 #include "kudu/util/thread.h"
 
-DEFINE_int32(subprocess_request_queue_size_bytes, 4 * 1024 * 1024,
+DEFINE_int32(subprocess_request_queue_size_bytes, 16 * 1024 * 1024,
              "Maximum size in bytes of the outbound request queue. This is best "
              "effort: if a single request is larger than this, it is still "
              "added to the queue");
 TAG_FLAG(subprocess_request_queue_size_bytes, advanced);
 
-DEFINE_int32(subprocess_response_queue_size_bytes, 4 * 1024 * 1024,
+DEFINE_int32(subprocess_response_queue_size_bytes, 16 * 1024 * 1024,
              "Maximum size in bytes of the inbound response queue. This is best "
              "effort: if a single request is larger than this, it is still "
              "added to the queue");
 TAG_FLAG(subprocess_response_queue_size_bytes, advanced);
 
+DEFINE_uint32(subprocess_max_message_size_bytes, 8 * 1024 * 1024,
+              "Maximum payload size for a message in protobuf format the "
+              "subprocess server accepts from a subprocess (i.e. its child), "
+              "in bytes, 0 means unlimited. If a subprocess sends a message "
+              "with bigger payload than the specified limit, the server "
+              "rejects the message and responds with AppStatusPB::IO_ERROR "
+              "error code. This setting is not effective for messages in JSON "
+              "format.");
+TAG_FLAG(subprocess_max_message_size_bytes, advanced);
+
 DEFINE_int32(subprocess_num_responder_threads, 3,
              "Number of threads that will be dedicated to reading responses "
              "from the inbound queue and returning to callers");
@@ -95,6 +106,7 @@ SubprocessServer::SubprocessServer(Env* env, const string& receiver_file,
                                    vector<string> subprocess_argv,
                                    SubprocessMetrics metrics)
     : call_timeout_(MonoDelta::FromSeconds(FLAGS_subprocess_timeout_secs)),
+      max_message_size_bytes_(FLAGS_subprocess_max_message_size_bytes),
       next_id_(1),
       closing_(1),
       env_(env),
@@ -143,10 +155,12 @@ Status SubprocessServer::Init() {
 
   // Start the message protocol.
   CHECK(!message_protocol_);
-  message_protocol_.reset(new SubprocessProtocol(SubprocessProtocol::SerializationMode::PB,
-                                                 SubprocessProtocol::CloseMode::CLOSE_ON_DESTROY,
-                                                 receiver_fifo_->read_fd(),
-                                                 process_->ReleaseChildStdinFd()));
+  message_protocol_.reset(new SubprocessProtocol(
+      SubprocessProtocol::SerializationMode::PB,
+      SubprocessProtocol::CloseMode::CLOSE_ON_DESTROY,
+      receiver_fifo_->read_fd(),
+      process_->ReleaseChildStdinFd(),
+      max_message_size_bytes_));
   const int num_threads = FLAGS_subprocess_num_responder_threads;
   responder_threads_.resize(num_threads);
   for (int i = 0; i < num_threads; i++) {
@@ -175,8 +189,9 @@ Status SubprocessServer::Execute(SubprocessRequestPB* req,
   metrics_.server_outbound_queue_size_bytes->Increment(outbound_call_queue_.size());
   CallAndTimer call_and_timer = {
       make_shared<SubprocessCall>(req, resp, &cb, MonoTime::Now() + call_timeout_), {} };
+  const auto deadline = call_and_timer.first->deadline();
   RETURN_NOT_OK_PREPEND(
-      outbound_call_queue_.BlockingPut(std::move(call_and_timer), call_and_timer.first->deadline()),
+      outbound_call_queue_.BlockingPut(std::move(call_and_timer), deadline),
       "couldn't enqueue call");
   return sync.Wait();
 }
@@ -230,8 +245,7 @@ void SubprocessServer::Shutdown() {
     std::lock_guard<simple_spinlock> l(in_flight_lock_);
     calls = std::move(call_by_id_);
   }
-  for (const auto& id_and_call : calls) {
-    const auto& call = id_and_call.second;
+  for (const auto& [_, call] : calls) {
     call->RespondError(Status::ServiceUnavailable("subprocess is shutting down"));
   }
 }
@@ -239,24 +253,49 @@ void SubprocessServer::Shutdown() {
 void SubprocessServer::ReceiveMessagesThread() {
   DCHECK(message_protocol_) << "message protocol is not initialized";
   while (closing_.count() > 0) {
-    // Receive a new response from the subprocess.
+    // Receive next response from the subprocess.
     SubprocessResponsePB response;
-    Status s = message_protocol_->ReceiveMessage(&response);
+    const auto s = message_protocol_->ReceiveMessage(&response);
     if (s.IsEndOfFile()) {
       // The underlying pipe was closed. We're likely shutting down.
       LOG(INFO) << "Received an EOF from the subprocess";
       return;
     }
-    // TODO(awong): getting an error here indicates that this server and the
-    // underlying subprocess are not in sync (e.g. not speaking the same
-    // protocol). We should consider either crashing here, or restarting the
-    // subprocess.
-    DCHECK(s.ok());
-    WARN_NOT_OK(s, "failed to receive response from the subprocess");
+
+    // If the ReceiveMessage() call above returned an error, it means the server
+    // either rejected the message due to some limitations (e.g. maximum size
+    // of the encoded message), or failed to parse the message due to
+    // corruption, or that the server and the underlying subprocess are not in
+    // sync (e.g., speaking different protocol). Since the subprocess protocol
+    // doesn't have a means to address such a condition in a graceful way, not
+    // much can be done in this case. The information on the response identifier
+    // could not be retrieved from the serialized SubprocessResponsePB message
+    // in the discarded data, hence there isn't a way to find what request
+    // that non-delivered message corresponds to.
+    //
+    // Anyways, since the consistency of the server's runtime structures and the
+    // data haven't been compromised, no need to crash here: just log a warning
+    // about this issue. The request will be reported as timed out since
+    // no corresponding record is found in the inbound request queue at the
+    // deadline.
+    if (PREDICT_FALSE(!s.ok())) {
+      // Log an error and continue: in some cases (e.g., an oversized message
+      // sent by the subprocess), this is a recoverable error once the oversized
+      // message is read out from the communication channel and discarded. At
+      // least, next non-oversized responses can be read without any issue.
+      //
+      // TODO(aserbin): if the data stream is corrupted, resetting the state
+      //                of the subprocess (e.g., restarting the subprocess)
+      //                might provide a short-term remedy as well
+      metrics_.server_dropped_messages->Increment();
+      LOG(ERROR) << Substitute(
+          "failed to receive response from the subprocess: $0", s.ToString());
+      continue;
+    }
+
     // Before adding to the queue, record the size of the response queue.
     metrics_.server_inbound_queue_size_bytes->Increment(inbound_response_queue_.size());
-    ResponsePBAndTimer resp_and_timer = { std::move(response), {} };
-    if (s.ok() && !inbound_response_queue_.BlockingPut(std::move(resp_and_timer)).ok()) {
+    if (!inbound_response_queue_.BlockingPut(ResponsePBAndTimer{ response, {} }).ok()) {
       // The queue has been shut down and we should shut down too.
       DCHECK_EQ(0, closing_.count());
       LOG(INFO) << "failed to put response onto inbound queue";
@@ -273,10 +312,9 @@ void SubprocessServer::ResponderThread() {
     // is shutting down. Also note that even if this fails because we're
     // shutting down, we still populate 'resps' and must run their callbacks.
     s = inbound_response_queue_.BlockingDrainTo(&resps);
-    for (auto& resp_and_timer : resps) {
+    for (auto& [resp, timer] : resps) {
       metrics_.server_inbound_queue_time_ms->Increment(
-          resp_and_timer.second.elapsed().ToMilliseconds());
-      const auto& resp = resp_and_timer.first;
+          timer.elapsed().ToMilliseconds());
 
       if (!resp.has_id()) {
         LOG(FATAL) << Substitute("Received invalid response: $0",
@@ -297,8 +335,7 @@ void SubprocessServer::ResponderThread() {
     calls_and_resps.reserve(resps.size());
     {
       std::lock_guard<simple_spinlock> l(in_flight_lock_);
-      for (auto& resp_and_timer : resps) {
-        auto& resp = resp_and_timer.first;
+      for (auto& [resp, _] : resps) {
         auto id = resp.id();
         auto call = EraseKeyReturnValuePtr(&call_by_id_, id);
         if (call) {
@@ -306,8 +343,8 @@ void SubprocessServer::ResponderThread() {
         }
       }
     }
-    for (auto& call_and_resp : calls_and_resps) {
-      call_and_resp.first->RespondSuccess(std::move(call_and_resp.second));
+    for (auto& [call, resp] : calls_and_resps) {
+      call->RespondSuccess(std::move(resp));
     }
     // If we didn't find our call, it timed out and the its callback has
     // already been called by the deadline checker.
@@ -359,18 +396,16 @@ void SubprocessServer::SendMessagesThread() {
     s = outbound_call_queue_.BlockingDrainTo(&calls);
     {
       std::lock_guard<simple_spinlock> l(in_flight_lock_);
-      for (const auto& call_and_timer : calls) {
-        const auto& call = call_and_timer.first;
+      for (const auto& [call, _] : calls) {
         EmplaceOrDie(&call_by_id_, call->id(), call);
       }
     }
     // NOTE: it's possible that before sending the request, the call already
     // timed out and the deadline checker already called its callback. If so,
     // the following call will no-op.
-    for (const auto& call_and_timer : calls) {
-      const auto& call = call_and_timer.first;
+    for (const auto& [call, timer] : calls) {
       metrics_.server_outbound_queue_time_ms->Increment(
-          call_and_timer.second.elapsed().ToMilliseconds());
+          timer.elapsed().ToMilliseconds());
 
       call->SendRequest(message_protocol_.get());
     }
diff --git a/src/kudu/subprocess/server.h b/src/kudu/subprocess/server.h
index b5d39a972..a5222e910 100644
--- a/src/kudu/subprocess/server.h
+++ b/src/kudu/subprocess/server.h
@@ -26,6 +26,7 @@
 #include <memory>
 #include <mutex>
 #include <string>
+#include <type_traits>
 #include <utility>
 #include <vector>
 
@@ -79,6 +80,7 @@ struct SubprocessMetrics {
   scoped_refptr<Histogram> server_inbound_queue_time_ms;
   scoped_refptr<Histogram> server_outbound_queue_size_bytes;
   scoped_refptr<Histogram> server_outbound_queue_time_ms;
+  scoped_refptr<Counter> server_dropped_messages;
 };
 
 // Encapsulates the pending state of a request that is in the process of being
@@ -271,6 +273,10 @@ class SubprocessServer {
   // Fixed timeout to be used for each call.
   const MonoDelta call_timeout_;
 
+  // The upper limit on the size of a message in protobuf format received
+  // from a subprocess, 0 means no limit.
+  const uint32_t max_message_size_bytes_;
+
   // Next request ID to be assigned.
   std::atomic<CallId> next_id_;
 
diff --git a/src/kudu/subprocess/subprocess_protocol.cc b/src/kudu/subprocess/subprocess_protocol.cc
index e131dc65c..2d3ca3de5 100644
--- a/src/kudu/subprocess/subprocess_protocol.cc
+++ b/src/kudu/subprocess/subprocess_protocol.cc
@@ -19,9 +19,11 @@
 
 #include <unistd.h>
 
+#include <algorithm>
 #include <cerrno>
 #include <cstddef>
 #include <cstdint>
+#include <limits>
 #include <ostream>
 #include <string>
 
@@ -29,6 +31,7 @@
 #include <google/protobuf/util/json_util.h>
 
 #include "kudu/gutil/endian.h"
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/subprocess/subprocess.pb.h" // IWYU pragma: keep
 #include "kudu/tools/tool.pb.h"  // IWYU pragma: keep
@@ -43,13 +46,11 @@ using std::string;
 namespace kudu {
 namespace subprocess {
 
-const int SubprocessProtocol::kMaxMessageBytes = 1024 * 1024;
-
 SubprocessProtocol::SubprocessProtocol(SerializationMode serialization_mode,
                                        CloseMode close_mode,
                                        int read_fd,
                                        int write_fd,
-                                       int max_msg_bytes)
+                                       uint32_t max_msg_bytes)
     : serialization_mode_(serialization_mode),
       close_mode_(close_mode),
       read_fd_(read_fd),
@@ -97,24 +98,36 @@ Status SubprocessProtocol::ReceiveMessage(M* message) {
     }
     case SerializationMode::PB:
     {
-      // Read four bytes of size (big-endian).
+      // Read four bytes where the size of the payload is encoded (big-endian).
       faststring size_buf;
       size_buf.resize(sizeof(uint32_t));
       RETURN_NOT_OK_PREPEND(DoRead(&size_buf), "unable to receive message size");
-      uint32_t body_size = NetworkByteOrder::Load32(size_buf.data());
+      const uint32_t body_size = NetworkByteOrder::Load32(size_buf.data());
+
+      if (max_msg_bytes_ != 0 && PREDICT_FALSE(body_size > max_msg_bytes_)) {
+        const auto msg = Substitute(
+            "message size ($0) exceeds maximum message size ($1)",
+            body_size, max_msg_bytes_);
 
-      if (body_size > max_msg_bytes_) {
-        return Status::IOError(
-            Substitute("message size ($0) exceeds maximum message size ($1)",
-                       body_size, max_msg_bytes_));
+        // Try to read out and discard the oversized message to clear the
+        // channel for next messages, if any.
+        LOG(WARNING) << Substitute(
+            "$0: reading and discarding of the oversized message", msg);
+        WARN_NOT_OK(DoReadAndDiscard(body_size),
+                    "failed to read out oversized message");
+        return Status::IOError(msg);
       }
 
       // Read the variable size body.
+      //
+      // TODO(aserbin): maybe, use a pre-allocated buffer to read in the body
+      //                of the message since the limit on the maximum message
+      //                size is known beforehand?
       faststring body_buf;
       body_buf.resize(body_size);
       RETURN_NOT_OK_PREPEND(DoRead(&body_buf), "unable to receive message body");
 
-      // Parse the body into a PB request.
+      // Parse the body into a PB message.
       RETURN_NOT_OK_PREPEND(pb_util::ParseFromArray(
           message, body_buf.data(), body_buf.length()),
               Substitute("unable to parse PB: $0", body_buf.ToString()));
@@ -165,38 +178,59 @@ Status SubprocessProtocol::SendMessage(const M& message) {
   return Status::OK();
 }
 
-Status SubprocessProtocol::DoRead(faststring* buf) {
+Status SubprocessProtocol::DoRead(faststring* buf) const {
+  DCHECK_LE(buf->length(), std::numeric_limits<ssize_t>::max());
   uint8_t* pos = buf->data();
-  size_t rem = buf->length();
+  ssize_t rem = buf->length();
   while (rem > 0) {
     ssize_t r;
     RETRY_ON_EINTR(r, read(read_fd_, pos, rem));
     if (r == -1) {
-      return Status::IOError("Error reading from pipe", "", errno);
+      const int err = errno;
+      return Status::IOError("Error reading from pipe", "", err);
     }
     if (r == 0) {
       return Status::EndOfFile("Other end of pipe was closed");
     }
-    DCHECK_GE(rem, r);
     rem -= r;
     pos += r;
   }
   return Status::OK();
 }
 
-Status SubprocessProtocol::DoWrite(const faststring& buf) {
+Status SubprocessProtocol::DoReadAndDiscard(ssize_t size) const {
+  // Drains 'size' bytes from read_fd_; each read() is capped at sizeof(buf).
+  uint8_t buf[4096];
+  ssize_t rem = size;
+  while (rem > 0) {
+    ssize_t r;
+    RETRY_ON_EINTR(r, read(read_fd_, buf, std::min<ssize_t>(rem, sizeof(buf))));
+    if (r == -1) {
+      const int err = errno;
+      return Status::IOError("Error reading from pipe", "", err);
+    }
+    if (r == 0) {
+      return Status::EndOfFile("Other end of pipe was closed");
+    }
+    rem -= r;
+  }
+  return Status::OK();
+}
+
+Status SubprocessProtocol::DoWrite(const faststring& buf) const {
+  DCHECK_LE(buf.length(), std::numeric_limits<ssize_t>::max());
   const uint8_t* pos = buf.data();
-  size_t rem = buf.length();
+  ssize_t rem = buf.length();
   while (rem > 0) {
     ssize_t r;
     RETRY_ON_EINTR(r, write(write_fd_, pos, rem));
     if (r == -1) {
-      if (errno == EPIPE) {
+      const int err = errno;
+      if (err == EPIPE) {
         return Status::EndOfFile("Other end of pipe was closed");
       }
-      return Status::IOError("Error writing to pipe", "", errno);
+      return Status::IOError("Error writing to pipe", "", err);
     }
-    DCHECK_GE(rem, r);
     rem -= r;
     pos += r;
   }
diff --git a/src/kudu/subprocess/subprocess_protocol.h b/src/kudu/subprocess/subprocess_protocol.h
index 22efce44f..84b09e335 100644
--- a/src/kudu/subprocess/subprocess_protocol.h
+++ b/src/kudu/subprocess/subprocess_protocol.h
@@ -17,6 +17,10 @@
 
 #pragma once
 
+#include <sys/types.h>
+
+#include <cstdint>
+
 #include "kudu/gutil/macros.h"
 #include "kudu/util/status.h"
 
@@ -52,12 +56,13 @@ class SubprocessProtocol {
   //
   // If 'close_mode' is CLOSE_ON_DESTROY, the instance has effectively taken
   // control of 'read_fd' and 'write_fd' and the caller shouldn't use them.
-  // 'max_msg_bytes' represents the maximum number of bytes per message.
+  // 'max_msg_bytes' represents the maximum number of bytes per message,
+  // where 0 has the semantics of 'unlimited size'.
   SubprocessProtocol(SerializationMode serialization_mode,
                      CloseMode close_mode,
                      int read_fd,
                      int write_fd,
-                     int max_msg_bytes = kMaxMessageBytes);
+                     uint32_t max_msg_bytes = 0);
 
   ~SubprocessProtocol();
 
@@ -65,8 +70,10 @@ class SubprocessProtocol {
   //
   // Returns EndOfFile if the writer on the other end of the pipe was closed.
   //
-  // Returns an error if serialization_mode_ is PB and the received message
-  // sizes exceeds kMaxMessageBytes.
+  // Returns corresponding error if:
+  //   * the message could not be parsed
+  //   * serialization_mode_ is PB and the payload of the message exceeds
+  //     the limit specified by 'max_msg_bytes_'
   template <class M>
   Status ReceiveMessage(M* message);
 
@@ -78,16 +85,15 @@ class SubprocessProtocol {
 
  private:
   // Private helpers to drive actual pipe reading and writing.
-  Status DoRead(faststring* buf);
-  Status DoWrite(const faststring& buf);
-
-  static const int kMaxMessageBytes;
+  Status DoRead(faststring* buf) const;
+  Status DoReadAndDiscard(ssize_t size) const;
+  Status DoWrite(const faststring& buf) const;
 
   const SerializationMode serialization_mode_;
   const CloseMode close_mode_;
   const int read_fd_;
   const int write_fd_;
-  const int max_msg_bytes_;
+  const uint32_t max_msg_bytes_;  // 0 has the semantics of 'unlimited size'
 
   DISALLOW_COPY_AND_ASSIGN(SubprocessProtocol);
 };
diff --git a/src/kudu/subprocess/subprocess_proxy-test.cc b/src/kudu/subprocess/subprocess_proxy-test.cc
index ebfe003eb..00c615b1e 100644
--- a/src/kudu/subprocess/subprocess_proxy-test.cc
+++ b/src/kudu/subprocess/subprocess_proxy-test.cc
@@ -17,6 +17,7 @@
 
 #include "kudu/subprocess/subprocess_proxy.h"
 
+#include <cstddef>
 #include <cstdint>
 #include <functional>
 #include <memory>
@@ -30,8 +31,8 @@
 #include "kudu/gutil/casts.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
-#include "kudu/subprocess/server.h"
 #include "kudu/subprocess/echo_subprocess.h"
+#include "kudu/subprocess/server.h"
 #include "kudu/subprocess/subprocess.pb.h"
 #include "kudu/util/env.h"
 #include "kudu/util/metrics.h"
@@ -41,16 +42,18 @@
 #include "kudu/util/test_util.h"
 
 DECLARE_int32(subprocess_timeout_secs);
+DECLARE_uint32(subprocess_max_message_size_bytes);
 
+METRIC_DECLARE_counter(echo_server_dropped_messages);
+METRIC_DECLARE_histogram(echo_server_inbound_queue_size_bytes);
+METRIC_DECLARE_histogram(echo_server_inbound_queue_time_ms);
+METRIC_DECLARE_histogram(echo_server_outbound_queue_size_bytes);
+METRIC_DECLARE_histogram(echo_server_outbound_queue_time_ms);
+METRIC_DECLARE_histogram(echo_subprocess_execution_time_ms);
 METRIC_DECLARE_histogram(echo_subprocess_inbound_queue_length);
-METRIC_DECLARE_histogram(echo_subprocess_outbound_queue_length);
 METRIC_DECLARE_histogram(echo_subprocess_inbound_queue_time_ms);
+METRIC_DECLARE_histogram(echo_subprocess_outbound_queue_length);
 METRIC_DECLARE_histogram(echo_subprocess_outbound_queue_time_ms);
-METRIC_DECLARE_histogram(echo_subprocess_execution_time_ms);
-METRIC_DECLARE_histogram(echo_server_outbound_queue_size_bytes);
-METRIC_DECLARE_histogram(echo_server_inbound_queue_size_bytes);
-METRIC_DECLARE_histogram(echo_server_outbound_queue_time_ms);
-METRIC_DECLARE_histogram(echo_server_inbound_queue_time_ms);
 
 using std::unique_ptr;
 using std::string;
@@ -98,8 +101,10 @@ class EchoSubprocessTest : public KuduTest {
   const string test_dir_;
 };
 
+#define GET_COUNTER(metric_entity, metric_name) \
+  down_cast<Counter*>((metric_entity)->FindOrNull(METRIC_##metric_name).get())
 #define GET_HIST(metric_entity, metric_name) \
-  down_cast<Histogram*>((metric_entity)->FindOrNull(METRIC_##metric_name).get());
+  down_cast<Histogram*>((metric_entity)->FindOrNull(METRIC_##metric_name).get())
 
 TEST_F(EchoSubprocessTest, TestBasicSubprocessMetrics) {
   const string kMessage = "don't catch you slippin' now";
@@ -209,6 +214,59 @@ TEST_F(EchoSubprocessTest, TestSubprocessMetricsOnError) {
   });
 }
 
+TEST_F(EchoSubprocessTest, DroppedMessagesMetric) {
+  FLAGS_subprocess_timeout_secs = 1;
+  FLAGS_subprocess_max_message_size_bytes = 100;
+  ASSERT_OK(ResetEchoSubprocess());
+
+  const auto* counter = GET_COUNTER(metric_entity_, echo_server_dropped_messages);
+  ASSERT_EQ(0, counter->value());
+
+  // Send an oversized message -- it should be dropped.
+  {
+    EchoRequestPB req;
+    req.set_data(string(100, 'x'));
+    EchoResponsePB resp;
+    const auto s = echo_subprocess_->Execute(req, &resp);
+    ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+  }
+  // The dropped message's counter should be incremented by one.
+  ASSERT_EQ(1, counter->value());
+
+  // Send a non-oversized message.
+  {
+    EchoRequestPB req;
+    req.set_data("x");
+    EchoResponsePB resp;
+    ASSERT_OK(echo_subprocess_->Execute(req, &resp));
+  }
+  // The dropped message's counter should stay at its prior value.
+  ASSERT_EQ(1, counter->value());
+
+  // Send a few more oversized messages.
+  for (size_t i = 0; i < 2; ++i) {
+    EchoRequestPB req;
+    req.set_data(string(1000 + i, 'x'));
+    EchoResponsePB resp;
+    const auto s = echo_subprocess_->Execute(req, &resp);
+    ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+  }
+  // The dropped message's counter should be incremented by the number of
+  // oversized messages sent after capturing the prior reading.
+  ASSERT_EQ(1 + 2, counter->value());
+
+  // Send several non-oversized messages.
+  for (size_t i = 0; i < 5; ++i) {
+    EchoRequestPB req;
+    req.set_data(string(i, 'x'));
+    EchoResponsePB resp;
+    ASSERT_OK(echo_subprocess_->Execute(req, &resp));
+  }
+  // The dropped message's counter should stay at its prior value.
+  ASSERT_EQ(3, counter->value());
+}
+
+#undef GET_COUNTER
 #undef GET_HIST
 
 } // namespace subprocess
diff --git a/src/kudu/subprocess/subprocess_server-test.cc b/src/kudu/subprocess/subprocess_server-test.cc
index af0055971..b6a2a7bd2 100644
--- a/src/kudu/subprocess/subprocess_server-test.cc
+++ b/src/kudu/subprocess/subprocess_server-test.cc
@@ -19,6 +19,7 @@
 #include <ostream>
 #include <string>
 #include <thread>
+#include <type_traits>
 #include <utility>
 #include <vector>
 
@@ -47,6 +48,7 @@ DECLARE_int32(subprocess_request_queue_size_bytes);
 DECLARE_int32(subprocess_response_queue_size_bytes);
 DECLARE_int32(subprocess_num_responder_threads);
 DECLARE_int32(subprocess_timeout_secs);
+DECLARE_uint32(subprocess_max_message_size_bytes);
 
 using google::protobuf::Any;
 using std::make_shared;
@@ -335,6 +337,90 @@ TEST_F(SubprocessServerTest, TestRunFromMultipleThreads) {
   }
 }
 
+// The subprocess server should reject messages in protobuf format if their
+// payload is over the threshold specified by the
+// --subprocess_max_message_size_bytes flag.
+TEST_F(SubprocessServerTest, MaxPayloadSize) {
+  // Set a short timeout to speed up testing.
+  FLAGS_subprocess_timeout_secs = 1;
+
+  // The protobuf-encoded message has some metadata, so for a single-character
+  // payload let's set the upper limit to 100 to make sure the encoded message
+  // size is still under 100 bytes.
+  FLAGS_subprocess_max_message_size_bytes = 100;
+  ASSERT_OK(ResetSubprocessServer());
+
+  // Send in a message that isn't oversized as per the current limit.
+  {
+    auto req = CreateEchoSubprocessRequestPB("0");
+    SubprocessResponsePB res;
+    ASSERT_OK(server_->Execute(&req, &res));
+  }
+
+  // Send an oversized message.
+  {
+    auto req = CreateEchoSubprocessRequestPB(string(100, 'x'));
+    SubprocessResponsePB res;
+    const auto s = server_->Execute(&req, &res);
+
+    // The request will time out because the oversized response is read and
+    // discarded, and there isn't any application-level data to be sent back.
+    // Unfortunately, the current design of the subprocess protocol doesn't
+    // allow for reporting on communication errors between the server and
+    // the subprocesses it spawned.
+    ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "timed out while in flight");
+  }
+
+  // Non-oversized follow-up messages should be received without any issues:
+  // the communication channel should be cleared of any oversized requests
+  // sent earlier.
+  {
+    auto req = CreateEchoSubprocessRequestPB("1");
+    SubprocessResponsePB res;
+    ASSERT_OK(server_->Execute(&req, &res));
+  }
+}
+
+// Test the semantics of the --subprocess_max_message_size_bytes=0 setting,
+// meaning there isn't an upper limit on the size of protobuf-encoded responses
+// from the subprocess.
+//
+// NOTE: the Java-based implementation of the related Subprocess components
+//       and test harnesses have their own limit on the maximum allowed size
+//       of the message, but in this context it's enough just to test that
+//       --subprocess_max_message_size_bytes=0 isn't treated literally as 0
+//       for the maximum size of SubprocessResponsePB messages.
+TEST_F(SubprocessServerTest, UnlimitedPayloadSize) {
+  // Set a short timeout to speed up testing.
+  FLAGS_subprocess_timeout_secs = 1;
+
+  // No upper limit on protobuf-encoded responses.
+  FLAGS_subprocess_max_message_size_bytes = 0;
+  ASSERT_OK(ResetSubprocessServer());
+
+  // An empty message.
+  {
+    auto req = CreateEchoSubprocessRequestPB("");
+    SubprocessResponsePB res;
+    ASSERT_OK(server_->Execute(&req, &res));
+  }
+
+  // A very short message.
+  {
+    auto req = CreateEchoSubprocessRequestPB("x");
+    SubprocessResponsePB res;
+    ASSERT_OK(server_->Execute(&req, &res));
+  }
+
+  // Just in case, send a longer message: should be OK as well.
+  {
+    auto req = CreateEchoSubprocessRequestPB(string(1024, 'x'));
+    SubprocessResponsePB res;
+    ASSERT_OK(server_->Execute(&req, &res));
+  }
+}
+
 } // namespace subprocess
 } // namespace kudu