You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2020/03/03 06:51:37 UTC

[kudu] branch master updated (93e8587 -> e2301c0)

This is an automated email from the ASF dual-hosted git repository.

awong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from 93e8587  [security] introduce security level override for OpenSSL
     new 0b6889d  [java] subprocess: add a metrics message
     new e2301c0  util: deadline for BlockingQueue::Blocking{Put,Get}

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../kudu/subprocess/echo/EchoProtocolHandler.java  |   2 +-
 .../kudu/subprocess/echo/TestEchoSubprocess.java   | 349 +++++++++++++++------
 ...ubprocessException.java => InboundRequest.java} |  35 ++-
 .../org/apache/kudu/subprocess/MessageParser.java  |  50 +--
 .../org/apache/kudu/subprocess/MessageReader.java  |  11 +-
 .../org/apache/kudu/subprocess/MessageWriter.java  |   9 +-
 .../apache/kudu/subprocess/OutboundResponse.java   |  56 ++++
 .../apache/kudu/subprocess/ProtocolHandler.java    |  18 +-
 .../apache/kudu/subprocess/SubprocessExecutor.java |  30 +-
 .../apache/kudu/subprocess/SubprocessMetrics.java  | 108 +++++++
 .../apache/kudu/subprocess/MessageTestUtil.java    |  38 ++-
 src/kudu/consensus/log.cc                          |   2 +-
 src/kudu/integration-tests/linked_list-test-util.h |   2 +-
 src/kudu/subprocess/server.cc                      |  33 +-
 src/kudu/subprocess/server.h                       |   4 +-
 src/kudu/subprocess/subprocess.proto               |  29 +-
 src/kudu/subprocess/subprocess_server-test.cc      |   4 +-
 src/kudu/util/blocking_queue-test.cc               | 106 ++++++-
 src/kudu/util/blocking_queue.h                     |  78 +++--
 19 files changed, 716 insertions(+), 248 deletions(-)
 copy java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/{KuduSubprocessException.java => InboundRequest.java} (51%)
 create mode 100644 java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/OutboundResponse.java
 create mode 100644 java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessMetrics.java


[kudu] 02/02: util: deadline for BlockingQueue::Blocking{Put,Get}

Posted by aw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit e2301c04e40eb55d81435327efa44ccefccc807d
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Fri Feb 28 19:11:11 2020 -0800

    util: deadline for BlockingQueue::Blocking{Put,Get}
    
    Drawing inspiration from BlockingDrainTo(), this adds a deadline to
    BlockingPut() and BlockingGet(), with the same error semantics.
    
    This also updates SubprocessServer to make use of BlockingPut(), instead of
    manual deadline-checking.
    
    Change-Id: If32bad5667195d72eeb03e4942751824d4dadb7a
    Reviewed-on: http://gerrit.cloudera.org:8080/15324
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 src/kudu/consensus/log.cc                          |   2 +-
 src/kudu/integration-tests/linked_list-test-util.h |   2 +-
 src/kudu/subprocess/server.cc                      |  33 +------
 src/kudu/subprocess/server.h                       |   4 +-
 src/kudu/subprocess/subprocess_server-test.cc      |   4 +-
 src/kudu/util/blocking_queue-test.cc               | 106 ++++++++++++++++++---
 src/kudu/util/blocking_queue.h                     |  78 +++++++++------
 7 files changed, 152 insertions(+), 77 deletions(-)

diff --git a/src/kudu/consensus/log.cc b/src/kudu/consensus/log.cc
index f9a6018..ed6057a 100644
--- a/src/kudu/consensus/log.cc
+++ b/src/kudu/consensus/log.cc
@@ -856,7 +856,7 @@ Status Log::AsyncAppend(unique_ptr<LogEntryBatch> entry_batch, const StatusCallb
 
   entry_batch->set_callback(callback);
   TRACE_EVENT_FLOW_BEGIN0("log", "Batch", entry_batch.get());
-  if (PREDICT_FALSE(!entry_batch_queue_.BlockingPut(entry_batch.get()))) {
+  if (PREDICT_FALSE(!entry_batch_queue_.BlockingPut(entry_batch.get()).ok())) {
     TRACE_EVENT_FLOW_END0("log", "Batch", entry_batch.get());
     return kLogShutdownStatus;
   }
diff --git a/src/kudu/integration-tests/linked_list-test-util.h b/src/kudu/integration-tests/linked_list-test-util.h
index 5547f66..1461a2a 100644
--- a/src/kudu/integration-tests/linked_list-test-util.h
+++ b/src/kudu/integration-tests/linked_list-test-util.h
@@ -288,7 +288,7 @@ class ScopedRowUpdater {
     CHECK_OK(session->SetFlushMode(client::KuduSession::AUTO_FLUSH_BACKGROUND));
 
     int64_t next_key;
-    while (to_update_.BlockingGet(&next_key)) {
+    while (to_update_.BlockingGet(&next_key).ok()) {
       std::unique_ptr<client::KuduUpdate> update(table_->NewUpdate());
       CHECK_OK(update->mutable_row()->SetInt64(kKeyColumnName, next_key));
       CHECK_OK(update->mutable_row()->SetBool(kUpdatedColumnName, true));
diff --git a/src/kudu/subprocess/server.cc b/src/kudu/subprocess/server.cc
index 04fd71c..c8dc754 100644
--- a/src/kudu/subprocess/server.cc
+++ b/src/kudu/subprocess/server.cc
@@ -33,7 +33,6 @@
 #include "kudu/subprocess/subprocess.pb.h"
 #include "kudu/subprocess/subprocess_protocol.h"
 #include "kudu/util/async_util.h"
-#include "kudu/util/blocking_queue.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/status.h"
@@ -131,7 +130,8 @@ Status SubprocessServer::Execute(SubprocessRequestPB* req,
   Synchronizer sync;
   auto cb = sync.AsStdStatusCallback();
   auto call = make_shared<SubprocessCall>(req, resp, &cb, MonoTime::Now() + call_timeout_);
-  RETURN_NOT_OK(QueueCall(call));
+  RETURN_NOT_OK_PREPEND(outbound_call_queue_.BlockingPut(call, call->deadline()),
+                        "couldn't enqueue call");
   return sync.Wait();
 }
 
@@ -186,7 +186,7 @@ void SubprocessServer::ReceiveMessagesThread() {
     // subprocess.
     DCHECK(s.ok());
     WARN_NOT_OK(s, "failed to receive response from the subprocess");
-    if (s.ok() && !inbound_response_queue_.BlockingPut(response)) {
+    if (s.ok() && !inbound_response_queue_.BlockingPut(response).ok()) {
       // The queue has been shut down and we should shut down too.
       DCHECK_EQ(0, closing_.count());
       LOG(INFO) << "failed to put response onto inbound queue";
@@ -290,32 +290,5 @@ void SubprocessServer::SendMessagesThread() {
   LOG(INFO) << "outbound queue shut down: " << s.ToString();
 }
 
-Status SubprocessServer::QueueCall(const shared_ptr<SubprocessCall>& call) {
-  if (MonoTime::Now() > call->deadline()) {
-    return Status::TimedOut("timed out before queueing call");
-  }
-
-  do {
-    QueueStatus queue_status = outbound_call_queue_.Put(call);
-    switch (queue_status) {
-      case QUEUE_SUCCESS:
-        return Status::OK();
-      case QUEUE_SHUTDOWN:
-        return Status::ServiceUnavailable("outbound queue shutting down");
-      case QUEUE_FULL: {
-        // If we still have more time allotted for this call, wait for a bit
-        // and try again; otherwise, time out.
-        if (MonoTime::Now() > call->deadline()) {
-          return Status::TimedOut("timed out trying to queue call");
-        }
-        closing_.WaitFor(
-            MonoDelta::FromMilliseconds(FLAGS_subprocess_queue_full_retry_ms));
-      }
-    }
-  } while (true);
-
-  return Status::OK();
-}
-
 } // namespace subprocess
 } // namespace kudu
diff --git a/src/kudu/subprocess/server.h b/src/kudu/subprocess/server.h
index 5afc3e9..5e9d625 100644
--- a/src/kudu/subprocess/server.h
+++ b/src/kudu/subprocess/server.h
@@ -192,8 +192,8 @@ class SubprocessServer {
   // Initialize the server, starting the subprocess and worker threads.
   Status Init() WARN_UNUSED_RESULT;
 
-  // Synchronously send a request to the subprocess and populate 'resp' with
-  // contents returned from the subprocess, or return an error if anything
+  // Synchronously sends a request to the subprocess and populates 'resp' with
+  // contents returned from the subprocess, or returns an error if anything
   // failed or timed out along the way.
   Status Execute(SubprocessRequestPB* req, SubprocessResponsePB* resp) WARN_UNUSED_RESULT;
 
diff --git a/src/kudu/subprocess/subprocess_server-test.cc b/src/kudu/subprocess/subprocess_server-test.cc
index 53d3824..d81f1b4 100644
--- a/src/kudu/subprocess/subprocess_server-test.cc
+++ b/src/kudu/subprocess/subprocess_server-test.cc
@@ -176,7 +176,7 @@ TEST_F(SubprocessServerTest, TestTimeoutBeforeQueueing) {
   SubprocessResponsePB response;
   Status s = server_->Execute(&request, &response);
   ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
-  ASSERT_STR_CONTAINS(s.ToString(), "timed out before queueing call");
+  ASSERT_STR_CONTAINS(s.ToString(), "couldn't enqueue call");
 }
 
 // Test when we try sending too many requests at once.
@@ -213,7 +213,7 @@ TEST_F(SubprocessServerTest, TestTimeoutWhileQueueingCalls) {
   bool has_timeout_when_queueing = false;
   for (const auto& s : results) {
     if (s.IsTimedOut() &&
-        s.ToString().find("timed out trying to queue call") != string::npos) {
+        s.ToString().find("couldn't enqueue call") != string::npos) {
       has_timeout_when_queueing = true;
     }
   }
diff --git a/src/kudu/util/blocking_queue-test.cc b/src/kudu/util/blocking_queue-test.cc
index a2271ff..bc897e1 100644
--- a/src/kudu/util/blocking_queue-test.cc
+++ b/src/kudu/util/blocking_queue-test.cc
@@ -15,8 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "kudu/util/blocking_queue.h"
+
 #include <cstddef>
 #include <cstdint>
+#include <list>
 #include <map>
 #include <string>
 #include <thread>
@@ -26,9 +29,9 @@
 
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/util/countdown_latch.h"
-#include "kudu/util/blocking_queue.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/mutex.h"
+#include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 
@@ -49,11 +52,11 @@ void InsertSomeThings() {
 TEST(BlockingQueueTest, Test1) {
   thread inserter_thread(InsertSomeThings);
   int32_t i;
-  ASSERT_TRUE(test1_queue.BlockingGet(&i));
+  ASSERT_OK(test1_queue.BlockingGet(&i));
   ASSERT_EQ(1, i);
-  ASSERT_TRUE(test1_queue.BlockingGet(&i));
+  ASSERT_OK(test1_queue.BlockingGet(&i));
   ASSERT_EQ(2, i);
-  ASSERT_TRUE(test1_queue.BlockingGet(&i));
+  ASSERT_OK(test1_queue.BlockingGet(&i));
   ASSERT_EQ(3, i);
   inserter_thread.join();
 }
@@ -79,6 +82,83 @@ TEST(BlockingQueueTest, TestBlockingDrainTo) {
   ASSERT_TRUE(s.IsAborted());
 }
 
+TEST(BlockingQueueTest, TestBlockingPut) {
+  const MonoDelta kShortTimeout = MonoDelta::FromMilliseconds(200);
+  const MonoDelta kEvenShorterTimeout = MonoDelta::FromMilliseconds(100);
+  BlockingQueue<int32_t> test_queue(2);
+
+  // First, a trivial check that we don't do anything if our deadline has
+  // already passed.
+  Status s = test_queue.BlockingPut(1, MonoTime::Now() - kShortTimeout);
+  ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+
+  // Now put a couple elements onto the queue.
+  ASSERT_OK(test_queue.BlockingPut(1));
+  ASSERT_OK(test_queue.BlockingPut(2));
+
+  // We're at capacity, so further puts should time out...
+  s = test_queue.BlockingPut(3, MonoTime::Now() + kShortTimeout);
+  ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+
+  // ... until space is freed up before the deadline.
+  thread t([&] {
+    SleepFor(kEvenShorterTimeout);
+    int out;
+    ASSERT_OK(test_queue.BlockingGet(&out));
+  });
+  SCOPED_CLEANUP({
+    t.join();
+  });
+  ASSERT_OK(test_queue.BlockingPut(3, MonoTime::Now() + kShortTimeout));
+
+  // If we shut down, we shouldn't be able to put more elements onto the queue.
+  test_queue.Shutdown();
+  s = test_queue.BlockingPut(3, MonoTime::Now() + kShortTimeout);
+  ASSERT_TRUE(s.IsAborted()) << s.ToString();
+}
+
+TEST(BlockingQueueTest, TestBlockingGet) {
+  const MonoDelta kShortTimeout = MonoDelta::FromMilliseconds(200);
+  const MonoDelta kEvenShorterTimeout = MonoDelta::FromMilliseconds(100);
+  BlockingQueue<int32_t> test_queue(2);
+  ASSERT_OK(test_queue.BlockingPut(1));
+  ASSERT_OK(test_queue.BlockingPut(2));
+
+  // Test that if we have stuff in our queue, regardless of deadlines, we'll be
+  // able to get them out.
+  int32_t val = 0;
+  ASSERT_OK(test_queue.BlockingGet(&val, MonoTime::Now() - kShortTimeout));
+  ASSERT_EQ(1, val);
+  ASSERT_OK(test_queue.BlockingGet(&val, MonoTime::Now() + kShortTimeout));
+  ASSERT_EQ(2, val);
+
+  // But without stuff in the queue, we'll time out...
+  Status s = test_queue.BlockingGet(&val, MonoTime::Now() - kShortTimeout);
+  ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+  s = test_queue.BlockingGet(&val, MonoTime::Now() + kShortTimeout);
+  ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+
+  // ... until new elements show up.
+  thread t([&] {
+    SleepFor(kEvenShorterTimeout);
+    ASSERT_OK(test_queue.BlockingPut(3));
+  });
+  SCOPED_CLEANUP({
+    t.join();
+  });
+  ASSERT_OK(test_queue.BlockingGet(&val, MonoTime::Now() + kShortTimeout));
+  ASSERT_EQ(3, val);
+
+  // If we shut down with stuff in our queue, we'll continue to return those
+  // elements. Otherwise, we'll return an error.
+  ASSERT_OK(test_queue.BlockingPut(4));
+  test_queue.Shutdown();
+  ASSERT_OK(test_queue.BlockingGet(&val, MonoTime::Now() + kShortTimeout));
+  ASSERT_EQ(4, val);
+  s = test_queue.BlockingGet(&val, MonoTime::Now() + kShortTimeout);
+  ASSERT_TRUE(s.IsAborted()) << s.ToString();
+}
+
 // Test that, when the queue is shut down with elements still pending,
 // Drain still returns OK until the elements are all gone.
 TEST(BlockingQueueTest, TestGetAndDrainAfterShutdown) {
@@ -91,7 +171,7 @@ TEST(BlockingQueueTest, TestGetAndDrainAfterShutdown) {
 
   // Get() should still return an element.
   int i;
-  ASSERT_TRUE(q.BlockingGet(&i));
+  ASSERT_OK(q.BlockingGet(&i));
   ASSERT_EQ(1, i);
 
   // Drain should still return OK, since it yielded elements.
@@ -102,7 +182,8 @@ TEST(BlockingQueueTest, TestGetAndDrainAfterShutdown) {
   // Now that it's empty, it should return Aborted.
   Status s = q.BlockingDrainTo(&out);
   ASSERT_TRUE(s.IsAborted()) << s.ToString();
-  ASSERT_FALSE(q.BlockingGet(&i));
+  s = q.BlockingGet(&i);
+  ASSERT_FALSE(s.ok()) << s.ToString();
 }
 
 TEST(BlockingQueueTest, TestTooManyInsertions) {
@@ -154,9 +235,10 @@ TEST(BlockingQueueTest, TestGetFromShutdownQueue) {
   test_queue.Shutdown();
   ASSERT_EQ(test_queue.Put(456), QUEUE_SHUTDOWN);
   int64_t i;
-  ASSERT_TRUE(test_queue.BlockingGet(&i));
+  ASSERT_OK(test_queue.BlockingGet(&i));
   ASSERT_EQ(123, i);
-  ASSERT_FALSE(test_queue.BlockingGet(&i));
+  Status s = test_queue.BlockingGet(&i);
+  ASSERT_FALSE(s.ok()) << s.ToString();
 }
 
 TEST(BlockingQueueTest, TestGscopedPtrMethods) {
@@ -164,7 +246,7 @@ TEST(BlockingQueueTest, TestGscopedPtrMethods) {
   gscoped_ptr<int> input_int(new int(123));
   ASSERT_EQ(test_queue.Put(&input_int), QUEUE_SUCCESS);
   gscoped_ptr<int> output_int;
-  ASSERT_TRUE(test_queue.BlockingGet(&output_int));
+  ASSERT_OK(test_queue.BlockingGet(&output_int));
   ASSERT_EQ(123, *output_int.get());
   test_queue.Shutdown();
 }
@@ -187,7 +269,7 @@ class MultiThreadTest {
     sync_latch_.CountDown();
     sync_latch_.Wait();
     for (int i = 0; i < blocking_puts_; i++) {
-      ASSERT_TRUE(queue_.BlockingPut(arg));
+      ASSERT_OK(queue_.BlockingPut(arg));
     }
     MutexLock guard(lock_);
     if (--num_inserters_ == 0) {
@@ -198,8 +280,8 @@ class MultiThreadTest {
   void RemoverThread() {
     for (int i = 0; i < puts_ + blocking_puts_; i++) {
       int32_t arg = 0;
-      bool got = queue_.BlockingGet(&arg);
-      if (!got) {
+      Status s = queue_.BlockingGet(&arg);
+      if (!s.ok()) {
         arg = -1;
       }
       MutexLock guard(lock_);
diff --git a/src/kudu/util/blocking_queue.h b/src/kudu/util/blocking_queue.h
index 7331c12..a26b3aa 100644
--- a/src/kudu/util/blocking_queue.h
+++ b/src/kudu/util/blocking_queue.h
@@ -14,8 +14,7 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#ifndef KUDU_UTIL_BLOCKING_QUEUE_H
-#define KUDU_UTIL_BLOCKING_QUEUE_H
+#pragma once
 
 #include <list>
 #include <string>
@@ -70,9 +69,17 @@ class BlockingQueue {
         << "BlockingQueue holds bare pointers at destruction time";
   }
 
-  // Get an element from the queue.  Returns false if we were shut down prior to
-  // getting the element.
-  bool BlockingGet(T *out) {
+  // Gets an element from the queue; if the queue is empty, blocks until the
+  // queue becomes non-empty, or until the deadline passes.
+  //
+  // If the queue has been shut down but there are still elements in the queue,
+  // it returns those elements as if the queue were not yet shut down.
+  //
+  // Returns:
+  // - OK if successful
+  // - TimedOut if the deadline passed
+  // - Aborted if the queue shut down
+  Status BlockingGet(T *out, MonoTime deadline = MonoTime()) {
     MutexLock l(lock_);
     while (true) {
       if (!list_.empty()) {
@@ -80,25 +87,26 @@ class BlockingQueue {
         list_.pop_front();
         decrement_size_unlocked(*out);
         not_full_.Signal();
-        return true;
+        return Status::OK();
       }
       if (shutdown_) {
-        return false;
+        return Status::Aborted("");
+      }
+      if (!deadline.Initialized()) {
+        not_empty_.Wait();
+      } else if (PREDICT_FALSE(!not_empty_.WaitUntil(deadline))) {
+        return Status::TimedOut("");
       }
-      not_empty_.Wait();
     }
   }
 
   // Get an element from the queue.  Returns false if the queue is empty and
   // we were shut down prior to getting the element.
-  bool BlockingGet(gscoped_ptr<T_VAL> *out) {
-    T t = NULL;
-    bool got_element = BlockingGet(&t);
-    if (!got_element) {
-      return false;
-    }
+  Status BlockingGet(gscoped_ptr<T_VAL> *out, MonoTime deadline = MonoTime()) {
+    T t = nullptr;
+    RETURN_NOT_OK(BlockingGet(&t, deadline));
     out->reset(t);
-    return true;
+    return Status::OK();
   }
 
   // Get all elements from the queue and append them to a vector.
@@ -168,34 +176,47 @@ class BlockingQueue {
     return s;
   }
 
-  // Gets an element for the queue; if the queue is full, blocks until
-  // space becomes available. Returns false if we were shutdown prior
-  // to enqueueing the element.
-  bool BlockingPut(const T& val) {
+  // Puts an element onto the queue; if the queue is full, blocks until space
+  // becomes available, or until the deadline passes.
+  //
+  // NOTE: unlike BlockingGet() and BlockingDrainTo(), which succeed as long as
+  // there are elements in the queue (regardless of deadline), if the deadline
+  // has passed, an error will be returned even if there is space in the queue.
+  //
+  // Returns:
+  // - OK if successful
+  // - TimedOut if the deadline passed
+  // - Aborted if the queue shut down
+  Status BlockingPut(const T& val, MonoTime deadline = MonoTime()) {
+    if (deadline.Initialized() && MonoTime::Now() > deadline) {
+      return Status::TimedOut("");
+    }
     MutexLock l(lock_);
     while (true) {
       if (shutdown_) {
-        return false;
+        return Status::Aborted("");
       }
       if (size_ < max_size_) {
         list_.push_back(val);
         increment_size_unlocked(val);
         l.Unlock();
         not_empty_.Signal();
-        return true;
+        return Status::OK();
+      }
+      if (!deadline.Initialized()) {
+        not_full_.Wait();
+      } else if (PREDICT_FALSE(!not_full_.WaitUntil(deadline))) {
+        return Status::TimedOut("");
       }
-      not_full_.Wait();
     }
   }
 
   // Same as other BlockingPut() overload above. If the element was
   // enqueued, gscoped_ptr releases its contents.
-  bool BlockingPut(gscoped_ptr<T_VAL>* val) {
-    bool ret = Put(val->get());
-    if (ret) {
-      ignore_result(val->release());
-    }
-    return ret;
+  Status BlockingPut(gscoped_ptr<T_VAL>* val, MonoTime deadline = MonoTime()) {
+    RETURN_NOT_OK(BlockingPut(val->get(), deadline));
+    ignore_result(val->release());
+    return Status::OK();
   }
 
   // Shut down the queue.
@@ -253,4 +274,3 @@ class BlockingQueue {
 
 } // namespace kudu
 
-#endif


[kudu] 01/02: [java] subprocess: add a metrics message

Posted by aw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 0b6889d109208493a006381555cb7a1cc2576e62
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Thu Feb 27 18:59:56 2020 -0800

    [java] subprocess: add a metrics message
    
    This patch adds a metrics message to each SubprocessResponsePB. These
    metrics are general metrics that most Subprocess implementations will
    likely find useful. Namely, inbound queue time, outbound queue time,
    inbound queue length, outbound queue length, and execution time.
    
    To get these metrics, I've passed the new SubprocessMetrics class around
    alongside the request/response as it journeys through the
    SubprocessExecutor, timing the different stages as it goes. Right before
    sending the final response over the pipe, the queue lengths are
    determined and the final response is built.
    
    A couple of tests are added to demonstrate how these metrics may be
    useful. Along the way, I refactored the EchoSubprocess tests to be a bit
    more ergonomic w.r.t. creating, sending, and receiving messages.
    
    I also renamed some of the subprocess methods for clarity:
    - ProtocolHandler.handleRequest() -> unpackAndExecuteRequest()
    - ProtocolHandler.createResponse() -> executeRequest()
    - MessageParser.getResponse() -> parseAndExecuteRequest()
    
    These metrics are currently unused -- a later patch will plumb them into
    the C++ metrics.
    
    Change-Id: I11a89fff8df23c5057c577f2aebfd40922d01e3c
    Reviewed-on: http://gerrit.cloudera.org:8080/15315
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Tested-by: Andrew Wong <aw...@cloudera.com>
---
 .../kudu/subprocess/echo/EchoProtocolHandler.java  |   2 +-
 .../kudu/subprocess/echo/TestEchoSubprocess.java   | 349 +++++++++++++++------
 .../org/apache/kudu/subprocess/InboundRequest.java |  49 +++
 .../org/apache/kudu/subprocess/MessageParser.java  |  50 +--
 .../org/apache/kudu/subprocess/MessageReader.java  |  11 +-
 .../org/apache/kudu/subprocess/MessageWriter.java  |   9 +-
 .../apache/kudu/subprocess/OutboundResponse.java   |  56 ++++
 .../apache/kudu/subprocess/ProtocolHandler.java    |  18 +-
 .../apache/kudu/subprocess/SubprocessExecutor.java |  30 +-
 .../apache/kudu/subprocess/SubprocessMetrics.java  | 108 +++++++
 .../apache/kudu/subprocess/MessageTestUtil.java    |  38 ++-
 src/kudu/subprocess/subprocess.proto               |  29 +-
 12 files changed, 590 insertions(+), 159 deletions(-)

diff --git a/java/kudu-subprocess-echo/src/main/java/org/apache/kudu/subprocess/echo/EchoProtocolHandler.java b/java/kudu-subprocess-echo/src/main/java/org/apache/kudu/subprocess/echo/EchoProtocolHandler.java
index f6293b5..53b3c43 100644
--- a/java/kudu-subprocess-echo/src/main/java/org/apache/kudu/subprocess/echo/EchoProtocolHandler.java
+++ b/java/kudu-subprocess-echo/src/main/java/org/apache/kudu/subprocess/echo/EchoProtocolHandler.java
@@ -31,7 +31,7 @@ import org.apache.kudu.subprocess.Subprocess.EchoResponsePB;
 class EchoProtocolHandler extends ProtocolHandler<EchoRequestPB, EchoResponsePB> {
 
   @Override
-  protected EchoResponsePB createResponse(EchoRequestPB request) {
+  protected EchoResponsePB executeRequest(EchoRequestPB request) {
     if (request.hasSleepMs()) {
       try {
         Thread.sleep(request.getSleepMs());
diff --git a/java/kudu-subprocess-echo/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java b/java/kudu-subprocess-echo/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java
index 0e9b5ef..c0e06c4 100644
--- a/java/kudu-subprocess-echo/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java
+++ b/java/kudu-subprocess-echo/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java
@@ -17,10 +17,11 @@
 
 package org.apache.kudu.subprocess.echo;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.InputStream;
+import java.io.BufferedInputStream;
+import java.io.IOException;
 import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
 import java.io.PrintStream;
 import java.io.UnsupportedEncodingException;
 import java.nio.charset.StandardCharsets;
@@ -29,7 +30,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 
-import com.google.common.primitives.Bytes;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -40,6 +40,10 @@ import org.slf4j.LoggerFactory;
 import org.apache.kudu.subprocess.KuduSubprocessException;
 import org.apache.kudu.subprocess.MessageIO;
 import org.apache.kudu.subprocess.MessageTestUtil;
+import org.apache.kudu.subprocess.OutboundResponse;
+import org.apache.kudu.subprocess.Subprocess.EchoResponsePB;
+import org.apache.kudu.subprocess.Subprocess.SubprocessMetricsPB;
+import org.apache.kudu.subprocess.Subprocess.SubprocessRequestPB;
 import org.apache.kudu.subprocess.Subprocess.SubprocessResponsePB;
 import org.apache.kudu.subprocess.SubprocessExecutor;
 import org.apache.kudu.test.junit.RetryRule;
@@ -49,114 +53,272 @@ import org.apache.kudu.test.junit.RetryRule;
  */
 public class TestEchoSubprocess {
   private static final Logger LOG = LoggerFactory.getLogger(TestEchoSubprocess.class);
+  private static final String[] NO_ARGS = {""};
+  private static final int TIMEOUT_MS = 1000;
+  private static final String MESSAGE = "We are one. We are many.";
+
+  // Helper functors that can be passed around to ensure we either see an error
+  // or not.
   private static final Function<Throwable, Void> NO_ERR = e -> {
     LOG.error(String.format("Unexpected error: %s", e.getMessage()));
     Assert.fail();
     return null;
   };
-  private static final Function<Throwable, Void> HAS_ERR = e -> {
+  private final Function<Throwable, Void> HAS_ERR = e -> {
     Assert.assertTrue(e instanceof KuduSubprocessException);
     return null;
   };
 
+  // Pipe that we can write to that will feed requests to the subprocess's
+  // input pipe.
+  private PipedOutputStream requestSenderPipe;
+
+  // Pipe that we can read from that will receive responses from the
+  // subprocess's output pipe.
+  private final PipedInputStream responseReceiverPipe = new PipedInputStream();
+
   @Rule
   public RetryRule retryRule = new RetryRule();
 
   public static class PrintStreamWithIOException extends PrintStream {
     public PrintStreamWithIOException(OutputStream out, boolean autoFlush, String encoding)
-            throws UnsupportedEncodingException {
+        throws UnsupportedEncodingException {
       super(out, autoFlush, encoding);
     }
 
     @Override
     public boolean checkError() {
+      // Always say that we've got an error.
       return true;
     }
   }
 
-  void runEchoSubprocess(InputStream in,
-                         PrintStream out,
-                         String[] args,
-                         Function<Throwable, Void> errorHandler,
-                         boolean injectInterrupt)
-      throws InterruptedException, ExecutionException, TimeoutException {
-    System.setIn(in);
-    System.setOut(out);
-    SubprocessExecutor subprocessExecutor = new SubprocessExecutor(errorHandler);
-    EchoProtocolHandler protocolProcessor = new EchoProtocolHandler();
-    if (injectInterrupt) {
-      subprocessExecutor.interrupt();
+  // Given that executors run multiple threads, the exceptions that we expect
+  // may not necessarily be the first thrown. This checks for the expected
+  // error on all thrown exceptions, including suppressed ones.
+  <T extends Throwable> void assertIncludingSuppressedThrows(Class<T> expectedThrowable,
+                                                             String errorMessage,
+                                                             ThrowingRunnable runnable) {
+    try {
+      runnable.run();
+    } catch (Throwable actualThrown) {
+      if (expectedThrowable.isInstance(actualThrown) &&
+          actualThrown.toString().contains(errorMessage)) {
+        return;
+      }
+      LOG.info(actualThrown.toString());
+      for (Throwable s : actualThrown.getSuppressed()) {
+        if (s.getClass() == expectedThrowable && s.toString().contains(errorMessage)) {
+          return;
+        }
+        LOG.info(s.toString());
+      }
+      throw new AssertionError(String.format("No errors that match %s with message: %s",
+                                             expectedThrowable.toString(), errorMessage));
     }
-    subprocessExecutor.run(args, protocolProcessor, /* timeoutMs= */1000);
+    throw new AssertionError("Didn't throw an exception");
+  }
+
+  // Sends a SubprocessRequestPB to the sender pipe, serializing it as
+  // appropriate.
+  void sendRequestToPipe(SubprocessRequestPB req) throws IOException {
+    requestSenderPipe.write(MessageTestUtil.serializeMessage(req));
+  }
+
+  // Receives a response from the receiver pipe and deserializes it into a
+  // SubprocessResponsePB.
+  SubprocessResponsePB receiveResponse() throws IOException {
+    BufferedInputStream bufferedInput = new BufferedInputStream(responseReceiverPipe);
+    return MessageTestUtil.deserializeMessage(bufferedInput, SubprocessResponsePB.parser());
+  }
+
+  // Sets up and returns a SubprocessExecutor with the given error handler and
+  // IO error injection behavior. The SubprocessExecutor will do IO to and from
+  // 'requestSenderPipe' and 'responseReceiverPipe'.
+  SubprocessExecutor setUpExecutorIO(Function<Throwable, Void> errorHandler,
+                                     boolean injectIOError) throws IOException {
+    // Initialize the pipe that we'll push requests to; feed it into the
+    // executor's input pipe.
+    PipedInputStream inputPipe = new PipedInputStream();
+    requestSenderPipe = new PipedOutputStream(inputPipe);
+    System.setIn(inputPipe);
+
+    // Initialize the pipe that the executor will write to; feed it into the
+    // response pipe that we can read from.
+    PipedOutputStream outputPipe = new PipedOutputStream(responseReceiverPipe);
+    if (injectIOError) {
+      System.setOut(new PrintStreamWithIOException(outputPipe, /*autoFlush*/false, "UTF-8"));
+    } else {
+      System.setOut(new PrintStream(outputPipe));
+    }
+    SubprocessExecutor subprocessExecutor = new SubprocessExecutor(errorHandler);
+    return subprocessExecutor;
   }
 
   /**
-   * Parses non-malformed message should exit normally without any exceptions.
+   * Test a regular old message. There should be no exceptions of any kind.
+   * We should also see some metrics that make sense.
    */
   @Test(expected = TimeoutException.class)
   public void testBasicMsg() throws Exception {
-    final String message = "data";
-    final byte[] messageBytes = MessageTestUtil.serializeMessage(
-        MessageTestUtil.createEchoSubprocessRequest(message));
-    final InputStream in = new ByteArrayInputStream(messageBytes);
-    final PrintStream out =
-            new PrintStream(new ByteArrayOutputStream(), false, "UTF-8");
-    final String[] args = {""};
-    runEchoSubprocess(in, out, args, NO_ERR, /* injectInterrupt= */false);
+    SubprocessExecutor executor =
+        setUpExecutorIO(NO_ERR, /*injectIOError*/false);
+    sendRequestToPipe(MessageTestUtil.createEchoSubprocessRequest(MESSAGE));
+
+    executor.run(NO_ARGS, new EchoProtocolHandler(), TIMEOUT_MS);
+    SubprocessResponsePB spResp = receiveResponse();
+    EchoResponsePB echoResp = spResp.getResponse().unpack(EchoResponsePB.class);
+    Assert.assertEquals(MESSAGE, echoResp.getData());
+
+    SubprocessMetricsPB spMetrics = spResp.getMetrics();
+    // We only sent one request, so by the time the executor sent the message,
+    // the queues should have been empty.
+    Assert.assertTrue(spMetrics.hasInboundQueueLength());
+    Assert.assertTrue(spMetrics.hasOutboundQueueLength());
+    Assert.assertEquals(0, spMetrics.getInboundQueueLength());
+    Assert.assertEquals(0, spMetrics.getOutboundQueueLength());
+
+    // The recorded times should be present and non-negative.
+    Assert.assertTrue(spMetrics.hasInboundQueueTimeMs());
+    Assert.assertTrue(spMetrics.hasOutboundQueueTimeMs());
+    Assert.assertTrue(spMetrics.hasExecutionTimeMs());
+    Assert.assertTrue(spMetrics.getInboundQueueTimeMs() >= 0);
+    Assert.assertTrue(spMetrics.getOutboundQueueTimeMs() >= 0);
+    Assert.assertTrue(spMetrics.getExecutionTimeMs() >= 0);
+  }
+
+  /**
+   * Test to see what happens when the execution is the bottleneck. We should
+   * see it in the execution time and inbound queue time and length metrics.
+   */
+  @Test(expected = TimeoutException.class)
+  public void testSlowExecutionMetrics() throws Exception {
+    SubprocessExecutor executor =
+      setUpExecutorIO(NO_ERR, /*injectIOError*/false);
+    final int SLEEP_MS = 200;
+    sendRequestToPipe(MessageTestUtil.createEchoSubprocessRequest(MESSAGE, SLEEP_MS));
+    sendRequestToPipe(MessageTestUtil.createEchoSubprocessRequest(MESSAGE, SLEEP_MS));
+    sendRequestToPipe(MessageTestUtil.createEchoSubprocessRequest(MESSAGE, SLEEP_MS));
+
+    // Run the executor with a single parser thread so we can make stronger
+    // assumptions about timing.
+    executor.run(new String[]{"-p", "1"}, new EchoProtocolHandler(), TIMEOUT_MS);
+
+    SubprocessMetricsPB m = receiveResponse().getMetrics();
+    long inboundQueueLength = m.getInboundQueueLength();
+    long inboundQueueTimeMs = m.getInboundQueueTimeMs();
+    long executionTimeMs = m.getExecutionTimeMs();
+    // By the time the first request is written, the second should be sleeping,
+    // and the third should be waiting in the inbound queue. That said, the
+    // second could also be in the queue if the parser thread is slow to pick
+    // up the second request.
+    Assert.assertTrue(
+        String.format("Got an unexpected inbound queue length: %s", inboundQueueLength),
+        inboundQueueLength == 1 || inboundQueueLength == 2);
+    Assert.assertEquals(0, m.getOutboundQueueLength());
+
+    // We can't make many guarantees about how long the first request was
+    // waiting in the queues.
+    Assert.assertTrue(
+        String.format("Expected a positive inbound queue time: %s", inboundQueueTimeMs),
+        inboundQueueTimeMs >= 0);
+
+    // It should've taken longer than our sleep to execute.
+    Assert.assertTrue(
+        String.format("Expected a longer execution time than %s ms: %s ms",
+                      SLEEP_MS, executionTimeMs),
+        executionTimeMs >= SLEEP_MS);
+
+    // The second request should've spent the duration of the first sleep waiting
+    // in the inbound queue.
+    m = receiveResponse().getMetrics();
+    Assert.assertTrue(
+        String.format("Expected a higher inbound queue time: %s ms", m.getInboundQueueTimeMs()),
+        m.getInboundQueueTimeMs() >= SLEEP_MS);
+
+    // The last should've spent the duration of the first two sleeps waiting.
+    m = receiveResponse().getMetrics();
+    Assert.assertTrue(
+        String.format("Expected a higher inbound queue time: %s", m.getInboundQueueTimeMs()),
+        m.getInboundQueueTimeMs() >= 2 * SLEEP_MS);
   }
 
   /**
-   * Parses message with empty payload should exit normally without any exceptions.
+   * Test to see what happens when writing is the bottleneck. We should see it
+   * in the outbound queue metrics.
    */
   @Test(expected = TimeoutException.class)
-  public void testMsgWithEmptyPayload() throws Exception {
-    final byte[] emptyPayload = MessageIO.intToBytes(0);
-    final InputStream in = new ByteArrayInputStream(emptyPayload);
-    final PrintStream out =
-            new PrintStream(new ByteArrayOutputStream(), false, "UTF-8");
-    final String[] args = {""};
-    runEchoSubprocess(in, out, args, NO_ERR, /* injectInterrupt= */false);
+  public void testSlowWriterMetrics() throws Exception {
+    SubprocessExecutor executor =
+      setUpExecutorIO(NO_ERR, /*injectIOError*/false);
+    final int BLOCK_MS = 200;
+    executor.blockWriteMs(BLOCK_MS);
+    sendRequestToPipe(MessageTestUtil.createEchoSubprocessRequest(MESSAGE));
+    sendRequestToPipe(MessageTestUtil.createEchoSubprocessRequest(MESSAGE));
+    sendRequestToPipe(MessageTestUtil.createEchoSubprocessRequest(MESSAGE));
+    executor.run(NO_ARGS, new EchoProtocolHandler(), TIMEOUT_MS);
+
+    // In writing the first request, the other two requests should've been
+    // close behind, likely both in the outbound queue.
+    SubprocessMetricsPB m = receiveResponse().getMetrics();
+    Assert.assertEquals(2, m.getOutboundQueueLength());
+
+    m = receiveResponse().getMetrics();
+    Assert.assertEquals(1, m.getOutboundQueueLength());
+    Assert.assertTrue(
+      String.format("Expected a higher outbound queue time: %s ms", m.getOutboundQueueTimeMs()),
+      m.getOutboundQueueTimeMs() >= BLOCK_MS);
+
+    m = receiveResponse().getMetrics();
+    Assert.assertEquals(0, m.getOutboundQueueLength());
+    Assert.assertTrue(
+      String.format("Expected a higher outbound queue time: %s ms", m.getOutboundQueueTimeMs()),
+      m.getOutboundQueueTimeMs() >= 2 * BLOCK_MS);
   }
 
   /**
-   * Parses malformed message should cause <code>IOException</code>.
+   * Test what happens when we send a message that is completely empty (i.e.
+   * not an empty SubprocessRequestPB message -- no message at all).
+   */
+  @Test(expected = TimeoutException.class)
+  public void testMsgWithEmptyMessage() throws Exception {
+    SubprocessExecutor executor = setUpExecutorIO(NO_ERR,
+                                                  /*injectIOError*/false);
+    requestSenderPipe.write(MessageIO.intToBytes(0));
+    executor.run(NO_ARGS, new EchoProtocolHandler(), TIMEOUT_MS);
+
+    // We should see no bytes land in the receiver pipe.
+    Assert.assertEquals(-1, responseReceiverPipe.read());
+  }
+
+  /**
+   * Test what happens when we send a message that isn't protobuf.
    */
   @Test
-  public void testMalformedMsg() throws Exception {
-    final byte[] messageBytes = "malformed".getBytes(StandardCharsets.UTF_8);
-    final InputStream in = new ByteArrayInputStream(messageBytes);
-    final PrintStream out =
-            new PrintStream(new ByteArrayOutputStream(), false, "UTF-8");
-    Throwable thrown = Assert.assertThrows(ExecutionException.class, new ThrowingRunnable() {
-      @Override
-      public void run() throws Exception {
-        final String[] args = {""};
-        runEchoSubprocess(in, out, args, HAS_ERR, /* injectInterrupt= */false);
-      }
-    });
+  public void testMalformedPB() throws Exception {
+    SubprocessExecutor executor = setUpExecutorIO(NO_ERR, /*injectIOError*/false);
+    requestSenderPipe.write("malformed".getBytes(StandardCharsets.UTF_8));
+    Throwable thrown = Assert.assertThrows(ExecutionException.class,
+        () -> executor.run(NO_ARGS, new EchoProtocolHandler(), TIMEOUT_MS));
     Assert.assertTrue(thrown.getMessage().contains("Unable to read the protobuf message"));
   }
 
   /**
-   * Parses message with <code>IOException</code> injected should exit with
-   * <code>KuduSubprocessException</code>.
+   * Try injecting an <code>IOException</code> to the pipe that gets written to
+   * by the SubprocessExecutor. We should exit with a
+   * <code>KuduSubprocessException</code>
    */
   @Test
   public void testInjectIOException() throws Exception {
-    final String message = "data";
-    final byte[] messageBytes = MessageTestUtil.serializeMessage(
-        MessageTestUtil.createEchoSubprocessRequest(message));
-    final InputStream in = new ByteArrayInputStream(messageBytes);
-    final PrintStream out =
-            new PrintStreamWithIOException(new ByteArrayOutputStream(), false, "UTF-8");
-    Throwable thrown = Assert.assertThrows(ExecutionException.class, new ThrowingRunnable() {
-      @Override
-      public void run() throws Exception {
-        final String[] args = {""};
-        runEchoSubprocess(in, out, args, HAS_ERR, /* injectInterrupt= */false);
-      }
-    });
-    Assert.assertTrue(thrown.getMessage().contains("Unable to write the protobuf messag"));
+    SubprocessExecutor executor =
+        setUpExecutorIO(HAS_ERR, /*injectIOError*/true);
+    sendRequestToPipe(MessageTestUtil.createEchoSubprocessRequest(MESSAGE));
+    // NOTE: we don't expect the ExecutionException from the MessageWriter's
+    // CompletableFuture because, in waiting for completion, the MessageReader
+    // times out before CompletableFuture.get() is called on the writer.
+    assertIncludingSuppressedThrows(IOException.class,
+      "Unable to write to print stream",
+      () -> executor.run(NO_ARGS, new EchoProtocolHandler(), TIMEOUT_MS));
   }
 
   /**
@@ -165,51 +327,32 @@ public class TestEchoSubprocess {
    */
   @Test
   public void testInjectInterruptedException() throws Exception {
-    final String message = "data";
-    final byte[] messageBytes = MessageTestUtil.serializeMessage(
-        MessageTestUtil.createEchoSubprocessRequest(message));
-    final InputStream in = new ByteArrayInputStream(messageBytes);
-    final PrintStream out =
-        new PrintStream(new ByteArrayOutputStream(), false, "UTF-8");
-    Throwable thrown = Assert.assertThrows(ExecutionException.class, new ThrowingRunnable() {
-      @Override
-      public void run() throws Exception {
-        final String[] args = {""};
-        runEchoSubprocess(in, out, args, HAS_ERR, /* injectInterrupt= */true);
-      }
-    });
-    Assert.assertTrue(thrown.getMessage().contains("Unable to put the message to the queue"));
+    SubprocessExecutor executor =
+        setUpExecutorIO(HAS_ERR, /*injectIOError*/false);
+    executor.interrupt();
+    sendRequestToPipe(MessageTestUtil.createEchoSubprocessRequest(MESSAGE));
+    assertIncludingSuppressedThrows(ExecutionException.class,
+        "Unable to put the message to the queue",
+        () -> executor.run(NO_ARGS, new EchoProtocolHandler(), TIMEOUT_MS));
   }
 
   /**
-   * Verifies when <code>MessageWriter</code> task is blocking on writing the message,
-   * <code>MessageParser</code> task can continue processing the requests without
-   * blocking.
+   * Check that even if the writer is blocked writing, the
+   * <code>MessageParser</code> tasks can continue making progress.
    */
   @Test
   public void testMessageParser() throws Exception  {
-    final byte[] messageBytes = Bytes.concat(
-        MessageTestUtil.serializeMessage(MessageTestUtil.createEchoSubprocessRequest("a")),
-        MessageTestUtil.serializeMessage(MessageTestUtil.createEchoSubprocessRequest("b")));
-    final InputStream in = new ByteArrayInputStream(messageBytes);
-    final PrintStream out =
-        new PrintStream(new ByteArrayOutputStream(), false, "UTF-8");
-    final SubprocessExecutor[] executors = new SubprocessExecutor[1];
-    System.setIn(in);
-    System.setOut(out);
-    Assert.assertThrows(TimeoutException.class, new ThrowingRunnable() {
-      @Override
-      public void run() throws Exception {
-        final String[] args = {""};
-        executors[0] = new SubprocessExecutor(NO_ERR);
-        // Block the message write for 1000 Ms.
-        executors[0].blockWriteMs(1000);
-        executors[0].run(args, new EchoProtocolHandler(), /* timeoutMs= */500);
-      }
-    });
+    SubprocessExecutor executor =
+        setUpExecutorIO(NO_ERR, /*injectIOError*/false);
+    sendRequestToPipe(MessageTestUtil.createEchoSubprocessRequest("a"));
+    sendRequestToPipe(MessageTestUtil.createEchoSubprocessRequest("b"));
+    executor.blockWriteMs(1000);
+    Assert.assertThrows(TimeoutException.class,
+        () -> executor.run(NO_ARGS, new EchoProtocolHandler(), /*timeoutMs*/500));
 
-    // Verify that the message have been processed and placed to outbound queue.
-    BlockingQueue<SubprocessResponsePB> outboundQueue = executors[0].getOutboundQueue();
+    // We should see a single message in the outbound queue. The other one is
+    // blocked writing.
+    BlockingQueue<OutboundResponse> outboundQueue = executor.getOutboundQueue();
     Assert.assertEquals(1, outboundQueue.size());
   }
 }
diff --git a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/InboundRequest.java b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/InboundRequest.java
new file mode 100644
index 0000000..23a9e0b
--- /dev/null
+++ b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/InboundRequest.java
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.subprocess;
+
+/**
+ * Encapsulates a request on the inbound queue. The caller is expected to have
+ * started the <code>SubprocessMetrics</code> timer that measures time spent in
+ * the queue before creating this object and putting it onto the queue.
+ */
+public class InboundRequest {
+  private final byte[] bytes;
+  private SubprocessMetrics metrics;
+
+  // TODO(awong): it might be nice if both the request and response spoke the
+  //  same language (e.g. both byte arrays or both protobuf messages).
+  public InboundRequest(byte[] bytes, SubprocessMetrics metrics) {
+    this.bytes = bytes;
+    this.metrics = metrics;
+  }
+
+  /**
+   * @return the bytes associated with this request
+   */
+  public byte[] bytes() {
+    return bytes;
+  }
+
+  /**
+   * @return the metrics associated with this request
+   */
+  public SubprocessMetrics metrics() {
+    return metrics;
+  }
+}
diff --git a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageParser.java b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageParser.java
index 6dc5f00..23f25ea 100644
--- a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageParser.java
+++ b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageParser.java
@@ -19,8 +19,10 @@ package org.apache.kudu.subprocess;
 
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
 import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -39,12 +41,12 @@ import org.apache.kudu.subprocess.Subprocess.SubprocessResponsePB;
 @InterfaceAudience.Private
 class MessageParser implements Runnable {
   private static final Logger LOG = LoggerFactory.getLogger(MessageParser.class);
-  private final BlockingQueue<byte[]> inboundQueue;
-  private final BlockingQueue<SubprocessResponsePB> outboundQueue;
+  private final BlockingQueue<InboundRequest> inboundQueue;
+  private final BlockingQueue<OutboundResponse> outboundQueue;
   private final ProtocolHandler protocolHandler;
 
-  MessageParser(BlockingQueue<byte[]> inboundQueue,
-                BlockingQueue<SubprocessResponsePB> outboundQueue,
+  MessageParser(BlockingQueue<InboundRequest> inboundQueue,
+                BlockingQueue<OutboundResponse> outboundQueue,
                 ProtocolHandler protocolHandler) {
     Preconditions.checkNotNull(inboundQueue);
     Preconditions.checkNotNull(outboundQueue);
@@ -56,47 +58,59 @@ class MessageParser implements Runnable {
   @Override
   public void run() {
     while (true) {
-      byte[] data = QueueUtil.take(inboundQueue);
-      SubprocessResponsePB response = getResponse(data);
-      QueueUtil.put(outboundQueue, response);
+      InboundRequest req = QueueUtil.take(inboundQueue);
+      SubprocessMetrics metrics = req.metrics();
+      metrics.recordInboundQueueTimeMs();
+
+      // Record the execution time.
+      metrics.startTimer();
+      SubprocessResponsePB.Builder responseBuilder = parseAndExecuteRequest(req.bytes());
+      metrics.recordExecutionTimeMs();
+
+      // Begin recording the time it takes to make it through the outbound
+      // queue. The writer thread will record the elapsed time right before
+      // writing the response to the pipe.
+      metrics.startTimer();
+      QueueUtil.put(outboundQueue, new OutboundResponse(responseBuilder, metrics));
     }
   }
 
   /**
-   * Constructs a message with the given error status.
+   * Returns a response builder with the given error status.
    *
    * @param errorCode the given error status
    * @param resp the message builder
    * @return a message with the given error status
    */
-  static SubprocessResponsePB responseWithError(AppStatusPB.ErrorCode errorCode,
-                                                SubprocessResponsePB.Builder resp) {
+  static SubprocessResponsePB.Builder builderWithError(AppStatusPB.ErrorCode errorCode,
+                                                       SubprocessResponsePB.Builder resp) {
     Preconditions.checkNotNull(resp);
     AppStatusPB.Builder errorBuilder = AppStatusPB.newBuilder();
     errorBuilder.setCode(errorCode);
     resp.setError(errorBuilder);
-    return resp.build();
+    return resp;
   }
 
   /**
-   * Parses the given protobuf message. If encountered InvalidProtocolBufferException,
-   * which indicates the message is invalid, respond with an error message.
+   * Parses the given protobuf request and executes it, returning a builder for
+   * the response. If an InvalidProtocolBufferException is thrown, which
+   * indicates the message is invalid, the builder will contain an error
+   * message.
    *
    * @param data the protobuf message
    * @return a SubprocessResponsePB
    */
-  private SubprocessResponsePB getResponse(byte[] data) {
-    SubprocessResponsePB response;
+  private SubprocessResponsePB.Builder parseAndExecuteRequest(byte[] data) {
     SubprocessResponsePB.Builder responseBuilder = SubprocessResponsePB.newBuilder();
     try {
       // Parses the data as a message of SubprocessRequestPB type.
       SubprocessRequestPB request = SubprocessRequestPB.parser().parseFrom(data);
-      response = protocolHandler.handleRequest(request);
+      responseBuilder = protocolHandler.unpackAndExecuteRequest(request);
     } catch (InvalidProtocolBufferException e) {
       LOG.warn(String.format("%s: %s", "Unable to parse the protobuf message",
                              new String(data, StandardCharsets.UTF_8)), e);
-      response = responseWithError(AppStatusPB.ErrorCode.ILLEGAL_STATE, responseBuilder);
+      responseBuilder = builderWithError(AppStatusPB.ErrorCode.ILLEGAL_STATE, responseBuilder);
     }
-    return response;
+    return responseBuilder;
   }
 }
diff --git a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java
index 979cc18..06ad4b8 100644
--- a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java
+++ b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java
@@ -38,11 +38,11 @@ import org.slf4j.LoggerFactory;
 @InterfaceAudience.Private
 class MessageReader implements Runnable {
   private static final Logger LOG = LoggerFactory.getLogger(MessageReader.class);
-  private final BlockingQueue<byte[]> inboundQueue;
+  private final BlockingQueue<InboundRequest> inboundQueue;
   private final MessageIO messageIO;
   private final boolean injectInterrupt;
 
-  MessageReader(BlockingQueue<byte[]> inboundQueue,
+  MessageReader(BlockingQueue<InboundRequest> inboundQueue,
                 MessageIO messageIO,
                 boolean injectInterrupt) {
     Preconditions.checkNotNull(inboundQueue);
@@ -79,7 +79,12 @@ class MessageReader implements Runnable {
         LOG.warn("Empty message received.");
         continue;
       }
-      QueueUtil.put(inboundQueue, data);
+      SubprocessMetrics metrics = new SubprocessMetrics(inboundQueue);
+      // Begin recording the time it takes to make it through the inbound
+      // queue. A parser thread will record the elapsed time right after
+      // it pulls the request from the queue.
+      metrics.startTimer();
+      QueueUtil.put(inboundQueue, new InboundRequest(data, metrics));
     }
   }
 }
diff --git a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java
index 227fba4..81d7a97 100644
--- a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java
+++ b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageWriter.java
@@ -32,11 +32,11 @@ import org.apache.kudu.subprocess.Subprocess.SubprocessResponsePB;
  */
 @InterfaceAudience.Private
 class MessageWriter implements Runnable {
-  private final BlockingQueue<SubprocessResponsePB> outboundQueue;
+  private final BlockingQueue<OutboundResponse> outboundQueue;
   private final MessageIO messageIO;
   private final long blockWriteMs;
 
-  MessageWriter(BlockingQueue<SubprocessResponsePB> outboundQueue,
+  MessageWriter(BlockingQueue<OutboundResponse> outboundQueue,
                 MessageIO messageIO,
                 long blockWriteMs) {
     Preconditions.checkNotNull(outboundQueue);
@@ -49,17 +49,18 @@ class MessageWriter implements Runnable {
   @Override
   public void run() {
     while (true) {
-      SubprocessResponsePB response = QueueUtil.take(outboundQueue);
+      OutboundResponse resp = QueueUtil.take(outboundQueue);
+      resp.metrics().recordOutboundQueueTimeMs();
 
       // Write the response to the underlying output stream. IOException is fatal,
       // and should be propagated up the call stack.
       try {
-        messageIO.writeMessage(response);
         // Block the write for the given milliseconds if needed (for tests only).
         // -1 means the write will not be blocked.
         if (blockWriteMs != -1) {
           Thread.sleep(blockWriteMs);
         }
+        messageIO.writeMessage(resp.buildRespPB(outboundQueue));
       } catch (IOException | InterruptedException e) {
         throw new KuduSubprocessException("Unable to write the protobuf message", e);
       }
diff --git a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/OutboundResponse.java b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/OutboundResponse.java
new file mode 100644
index 0000000..b171a1c
--- /dev/null
+++ b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/OutboundResponse.java
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.subprocess;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Encapsulates a response on the outbound queue. The caller is expected to
+ * have started the <code>SubprocessMetrics</code> timer that measures time
+ * spent in the queue.
+ */
+public class OutboundResponse {
+  private final Subprocess.SubprocessResponsePB.Builder respBuilder;
+  private final SubprocessMetrics metrics;
+
+  public OutboundResponse(Subprocess.SubprocessResponsePB.Builder respBuilder,
+                          SubprocessMetrics metrics) {
+    this.respBuilder = respBuilder;
+    this.metrics = metrics;
+  }
+
+  /**
+   * Builds the final <code>SubprocessResponsePB</code> to send over the pipe.
+   * This constructs the <code>SubprocessMetricsPB</code> as well, and expects
+   * that all queue timings have already been recorded.
+   * @param outboundQueue the outbound queue, passed along when building the metrics
+   * @return the response
+   */
+  public Subprocess.SubprocessResponsePB buildRespPB(
+      BlockingQueue<OutboundResponse> outboundQueue) {
+    respBuilder.setMetrics(metrics.buildMetricsPB(outboundQueue));
+    return respBuilder.build();
+  }
+
+  /**
+   * @return the metrics associated with this response
+   */
+  public SubprocessMetrics metrics() {
+    return metrics;
+  }
+}
diff --git a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ProtocolHandler.java b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ProtocolHandler.java
index e29a9c0..ac9d44b 100644
--- a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ProtocolHandler.java
+++ b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/ProtocolHandler.java
@@ -37,31 +37,31 @@ public abstract class ProtocolHandler<RequestT extends Message,
                                       ResponseT extends Message> {
 
   /**
-   * Processes the given SubprocessRequestPB message according to the
-   * request type and returns a SubprocessResponsePB message.
+   * Unpacks the SubprocessRequestPB message according to the expected request
+   * type and returns a SubprocessResponsePB builder with the results.
    *
-   * @param request a SubprocessRequestPB protobuf message
-   * @return a SubprocessResponsePB message
+   * @param request a SubprocessRequestPB message
+   * @return a SubprocessResponsePB.Builder
    * @throws InvalidProtocolBufferException if the protocol message being parsed is invalid
    */
-  SubprocessResponsePB handleRequest(SubprocessRequestPB request)
+  SubprocessResponsePB.Builder unpackAndExecuteRequest(SubprocessRequestPB request)
       throws InvalidProtocolBufferException {
     Preconditions.checkNotNull(request);
     SubprocessResponsePB.Builder builder = SubprocessResponsePB.newBuilder();
     builder.setId(request.getId());
     Class<RequestT> requestType = getRequestClass();
-    ResponseT resp = createResponse(request.getRequest().unpack(requestType));
+    ResponseT resp = executeRequest(request.getRequest().unpack(requestType));
     builder.setResponse(Any.pack(resp));
-    return builder.build();
+    return builder;
   }
 
   /**
-   * Creates a protobuf message that responds to a request message.
+   * Executes the request and creates a response.
    *
    * @param request the request message
    * @return a response
    */
-  protected abstract ResponseT createResponse(RequestT request);
+  protected abstract ResponseT executeRequest(RequestT request);
 
   /**
    * Gets the class instance of request message.
diff --git a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessExecutor.java b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessExecutor.java
index d329d21..bbecfec 100644
--- a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessExecutor.java
+++ b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessExecutor.java
@@ -37,19 +37,17 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.kudu.subprocess.Subprocess.SubprocessResponsePB;
-
 /**
  * The {@link SubprocessExecutor} class,
  *    1. parses the command line to get the configuration,
  *    2. has a single reader thread that continuously reads protobuf-based
- *       messages from standard input and puts the messages to a FIFO inbound
- *       blocking queue,
+ *       messages from stdin and puts the message onto the inbound request
+ *       queue,
  *    3. has multiple parser threads that continuously retrieve the messages
- *       from the inbound queue, process them and put the responses to a FIFO
- *       outbound blocking queue,
+ *       from the inbound queue, process them, and put the responses onto the
+ *       outbound response queue,
  *    4. has a single writer thread that continuously retrieves the responses
- *       from the outbound queue, and writes the responses to standard output.
+ *       from the outbound queue, and writes the responses to stdout.
  */
 @InterfaceAudience.Private
 public class SubprocessExecutor {
@@ -57,12 +55,13 @@ public class SubprocessExecutor {
   private final Function<Throwable, Void> errorHandler;
   private boolean injectInterrupt = false;
   private long blockWriteMs = -1;
-  private BlockingQueue<SubprocessResponsePB> outboundQueue;
+  private BlockingQueue<OutboundResponse> outboundQueue;
+  private BlockingQueue<InboundRequest> inboundQueue;
 
   public SubprocessExecutor() {
     errorHandler = (t) -> {
-      // If unexpected exception(s) are thrown by the reader or writer tasks,
-      // this error handler wraps the throwable in a runtime exception and rethrows,
+      // If unexpected exception(s) are thrown by any of the tasks, this error
+      // handler wraps the throwable in a runtime exception and rethrows,
       // causing the program to exit with a nonzero status code.
       throw new RuntimeException(t);
     };
@@ -92,7 +91,7 @@ public class SubprocessExecutor {
     int queueSize = conf.getQueueSize();
     int maxMessageBytes = conf.getMaxMessageBytes();
 
-    BlockingQueue<byte[]> inboundQueue = new ArrayBlockingQueue<>(queueSize, /* fair= */true);
+    inboundQueue = new ArrayBlockingQueue<>(queueSize, /* fair= */true);
     outboundQueue = new ArrayBlockingQueue<>(queueSize, /* fair= */true);
     ExecutorService readerService = Executors.newSingleThreadExecutor();
     ExecutorService parserService = Executors.newFixedThreadPool(maxMsgParserThread);
@@ -127,9 +126,10 @@ public class SubprocessExecutor {
       CompletableFuture<Void> writerFuture = CompletableFuture.runAsync(writer, writerService);
       writerFuture.exceptionally(errorHandler);
 
-      // Wait until the tasks finish execution. -1 means the reader (parser, or writer)
-      // tasks continue the execution until finish. In cases where we don't want the
-      // tasks to run forever, e.g. in tests, wait for the specified timeout.
+      // Wait until the tasks finish execution. A timeout of -1 means the reader, parser,
+      // and writer tasks should continue until finished. In cases where we don't want
+      // the tasks to run forever, e.g. in tests, wait for the specified
+      // timeout.
       if (timeoutMs == -1) {
         readerFuture.join();
         writerFuture.join();
@@ -149,7 +149,7 @@ public class SubprocessExecutor {
    * Returns the outbound message queue.
    */
   @VisibleForTesting
-  public BlockingQueue<SubprocessResponsePB> getOutboundQueue() {
+  public BlockingQueue<OutboundResponse> getOutboundQueue() {
     return outboundQueue;
   }
 
diff --git a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessMetrics.java b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessMetrics.java
new file mode 100644
index 0000000..94e82c6
--- /dev/null
+++ b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessMetrics.java
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.subprocess;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Encapsulates the metrics associated with the subprocess. It is expected that
+ * this is passed around alongside each request/response as it makes its way
+ * through the different stages of the SubprocessExecutor, and for callers to
+ * call startTimer() and the various record methods as appropriate.
+ */
+public class SubprocessMetrics {
+  private final Subprocess.SubprocessMetricsPB.Builder builder;
+  private final Stopwatch stopwatch;
+  private final BlockingQueue<InboundRequest> inboundQueue;
+
+  /**
+   * Construct a SubprocessMetrics object.
+   *
+   * @param inboundQueue used to determine the length of the inbound queue
+   */
+  public SubprocessMetrics(BlockingQueue<InboundRequest> inboundQueue) {
+    this.inboundQueue = inboundQueue;
+    builder = Subprocess.SubprocessMetricsPB.newBuilder();
+    stopwatch = Stopwatch.createUnstarted();
+  }
+
+  public void startTimer() {
+    stopwatch.start();
+  }
+
+  /**
+   * Records the amount of time elapsed into the metrics builder, then stops
+   * and resets the stopwatch, with the assumption that it was started upon
+   * placing an element on the inbound queue.
+   */
+  public void recordInboundQueueTimeMs() {
+    Preconditions.checkArgument(!builder.hasInboundQueueTimeMs());
+    Preconditions.checkArgument(stopwatch.isRunning());
+    builder.setInboundQueueTimeMs(stopwatch.elapsed(TimeUnit.MILLISECONDS));
+    // We'll continue to use the timer as it makes its way through the
+    // execution lifecycle, so reset it here.
+    stopwatch.reset();
+  }
+
+  /**
+   * Records the amount of time elapsed into the metrics builder, then stops
+   * and resets the stopwatch, with the assumption that it was started upon
+   * beginning to execute.
+   */
+  public void recordExecutionTimeMs() {
+    Preconditions.checkArgument(!builder.hasExecutionTimeMs());
+    Preconditions.checkArgument(stopwatch.isRunning());
+    builder.setExecutionTimeMs(stopwatch.elapsed(TimeUnit.MILLISECONDS));
+    // We'll continue to use the timer as it makes its way through the
+    // execution lifecycle, so reset it here.
+    stopwatch.reset();
+  }
+
+  /**
+   * Stops the stopwatch and records the amount of time elapsed into the
+   * metrics builder, with the assumption that it was started upon placing an
+   * element on the outbound queue.
+   */
+  public void recordOutboundQueueTimeMs() {
+    Preconditions.checkArgument(!builder.hasOutboundQueueTimeMs());
+    Preconditions.checkArgument(stopwatch.isRunning());
+    builder.setOutboundQueueTimeMs(stopwatch.elapsed(TimeUnit.MILLISECONDS));
+    stopwatch.stop();
+  }
+
+  /**
+   * Builds the metrics protobuf message with the recorded timings and the
+   * current lengths of the message queues.
+   *
+   * @param outboundQueue used to determine the length of the outbound queue
+   * @return the constructed SubprocessMetricsPB
+   */
+  public Subprocess.SubprocessMetricsPB buildMetricsPB(
+      BlockingQueue<OutboundResponse> outboundQueue) {
+    Preconditions.checkArgument(builder.hasInboundQueueTimeMs());
+    Preconditions.checkArgument(builder.hasExecutionTimeMs());
+    Preconditions.checkArgument(builder.hasOutboundQueueTimeMs());
+    builder.setInboundQueueLength(inboundQueue.size());
+    builder.setOutboundQueueLength(outboundQueue.size());
+    return builder.build();
+  }
+}
diff --git a/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTestUtil.java b/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTestUtil.java
index 932743a..bca288f 100644
--- a/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTestUtil.java
+++ b/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/MessageTestUtil.java
@@ -38,20 +38,36 @@ public class MessageTestUtil {
 
   /**
    * Constructs a SubprocessRequestPB message of echo request with the
-   * given payload.
+   * given payload and sleep time.
    *
    * @param payload the message payload
+   * @param sleepMs the amount of time to sleep, in ms; left unset if not positive
    * @return a SubprocessRequestPB message
    */
-  public static SubprocessRequestPB createEchoSubprocessRequest(String payload) {
+  public static SubprocessRequestPB createEchoSubprocessRequest(String payload,
+                                                                int sleepMs) {
     SubprocessRequestPB.Builder builder = SubprocessRequestPB.newBuilder();
     EchoRequestPB.Builder echoBuilder = EchoRequestPB.newBuilder();
     echoBuilder.setData(payload);
+    if (sleepMs > 0) {
+      echoBuilder.setSleepMs(sleepMs);
+    }
     builder.setRequest(Any.pack(echoBuilder.build()));
     return builder.build();
   }
 
   /**
+   * Constructs a SubprocessRequestPB message of echo request with the
+   * given payload.
+   *
+   * @param payload the message payload
+   * @return a SubprocessRequestPB message
+   */
+  public static SubprocessRequestPB createEchoSubprocessRequest(String payload) {
+    return createEchoSubprocessRequest(payload, 0);
+  }
+
+  /**
    * Serializes the given message to a byte array.
    *
    * @param message the message
@@ -68,7 +84,7 @@ public class MessageTestUtil {
   }
 
   /**
-   * De-serializes the given message in byte array.
+   * Deserializes a message from the byte array.
    *
    * @param bytes the serialized message in byte array
    * @param parser the parser for the message
@@ -78,9 +94,21 @@ public class MessageTestUtil {
   static <T extends Message> T deserializeMessage(byte[] bytes, Parser<T> parser)
       throws IOException {
     ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
+    return deserializeMessage(new BufferedInputStream(inputStream), parser);
+  }
+
+  /**
+   * Deserializes a message from the input stream.
+   *
+   * @param inputStream the input stream from which to deserialize the message
+   * @param parser the parser for the message
+   * @return a message
+   * @throws IOException if an I/O error occurs
+   */
+  public static <T extends Message> T deserializeMessage(BufferedInputStream inputStream,
+                                                         Parser<T> parser) throws IOException {
     MessageIO messageIO = new MessageIO(
-        SubprocessConfiguration.MAX_MESSAGE_BYTES_DEFAULT,
-        new BufferedInputStream(inputStream), /* out= */null);
+        SubprocessConfiguration.MAX_MESSAGE_BYTES_DEFAULT, inputStream, /*out*/null);
     byte[] data = messageIO.readBytes();
     return parser.parseFrom(data);
   }
diff --git a/src/kudu/subprocess/subprocess.proto b/src/kudu/subprocess/subprocess.proto
index 487b43f..fa9a190 100644
--- a/src/kudu/subprocess/subprocess.proto
+++ b/src/kudu/subprocess/subprocess.proto
@@ -36,6 +36,29 @@ message EchoResponsePB {
   required string data = 1;
 }
 
+// Metrics for the subprocess subsystem.
+message SubprocessMetricsPB {
+  // Amount of time the request spent in the subprocess's inbound queue before
+  // being serviced.
+  optional int64 inbound_queue_time_ms = 1;
+
+  // Amount of time the response spent in the subprocess's outbound queue
+  // before being sent over the pipe.
+  optional int64 outbound_queue_time_ms = 2;
+
+  // Number of requests in the subprocess's inbound queue at the time the
+  // response is being returned from the subprocess.
+  optional int64 inbound_queue_length = 3;
+
+  // Number of responses in the subprocess's outbound queue at the time the
+  // response is being returned from the subprocess.
+  optional int64 outbound_queue_length = 4;
+
+  // The amount of time it took to execute the request (i.e. the time in
+  // between waiting in the inbound and outbound queues).
+  optional int64 execution_time_ms = 5;
+}
+
 // Request sent to the subprocess.
 message SubprocessRequestPB {
   // A sequence number that uniquely identifies a call to the subprocess. This
@@ -54,6 +77,10 @@ message SubprocessResponsePB {
   // Only set if there was some kind of subprocess-side error.
   optional AppStatusPB error = 2;
 
-  // The subprocess response. Only set for requests that actually expect a response.
+  // The subprocess response. Only set for requests that actually expect a
+  // response.
   optional google.protobuf.Any response = 3;
+
+  // Metrics for the generic subprocess subsystem.
+  optional SubprocessMetricsPB metrics = 4;
 }