You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by jd...@apache.org on 2016/07/14 22:36:51 UTC
[3/4] incubator-kudu git commit: Add a way to dump ResultTracker state
Add a way to dump ResultTracker state
This adds a way to dump result tracker state and makes sure we use it when
FATAling out on CHECK failures.
Change-Id: I418da53b52aba5f8358b08709ffe65ece132aeb1
Reviewed-on: http://gerrit.cloudera.org:8080/3569
Tested-by: Kudu Jenkins
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/dfd4d664
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/dfd4d664
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/dfd4d664
Branch: refs/heads/master
Commit: dfd4d66482123a74a7f6cc76f538606adde67f31
Parents: 16b5bd2
Author: David Alves <da...@cloudera.com>
Authored: Tue Jul 5 03:41:07 2016 -0700
Committer: David Ribeiro Alves <dr...@apache.org>
Committed: Thu Jul 14 21:36:01 2016 +0000
----------------------------------------------------------------------
src/kudu/rpc/result_tracker.cc | 61 ++++++++++++++++++++++++++++++++++---
src/kudu/rpc/result_tracker.h | 13 +++++++-
2 files changed, 69 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/dfd4d664/src/kudu/rpc/result_tracker.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/result_tracker.cc b/src/kudu/rpc/result_tracker.cc
index 900bd5d..0ae64b2 100644
--- a/src/kudu/rpc/result_tracker.cc
+++ b/src/kudu/rpc/result_tracker.cc
@@ -18,6 +18,7 @@
#include "kudu/rpc/result_tracker.h"
#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/inbound_call.h"
#include "kudu/rpc/rpc_context.h"
#include "kudu/util/debug/trace_event.h"
@@ -33,6 +34,8 @@ using std::move;
using std::lock_guard;
using std::string;
using std::unique_ptr;
+using strings::Substitute;
+using strings::SubstituteAndAppend;
ResultTracker::RpcState ResultTracker::TrackRpc(const RequestIdPB& request_id,
Message* response,
@@ -186,16 +189,16 @@ ResultTracker::FindCompletionRecordOrNullUnlocked(const RequestIdPB& request_id)
return FindClientStateAndCompletionRecordOrNullUnlocked(request_id).second;
}
-void ResultTracker::RecordCompletionAndRespond(
- const RequestIdPB& request_id,
- const Message* response) {
+void ResultTracker::RecordCompletionAndRespond(const RequestIdPB& request_id,
+ const Message* response) {
lock_guard<simple_spinlock> l(lock_);
CompletionRecord* completion_record = FindCompletionRecordOrDieUnlocked(request_id);
CHECK_EQ(completion_record->driver_attempt_no, request_id.attempt_no())
<< "Called RecordCompletionAndRespond() from an executor identified with an attempt number that"
- << " was not marked as the driver for the RPC.";
+ << " was not marked as the driver for the RPC. RequestId: " << request_id.ShortDebugString()
+ << "\nTracker state:\n " << ToStringUnlocked();
DCHECK_EQ(completion_record->state, RpcState::IN_PROGRESS);
completion_record->response.reset(DCHECK_NOTNULL(response)->New());
completion_record->response->CopyFrom(*response);
@@ -230,6 +233,12 @@ void ResultTracker::FailAndRespondInternal(const RequestIdPB& request_id,
HandleOngoingRpcFunc func) {
lock_guard<simple_spinlock> l(lock_);
auto state_and_record = FindClientStateAndCompletionRecordOrNullUnlocked(request_id);
+
+ if (PREDICT_FALSE(state_and_record.first == nullptr)) {
+ LOG(FATAL) << "Couldn't find ClientState for request: " << request_id.ShortDebugString()
+ << ". \nTracker state:\n" << ToStringUnlocked();
+ }
+
CompletionRecord* completion_record = state_and_record.second;
if (completion_record == nullptr) {
@@ -300,5 +309,49 @@ void ResultTracker::FailAndRespond(const RequestIdPB& request_id,
FailAndRespondInternal(request_id, func);
}
+// Returns a human-readable dump of the tracker's state. Holds 'lock_' for the
+// duration of the dump and delegates the formatting to ToStringUnlocked().
+string ResultTracker::ToString() {
+  lock_guard<simple_spinlock> guard(lock_);
+  return ToStringUnlocked();
+}
+
+// Builds a multi-line, human-readable dump of every tracked client and its
+// state. Does not acquire 'lock_' itself; the callers visible in this file
+// invoke it while already holding the lock.
+string ResultTracker::ToStringUnlocked() const {
+  string result = Substitute("ResultTracker[this: $0, Num. Client States: $1, Client States:\n",
+                             this, clients_.size());
+  for (const auto& cs : clients_) {
+    // Hand the pattern and its arguments straight to SubstituteAndAppend().
+    // Passing an already-substituted string as the format would make it
+    // re-scan the client id and the nested dump for '$' placeholders.
+    SubstituteAndAppend(&result, "\n\tClient: $0, $1", cs.first, cs.second->ToString());
+  }
+  result.append("]");
+  return result;
+}
+
+// Returns a human-readable dump of this client's state: the 'last_heard_from'
+// timestamp, the number of completion records, and one entry per record.
+string ResultTracker::ClientState::ToString() const {
+  // Substitute() placeholders are zero-based, so two arguments are addressed
+  // as $0 and $1; arguments are listed in the order the labels appear.
+  string result = Substitute("Client State[Last heard from: $0, Num. Completion "
+                             "Records: $1, CompletionRecords:\n",
+                             last_heard_from.ToString(), completion_records.size());
+  for (const auto& completion_record : completion_records) {
+    // Pattern and arguments go directly to SubstituteAndAppend() so that any
+    // '$' inside a record's dump is never treated as a placeholder.
+    SubstituteAndAppend(&result, "\n\tCompletion Record: $0, $1",
+                        completion_record.first,
+                        completion_record.second->ToString());
+  }
+  result.append("\t]");
+  return result;
+}
+
+// Returns a human-readable dump of this completion record: its state, the
+// driver attempt number, the cached response (or "None" when there is none),
+// and one entry per ongoing RPC.
+string ResultTracker::CompletionRecord::ToString() const {
+  string result = Substitute("Completion Record[State: $0, Driver: $1, Num. Ongoing RPCs: $2, "
+                             "Cached response: $3, OngoingRpcs:", state, driver_attempt_no,
+                             ongoing_rpcs.size(), response ? response->ShortDebugString() : "None");
+  for (const auto& orpc : ongoing_rpcs) {
+    // Each ongoing RPC's dump embeds a response debug string, which may
+    // contain '$'. Pass it as an argument, never as the format string.
+    SubstituteAndAppend(&result, "\n\t$0", orpc.ToString());
+  }
+  result.append("\t\t]");
+  return result;
+}
+
+// Returns a one-line description of this ongoing RPC: the handler attempt
+// number, the context pointer, and the response's short debug string (or
+// "NULL" when no response is set).
+string ResultTracker::OnGoingRpcInfo::ToString() const {
+  const string response_str = response ? response->ShortDebugString() : "NULL";
+  return Substitute("OngoingRpc[Handler: $0, Context: $1, Response: $2]",
+                    handler_attempt_no, context, response_str);
+}
+
} // namespace rpc
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/dfd4d664/src/kudu/rpc/result_tracker.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/result_tracker.h b/src/kudu/rpc/result_tracker.h
index 176ff9d..a04cb07 100644
--- a/src/kudu/rpc/result_tracker.h
+++ b/src/kudu/rpc/result_tracker.h
@@ -220,6 +220,8 @@ class ResultTracker : public RefCountedThreadSafe<ResultTracker> {
int error_ext_id, const std::string& message,
const google::protobuf::Message& app_error_pb);
+  // Returns a human-readable dump of the tracker's state. Acquires the
+  // tracker's lock while building the string.
+  std::string ToString();
+
private:
// Information about client originated ongoing RPCs.
// The lifecycle of 'response' and 'context' is managed by the RPC layer.
@@ -227,6 +229,8 @@ class ResultTracker : public RefCountedThreadSafe<ResultTracker> {
google::protobuf::Message* response;
RpcContext* context;
int64_t handler_attempt_no;
+
+ std::string ToString() const;
};
// A completion record for an IN_PROGRESS or COMPLETED RPC.
struct CompletionRecord {
@@ -238,11 +242,14 @@ class ResultTracker : public RefCountedThreadSafe<ResultTracker> {
std::unique_ptr<google::protobuf::Message> response;
// The set of ongoing RPCs that correspond to this record.
std::vector<OnGoingRpcInfo> ongoing_rpcs;
+
+ std::string ToString() const;
};
// The state corresponding to a single client: a timestamp plus the completion
// records tracked for that client.
struct ClientState {
// NOTE(review): presumably the last time a request from this client was seen — confirm.
MonoTime last_heard_from;
// Completion records owned by this client state, keyed by SequenceNumber.
std::map<SequenceNumber, std::unique_ptr<CompletionRecord>> completion_records;
+ std::string ToString() const;  // Human-readable dump of this client's state.
};
RpcState TrackRpcUnlocked(const RequestIdPB& request_id,
@@ -281,7 +288,11 @@ class ResultTracker : public RefCountedThreadSafe<ResultTracker> {
void LogAndTraceFailure(RpcContext* context, ErrorStatusPB_RpcErrorCodePB err,
const Status& status);
- // Lock that protects access to 'clients_' and to the state contained in each ClientState.
+ std::string ToStringUnlocked() const;  // Dumps the tracker state; does not take 'lock_' itself.
+
+
+ // Lock that protects access to 'clients_' and to the state contained in each
+ // ClientState.
// TODO consider a per-ClientState lock if we find this too coarse grained.
simple_spinlock lock_;
std::map<std::string, std::unique_ptr<ClientState>> clients_;