You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2020/03/05 22:14:03 UTC

[kudu] branch master updated: subprocess: add server-side metrics

This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f01667  subprocess: add server-side metrics
6f01667 is described below

commit 6f016678cbf8fa32714685cd5166bced87eb28e9
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Tue Mar 3 20:22:11 2020 -0800

    subprocess: add server-side metrics
    
    This introduces server-side timing information to the list of
    subprocess-related metrics.
    
    Change-Id: I51294741ff82bd47e64ceaba18a6d04ae0144179
    Reviewed-on: http://gerrit.cloudera.org:8080/15367
    Reviewed-by: Hao Hao <ha...@cloudera.com>
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Tested-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/ranger/ranger_client.cc             | 24 +++++++++---
 src/kudu/subprocess/echo_subprocess.cc       | 24 +++++++++---
 src/kudu/subprocess/server.cc                | 44 ++++++++++++++-------
 src/kudu/subprocess/server.h                 | 39 +++++++++++++------
 src/kudu/subprocess/subprocess_proxy-test.cc | 58 ++++++++++++++++++++++------
 5 files changed, 141 insertions(+), 48 deletions(-)

diff --git a/src/kudu/ranger/ranger_client.cc b/src/kudu/ranger/ranger_client.cc
index 36141d9..8ca9812 100644
--- a/src/kudu/ranger/ranger_client.cc
+++ b/src/kudu/ranger/ranger_client.cc
@@ -80,6 +80,18 @@ METRIC_DEFINE_histogram(server, ranger_subprocess_execution_time_ms,
     "time spent spent in the subprocess queues",
     kudu::MetricLevel::kInfo,
     60000LU, 1);
+METRIC_DEFINE_histogram(server, ranger_server_outbound_queue_time_ms,
+    "Ranger server outbound queue time (ms)",
+    kudu::MetricUnit::kMilliseconds,
+    "Duration of time in ms spent in the Ranger server's outbound request queue",
+    kudu::MetricLevel::kInfo,
+    60000LU, 1);
+METRIC_DEFINE_histogram(server, ranger_server_inbound_queue_time_ms,
+    "Ranger server inbound queue time (ms)",
+    kudu::MetricUnit::kMilliseconds,
+    "Duration of time in ms spent in the Ranger server's inbound response queue",
+    kudu::MetricLevel::kInfo,
+    60000LU, 1);
 
 namespace kudu {
 namespace ranger {
@@ -99,11 +111,13 @@ const char* kMainClass = "org.apache.kudu.ranger.RangerSubprocessMain";
 
 #define HISTINIT(member, x) member = METRIC_##x.Instantiate(entity)
 RangerSubprocessMetrics::RangerSubprocessMetrics(const scoped_refptr<MetricEntity>& entity) {
-  HISTINIT(inbound_queue_length, ranger_subprocess_inbound_queue_length);
-  HISTINIT(outbound_queue_length, ranger_subprocess_outbound_queue_length);
-  HISTINIT(inbound_queue_time_ms, ranger_subprocess_inbound_queue_time_ms);
-  HISTINIT(outbound_queue_time_ms, ranger_subprocess_outbound_queue_time_ms);
-  HISTINIT(execution_time_ms, ranger_subprocess_execution_time_ms);
+  HISTINIT(sp_inbound_queue_length, ranger_subprocess_inbound_queue_length);
+  HISTINIT(sp_outbound_queue_length, ranger_subprocess_outbound_queue_length);
+  HISTINIT(sp_inbound_queue_time_ms, ranger_subprocess_inbound_queue_time_ms);
+  HISTINIT(sp_outbound_queue_time_ms, ranger_subprocess_outbound_queue_time_ms);
+  HISTINIT(sp_execution_time_ms, ranger_subprocess_execution_time_ms);
+  HISTINIT(server_outbound_queue_time_ms, ranger_server_outbound_queue_time_ms);
+  HISTINIT(server_inbound_queue_time_ms, ranger_server_inbound_queue_time_ms);
 }
 #undef HISTINIT
 
diff --git a/src/kudu/subprocess/echo_subprocess.cc b/src/kudu/subprocess/echo_subprocess.cc
index 9faac0e..5e39114 100644
--- a/src/kudu/subprocess/echo_subprocess.cc
+++ b/src/kudu/subprocess/echo_subprocess.cc
@@ -50,17 +50,31 @@ METRIC_DEFINE_histogram(server, echo_subprocess_execution_time_ms,
     "time spent spent in the subprocess queues",
     kudu::MetricLevel::kInfo,
     60000LU, 1);
+METRIC_DEFINE_histogram(server, echo_server_outbound_queue_time_ms,
+    "Echo server outbound queue time (ms)",
+    kudu::MetricUnit::kMilliseconds,
+    "Duration of time in ms spent in the Echo server's outbound request queue",
+    kudu::MetricLevel::kInfo,
+    60000LU, 1);
+METRIC_DEFINE_histogram(server, echo_server_inbound_queue_time_ms,
+    "Echo server inbound queue time (ms)",
+    kudu::MetricUnit::kMilliseconds,
+    "Duration of time in ms spent in the Echo server's inbound response queue",
+    kudu::MetricLevel::kInfo,
+    60000LU, 1);
 
 namespace kudu {
 namespace subprocess {
 
 #define HISTINIT(member, x) member = METRIC_##x.Instantiate(entity)
 EchoSubprocessMetrics::EchoSubprocessMetrics(const scoped_refptr<MetricEntity>& entity) {
-  HISTINIT(inbound_queue_length, echo_subprocess_inbound_queue_length);
-  HISTINIT(outbound_queue_length, echo_subprocess_outbound_queue_length);
-  HISTINIT(inbound_queue_time_ms, echo_subprocess_inbound_queue_time_ms);
-  HISTINIT(outbound_queue_time_ms, echo_subprocess_outbound_queue_time_ms);
-  HISTINIT(execution_time_ms, echo_subprocess_execution_time_ms);
+  HISTINIT(sp_inbound_queue_length, echo_subprocess_inbound_queue_length);
+  HISTINIT(sp_outbound_queue_length, echo_subprocess_outbound_queue_length);
+  HISTINIT(sp_inbound_queue_time_ms, echo_subprocess_inbound_queue_time_ms);
+  HISTINIT(sp_outbound_queue_time_ms, echo_subprocess_outbound_queue_time_ms);
+  HISTINIT(sp_execution_time_ms, echo_subprocess_execution_time_ms);
+  HISTINIT(server_outbound_queue_time_ms, echo_server_outbound_queue_time_ms);
+  HISTINIT(server_inbound_queue_time_ms, echo_server_inbound_queue_time_ms);
 }
 #undef HISTINIT
 
diff --git a/src/kudu/subprocess/server.cc b/src/kudu/subprocess/server.cc
index f23c1af..744d0dc 100644
--- a/src/kudu/subprocess/server.cc
+++ b/src/kudu/subprocess/server.cc
@@ -21,6 +21,7 @@
 #include <memory>
 #include <ostream>
 #include <string>
+#include <type_traits>
 #include <utility>
 #include <vector>
 
@@ -130,9 +131,11 @@ Status SubprocessServer::Execute(SubprocessRequestPB* req,
   req->set_id(next_id_++);
   Synchronizer sync;
   auto cb = sync.AsStdStatusCallback();
-  auto call = make_shared<SubprocessCall>(req, resp, &cb, MonoTime::Now() + call_timeout_);
-  RETURN_NOT_OK_PREPEND(outbound_call_queue_.BlockingPut(call, call->deadline()),
-                        "couldn't enqueue call");
+  CallAndTimer call_and_timer = {
+      make_shared<SubprocessCall>(req, resp, &cb, MonoTime::Now() + call_timeout_), {} };
+  RETURN_NOT_OK_PREPEND(
+      outbound_call_queue_.BlockingPut(call_and_timer, call_and_timer.first->deadline()),
+      "couldn't enqueue call");
   return sync.Wait();
 }
 
@@ -200,7 +203,8 @@ void SubprocessServer::ReceiveMessagesThread() {
     // subprocess.
     DCHECK(s.ok());
     WARN_NOT_OK(s, "failed to receive response from the subprocess");
-    if (s.ok() && !inbound_response_queue_.BlockingPut(response).ok()) {
+    ResponsePBAndTimer resp_and_timer = { std::move(response), {} };
+    if (s.ok() && !inbound_response_queue_.BlockingPut(resp_and_timer).ok()) {
       // The queue has been shut down and we should shut down too.
       DCHECK_EQ(0, closing_.count());
       LOG(INFO) << "failed to put response onto inbound queue";
@@ -212,12 +216,16 @@ void SubprocessServer::ReceiveMessagesThread() {
 void SubprocessServer::ResponderThread() {
   Status s;
   do {
-    vector<SubprocessResponsePB> resps;
+    vector<ResponsePBAndTimer> resps;
     // NOTE: since we don't supply a deadline, this will only fail if the queue
     // is shutting down. Also note that even if this fails because we're
     // shutting down, we still populate 'resps' and must run their callbacks.
     s = inbound_response_queue_.BlockingDrainTo(&resps);
-    for (const auto& resp : resps) {
+    for (auto& resp_and_timer : resps) {
+      metrics_.server_inbound_queue_time_ms->Increment(
+          resp_and_timer.second.elapsed().ToMilliseconds());
+      const auto& resp = resp_and_timer.first;
+
       if (!resp.has_id()) {
         LOG(FATAL) << Substitute("Received invalid response: $0",
                                  pb_util::SecureDebugString(resp));
@@ -226,18 +234,19 @@ void SubprocessServer::ResponderThread() {
       // metrics.
       if (PREDICT_TRUE(resp.has_metrics())) {
         const auto& pb = resp.metrics();
-        metrics_.inbound_queue_length->Increment(pb.inbound_queue_length());
-        metrics_.outbound_queue_length->Increment(pb.outbound_queue_length());
-        metrics_.inbound_queue_time_ms->Increment(pb.inbound_queue_time_ms());
-        metrics_.outbound_queue_time_ms->Increment(pb.outbound_queue_time_ms());
-        metrics_.execution_time_ms->Increment(pb.execution_time_ms());
+        metrics_.sp_inbound_queue_length->Increment(pb.inbound_queue_length());
+        metrics_.sp_outbound_queue_length->Increment(pb.outbound_queue_length());
+        metrics_.sp_inbound_queue_time_ms->Increment(pb.inbound_queue_time_ms());
+        metrics_.sp_outbound_queue_time_ms->Increment(pb.outbound_queue_time_ms());
+        metrics_.sp_execution_time_ms->Increment(pb.execution_time_ms());
       }
     }
     vector<pair<shared_ptr<SubprocessCall>, SubprocessResponsePB>> calls_and_resps;
     calls_and_resps.reserve(resps.size());
     {
       std::lock_guard<simple_spinlock> l(in_flight_lock_);
-      for (auto& resp : resps) {
+      for (auto& resp_and_timer : resps) {
+        auto& resp = resp_and_timer.first;
         auto id = resp.id();
         auto call = EraseKeyReturnValuePtr(&call_by_id_, id);
         if (call) {
@@ -290,7 +299,7 @@ void SubprocessServer::SendMessagesThread() {
   DCHECK(message_protocol_) << "message protocol is not initialized";
   Status s;
   do {
-    vector<shared_ptr<SubprocessCall>> calls;
+    vector<CallAndTimer> calls;
     // NOTE: since we don't supply a deadline, this will only fail if the queue
     // is shutting down. Also note that even if this fails because we're
     // shutting down, we still populate 'calls' and should add them to the
@@ -298,14 +307,19 @@ void SubprocessServer::SendMessagesThread() {
     s = outbound_call_queue_.BlockingDrainTo(&calls);
     {
       std::lock_guard<simple_spinlock> l(in_flight_lock_);
-      for (const auto& call : calls) {
+      for (const auto& call_and_timer : calls) {
+        const auto& call = call_and_timer.first;
         EmplaceOrDie(&call_by_id_, call->id(), call);
       }
     }
     // NOTE: it's possible that before sending the request, the call already
     // timed out and the deadline checker already called its callback. If so,
     // the following call will no-op.
-    for (const auto& call : calls) {
+    for (const auto& call_and_timer : calls) {
+      const auto& call = call_and_timer.first;
+      metrics_.server_outbound_queue_time_ms->Increment(
+          call_and_timer.second.elapsed().ToMilliseconds());
+
       call->SendRequest(message_protocol_.get());
     }
   } while (s.ok());
diff --git a/src/kudu/subprocess/server.h b/src/kudu/subprocess/server.h
index d4e0ebe..ed03f27 100644
--- a/src/kudu/subprocess/server.h
+++ b/src/kudu/subprocess/server.h
@@ -54,12 +54,27 @@ namespace subprocess {
 
 typedef int64_t CallId;
 
+struct SimpleTimer {
+  SimpleTimer() {
+    start_time = MonoTime::Now();
+  }
+  MonoTime start_time;
+  MonoDelta elapsed() const {
+    return MonoTime::Now() - start_time;
+  }
+};
+
 struct SubprocessMetrics {
-  scoped_refptr<Histogram> inbound_queue_length;
-  scoped_refptr<Histogram> outbound_queue_length;
-  scoped_refptr<Histogram> inbound_queue_time_ms;
-  scoped_refptr<Histogram> outbound_queue_time_ms;
-  scoped_refptr<Histogram> execution_time_ms;
+  // Metrics returned from the subprocess.
+  scoped_refptr<Histogram> sp_inbound_queue_length;
+  scoped_refptr<Histogram> sp_outbound_queue_length;
+  scoped_refptr<Histogram> sp_inbound_queue_time_ms;
+  scoped_refptr<Histogram> sp_outbound_queue_time_ms;
+  scoped_refptr<Histogram> sp_execution_time_ms;
+
+  // Metrics recorded by the SubprocessServer.
+  scoped_refptr<Histogram> server_inbound_queue_time_ms;
+  scoped_refptr<Histogram> server_outbound_queue_time_ms;
 };
 
 // Encapsulates the pending state of a request that is in the process of being
@@ -155,19 +170,21 @@ class SubprocessCall {
 };
 
 // Used by BlockingQueue to determine the size of messages.
+typedef std::pair<std::shared_ptr<SubprocessCall>, SimpleTimer> CallAndTimer;
 struct RequestLogicalSize {
-  static size_t logical_size(const std::shared_ptr<SubprocessCall>& call) {
-    return call->req_->ByteSizeLong();
+  static size_t logical_size(const CallAndTimer& call_and_timer) {
+    return call_and_timer.first->req_->ByteSizeLong();
   }
 };
+typedef std::pair<SubprocessResponsePB, SimpleTimer> ResponsePBAndTimer;
 struct ResponseLogicalSize {
-  static size_t logical_size(const SubprocessResponsePB& response) {
-    return response.ByteSizeLong();
+  static size_t logical_size(const ResponsePBAndTimer& resp_and_timer) {
+    return resp_and_timer.first.ByteSizeLong();
   }
 };
 
-typedef BlockingQueue<std::shared_ptr<SubprocessCall>, RequestLogicalSize> SubprocessCallQueue;
-typedef BlockingQueue<SubprocessResponsePB, ResponseLogicalSize> ResponseQueue;
+typedef BlockingQueue<CallAndTimer, RequestLogicalSize> SubprocessCallQueue;
+typedef BlockingQueue<ResponsePBAndTimer, ResponseLogicalSize> ResponseQueue;
 
 // Wrapper for a subprocess that communicates via protobuf. A server is
 // comprised of a few things to facilitate concurrent communication with an
diff --git a/src/kudu/subprocess/subprocess_proxy-test.cc b/src/kudu/subprocess/subprocess_proxy-test.cc
index 897dd18..e83490c 100644
--- a/src/kudu/subprocess/subprocess_proxy-test.cc
+++ b/src/kudu/subprocess/subprocess_proxy-test.cc
@@ -45,6 +45,8 @@ METRIC_DECLARE_histogram(echo_subprocess_outbound_queue_length);
 METRIC_DECLARE_histogram(echo_subprocess_inbound_queue_time_ms);
 METRIC_DECLARE_histogram(echo_subprocess_outbound_queue_time_ms);
 METRIC_DECLARE_histogram(echo_subprocess_execution_time_ms);
+METRIC_DECLARE_histogram(echo_server_outbound_queue_time_ms);
+METRIC_DECLARE_histogram(echo_server_inbound_queue_time_ms);
 
 using std::unique_ptr;
 using std::string;
@@ -87,6 +89,9 @@ class EchoSubprocessTest : public KuduTest {
   unique_ptr<EchoSubprocess> echo_subprocess_;
 };
 
+#define GET_HIST(metric_entity, metric_name) \
+  down_cast<Histogram*>((metric_entity)->FindOrNull(METRIC_##metric_name).get());
+
 TEST_F(EchoSubprocessTest, TestBasicSubprocessMetrics) {
   const string kMessage = "don't catch you slippin' now";
   const int64_t kSleepMs = 1000;
@@ -97,29 +102,35 @@ TEST_F(EchoSubprocessTest, TestBasicSubprocessMetrics) {
   ASSERT_OK(echo_subprocess_->Execute(req, &resp));
   ASSERT_EQ(kMessage, resp.data());
 
+
   // There shouldn't have anything in the subprocess queues.
-  Histogram* in_len_hist = down_cast<Histogram*>(metric_entity_->FindOrNull(
-      METRIC_echo_subprocess_inbound_queue_length).get());
+  Histogram* in_len_hist = GET_HIST(metric_entity_, echo_subprocess_inbound_queue_length);
   ASSERT_EQ(1, in_len_hist->TotalCount());
   ASSERT_EQ(0, in_len_hist->MaxValueForTests());
-  Histogram* out_len_hist = down_cast<Histogram*>(metric_entity_->FindOrNull(
-      METRIC_echo_subprocess_outbound_queue_length).get());
+  Histogram* out_len_hist = GET_HIST(metric_entity_, echo_subprocess_outbound_queue_length);
   ASSERT_EQ(1, out_len_hist->TotalCount());
   ASSERT_EQ(0, out_len_hist->MaxValueForTests());
 
   // We should have some non-negative queue times.
-  Histogram* out_hist = down_cast<Histogram*>(metric_entity_->FindOrNull(
-      METRIC_echo_subprocess_outbound_queue_time_ms).get());
+  Histogram* out_hist = GET_HIST(metric_entity_, echo_subprocess_outbound_queue_time_ms);
   ASSERT_EQ(1, out_hist->TotalCount());
   ASSERT_LE(0, out_hist->MaxValueForTests());
-  Histogram* in_hist = down_cast<Histogram*>(metric_entity_->FindOrNull(
-      METRIC_echo_subprocess_inbound_queue_time_ms).get());
+  Histogram* in_hist = GET_HIST(metric_entity_, echo_subprocess_inbound_queue_time_ms);
   ASSERT_EQ(1, in_hist->TotalCount());
   ASSERT_LE(0, in_hist->MaxValueForTests());
 
+  // We should have some non-negative queue times on the server side too.
+  Histogram* server_out_hist =
+    GET_HIST(metric_entity_, echo_server_outbound_queue_time_ms);
+  ASSERT_EQ(1, server_out_hist->TotalCount());
+  ASSERT_LE(0, server_out_hist->MaxValueForTests());
+  Histogram* server_in_hist =
+    GET_HIST(metric_entity_, echo_server_inbound_queue_time_ms);
+  ASSERT_EQ(1, server_in_hist->TotalCount());
+  ASSERT_LE(0, server_in_hist->MaxValueForTests());
+
   // The execution should've taken at least our sleep time.
-  Histogram* exec_hist = down_cast<Histogram*>(metric_entity_->FindOrNull(
-      METRIC_echo_subprocess_execution_time_ms).get());
+  Histogram* exec_hist = GET_HIST(metric_entity_, echo_subprocess_execution_time_ms);
   ASSERT_EQ(1, exec_hist->TotalCount());
   ASSERT_LT(kSleepMs, exec_hist->MaxValueForTests());
 }
@@ -140,17 +151,40 @@ TEST_F(EchoSubprocessTest, TestSubprocessMetricsOnError) {
   ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
 
   // Immediately following our call, we won't have any metrics from the subprocess.
-  Histogram* exec_hist = down_cast<Histogram*>(metric_entity_->FindOrNull(
-      METRIC_echo_subprocess_execution_time_ms).get());
+  Histogram* exec_hist = GET_HIST(metric_entity_, echo_subprocess_execution_time_ms);
+  Histogram* out_len_hist = GET_HIST(metric_entity_, echo_subprocess_outbound_queue_length);
+  Histogram* in_len_hist = GET_HIST(metric_entity_, echo_subprocess_inbound_queue_length);
+  Histogram* sp_out_hist = GET_HIST(metric_entity_, echo_subprocess_outbound_queue_time_ms);
+  Histogram* sp_in_hist = GET_HIST(metric_entity_, echo_subprocess_inbound_queue_time_ms);
+  Histogram* server_out_hist = GET_HIST(metric_entity_, echo_server_outbound_queue_time_ms);
+  Histogram* server_in_hist = GET_HIST(metric_entity_, echo_server_inbound_queue_time_ms);
   ASSERT_EQ(0, exec_hist->TotalCount());
+  ASSERT_EQ(0, out_len_hist->TotalCount());
+  ASSERT_EQ(0, in_len_hist->TotalCount());
+  ASSERT_EQ(0, sp_out_hist->TotalCount());
+  ASSERT_EQ(0, sp_in_hist->TotalCount());
+
+  // We'll have sent the request from the server and not received the response.
+  // Our metrics should reflect that.
+  ASSERT_EQ(1, server_out_hist->TotalCount());
+  ASSERT_EQ(0, server_in_hist->TotalCount());
 
   // Eventually the subprocess will return our call, and we'll see some
   // metrics.
   ASSERT_EVENTUALLY([&] {
     ASSERT_EQ(1, exec_hist->TotalCount());
     ASSERT_LT(kSleepMs, exec_hist->MaxValueForTests());
+
+    ASSERT_EQ(1, out_len_hist->TotalCount());
+    ASSERT_EQ(1, in_len_hist->TotalCount());
+    ASSERT_EQ(1, sp_out_hist->TotalCount());
+    ASSERT_EQ(1, sp_in_hist->TotalCount());
+    ASSERT_EQ(1, server_out_hist->TotalCount());
+    ASSERT_EQ(1, server_in_hist->TotalCount());
   });
 }
 
+#undef GET_HIST
+
 } // namespace subprocess
 } // namespace kudu