You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by jd...@apache.org on 2016/07/13 04:12:47 UTC

[1/2] incubator-kudu git commit: Add a ResultTracker class that will track server side results

Repository: incubator-kudu
Updated Branches:
  refs/heads/master 074f849c8 -> 3c01d4c05


Add a ResultTracker class that will track server side results

This adds the initial version of the ResultTracker class that will be
responsible for tracking responses when we want exactly once call semantics.

While this is minimally working, i.e. it's able to track responses and
can be used for exactly once semantics, it's not complete. Future patches
will address the missing functionality.
Still missing are:
- Time based client state cleaning.
- Watermark based per client state cleaning.

I initially had a unit test for this class, but it relied on templating
out the RpcContext (since it's not easy to build one for a unit test
and it's not clear what it would do) which became problematic as the
class grew. So I decided to add a new rpc-stress-test that will test
this class within the rpc framework. I feel it's easier to do it that
way and that, since the client is not being used, the test can still
be very straightforward. This test is in a followup patch as it can
only run after the integration with the rest of the rpc subsystem.

Change-Id: I6718951a9998a6c9b0db35e8f09ff8304591e8b1
Reviewed-on: http://gerrit.cloudera.org:8080/3190
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/90d610fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/90d610fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/90d610fe

Branch: refs/heads/master
Commit: 90d610fe75d198dd9c35d2855023726a41b670ac
Parents: 074f849
Author: David Alves <da...@cloudera.com>
Authored: Tue May 24 08:43:30 2016 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Jul 13 03:52:04 2016 +0000

----------------------------------------------------------------------
 src/kudu/rpc/CMakeLists.txt    |   1 +
 src/kudu/rpc/result_tracker.cc | 304 ++++++++++++++++++++++++++++++++++++
 src/kudu/rpc/result_tracker.h  | 293 ++++++++++++++++++++++++++++++++++
 src/kudu/rpc/rpc_context.h     |   1 +
 4 files changed, 599 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/90d610fe/src/kudu/rpc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/CMakeLists.txt b/src/kudu/rpc/CMakeLists.txt
index 8d2037e..3e9a1cf 100644
--- a/src/kudu/rpc/CMakeLists.txt
+++ b/src/kudu/rpc/CMakeLists.txt
@@ -54,6 +54,7 @@ set(KRPC_SRCS
     reactor.cc
     remote_method.cc
     request_tracker.cc
+    result_tracker.cc
     rpc.cc
     rpc_context.cc
     rpc_controller.cc

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/90d610fe/src/kudu/rpc/result_tracker.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/result_tracker.cc b/src/kudu/rpc/result_tracker.cc
new file mode 100644
index 0000000..900bd5d
--- /dev/null
+++ b/src/kudu/rpc/result_tracker.cc
@@ -0,0 +1,304 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/result_tracker.h"
+
+#include "kudu/gutil/map-util.h"
+#include "kudu/rpc/inbound_call.h"
+#include "kudu/rpc/rpc_context.h"
+#include "kudu/util/debug/trace_event.h"
+#include "kudu/util/trace.h"
+#include "kudu/util/pb_util.h"
+
+namespace kudu {
+namespace rpc {
+
+using google::protobuf::Message;
+using rpc::InboundCall;
+using std::move;
+using std::lock_guard;
+using std::string;
+using std::unique_ptr;
+
+ResultTracker::RpcState ResultTracker::TrackRpc(
+    const RequestIdPB& request_id, Message* response, RpcContext* context) {
+  // All tracking state is guarded by 'lock_', so hold it while delegating.
+  lock_guard<simple_spinlock> guard(lock_);
+  return TrackRpcUnlocked(request_id, response, context);
+}
+
+ResultTracker::RpcState ResultTracker::TrackRpcUnlocked(const RequestIdPB& request_id,
+                                                        Message* response,
+                                                        RpcContext* context) {
+  // Core of TrackRpc(). Both callers in this file take 'lock_' before calling this.
+  ClientState* client_state = ComputeIfAbsent(
+      &clients_,
+      request_id.client_id(),
+      []{ return unique_ptr<ClientState>(new ClientState()); })->get();
+  // Remember when this client was last seen.
+  client_state->last_heard_from = MonoTime::Now(MonoTime::FINE);
+  // Fetch the record for this sequence number; 'result.second' is treated below as "new".
+  auto result = ComputeIfAbsentReturnAbsense(
+      &client_state->completion_records,
+      request_id.seq_no(),
+      []{ return unique_ptr<CompletionRecord>(new CompletionRecord()); });
+
+  CompletionRecord* completion_record = result.first->get();
+  // First attempt seen for this RPC: it becomes the driver and the caller must execute it.
+  if (PREDICT_TRUE(result.second)) {
+    completion_record->state = RpcState::IN_PROGRESS;
+    completion_record->driver_attempt_no = request_id.attempt_no();
+    // When a follower is applying an operation it doesn't have a response yet, and it won't
+    // have a context, so only set them if they exist.
+    if (response != nullptr) {
+      completion_record->ongoing_rpcs.push_back({response,
+                                                 DCHECK_NOTNULL(context),
+                                                 request_id.attempt_no()});
+    }
+    return RpcState::NEW;
+  }
+  // The RPC was already tracked: answer it or attach to it, depending on its state.
+  switch (completion_record->state) {
+    case RpcState::COMPLETED: {
+      // The RPC already has a stored response. If the request originates from a client
+      // (context and response are non-null), copy the stored response into 'response', reply
+      // right away and delete 'context'. If there is no context there is nothing to do.
+      if (context != nullptr) {
+        DCHECK_NOTNULL(response)->CopyFrom(*completion_record->response);
+        context->call_->RespondSuccess(*response);
+        delete context;
+      }
+      return RpcState::COMPLETED;
+    }
+    case RpcState::IN_PROGRESS: {
+      // Another attempt is still executing. If there is a context, attach this attempt with
+      // NO_HANDLER so that it gets the same response when the original one completes.
+      if (context != nullptr) {
+        completion_record->ongoing_rpcs.push_back({DCHECK_NOTNULL(response),
+                                                   context,
+                                                   NO_HANDLER});
+      }
+      return RpcState::IN_PROGRESS;
+    }
+    default:
+      LOG(FATAL) << "Wrong state: " << completion_record->state;
+      // dummy return to avoid warnings
+      return RpcState::STALE;
+  }
+}
+
+ResultTracker::RpcState ResultTracker::TrackRpcOrChangeDriver(const RequestIdPB& request_id) {
+  lock_guard<simple_spinlock> guard(lock_);
+  // No response or context is passed along, so nothing gets attached by this call.
+  const RpcState tracked_state = TrackRpcUnlocked(request_id, nullptr, nullptr);
+  if (tracked_state != RpcState::IN_PROGRESS) {
+    return tracked_state;
+  }
+
+  // Another attempt is already executing this RPC: take over as the driver and register
+  // this attempt as a handler that has no response or context of its own.
+  CompletionRecord* record = FindCompletionRecordOrDieUnlocked(request_id);
+  record->driver_attempt_no = request_id.attempt_no();
+  record->ongoing_rpcs.push_back({nullptr, nullptr, request_id.attempt_no()});
+
+  // The driver changed, so report NEW to tell the caller that it must store the result.
+  return RpcState::NEW;
+}
+
+bool ResultTracker::IsCurrentDriver(const RequestIdPB& request_id) {
+  lock_guard<simple_spinlock> guard(lock_);
+  const CompletionRecord* record = FindCompletionRecordOrNullUnlocked(request_id);
+
+  // No record at all (for instance because FailAndRespond() already removed it) means
+  // that this attempt cannot be the driver.
+  if (record == nullptr) {
+    return false;
+  }
+
+  return record->driver_attempt_no == request_id.attempt_no();
+}
+
+void ResultTracker::LogAndTraceAndRespondSuccess(RpcContext* context, const Message& msg) {
+  // Log and trace first: 'context' is deleted as soon as the response has been sent.
+  InboundCall* inbound = context->call_;
+  VLOG(1) << this << " " << inbound->remote_method().service_name() << ": Sending RPC success "
+      "response for " << inbound->ToString() << ":" << std::endl << msg.DebugString();
+  TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this,
+                         "response", pb_util::PbTracer::TracePb(msg),
+                         "trace", context->trace()->DumpToString());
+  inbound->RespondSuccess(msg);
+  delete context;
+}
+
+void ResultTracker::LogAndTraceFailure(RpcContext* context, const Message& msg) {
+  // Only logs and traces; sending the failure and releasing 'context' is left to the caller.
+  InboundCall* inbound = context->call_;
+  VLOG(1) << this << " " << inbound->remote_method().service_name() << ": Sending RPC failure "
+      "response for " << inbound->ToString() << ": " << msg.DebugString();
+  TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this,
+                         "response", pb_util::PbTracer::TracePb(msg),
+                         "trace", context->trace()->DumpToString());
+}
+
+void ResultTracker::LogAndTraceFailure(RpcContext* context, ErrorStatusPB_RpcErrorCodePB err,
+                                       const Status& status) {
+  // 'err' is not used here; only 'status' ends up in the log and in the trace.
+  InboundCall* inbound = context->call_;
+  VLOG(1) << this << " " << inbound->remote_method().service_name() << ": Sending RPC failure "
+      "response for " << inbound->ToString() << ": " << status.ToString();
+  TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this,
+                         "status", status.ToString(),
+                         "trace", context->trace()->DumpToString());
+}
+
+ResultTracker::CompletionRecord* ResultTracker::FindCompletionRecordOrDieUnlocked(
+    const RequestIdPB& request_id) {  // Both lookups below are expected to succeed.
+  ClientState* state = DCHECK_NOTNULL(FindPointeeOrNull(clients_, request_id.client_id()));
+  return DCHECK_NOTNULL(FindPointeeOrNull(state->completion_records, request_id.seq_no()));
+}
+
+std::pair<ResultTracker::ClientState*, ResultTracker::CompletionRecord*>
+ResultTracker::FindClientStateAndCompletionRecordOrNullUnlocked(const RequestIdPB& request_id) {
+  ClientState* client_state = FindPointeeOrNull(clients_, request_id.client_id());
+  CompletionRecord* completion_record = nullptr;  // Stays null when the client is unknown.
+  if (client_state != nullptr) {
+    completion_record = FindPointeeOrNull(client_state->completion_records, request_id.seq_no());
+  }
+  return std::make_pair(client_state, completion_record);  // Either element may be null.
+}
+
+ResultTracker::CompletionRecord* ResultTracker::FindCompletionRecordOrNullUnlocked(
+    const RequestIdPB& request_id) {  // Null when the client or the record is unknown.
+  return FindClientStateAndCompletionRecordOrNullUnlocked(request_id).second;
+}
+
+void ResultTracker::RecordCompletionAndRespond(
+    const RequestIdPB& request_id,
+    const Message* response) {
+  // Stores 'response' as the RPC's result and answers every attempt this handler owns.
+  lock_guard<simple_spinlock> l(lock_);
+  CompletionRecord* completion_record = FindCompletionRecordOrDieUnlocked(request_id);
+
+  // Only the driver may record a result; 'lock_' is held throughout, so one check suffices.
+  CHECK_EQ(completion_record->driver_attempt_no, request_id.attempt_no())
+    << "Called RecordCompletionAndRespond() from an executor identified with an attempt number that"
+    << " was not marked as the driver for the RPC.";
+  DCHECK_EQ(completion_record->state, RpcState::IN_PROGRESS);
+
+  // Keep a private copy of the response so that later retries can be answered from it.
+  completion_record->response.reset(DCHECK_NOTNULL(response)->New());
+  completion_record->response->CopyFrom(*response);
+  completion_record->state = RpcState::COMPLETED;
+
+  int64_t handler_attempt_no = request_id.attempt_no();
+
+  // Walk the ongoing RPCs last to first, replying to and dropping those this handler owns.
+  for (auto orpc_iter = completion_record->ongoing_rpcs.rbegin();
+       orpc_iter != completion_record->ongoing_rpcs.rend();) {
+    const OnGoingRpcInfo& ongoing_rpc = *orpc_iter;
+    if (MustHandleRpc(handler_attempt_no, completion_record, ongoing_rpc)) {
+      if (ongoing_rpc.context != nullptr) {
+        if (PREDICT_FALSE(ongoing_rpc.response != response)) {
+          ongoing_rpc.response->CopyFrom(*completion_record->response);
+        }
+        LogAndTraceAndRespondSuccess(ongoing_rpc.context, *ongoing_rpc.response);
+      }
+      ++orpc_iter;
+      orpc_iter = std::vector<OnGoingRpcInfo>::reverse_iterator(
+          completion_record->ongoing_rpcs.erase(orpc_iter.base()));
+    } else {
+      ++orpc_iter;
+    }
+  }
+}
+
+void ResultTracker::FailAndRespondInternal(const RequestIdPB& request_id,
+                                           HandleOngoingRpcFunc func) {
+  lock_guard<simple_spinlock> l(lock_);
+  auto state_and_record = FindClientStateAndCompletionRecordOrNullUnlocked(request_id);
+  CompletionRecord* completion_record = state_and_record.second;
+  // Nothing is tracked for this request, so there is nobody to reply to.
+  if (completion_record == nullptr) {
+    return;
+  }
+
+  int64_t seq_no = request_id.seq_no();
+  int64_t handler_attempt_no = request_id.attempt_no();
+
+  // Walk the ongoing RPCs last to first. When the failure response lives in a client
+  // originated call, that call has to be answered last: its context is deleted below and,
+  // presumably, the response goes away with it (RpcContext holds 'response_pb_').
+  for (auto orpc_iter = completion_record->ongoing_rpcs.rbegin();
+       orpc_iter != completion_record->ongoing_rpcs.rend();) {
+    // 'func' sends the failure; this function then releases the attempt's context.
+    const OnGoingRpcInfo& ongoing_rpc = *orpc_iter;
+    if (MustHandleRpc(handler_attempt_no, completion_record, ongoing_rpc)) {
+      if (ongoing_rpc.context != nullptr) {
+        func(ongoing_rpc);
+        delete ongoing_rpc.context;
+      }
+      ++orpc_iter;
+      orpc_iter = std::vector<OnGoingRpcInfo>::reverse_iterator(
+          completion_record->ongoing_rpcs.erase(orpc_iter.base()));
+    } else {
+      ++orpc_iter;
+    }
+  }
+
+  // If no attempts remain and no result was recorded, forget the RPC altogether so that a
+  // future attempt is treated as new. The record is destroyed when 'erased_record' dies.
+  if (completion_record->ongoing_rpcs.empty()
+      && completion_record->state != RpcState::COMPLETED) {
+    unique_ptr<CompletionRecord> erased_record =
+        EraseKeyReturnValuePtr(&state_and_record.first->completion_records, seq_no);
+  }
+}
+
+void ResultTracker::FailAndRespond(const RequestIdPB& request_id, Message* response) {
+  auto reply_with_response = [&](const OnGoingRpcInfo& attempt) {
+    // Usually there is a single attempt and it already holds 'response' itself, so the
+    // copy is only needed for the (rare) attempts that carry a different response object.
+    if (PREDICT_FALSE(attempt.response != response)) {
+      attempt.response->CopyFrom(*response);
+    }
+    LogAndTraceFailure(attempt.context, *response);
+    attempt.context->call_->RespondSuccess(*response);
+  };
+  FailAndRespondInternal(request_id, reply_with_response);
+}
+
+void ResultTracker::FailAndRespond(const RequestIdPB& request_id,
+                                   ErrorStatusPB_RpcErrorCodePB err, const Status& status) {
+  // Every pending attempt handled by this call is failed with the same 'err' and 'status'.
+  FailAndRespondInternal(request_id, [&](const OnGoingRpcInfo& attempt) {
+    LogAndTraceFailure(attempt.context, err, status);
+    attempt.context->call_->RespondFailure(err, status);
+  });
+}
+
+void ResultTracker::FailAndRespond(const RequestIdPB& request_id,
+                                   int error_ext_id, const string& message,
+                                   const Message& app_error_pb) {
+  // Every pending attempt handled by this call gets the same application error.
+  FailAndRespondInternal(request_id, [&](const OnGoingRpcInfo& attempt) {
+    LogAndTraceFailure(attempt.context, app_error_pb);
+    attempt.context->call_->RespondApplicationError(error_ext_id, message, app_error_pb);
+  });
+}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/90d610fe/src/kudu/rpc/result_tracker.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/result_tracker.h b/src/kudu/rpc/result_tracker.h
new file mode 100644
index 0000000..176ff9d
--- /dev/null
+++ b/src/kudu/rpc/result_tracker.h
@@ -0,0 +1,293 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <map>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/rpc/request_tracker.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+
+namespace google {
+namespace protobuf {
+class Message;
+} // protobuf
+} // google
+
+namespace kudu {
+namespace rpc {
+class RpcContext;
+
+// A ResultTracker for RPC results.
+//
+// The ResultTracker is responsible for tracking the results of RPCs and making sure that
+// client calls with the same client ID and sequence number (first attempt and subsequent retries)
+// are executed exactly once.
+//
+// In most cases, the use of ResultTracker is internal to the RPC system: RPCs are tracked when
+// they first arrive, before service methods are called, and calls to ResultTracker to store
+// responses are performed internally by RpcContext. The exception is when an RPC is replicated
+// across multiple servers, such as with writes, in which case direct interaction with the result
+// tracker is required so as to cache responses on replicas which did not receive the RPC directly
+// from the client.
+//
+// Throughout this header and elsewhere we use the following terms:
+//
+// RPC - The operation that a client or another server wants to execute on this server. The client
+//       might attempt one RPC many times, for instance if failures or timeouts happen.
+// Attempt - Each individual attempt of an RPC on the server.
+// Handler - A thread executing an attempt. Usually there is only one handler that executes the
+//           first attempt of an RPC and, when it completes, replies to its own attempt and to all
+//           other attempts that might have arrived after it started.
+// Driver - Only important in cases where there might be multiple handlers (e.g. in replicated
+//          RPCs). In these cases there might be two handlers executing the same RPC, corresponding
+//          to different attempts. Since the RPC must be executed exactly once, only one of the
+//          handlers must be selected as the "driver" and actually perform the operation.
+//
+// If a client wishes to track the result of a given RPC it must send on the RPC header
+// a RequestId with the following information:
+//
+//       Client ID - Uniquely identifies a single client. All the RPCs originating from the same
+//                   client must have the same ID.
+// Sequence number - Uniquely identifies a single RPC, even across retries to multiple servers, for
+//                   replicated RPCs. All retries of the same RPC must have the same sequence
+//                   number.
+//  Attempt number - Uniquely identifies each retry of the same RPC. All retries of the same RPC
+//                   must have different attempt numbers.
+//
+// When a call first arrives from the client the RPC subsystem will call TrackRpc() which
+// will return the state of the RPC in the form of an RpcState enum.
+//
+// If the ResultTracker returns NEW, this signals that it's the first time the server has heard
+// of the RPC and that the corresponding server function should be executed.
+//
+// If anything other than NEW is returned it means that the call has either previously completed or
+// is in the process of being executed. In this case the caller should _not_ execute the function
+// corresponding to the RPC. The ResultTracker itself will take care of responding to the client
+// appropriately. If the RPC was already completed, the ResultTracker replies to the client
+// immediately. If the RPC is still ongoing, the attempt gets "attached" to the ongoing one and will
+// receive the same response when its handler finishes.
+//
+// If handling of the RPC is successful, RecordCompletionAndRespond() must be called
+// to register successful completion, in which case all pending or future RPCs with the same
+// sequence number, from the same client, will receive the same response.
+//
+// On the other hand, if execution of the server function is not successful then one of
+// the FailAndRespond() methods should be called, causing all _pending_ attempts to receive the same
+// error. However this error is not stored, any future attempt with the same sequence number and
+// same client ID will be given a new chance to execute, as if it had never been tried before.
+// This gives the client a chance to either retry (if the failure reason is transient) or give up.
+//
+// ============================================================================
+// RPCs with multiple handlers
+// ============================================================================
+//
+// Some RPC results are tracked by a single server, i.e. they correspond to the modification of an
+// unreplicated resource and are unpersisted. For those no additional care needs to be taken, the
+// first attempt will be the only handler, and subsequent attempts will receive the response when
+// that first attempt is done.
+// However some RPCs are replicated across servers, using consensus, and thus can have multiple
+// handlers executing different attempts at the same time, e.g. one handler from a client
+// originating retry, and one from a previous leader originating update.
+//
+// In this case we need to make sure that the following invariants are enforced:
+// - Only one handler can actually record a response, the "driver" handler.
+// - Only one handler must respond to "attached" attempts.
+// - Each handler replies to their own RPCs, to avoid races. That is, a live handler should
+//   not mutate another live handler's response/context.
+//
+// This is achieved by naming one handler the "driver" of the RPC and making sure that only
+// the driver can successfully complete it, i.e. call RecordCompletionAndRespond().
+//
+// In order to make sure there is only one driver, there must be an _external_ serialization
+// point, before the final response is produced, after which only one of the handlers will
+// be marked as the driver. For instance, for writes, this serialization point is in
+// TransactionDriver, in a synchronized block where a logic such as this one happens (here
+// in pseudo-ish code):
+//
+// {
+//   lock_guard<simple_spinlock> l(lock_);
+//   if (follower_transaction) {
+//     result_tracker_->TrackRpcOrChangeDriver(request_id);
+//     continue_with_transaction();
+//   } else if (client_transaction) {
+//     bool is_still_driver = result_tracker_->IsCurrentDriver(request_id);
+//     if (is_still_driver) continue_with_transaction();
+//     else abort_transaction();
+//   }
+// }
+//
+// This class is thread safe.
+//
+// TODO Memory bookkeeping.
+// TODO Garbage collection.
+class ResultTracker : public RefCountedThreadSafe<ResultTracker> {
+ public:
+  typedef rpc::RequestTracker::SequenceNumber SequenceNumber;
+  static const int NO_HANDLER = -1;
+  // Enum returned by TrackRpc that reflects the state of the RPC.
+  enum RpcState {
+    // The RPC is new.
+    NEW,
+    // The RPC has previously completed and the same response has been sent
+    // to the client.
+    COMPLETED,
+    // The RPC is currently in-progress and, when it completes, the same response
+    // will be sent to the client.
+    IN_PROGRESS,
+    // The RPC's state is stale, meaning it's older than our per-client garbage
+    // collection watermark and we do not recall the original response.
+    STALE
+  };
+
+  ResultTracker() {}
+  ~ResultTracker() {}
+
+  // Tracks the RPC and returns its current state.
+  //
+  // If the RpcState == NEW the caller is supposed to actually start executing the RPC.
+  // The caller still owns the passed 'response' and 'context'.
+  //
+  // If the RpcState is anything else all remaining actions will be taken care of internally,
+  // i.e. the caller no longer needs to execute the RPC and this takes ownership of the passed
+  // 'response' and 'context'.
+  RpcState TrackRpc(const RequestIdPB& request_id,
+                    google::protobuf::Message* response,
+                    RpcContext* context);
+
+  // Used to track RPC attempts which originate from other replicas, and which may race with
+  // client originated ones.
+  // Tracks the RPC if it is untracked or changes the current driver of this RPC, i.e. sets the
+  // attempt number in 'request_id' as the driver of the RPC, if it is tracked and IN_PROGRESS.
+  RpcState TrackRpcOrChangeDriver(const RequestIdPB& request_id);
+
+  // Checks if the attempt at an RPC identified by 'request_id' is the current driver of the
+  // RPC. That is, if the attempt number in 'request_id' corresponds to the attempt marked
+  // as the driver of this RPC, either by initially getting NEW from TrackRpc() or by
+  // explicit driver change with TrackRpcOrChangeDriver().
+  bool IsCurrentDriver(const RequestIdPB& request_id);
+
+  // Records the completion of a successful operation.
+  // This will respond to all RPCs from the same client with the same sequence_number.
+  // The response will be stored so that any future retries of this RPC get the same response.
+  //
+  // Requires that TrackRpc() was called before with the same 'client_id' and
+  // 'sequence_number'.
+  // Requires that the attempt identified by 'request_id' is the current driver
+  // of the RPC.
+  void RecordCompletionAndRespond(const RequestIdPB& request_id,
+                                  const google::protobuf::Message* response);
+
+  // Responds to all RPCs identified by 'client_id' and 'sequence_number' with the same response,
+  // but doesn't actually store the response.
+  // This should be called when the RPC failed validation or if some transient error occurred.
+  // Based on the response the client can then decide whether to retry the RPC (which will
+  // be treated as a new one) or to give up.
+  //
+  // Requires that TrackRpc() was called before with the same 'client_id' and
+  // 'sequence_number'.
+  // Requires that the attempt identified by 'request_id' is the current driver
+  // of the RPC.
+  void FailAndRespond(const RequestIdPB& request_id,
+                      google::protobuf::Message* response);
+
+  // Overload to match other types of RpcContext::Respond*Failure()
+  void FailAndRespond(const RequestIdPB& request_id,
+                      ErrorStatusPB_RpcErrorCodePB err, const Status& status);
+
+  // Overload to match other types of RpcContext::Respond*Failure()
+  void FailAndRespond(const RequestIdPB& request_id,
+                      int error_ext_id, const std::string& message,
+                      const google::protobuf::Message& app_error_pb);
+
+ private:
+  // Information about client originated ongoing RPCs.
+  // The lifecycle of 'response' and 'context' is managed by the RPC layer.
+  struct OnGoingRpcInfo {
+    google::protobuf::Message* response;
+    RpcContext* context;
+    int64_t handler_attempt_no;
+  };
+  // A completion record for an IN_PROGRESS or COMPLETED RPC.
+  struct CompletionRecord {
+    // The current state of the RPC.
+    RpcState state;
+    // The attempt number that is/was "driving" this RPC.
+    int64_t driver_attempt_no;
+    // The cached response, if this RPC is in COMPLETED state.
+    std::unique_ptr<google::protobuf::Message> response;
+    // The set of ongoing RPCs that correspond to this record.
+    std::vector<OnGoingRpcInfo> ongoing_rpcs;
+  };
+  // The state corresponding to a single client.
+  struct ClientState {
+    MonoTime last_heard_from;
+    std::map<SequenceNumber, std::unique_ptr<CompletionRecord>> completion_records;
+  };
+
+  RpcState TrackRpcUnlocked(const RequestIdPB& request_id,
+                            google::protobuf::Message* response,
+                            RpcContext* context);
+
+  typedef std::function<void (const OnGoingRpcInfo&)> HandleOngoingRpcFunc;
+
+  // Helper method to handle the multiple overloads of FailAndRespond. Takes a lambda
+  // that knows what to do with OnGoingRpcInfo in each individual case.
+  void FailAndRespondInternal(const rpc::RequestIdPB& request_id,
+                              HandleOngoingRpcFunc func);
+
+  CompletionRecord* FindCompletionRecordOrNullUnlocked(const RequestIdPB& request_id);
+  CompletionRecord* FindCompletionRecordOrDieUnlocked(const RequestIdPB& request_id);
+  std::pair<ClientState*, CompletionRecord*> FindClientStateAndCompletionRecordOrNullUnlocked(
+      const RequestIdPB& request_id);
+
+  // Decides whether the handler identified by 'handler_attempt_no' is responsible for
+  // answering 'ongoing_rpc'. That is the case when:
+  // - the attempt belongs to the handler itself (same attempt number), or
+  // - the handler is the current driver and the attempt was merely attached, i.e. it is
+  //   tagged with NO_HANDLER.
+  bool MustHandleRpc(int64_t handler_attempt_no,
+                     CompletionRecord* completion_record,
+                     const OnGoingRpcInfo& ongoing_rpc) {
+    const bool is_own_attempt = ongoing_rpc.handler_attempt_no == handler_attempt_no;
+    if (PREDICT_TRUE(is_own_attempt)) return true;
+
+    const bool is_driver = completion_record->driver_attempt_no == handler_attempt_no;
+    return is_driver && ongoing_rpc.handler_attempt_no == NO_HANDLER;
+  }
+
+  void LogAndTraceAndRespondSuccess(RpcContext* context, const google::protobuf::Message& msg);
+  void LogAndTraceFailure(RpcContext* context, const google::protobuf::Message& msg);
+  void LogAndTraceFailure(RpcContext* context, ErrorStatusPB_RpcErrorCodePB err,
+                          const Status& status);
+
+  // Lock that protects access to 'clients_' and to the state contained in each ClientState.
+  // TODO consider a per-ClientState lock if we find this too coarse grained.
+  simple_spinlock lock_;
+  std::map<std::string, std::unique_ptr<ClientState>> clients_;
+
+  DISALLOW_COPY_AND_ASSIGN(ResultTracker);
+};
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/90d610fe/src/kudu/rpc/rpc_context.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_context.h b/src/kudu/rpc/rpc_context.h
index 593a3b6..aae84a2 100644
--- a/src/kudu/rpc/rpc_context.h
+++ b/src/kudu/rpc/rpc_context.h
@@ -175,6 +175,7 @@ class RpcContext {
     __attribute__((noreturn));
 
  private:
+  friend class ResultTracker;
   InboundCall* const call_;
   const gscoped_ptr<const google::protobuf::Message> request_pb_;
   const gscoped_ptr<google::protobuf::Message> response_pb_;


[2/2] incubator-kudu git commit: Add a ToString() method to Proxy

Posted by jd...@apache.org.
Add a ToString() method to Proxy

ReplicatedRpc takes the server proxy type as a template argument and
uses its ToString() method to print out details in case of error. Usually
this is RemoteTablet, which has a ToString() method, but that might
not always be the case.

In fact, in a test in a follow-up patch, ReplicatedRpc takes Proxy as the
server proxy type, and compilation would fail due to the missing ToString().
We could make ReplicatedRpc not use the ToString() method, but it seems
very helpful to have it, so this patch adds it to Proxy instead.

Change-Id: Ia1e158db09e6e3c188b2725424681187a4b8c72e
Reviewed-on: http://gerrit.cloudera.org:8080/3502
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/3c01d4c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/3c01d4c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/3c01d4c0

Branch: refs/heads/master
Commit: 3c01d4c058ab07f2346d83a8750bcb0839a4bd97
Parents: 90d610f
Author: David Alves <da...@cloudera.com>
Authored: Sun Jun 26 18:06:03 2016 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Jul 13 03:54:57 2016 +0000

----------------------------------------------------------------------
 src/kudu/rpc/proxy.cc    | 6 ++++++
 src/kudu/rpc/proxy.h     | 2 ++
 src/kudu/rpc/rpc-test.cc | 4 ++++
 3 files changed, 12 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/3c01d4c0/src/kudu/rpc/proxy.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/proxy.cc b/src/kudu/rpc/proxy.cc
index 8337e31..c8e5017 100644
--- a/src/kudu/rpc/proxy.cc
+++ b/src/kudu/rpc/proxy.cc
@@ -27,6 +27,8 @@
 #include <sstream>
 #include <vector>
 
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/outbound_call.h"
 #include "kudu/rpc/messenger.h"
 #include "kudu/rpc/remote_method.h"
@@ -105,5 +107,9 @@ void Proxy::set_user_credentials(const UserCredentials& user_credentials) {
   conn_id_.set_user_credentials(user_credentials);
 }
 
+std::string Proxy::ToString() const {  // Formats as "<service_name_>@<conn_id_.ToString()>".
+  return strings::Substitute("$0@$1", service_name_, conn_id_.ToString());
+}
+
 } // namespace rpc
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/3c01d4c0/src/kudu/rpc/proxy.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/proxy.h b/src/kudu/rpc/proxy.h
index 7e27d55..b1bc350 100644
--- a/src/kudu/rpc/proxy.h
+++ b/src/kudu/rpc/proxy.h
@@ -104,6 +104,8 @@ class Proxy {
   // Get the user credentials which should be used to log in.
   const UserCredentials& user_credentials() const { return conn_id_.user_credentials(); }
 
+  std::string ToString() const;
+
  private:
   const std::string service_name_;
   std::shared_ptr<Messenger> messenger_;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/3c01d4c0/src/kudu/rpc/rpc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index e4f7969..dd8c265 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -27,6 +27,7 @@
 
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/constants.h"
 #include "kudu/rpc/serialization.h"
 #include "kudu/util/countdown_latch.h"
@@ -108,6 +109,9 @@ TEST_F(TestRpc, TestCall) {
   LOG(INFO) << "Connecting to " << server_addr.ToString();
   shared_ptr<Messenger> client_messenger(CreateMessenger("Client"));
   Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+  ASSERT_STR_CONTAINS(p.ToString(), strings::Substitute("kudu.rpc.GenericCalculatorService@"
+                                                            "{remote=$0, user_credentials=",
+                                                        server_addr.ToString()));
 
   for (int i = 0; i < 10; i++) {
     ASSERT_OK(DoTestSyncCall(p, GenericCalculatorService::kAddMethodName));