You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/04/20 08:13:45 UTC
[2/2] incubator-kudu git commit: KUDU-1410 (part 4): rpc: refactor
call trace logging to new RpczStore class
KUDU-1410 (part 4): rpc: refactor call trace logging to new RpczStore class
This doesn't have any functional change, but moves the actual
logging of RPC traces into a new class.
The new class will soon be responsible for keeping sampled
traces for each method.
Change-Id: Icaf607815177c71608e09b47386ef5fbd46d7673
Reviewed-on: http://gerrit.cloudera.org:8080/2797
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/05f816e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/05f816e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/05f816e9
Branch: refs/heads/master
Commit: 05f816e9b54a14b57617e6b5cb857e0736bb8483
Parents: 38e16b7
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Apr 13 14:16:04 2016 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Apr 20 06:04:52 2016 +0000
----------------------------------------------------------------------
src/kudu/rpc/CMakeLists.txt | 1 +
src/kudu/rpc/connection.cc | 4 +++
src/kudu/rpc/connection.h | 3 ++
src/kudu/rpc/inbound_call.cc | 37 ++-----------------
src/kudu/rpc/inbound_call.h | 8 ++---
src/kudu/rpc/messenger.cc | 2 ++
src/kudu/rpc/messenger.h | 5 +++
src/kudu/rpc/rpcz_store.cc | 75 +++++++++++++++++++++++++++++++++++++++
src/kudu/rpc/rpcz_store.h | 54 ++++++++++++++++++++++++++++
9 files changed, 148 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/05f816e9/src/kudu/rpc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/CMakeLists.txt b/src/kudu/rpc/CMakeLists.txt
index 0533b3e..15571ea 100644
--- a/src/kudu/rpc/CMakeLists.txt
+++ b/src/kudu/rpc/CMakeLists.txt
@@ -56,6 +56,7 @@ set(KRPC_SRCS
rpc.cc
rpc_context.cc
rpc_controller.cc
+ rpcz_store.cc
sasl_common.cc
sasl_client.cc
sasl_helper.cc
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/05f816e9/src/kudu/rpc/connection.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/connection.cc b/src/kudu/rpc/connection.cc
index b811911..46a92d9 100644
--- a/src/kudu/rpc/connection.cc
+++ b/src/kudu/rpc/connection.cc
@@ -382,6 +382,10 @@ void Connection::set_user_credentials(const UserCredentials &user_credentials) {
user_credentials_.CopyFrom(user_credentials);
}
+RpczStore* Connection::rpcz_store() {
+ return reactor_thread_->reactor()->messenger()->rpcz_store();
+}
+
void Connection::ReadHandler(ev::io &watcher, int revents) {
DCHECK(reactor_thread_->IsCurrentThread());
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/05f816e9/src/kudu/rpc/connection.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/connection.h b/src/kudu/rpc/connection.h
index 9ad7e42..3f7e234 100644
--- a/src/kudu/rpc/connection.h
+++ b/src/kudu/rpc/connection.h
@@ -47,6 +47,7 @@ namespace rpc {
class DumpRunningRpcsRequestPB;
class RpcConnectionPB;
class ReactorThread;
+class RpczStore;
//
// A connection between an endpoint and us.
@@ -127,6 +128,8 @@ class Connection : public RefCountedThreadSafe<Connection> {
// Get the user credentials which will be used to log in.
const UserCredentials &user_credentials() const { return user_credentials_; }
+ RpczStore* rpcz_store();
+
// libev callback when data is available to read.
void ReadHandler(ev::io &watcher, int revents);
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/05f816e9/src/kudu/rpc/inbound_call.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/inbound_call.cc b/src/kudu/rpc/inbound_call.cc
index 582decc..644116a 100644
--- a/src/kudu/rpc/inbound_call.cc
+++ b/src/kudu/rpc/inbound_call.cc
@@ -24,10 +24,10 @@
#include "kudu/rpc/connection.h"
#include "kudu/rpc/rpc_introspection.pb.h"
#include "kudu/rpc/rpc_sidecar.h"
+#include "kudu/rpc/rpcz_store.h"
#include "kudu/rpc/serialization.h"
#include "kudu/rpc/service_if.h"
#include "kudu/util/debug/trace_event.h"
-#include "kudu/util/flag_tags.h"
#include "kudu/util/metrics.h"
#include "kudu/util/trace.h"
@@ -39,11 +39,6 @@ using std::shared_ptr;
using std::vector;
using strings::Substitute;
-DEFINE_bool(rpc_dump_all_traces, false,
- "If true, dump all RPC traces at INFO level");
-TAG_FLAG(rpc_dump_all_traces, advanced);
-TAG_FLAG(rpc_dump_all_traces, runtime);
-
namespace kudu {
namespace rpc {
@@ -138,7 +133,7 @@ void InboundCall::Respond(const MessageLite& response,
"method", remote_method_.method_name());
TRACE_TO(trace_, "Queueing $0 response", is_success ? "success" : "failure");
RecordHandlingCompleted();
- LogTrace();
+ conn_->rpcz_store()->AddCall(this);
conn_->QueueResponseForCall(gscoped_ptr<InboundCall>(this));
}
@@ -206,34 +201,6 @@ void InboundCall::DumpPB(const DumpRunningRpcsRequestPB& req,
.ToMicroseconds());
}
-void InboundCall::LogTrace() const {
- MonoTime now = MonoTime::Now(MonoTime::FINE);
- int total_time = now.GetDeltaSince(timing_.time_received).ToMilliseconds();
-
- if (header_.has_timeout_millis() && header_.timeout_millis() > 0) {
- double log_threshold = header_.timeout_millis() * 0.75f;
- if (total_time > log_threshold) {
- // TODO: consider pushing this onto another thread since it may be slow.
- // The traces may also be too large to fit in a log message.
- LOG(WARNING) << ToString() << " took " << total_time << "ms (client timeout "
- << header_.timeout_millis() << ").";
- std::string s = trace_->DumpToString();
- if (!s.empty()) {
- LOG(WARNING) << "Trace:\n" << s;
- }
- return;
- }
- }
-
- if (PREDICT_FALSE(FLAGS_rpc_dump_all_traces)) {
- LOG(INFO) << ToString() << " took " << total_time << "ms. Trace:";
- trace_->Dump(&LOG(INFO), true);
- } else if (total_time > 1000) {
- LOG(INFO) << ToString() << " took " << total_time << "ms. "
- << "Request Metrics: " << trace_->MetricsAsJSON();
- }
-}
-
const UserCredentials& InboundCall::user_credentials() const {
return conn_->user_credentials();
}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/05f816e9/src/kudu/rpc/inbound_call.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/inbound_call.h b/src/kudu/rpc/inbound_call.h
index e84195b..30b38b3 100644
--- a/src/kudu/rpc/inbound_call.h
+++ b/src/kudu/rpc/inbound_call.h
@@ -175,6 +175,8 @@ class InboundCall {
std::vector<uint32_t> GetRequiredFeatures() const;
private:
+ friend class RpczStore;
+
// Serialize and queue the response.
void Respond(const google::protobuf::MessageLite& response,
bool is_success);
@@ -185,12 +187,6 @@ class InboundCall {
Status SerializeResponseBuffer(const google::protobuf::MessageLite& response,
bool is_success);
- // Log a WARNING message if the RPC response was slow enough that the
- // client likely timed out. This is based on the client-provided timeout
- // value.
- // Also can be configured to log _all_ RPC traces for help debugging.
- void LogTrace() const;
-
// When RPC call Handle() completed execution on the server side.
// Updates the Histogram with time elapsed since the call was started,
// and should only be called once on a given instance.
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/05f816e9/src/kudu/rpc/messenger.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/messenger.cc b/src/kudu/rpc/messenger.cc
index 0f4ff5b..2261217 100644
--- a/src/kudu/rpc/messenger.cc
+++ b/src/kudu/rpc/messenger.cc
@@ -38,6 +38,7 @@
#include "kudu/rpc/reactor.h"
#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/rpc/rpc_service.h"
+#include "kudu/rpc/rpcz_store.h"
#include "kudu/rpc/sasl_common.h"
#include "kudu/rpc/transfer.h"
#include "kudu/util/errno.h"
@@ -232,6 +233,7 @@ void Messenger::RegisterInboundSocket(Socket *new_socket, const Sockaddr &remote
Messenger::Messenger(const MessengerBuilder &bld)
: name_(bld.name_),
closing_(false),
+ rpcz_store_(new RpczStore()),
metric_entity_(bld.metric_entity_),
retain_self_(this) {
for (int i = 0; i < bld.num_reactors_; i++) {
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/05f816e9/src/kudu/rpc/messenger.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/messenger.h b/src/kudu/rpc/messenger.h
index 5286780..e43278c 100644
--- a/src/kudu/rpc/messenger.h
+++ b/src/kudu/rpc/messenger.h
@@ -52,6 +52,7 @@ class OutboundCall;
class Reactor;
class ReactorThread;
class RpcService;
+class RpczStore;
struct AcceptorPoolInfo {
public:
@@ -174,6 +175,8 @@ class Messenger {
ThreadPool* negotiation_pool() const { return negotiation_pool_.get(); }
+ RpczStore* rpcz_store() { return rpcz_store_.get(); }
+
std::string name() const {
return name_;
}
@@ -221,6 +224,8 @@ class Messenger {
gscoped_ptr<ThreadPool> negotiation_pool_;
+ std::unique_ptr<RpczStore> rpcz_store_;
+
scoped_refptr<MetricEntity> metric_entity_;
// The ownership of the Messenger object is somewhat subtle. The pointer graph
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/05f816e9/src/kudu/rpc/rpcz_store.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpcz_store.cc b/src/kudu/rpc/rpcz_store.cc
new file mode 100644
index 0000000..1e7796d
--- /dev/null
+++ b/src/kudu/rpc/rpcz_store.cc
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/rpcz_store.h"
+
+#include <glog/stl_logging.h>
+#include <string>
+
+#include "kudu/rpc/inbound_call.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/trace.h"
+
+
+DEFINE_bool(rpc_dump_all_traces, false,
+ "If true, dump all RPC traces at INFO level");
+TAG_FLAG(rpc_dump_all_traces, advanced);
+TAG_FLAG(rpc_dump_all_traces, runtime);
+
+namespace kudu {
+namespace rpc {
+
+
+RpczStore::RpczStore() {}
+RpczStore::~RpczStore() {}
+
+void RpczStore::AddCall(InboundCall* call) {
+ LogTrace(call);
+}
+
+void RpczStore::LogTrace(InboundCall* call) {
+ MonoTime now = MonoTime::Now(MonoTime::FINE);
+ int total_time = now.GetDeltaSince(call->timing_.time_received).ToMilliseconds();
+
+ if (call->header_.has_timeout_millis() && call->header_.timeout_millis() > 0) {
+ double log_threshold = call->header_.timeout_millis() * 0.75f;
+ if (total_time > log_threshold) {
+ // TODO: consider pushing this onto another thread since it may be slow.
+ // The traces may also be too large to fit in a log message.
+ LOG(WARNING) << call->ToString() << " took " << total_time << "ms (client timeout "
+ << call->header_.timeout_millis() << ").";
+ std::string s = call->trace()->DumpToString();
+ if (!s.empty()) {
+ LOG(WARNING) << "Trace:\n" << s;
+ }
+ return;
+ }
+ }
+
+ if (PREDICT_FALSE(FLAGS_rpc_dump_all_traces)) {
+ LOG(INFO) << call->ToString() << " took " << total_time << "ms. Trace:";
+ call->trace()->Dump(&LOG(INFO), true);
+ } else if (total_time > 1000) {
+ LOG(INFO) << call->ToString() << " took " << total_time << "ms. "
+ << "Request Metrics: " << call->trace()->MetricsAsJSON();
+ }
+}
+
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/05f816e9/src/kudu/rpc/rpcz_store.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpcz_store.h b/src/kudu/rpc/rpcz_store.h
new file mode 100644
index 0000000..92409b2
--- /dev/null
+++ b/src/kudu/rpc/rpcz_store.h
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "kudu/gutil/macros.h"
+
+namespace kudu {
+namespace rpc {
+
+class InboundCall;
+
+// Responsible for storing sampled traces associated with completed calls.
+// Before each call is responded to, it is added to this store.
+//
+// The current implementation just logs traces for calls which are slow.
+class RpczStore {
+ public:
+ RpczStore();
+ ~RpczStore();
+
+ // Process a single call, potentially sampling it for later analysis.
+ //
+ // If the call is sampled, it might be mutated. For example, the request
+ // and response might be taken from the call and stored as part of the
+ // sample. This should be called just before a call response is sent
+ // to the client.
+ void AddCall(InboundCall* c);
+
+ private:
+ // Log a WARNING message if the RPC response was slow enough that the
+ // client likely timed out. This is based on the client-provided timeout
+ // value.
+ // Also can be configured to log _all_ RPC traces for help debugging.
+ void LogTrace(InboundCall* call);
+
+ DISALLOW_COPY_AND_ASSIGN(RpczStore);
+};
+
+} // namespace rpc
+} // namespace kudu