You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by jd...@apache.org on 2016/07/14 22:36:51 UTC

[3/4] incubator-kudu git commit: Add a way to dump ResultTracker state

Add a way to dump ResultTracker state

This adds a way to dump result tracker state and makes sure we use it when
FATAling out on CHECK failures.

Change-Id: I418da53b52aba5f8358b08709ffe65ece132aeb1
Reviewed-on: http://gerrit.cloudera.org:8080/3569
Tested-by: Kudu Jenkins
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/dfd4d664
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/dfd4d664
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/dfd4d664

Branch: refs/heads/master
Commit: dfd4d66482123a74a7f6cc76f538606adde67f31
Parents: 16b5bd2
Author: David Alves <da...@cloudera.com>
Authored: Tue Jul 5 03:41:07 2016 -0700
Committer: David Ribeiro Alves <dr...@apache.org>
Committed: Thu Jul 14 21:36:01 2016 +0000

----------------------------------------------------------------------
 src/kudu/rpc/result_tracker.cc | 61 ++++++++++++++++++++++++++++++++++---
 src/kudu/rpc/result_tracker.h  | 13 +++++++-
 2 files changed, 69 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/dfd4d664/src/kudu/rpc/result_tracker.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/result_tracker.cc b/src/kudu/rpc/result_tracker.cc
index 900bd5d..0ae64b2 100644
--- a/src/kudu/rpc/result_tracker.cc
+++ b/src/kudu/rpc/result_tracker.cc
@@ -18,6 +18,7 @@
 #include "kudu/rpc/result_tracker.h"
 
 #include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/inbound_call.h"
 #include "kudu/rpc/rpc_context.h"
 #include "kudu/util/debug/trace_event.h"
@@ -33,6 +34,8 @@ using std::move;
 using std::lock_guard;
 using std::string;
 using std::unique_ptr;
+using strings::Substitute;
+using strings::SubstituteAndAppend;
 
 ResultTracker::RpcState ResultTracker::TrackRpc(const RequestIdPB& request_id,
                                                 Message* response,
@@ -186,16 +189,16 @@ ResultTracker::FindCompletionRecordOrNullUnlocked(const RequestIdPB& request_id)
   return FindClientStateAndCompletionRecordOrNullUnlocked(request_id).second;
 }
 
-void ResultTracker::RecordCompletionAndRespond(
-    const RequestIdPB& request_id,
-    const Message* response) {
+void ResultTracker::RecordCompletionAndRespond(const RequestIdPB& request_id,
+                                               const Message* response) {
   lock_guard<simple_spinlock> l(lock_);
 
   CompletionRecord* completion_record = FindCompletionRecordOrDieUnlocked(request_id);
 
   CHECK_EQ(completion_record->driver_attempt_no, request_id.attempt_no())
     << "Called RecordCompletionAndRespond() from an executor identified with an attempt number that"
-    << " was not marked as the driver for the RPC.";
+    << " was not marked as the driver for the RPC. RequestId: " << request_id.ShortDebugString()
+    << "\nTracker state:\n " << ToStringUnlocked();
   DCHECK_EQ(completion_record->state, RpcState::IN_PROGRESS);
   completion_record->response.reset(DCHECK_NOTNULL(response)->New());
   completion_record->response->CopyFrom(*response);
@@ -230,6 +233,12 @@ void ResultTracker::FailAndRespondInternal(const RequestIdPB& request_id,
                                            HandleOngoingRpcFunc func) {
   lock_guard<simple_spinlock> l(lock_);
   auto state_and_record = FindClientStateAndCompletionRecordOrNullUnlocked(request_id);
+
+  if (PREDICT_FALSE(state_and_record.first == nullptr)) {
+    LOG(FATAL) << "Couldn't find ClientState for request: " << request_id.ShortDebugString()
+        << ". \nTracker state:\n" << ToStringUnlocked();
+  }
+
   CompletionRecord* completion_record = state_and_record.second;
 
   if (completion_record == nullptr) {
@@ -300,5 +309,49 @@ void ResultTracker::FailAndRespond(const RequestIdPB& request_id,
   FailAndRespondInternal(request_id, func);
 }
 
+string ResultTracker::ToString() {  // Locked wrapper around ToStringUnlocked().
+  lock_guard<simple_spinlock> guard(lock_);
+  return ToStringUnlocked();
+}
+
+string ResultTracker::ToStringUnlocked() const {  // Caller must hold lock_.
+  string result = Substitute("ResultTracker[this: $0, Num. Client States: $1, Client States:\n",
+                             this, clients_.size());
+  for (const auto& cs : clients_) {  // Format args directly; output is never re-parsed.
+    SubstituteAndAppend(&result, "\n\tClient: $0, $1", cs.first, cs.second->ToString());
+  }
+  result.append("]");
+  return result;
+}
+
+string ResultTracker::ClientState::ToString() const {
+  // Substitute() placeholders are zero-based; each must map to a supplied argument.
+  string result = Substitute("Client State[Last heard from: $0, Num. Completion "
+                             "Records: $1, CompletionRecords:\n",
+                             last_heard_from.ToString(), completion_records.size());
+  for (const auto& record : completion_records) {  // Format directly; no re-parsing of output.
+    SubstituteAndAppend(&result, "\n\tCompletion Record: $0, $1",
+                        record.first, record.second->ToString());
+  }
+  result.append("\t]");
+  return result;
+}
+
+string ResultTracker::CompletionRecord::ToString() const {  // Header, then one line per RPC.
+  string result = Substitute("Completion Record[State: $0, Driver: $1, Num. Ongoing RPCs: $2, "
+                                 "Cached response: $3, OngoingRpcs:", state, driver_attempt_no,
+                             ongoing_rpcs.size(), response ? response->ShortDebugString() : "None");
+  for (const auto& orpc : ongoing_rpcs) {  // Format directly; output is never re-parsed.
+    SubstituteAndAppend(&result, "\n\t$0", orpc.ToString());
+  }
+  result.append("\t\t]");
+  return result;
+}
+
+string ResultTracker::OnGoingRpcInfo::ToString() const {  // Handler, context pointer, response.
+  return Substitute("OngoingRpc[Handler: $0, Context: $1, Response: $2]",
+                    handler_attempt_no, context, response ? response->ShortDebugString() : "NULL");
+}
+
 } // namespace rpc
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/dfd4d664/src/kudu/rpc/result_tracker.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/result_tracker.h b/src/kudu/rpc/result_tracker.h
index 176ff9d..a04cb07 100644
--- a/src/kudu/rpc/result_tracker.h
+++ b/src/kudu/rpc/result_tracker.h
@@ -220,6 +220,8 @@ class ResultTracker : public RefCountedThreadSafe<ResultTracker> {
                       int error_ext_id, const std::string& message,
                       const google::protobuf::Message& app_error_pb);
 
+  std::string ToString();  // Dumps tracker state as text; takes lock_ internally.
+
  private:
   // Information about client originated ongoing RPCs.
   // The lifecycle of 'response' and 'context' is managed by the RPC layer.
@@ -227,6 +229,8 @@ class ResultTracker : public RefCountedThreadSafe<ResultTracker> {
     google::protobuf::Message* response;
     RpcContext* context;
     int64_t handler_attempt_no;
+
+    std::string ToString() const;  // Handler attempt, context pointer and response.
   };
   // A completion record for an IN_PROGRESS or COMPLETED RPC.
   struct CompletionRecord {
@@ -238,11 +242,14 @@ class ResultTracker : public RefCountedThreadSafe<ResultTracker> {
     std::unique_ptr<google::protobuf::Message> response;
     // The set of ongoing RPCs that correspond to this record.
     std::vector<OnGoingRpcInfo> ongoing_rpcs;
+
+    std::string ToString() const;  // State, driver attempt, cached response, ongoing RPCs.
   };
   // The state corresponding to a single client.
   struct ClientState {
     MonoTime last_heard_from;
     std::map<SequenceNumber, std::unique_ptr<CompletionRecord>> completion_records;
+    std::string ToString() const;  // Dumps last_heard_from and every completion record.
   };
 
   RpcState TrackRpcUnlocked(const RequestIdPB& request_id,
@@ -281,7 +288,11 @@ class ResultTracker : public RefCountedThreadSafe<ResultTracker> {
   void LogAndTraceFailure(RpcContext* context, ErrorStatusPB_RpcErrorCodePB err,
                           const Status& status);
 
-  // Lock that protects access to 'clients_' and to the state contained in each ClientState.
+  // Same dump as ToString(), but the caller must already hold 'lock_'.
+  std::string ToStringUnlocked() const;
+
+  // Lock that protects access to 'clients_' and to the state contained in each
+  // ClientState.
   // TODO consider a per-ClientState lock if we find this too coarse grained.
   simple_spinlock lock_;
   std::map<std::string, std::unique_ptr<ClientState>> clients_;