You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/04/20 08:13:45 UTC

[2/2] incubator-kudu git commit: KUDU-1410 (part 4): rpc: refactor call trace logging to new RpczStore class

KUDU-1410 (part 4): rpc: refactor call trace logging to new RpczStore class

This doesn't have any functional change, but moves the actual
logging of RPC traces into a new class.

The new class will soon be responsible for keeping sampled
traces for each method.

Change-Id: Icaf607815177c71608e09b47386ef5fbd46d7673
Reviewed-on: http://gerrit.cloudera.org:8080/2797
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/05f816e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/05f816e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/05f816e9

Branch: refs/heads/master
Commit: 05f816e9b54a14b57617e6b5cb857e0736bb8483
Parents: 38e16b7
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Apr 13 14:16:04 2016 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Apr 20 06:04:52 2016 +0000

----------------------------------------------------------------------
 src/kudu/rpc/CMakeLists.txt  |  1 +
 src/kudu/rpc/connection.cc   |  4 +++
 src/kudu/rpc/connection.h    |  3 ++
 src/kudu/rpc/inbound_call.cc | 37 ++-----------------
 src/kudu/rpc/inbound_call.h  |  8 ++---
 src/kudu/rpc/messenger.cc    |  2 ++
 src/kudu/rpc/messenger.h     |  5 +++
 src/kudu/rpc/rpcz_store.cc   | 75 +++++++++++++++++++++++++++++++++++++++
 src/kudu/rpc/rpcz_store.h    | 54 ++++++++++++++++++++++++++++
 9 files changed, 148 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/05f816e9/src/kudu/rpc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/CMakeLists.txt b/src/kudu/rpc/CMakeLists.txt
index 0533b3e..15571ea 100644
--- a/src/kudu/rpc/CMakeLists.txt
+++ b/src/kudu/rpc/CMakeLists.txt
@@ -56,6 +56,7 @@ set(KRPC_SRCS
     rpc.cc
     rpc_context.cc
     rpc_controller.cc
+    rpcz_store.cc
     sasl_common.cc
     sasl_client.cc
     sasl_helper.cc

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/05f816e9/src/kudu/rpc/connection.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/connection.cc b/src/kudu/rpc/connection.cc
index b811911..46a92d9 100644
--- a/src/kudu/rpc/connection.cc
+++ b/src/kudu/rpc/connection.cc
@@ -382,6 +382,10 @@ void Connection::set_user_credentials(const UserCredentials &user_credentials) {
   user_credentials_.CopyFrom(user_credentials);
 }
 
+RpczStore* Connection::rpcz_store() {
+  return reactor_thread_->reactor()->messenger()->rpcz_store();
+}
+
 void Connection::ReadHandler(ev::io &watcher, int revents) {
   DCHECK(reactor_thread_->IsCurrentThread());
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/05f816e9/src/kudu/rpc/connection.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/connection.h b/src/kudu/rpc/connection.h
index 9ad7e42..3f7e234 100644
--- a/src/kudu/rpc/connection.h
+++ b/src/kudu/rpc/connection.h
@@ -47,6 +47,7 @@ namespace rpc {
 class DumpRunningRpcsRequestPB;
 class RpcConnectionPB;
 class ReactorThread;
+class RpczStore;
 
 //
 // A connection between an endpoint and us.
@@ -127,6 +128,8 @@ class Connection : public RefCountedThreadSafe<Connection> {
   // Get the user credentials which will be used to log in.
   const UserCredentials &user_credentials() const { return user_credentials_; }
 
+  RpczStore* rpcz_store();
+
   // libev callback when data is available to read.
   void ReadHandler(ev::io &watcher, int revents);
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/05f816e9/src/kudu/rpc/inbound_call.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/inbound_call.cc b/src/kudu/rpc/inbound_call.cc
index 582decc..644116a 100644
--- a/src/kudu/rpc/inbound_call.cc
+++ b/src/kudu/rpc/inbound_call.cc
@@ -24,10 +24,10 @@
 #include "kudu/rpc/connection.h"
 #include "kudu/rpc/rpc_introspection.pb.h"
 #include "kudu/rpc/rpc_sidecar.h"
+#include "kudu/rpc/rpcz_store.h"
 #include "kudu/rpc/serialization.h"
 #include "kudu/rpc/service_if.h"
 #include "kudu/util/debug/trace_event.h"
-#include "kudu/util/flag_tags.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/trace.h"
 
@@ -39,11 +39,6 @@ using std::shared_ptr;
 using std::vector;
 using strings::Substitute;
 
-DEFINE_bool(rpc_dump_all_traces, false,
-            "If true, dump all RPC traces at INFO level");
-TAG_FLAG(rpc_dump_all_traces, advanced);
-TAG_FLAG(rpc_dump_all_traces, runtime);
-
 namespace kudu {
 namespace rpc {
 
@@ -138,7 +133,7 @@ void InboundCall::Respond(const MessageLite& response,
                          "method", remote_method_.method_name());
   TRACE_TO(trace_, "Queueing $0 response", is_success ? "success" : "failure");
   RecordHandlingCompleted();
-  LogTrace();
+  conn_->rpcz_store()->AddCall(this);
   conn_->QueueResponseForCall(gscoped_ptr<InboundCall>(this));
 }
 
@@ -206,34 +201,6 @@ void InboundCall::DumpPB(const DumpRunningRpcsRequestPB& req,
                            .ToMicroseconds());
 }
 
-void InboundCall::LogTrace() const {
-  MonoTime now = MonoTime::Now(MonoTime::FINE);
-  int total_time = now.GetDeltaSince(timing_.time_received).ToMilliseconds();
-
-  if (header_.has_timeout_millis() && header_.timeout_millis() > 0) {
-    double log_threshold = header_.timeout_millis() * 0.75f;
-    if (total_time > log_threshold) {
-      // TODO: consider pushing this onto another thread since it may be slow.
-      // The traces may also be too large to fit in a log message.
-      LOG(WARNING) << ToString() << " took " << total_time << "ms (client timeout "
-                   << header_.timeout_millis() << ").";
-      std::string s = trace_->DumpToString();
-      if (!s.empty()) {
-        LOG(WARNING) << "Trace:\n" << s;
-      }
-      return;
-    }
-  }
-
-  if (PREDICT_FALSE(FLAGS_rpc_dump_all_traces)) {
-    LOG(INFO) << ToString() << " took " << total_time << "ms. Trace:";
-    trace_->Dump(&LOG(INFO), true);
-  } else if (total_time > 1000) {
-    LOG(INFO) << ToString() << " took " << total_time << "ms. "
-              << "Request Metrics: " << trace_->MetricsAsJSON();
-  }
-}
-
 const UserCredentials& InboundCall::user_credentials() const {
   return conn_->user_credentials();
 }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/05f816e9/src/kudu/rpc/inbound_call.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/inbound_call.h b/src/kudu/rpc/inbound_call.h
index e84195b..30b38b3 100644
--- a/src/kudu/rpc/inbound_call.h
+++ b/src/kudu/rpc/inbound_call.h
@@ -175,6 +175,8 @@ class InboundCall {
   std::vector<uint32_t> GetRequiredFeatures() const;
 
  private:
+  friend class RpczStore;
+
   // Serialize and queue the response.
   void Respond(const google::protobuf::MessageLite& response,
                bool is_success);
@@ -185,12 +187,6 @@ class InboundCall {
   Status SerializeResponseBuffer(const google::protobuf::MessageLite& response,
                                  bool is_success);
 
-  // Log a WARNING message if the RPC response was slow enough that the
-  // client likely timed out. This is based on the client-provided timeout
-  // value.
-  // Also can be configured to log _all_ RPC traces for help debugging.
-  void LogTrace() const;
-
   // When RPC call Handle() completed execution on the server side.
   // Updates the Histogram with time elapsed since the call was started,
   // and should only be called once on a given instance.

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/05f816e9/src/kudu/rpc/messenger.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/messenger.cc b/src/kudu/rpc/messenger.cc
index 0f4ff5b..2261217 100644
--- a/src/kudu/rpc/messenger.cc
+++ b/src/kudu/rpc/messenger.cc
@@ -38,6 +38,7 @@
 #include "kudu/rpc/reactor.h"
 #include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/rpc/rpc_service.h"
+#include "kudu/rpc/rpcz_store.h"
 #include "kudu/rpc/sasl_common.h"
 #include "kudu/rpc/transfer.h"
 #include "kudu/util/errno.h"
@@ -232,6 +233,7 @@ void Messenger::RegisterInboundSocket(Socket *new_socket, const Sockaddr &remote
 Messenger::Messenger(const MessengerBuilder &bld)
   : name_(bld.name_),
     closing_(false),
+    rpcz_store_(new RpczStore()),
     metric_entity_(bld.metric_entity_),
     retain_self_(this) {
   for (int i = 0; i < bld.num_reactors_; i++) {

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/05f816e9/src/kudu/rpc/messenger.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/messenger.h b/src/kudu/rpc/messenger.h
index 5286780..e43278c 100644
--- a/src/kudu/rpc/messenger.h
+++ b/src/kudu/rpc/messenger.h
@@ -52,6 +52,7 @@ class OutboundCall;
 class Reactor;
 class ReactorThread;
 class RpcService;
+class RpczStore;
 
 struct AcceptorPoolInfo {
  public:
@@ -174,6 +175,8 @@ class Messenger {
 
   ThreadPool* negotiation_pool() const { return negotiation_pool_.get(); }
 
+  RpczStore* rpcz_store() { return rpcz_store_.get(); }
+
   std::string name() const {
     return name_;
   }
@@ -221,6 +224,8 @@ class Messenger {
 
   gscoped_ptr<ThreadPool> negotiation_pool_;
 
+  std::unique_ptr<RpczStore> rpcz_store_;
+
   scoped_refptr<MetricEntity> metric_entity_;
 
   // The ownership of the Messenger object is somewhat subtle. The pointer graph

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/05f816e9/src/kudu/rpc/rpcz_store.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpcz_store.cc b/src/kudu/rpc/rpcz_store.cc
new file mode 100644
index 0000000..1e7796d
--- /dev/null
+++ b/src/kudu/rpc/rpcz_store.cc
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/rpcz_store.h"
+
+#include <glog/stl_logging.h>
+#include <string>
+
+#include "kudu/rpc/inbound_call.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/trace.h"
+
+
+DEFINE_bool(rpc_dump_all_traces, false,
+            "If true, dump all RPC traces at INFO level");
+TAG_FLAG(rpc_dump_all_traces, advanced);
+TAG_FLAG(rpc_dump_all_traces, runtime);
+
+namespace kudu {
+namespace rpc {
+
+
+RpczStore::RpczStore() {}
+RpczStore::~RpczStore() {}
+
+void RpczStore::AddCall(InboundCall* call) {
+  LogTrace(call);
+}
+
+void RpczStore::LogTrace(InboundCall* call) {
+  MonoTime now = MonoTime::Now(MonoTime::FINE);
+  int total_time = now.GetDeltaSince(call->timing_.time_received).ToMilliseconds();
+
+  if (call->header_.has_timeout_millis() && call->header_.timeout_millis() > 0) {
+    double log_threshold = call->header_.timeout_millis() * 0.75f;
+    if (total_time > log_threshold) {
+      // TODO: consider pushing this onto another thread since it may be slow.
+      // The traces may also be too large to fit in a log message.
+      LOG(WARNING) << call->ToString() << " took " << total_time << "ms (client timeout "
+                   << call->header_.timeout_millis() << ").";
+      std::string s = call->trace()->DumpToString();
+      if (!s.empty()) {
+        LOG(WARNING) << "Trace:\n" << s;
+      }
+      return;
+    }
+  }
+
+  if (PREDICT_FALSE(FLAGS_rpc_dump_all_traces)) {
+    LOG(INFO) << call->ToString() << " took " << total_time << "ms. Trace:";
+    call->trace()->Dump(&LOG(INFO), true);
+  } else if (total_time > 1000) {
+    LOG(INFO) << call->ToString() << " took " << total_time << "ms. "
+              << "Request Metrics: " << call->trace()->MetricsAsJSON();
+  }
+}
+
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/05f816e9/src/kudu/rpc/rpcz_store.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpcz_store.h b/src/kudu/rpc/rpcz_store.h
new file mode 100644
index 0000000..92409b2
--- /dev/null
+++ b/src/kudu/rpc/rpcz_store.h
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "kudu/gutil/macros.h"
+
+namespace kudu {
+namespace rpc {
+
+class InboundCall;
+
+// Responsible for storing sampled traces associated with completed calls.
+// Before each call is responded to, it is added to this store.
+//
+// The current implementation just logs traces for calls which are slow.
+class RpczStore {
+ public:
+  RpczStore();
+  ~RpczStore();
+
+  // Process a single call, potentially sampling it for later analysis.
+  //
+  // If the call is sampled, it might be mutated. For example, the request
+  // and response might be taken from the call and stored as part of the
+  // sample. This should be called just before a call response is sent
+  // to the client.
+  void AddCall(InboundCall* c);
+
+ private:
+  // Log a WARNING message if the RPC response was slow enough that the
+  // client likely timed out. This is based on the client-provided timeout
+  // value.
+  // Can also be configured to log _all_ RPC traces to help with debugging.
+  void LogTrace(InboundCall* call);
+
+  DISALLOW_COPY_AND_ASSIGN(RpczStore);
+};
+
+} // namespace rpc
+} // namespace kudu