You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2016/04/20 20:29:16 UTC

incubator-kudu git commit: KUDU-1410 (part 5): rpc: sample each RPC method in several latency buckets, expose on /rpcz

Repository: incubator-kudu
Updated Branches:
  refs/heads/master 05f816e9b -> 3cb45e969


KUDU-1410 (part 5): rpc: sample each RPC method in several latency buckets, expose on /rpcz

This changes RpczStore to keep several sample traces for each RPC method, categorized
into latency buckets (under 10ms, 10-100ms, 100-1000ms, and 1000ms or more). Currently
the thresholds for those buckets are hard-coded, but follow-up work could make them
adaptive based on previous RPC percentiles, or user-configurable. Having tested this
with a YCSB workload, I found that the samples are useful even in this simple implementation.

Change-Id: I0eadb3f7035b5b156cb624ce50876876a5698b17
Reviewed-on: http://gerrit.cloudera.org:8080/2798
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/3cb45e96
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/3cb45e96
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/3cb45e96

Branch: refs/heads/master
Commit: 3cb45e9696f87775ea1dcd6ee27137326d88e451
Parents: 05f816e
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Apr 13 14:44:44 2016 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Wed Apr 20 18:28:58 2016 +0000

----------------------------------------------------------------------
 src/kudu/rpc/inbound_call.h          |  12 +++
 src/kudu/rpc/rpc-test-base.h         |   1 +
 src/kudu/rpc/rpc_introspection.proto |  25 +++++
 src/kudu/rpc/rpc_stub-test.cc        |  29 ++++++
 src/kudu/rpc/rpcz_store.cc           | 161 ++++++++++++++++++++++++++++--
 src/kudu/rpc/rpcz_store.h            |  24 ++++-
 src/kudu/server/rpcz-path-handler.cc |  28 ++++--
 7 files changed, 265 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/3cb45e96/src/kudu/rpc/inbound_call.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/inbound_call.h b/src/kudu/rpc/inbound_call.h
index 30b38b3..cc35383 100644
--- a/src/kudu/rpc/inbound_call.h
+++ b/src/kudu/rpc/inbound_call.h
@@ -57,6 +57,10 @@ struct InboundCallTiming {
  MonoTime time_received;   // Time the call was first accepted.
  MonoTime time_handled;    // Time the call handler was kicked off.
  MonoTime time_completed;  // Time the call handler completed.
+
+  MonoDelta TotalDuration() const {  // time_completed minus time_received.
+    return time_completed.GetDeltaSince(time_received);
+  }
 };
 
 // Inbound call on server
@@ -133,6 +137,14 @@ class InboundCall {
 
   Trace* trace();
 
+  const InboundCallTiming& timing() const {  // Read-only access to the call's timestamps.
+    return timing_;
+  }
+
+  const RequestHeader& header() const {  // Read-only access to the request header.
+    return header_;
+  }
+
   // Associate this call with a particular method that will be invoked
   // by the service.
   void set_method_info(RpcMethodInfo* info) {

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/3cb45e96/src/kudu/rpc/rpc-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h
index fa3857d..9be3bcb 100644
--- a/src/kudu/rpc/rpc-test-base.h
+++ b/src/kudu/rpc/rpc-test-base.h
@@ -236,6 +236,7 @@ class CalculatorService : public CalculatorServiceIf {
  private:
   void DoSleep(const SleepRequestPB *req,
                RpcContext *context) {
+    TRACE_COUNTER_INCREMENT("test_sleep_us", req->sleep_micros());  // Shows up in the call's trace.
     SleepFor(MonoDelta::FromMicroseconds(req->sleep_micros()));
     context->RespondSuccess();
   }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/3cb45e96/src/kudu/rpc/rpc_introspection.proto
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_introspection.proto b/src/kudu/rpc/rpc_introspection.proto
index be72450..c4b6faa 100644
--- a/src/kudu/rpc/rpc_introspection.proto
+++ b/src/kudu/rpc/rpc_introspection.proto
@@ -52,3 +52,28 @@ message DumpRunningRpcsResponsePB {
   repeated RpcConnectionPB inbound_connections = 1;
   repeated RpcConnectionPB outbound_connections = 2;
 }
+
+//------------------------------------------------------------
+
+// A single sampled RPC call.
+message RpczSamplePB {
+  // The original request header.
+  optional RequestHeader header = 1;
+  // The stringified request trace.
+  optional string trace = 2;
+  // The number of milliseconds that this call took to complete.
+  optional int32 duration_ms = 3;
+}
+
+// A set of samples for a particular RPC method.
+message RpczMethodPB {
+  required string method_name = 1;  // Currently the request message type name.
+  repeated RpczSamplePB samples = 2;  // At most one per latency bucket.
+}
+
+// Request and response for dumping previously sampled RPC calls.
+message DumpRpczStoreRequestPB {
+}
+message DumpRpczStoreResponsePB {
+  repeated RpczMethodPB methods = 1;
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/3cb45e96/src/kudu/rpc/rpc_stub-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_stub-test.cc b/src/kudu/rpc/rpc_stub-test.cc
index 8ac3dba..193da0f 100644
--- a/src/kudu/rpc/rpc_stub-test.cc
+++ b/src/kudu/rpc/rpc_stub-test.cc
@@ -27,6 +27,7 @@
 
 #include "kudu/gutil/stl_util.h"
 #include "kudu/rpc/rpc_introspection.pb.h"
+#include "kudu/rpc/rpcz_store.h"
 #include "kudu/rpc/rtest.proxy.h"
 #include "kudu/rpc/rtest.service.h"
 #include "kudu/rpc/rpc-test-base.h"
@@ -487,6 +488,34 @@ TEST_F(RpcStubTest, TestDumpCallsInFlight) {
   sleep.latch.Wait();
 }
 
+TEST_F(RpcStubTest, TestDumpSampledCalls) {
+  CalculatorServiceProxy p(client_messenger_, server_addr_);
+
+  // Issue two calls that fall into different latency buckets.
+  AsyncSleep sleeps[2];
+  sleeps[0].req.set_sleep_micros(150 * 1000); // 150ms
+  sleeps[1].req.set_sleep_micros(1500 * 1000); // 1500ms
+
+  for (auto& sleep : sleeps) {
+    p.SleepAsync(sleep.req, &sleep.resp, &sleep.rpc,
+                 boost::bind(&CountDownLatch::CountDown, &sleep.latch));
+  }
+  for (auto& sleep : sleeps) {
+    sleep.latch.Wait();
+  }
+
+  // Dump the sampled RPCs and expect to see both calls above.
+  DumpRpczStoreResponsePB sampled_rpcs;
+  server_messenger_->rpcz_store()->DumpPB(DumpRpczStoreRequestPB(), &sampled_rpcs);
+  const std::string dump = sampled_rpcs.DebugString();
+  EXPECT_EQ(1, sampled_rpcs.methods_size());
+  // Keep the closing brace: "150000" is a prefix of "1500000".
+  ASSERT_STR_CONTAINS(dump, "{\\\"test_sleep_us\\\":150000}");
+  ASSERT_STR_CONTAINS(dump, "{\\\"test_sleep_us\\\":1500000}");
+  ASSERT_STR_CONTAINS(dump, "SleepRequestPB");
+  ASSERT_STR_CONTAINS(dump, "duration_ms");
+}
+
 namespace {
 struct RefCountedTest : public RefCountedThreadSafe<RefCountedTest> {
 };

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/3cb45e96/src/kudu/rpc/rpcz_store.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpcz_store.cc b/src/kudu/rpc/rpcz_store.cc
index 1e7796d..33997fe 100644
--- a/src/kudu/rpc/rpcz_store.cc
+++ b/src/kudu/rpc/rpcz_store.cc
@@ -17,10 +17,18 @@
 
 #include "kudu/rpc/rpcz_store.h"
 
+#include <algorithm>
 #include <glog/stl_logging.h>
+#include <mutex> // for std::unique_lock and std::try_to_lock
 #include <string>
+#include <utility>
+#include <vector>
 
+#include "kudu/gutil/walltime.h"
 #include "kudu/rpc/inbound_call.h"
+#include "kudu/rpc/rpc_introspection.pb.h"
+#include "kudu/rpc/service_if.h"
+#include "kudu/util/atomic.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/trace.h"
@@ -31,27 +39,166 @@ DEFINE_bool(rpc_dump_all_traces, false,
 TAG_FLAG(rpc_dump_all_traces, advanced);
 TAG_FLAG(rpc_dump_all_traces, runtime);
 
+using std::pair;
+using std::vector;
+using std::unique_ptr;
+
 namespace kudu {
 namespace rpc {
 
+// Sample an RPC call once every N milliseconds within each
+// bucket. If the current sample in a latency bucket is older
+// than this threshold, a new sample will be taken.
+static const int kSampleIntervalMs = 1000;
+
+static const int kBucketThresholdsMs[] = {10, 100, 1000};  // Exclusive upper bounds, in ms.
+static constexpr int kNumBuckets = arraysize(kBucketThresholdsMs) + 1;  // +1: calls >= last bound.
+
+// One instance of this class is created for each RPC method implemented
+// on the server. It keeps one recent sample per latency bucket, where the
+// bucket boundaries are currently fixed (see kBucketThresholdsMs).
+class MethodSampler {
+ public:
+  MethodSampler() {}
+  ~MethodSampler() {}
+
+  // Potentially sample a single call (at most once per kSampleIntervalMs per bucket).
+  void SampleCall(InboundCall* call);
+
+  // Append the current samples (one per populated bucket) to 'pb'.
+  void GetSamplePBs(RpczMethodPB* pb);
+
+ private:
+  // An individual recorded sample.
+  struct Sample {
+    RequestHeader header;        // Copy of the call's request header.
+    scoped_refptr<Trace> trace;  // Reference to the call's trace.
+    int duration_ms;             // Total duration of the call.
+  };
+
+  // A bucket holding at most one sample, the monotonic time (in micros) it was
+  // taken (0 if never sampled), and a spinlock protecting 'sample'.
+  struct SampleBucket {
+    SampleBucket() : last_sample_time(0) {}
+
+    AtomicInt<int64_t> last_sample_time;
+    simple_spinlock sample_lock;
+    Sample sample;
+  };
+  std::array<SampleBucket, kNumBuckets> buckets_;  // Ordered from fastest to slowest.
+
+  DISALLOW_COPY_AND_ASSIGN(MethodSampler);
+};
+
+MethodSampler* RpczStore::SamplerForCall(InboundCall* call) {
+  if (PREDICT_FALSE(!call->method_info())) {  // Calls without method info are not sampled.
+    return nullptr;
+  }
+
+  // Fast path: look up an existing sampler under the shared lock.
+  {
+    shared_lock<rw_spinlock> lock(&samplers_lock_.get_lock());
+    auto it = method_samplers_.find(call->method_info());
+    if (PREDICT_TRUE(it != method_samplers_.end())) {
+      return it->second.get();
+    }
+  }
+
+  // Slow path: allocate a sampler, then re-check under the exclusive lock.
+  unique_ptr<MethodSampler> ms(new MethodSampler());
+  lock_guard<percpu_rwlock> lock(&samplers_lock_);
+  auto it = method_samplers_.find(call->method_info());
+  if (it != method_samplers_.end()) {  // Another thread inserted first; 'ms' is discarded.
+    return it->second.get();
+  }
+  auto* ret = ms.get();
+  method_samplers_[call->method_info()] = std::move(ms);
+  return ret;
+}
+
+void MethodSampler::SampleCall(InboundCall* call) {
+  // Pick the first bucket whose threshold exceeds the duration, else the last one.
+  int duration_ms = call->timing().TotalDuration().ToMilliseconds();
+
+  SampleBucket* bucket = &buckets_[kNumBuckets - 1];
+  for (int i = 0; i < kNumBuckets - 1; i++) {
+    if (duration_ms < kBucketThresholdsMs[i]) {
+      bucket = &buckets_[i];
+      break;
+    }
+  }
+
+  MicrosecondsInt64 now = GetMonoTimeMicros();
+  int64_t us_since_trace = now - bucket->last_sample_time.Load();
+  if (us_since_trace > kSampleIntervalMs * 1000) {  // The bucket's sample is stale.
+    Sample new_sample = {call->header(), call->trace(), duration_ms};
+    {
+      std::unique_lock<simple_spinlock> lock(bucket->sample_lock, std::try_to_lock);
+      // If another thread is already taking a sample, it's not worth waiting.
+      if (!lock.owns_lock()) {
+        return;
+      }
+      std::swap(bucket->sample, new_sample);  // Old sample is destroyed outside the lock.
+      bucket->last_sample_time.Store(now);
+    }
+    VLOG(2) << "Sampled call " << call->ToString();
+  }
+}
+
+void MethodSampler::GetSamplePBs(RpczMethodPB* method_pb) {
+  for (auto& bucket : buckets_) {
+    if (bucket.last_sample_time.Load() == 0) continue;  // Never sampled.
+
+    std::unique_lock<simple_spinlock> lock(bucket.sample_lock);  // Excludes SampleCall()'s swap.
+    auto* sample_pb = method_pb->add_samples();
+    sample_pb->mutable_header()->CopyFrom(bucket.sample.header);
+    sample_pb->set_trace(bucket.sample.trace->DumpToString(Trace::INCLUDE_ALL));
+    sample_pb->set_duration_ms(bucket.sample.duration_ms);
+  }
+}
 
 RpczStore::RpczStore() {}
 RpczStore::~RpczStore() {}
 
 void RpczStore::AddCall(InboundCall* call) {
   LogTrace(call);
+  auto* sampler = SamplerForCall(call);
+  if (PREDICT_FALSE(!sampler)) return;  // The call has no method_info.
+
+  sampler->SampleCall(call);
+}
+
+void RpczStore::DumpPB(const DumpRpczStoreRequestPB& req,  // 'req' is currently unused.
+                       DumpRpczStoreResponsePB* resp) {
+  vector<pair<RpcMethodInfo*, MethodSampler*>> samplers;  // Snapshot taken under the shared lock.
+  {
+    shared_lock<rw_spinlock> lock(&samplers_lock_.get_lock());
+    for (const auto& p : method_samplers_) {
+      samplers.emplace_back(p.first, p.second.get());
+    }
+  }
+
+  for (const auto& p : samplers) {  // NOTE(review): unlocked; assumes samplers are never erased.
+    auto* sampler = p.second;
+
+    RpczMethodPB* method_pb = resp->add_methods();
+    // TODO: use the actual RPC name instead of the request type name.
+    // Currently this isn't conveniently plumbed here, but the type name
+    // is close enough.
+    method_pb->set_method_name(p.first->req_prototype->GetTypeName());
+    sampler->GetSamplePBs(method_pb);
+  }
 }
 
 void RpczStore::LogTrace(InboundCall* call) {
-  MonoTime now = MonoTime::Now(MonoTime::FINE);
-  int total_time = now.GetDeltaSince(call->timing_.time_received).ToMilliseconds();
+  int duration_ms = call->timing().TotalDuration().ToMilliseconds();  // Ends at time_completed.
 
   if (call->header_.has_timeout_millis() && call->header_.timeout_millis() > 0) {
     double log_threshold = call->header_.timeout_millis() * 0.75f;
-    if (total_time > log_threshold) {
+    if (duration_ms > log_threshold) {
       // TODO: consider pushing this onto another thread since it may be slow.
       // The traces may also be too large to fit in a log message.
-      LOG(WARNING) << call->ToString() << " took " << total_time << "ms (client timeout "
+      LOG(WARNING) << call->ToString() << " took " << duration_ms << "ms (client timeout "
                    << call->header_.timeout_millis() << ").";
       std::string s = call->trace()->DumpToString();
       if (!s.empty()) {
@@ -62,10 +209,10 @@ void RpczStore::LogTrace(InboundCall* call) {
   }
 
   if (PREDICT_FALSE(FLAGS_rpc_dump_all_traces)) {
-    LOG(INFO) << call->ToString() << " took " << total_time << "ms. Trace:";
+    LOG(INFO) << call->ToString() << " took " << duration_ms << "ms. Trace:";
     call->trace()->Dump(&LOG(INFO), true);
-  } else if (total_time > 1000) {
-    LOG(INFO) << call->ToString() << " took " << total_time << "ms. "
+  } else if (duration_ms > 1000) {
+    LOG(INFO) << call->ToString() << " took " << duration_ms << "ms. "
               << "Request Metrics: " << call->trace()->MetricsAsJSON();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/3cb45e96/src/kudu/rpc/rpcz_store.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpcz_store.h b/src/kudu/rpc/rpcz_store.h
index 92409b2..48e4474 100644
--- a/src/kudu/rpc/rpcz_store.h
+++ b/src/kudu/rpc/rpcz_store.h
@@ -18,15 +18,22 @@
 
 #include "kudu/gutil/macros.h"
 
+#include <memory>
+#include <unordered_map>
+
+#include "kudu/util/locks.h"
+
 namespace kudu {
 namespace rpc {
 
+class DumpRpczStoreRequestPB;
+class DumpRpczStoreResponsePB;
 class InboundCall;
+class MethodSampler;
+struct RpcMethodInfo;
 
 // Responsible for storing sampled traces associated with completed calls.
 // Before each call is responded to, it is added to this store.
-//
-// The current implementation just logs traces for calls which are slow.
 class RpczStore {
  public:
   RpczStore();
@@ -40,13 +47,26 @@ class RpczStore {
   // to the client.
   void AddCall(InboundCall* c);
 
+  // Dump all collected RPC samples, one entry per method. 'req' is currently unused.
+  void DumpPB(const DumpRpczStoreRequestPB& req,
+              DumpRpczStoreResponsePB* resp);
+
  private:
+  // Look up or create the particular MethodSampler instance which should
+  // store samples for this call.
+  MethodSampler* SamplerForCall(InboundCall* call);
+
   // Log a WARNING message if the RPC response was slow enough that the
   // client likely timed out. This is based on the client-provided timeout
   // value.
   // Also can be configured to log _all_ RPC traces for help debugging.
   void LogTrace(InboundCall* call);
 
+  percpu_rwlock samplers_lock_;  // Lookups take it shared; insertions take it exclusive.
+
+  // Protected by samplers_lock_.
+  std::unordered_map<RpcMethodInfo*, std::unique_ptr<MethodSampler>> method_samplers_;
+
   DISALLOW_COPY_AND_ASSIGN(RpczStore);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/3cb45e96/src/kudu/server/rpcz-path-handler.cc
----------------------------------------------------------------------
diff --git a/src/kudu/server/rpcz-path-handler.cc b/src/kudu/server/rpcz-path-handler.cc
index 9df2132..57c599c 100644
--- a/src/kudu/server/rpcz-path-handler.cc
+++ b/src/kudu/server/rpcz-path-handler.cc
@@ -26,10 +26,13 @@
 #include "kudu/gutil/strings/numbers.h"
 #include "kudu/rpc/messenger.h"
 #include "kudu/rpc/rpc_introspection.pb.h"
+#include "kudu/rpc/rpcz_store.h"
 #include "kudu/server/webserver.h"
 
 using kudu::rpc::DumpRunningRpcsRequestPB;
 using kudu::rpc::DumpRunningRpcsResponsePB;
+using kudu::rpc::DumpRpczStoreRequestPB;
+using kudu::rpc::DumpRpczStoreResponsePB;
 using kudu::rpc::Messenger;
 using std::shared_ptr;
 using std::stringstream;
@@ -40,16 +43,29 @@ namespace {
 
 void RpczPathHandler(const shared_ptr<Messenger>& messenger,
                      const Webserver::WebRequest& req, stringstream* output) {
-  DumpRunningRpcsRequestPB dump_req;
-  DumpRunningRpcsResponsePB dump_resp;
+  DumpRunningRpcsResponsePB running_rpcs;  // In-flight calls.
+  {
+    DumpRunningRpcsRequestPB dump_req;
 
-  string arg = FindWithDefault(req.parsed_args, "include_traces", "false");
-  dump_req.set_include_traces(ParseLeadingBoolValue(arg.c_str(), false));
+    string arg = FindWithDefault(req.parsed_args, "include_traces", "false");
+    dump_req.set_include_traces(ParseLeadingBoolValue(arg.c_str(), false));
 
-  messenger->DumpRunningRpcs(dump_req, &dump_resp);
+    messenger->DumpRunningRpcs(dump_req, &running_rpcs);
+  }
+  DumpRpczStoreResponsePB sampled_rpcs;  // Previously completed, sampled calls.
+  {
+    DumpRpczStoreRequestPB dump_req;
+    messenger->rpcz_store()->DumpPB(dump_req, &sampled_rpcs);
+  }
 
   JsonWriter writer(output, JsonWriter::PRETTY);
-  writer.Protobuf(dump_resp);
+  writer.StartObject();  // Output shape: {"running": ..., "sampled": ...}
+  writer.String("running");
+  writer.Protobuf(running_rpcs);
+  writer.String("sampled");
+  writer.Protobuf(sampled_rpcs);
+  writer.EndObject();
+
 }
 
 } // anonymous namespace