You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2020/03/06 05:12:38 UTC

[kudu] 02/03: subprocess: add server metric for queue size

This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 771d36f84ef86c80334b9c322bf0748397cd8dd9
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Thu Mar 5 01:10:37 2020 -0800

    subprocess: add server metric for queue size
    
    Unlike the Java in/outbound queues, the C++ queues are measured in
    bytes. This exposes histogram metrics for the sizes of the inbound and
    outbound queue at Put-time.
    
    This also adds a method to get the logical size of a blocking queue.
    
    Change-Id: I62d2d5727dca3f54b59ff1044431326cbdde855d
    Reviewed-on: http://gerrit.cloudera.org:8080/15375
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Tested-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/ranger/ranger_client.cc             | 46 +++++++++++++++--------
 src/kudu/subprocess/echo_subprocess.cc       | 56 ++++++++++++++++++----------
 src/kudu/subprocess/server.cc                |  4 ++
 src/kudu/subprocess/server.h                 |  6 ++-
 src/kudu/subprocess/subprocess_proxy-test.cc | 30 ++++++++++++---
 src/kudu/util/blocking_queue-test.cc         |  3 ++
 src/kudu/util/blocking_queue.h               |  7 +++-
 7 files changed, 107 insertions(+), 45 deletions(-)

diff --git a/src/kudu/ranger/ranger_client.cc b/src/kudu/ranger/ranger_client.cc
index 8ca9812..c8ca24f 100644
--- a/src/kudu/ranger/ranger_client.cc
+++ b/src/kudu/ranger/ranger_client.cc
@@ -49,49 +49,63 @@ DEFINE_string(ranger_jar_path, "",
               "Path to the JAR file containing the Ranger subprocess.");
 TAG_FLAG(ranger_jar_path, experimental);
 
+METRIC_DEFINE_histogram(server, ranger_subprocess_execution_time_ms,
+    "Ranger subprocess execution time (ms)",
+    kudu::MetricUnit::kMilliseconds,
+    "Duration of time in ms spent executing the Ranger subprocess request, excluding "
+    "time spent spent in the subprocess queues",
+    kudu::MetricLevel::kInfo,
+    60000LU, 1);
 METRIC_DEFINE_histogram(server, ranger_subprocess_inbound_queue_length,
     "Ranger subprocess inbound queue length",
     kudu::MetricUnit::kMessages,
     "Number of request messages in the Ranger subprocess' inbound request queue",
     kudu::MetricLevel::kInfo,
     1000, 1);
-METRIC_DEFINE_histogram(server, ranger_subprocess_outbound_queue_length,
-    "Ranger subprocess outbound queue length",
-    kudu::MetricUnit::kMessages,
-    "Number of request messages in the Ranger subprocess' outbound response queue",
-    kudu::MetricLevel::kInfo,
-    1000, 1);
 METRIC_DEFINE_histogram(server, ranger_subprocess_inbound_queue_time_ms,
     "Ranger subprocess inbound queue time (ms)",
     kudu::MetricUnit::kMilliseconds,
     "Duration of time in ms spent in the Ranger subprocess' inbound request queue",
     kudu::MetricLevel::kInfo,
     60000LU, 1);
+METRIC_DEFINE_histogram(server, ranger_subprocess_outbound_queue_length,
+    "Ranger subprocess outbound queue length",
+    kudu::MetricUnit::kMessages,
+    "Number of request messages in the Ranger subprocess' outbound response queue",
+    kudu::MetricLevel::kInfo,
+    1000, 1);
 METRIC_DEFINE_histogram(server, ranger_subprocess_outbound_queue_time_ms,
     "Ranger subprocess outbound queue time (ms)",
     kudu::MetricUnit::kMilliseconds,
     "Duration of time in ms spent in the Ranger subprocess' outbound response queue",
     kudu::MetricLevel::kInfo,
     60000LU, 1);
-METRIC_DEFINE_histogram(server, ranger_subprocess_execution_time_ms,
-    "Ranger subprocess execution time (ms)",
+METRIC_DEFINE_histogram(server, ranger_server_inbound_queue_size_bytes,
+    "Ranger server inbound queue size (bytes)",
+    kudu::MetricUnit::kBytes,
+    "Number of bytes in the inbound response queue of the Ranger server, recorded "
+    "at the time a new response is read from the pipe and added to the inbound queue",
+    kudu::MetricLevel::kInfo,
+    4 * 1024 * 1024, 1);
+METRIC_DEFINE_histogram(server, ranger_server_inbound_queue_time_ms,
+    "Ranger server inbound queue time (ms)",
     kudu::MetricUnit::kMilliseconds,
-    "Duration of time in ms spent executing the Ranger subprocess request, excluding "
-    "time spent spent in the subprocess queues",
+    "Duration of time in ms spent in the Ranger server's inbound response queue",
     kudu::MetricLevel::kInfo,
     60000LU, 1);
+METRIC_DEFINE_histogram(server, ranger_server_outbound_queue_size_bytes,
+    "Ranger server outbound queue size (bytes)",
+    kudu::MetricUnit::kBytes,
+    "Number of bytes in the outbound request queue of the Ranger server, recorded "
+    "at the time a new request is added to the outbound request queue",
+    kudu::MetricLevel::kInfo,
+    4 * 1024 * 1024, 1);
 METRIC_DEFINE_histogram(server, ranger_server_outbound_queue_time_ms,
     "Ranger server outbound queue time (ms)",
     kudu::MetricUnit::kMilliseconds,
     "Duration of time in ms spent in the Ranger server's outbound request queue",
     kudu::MetricLevel::kInfo,
     60000LU, 1);
-METRIC_DEFINE_histogram(server, ranger_server_inbound_queue_time_ms,
-    "Ranger server inbound queue time (ms)",
-    kudu::MetricUnit::kMilliseconds,
-    "Duration of time in ms spent in the Ranger server's inbound response queue",
-    kudu::MetricLevel::kInfo,
-    60000LU, 1);
 
 namespace kudu {
 namespace ranger {
diff --git a/src/kudu/subprocess/echo_subprocess.cc b/src/kudu/subprocess/echo_subprocess.cc
index 5e39114..df5634d 100644
--- a/src/kudu/subprocess/echo_subprocess.cc
+++ b/src/kudu/subprocess/echo_subprocess.cc
@@ -19,62 +19,78 @@
 
 #include "kudu/util/metrics.h"
 
+METRIC_DEFINE_histogram(server, echo_subprocess_execution_time_ms,
+    "Echo subprocess execution time (ms)",
+    kudu::MetricUnit::kMilliseconds,
+    "Duration of time in ms spent executing the Echo subprocess request, excluding "
+    "time spent spent in the subprocess queues",
+    kudu::MetricLevel::kInfo,
+    60000LU, 1);
 METRIC_DEFINE_histogram(server, echo_subprocess_inbound_queue_length,
     "Echo subprocess inbound queue length",
     kudu::MetricUnit::kMessages,
     "Number of request messages in the Echo subprocess' inbound request queue",
     kudu::MetricLevel::kInfo,
     1000, 1);
-METRIC_DEFINE_histogram(server, echo_subprocess_outbound_queue_length,
-    "Echo subprocess outbound queue length",
-    kudu::MetricUnit::kMessages,
-    "Number of request messages in the Echo subprocess' outbound response queue",
-    kudu::MetricLevel::kInfo,
-    1000, 1);
 METRIC_DEFINE_histogram(server, echo_subprocess_inbound_queue_time_ms,
     "Echo subprocess inbound queue time (ms)",
     kudu::MetricUnit::kMilliseconds,
     "Duration of time in ms spent in the Echo subprocess' inbound request queue",
     kudu::MetricLevel::kInfo,
     60000LU, 1);
+METRIC_DEFINE_histogram(server, echo_subprocess_outbound_queue_length,
+    "Echo subprocess outbound queue length",
+    kudu::MetricUnit::kMessages,
+    "Number of request messages in the Echo subprocess' outbound response queue",
+    kudu::MetricLevel::kInfo,
+    1000, 1);
 METRIC_DEFINE_histogram(server, echo_subprocess_outbound_queue_time_ms,
     "Echo subprocess outbound queue time (ms)",
     kudu::MetricUnit::kMilliseconds,
     "Duration of time in ms spent in the Echo subprocess' outbound response queue",
     kudu::MetricLevel::kInfo,
     60000LU, 1);
-METRIC_DEFINE_histogram(server, echo_subprocess_execution_time_ms,
-    "Echo subprocess execution time (ms)",
+METRIC_DEFINE_histogram(server, echo_server_inbound_queue_size_bytes,
+    "Echo server inbound queue size (bytes)",
+    kudu::MetricUnit::kBytes,
+    "Number of bytes in the inbound response queue of the Echo server, recorded "
+    "at the time a new response is read from the pipe and added to the inbound queue",
+    kudu::MetricLevel::kInfo,
+    4 * 1024 * 1024, 1);
+METRIC_DEFINE_histogram(server, echo_server_inbound_queue_time_ms,
+    "Echo server inbound queue time (ms)",
     kudu::MetricUnit::kMilliseconds,
-    "Duration of time in ms spent executing the Echo subprocess request, excluding "
-    "time spent spent in the subprocess queues",
+    "Duration of time in ms spent in the Echo server's inbound response queue",
     kudu::MetricLevel::kInfo,
     60000LU, 1);
+METRIC_DEFINE_histogram(server, echo_server_outbound_queue_size_bytes,
+    "Echo server outbound queue size (bytes)",
+    kudu::MetricUnit::kBytes,
+    "Number of bytes in the outbound request queue of the Echo server, recorded "
+    "at the time a new request is added to the outbound request queue",
+    kudu::MetricLevel::kInfo,
+    4 * 1024 * 1024, 1);
 METRIC_DEFINE_histogram(server, echo_server_outbound_queue_time_ms,
     "Echo server outbound queue time (ms)",
     kudu::MetricUnit::kMilliseconds,
     "Duration of time in ms spent in the Echo server's outbound request queue",
     kudu::MetricLevel::kInfo,
     60000LU, 1);
-METRIC_DEFINE_histogram(server, echo_server_inbound_queue_time_ms,
-    "Echo server inbound queue time (ms)",
-    kudu::MetricUnit::kMilliseconds,
-    "Duration of time in ms spent in the Echo server's inbound response queue",
-    kudu::MetricLevel::kInfo,
-    60000LU, 1);
 
 namespace kudu {
 namespace subprocess {
 
 #define HISTINIT(member, x) member = METRIC_##x.Instantiate(entity)
 EchoSubprocessMetrics::EchoSubprocessMetrics(const scoped_refptr<MetricEntity>& entity) {
+  HISTINIT(server_inbound_queue_size_bytes, echo_server_inbound_queue_size_bytes);
+  HISTINIT(server_inbound_queue_time_ms, echo_server_inbound_queue_time_ms);
+  HISTINIT(server_outbound_queue_size_bytes, echo_server_outbound_queue_size_bytes);
+  HISTINIT(server_outbound_queue_time_ms, echo_server_outbound_queue_time_ms);
+  HISTINIT(sp_execution_time_ms, echo_subprocess_execution_time_ms);
   HISTINIT(sp_inbound_queue_length, echo_subprocess_inbound_queue_length);
-  HISTINIT(sp_outbound_queue_length, echo_subprocess_outbound_queue_length);
   HISTINIT(sp_inbound_queue_time_ms, echo_subprocess_inbound_queue_time_ms);
+  HISTINIT(sp_outbound_queue_length, echo_subprocess_outbound_queue_length);
   HISTINIT(sp_outbound_queue_time_ms, echo_subprocess_outbound_queue_time_ms);
-  HISTINIT(sp_execution_time_ms, echo_subprocess_execution_time_ms);
-  HISTINIT(server_outbound_queue_time_ms, echo_server_outbound_queue_time_ms);
-  HISTINIT(server_inbound_queue_time_ms, echo_server_inbound_queue_time_ms);
 }
 #undef HISTINIT
 
diff --git a/src/kudu/subprocess/server.cc b/src/kudu/subprocess/server.cc
index 744d0dc..3c7607b 100644
--- a/src/kudu/subprocess/server.cc
+++ b/src/kudu/subprocess/server.cc
@@ -131,6 +131,8 @@ Status SubprocessServer::Execute(SubprocessRequestPB* req,
   req->set_id(next_id_++);
   Synchronizer sync;
   auto cb = sync.AsStdStatusCallback();
+  // Before adding to the queue, record the size of the call queue.
+  metrics_.server_outbound_queue_size_bytes->Increment(outbound_call_queue_.size());
   CallAndTimer call_and_timer = {
       make_shared<SubprocessCall>(req, resp, &cb, MonoTime::Now() + call_timeout_), {} };
   RETURN_NOT_OK_PREPEND(
@@ -203,6 +205,8 @@ void SubprocessServer::ReceiveMessagesThread() {
     // subprocess.
     DCHECK(s.ok());
     WARN_NOT_OK(s, "failed to receive response from the subprocess");
+    // Before adding to the queue, record the size of the response queue.
+    metrics_.server_inbound_queue_size_bytes->Increment(inbound_response_queue_.size());
     ResponsePBAndTimer resp_and_timer = { std::move(response), {} };
     if (s.ok() && !inbound_response_queue_.BlockingPut(resp_and_timer).ok()) {
       // The queue has been shut down and we should shut down too.
diff --git a/src/kudu/subprocess/server.h b/src/kudu/subprocess/server.h
index ed03f27..512966b 100644
--- a/src/kudu/subprocess/server.h
+++ b/src/kudu/subprocess/server.h
@@ -66,14 +66,16 @@ struct SimpleTimer {
 
 struct SubprocessMetrics {
   // Metrics returned from the subprocess.
+  scoped_refptr<Histogram> sp_execution_time_ms;
   scoped_refptr<Histogram> sp_inbound_queue_length;
-  scoped_refptr<Histogram> sp_outbound_queue_length;
   scoped_refptr<Histogram> sp_inbound_queue_time_ms;
+  scoped_refptr<Histogram> sp_outbound_queue_length;
   scoped_refptr<Histogram> sp_outbound_queue_time_ms;
-  scoped_refptr<Histogram> sp_execution_time_ms;
 
   // Metrics recorded by the SubprocessServer.
+  scoped_refptr<Histogram> server_inbound_queue_size_bytes;
   scoped_refptr<Histogram> server_inbound_queue_time_ms;
+  scoped_refptr<Histogram> server_outbound_queue_size_bytes;
   scoped_refptr<Histogram> server_outbound_queue_time_ms;
 };
 
diff --git a/src/kudu/subprocess/subprocess_proxy-test.cc b/src/kudu/subprocess/subprocess_proxy-test.cc
index e83490c..ff612e4 100644
--- a/src/kudu/subprocess/subprocess_proxy-test.cc
+++ b/src/kudu/subprocess/subprocess_proxy-test.cc
@@ -45,6 +45,8 @@ METRIC_DECLARE_histogram(echo_subprocess_outbound_queue_length);
 METRIC_DECLARE_histogram(echo_subprocess_inbound_queue_time_ms);
 METRIC_DECLARE_histogram(echo_subprocess_outbound_queue_time_ms);
 METRIC_DECLARE_histogram(echo_subprocess_execution_time_ms);
+METRIC_DECLARE_histogram(echo_server_outbound_queue_size_bytes);
+METRIC_DECLARE_histogram(echo_server_inbound_queue_size_bytes);
 METRIC_DECLARE_histogram(echo_server_outbound_queue_time_ms);
 METRIC_DECLARE_histogram(echo_server_inbound_queue_time_ms);
 
@@ -119,6 +121,16 @@ TEST_F(EchoSubprocessTest, TestBasicSubprocessMetrics) {
   ASSERT_EQ(1, in_hist->TotalCount());
   ASSERT_LE(0, in_hist->MaxValueForTests());
 
+  // There shouldn't be any bytes in the server queues when we enqueue.
+  Histogram* server_in_size_hist =
+      GET_HIST(metric_entity_, echo_server_inbound_queue_size_bytes);
+  ASSERT_EQ(1, server_in_size_hist->TotalCount());
+  ASSERT_EQ(0, server_in_size_hist->MaxValueForTests());
+  Histogram* server_out_size_hist =
+      GET_HIST(metric_entity_, echo_server_outbound_queue_size_bytes);
+  ASSERT_EQ(1, server_out_size_hist->TotalCount());
+  ASSERT_EQ(0, server_out_size_hist->MaxValueForTests());
+
   // We should have some non-negative queue times on the server side too.
   Histogram* server_out_hist =
     GET_HIST(metric_entity_, echo_server_outbound_queue_time_ms);
@@ -156,8 +168,10 @@ TEST_F(EchoSubprocessTest, TestSubprocessMetricsOnError) {
   Histogram* in_len_hist = GET_HIST(metric_entity_, echo_subprocess_inbound_queue_length);
   Histogram* sp_out_hist = GET_HIST(metric_entity_, echo_subprocess_outbound_queue_time_ms);
   Histogram* sp_in_hist = GET_HIST(metric_entity_, echo_subprocess_inbound_queue_time_ms);
-  Histogram* server_out_hist = GET_HIST(metric_entity_, echo_server_outbound_queue_time_ms);
-  Histogram* server_in_hist = GET_HIST(metric_entity_, echo_server_inbound_queue_time_ms);
+  Histogram* server_out_time_hist = GET_HIST(metric_entity_, echo_server_outbound_queue_time_ms);
+  Histogram* server_out_size_hist = GET_HIST(metric_entity_, echo_server_outbound_queue_size_bytes);
+  Histogram* server_in_time_hist = GET_HIST(metric_entity_, echo_server_inbound_queue_time_ms);
+  Histogram* server_in_size_hist = GET_HIST(metric_entity_, echo_server_inbound_queue_size_bytes);
   ASSERT_EQ(0, exec_hist->TotalCount());
   ASSERT_EQ(0, out_len_hist->TotalCount());
   ASSERT_EQ(0, in_len_hist->TotalCount());
@@ -166,8 +180,10 @@ TEST_F(EchoSubprocessTest, TestSubprocessMetricsOnError) {
 
   // We'll have sent the request from the server and not received the response.
   // Our metrics should reflect that.
-  ASSERT_EQ(1, server_out_hist->TotalCount());
-  ASSERT_EQ(0, server_in_hist->TotalCount());
+  ASSERT_EQ(1, server_out_time_hist->TotalCount());
+  ASSERT_EQ(1, server_out_size_hist->TotalCount());
+  ASSERT_EQ(0, server_in_time_hist->TotalCount());
+  ASSERT_EQ(0, server_in_size_hist->TotalCount());
 
   // Eventually the subprocess will return our call, and we'll see some
   // metrics.
@@ -179,8 +195,10 @@ TEST_F(EchoSubprocessTest, TestSubprocessMetricsOnError) {
     ASSERT_EQ(1, in_len_hist->TotalCount());
     ASSERT_EQ(1, sp_out_hist->TotalCount());
     ASSERT_EQ(1, sp_in_hist->TotalCount());
-    ASSERT_EQ(1, server_out_hist->TotalCount());
-    ASSERT_EQ(1, server_in_hist->TotalCount());
+    ASSERT_EQ(1, server_out_time_hist->TotalCount());
+    ASSERT_EQ(1, server_in_time_hist->TotalCount());
+    ASSERT_EQ(1, server_out_size_hist->TotalCount());
+    ASSERT_EQ(1, server_in_size_hist->TotalCount());
   });
 }
 
diff --git a/src/kudu/util/blocking_queue-test.cc b/src/kudu/util/blocking_queue-test.cc
index a0f1297..38d38dd 100644
--- a/src/kudu/util/blocking_queue-test.cc
+++ b/src/kudu/util/blocking_queue-test.cc
@@ -207,8 +207,11 @@ struct LengthLogicalSize {
 TEST(BlockingQueueTest, TestLogicalSize) {
   BlockingQueue<string, LengthLogicalSize> test_queue(4);
   ASSERT_EQ(test_queue.Put("a"), QUEUE_SUCCESS);
+  ASSERT_EQ(1, test_queue.size());
   ASSERT_EQ(test_queue.Put("bcd"), QUEUE_SUCCESS);
+  ASSERT_EQ(4, test_queue.size());
   ASSERT_EQ(test_queue.Put("e"), QUEUE_FULL);
+  ASSERT_EQ(4, test_queue.size());
 }
 
 TEST(BlockingQueueTest, TestNonPointerParamsMayBeNonEmptyOnDestruct) {
diff --git a/src/kudu/util/blocking_queue.h b/src/kudu/util/blocking_queue.h
index 40734b4..c9d59a2 100644
--- a/src/kudu/util/blocking_queue.h
+++ b/src/kudu/util/blocking_queue.h
@@ -244,6 +244,11 @@ class BlockingQueue {
     return max_size_;
   }
 
+  size_t size() const {
+    MutexLock l(lock_);
+    return size_;
+  }
+
   std::string ToString() const {
     std::string ret;
 
@@ -269,7 +274,7 @@ class BlockingQueue {
 
   bool shutdown_;
   size_t size_;
-  size_t max_size_;
+  const size_t max_size_;
   mutable Mutex lock_;
   ConditionVariable not_empty_;
   ConditionVariable not_full_;