You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2016/04/20 20:29:16 UTC
incubator-kudu git commit: KUDU-1410 (part 5): rpc: sample each RPC
method in several latency buckets, expose on /rpcz
Repository: incubator-kudu
Updated Branches:
refs/heads/master 05f816e9b -> 3cb45e969
KUDU-1410 (part 5): rpc: sample each RPC method in several latency buckets, expose on /rpcz
This changes RpczStore to keep several sample traces for each RPC, categorized
into several buckets (fast, slow, slower). Currently the thresholds for those
buckets are hard-coded, but follow-up work could make them adaptive based on
previous RPC percentiles, or user-configurable. Having tested this with a
YCSB workload, I found that the sampled traces are useful even in this simple implementation.
Change-Id: I0eadb3f7035b5b156cb624ce50876876a5698b17
Reviewed-on: http://gerrit.cloudera.org:8080/2798
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/3cb45e96
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/3cb45e96
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/3cb45e96
Branch: refs/heads/master
Commit: 3cb45e9696f87775ea1dcd6ee27137326d88e451
Parents: 05f816e
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Apr 13 14:44:44 2016 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Wed Apr 20 18:28:58 2016 +0000
----------------------------------------------------------------------
src/kudu/rpc/inbound_call.h | 12 +++
src/kudu/rpc/rpc-test-base.h | 1 +
src/kudu/rpc/rpc_introspection.proto | 25 +++++
src/kudu/rpc/rpc_stub-test.cc | 29 ++++++
src/kudu/rpc/rpcz_store.cc | 161 ++++++++++++++++++++++++++++--
src/kudu/rpc/rpcz_store.h | 24 ++++-
src/kudu/server/rpcz-path-handler.cc | 28 ++++--
7 files changed, 265 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/3cb45e96/src/kudu/rpc/inbound_call.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/inbound_call.h b/src/kudu/rpc/inbound_call.h
index 30b38b3..cc35383 100644
--- a/src/kudu/rpc/inbound_call.h
+++ b/src/kudu/rpc/inbound_call.h
@@ -57,6 +57,10 @@ struct InboundCallTiming {
MonoTime time_received; // Time the call was first accepted.
MonoTime time_handled; // Time the call handler was kicked off.
MonoTime time_completed; // Time the call handler completed.
+
+ MonoDelta TotalDuration() const {
+ return time_completed.GetDeltaSince(time_received);
+ }
};
// Inbound call on server
@@ -133,6 +137,14 @@ class InboundCall {
Trace* trace();
+ const InboundCallTiming& timing() const {
+ return timing_;
+ }
+
+ const RequestHeader& header() const {
+ return header_;
+ }
+
// Associate this call with a particular method that will be invoked
// by the service.
void set_method_info(RpcMethodInfo* info) {
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/3cb45e96/src/kudu/rpc/rpc-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h
index fa3857d..9be3bcb 100644
--- a/src/kudu/rpc/rpc-test-base.h
+++ b/src/kudu/rpc/rpc-test-base.h
@@ -236,6 +236,7 @@ class CalculatorService : public CalculatorServiceIf {
private:
void DoSleep(const SleepRequestPB *req,
RpcContext *context) {
+ TRACE_COUNTER_INCREMENT("test_sleep_us", req->sleep_micros());
SleepFor(MonoDelta::FromMicroseconds(req->sleep_micros()));
context->RespondSuccess();
}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/3cb45e96/src/kudu/rpc/rpc_introspection.proto
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_introspection.proto b/src/kudu/rpc/rpc_introspection.proto
index be72450..c4b6faa 100644
--- a/src/kudu/rpc/rpc_introspection.proto
+++ b/src/kudu/rpc/rpc_introspection.proto
@@ -52,3 +52,28 @@ message DumpRunningRpcsResponsePB {
repeated RpcConnectionPB inbound_connections = 1;
repeated RpcConnectionPB outbound_connections = 2;
}
+
+//------------------------------------------------------------
+
+// A single sampled RPC call.
+message RpczSamplePB {
+ // The original request header.
+ optional RequestHeader header = 1;
+ // The stringified request trace.
+ optional string trace = 2;
+ // The number of millis that this call took to complete.
+ optional int32 duration_ms = 3;
+}
+
+// A set of samples for a particular RPC method.
+message RpczMethodPB {
+ required string method_name = 1;
+ repeated RpczSamplePB samples = 2;
+}
+
+// Request and response for dumping previously sampled RPC calls.
+message DumpRpczStoreRequestPB {
+}
+message DumpRpczStoreResponsePB {
+ repeated RpczMethodPB methods = 1;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/3cb45e96/src/kudu/rpc/rpc_stub-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_stub-test.cc b/src/kudu/rpc/rpc_stub-test.cc
index 8ac3dba..193da0f 100644
--- a/src/kudu/rpc/rpc_stub-test.cc
+++ b/src/kudu/rpc/rpc_stub-test.cc
@@ -27,6 +27,7 @@
#include "kudu/gutil/stl_util.h"
#include "kudu/rpc/rpc_introspection.pb.h"
+#include "kudu/rpc/rpcz_store.h"
#include "kudu/rpc/rtest.proxy.h"
#include "kudu/rpc/rtest.service.h"
#include "kudu/rpc/rpc-test-base.h"
@@ -487,6 +488,34 @@ TEST_F(RpcStubTest, TestDumpCallsInFlight) {
sleep.latch.Wait();
}
+TEST_F(RpcStubTest, TestDumpSampledCalls) {
+ CalculatorServiceProxy p(client_messenger_, server_addr_);
+
+ // Issue two calls that fall into different latency buckets.
+ AsyncSleep sleeps[2];
+ sleeps[0].req.set_sleep_micros(150 * 1000); // 150ms
+ sleeps[1].req.set_sleep_micros(1500 * 1000); // 1500ms
+
+ for (auto& sleep : sleeps) {
+ p.SleepAsync(sleep.req, &sleep.resp, &sleep.rpc,
+ boost::bind(&CountDownLatch::CountDown, &sleep.latch));
+ }
+ for (auto& sleep : sleeps) {
+ sleep.latch.Wait();
+ }
+
+ // Dump the sampled RPCs and expect to see the calls
+ // above.
+
+ DumpRpczStoreResponsePB sampled_rpcs;
+ server_messenger_->rpcz_store()->DumpPB(DumpRpczStoreRequestPB(), &sampled_rpcs);
+ EXPECT_EQ(sampled_rpcs.methods_size(), 1);
+ ASSERT_STR_CONTAINS(sampled_rpcs.DebugString(), "{\\\"test_sleep_us\\\":150000");
+ ASSERT_STR_CONTAINS(sampled_rpcs.DebugString(), "{\\\"test_sleep_us\\\":1500000}");
+ ASSERT_STR_CONTAINS(sampled_rpcs.DebugString(), "SleepRequestPB");
+ ASSERT_STR_CONTAINS(sampled_rpcs.DebugString(), "duration_ms");
+}
+
namespace {
struct RefCountedTest : public RefCountedThreadSafe<RefCountedTest> {
};
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/3cb45e96/src/kudu/rpc/rpcz_store.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpcz_store.cc b/src/kudu/rpc/rpcz_store.cc
index 1e7796d..33997fe 100644
--- a/src/kudu/rpc/rpcz_store.cc
+++ b/src/kudu/rpc/rpcz_store.cc
@@ -17,10 +17,18 @@
#include "kudu/rpc/rpcz_store.h"
+#include <algorithm>
#include <glog/stl_logging.h>
+#include <mutex> // for unique_lock
#include <string>
+#include <utility>
+#include <vector>
+#include "kudu/gutil/walltime.h"
#include "kudu/rpc/inbound_call.h"
+#include "kudu/rpc/rpc_introspection.pb.h"
+#include "kudu/rpc/service_if.h"
+#include "kudu/util/atomic.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/monotime.h"
#include "kudu/util/trace.h"
@@ -31,27 +39,166 @@ DEFINE_bool(rpc_dump_all_traces, false,
TAG_FLAG(rpc_dump_all_traces, advanced);
TAG_FLAG(rpc_dump_all_traces, runtime);
+using std::pair;
+using std::vector;
+using std::unique_ptr;
+
namespace kudu {
namespace rpc {
+// Sample an RPC call once every N milliseconds within each
+// bucket. If the current sample in a latency bucket is older
+// than this threshold, a new sample will be taken.
+static const int kSampleIntervalMs = 1000;
+
+static const int kBucketThresholdsMs[] = {10, 100, 1000};
+static constexpr int kNumBuckets = arraysize(kBucketThresholdsMs) + 1;
+
+// An instance of this class is created for each RPC method implemented
+// on the server. It keeps several recent samples for each RPC, currently
+// based on fixed time buckets.
+class MethodSampler {
+ public:
+ MethodSampler() {}
+ ~MethodSampler() {}
+
+ // Potentially sample a single call.
+ void SampleCall(InboundCall* call);
+
+ // Dump the current samples.
+ void GetSamplePBs(RpczMethodPB* pb);
+
+ private:
+ // An individual recorded sample.
+ struct Sample {
+ RequestHeader header;
+ scoped_refptr<Trace> trace;
+ int duration_ms;
+ };
+
+ // A sample, including the particular time at which it was
+ // sampled, and a lock protecting it.
+ struct SampleBucket {
+ SampleBucket() : last_sample_time(0) {}
+
+ AtomicInt<int64_t> last_sample_time;
+ simple_spinlock sample_lock;
+ Sample sample;
+ };
+ std::array<SampleBucket, kNumBuckets> buckets_;
+
+ DISALLOW_COPY_AND_ASSIGN(MethodSampler);
+};
+
+MethodSampler* RpczStore::SamplerForCall(InboundCall* call) {
+ if (PREDICT_FALSE(!call->method_info())) {
+ return nullptr;
+ }
+
+ // Most likely, we already have a sampler created for the call.
+ {
+ shared_lock<rw_spinlock> lock(&samplers_lock_.get_lock());
+ auto it = method_samplers_.find(call->method_info());
+ if (PREDICT_TRUE(it != method_samplers_.end())) {
+ return it->second.get();
+ }
+ }
+
+ // If missing, create a new sampler for this method and try to insert it.
+ unique_ptr<MethodSampler> ms(new MethodSampler());
+ lock_guard<percpu_rwlock> lock(&samplers_lock_);
+ auto it = method_samplers_.find(call->method_info());
+ if (it != method_samplers_.end()) {
+ return it->second.get();
+ }
+ auto* ret = ms.get();
+ method_samplers_[call->method_info()] = std::move(ms);
+ return ret;
+}
+
+void MethodSampler::SampleCall(InboundCall* call) {
+ // First determine which sample bucket to put this in.
+ int duration_ms = call->timing().TotalDuration().ToMilliseconds();
+
+ SampleBucket* bucket = &buckets_[kNumBuckets - 1];
+ for (int i = 0 ; i < kNumBuckets - 1; i++) {
+ if (duration_ms < kBucketThresholdsMs[i]) {
+ bucket = &buckets_[i];
+ break;
+ }
+ }
+
+ MicrosecondsInt64 now = GetMonoTimeMicros();
+ int64_t us_since_trace = now - bucket->last_sample_time.Load();
+ if (us_since_trace > kSampleIntervalMs * 1000) {
+ Sample new_sample = {call->header(), call->trace(), duration_ms};
+ {
+ std::unique_lock<simple_spinlock> lock(bucket->sample_lock, std::try_to_lock);
+ // If another thread is already taking a sample, it's not worth waiting.
+ if (!lock.owns_lock()) {
+ return;
+ }
+ std::swap(bucket->sample, new_sample);
+ bucket->last_sample_time.Store(now);
+ }
+ LOG(INFO) << "Sampled call " << call->ToString();
+ }
+}
+
+void MethodSampler::GetSamplePBs(RpczMethodPB* method_pb) {
+ for (auto& bucket : buckets_) {
+ if (bucket.last_sample_time.Load() == 0) continue;
+
+ std::unique_lock<simple_spinlock> lock(bucket.sample_lock);
+ auto* sample_pb = method_pb->add_samples();
+ sample_pb->mutable_header()->CopyFrom(bucket.sample.header);
+ sample_pb->set_trace(bucket.sample.trace->DumpToString(Trace::INCLUDE_ALL));
+ sample_pb->set_duration_ms(bucket.sample.duration_ms);
+ }
+}
RpczStore::RpczStore() {}
RpczStore::~RpczStore() {}
void RpczStore::AddCall(InboundCall* call) {
LogTrace(call);
+ auto* sampler = SamplerForCall(call);
+ if (PREDICT_FALSE(!sampler)) return;
+
+ sampler->SampleCall(call);
+}
+
+void RpczStore::DumpPB(const DumpRpczStoreRequestPB& req,
+ DumpRpczStoreResponsePB* resp) {
+ vector<pair<RpcMethodInfo*, MethodSampler*>> samplers;
+ {
+ shared_lock<rw_spinlock> lock(&samplers_lock_.get_lock());
+ for (const auto& p : method_samplers_) {
+ samplers.emplace_back(p.first, p.second.get());
+ }
+ }
+
+ for (const auto& p : samplers) {
+ auto* sampler = p.second;
+
+ RpczMethodPB* method_pb = resp->add_methods();
+ // TODO: use the actual RPC name instead of the request type name.
+ // Currently this isn't conveniently plumbed here, but the type name
+ // is close enough.
+ method_pb->set_method_name(p.first->req_prototype->GetTypeName());
+ sampler->GetSamplePBs(method_pb);
+ }
}
void RpczStore::LogTrace(InboundCall* call) {
- MonoTime now = MonoTime::Now(MonoTime::FINE);
- int total_time = now.GetDeltaSince(call->timing_.time_received).ToMilliseconds();
+ int duration_ms = call->timing().TotalDuration().ToMilliseconds();
if (call->header_.has_timeout_millis() && call->header_.timeout_millis() > 0) {
double log_threshold = call->header_.timeout_millis() * 0.75f;
- if (total_time > log_threshold) {
+ if (duration_ms > log_threshold) {
// TODO: consider pushing this onto another thread since it may be slow.
// The traces may also be too large to fit in a log message.
- LOG(WARNING) << call->ToString() << " took " << total_time << "ms (client timeout "
+ LOG(WARNING) << call->ToString() << " took " << duration_ms << "ms (client timeout "
<< call->header_.timeout_millis() << ").";
std::string s = call->trace()->DumpToString();
if (!s.empty()) {
@@ -62,10 +209,10 @@ void RpczStore::LogTrace(InboundCall* call) {
}
if (PREDICT_FALSE(FLAGS_rpc_dump_all_traces)) {
- LOG(INFO) << call->ToString() << " took " << total_time << "ms. Trace:";
+ LOG(INFO) << call->ToString() << " took " << duration_ms << "ms. Trace:";
call->trace()->Dump(&LOG(INFO), true);
- } else if (total_time > 1000) {
- LOG(INFO) << call->ToString() << " took " << total_time << "ms. "
+ } else if (duration_ms > 1000) {
+ LOG(INFO) << call->ToString() << " took " << duration_ms << "ms. "
<< "Request Metrics: " << call->trace()->MetricsAsJSON();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/3cb45e96/src/kudu/rpc/rpcz_store.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpcz_store.h b/src/kudu/rpc/rpcz_store.h
index 92409b2..48e4474 100644
--- a/src/kudu/rpc/rpcz_store.h
+++ b/src/kudu/rpc/rpcz_store.h
@@ -18,15 +18,22 @@
#include "kudu/gutil/macros.h"
+#include <memory>
+#include <unordered_map>
+
+#include "kudu/util/locks.h"
+
namespace kudu {
namespace rpc {
+class DumpRpczStoreRequestPB;
+class DumpRpczStoreResponsePB;
class InboundCall;
+class MethodSampler;
+struct RpcMethodInfo;
// Responsible for storing sampled traces associated with completed calls.
// Before each call is responded to, it is added to this store.
-//
-// The current implementation just logs traces for calls which are slow.
class RpczStore {
public:
RpczStore();
@@ -40,13 +47,26 @@ class RpczStore {
// to the client.
void AddCall(InboundCall* c);
+ // Dump all of the collected RPC samples in response to a user query.
+ void DumpPB(const DumpRpczStoreRequestPB& req,
+ DumpRpczStoreResponsePB* resp);
+
private:
+ // Look up or create the particular MethodSampler instance which should
+ // store samples for this call.
+ MethodSampler* SamplerForCall(InboundCall* call);
+
// Log a WARNING message if the RPC response was slow enough that the
// client likely timed out. This is based on the client-provided timeout
// value.
// Also can be configured to log _all_ RPC traces for help debugging.
void LogTrace(InboundCall* call);
+ percpu_rwlock samplers_lock_;
+
+ // Protected by samplers_lock_.
+ std::unordered_map<RpcMethodInfo*, std::unique_ptr<MethodSampler>> method_samplers_;
+
DISALLOW_COPY_AND_ASSIGN(RpczStore);
};
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/3cb45e96/src/kudu/server/rpcz-path-handler.cc
----------------------------------------------------------------------
diff --git a/src/kudu/server/rpcz-path-handler.cc b/src/kudu/server/rpcz-path-handler.cc
index 9df2132..57c599c 100644
--- a/src/kudu/server/rpcz-path-handler.cc
+++ b/src/kudu/server/rpcz-path-handler.cc
@@ -26,10 +26,13 @@
#include "kudu/gutil/strings/numbers.h"
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/rpc_introspection.pb.h"
+#include "kudu/rpc/rpcz_store.h"
#include "kudu/server/webserver.h"
using kudu::rpc::DumpRunningRpcsRequestPB;
using kudu::rpc::DumpRunningRpcsResponsePB;
+using kudu::rpc::DumpRpczStoreRequestPB;
+using kudu::rpc::DumpRpczStoreResponsePB;
using kudu::rpc::Messenger;
using std::shared_ptr;
using std::stringstream;
@@ -40,16 +43,29 @@ namespace {
void RpczPathHandler(const shared_ptr<Messenger>& messenger,
const Webserver::WebRequest& req, stringstream* output) {
- DumpRunningRpcsRequestPB dump_req;
- DumpRunningRpcsResponsePB dump_resp;
+ DumpRunningRpcsResponsePB running_rpcs;
+ {
+ DumpRunningRpcsRequestPB dump_req;
- string arg = FindWithDefault(req.parsed_args, "include_traces", "false");
- dump_req.set_include_traces(ParseLeadingBoolValue(arg.c_str(), false));
+ string arg = FindWithDefault(req.parsed_args, "include_traces", "false");
+ dump_req.set_include_traces(ParseLeadingBoolValue(arg.c_str(), false));
- messenger->DumpRunningRpcs(dump_req, &dump_resp);
+ messenger->DumpRunningRpcs(dump_req, &running_rpcs);
+ }
+ DumpRpczStoreResponsePB sampled_rpcs;
+ {
+ DumpRpczStoreRequestPB dump_req;
+ messenger->rpcz_store()->DumpPB(dump_req, &sampled_rpcs);
+ }
JsonWriter writer(output, JsonWriter::PRETTY);
- writer.Protobuf(dump_resp);
+ writer.StartObject();
+ writer.String("running");
+ writer.Protobuf(running_rpcs);
+ writer.String("sampled");
+ writer.Protobuf(sampled_rpcs);
+ writer.EndObject();
+
}
} // anonymous namespace