You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2020/03/06 05:12:38 UTC
[kudu] 02/03: subprocess: add server metric for queue size
This is an automated email from the ASF dual-hosted git repository.
adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 771d36f84ef86c80334b9c322bf0748397cd8dd9
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Thu Mar 5 01:10:37 2020 -0800
subprocess: add server metric for queue size
Unlike the Java in/outbound queues, the C++ queues are measured in
bytes. This exposes histogram metrics for the sizes of the inbound and
outbound queues at Put-time.
This also adds a method to get the logical size of a blocking queue.
Change-Id: I62d2d5727dca3f54b59ff1044431326cbdde855d
Reviewed-on: http://gerrit.cloudera.org:8080/15375
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Andrew Wong <aw...@cloudera.com>
---
src/kudu/ranger/ranger_client.cc | 46 +++++++++++++++--------
src/kudu/subprocess/echo_subprocess.cc | 56 ++++++++++++++++++----------
src/kudu/subprocess/server.cc | 4 ++
src/kudu/subprocess/server.h | 6 ++-
src/kudu/subprocess/subprocess_proxy-test.cc | 30 ++++++++++++---
src/kudu/util/blocking_queue-test.cc | 3 ++
src/kudu/util/blocking_queue.h | 7 +++-
7 files changed, 107 insertions(+), 45 deletions(-)
diff --git a/src/kudu/ranger/ranger_client.cc b/src/kudu/ranger/ranger_client.cc
index 8ca9812..c8ca24f 100644
--- a/src/kudu/ranger/ranger_client.cc
+++ b/src/kudu/ranger/ranger_client.cc
@@ -49,49 +49,63 @@ DEFINE_string(ranger_jar_path, "",
"Path to the JAR file containing the Ranger subprocess.");
TAG_FLAG(ranger_jar_path, experimental);
+METRIC_DEFINE_histogram(server, ranger_subprocess_execution_time_ms,
+ "Ranger subprocess execution time (ms)",
+ kudu::MetricUnit::kMilliseconds,
+ "Duration of time in ms spent executing the Ranger subprocess request, excluding "
+ "time spent in the subprocess queues",
+ kudu::MetricLevel::kInfo,
+ 60000LU, 1);
METRIC_DEFINE_histogram(server, ranger_subprocess_inbound_queue_length,
"Ranger subprocess inbound queue length",
kudu::MetricUnit::kMessages,
"Number of request messages in the Ranger subprocess' inbound request queue",
kudu::MetricLevel::kInfo,
1000, 1);
-METRIC_DEFINE_histogram(server, ranger_subprocess_outbound_queue_length,
- "Ranger subprocess outbound queue length",
- kudu::MetricUnit::kMessages,
- "Number of request messages in the Ranger subprocess' outbound response queue",
- kudu::MetricLevel::kInfo,
- 1000, 1);
METRIC_DEFINE_histogram(server, ranger_subprocess_inbound_queue_time_ms,
"Ranger subprocess inbound queue time (ms)",
kudu::MetricUnit::kMilliseconds,
"Duration of time in ms spent in the Ranger subprocess' inbound request queue",
kudu::MetricLevel::kInfo,
60000LU, 1);
+METRIC_DEFINE_histogram(server, ranger_subprocess_outbound_queue_length,
+ "Ranger subprocess outbound queue length",
+ kudu::MetricUnit::kMessages,
+ "Number of request messages in the Ranger subprocess' outbound response queue",
+ kudu::MetricLevel::kInfo,
+ 1000, 1);
METRIC_DEFINE_histogram(server, ranger_subprocess_outbound_queue_time_ms,
"Ranger subprocess outbound queue time (ms)",
kudu::MetricUnit::kMilliseconds,
"Duration of time in ms spent in the Ranger subprocess' outbound response queue",
kudu::MetricLevel::kInfo,
60000LU, 1);
-METRIC_DEFINE_histogram(server, ranger_subprocess_execution_time_ms,
- "Ranger subprocess execution time (ms)",
+METRIC_DEFINE_histogram(server, ranger_server_inbound_queue_size_bytes,
+ "Ranger server inbound queue size (bytes)",
+ kudu::MetricUnit::kBytes,
+ "Number of bytes in the inbound response queue of the Ranger server, recorded "
+ "at the time a new response is read from the pipe and added to the inbound queue",
+ kudu::MetricLevel::kInfo,
+ 4 * 1024 * 1024, 1);
+METRIC_DEFINE_histogram(server, ranger_server_inbound_queue_time_ms,
+ "Ranger server inbound queue time (ms)",
kudu::MetricUnit::kMilliseconds,
- "Duration of time in ms spent executing the Ranger subprocess request, excluding "
- "time spent spent in the subprocess queues",
+ "Duration of time in ms spent in the Ranger server's inbound response queue",
kudu::MetricLevel::kInfo,
60000LU, 1);
+METRIC_DEFINE_histogram(server, ranger_server_outbound_queue_size_bytes,
+ "Ranger server outbound queue size (bytes)",
+ kudu::MetricUnit::kBytes,
+ "Number of bytes in the outbound request queue of the Ranger server, recorded "
+ "at the time a new request is added to the outbound request queue",
+ kudu::MetricLevel::kInfo,
+ 4 * 1024 * 1024, 1);
METRIC_DEFINE_histogram(server, ranger_server_outbound_queue_time_ms,
"Ranger server outbound queue time (ms)",
kudu::MetricUnit::kMilliseconds,
"Duration of time in ms spent in the Ranger server's outbound request queue",
kudu::MetricLevel::kInfo,
60000LU, 1);
-METRIC_DEFINE_histogram(server, ranger_server_inbound_queue_time_ms,
- "Ranger server inbound queue time (ms)",
- kudu::MetricUnit::kMilliseconds,
- "Duration of time in ms spent in the Ranger server's inbound response queue",
- kudu::MetricLevel::kInfo,
- 60000LU, 1);
namespace kudu {
namespace ranger {
diff --git a/src/kudu/subprocess/echo_subprocess.cc b/src/kudu/subprocess/echo_subprocess.cc
index 5e39114..df5634d 100644
--- a/src/kudu/subprocess/echo_subprocess.cc
+++ b/src/kudu/subprocess/echo_subprocess.cc
@@ -19,62 +19,78 @@
#include "kudu/util/metrics.h"
+METRIC_DEFINE_histogram(server, echo_subprocess_execution_time_ms,
+ "Echo subprocess execution time (ms)",
+ kudu::MetricUnit::kMilliseconds,
+ "Duration of time in ms spent executing the Echo subprocess request, excluding "
+ "time spent in the subprocess queues",
+ kudu::MetricLevel::kInfo,
+ 60000LU, 1);
METRIC_DEFINE_histogram(server, echo_subprocess_inbound_queue_length,
"Echo subprocess inbound queue length",
kudu::MetricUnit::kMessages,
"Number of request messages in the Echo subprocess' inbound request queue",
kudu::MetricLevel::kInfo,
1000, 1);
-METRIC_DEFINE_histogram(server, echo_subprocess_outbound_queue_length,
- "Echo subprocess outbound queue length",
- kudu::MetricUnit::kMessages,
- "Number of request messages in the Echo subprocess' outbound response queue",
- kudu::MetricLevel::kInfo,
- 1000, 1);
METRIC_DEFINE_histogram(server, echo_subprocess_inbound_queue_time_ms,
"Echo subprocess inbound queue time (ms)",
kudu::MetricUnit::kMilliseconds,
"Duration of time in ms spent in the Echo subprocess' inbound request queue",
kudu::MetricLevel::kInfo,
60000LU, 1);
+METRIC_DEFINE_histogram(server, echo_subprocess_outbound_queue_length,
+ "Echo subprocess outbound queue length",
+ kudu::MetricUnit::kMessages,
+ "Number of request messages in the Echo subprocess' outbound response queue",
+ kudu::MetricLevel::kInfo,
+ 1000, 1);
METRIC_DEFINE_histogram(server, echo_subprocess_outbound_queue_time_ms,
"Echo subprocess outbound queue time (ms)",
kudu::MetricUnit::kMilliseconds,
"Duration of time in ms spent in the Echo subprocess' outbound response queue",
kudu::MetricLevel::kInfo,
60000LU, 1);
-METRIC_DEFINE_histogram(server, echo_subprocess_execution_time_ms,
- "Echo subprocess execution time (ms)",
+METRIC_DEFINE_histogram(server, echo_server_inbound_queue_size_bytes,
+ "Echo server inbound queue size (bytes)",
+ kudu::MetricUnit::kBytes,
+ "Number of bytes in the inbound response queue of the Echo server, recorded "
+ "at the time a new response is read from the pipe and added to the inbound queue",
+ kudu::MetricLevel::kInfo,
+ 4 * 1024 * 1024, 1);
+METRIC_DEFINE_histogram(server, echo_server_inbound_queue_time_ms,
+ "Echo server inbound queue time (ms)",
kudu::MetricUnit::kMilliseconds,
- "Duration of time in ms spent executing the Echo subprocess request, excluding "
- "time spent spent in the subprocess queues",
+ "Duration of time in ms spent in the Echo server's inbound response queue",
kudu::MetricLevel::kInfo,
60000LU, 1);
+METRIC_DEFINE_histogram(server, echo_server_outbound_queue_size_bytes,
+ "Echo server outbound queue size (bytes)",
+ kudu::MetricUnit::kBytes,
+ "Number of bytes in the outbound request queue of the Echo server, recorded "
+ "at the time a new request is added to the outbound request queue",
+ kudu::MetricLevel::kInfo,
+ 4 * 1024 * 1024, 1);
METRIC_DEFINE_histogram(server, echo_server_outbound_queue_time_ms,
"Echo server outbound queue time (ms)",
kudu::MetricUnit::kMilliseconds,
"Duration of time in ms spent in the Echo server's outbound request queue",
kudu::MetricLevel::kInfo,
60000LU, 1);
-METRIC_DEFINE_histogram(server, echo_server_inbound_queue_time_ms,
- "Echo server inbound queue time (ms)",
- kudu::MetricUnit::kMilliseconds,
- "Duration of time in ms spent in the Echo server's inbound response queue",
- kudu::MetricLevel::kInfo,
- 60000LU, 1);
namespace kudu {
namespace subprocess {
#define HISTINIT(member, x) member = METRIC_##x.Instantiate(entity)
EchoSubprocessMetrics::EchoSubprocessMetrics(const scoped_refptr<MetricEntity>& entity) {
+ HISTINIT(server_inbound_queue_size_bytes, echo_server_inbound_queue_size_bytes);
+ HISTINIT(server_inbound_queue_time_ms, echo_server_inbound_queue_time_ms);
+ HISTINIT(server_outbound_queue_size_bytes, echo_server_outbound_queue_size_bytes);
+ HISTINIT(server_outbound_queue_time_ms, echo_server_outbound_queue_time_ms);
+ HISTINIT(sp_execution_time_ms, echo_subprocess_execution_time_ms);
HISTINIT(sp_inbound_queue_length, echo_subprocess_inbound_queue_length);
- HISTINIT(sp_outbound_queue_length, echo_subprocess_outbound_queue_length);
HISTINIT(sp_inbound_queue_time_ms, echo_subprocess_inbound_queue_time_ms);
+ HISTINIT(sp_outbound_queue_length, echo_subprocess_outbound_queue_length);
HISTINIT(sp_outbound_queue_time_ms, echo_subprocess_outbound_queue_time_ms);
- HISTINIT(sp_execution_time_ms, echo_subprocess_execution_time_ms);
- HISTINIT(server_outbound_queue_time_ms, echo_server_outbound_queue_time_ms);
- HISTINIT(server_inbound_queue_time_ms, echo_server_inbound_queue_time_ms);
}
#undef HISTINIT
diff --git a/src/kudu/subprocess/server.cc b/src/kudu/subprocess/server.cc
index 744d0dc..3c7607b 100644
--- a/src/kudu/subprocess/server.cc
+++ b/src/kudu/subprocess/server.cc
@@ -131,6 +131,8 @@ Status SubprocessServer::Execute(SubprocessRequestPB* req,
req->set_id(next_id_++);
Synchronizer sync;
auto cb = sync.AsStdStatusCallback();
+ // Before adding to the queue, record the size of the call queue.
+ metrics_.server_outbound_queue_size_bytes->Increment(outbound_call_queue_.size());
CallAndTimer call_and_timer = {
make_shared<SubprocessCall>(req, resp, &cb, MonoTime::Now() + call_timeout_), {} };
RETURN_NOT_OK_PREPEND(
@@ -203,6 +205,8 @@ void SubprocessServer::ReceiveMessagesThread() {
// subprocess.
DCHECK(s.ok());
WARN_NOT_OK(s, "failed to receive response from the subprocess");
+ // Before adding to the queue, record the size of the response queue.
+ metrics_.server_inbound_queue_size_bytes->Increment(inbound_response_queue_.size());
ResponsePBAndTimer resp_and_timer = { std::move(response), {} };
if (s.ok() && !inbound_response_queue_.BlockingPut(resp_and_timer).ok()) {
// The queue has been shut down and we should shut down too.
diff --git a/src/kudu/subprocess/server.h b/src/kudu/subprocess/server.h
index ed03f27..512966b 100644
--- a/src/kudu/subprocess/server.h
+++ b/src/kudu/subprocess/server.h
@@ -66,14 +66,16 @@ struct SimpleTimer {
struct SubprocessMetrics {
// Metrics returned from the subprocess.
+ scoped_refptr<Histogram> sp_execution_time_ms;
scoped_refptr<Histogram> sp_inbound_queue_length;
- scoped_refptr<Histogram> sp_outbound_queue_length;
scoped_refptr<Histogram> sp_inbound_queue_time_ms;
+ scoped_refptr<Histogram> sp_outbound_queue_length;
scoped_refptr<Histogram> sp_outbound_queue_time_ms;
- scoped_refptr<Histogram> sp_execution_time_ms;
// Metrics recorded by the SubprocessServer.
+ scoped_refptr<Histogram> server_inbound_queue_size_bytes;
scoped_refptr<Histogram> server_inbound_queue_time_ms;
+ scoped_refptr<Histogram> server_outbound_queue_size_bytes;
scoped_refptr<Histogram> server_outbound_queue_time_ms;
};
diff --git a/src/kudu/subprocess/subprocess_proxy-test.cc b/src/kudu/subprocess/subprocess_proxy-test.cc
index e83490c..ff612e4 100644
--- a/src/kudu/subprocess/subprocess_proxy-test.cc
+++ b/src/kudu/subprocess/subprocess_proxy-test.cc
@@ -45,6 +45,8 @@ METRIC_DECLARE_histogram(echo_subprocess_outbound_queue_length);
METRIC_DECLARE_histogram(echo_subprocess_inbound_queue_time_ms);
METRIC_DECLARE_histogram(echo_subprocess_outbound_queue_time_ms);
METRIC_DECLARE_histogram(echo_subprocess_execution_time_ms);
+METRIC_DECLARE_histogram(echo_server_outbound_queue_size_bytes);
+METRIC_DECLARE_histogram(echo_server_inbound_queue_size_bytes);
METRIC_DECLARE_histogram(echo_server_outbound_queue_time_ms);
METRIC_DECLARE_histogram(echo_server_inbound_queue_time_ms);
@@ -119,6 +121,16 @@ TEST_F(EchoSubprocessTest, TestBasicSubprocessMetrics) {
ASSERT_EQ(1, in_hist->TotalCount());
ASSERT_LE(0, in_hist->MaxValueForTests());
+ // There shouldn't be any bytes in the server queues when we enqueue.
+ Histogram* server_in_size_hist =
+ GET_HIST(metric_entity_, echo_server_inbound_queue_size_bytes);
+ ASSERT_EQ(1, server_in_size_hist->TotalCount());
+ ASSERT_EQ(0, server_in_size_hist->MaxValueForTests());
+ Histogram* server_out_size_hist =
+ GET_HIST(metric_entity_, echo_server_outbound_queue_size_bytes);
+ ASSERT_EQ(1, server_out_size_hist->TotalCount());
+ ASSERT_EQ(0, server_out_size_hist->MaxValueForTests());
+
// We should have some non-negative queue times on the server side too.
Histogram* server_out_hist =
GET_HIST(metric_entity_, echo_server_outbound_queue_time_ms);
@@ -156,8 +168,10 @@ TEST_F(EchoSubprocessTest, TestSubprocessMetricsOnError) {
Histogram* in_len_hist = GET_HIST(metric_entity_, echo_subprocess_inbound_queue_length);
Histogram* sp_out_hist = GET_HIST(metric_entity_, echo_subprocess_outbound_queue_time_ms);
Histogram* sp_in_hist = GET_HIST(metric_entity_, echo_subprocess_inbound_queue_time_ms);
- Histogram* server_out_hist = GET_HIST(metric_entity_, echo_server_outbound_queue_time_ms);
- Histogram* server_in_hist = GET_HIST(metric_entity_, echo_server_inbound_queue_time_ms);
+ Histogram* server_out_time_hist = GET_HIST(metric_entity_, echo_server_outbound_queue_time_ms);
+ Histogram* server_out_size_hist = GET_HIST(metric_entity_, echo_server_outbound_queue_size_bytes);
+ Histogram* server_in_time_hist = GET_HIST(metric_entity_, echo_server_inbound_queue_time_ms);
+ Histogram* server_in_size_hist = GET_HIST(metric_entity_, echo_server_inbound_queue_size_bytes);
ASSERT_EQ(0, exec_hist->TotalCount());
ASSERT_EQ(0, out_len_hist->TotalCount());
ASSERT_EQ(0, in_len_hist->TotalCount());
@@ -166,8 +180,10 @@ TEST_F(EchoSubprocessTest, TestSubprocessMetricsOnError) {
// We'll have sent the request from the server and not received the response.
// Our metrics should reflect that.
- ASSERT_EQ(1, server_out_hist->TotalCount());
- ASSERT_EQ(0, server_in_hist->TotalCount());
+ ASSERT_EQ(1, server_out_time_hist->TotalCount());
+ ASSERT_EQ(1, server_out_size_hist->TotalCount());
+ ASSERT_EQ(0, server_in_time_hist->TotalCount());
+ ASSERT_EQ(0, server_in_size_hist->TotalCount());
// Eventually the subprocess will return our call, and we'll see some
// metrics.
@@ -179,8 +195,10 @@ TEST_F(EchoSubprocessTest, TestSubprocessMetricsOnError) {
ASSERT_EQ(1, in_len_hist->TotalCount());
ASSERT_EQ(1, sp_out_hist->TotalCount());
ASSERT_EQ(1, sp_in_hist->TotalCount());
- ASSERT_EQ(1, server_out_hist->TotalCount());
- ASSERT_EQ(1, server_in_hist->TotalCount());
+ ASSERT_EQ(1, server_out_time_hist->TotalCount());
+ ASSERT_EQ(1, server_in_time_hist->TotalCount());
+ ASSERT_EQ(1, server_out_size_hist->TotalCount());
+ ASSERT_EQ(1, server_in_size_hist->TotalCount());
});
}
diff --git a/src/kudu/util/blocking_queue-test.cc b/src/kudu/util/blocking_queue-test.cc
index a0f1297..38d38dd 100644
--- a/src/kudu/util/blocking_queue-test.cc
+++ b/src/kudu/util/blocking_queue-test.cc
@@ -207,8 +207,11 @@ struct LengthLogicalSize {
TEST(BlockingQueueTest, TestLogicalSize) {
BlockingQueue<string, LengthLogicalSize> test_queue(4);
ASSERT_EQ(test_queue.Put("a"), QUEUE_SUCCESS);
+ ASSERT_EQ(1, test_queue.size());
ASSERT_EQ(test_queue.Put("bcd"), QUEUE_SUCCESS);
+ ASSERT_EQ(4, test_queue.size());
ASSERT_EQ(test_queue.Put("e"), QUEUE_FULL);
+ ASSERT_EQ(4, test_queue.size());
}
TEST(BlockingQueueTest, TestNonPointerParamsMayBeNonEmptyOnDestruct) {
diff --git a/src/kudu/util/blocking_queue.h b/src/kudu/util/blocking_queue.h
index 40734b4..c9d59a2 100644
--- a/src/kudu/util/blocking_queue.h
+++ b/src/kudu/util/blocking_queue.h
@@ -244,6 +244,11 @@ class BlockingQueue {
return max_size_;
}
+ size_t size() const {
+ MutexLock l(lock_);
+ return size_;
+ }
+
std::string ToString() const {
std::string ret;
@@ -269,7 +274,7 @@ class BlockingQueue {
bool shutdown_;
size_t size_;
- size_t max_size_;
+ const size_t max_size_;
mutable Mutex lock_;
ConditionVariable not_empty_;
ConditionVariable not_full_;