You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2020/03/05 08:07:16 UTC

[kudu] branch master updated (e13fd4a -> 0702fc5)

This is an automated email from the ASF dual-hosted git repository.

awong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from e13fd4a  KUDU-2972 Add Ranger client
     new e37c9f8  subprocess: fix TestEchoSubprocess
     new 0702fc5  subprocess: report metrics even when calls fail

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/kudu/subprocess/SubprocessExecutor.java | 18 ++++
 .../apache/kudu/subprocess/SubprocessTestUtil.java |  9 +-
 .../kudu/subprocess/echo/TestEchoSubprocess.java   | 43 +++++-----
 src/kudu/ranger/ranger_client-test.cc              |  5 +-
 src/kudu/subprocess/CMakeLists.txt                 |  8 ++
 src/kudu/subprocess/echo_subprocess.cc             | 68 +++++++++++++++
 .../echo_subprocess.h}                             | 18 ++--
 src/kudu/subprocess/server.cc                      | 15 +++-
 src/kudu/subprocess/server.h                       | 14 +++-
 src/kudu/subprocess/subprocess_proxy-test.cc       | 96 ++++++++++------------
 src/kudu/subprocess/subprocess_proxy.h             | 33 +-------
 src/kudu/subprocess/subprocess_server-test.cc      | 11 ++-
 12 files changed, 218 insertions(+), 120 deletions(-)
 create mode 100644 src/kudu/subprocess/echo_subprocess.cc
 copy src/kudu/{master/sentry_privileges_cache_metrics.h => subprocess/echo_subprocess.h} (69%)


[kudu] 01/02: subprocess: fix TestEchoSubprocess

Posted by aw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit e37c9f83b0144971b8d081ac1b3ba8e53abff859
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Wed Mar 4 00:25:12 2020 -0800

    subprocess: fix TestEchoSubprocess
    
    There were a number of issues with TestEchoSubprocess:
    - Many tests coarsely expected timeouts, leading to the non-execution of
      many parts of the test. Instead, I wrapped executor.run() with an
      executor.runUntilTimeout() that expects the timeout, rather than
      relying on test-level expectation of timeouts.
    - Initializing BufferedInputStream with the same pipe multiple times led
      us to miss out on some bytes in the pipe. This initializes the
      BufferedInputStream once as a member.
    - testMsgWithEmptyMessage() wouldn't time out because its readers were
      stuck parsing an empty pipe. This is resolved by adding more requests
      to the pipe.
    
    Change-Id: I351ae84285fa5eb9db5dcc374dd404e475a9ddb4
    Reviewed-on: http://gerrit.cloudera.org:8080/15362
    Tested-by: Andrew Wong <aw...@cloudera.com>
    Reviewed-by: Hao Hao <ha...@cloudera.com>
---
 .../apache/kudu/subprocess/SubprocessExecutor.java | 18 +++++++++
 .../apache/kudu/subprocess/SubprocessTestUtil.java |  9 +++--
 .../kudu/subprocess/echo/TestEchoSubprocess.java   | 43 ++++++++++++----------
 3 files changed, 48 insertions(+), 22 deletions(-)

diff --git a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessExecutor.java b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessExecutor.java
index bbecfec..b56d301 100644
--- a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessExecutor.java
+++ b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessExecutor.java
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -168,4 +169,21 @@ public class SubprocessExecutor {
   public void blockWriteMs(long blockWriteMs) {
     this.blockWriteMs = blockWriteMs;
   }
+
+  /**
+   * Wrapper around <code>run()</code> that runs until 'timeoutMs' elapses,
+   * catches any timeout exceptions, and returns.
+   *
+   * Used in tests.
+   * TODO(awong): it'd be nice if we had a nicer way to shut down the executor.
+   */
+  public void runUntilTimeout(String[] args, ProtocolHandler handler, long timeoutMs)
+      throws ExecutionException, InterruptedException {
+    Preconditions.checkArgument(timeoutMs != -1);
+    try {
+     run(args, handler, timeoutMs);
+    } catch (TimeoutException e) {
+      // no-op
+    }
+  }
 }
diff --git a/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/SubprocessTestUtil.java b/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/SubprocessTestUtil.java
index b7d44c9..1abb019 100644
--- a/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/SubprocessTestUtil.java
+++ b/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/SubprocessTestUtil.java
@@ -66,8 +66,11 @@ public class SubprocessTestUtil {
   protected PipedOutputStream requestSenderPipe;
 
   // Pipe that we can read from that will receive responses from the
-  // subprocess's output pipe.
+  // subprocess's output pipe. We'll read from it via BufferedInputStream,
+  // so wrap the pipe here.
   protected final PipedInputStream responseReceiverPipe = new PipedInputStream();
+  private final BufferedInputStream bufferedInputStream =
+      new BufferedInputStream(responseReceiverPipe);
 
   public static class PrintStreamWithIOException extends PrintStream {
     public PrintStreamWithIOException(OutputStream out, boolean autoFlush, String encoding)
@@ -91,8 +94,8 @@ public class SubprocessTestUtil {
   // Receives a response from the receiver pipe and deserializes it into a
   // SubprocessResponsePB.
   public Subprocess.SubprocessResponsePB receiveResponse() throws IOException {
-    BufferedInputStream bufferedInput = new BufferedInputStream(responseReceiverPipe);
-    return SubprocessTestUtil.deserializeMessage(bufferedInput, Subprocess.SubprocessResponsePB.parser());
+    return SubprocessTestUtil.deserializeMessage(bufferedInputStream,
+                                                 Subprocess.SubprocessResponsePB.parser());
   }
 
   // Sets up and returns a SubprocessExecutor with the given error handler and
diff --git a/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java b/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java
index 66f7ddc..5f0e3c1 100644
--- a/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java
+++ b/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java
@@ -79,13 +79,13 @@ public class TestEchoSubprocess extends SubprocessTestUtil {
    * Test a regular old message. There should be no exceptions of any kind.
    * We should also see some metrics that make sense.
    */
-  @Test(expected = TimeoutException.class)
+  @Test
   public void testBasicMsg() throws Exception {
     SubprocessExecutor executor =
         setUpExecutorIO(NO_ERR, /*injectIOError*/false);
     sendRequestToPipe(createEchoSubprocessRequest(MESSAGE));
 
-    executor.run(NO_ARGS, new EchoProtocolHandler(), TIMEOUT_MS);
+    executor.runUntilTimeout(NO_ARGS, new EchoProtocolHandler(), TIMEOUT_MS);
     SubprocessResponsePB spResp = receiveResponse();
     EchoResponsePB echoResp = spResp.getResponse().unpack(EchoResponsePB.class);
     Assert.assertEquals(MESSAGE, echoResp.getData());
@@ -111,7 +111,7 @@ public class TestEchoSubprocess extends SubprocessTestUtil {
    * Test to see what happens when the execution is the bottleneck. We should
    * see it in the execution time and inbound queue time and length metrics.
    */
-  @Test(expected = TimeoutException.class)
+  @Test
   public void testSlowExecutionMetrics() throws Exception {
     SubprocessExecutor executor =
       setUpExecutorIO(NO_ERR, /*injectIOError*/false);
@@ -122,7 +122,7 @@ public class TestEchoSubprocess extends SubprocessTestUtil {
 
     // Run the executor with a single parser thread so we can make stronger
     // assumptions about timing.
-    executor.run(new String[]{"-p", "1"}, new EchoProtocolHandler(), TIMEOUT_MS);
+    executor.runUntilTimeout(new String[]{"-p", "1"}, new EchoProtocolHandler(), TIMEOUT_MS);
 
     SubprocessMetricsPB m = receiveResponse().getMetrics();
     long inboundQueueLength = m.getInboundQueueLength();
@@ -167,16 +167,16 @@ public class TestEchoSubprocess extends SubprocessTestUtil {
    * Test to see what happens when writing is the bottleneck. We should see it
    * in the outbound queue metrics.
    */
-  @Test(expected = TimeoutException.class)
+  @Test
   public void testSlowWriterMetrics() throws Exception {
     SubprocessExecutor executor =
-      setUpExecutorIO(NO_ERR, /*injectIOError*/false);
+        setUpExecutorIO(NO_ERR, /*injectIOError*/false);
     final int BLOCK_MS = 200;
     executor.blockWriteMs(BLOCK_MS);
     sendRequestToPipe(createEchoSubprocessRequest(MESSAGE));
     sendRequestToPipe(createEchoSubprocessRequest(MESSAGE));
     sendRequestToPipe(createEchoSubprocessRequest(MESSAGE));
-    executor.run(NO_ARGS, new EchoProtocolHandler(), TIMEOUT_MS);
+    executor.runUntilTimeout(NO_ARGS, new EchoProtocolHandler(), TIMEOUT_MS);
 
     // In writing the first request, the other two requests should've been
     // close behind, likely both in the outbound queue.
@@ -186,29 +186,34 @@ public class TestEchoSubprocess extends SubprocessTestUtil {
     m = receiveResponse().getMetrics();
     Assert.assertEquals(1, m.getOutboundQueueLength());
     Assert.assertTrue(
-      String.format("Expected a higher outbound queue time: %s ms", m.getOutboundQueueTimeMs()),
-      m.getOutboundQueueTimeMs() >= BLOCK_MS);
+        String.format("Expected a higher outbound queue time: %s ms", m.getOutboundQueueTimeMs()),
+        m.getOutboundQueueTimeMs() >= BLOCK_MS);
 
     m = receiveResponse().getMetrics();
     Assert.assertEquals(0, m.getOutboundQueueLength());
     Assert.assertTrue(
-      String.format("Expected a higher outbound queue time: %s ms", m.getOutboundQueueTimeMs()),
-      m.getOutboundQueueTimeMs() >= 2 * BLOCK_MS);
+        String.format("Expected a higher outbound queue time: %s ms", m.getOutboundQueueTimeMs()),
+        m.getOutboundQueueTimeMs() >= 2 * BLOCK_MS);
   }
 
   /**
    * Test what happens when we send a message that is completely empty (i.e.
    * not an empty SubprocessRequestPB message -- no message at all).
    */
-  @Test(expected = TimeoutException.class)
+  @Test
   public void testMsgWithEmptyMessage() throws Exception {
     SubprocessExecutor executor = setUpExecutorIO(NO_ERR,
                                                   /*injectIOError*/false);
     requestSenderPipe.write(MessageIO.intToBytes(0));
-    executor.run(NO_ARGS, new EchoProtocolHandler(), TIMEOUT_MS);
+    // NOTE: reading IO when the pipe is virtually empty leads us to hang. So
+    // let's put something else onto the pipe and just ensure that our empty
+    // message was a no-op.
+    sendRequestToPipe(createEchoSubprocessRequest(MESSAGE));
+    executor.runUntilTimeout(NO_ARGS, new EchoProtocolHandler(), TIMEOUT_MS);
 
-    // We should see no bytes land in the receiver pipe.
-    Assert.assertEquals(-1, responseReceiverPipe.read());
+    SubprocessResponsePB spResp = receiveResponse();
+    EchoResponsePB echoResp = spResp.getResponse().unpack(EchoResponsePB.class);
+    Assert.assertEquals(MESSAGE, echoResp.getData());
   }
 
   /**
@@ -237,8 +242,8 @@ public class TestEchoSubprocess extends SubprocessTestUtil {
     // CompletableFuture because, in waiting for completion, the MessageReader
     // times out before CompletableFuture.get() is called on the writer.
     assertIncludingSuppressedThrows(IOException.class,
-      "Unable to write to print stream",
-      () -> executor.run(NO_ARGS, new EchoProtocolHandler(), TIMEOUT_MS));
+        "Unable to write to print stream",
+        () -> executor.run(NO_ARGS, new EchoProtocolHandler(), TIMEOUT_MS));
   }
 
   /**
@@ -261,14 +266,14 @@ public class TestEchoSubprocess extends SubprocessTestUtil {
    * <code>MessageParser</code> tasks can continue making progress.
    */
   @Test
-  public void testMessageParser() throws Exception  {
+  public void testSlowWriterDoesntBlockQueues() throws Exception  {
     SubprocessExecutor executor =
         setUpExecutorIO(NO_ERR, /*injectIOError*/false);
     sendRequestToPipe(createEchoSubprocessRequest("a"));
     sendRequestToPipe(createEchoSubprocessRequest("b"));
     executor.blockWriteMs(1000);
     Assert.assertThrows(TimeoutException.class,
-        () -> executor.run(NO_ARGS, new EchoProtocolHandler(), /*timeoutMs*/500));
+        () -> executor.run(NO_ARGS, new EchoProtocolHandler(), TIMEOUT_MS));
 
     // We should see a single message in the outbound queue. The other one is
     // blocked writing.


[kudu] 02/02: subprocess: report metrics even when calls fail

Posted by aw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 0702fc5e099478ceba9e3f6f6b25f2ffae2afc82
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Wed Mar 4 16:50:55 2020 -0800

    subprocess: report metrics even when calls fail
    
    This moves metric reporting for metrics returned by the subprocess into
    the SubprocessServer so they are still reported in the event that the
    call fails.
    
    To do this, I moved the SubprocessMetrics into the SubprocessServer,
    which means that these metrics are now required by the server. As such,
    I moved the declarations of Echo metrics out of subprocess_proxy-test so
    they could be reused by other tests.
    
    This should prove more useful, particularly given the fact that failed
    calls warrant a deeper look at these metrics.
    
    Change-Id: I87cc82f82a0a767383284799ec6159d680526ea9
    Reviewed-on: http://gerrit.cloudera.org:8080/15366
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Tested-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/ranger/ranger_client-test.cc         |  5 +-
 src/kudu/subprocess/CMakeLists.txt            |  8 +++
 src/kudu/subprocess/echo_subprocess.cc        | 68 +++++++++++++++++++
 src/kudu/subprocess/echo_subprocess.h         | 39 +++++++++++
 src/kudu/subprocess/server.cc                 | 15 ++++-
 src/kudu/subprocess/server.h                  | 14 +++-
 src/kudu/subprocess/subprocess_proxy-test.cc  | 96 ++++++++++++---------------
 src/kudu/subprocess/subprocess_proxy.h        | 33 +--------
 src/kudu/subprocess/subprocess_server-test.cc | 11 ++-
 9 files changed, 198 insertions(+), 91 deletions(-)

diff --git a/src/kudu/ranger/ranger_client-test.cc b/src/kudu/ranger/ranger_client-test.cc
index 4213593..b2cae2c 100644
--- a/src/kudu/ranger/ranger_client-test.cc
+++ b/src/kudu/ranger/ranger_client-test.cc
@@ -42,6 +42,7 @@ namespace kudu {
 namespace ranger {
 
 using boost::hash_combine;
+using kudu::subprocess::SubprocessMetrics;
 using kudu::subprocess::SubprocessRequestPB;
 using kudu::subprocess::SubprocessResponsePB;
 using kudu::subprocess::SubprocessServer;
@@ -91,8 +92,8 @@ class MockSubprocessServer : public SubprocessServer {
 
   ~MockSubprocessServer() override {}
 
-  MockSubprocessServer() :
-    SubprocessServer({"mock"}) {}
+  MockSubprocessServer()
+      : SubprocessServer({"mock"}, SubprocessMetrics()) {}
 
   Status Execute(SubprocessRequestPB* req,
                  SubprocessResponsePB* resp) override {
diff --git a/src/kudu/subprocess/CMakeLists.txt b/src/kudu/subprocess/CMakeLists.txt
index a7514a4..ceab0ca 100644
--- a/src/kudu/subprocess/CMakeLists.txt
+++ b/src/kudu/subprocess/CMakeLists.txt
@@ -59,6 +59,13 @@ target_link_libraries(kudu_subprocess
 )
 add_dependencies(kudu_subprocess subprocess_jar)
 
+add_library(echo_subprocess
+  echo_subprocess.cc
+)
+target_link_libraries(echo_subprocess
+  kudu_subprocess
+)
+
 #######################################
 # Unit tests
 #######################################
@@ -66,6 +73,7 @@ add_dependencies(kudu_subprocess subprocess_jar)
 if (NOT NO_TESTS)
   SET_KUDU_TEST_LINK_LIBS(
     kudu_subprocess
+    echo_subprocess
   )
 endif()
 
diff --git a/src/kudu/subprocess/echo_subprocess.cc b/src/kudu/subprocess/echo_subprocess.cc
new file mode 100644
index 0000000..9faac0e
--- /dev/null
+++ b/src/kudu/subprocess/echo_subprocess.cc
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/subprocess/echo_subprocess.h"
+
+#include "kudu/util/metrics.h"
+
+METRIC_DEFINE_histogram(server, echo_subprocess_inbound_queue_length,
+    "Echo subprocess inbound queue length",
+    kudu::MetricUnit::kMessages,
+    "Number of request messages in the Echo subprocess' inbound request queue",
+    kudu::MetricLevel::kInfo,
+    1000, 1);
+METRIC_DEFINE_histogram(server, echo_subprocess_outbound_queue_length,
+    "Echo subprocess outbound queue length",
+    kudu::MetricUnit::kMessages,
+    "Number of request messages in the Echo subprocess' outbound response queue",
+    kudu::MetricLevel::kInfo,
+    1000, 1);
+METRIC_DEFINE_histogram(server, echo_subprocess_inbound_queue_time_ms,
+    "Echo subprocess inbound queue time (ms)",
+    kudu::MetricUnit::kMilliseconds,
+    "Duration of time in ms spent in the Echo subprocess' inbound request queue",
+    kudu::MetricLevel::kInfo,
+    60000LU, 1);
+METRIC_DEFINE_histogram(server, echo_subprocess_outbound_queue_time_ms,
+    "Echo subprocess outbound queue time (ms)",
+    kudu::MetricUnit::kMilliseconds,
+    "Duration of time in ms spent in the Echo subprocess' outbound response queue",
+    kudu::MetricLevel::kInfo,
+    60000LU, 1);
+METRIC_DEFINE_histogram(server, echo_subprocess_execution_time_ms,
+    "Echo subprocess execution time (ms)",
+    kudu::MetricUnit::kMilliseconds,
+    "Duration of time in ms spent executing the Echo subprocess request, excluding "
+    "time spent spent in the subprocess queues",
+    kudu::MetricLevel::kInfo,
+    60000LU, 1);
+
+namespace kudu {
+namespace subprocess {
+
+#define HISTINIT(member, x) member = METRIC_##x.Instantiate(entity)
+EchoSubprocessMetrics::EchoSubprocessMetrics(const scoped_refptr<MetricEntity>& entity) {
+  HISTINIT(inbound_queue_length, echo_subprocess_inbound_queue_length);
+  HISTINIT(outbound_queue_length, echo_subprocess_outbound_queue_length);
+  HISTINIT(inbound_queue_time_ms, echo_subprocess_inbound_queue_time_ms);
+  HISTINIT(outbound_queue_time_ms, echo_subprocess_outbound_queue_time_ms);
+  HISTINIT(execution_time_ms, echo_subprocess_execution_time_ms);
+}
+#undef HISTINIT
+
+} // namespace subprocess
+} // namespace kudu
diff --git a/src/kudu/subprocess/echo_subprocess.h b/src/kudu/subprocess/echo_subprocess.h
new file mode 100644
index 0000000..c78170b
--- /dev/null
+++ b/src/kudu/subprocess/echo_subprocess.h
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/subprocess/server.h"
+#include "kudu/subprocess/subprocess_proxy.h"
+
+namespace kudu {
+
+class MetricEntity;
+
+namespace subprocess {
+
+class EchoRequestPB;
+class EchoResponsePB;
+
+struct EchoSubprocessMetrics : public SubprocessMetrics {
+  explicit EchoSubprocessMetrics(const scoped_refptr<MetricEntity>& entity);
+};
+
+typedef SubprocessProxy<EchoRequestPB, EchoResponsePB, EchoSubprocessMetrics> EchoSubprocess;
+
+} // namespace subprocess
+} // namespace kudu
diff --git a/src/kudu/subprocess/server.cc b/src/kudu/subprocess/server.cc
index 6d81d27..f23c1af 100644
--- a/src/kudu/subprocess/server.cc
+++ b/src/kudu/subprocess/server.cc
@@ -83,13 +83,14 @@ using strings::Substitute;
 namespace kudu {
 namespace subprocess {
 
-SubprocessServer::SubprocessServer(vector<string> subprocess_argv)
+SubprocessServer::SubprocessServer(vector<string> subprocess_argv, SubprocessMetrics metrics)
     : call_timeout_(MonoDelta::FromSeconds(FLAGS_subprocess_timeout_secs)),
       next_id_(1),
       closing_(1),
       process_(make_shared<Subprocess>(std::move(subprocess_argv))),
       outbound_call_queue_(FLAGS_subprocess_request_queue_size_bytes),
-      inbound_response_queue_(FLAGS_subprocess_response_queue_size_bytes) {
+      inbound_response_queue_(FLAGS_subprocess_response_queue_size_bytes),
+      metrics_(std::move(metrics)) {
   process_->ShareParentStdin(false);
   process_->ShareParentStdout(false);
 }
@@ -221,6 +222,16 @@ void SubprocessServer::ResponderThread() {
         LOG(FATAL) << Substitute("Received invalid response: $0",
                                  pb_util::SecureDebugString(resp));
       }
+      // Regardless of whether this call succeeded or not, parse the returned
+      // metrics.
+      if (PREDICT_TRUE(resp.has_metrics())) {
+        const auto& pb = resp.metrics();
+        metrics_.inbound_queue_length->Increment(pb.inbound_queue_length());
+        metrics_.outbound_queue_length->Increment(pb.outbound_queue_length());
+        metrics_.inbound_queue_time_ms->Increment(pb.inbound_queue_time_ms());
+        metrics_.outbound_queue_time_ms->Increment(pb.outbound_queue_time_ms());
+        metrics_.execution_time_ms->Increment(pb.execution_time_ms());
+      }
     }
     vector<pair<shared_ptr<SubprocessCall>, SubprocessResponsePB>> calls_and_resps;
     calls_and_resps.reserve(resps.size());
diff --git a/src/kudu/subprocess/server.h b/src/kudu/subprocess/server.h
index c175e22..d4e0ebe 100644
--- a/src/kudu/subprocess/server.h
+++ b/src/kudu/subprocess/server.h
@@ -39,6 +39,7 @@
 #include "kudu/util/blocking_queue.h"
 #include "kudu/util/countdown_latch.h"
 #include "kudu/util/locks.h"
+#include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/mutex.h"
 #include "kudu/util/status.h"
@@ -53,6 +54,14 @@ namespace subprocess {
 
 typedef int64_t CallId;
 
+struct SubprocessMetrics {
+  scoped_refptr<Histogram> inbound_queue_length;
+  scoped_refptr<Histogram> outbound_queue_length;
+  scoped_refptr<Histogram> inbound_queue_time_ms;
+  scoped_refptr<Histogram> outbound_queue_time_ms;
+  scoped_refptr<Histogram> execution_time_ms;
+};
+
 // Encapsulates the pending state of a request that is in the process of being
 // sent to a subprocess. These calls are added to an in-flight map before
 // calling SendRequest(). See the method comments for some discussion about
@@ -188,7 +197,7 @@ typedef BlockingQueue<SubprocessResponsePB, ResponseLogicalSize> ResponseQueue;
 // Public methods are virtual so a mock server can be used in tests.
 class SubprocessServer {
  public:
-  explicit SubprocessServer(std::vector<std::string> subprocess_argv);
+  SubprocessServer(std::vector<std::string> subprocess_argv, SubprocessMetrics metrics);
   virtual ~SubprocessServer();
 
   // Initialize the server, starting the subprocess and worker threads.
@@ -268,6 +277,9 @@ class SubprocessServer {
   // Inbound queue of responses sent by the subprocess.
   ResponseQueue inbound_response_queue_;
 
+  // Metrics for this subprocess.
+  SubprocessMetrics metrics_;
+
   // Calls that are currently in-flight (the requests are being sent over the
   // pipe or waiting for a response), ordered by ID. This ordering allows for
   // lookup by ID, and gives us a rough way to get the calls with earliest
diff --git a/src/kudu/subprocess/subprocess_proxy-test.cc b/src/kudu/subprocess/subprocess_proxy-test.cc
index 958807e..897dd18 100644
--- a/src/kudu/subprocess/subprocess_proxy-test.cc
+++ b/src/kudu/subprocess/subprocess_proxy-test.cc
@@ -23,11 +23,13 @@
 #include <utility>
 #include <vector>
 
+#include <gflags/gflags_declare.h>
 #include <gtest/gtest.h>
 
 #include "kudu/gutil/casts.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/subprocess/echo_subprocess.h"
 #include "kudu/subprocess/subprocess.pb.h"
 #include "kudu/util/env.h"
 #include "kudu/util/metrics.h"
@@ -36,65 +38,23 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
-using std::make_shared;
-using std::shared_ptr;
+DECLARE_int32(subprocess_timeout_secs);
+
+METRIC_DECLARE_histogram(echo_subprocess_inbound_queue_length);
+METRIC_DECLARE_histogram(echo_subprocess_outbound_queue_length);
+METRIC_DECLARE_histogram(echo_subprocess_inbound_queue_time_ms);
+METRIC_DECLARE_histogram(echo_subprocess_outbound_queue_time_ms);
+METRIC_DECLARE_histogram(echo_subprocess_execution_time_ms);
+
+using std::unique_ptr;
 using std::string;
 using std::vector;
 using strings::Substitute;
 
-METRIC_DEFINE_histogram(server, echo_subprocess_inbound_queue_length,
-    "Echo subprocess inbound queue length",
-    kudu::MetricUnit::kMessages,
-    "Number of request messages in the Echo subprocess' inbound request queue",
-    kudu::MetricLevel::kInfo,
-    1000, 1);
-METRIC_DEFINE_histogram(server, echo_subprocess_outbound_queue_length,
-    "Echo subprocess outbound queue length",
-    kudu::MetricUnit::kMessages,
-    "Number of request messages in the Echo subprocess' outbound response queue",
-    kudu::MetricLevel::kInfo,
-    1000, 1);
-METRIC_DEFINE_histogram(server, echo_subprocess_inbound_queue_time_ms,
-    "Echo subprocess inbound queue time (ms)",
-    kudu::MetricUnit::kMilliseconds,
-    "Duration of time in ms spent in the Echo subprocess' inbound request queue",
-    kudu::MetricLevel::kInfo,
-    60000LU, 1);
-METRIC_DEFINE_histogram(server, echo_subprocess_outbound_queue_time_ms,
-    "Echo subprocess outbound queue time (ms)",
-    kudu::MetricUnit::kMilliseconds,
-    "Duration of time in ms spent in the Echo subprocess' outbound response queue",
-    kudu::MetricLevel::kInfo,
-    60000LU, 1);
-METRIC_DEFINE_histogram(server, echo_subprocess_execution_time_ms,
-    "Echo subprocess execution time (ms)",
-    kudu::MetricUnit::kMilliseconds,
-    "Duration of time in ms spent executing the Echo subprocess request, excluding "
-    "time spent spent in the subprocess queues",
-    kudu::MetricLevel::kInfo,
-    60000LU, 1);
-
 
 namespace kudu {
 namespace subprocess {
 
-
-#define GINIT(member, x) member = METRIC_##x.Instantiate(entity, 0)
-#define HISTINIT(member, x) member = METRIC_##x.Instantiate(entity)
-struct EchoSubprocessMetrics : public SubprocessMetrics {
-  explicit EchoSubprocessMetrics(const scoped_refptr<MetricEntity>& entity) {
-    HISTINIT(inbound_queue_length, echo_subprocess_inbound_queue_length);
-    HISTINIT(outbound_queue_length, echo_subprocess_outbound_queue_length);
-    HISTINIT(inbound_queue_time_ms, echo_subprocess_inbound_queue_time_ms);
-    HISTINIT(outbound_queue_time_ms, echo_subprocess_outbound_queue_time_ms);
-    HISTINIT(execution_time_ms, echo_subprocess_execution_time_ms);
-  }
-};
-#undef HISTINIT
-#undef MINIT
-
-typedef SubprocessProxy<EchoRequestPB, EchoResponsePB, EchoSubprocessMetrics> EchoSubprocess;
-
 class EchoSubprocessTest : public KuduTest {
  public:
   EchoSubprocessTest()
@@ -117,17 +77,17 @@ class EchoSubprocessTest : public KuduTest {
       "-cp", Substitute("$0/kudu-subprocess.jar", bin_dir),
       "org.apache.kudu.subprocess.echo.EchoSubprocessMain"
     };
-    echo_subprocess_ = make_shared<EchoSubprocess>(std::move(argv), metric_entity_);
+    echo_subprocess_.reset(new EchoSubprocess(std::move(argv), metric_entity_));
     return echo_subprocess_->Start();
   }
 
  protected:
   MetricRegistry metric_registry_;
   scoped_refptr<MetricEntity> metric_entity_;
-  shared_ptr<EchoSubprocess> echo_subprocess_;
+  unique_ptr<EchoSubprocess> echo_subprocess_;
 };
 
-TEST_F(EchoSubprocessTest, TestBasicMetrics) {
+TEST_F(EchoSubprocessTest, TestBasicSubprocessMetrics) {
   const string kMessage = "don't catch you slippin' now";
   const int64_t kSleepMs = 1000;
   EchoRequestPB req;
@@ -164,5 +124,33 @@ TEST_F(EchoSubprocessTest, TestBasicMetrics) {
   ASSERT_LT(kSleepMs, exec_hist->MaxValueForTests());
 }
 
+// Test that we'll still report metrics when we receive them from the
+// subprocess, even if the call itself failed.
+TEST_F(EchoSubprocessTest, TestSubprocessMetricsOnError) {
+  // Set things up so we'll time out.
+  FLAGS_subprocess_timeout_secs = 1;
+  const int64_t kSleepMs = 2000;
+  ASSERT_OK(ResetEchoSubprocess());
+
+  EchoRequestPB req;
+  req.set_data("garbage!");
+  req.set_sleep_ms(kSleepMs);
+  EchoResponsePB resp;
+  Status s = echo_subprocess_->Execute(req, &resp);
+  ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+
+  // Immediately following our call, we won't have any metrics from the subprocess.
+  Histogram* exec_hist = down_cast<Histogram*>(metric_entity_->FindOrNull(
+      METRIC_echo_subprocess_execution_time_ms).get());
+  ASSERT_EQ(0, exec_hist->TotalCount());
+
+  // Eventually the subprocess will return our call, and we'll see some
+  // metrics.
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_EQ(1, exec_hist->TotalCount());
+    ASSERT_LT(kSleepMs, exec_hist->MaxValueForTests());
+  });
+}
+
 } // namespace subprocess
 } // namespace kudu
diff --git a/src/kudu/subprocess/subprocess_proxy.h b/src/kudu/subprocess/subprocess_proxy.h
index 33e0f9a..d8cf6c7 100644
--- a/src/kudu/subprocess/subprocess_proxy.h
+++ b/src/kudu/subprocess/subprocess_proxy.h
@@ -14,6 +14,7 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+#pragma once
 
 #include <vector>
 #include <string>
@@ -32,15 +33,6 @@
 namespace kudu {
 namespace subprocess {
 
-// TODO(awong): add server metrics.
-struct SubprocessMetrics {
-  scoped_refptr<Histogram> inbound_queue_length;
-  scoped_refptr<Histogram> outbound_queue_length;
-  scoped_refptr<Histogram> inbound_queue_time_ms;
-  scoped_refptr<Histogram> outbound_queue_time_ms;
-  scoped_refptr<Histogram> execution_time_ms;
-};
-
 // Template that wraps a SubprocessServer, exposing only the underlying ReqPB
 // and RespPB as an interface. The given MetricsPB will be initialized,
 // allowing for metrics specific to each specialized SubprocessServer.
@@ -48,7 +40,7 @@ template<class ReqPB, class RespPB, class MetricsPB>
 class SubprocessProxy {
  public:
   SubprocessProxy(std::vector<std::string> argv, const scoped_refptr<MetricEntity>& entity)
-      : server_(new SubprocessServer(std::move(argv))), metrics_(entity) {}
+      : server_(new SubprocessServer(std::move(argv), MetricsPB(entity))) {}
 
   // Starts the underlying subprocess.
   Status Start() {
@@ -68,11 +60,6 @@ class SubprocessProxy {
                                         pb_util::SecureDebugString(sresp));
       return Status::Corruption("unable to unpack response");
     }
-    // The subprocess metrics should still be valid regardless of whether there
-    // was an error, so parse them first.
-    if (sresp.has_metrics()) {
-      ParseMetricsPB(sresp.metrics());
-    }
     if (sresp.has_error()) {
       return StatusFromPB(sresp.error());
     }
@@ -84,23 +71,7 @@ class SubprocessProxy {
     server_ = std::move(server);
   }
  private:
-  // Parses the given metrics protobuf and updates 'metrics_' based on its
-  // contents.
-  void ParseMetricsPB(const SubprocessMetricsPB& pb) {
-    DCHECK(pb.has_inbound_queue_length());
-    DCHECK(pb.has_outbound_queue_length());
-    DCHECK(pb.has_inbound_queue_time_ms());
-    DCHECK(pb.has_outbound_queue_time_ms());
-    DCHECK(pb.has_execution_time_ms());
-    metrics_.inbound_queue_length->Increment(pb.inbound_queue_length());
-    metrics_.outbound_queue_length->Increment(pb.outbound_queue_length());
-    metrics_.inbound_queue_time_ms->Increment(pb.inbound_queue_time_ms());
-    metrics_.outbound_queue_time_ms->Increment(pb.outbound_queue_time_ms());
-    metrics_.execution_time_ms->Increment(pb.execution_time_ms());
-  }
-
   std::unique_ptr<SubprocessServer> server_;
-  MetricsPB metrics_;
 };
 
 } // namespace subprocess
diff --git a/src/kudu/subprocess/subprocess_server-test.cc b/src/kudu/subprocess/subprocess_server-test.cc
index 2de5ec7..d14293f 100644
--- a/src/kudu/subprocess/subprocess_server-test.cc
+++ b/src/kudu/subprocess/subprocess_server-test.cc
@@ -26,10 +26,13 @@
 #include <google/protobuf/any.pb.h>
 #include <gtest/gtest.h>
 
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/subprocess/echo_subprocess.h"
 #include "kudu/subprocess/server.h"
 #include "kudu/subprocess/subprocess.pb.h"
 #include "kudu/util/env.h"
+#include "kudu/util/metrics.h"
 #include "kudu/util/path_util.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/status.h"
@@ -75,6 +78,9 @@ const char* kHello = "hello world";
 
 class SubprocessServerTest : public KuduTest {
  public:
+  SubprocessServerTest()
+      : metric_entity_(METRIC_ENTITY_server.Instantiate(&metric_registry_,
+                                                        "subprocess_server-test")) {}
   void SetUp() override {
     KuduTest::SetUp();
     ASSERT_OK(ResetSubprocessServer());
@@ -103,11 +109,14 @@ class SubprocessServerTest : public KuduTest {
       argv.emplace_back("p");
       argv.emplace_back(std::to_string(java_parser_threads));
     }
-    server_ = make_shared<SubprocessServer>(std::move(argv));
+    server_ = make_shared<SubprocessServer>(std::move(argv),
+                                            EchoSubprocessMetrics(metric_entity_));
     return server_->Init();
   }
 
  protected:
+  MetricRegistry metric_registry_;
+  scoped_refptr<MetricEntity> metric_entity_;
   shared_ptr<SubprocessServer> server_;
 };