You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2020/03/05 22:14:03 UTC
[kudu] branch master updated: subprocess: add server-side metrics
This is an automated email from the ASF dual-hosted git repository.
awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 6f01667 subprocess: add server-side metrics
6f01667 is described below
commit 6f016678cbf8fa32714685cd5166bced87eb28e9
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Tue Mar 3 20:22:11 2020 -0800
subprocess: add server-side metrics
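This introduces server-side timing information to the list of
subprocess-related metrics: the time a call spends in the
SubprocessServer's outbound request queue before it is sent to the
subprocess, and the time a response spends in the server's inbound
response queue before it is processed.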
Change-Id: I51294741ff82bd47e64ceaba18a6d04ae0144179
Reviewed-on: http://gerrit.cloudera.org:8080/15367
Reviewed-by: Hao Hao <ha...@cloudera.com>
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Andrew Wong <aw...@cloudera.com>
---
src/kudu/ranger/ranger_client.cc | 24 +++++++++---
src/kudu/subprocess/echo_subprocess.cc | 24 +++++++++---
src/kudu/subprocess/server.cc | 44 ++++++++++++++-------
src/kudu/subprocess/server.h | 39 +++++++++++++------
src/kudu/subprocess/subprocess_proxy-test.cc | 58 ++++++++++++++++++++++------
5 files changed, 141 insertions(+), 48 deletions(-)
diff --git a/src/kudu/ranger/ranger_client.cc b/src/kudu/ranger/ranger_client.cc
index 36141d9..8ca9812 100644
--- a/src/kudu/ranger/ranger_client.cc
+++ b/src/kudu/ranger/ranger_client.cc
@@ -80,6 +80,18 @@ METRIC_DEFINE_histogram(server, ranger_subprocess_execution_time_ms,
"time spent spent in the subprocess queues",
kudu::MetricLevel::kInfo,
60000LU, 1);
+METRIC_DEFINE_histogram(server, ranger_server_outbound_queue_time_ms,
+ "Ranger server outbound queue time (ms)",
+ kudu::MetricUnit::kMilliseconds,
+ "Duration of time in ms spent in the Ranger server's outbound request queue",
+ kudu::MetricLevel::kInfo,
+ 60000LU, 1);
+METRIC_DEFINE_histogram(server, ranger_server_inbound_queue_time_ms,
+ "Ranger server inbound queue time (ms)",
+ kudu::MetricUnit::kMilliseconds,
+ "Duration of time in ms spent in the Ranger server's inbound response queue",
+ kudu::MetricLevel::kInfo,
+ 60000LU, 1);
namespace kudu {
namespace ranger {
@@ -99,11 +111,13 @@ const char* kMainClass = "org.apache.kudu.ranger.RangerSubprocessMain";
#define HISTINIT(member, x) member = METRIC_##x.Instantiate(entity)
RangerSubprocessMetrics::RangerSubprocessMetrics(const scoped_refptr<MetricEntity>& entity) {
- HISTINIT(inbound_queue_length, ranger_subprocess_inbound_queue_length);
- HISTINIT(outbound_queue_length, ranger_subprocess_outbound_queue_length);
- HISTINIT(inbound_queue_time_ms, ranger_subprocess_inbound_queue_time_ms);
- HISTINIT(outbound_queue_time_ms, ranger_subprocess_outbound_queue_time_ms);
- HISTINIT(execution_time_ms, ranger_subprocess_execution_time_ms);
+ HISTINIT(sp_inbound_queue_length, ranger_subprocess_inbound_queue_length);
+ HISTINIT(sp_outbound_queue_length, ranger_subprocess_outbound_queue_length);
+ HISTINIT(sp_inbound_queue_time_ms, ranger_subprocess_inbound_queue_time_ms);
+ HISTINIT(sp_outbound_queue_time_ms, ranger_subprocess_outbound_queue_time_ms);
+ HISTINIT(sp_execution_time_ms, ranger_subprocess_execution_time_ms);
+ HISTINIT(server_outbound_queue_time_ms, ranger_server_outbound_queue_time_ms);
+ HISTINIT(server_inbound_queue_time_ms, ranger_server_inbound_queue_time_ms);
}
#undef HISTINIT
diff --git a/src/kudu/subprocess/echo_subprocess.cc b/src/kudu/subprocess/echo_subprocess.cc
index 9faac0e..5e39114 100644
--- a/src/kudu/subprocess/echo_subprocess.cc
+++ b/src/kudu/subprocess/echo_subprocess.cc
@@ -50,17 +50,31 @@ METRIC_DEFINE_histogram(server, echo_subprocess_execution_time_ms,
"time spent spent in the subprocess queues",
kudu::MetricLevel::kInfo,
60000LU, 1);
+METRIC_DEFINE_histogram(server, echo_server_outbound_queue_time_ms,
+ "Echo server outbound queue time (ms)",
+ kudu::MetricUnit::kMilliseconds,
+ "Duration of time in ms spent in the Echo server's outbound request queue",
+ kudu::MetricLevel::kInfo,
+ 60000LU, 1);
+METRIC_DEFINE_histogram(server, echo_server_inbound_queue_time_ms,
+ "Echo server inbound queue time (ms)",
+ kudu::MetricUnit::kMilliseconds,
+ "Duration of time in ms spent in the Echo server's inbound response queue",
+ kudu::MetricLevel::kInfo,
+ 60000LU, 1);
namespace kudu {
namespace subprocess {
#define HISTINIT(member, x) member = METRIC_##x.Instantiate(entity)
EchoSubprocessMetrics::EchoSubprocessMetrics(const scoped_refptr<MetricEntity>& entity) {
- HISTINIT(inbound_queue_length, echo_subprocess_inbound_queue_length);
- HISTINIT(outbound_queue_length, echo_subprocess_outbound_queue_length);
- HISTINIT(inbound_queue_time_ms, echo_subprocess_inbound_queue_time_ms);
- HISTINIT(outbound_queue_time_ms, echo_subprocess_outbound_queue_time_ms);
- HISTINIT(execution_time_ms, echo_subprocess_execution_time_ms);
+ HISTINIT(sp_inbound_queue_length, echo_subprocess_inbound_queue_length);
+ HISTINIT(sp_outbound_queue_length, echo_subprocess_outbound_queue_length);
+ HISTINIT(sp_inbound_queue_time_ms, echo_subprocess_inbound_queue_time_ms);
+ HISTINIT(sp_outbound_queue_time_ms, echo_subprocess_outbound_queue_time_ms);
+ HISTINIT(sp_execution_time_ms, echo_subprocess_execution_time_ms);
+ HISTINIT(server_outbound_queue_time_ms, echo_server_outbound_queue_time_ms);
+ HISTINIT(server_inbound_queue_time_ms, echo_server_inbound_queue_time_ms);
}
#undef HISTINIT
diff --git a/src/kudu/subprocess/server.cc b/src/kudu/subprocess/server.cc
index f23c1af..744d0dc 100644
--- a/src/kudu/subprocess/server.cc
+++ b/src/kudu/subprocess/server.cc
@@ -21,6 +21,7 @@
#include <memory>
#include <ostream>
#include <string>
+#include <type_traits>
#include <utility>
#include <vector>
@@ -130,9 +131,11 @@ Status SubprocessServer::Execute(SubprocessRequestPB* req,
req->set_id(next_id_++);
Synchronizer sync;
auto cb = sync.AsStdStatusCallback();
- auto call = make_shared<SubprocessCall>(req, resp, &cb, MonoTime::Now() + call_timeout_);
- RETURN_NOT_OK_PREPEND(outbound_call_queue_.BlockingPut(call, call->deadline()),
- "couldn't enqueue call");
+ CallAndTimer call_and_timer = {
+ make_shared<SubprocessCall>(req, resp, &cb, MonoTime::Now() + call_timeout_), {} };
+ RETURN_NOT_OK_PREPEND(
+ outbound_call_queue_.BlockingPut(call_and_timer, call_and_timer.first->deadline()),
+ "couldn't enqueue call");
return sync.Wait();
}
@@ -200,7 +203,8 @@ void SubprocessServer::ReceiveMessagesThread() {
// subprocess.
DCHECK(s.ok());
WARN_NOT_OK(s, "failed to receive response from the subprocess");
- if (s.ok() && !inbound_response_queue_.BlockingPut(response).ok()) {
+ ResponsePBAndTimer resp_and_timer = { std::move(response), {} };
+ if (s.ok() && !inbound_response_queue_.BlockingPut(resp_and_timer).ok()) {
// The queue has been shut down and we should shut down too.
DCHECK_EQ(0, closing_.count());
LOG(INFO) << "failed to put response onto inbound queue";
@@ -212,12 +216,16 @@ void SubprocessServer::ReceiveMessagesThread() {
void SubprocessServer::ResponderThread() {
Status s;
do {
- vector<SubprocessResponsePB> resps;
+ vector<ResponsePBAndTimer> resps;
// NOTE: since we don't supply a deadline, this will only fail if the queue
// is shutting down. Also note that even if this fails because we're
// shutting down, we still populate 'resps' and must run their callbacks.
s = inbound_response_queue_.BlockingDrainTo(&resps);
- for (const auto& resp : resps) {
+ for (auto& resp_and_timer : resps) {
+ metrics_.server_inbound_queue_time_ms->Increment(
+ resp_and_timer.second.elapsed().ToMilliseconds());
+ const auto& resp = resp_and_timer.first;
+
if (!resp.has_id()) {
LOG(FATAL) << Substitute("Received invalid response: $0",
pb_util::SecureDebugString(resp));
@@ -226,18 +234,19 @@ void SubprocessServer::ResponderThread() {
// metrics.
if (PREDICT_TRUE(resp.has_metrics())) {
const auto& pb = resp.metrics();
- metrics_.inbound_queue_length->Increment(pb.inbound_queue_length());
- metrics_.outbound_queue_length->Increment(pb.outbound_queue_length());
- metrics_.inbound_queue_time_ms->Increment(pb.inbound_queue_time_ms());
- metrics_.outbound_queue_time_ms->Increment(pb.outbound_queue_time_ms());
- metrics_.execution_time_ms->Increment(pb.execution_time_ms());
+ metrics_.sp_inbound_queue_length->Increment(pb.inbound_queue_length());
+ metrics_.sp_outbound_queue_length->Increment(pb.outbound_queue_length());
+ metrics_.sp_inbound_queue_time_ms->Increment(pb.inbound_queue_time_ms());
+ metrics_.sp_outbound_queue_time_ms->Increment(pb.outbound_queue_time_ms());
+ metrics_.sp_execution_time_ms->Increment(pb.execution_time_ms());
}
}
vector<pair<shared_ptr<SubprocessCall>, SubprocessResponsePB>> calls_and_resps;
calls_and_resps.reserve(resps.size());
{
std::lock_guard<simple_spinlock> l(in_flight_lock_);
- for (auto& resp : resps) {
+ for (auto& resp_and_timer : resps) {
+ auto& resp = resp_and_timer.first;
auto id = resp.id();
auto call = EraseKeyReturnValuePtr(&call_by_id_, id);
if (call) {
@@ -290,7 +299,7 @@ void SubprocessServer::SendMessagesThread() {
DCHECK(message_protocol_) << "message protocol is not initialized";
Status s;
do {
- vector<shared_ptr<SubprocessCall>> calls;
+ vector<CallAndTimer> calls;
// NOTE: since we don't supply a deadline, this will only fail if the queue
// is shutting down. Also note that even if this fails because we're
// shutting down, we still populate 'calls' and should add them to the
@@ -298,14 +307,19 @@ void SubprocessServer::SendMessagesThread() {
s = outbound_call_queue_.BlockingDrainTo(&calls);
{
std::lock_guard<simple_spinlock> l(in_flight_lock_);
- for (const auto& call : calls) {
+ for (const auto& call_and_timer : calls) {
+ const auto& call = call_and_timer.first;
EmplaceOrDie(&call_by_id_, call->id(), call);
}
}
// NOTE: it's possible that before sending the request, the call already
// timed out and the deadline checker already called its callback. If so,
// the following call will no-op.
- for (const auto& call : calls) {
+ for (const auto& call_and_timer : calls) {
+ const auto& call = call_and_timer.first;
+ metrics_.server_outbound_queue_time_ms->Increment(
+ call_and_timer.second.elapsed().ToMilliseconds());
+
call->SendRequest(message_protocol_.get());
}
} while (s.ok());
diff --git a/src/kudu/subprocess/server.h b/src/kudu/subprocess/server.h
index d4e0ebe..ed03f27 100644
--- a/src/kudu/subprocess/server.h
+++ b/src/kudu/subprocess/server.h
@@ -54,12 +54,27 @@ namespace subprocess {
typedef int64_t CallId;
+struct SimpleTimer {
+  // Captured when the timer (and the queue element it is paired with) is created.
+  MonoTime start_time = MonoTime::Now();
+
+  // Returns the time that has passed since 'start_time' was captured.
+  MonoDelta elapsed() const {
+    return MonoTime::Now() - start_time;
+  }
+};
+
struct SubprocessMetrics {
- scoped_refptr<Histogram> inbound_queue_length;
- scoped_refptr<Histogram> outbound_queue_length;
- scoped_refptr<Histogram> inbound_queue_time_ms;
- scoped_refptr<Histogram> outbound_queue_time_ms;
- scoped_refptr<Histogram> execution_time_ms;
+ // Metrics returned from the subprocess.
+ scoped_refptr<Histogram> sp_inbound_queue_length;
+ scoped_refptr<Histogram> sp_outbound_queue_length;
+ scoped_refptr<Histogram> sp_inbound_queue_time_ms;
+ scoped_refptr<Histogram> sp_outbound_queue_time_ms;
+ scoped_refptr<Histogram> sp_execution_time_ms;
+
+ // Metrics recorded by the SubprocessServer.
+ scoped_refptr<Histogram> server_inbound_queue_time_ms;
+ scoped_refptr<Histogram> server_outbound_queue_time_ms;
};
// Encapsulates the pending state of a request that is in the process of being
@@ -155,19 +170,21 @@ class SubprocessCall {
};
// Used by BlockingQueue to determine the size of messages.
+typedef std::pair<std::shared_ptr<SubprocessCall>, SimpleTimer> CallAndTimer;
struct RequestLogicalSize {
- static size_t logical_size(const std::shared_ptr<SubprocessCall>& call) {
- return call->req_->ByteSizeLong();
+ static size_t logical_size(const CallAndTimer& call_and_timer) {
+ return call_and_timer.first->req_->ByteSizeLong();
}
};
+typedef std::pair<SubprocessResponsePB, SimpleTimer> ResponsePBAndTimer;
struct ResponseLogicalSize {
- static size_t logical_size(const SubprocessResponsePB& response) {
- return response.ByteSizeLong();
+ static size_t logical_size(const ResponsePBAndTimer& resp_and_timer) {
+ return resp_and_timer.first.ByteSizeLong();
}
};
-typedef BlockingQueue<std::shared_ptr<SubprocessCall>, RequestLogicalSize> SubprocessCallQueue;
-typedef BlockingQueue<SubprocessResponsePB, ResponseLogicalSize> ResponseQueue;
+typedef BlockingQueue<CallAndTimer, RequestLogicalSize> SubprocessCallQueue;
+typedef BlockingQueue<ResponsePBAndTimer, ResponseLogicalSize> ResponseQueue;
// Wrapper for a subprocess that communicates via protobuf. A server is
// comprised of a few things to facilitate concurrent communication with an
diff --git a/src/kudu/subprocess/subprocess_proxy-test.cc b/src/kudu/subprocess/subprocess_proxy-test.cc
index 897dd18..e83490c 100644
--- a/src/kudu/subprocess/subprocess_proxy-test.cc
+++ b/src/kudu/subprocess/subprocess_proxy-test.cc
@@ -45,6 +45,8 @@ METRIC_DECLARE_histogram(echo_subprocess_outbound_queue_length);
METRIC_DECLARE_histogram(echo_subprocess_inbound_queue_time_ms);
METRIC_DECLARE_histogram(echo_subprocess_outbound_queue_time_ms);
METRIC_DECLARE_histogram(echo_subprocess_execution_time_ms);
+METRIC_DECLARE_histogram(echo_server_outbound_queue_time_ms);
+METRIC_DECLARE_histogram(echo_server_inbound_queue_time_ms);
using std::unique_ptr;
using std::string;
@@ -87,6 +89,9 @@ class EchoSubprocessTest : public KuduTest {
unique_ptr<EchoSubprocess> echo_subprocess_;
};
+#define GET_HIST(metric_entity, metric_name) \
+    down_cast<Histogram*>((metric_entity)->FindOrNull(METRIC_##metric_name).get())
+
TEST_F(EchoSubprocessTest, TestBasicSubprocessMetrics) {
const string kMessage = "don't catch you slippin' now";
const int64_t kSleepMs = 1000;
@@ -97,29 +102,35 @@ TEST_F(EchoSubprocessTest, TestBasicSubprocessMetrics) {
ASSERT_OK(echo_subprocess_->Execute(req, &resp));
ASSERT_EQ(kMessage, resp.data());
+
// There shouldn't have anything in the subprocess queues.
- Histogram* in_len_hist = down_cast<Histogram*>(metric_entity_->FindOrNull(
- METRIC_echo_subprocess_inbound_queue_length).get());
+ Histogram* in_len_hist = GET_HIST(metric_entity_, echo_subprocess_inbound_queue_length);
ASSERT_EQ(1, in_len_hist->TotalCount());
ASSERT_EQ(0, in_len_hist->MaxValueForTests());
- Histogram* out_len_hist = down_cast<Histogram*>(metric_entity_->FindOrNull(
- METRIC_echo_subprocess_outbound_queue_length).get());
+ Histogram* out_len_hist = GET_HIST(metric_entity_, echo_subprocess_outbound_queue_length);
ASSERT_EQ(1, out_len_hist->TotalCount());
ASSERT_EQ(0, out_len_hist->MaxValueForTests());
// We should have some non-negative queue times.
- Histogram* out_hist = down_cast<Histogram*>(metric_entity_->FindOrNull(
- METRIC_echo_subprocess_outbound_queue_time_ms).get());
+ Histogram* out_hist = GET_HIST(metric_entity_, echo_subprocess_outbound_queue_time_ms);
ASSERT_EQ(1, out_hist->TotalCount());
ASSERT_LE(0, out_hist->MaxValueForTests());
- Histogram* in_hist = down_cast<Histogram*>(metric_entity_->FindOrNull(
- METRIC_echo_subprocess_inbound_queue_time_ms).get());
+ Histogram* in_hist = GET_HIST(metric_entity_, echo_subprocess_inbound_queue_time_ms);
ASSERT_EQ(1, in_hist->TotalCount());
ASSERT_LE(0, in_hist->MaxValueForTests());
+ // We should have some non-negative queue times on the server side too.
+ Histogram* server_out_hist =
+ GET_HIST(metric_entity_, echo_server_outbound_queue_time_ms);
+ ASSERT_EQ(1, server_out_hist->TotalCount());
+ ASSERT_LE(0, server_out_hist->MaxValueForTests());
+ Histogram* server_in_hist =
+ GET_HIST(metric_entity_, echo_server_inbound_queue_time_ms);
+ ASSERT_EQ(1, server_in_hist->TotalCount());
+ ASSERT_LE(0, server_in_hist->MaxValueForTests());
+
// The execution should've taken at least our sleep time.
- Histogram* exec_hist = down_cast<Histogram*>(metric_entity_->FindOrNull(
- METRIC_echo_subprocess_execution_time_ms).get());
+ Histogram* exec_hist = GET_HIST(metric_entity_, echo_subprocess_execution_time_ms);
ASSERT_EQ(1, exec_hist->TotalCount());
ASSERT_LT(kSleepMs, exec_hist->MaxValueForTests());
}
@@ -140,17 +151,40 @@ TEST_F(EchoSubprocessTest, TestSubprocessMetricsOnError) {
ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
// Immediately following our call, we won't have any metrics from the subprocess.
- Histogram* exec_hist = down_cast<Histogram*>(metric_entity_->FindOrNull(
- METRIC_echo_subprocess_execution_time_ms).get());
+ Histogram* exec_hist = GET_HIST(metric_entity_, echo_subprocess_execution_time_ms);
+ Histogram* out_len_hist = GET_HIST(metric_entity_, echo_subprocess_outbound_queue_length);
+ Histogram* in_len_hist = GET_HIST(metric_entity_, echo_subprocess_inbound_queue_length);
+ Histogram* sp_out_hist = GET_HIST(metric_entity_, echo_subprocess_outbound_queue_time_ms);
+ Histogram* sp_in_hist = GET_HIST(metric_entity_, echo_subprocess_inbound_queue_time_ms);
+ Histogram* server_out_hist = GET_HIST(metric_entity_, echo_server_outbound_queue_time_ms);
+ Histogram* server_in_hist = GET_HIST(metric_entity_, echo_server_inbound_queue_time_ms);
ASSERT_EQ(0, exec_hist->TotalCount());
+ ASSERT_EQ(0, out_len_hist->TotalCount());
+ ASSERT_EQ(0, in_len_hist->TotalCount());
+ ASSERT_EQ(0, sp_out_hist->TotalCount());
+ ASSERT_EQ(0, sp_in_hist->TotalCount());
+
+ // We'll have sent the request from the server and not received the response.
+ // Our metrics should reflect that.
+ ASSERT_EQ(1, server_out_hist->TotalCount());
+ ASSERT_EQ(0, server_in_hist->TotalCount());
// Eventually the subprocess will return our call, and we'll see some
// metrics.
ASSERT_EVENTUALLY([&] {
ASSERT_EQ(1, exec_hist->TotalCount());
ASSERT_LT(kSleepMs, exec_hist->MaxValueForTests());
+
+ ASSERT_EQ(1, out_len_hist->TotalCount());
+ ASSERT_EQ(1, in_len_hist->TotalCount());
+ ASSERT_EQ(1, sp_out_hist->TotalCount());
+ ASSERT_EQ(1, sp_in_hist->TotalCount());
+ ASSERT_EQ(1, server_out_hist->TotalCount());
+ ASSERT_EQ(1, server_in_hist->TotalCount());
});
}
+#undef GET_HIST
+
} // namespace subprocess
} // namespace kudu