You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2017/07/27 04:04:17 UTC

[1/2] kudu git commit: KUDU-1865: Avoid heap allocation for payload slices

Repository: kudu
Updated Branches:
  refs/heads/master 9285f2b44 -> 0ec793e32


KUDU-1865: Avoid heap allocation for payload slices

As shown in KUDU-1865, heap-allocating the temporary vector of
slices that holds the serialized payload introduces measurable
overhead under heavy load. This change replaces the heap
allocation with a stack allocation of an array of size
TransferLimits::kMaxPayloadSlices. With this change, we saw a
10-15% improvement under heavy workload.

Change-Id: I4470d34ba48db5edaeb66d9e739e0c8942004d86
Reviewed-on: http://gerrit.cloudera.org:8080/7471
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/0c6aa525
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/0c6aa525
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/0c6aa525

Branch: refs/heads/master
Commit: 0c6aa525680d4f927c11b7dc85f9d79abf036b87
Parents: 9285f2b
Author: Michael Ho <kw...@cloudera.com>
Authored: Wed Jul 19 20:15:03 2017 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Thu Jul 27 04:03:12 2017 +0000

----------------------------------------------------------------------
 src/kudu/rpc/connection.cc    | 18 +++++++-----------
 src/kudu/rpc/connection.h     |  4 ----
 src/kudu/rpc/inbound_call.cc  | 20 ++++++++++++--------
 src/kudu/rpc/inbound_call.h   |  5 +++--
 src/kudu/rpc/outbound_call.cc | 17 ++++++++++++-----
 src/kudu/rpc/outbound_call.h  |  3 ++-
 src/kudu/rpc/transfer.cc      | 26 +++++++++++++-------------
 src/kudu/rpc/transfer.h       | 14 ++++++++++----
 8 files changed, 59 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/0c6aa525/src/kudu/rpc/connection.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/connection.cc b/src/kudu/rpc/connection.cc
index fc46d67..8dfdbdf 100644
--- a/src/kudu/rpc/connection.cc
+++ b/src/kudu/rpc/connection.cc
@@ -310,13 +310,8 @@ void Connection::QueueOutboundCall(const shared_ptr<OutboundCall> &call) {
   call->set_call_id(call_id);
 
   // Serialize the actual bytes to be put on the wire.
-  slices_tmp_.clear();
-  Status s = call->SerializeTo(&slices_tmp_);
-  if (PREDICT_FALSE(!s.ok())) {
-    call->SetFailed(s, negotiation_complete_ ? Phase::REMOTE_CALL
-                                             : Phase::CONNECTION_NEGOTIATION);
-    return;
-  }
+  TransferPayload tmp_slices;
+  size_t n_slices = call->SerializeTo(&tmp_slices);
 
   call->SetQueued();
 
@@ -371,7 +366,7 @@ void Connection::QueueOutboundCall(const shared_ptr<OutboundCall> &call) {
   TransferCallbacks *cb = new CallTransferCallbacks(call);
   awaiting_response_[call_id] = car.release();
   QueueOutbound(gscoped_ptr<OutboundTransfer>(
-      OutboundTransfer::CreateForCallRequest(call_id, slices_tmp_, cb)));
+      OutboundTransfer::CreateForCallRequest(call_id, tmp_slices, n_slices, cb)));
 }
 
 // Callbacks for sending an RPC call response from the server.
@@ -442,14 +437,15 @@ void Connection::QueueResponseForCall(gscoped_ptr<InboundCall> call) {
   // eventually runs in the reactor thread will take care of calling
   // ResponseTransferCallbacks::NotifyTransferAborted.
 
-  std::vector<Slice> slices;
-  call->SerializeResponseTo(&slices);
+  TransferPayload tmp_slices;
+  size_t n_slices = call->SerializeResponseTo(&tmp_slices);
 
   TransferCallbacks *cb = new ResponseTransferCallbacks(std::move(call), this);
   // After the response is sent, can delete the InboundCall object.
   // We set a dummy call ID and required feature set, since these are not needed
   // when sending responses.
-  gscoped_ptr<OutboundTransfer> t(OutboundTransfer::CreateForCallResponse(slices, cb));
+  gscoped_ptr<OutboundTransfer> t(
+      OutboundTransfer::CreateForCallResponse(tmp_slices, n_slices, cb));
 
   QueueTransferTask *task = new QueueTransferTask(std::move(t), this);
   reactor_thread_->reactor()->ScheduleReactorTask(task);

http://git-wip-us.apache.org/repos/asf/kudu/blob/0c6aa525/src/kudu/rpc/connection.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/connection.h b/src/kudu/rpc/connection.h
index 816c43c..c6cae38 100644
--- a/src/kudu/rpc/connection.h
+++ b/src/kudu/rpc/connection.h
@@ -320,10 +320,6 @@ class Connection : public RefCountedThreadSafe<Connection> {
   // Starts as Status::OK, gets set to a shutdown status upon Shutdown().
   Status shutdown_status_;
 
-  // Temporary vector used when serializing - avoids an allocation
-  // when serializing calls.
-  std::vector<Slice> slices_tmp_;
-
   // RPC features supported by the remote end of the connection.
   std::set<RpcFeatureFlag> remote_features_;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/0c6aa525/src/kudu/rpc/inbound_call.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/inbound_call.cc b/src/kudu/rpc/inbound_call.cc
index aba9977..d1c27b7 100644
--- a/src/kudu/rpc/inbound_call.cc
+++ b/src/kudu/rpc/inbound_call.cc
@@ -175,16 +175,20 @@ void InboundCall::SerializeResponseBuffer(const MessageLite& response,
                                  &response_hdr_buf_);
 }
 
-void InboundCall::SerializeResponseTo(vector<Slice>* slices) const {
+size_t InboundCall::SerializeResponseTo(TransferPayload* slices) const {
   TRACE_EVENT0("rpc", "InboundCall::SerializeResponseTo");
-  CHECK_GT(response_hdr_buf_.size(), 0);
-  CHECK_GT(response_msg_buf_.size(), 0);
-  slices->reserve(slices->size() + 2 + outbound_sidecars_.size());
-  slices->push_back(Slice(response_hdr_buf_));
-  slices->push_back(Slice(response_msg_buf_));
-  for (const unique_ptr<RpcSidecar>& car : outbound_sidecars_) {
-    slices->push_back(car->AsSlice());
+  DCHECK_GT(response_hdr_buf_.size(), 0);
+  DCHECK_GT(response_msg_buf_.size(), 0);
+  size_t n_slices = 2 + outbound_sidecars_.size();
+  DCHECK_LE(n_slices, slices->size());
+  auto slice_iter = slices->begin();
+  *slice_iter++ = Slice(response_hdr_buf_);
+  *slice_iter++ = Slice(response_msg_buf_);
+  for (auto& sidecar : outbound_sidecars_) {
+    *slice_iter++ = sidecar->AsSlice();
   }
+  DCHECK_EQ(slice_iter - slices->begin(), n_slices);
+  return n_slices;
 }
 
 Status InboundCall::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/0c6aa525/src/kudu/rpc/inbound_call.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/inbound_call.h b/src/kudu/rpc/inbound_call.h
index 6bed18f..84e6745 100644
--- a/src/kudu/rpc/inbound_call.h
+++ b/src/kudu/rpc/inbound_call.h
@@ -119,9 +119,10 @@ class InboundCall {
                                    const google::protobuf::MessageLite& app_error_pb,
                                    ErrorStatusPB* err);
 
-  // Serialize the response packet for the finished call.
+  // Serialize the response packet for the finished call into 'slices'.
   // The resulting slices refer to memory in this object.
-  void SerializeResponseTo(std::vector<Slice>* slices) const;
+  // Returns the number of slices in the serialized response.
+  size_t SerializeResponseTo(TransferPayload* slices) const;
 
   // See RpcContext::AddRpcSidecar()
   Status AddOutboundSidecar(std::unique_ptr<RpcSidecar> car, int* idx);

http://git-wip-us.apache.org/repos/asf/kudu/blob/0c6aa525/src/kudu/rpc/outbound_call.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/outbound_call.cc b/src/kudu/rpc/outbound_call.cc
index af03f1c..a238568 100644
--- a/src/kudu/rpc/outbound_call.cc
+++ b/src/kudu/rpc/outbound_call.cc
@@ -92,7 +92,7 @@ OutboundCall::~OutboundCall() {
   DVLOG(4) << "OutboundCall " << this << " destroyed with state_: " << StateName(state_);
 }
 
-Status OutboundCall::SerializeTo(vector<Slice>* slices) {
+size_t OutboundCall::SerializeTo(TransferPayload* slices) {
   DCHECK_LT(0, request_buf_.size())
       << "Must call SetRequestPayload() before SerializeTo()";
 
@@ -109,10 +109,16 @@ Status OutboundCall::SerializeTo(vector<Slice>* slices) {
   serialization::SerializeHeader(
       header_, sidecar_byte_size_ + request_buf_.size(), &header_buf_);
 
-  slices->push_back(Slice(header_buf_));
-  slices->push_back(Slice(request_buf_));
-  for (const unique_ptr<RpcSidecar>& car : sidecars_) slices->push_back(car->AsSlice());
-  return Status::OK();
+  size_t n_slices = 2 + sidecars_.size();
+  DCHECK_LE(n_slices, slices->size());
+  auto slice_iter = slices->begin();
+  *slice_iter++ = Slice(header_buf_);
+  *slice_iter++ = Slice(request_buf_);
+  for (auto& sidecar : sidecars_) {
+    *slice_iter++ = sidecar->AsSlice();
+  }
+  DCHECK_EQ(slice_iter - slices->begin(), n_slices);
+  return n_slices;
 }
 
 void OutboundCall::SetRequestPayload(const Message& req,
@@ -120,6 +126,7 @@ void OutboundCall::SetRequestPayload(const Message& req,
   DCHECK_EQ(-1, sidecar_byte_size_);
 
   sidecars_ = move(sidecars);
+  DCHECK_LE(sidecars_.size(), TransferLimits::kMaxSidecars);
 
   // Compute total size of sidecar payload so that extra space can be reserved as part of
   // the request body.

http://git-wip-us.apache.org/repos/asf/kudu/blob/0c6aa525/src/kudu/rpc/outbound_call.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/outbound_call.h b/src/kudu/rpc/outbound_call.h
index ebed9b5..16ebc8a 100644
--- a/src/kudu/rpc/outbound_call.h
+++ b/src/kudu/rpc/outbound_call.h
@@ -154,7 +154,8 @@ class OutboundCall {
 
   // Serialize the call for the wire. Requires that SetRequestPayload()
   // is called first. This is called from the Reactor thread.
-  Status SerializeTo(std::vector<Slice>* slices);
+  // Returns the number of slices in the serialized call.
+  size_t SerializeTo(TransferPayload* slices);
 
   // Callback after the call has been put on the outbound connection queue.
   void SetQueued();

http://git-wip-us.apache.org/repos/asf/kudu/blob/0c6aa525/src/kudu/rpc/transfer.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/transfer.cc b/src/kudu/rpc/transfer.cc
index d24e94d..d660869 100644
--- a/src/kudu/rpc/transfer.cc
+++ b/src/kudu/rpc/transfer.cc
@@ -135,32 +135,32 @@ string InboundTransfer::StatusAsString() const {
   return Substitute("$0/$1 bytes received", cur_offset_, total_length_);
 }
 
-OutboundTransfer* OutboundTransfer::CreateForCallRequest(
-    int32_t call_id,
-    const std::vector<Slice> &payload,
-    TransferCallbacks *callbacks) {
-  return new OutboundTransfer(call_id, payload, callbacks);
+OutboundTransfer* OutboundTransfer::CreateForCallRequest(int32_t call_id,
+                                                         const TransferPayload &payload,
+                                                         size_t n_payload_slices,
+                                                         TransferCallbacks *callbacks) {
+  return new OutboundTransfer(call_id, payload, n_payload_slices, callbacks);
 }
 
-OutboundTransfer* OutboundTransfer::CreateForCallResponse(const std::vector<Slice> &payload,
+OutboundTransfer* OutboundTransfer::CreateForCallResponse(const TransferPayload &payload,
+                                                          size_t n_payload_slices,
                                                           TransferCallbacks *callbacks) {
-  return new OutboundTransfer(kInvalidCallId, payload, callbacks);
+  return new OutboundTransfer(kInvalidCallId, payload, n_payload_slices, callbacks);
 }
 
-
 OutboundTransfer::OutboundTransfer(int32_t call_id,
-                                   const std::vector<Slice> &payload,
+                                   const TransferPayload &payload,
+                                   size_t n_payload_slices,
                                    TransferCallbacks *callbacks)
   : cur_slice_idx_(0),
     cur_offset_in_slice_(0),
     callbacks_(callbacks),
     call_id_(call_id),
     aborted_(false) {
-  CHECK(!payload.empty());
 
-  n_payload_slices_ = payload.size();
-  CHECK_LE(n_payload_slices_, arraysize(payload_slices_));
-  for (int i = 0; i < payload.size(); i++) {
+  n_payload_slices_ = n_payload_slices;
+  CHECK_LE(n_payload_slices_, payload_slices_.size());
+  for (int i = 0; i < n_payload_slices; i++) {
     payload_slices_[i] = payload[i];
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/0c6aa525/src/kudu/rpc/transfer.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/transfer.h b/src/kudu/rpc/transfer.h
index 671347a..2a2b726 100644
--- a/src/kudu/rpc/transfer.h
+++ b/src/kudu/rpc/transfer.h
@@ -18,6 +18,7 @@
 #ifndef KUDU_RPC_TRANSFER_H
 #define KUDU_RPC_TRANSFER_H
 
+#include <array>
 #include <boost/intrusive/list.hpp>
 #include <gflags/gflags.h>
 #include <set>
@@ -56,6 +57,8 @@ class TransferLimits {
   DISALLOW_IMPLICIT_CONSTRUCTORS(TransferLimits);
 };
 
+typedef std::array<Slice, TransferLimits::kMaxPayloadSlices> TransferPayload;
+
 // This class is used internally by the RPC layer to represent an inbound
 // transfer in progress.
 //
@@ -119,12 +122,14 @@ class OutboundTransfer : public boost::intrusive::list_base_hook<> {
 
   // Create an outbound transfer for a call request.
   static OutboundTransfer* CreateForCallRequest(int32_t call_id,
-                                                const std::vector<Slice> &payload,
+                                                const TransferPayload &payload,
+                                                size_t n_payload_slices,
                                                 TransferCallbacks *callbacks);
 
   // Create an outbound transfer for a call response.
   // See above for details.
-  static OutboundTransfer* CreateForCallResponse(const std::vector<Slice> &payload,
+  static OutboundTransfer* CreateForCallResponse(const TransferPayload &payload,
+                                                 size_t n_payload_slices,
                                                  TransferCallbacks *callbacks);
 
   // Destruct the transfer. A transfer object should never be deallocated
@@ -162,12 +167,13 @@ class OutboundTransfer : public boost::intrusive::list_base_hook<> {
 
  private:
   OutboundTransfer(int32_t call_id,
-                   const std::vector<Slice> &payload,
+                   const TransferPayload& payload,
+                   size_t n_payload_slices,
                    TransferCallbacks *callbacks);
 
   // Slices to send. Uses an array here instead of a vector to avoid an expensive
   // vector construction (improved performance a couple percent).
-  Slice payload_slices_[TransferLimits::kMaxPayloadSlices];
+  TransferPayload payload_slices_;
   size_t n_payload_slices_;
 
   // The current slice that is being sent.


[2/2] kudu git commit: KUDU-2065: Support cancellation for outbound RPC call

Posted by to...@apache.org.
KUDU-2065: Support cancellation for outbound RPC call

This change implements a new interface RpcController::Cancel()
which takes a RpcController as argument and cancels any
pending OutboundCall associated with it.

RpcController::Cancel() queues a cancellation task scheduled
on the reactor thread for that outbound call. Once the task
is run, it will cancel the outbound call right away if
the RPC hasn't started sending yet or if it has already
sent the request and is waiting for a response. If cancellation
happens when the RPC request is being sent, the RPC will
be cancelled only after the RPC has finished sending the
request. If the RPC is finished, the cancellation will
be a no-op.

Change-Id: Iaf53c5b113de10d573bd32fb9b2293572e806fbf
Reviewed-on: http://gerrit.cloudera.org:8080/7455
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/0ec793e3
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/0ec793e3
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/0ec793e3

Branch: refs/heads/master
Commit: 0ec793e32dd70da501ff6e967ad775ca3244eeae
Parents: 0c6aa52
Author: Michael Ho <kw...@cloudera.com>
Authored: Thu Jul 13 11:49:02 2017 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Thu Jul 27 04:03:55 2017 +0000

----------------------------------------------------------------------
 src/kudu/common/wire_protocol.proto  |   1 +
 src/kudu/rpc/connection.cc           |  56 +++++++++++---
 src/kudu/rpc/connection.h            |  12 ++-
 src/kudu/rpc/messenger.cc            |   5 ++
 src/kudu/rpc/messenger.h             |   3 +
 src/kudu/rpc/outbound_call.cc        |  66 ++++++++++++++++-
 src/kudu/rpc/outbound_call.h         |  36 ++++++++-
 src/kudu/rpc/proxy.cc                |   1 +
 src/kudu/rpc/reactor.cc              |  69 +++++++++++++++--
 src/kudu/rpc/reactor.h               |  17 +++++
 src/kudu/rpc/rpc-test-base.h         |  42 ++++++++++-
 src/kudu/rpc/rpc-test.cc             | 118 +++++++++++++++++++++++++++++-
 src/kudu/rpc/rpc_controller.cc       |  10 ++-
 src/kudu/rpc/rpc_controller.h        |  21 ++++++
 src/kudu/rpc/rpc_introspection.proto |   1 +
 src/kudu/rpc/rtest.proto             |  10 +++
 16 files changed, 444 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/0ec793e3/src/kudu/common/wire_protocol.proto
----------------------------------------------------------------------
diff --git a/src/kudu/common/wire_protocol.proto b/src/kudu/common/wire_protocol.proto
index 33645f0..f592734 100644
--- a/src/kudu/common/wire_protocol.proto
+++ b/src/kudu/common/wire_protocol.proto
@@ -57,6 +57,7 @@ message AppStatusPB {
     CONFIGURATION_ERROR = 16;
     INCOMPLETE = 17;
     END_OF_FILE = 18;
+    CANCELLED = 19;
   }
 
   required ErrorCode code = 1;

http://git-wip-us.apache.org/repos/asf/kudu/blob/0ec793e3/src/kudu/rpc/connection.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/connection.cc b/src/kudu/rpc/connection.cc
index 8dfdbdf..bc7446e 100644
--- a/src/kudu/rpc/connection.cc
+++ b/src/kudu/rpc/connection.cc
@@ -246,6 +246,9 @@ void Connection::HandleOutboundCallTimeout(CallAwaitingResponse *car) {
   car->call->SetTimedOut(negotiation_complete_ ? Phase::REMOTE_CALL
                                                : Phase::CONNECTION_NEGOTIATION);
 
+  // Test cancellation when 'car->call' is in 'TIMED_OUT' state
+  MaybeInjectCancellation(car->call);
+
   // Drop the reference to the call. If the original caller has moved on after
   // seeing the timeout, we no longer need to hold onto the allocated memory
   // from the request.
@@ -258,22 +261,41 @@ void Connection::HandleOutboundCallTimeout(CallAwaitingResponse *car) {
   // already timed out.
 }
 
+void Connection::CancelOutboundCall(const shared_ptr<OutboundCall> &call) {
+  CallAwaitingResponse* car = FindPtrOrNull(awaiting_response_, call->call_id());
+  if (car != nullptr) {
+    // car->call may be NULL if the call has timed out already.
+    DCHECK(!car->call || car->call.get() == call.get());
+    car->call.reset();
+  }
+}
+
+// Inject a cancellation when 'call' is in state 'FLAGS_rpc_inject_cancellation_state'.
+void inline Connection::MaybeInjectCancellation(const shared_ptr<OutboundCall> &call) {
+  if (PREDICT_FALSE(call->ShouldInjectCancellation())) {
+    reactor_thread_->reactor()->messenger()->QueueCancellation(call);
+  }
+}
+
 // Callbacks after sending a call on the wire.
 // This notifies the OutboundCall object to change its state to SENT once it
 // has been fully transmitted.
 struct CallTransferCallbacks : public TransferCallbacks {
  public:
-  explicit CallTransferCallbacks(shared_ptr<OutboundCall> call)
-      : call_(std::move(call)) {}
+  explicit CallTransferCallbacks(shared_ptr<OutboundCall> call,
+                                 Connection *conn)
+      : call_(std::move(call)), conn_(conn) {}
 
   virtual void NotifyTransferFinished() OVERRIDE {
     // TODO: would be better to cancel the transfer while it is still on the queue if we
     // timed out before the transfer started, but there is still a race in the case of
     // a partial send that we have to handle here
     if (call_->IsFinished()) {
-      DCHECK(call_->IsTimedOut());
+      DCHECK(call_->IsTimedOut() || call_->IsCancelled());
     } else {
       call_->SetSent();
+      // Test cancellation when 'call_' is in 'SENT' state.
+      conn_->MaybeInjectCancellation(call_);
     }
     delete this;
   }
@@ -286,6 +308,7 @@ struct CallTransferCallbacks : public TransferCallbacks {
 
  private:
   shared_ptr<OutboundCall> call_;
+  Connection* conn_;
 };
 
 void Connection::QueueOutboundCall(const shared_ptr<OutboundCall> &call) {
@@ -305,6 +328,9 @@ void Connection::QueueOutboundCall(const shared_ptr<OutboundCall> &call) {
   // yet assigned a call ID.
   DCHECK(!call->call_id_assigned());
 
+  // We shouldn't reach this point if 'call' was requested to be cancelled.
+  DCHECK(!call->cancellation_requested());
+
   // Assign the call ID.
   int32_t call_id = GetNextCallId();
   call->set_call_id(call_id);
@@ -315,6 +341,9 @@ void Connection::QueueOutboundCall(const shared_ptr<OutboundCall> &call) {
 
   call->SetQueued();
 
+  // Test cancellation when 'call' is in 'ON_OUTBOUND_QUEUE' state.
+  MaybeInjectCancellation(call);
+
   scoped_car car(car_pool_.make_scoped_ptr(car_pool_.Construct()));
   car->conn = this;
   car->call = call;
@@ -363,7 +392,7 @@ void Connection::QueueOutboundCall(const shared_ptr<OutboundCall> &call) {
     car->timeout_timer.start();
   }
 
-  TransferCallbacks *cb = new CallTransferCallbacks(call);
+  TransferCallbacks *cb = new CallTransferCallbacks(call, this);
   awaiting_response_[call_id] = car.release();
   QueueOutbound(gscoped_ptr<OutboundTransfer>(
       OutboundTransfer::CreateForCallRequest(call_id, tmp_slices, n_slices, cb)));
@@ -551,13 +580,17 @@ void Connection::HandleCallResponse(gscoped_ptr<InboundTransfer> transfer) {
   // The car->timeout_timer ev::timer will be stopped automatically by its destructor.
   scoped_car car(car_pool_.make_scoped_ptr(car_ptr));
 
-  if (PREDICT_FALSE(car->call.get() == nullptr)) {
+  if (PREDICT_FALSE(!car->call)) {
     // The call already failed due to a timeout.
-    VLOG(1) << "Got response to call id " << resp->call_id() << " after client already timed out";
+    VLOG(1) << "Got response to call id " << resp->call_id() << " after client "
+            << "already timed out or cancelled";
     return;
   }
 
   car->call->SetResponse(std::move(resp));
+
+  // Test cancellation when 'car->call' is in 'FINISHED_SUCCESS' or 'FINISHED_ERROR' state.
+  MaybeInjectCancellation(car->call);
 }
 
 void Connection::WriteHandler(ev::io &watcher, int revents) {
@@ -586,10 +619,10 @@ void Connection::WriteHandler(ev::io &watcher, int revents) {
       if (transfer->is_for_outbound_call()) {
         CallAwaitingResponse* car = FindOrDie(awaiting_response_, transfer->call_id());
         if (!car->call) {
-          // If the call has already timed out, then the 'call' field will have been nulled.
-          // In that case, we don't need to bother sending it.
+          // If the call has already timed out or has already been cancelled, the 'call'
+          // field would be set to NULL. In that case, don't bother sending it.
           outbound_transfers_.pop_front();
-          transfer->Abort(Status::Aborted("already timed out"));
+          transfer->Abort(Status::Aborted("already timed out or cancelled"));
           delete transfer;
           continue;
         }
@@ -606,12 +639,17 @@ void Connection::WriteHandler(ev::io &watcher, int revents) {
           transfer->Abort(s);
           car->call->SetFailed(s, negotiation_complete_ ? Phase::REMOTE_CALL
                                                         : Phase::CONNECTION_NEGOTIATION);
+          // Test cancellation when 'car->call' is in 'FINISHED_ERROR' state.
+          MaybeInjectCancellation(car->call);
           car->call.reset();
           delete transfer;
           continue;
         }
 
         car->call->SetSending();
+
+        // Test cancellation when 'car->call' is in 'SENDING' state.
+        MaybeInjectCancellation(car->call);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/0ec793e3/src/kudu/rpc/connection.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/connection.h b/src/kudu/rpc/connection.h
index c6cae38..7f16b7b 100644
--- a/src/kudu/rpc/connection.h
+++ b/src/kudu/rpc/connection.h
@@ -113,7 +113,8 @@ class Connection : public RefCountedThreadSafe<Connection> {
                 std::unique_ptr<ErrorStatusPB> rpc_error = {});
 
   // Queue a new call to be made. If the queueing fails, the call will be
-  // marked failed.
+  // marked failed. The caller is expected to check if 'call' has been cancelled
+  // before making the call.
   // Takes ownership of the 'call' object regardless of whether it succeeds or fails.
   void QueueOutboundCall(const std::shared_ptr<OutboundCall> &call);
 
@@ -122,6 +123,10 @@ class Connection : public RefCountedThreadSafe<Connection> {
   // This may be called from a non-reactor thread.
   void QueueResponseForCall(gscoped_ptr<InboundCall> call);
 
+  // Cancel an outbound call by removing any reference to it by CallAwaitingResponse
+  // in 'awaiting_response_'.
+  void CancelOutboundCall(const std::shared_ptr<OutboundCall> &call);
+
   // The address of the remote end of the connection.
   const Sockaddr &remote() const { return remote_; }
 
@@ -216,6 +221,7 @@ class Connection : public RefCountedThreadSafe<Connection> {
  private:
   friend struct CallAwaitingResponse;
   friend class QueueTransferTask;
+  friend struct CallTransferCallbacks;
   friend struct ResponseTransferCallbacks;
 
   // A call which has been fully sent to the server, which we're waiting for
@@ -269,6 +275,10 @@ class Connection : public RefCountedThreadSafe<Connection> {
   // This must be called from the reactor thread.
   void QueueOutbound(gscoped_ptr<OutboundTransfer> transfer);
 
+  // Internal test function for injecting cancellation request when 'call'
+  // reaches state specified in 'FLAGS_rpc_inject_cancellation_state'.
+  void MaybeInjectCancellation(const std::shared_ptr<OutboundCall> &call);
+
   // The reactor thread that created this connection.
   ReactorThread* const reactor_thread_;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/0ec793e3/src/kudu/rpc/messenger.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/messenger.cc b/src/kudu/rpc/messenger.cc
index fcf46a9..76b04fe 100644
--- a/src/kudu/rpc/messenger.cc
+++ b/src/kudu/rpc/messenger.cc
@@ -467,6 +467,11 @@ void Messenger::QueueInboundCall(gscoped_ptr<InboundCall> call) {
   WARN_NOT_OK((*service)->QueueInboundCall(std::move(call)), "Unable to handle RPC call");
 }
 
+void Messenger::QueueCancellation(const shared_ptr<OutboundCall> &call) {
+  Reactor *reactor = RemoteToReactor(call->conn_id().remote());
+  reactor->QueueCancellation(call);
+}
+
 void Messenger::RegisterInboundSocket(Socket *new_socket, const Sockaddr &remote) {
   Reactor *reactor = RemoteToReactor(remote);
   reactor->RegisterInboundSocket(new_socket, remote);

http://git-wip-us.apache.org/repos/asf/kudu/blob/0ec793e3/src/kudu/rpc/messenger.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/messenger.h b/src/kudu/rpc/messenger.h
index 73e02fa..8a02104 100644
--- a/src/kudu/rpc/messenger.h
+++ b/src/kudu/rpc/messenger.h
@@ -202,6 +202,9 @@ class Messenger {
   // Enqueue a call for processing on the server.
   void QueueInboundCall(gscoped_ptr<InboundCall> call);
 
+  // Queue a cancellation for the given outbound call.
+  void QueueCancellation(const std::shared_ptr<OutboundCall> &call);
+
   // Take ownership of the socket via Socket::Release
   void RegisterInboundSocket(Socket *new_socket, const Sockaddr &remote);
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/0ec793e3/src/kudu/rpc/outbound_call.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/outbound_call.cc b/src/kudu/rpc/outbound_call.cc
index a238568..bcc39c3 100644
--- a/src/kudu/rpc/outbound_call.cc
+++ b/src/kudu/rpc/outbound_call.cc
@@ -46,6 +46,12 @@ DEFINE_int64(rpc_callback_max_cycles, 100 * 1000 * 1000,
 TAG_FLAG(rpc_callback_max_cycles, advanced);
 TAG_FLAG(rpc_callback_max_cycles, runtime);
 
+// Flag used in debug build for injecting cancellation at different code paths.
+DEFINE_int32(rpc_inject_cancellation_state, -1,
+             "If this flag is not -1, it is the state in which a cancellation request "
+             "will be injected. Should use values in OutboundCall::State only");
+TAG_FLAG(rpc_inject_cancellation_state, unsafe);
+
 using std::unique_ptr;
 
 namespace kudu {
@@ -70,7 +76,8 @@ OutboundCall::OutboundCall(const ConnectionId& conn_id,
       conn_id_(conn_id),
       callback_(std::move(callback)),
       controller_(DCHECK_NOTNULL(controller)),
-      response_(DCHECK_NOTNULL(response_storage)) {
+      response_(DCHECK_NOTNULL(response_storage)),
+      cancellation_requested_(false) {
   DVLOG(4) << "OutboundCall " << this << " constructed with state_: " << StateName(state_)
            << " and RPC timeout: "
            << (controller->timeout().Initialized() ? controller->timeout().ToString() : "none");
@@ -164,6 +171,8 @@ string OutboundCall::StateName(State state) {
       return "NEGOTIATION_TIMED_OUT";
     case TIMED_OUT:
       return "TIMED_OUT";
+    case CANCELLED:
+      return "CANCELLED";
     case FINISHED_NEGOTIATION_ERROR:
       return "FINISHED_NEGOTIATION_ERROR";
     case FINISHED_ERROR:
@@ -208,6 +217,9 @@ void OutboundCall::set_state_unlocked(State new_state) {
     case TIMED_OUT:
       DCHECK(state_ == SENT || state_ == ON_OUTBOUND_QUEUE || state_ == SENDING);
       break;
+    case CANCELLED:
+      DCHECK(state_ == READY || state_ == ON_OUTBOUND_QUEUE || state_ == SENT);
+      break;
     case FINISHED_SUCCESS:
       DCHECK_EQ(state_, SENT);
       break;
@@ -219,7 +231,31 @@ void OutboundCall::set_state_unlocked(State new_state) {
   state_ = new_state;
 }
 
+void OutboundCall::Cancel() {
+  cancellation_requested_ = true;
+  // No lock needed as it's called from reactor thread
+  switch (state_) {
+    case READY:
+    case ON_OUTBOUND_QUEUE:
+    case SENT: {
+      SetCancelled();
+      break;
+    }
+    case SENDING:
+    case NEGOTIATION_TIMED_OUT:
+    case TIMED_OUT:
+    case CANCELLED:
+    case FINISHED_NEGOTIATION_ERROR:
+    case FINISHED_ERROR:
+    case FINISHED_SUCCESS:
+      break;
+  }
+}
+
 void OutboundCall::CallCallback() {
+  // Clear references to outbound sidecars before invoking callback.
+  sidecars_.clear();
+
   int64_t start_cycles = CycleClock::Now();
   {
     SCOPED_WATCH_STACK(100);
@@ -290,6 +326,11 @@ void OutboundCall::SetSent() {
   // request_buf_ is also done being used here, but since it was allocated by
   // the caller thread, we would rather let that thread free it whenever it
   // deletes the RpcController.
+
+  // If cancellation was requested, it's now a good time to do the actual cancellation.
+  if (cancellation_requested()) {
+    SetCancelled();
+  }
 }
 
 void OutboundCall::SetFailed(const Status &status,
@@ -332,6 +373,20 @@ void OutboundCall::SetTimedOut(Phase phase) {
   CallCallback();
 }
 
+void OutboundCall::SetCancelled() {
+  DCHECK(!IsFinished());  // Only a call that has not finished yet may be cancelled.
+  {
+    std::lock_guard<simple_spinlock> l(lock_);
+    status_ = Status::Aborted(
+        Substitute("$0 RPC to $1 is cancelled in state $2",
+                   remote_method_.method_name(),
+                   conn_id_.remote().ToString(),
+                   StateName(state_)));  // 'state_' is still the pre-cancellation state here.
+    set_state_unlocked(CANCELLED);
+  }
+  CallCallback();  // Notify the caller; runs after the lock scope above has ended.
+}
+
 bool OutboundCall::IsTimedOut() const {
   std::lock_guard<simple_spinlock> l(lock_);
   switch (state_) {
@@ -343,6 +398,11 @@ bool OutboundCall::IsTimedOut() const {
   }
 }
 
+bool OutboundCall::IsCancelled() const {
+  std::lock_guard<simple_spinlock> l(lock_);
+  return state_ == CANCELLED;
+}
+
 bool OutboundCall::IsNegotiationError() const {
   std::lock_guard<simple_spinlock> l(lock_);
   switch (state_) {
@@ -364,6 +424,7 @@ bool OutboundCall::IsFinished() const {
       return false;
     case NEGOTIATION_TIMED_OUT:
     case TIMED_OUT:
+    case CANCELLED:
     case FINISHED_NEGOTIATION_ERROR:
     case FINISHED_ERROR:
     case FINISHED_SUCCESS:
@@ -404,6 +465,9 @@ void OutboundCall::DumpPB(const DumpRunningRpcsRequestPB& req,
     case TIMED_OUT:
       resp->set_state(RpcCallInProgressPB::TIMED_OUT);
       break;
+    case CANCELLED:
+      resp->set_state(RpcCallInProgressPB::CANCELLED);
+      break;
     case FINISHED_NEGOTIATION_ERROR:
       resp->set_state(RpcCallInProgressPB::FINISHED_NEGOTIATION_ERROR);
       break;

http://git-wip-us.apache.org/repos/asf/kudu/blob/0ec793e3/src/kudu/rpc/outbound_call.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/outbound_call.h b/src/kudu/rpc/outbound_call.h
index 16ebc8a..221c368 100644
--- a/src/kudu/rpc/outbound_call.h
+++ b/src/kudu/rpc/outbound_call.h
@@ -38,6 +38,8 @@
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 
+DECLARE_int32(rpc_inject_cancellation_state);
+
 namespace google {
 namespace protobuf {
 class Message;
@@ -157,6 +159,13 @@ class OutboundCall {
   // Returns the number of slices in the serialized call.
   size_t SerializeTo(TransferPayload* slices);
 
+  // Mark in the call that cancellation has been requested. If the call hasn't yet
+  // started sending or has finished sending the RPC request but is waiting for a
+  // response, cancel the RPC right away. Otherwise, wait until the RPC has finished
+  // sending before cancelling it. If the call is finished, it's a no-op.
+  // REQUIRES: must be called from the reactor thread.
+  void Cancel();
+
   // Callback after the call has been put on the outbound connection queue.
   void SetQueued();
 
@@ -182,6 +191,8 @@ class OutboundCall {
 
   bool IsNegotiationError() const;
 
+  bool IsCancelled() const;
+
   // Is the call finished?
   bool IsFinished() const;
 
@@ -216,8 +227,22 @@ class OutboundCall {
     return header_.call_id();
   }
 
+  // Returns true if cancellation has been requested. Must be called from
+  // reactor thread.
+  bool cancellation_requested() const {
+    return cancellation_requested_;
+  }
+
+  // Test function which returns true if a cancellation request should be injected
+  // at the current state.
+  bool ShouldInjectCancellation() const {
+    return FLAGS_rpc_inject_cancellation_state != -1 &&
+        FLAGS_rpc_inject_cancellation_state == state();
+  }
+
  private:
   friend class RpcController;
+  FRIEND_TEST(TestRpc, TestCancellation);
 
   // Various states the call propagates through.
   // NB: if adding another state, be sure to update OutboundCall::IsFinished()
@@ -229,6 +254,7 @@ class OutboundCall {
     SENT,
     NEGOTIATION_TIMED_OUT,
     TIMED_OUT,
+    CANCELLED,
     FINISHED_NEGOTIATION_ERROR,
     FINISHED_ERROR,
     FINISHED_SUCCESS
@@ -236,6 +262,9 @@ class OutboundCall {
 
   static std::string StateName(State state);
 
+  // Mark the call as cancelled. This also invokes the callback to notify the caller.
+  void SetCancelled();
+
   void set_state(State new_state);
   State state() const;
 
@@ -261,7 +290,9 @@ class OutboundCall {
   Status status_;
   gscoped_ptr<ErrorStatusPB> error_pb_;
 
-  // Call the user-provided callback.
+  // Call the user-provided callback. Note that entries in 'sidecars_' are cleared
+  // prior to invoking the callback so the client can assume that the call doesn't
+  // hold references to outbound sidecars.
   void CallCallback();
 
   // The RPC header.
@@ -296,6 +327,9 @@ class OutboundCall {
   // Total size in bytes of all sidecars in 'sidecars_'. Set in SetRequestPayload().
   int64_t sidecar_byte_size_ = -1;
 
+  // True if cancellation was requested on this call; starts out false.
+  bool cancellation_requested_ = false;
+
   DISALLOW_COPY_AND_ASSIGN(OutboundCall);
 };
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/0ec793e3/src/kudu/rpc/proxy.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/proxy.cc b/src/kudu/rpc/proxy.cc
index 45ad5dd..3ec907d 100644
--- a/src/kudu/rpc/proxy.cc
+++ b/src/kudu/rpc/proxy.cc
@@ -82,6 +82,7 @@ void Proxy::AsyncRequest(const string& method,
   controller->call_.reset(
       new OutboundCall(conn_id_, remote_method, response, controller, callback));
   controller->SetRequestParam(req);
+  controller->SetMessenger(messenger_.get());
 
   // If this fails to queue, the callback will get called immediately
   // and the controller will be in an ERROR state.

http://git-wip-us.apache.org/repos/asf/kudu/blob/0ec793e3/src/kudu/rpc/reactor.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/reactor.cc b/src/kudu/rpc/reactor.cc
index 33746c5..cf63672 100644
--- a/src/kudu/rpc/reactor.cc
+++ b/src/kudu/rpc/reactor.cc
@@ -250,8 +250,13 @@ void ReactorThread::RegisterConnection(scoped_refptr<Connection> conn) {
 
 void ReactorThread::AssignOutboundCall(const shared_ptr<OutboundCall>& call) {
   DCHECK(IsCurrentThread());
-  scoped_refptr<Connection> conn;
 
+  // Skip if the outbound call has been cancelled already.
+  if (PREDICT_FALSE(call->IsCancelled())) {
+    return;
+  }
+
+  scoped_refptr<Connection> conn;
   Status s = FindOrStartConnection(call->conn_id(),
                                    call->controller()->credentials_policy(),
                                    &conn);
@@ -263,6 +268,24 @@ void ReactorThread::AssignOutboundCall(const shared_ptr<OutboundCall>& call) {
   conn->QueueOutboundCall(call);
 }
 
+void ReactorThread::CancelOutboundCall(const shared_ptr<OutboundCall>& call) {
+  DCHECK(IsCurrentThread());
+
+  // A finished call has already had its callback invoked, so cancelling it is a no-op.
+  // Bail out before touching call->controller(), which may be gone by then.
+  if (call->IsFinished()) {
+    return;
+  }
+  // Tell the matching connection (if any) first, then cancel the call itself.
+  scoped_refptr<Connection> conn;
+  if (FindConnection(call->conn_id(),
+                     call->controller()->credentials_policy(),
+                     &conn)) {
+    conn->CancelOutboundCall(call);
+  }
+  call->Cancel();  // Runs whether or not a connection was found.
+}
+
 //
 // Handles timer events.  The periodic timer:
 //
@@ -361,9 +384,9 @@ void ReactorThread::RunThread() {
   reactor_->messenger_.reset();
 }
 
-Status ReactorThread::FindOrStartConnection(const ConnectionId& conn_id,
-                                            CredentialsPolicy cred_policy,
-                                            scoped_refptr<Connection>* conn) {
+bool ReactorThread::FindConnection(const ConnectionId& conn_id,
+                                   CredentialsPolicy cred_policy,
+                                   scoped_refptr<Connection>* conn) {
   DCHECK(IsCurrentThread());
   const auto range = client_conns_.equal_range(conn_id);
   scoped_refptr<Connection> found_conn;
@@ -404,6 +427,16 @@ Status ReactorThread::FindOrStartConnection(const ConnectionId& conn_id,
   if (found_conn) {
     // Found matching not-to-be-shutdown connection: return it as the result.
     conn->swap(found_conn);
+    return true;
+  }
+  return false;
+}
+
+Status ReactorThread::FindOrStartConnection(const ConnectionId& conn_id,
+                                            CredentialsPolicy cred_policy,
+                                            scoped_refptr<Connection>* conn) {
+  DCHECK(IsCurrentThread());
+  if (FindConnection(conn_id, cred_policy, conn)) {
     return Status::OK();
   }
 
@@ -716,7 +749,7 @@ class AssignOutboundCallTask : public ReactorTask {
 
   void Abort(const Status& status) override {
     // It doesn't matter what is the actual phase of the OutboundCall: just set
-    // it to Phase::REMOTE_CALL to finilize the state of the call.
+    // it to Phase::REMOTE_CALL to finalize the state of the call.
     call_->SetFailed(status, OutboundCall::Phase::REMOTE_CALL);
     delete this;
   }
@@ -728,9 +761,35 @@ class AssignOutboundCallTask : public ReactorTask {
 void Reactor::QueueOutboundCall(const shared_ptr<OutboundCall>& call) {
   DVLOG(3) << name_ << ": queueing outbound call "
            << call->ToString() << " to remote " << call->conn_id().remote().ToString();
+  // Test cancellation when 'call_' is in 'READY' state.
+  if (PREDICT_FALSE(call->ShouldInjectCancellation())) {
+    QueueCancellation(call);
+  }
   ScheduleReactorTask(new AssignOutboundCallTask(call));
 }
 
+class CancellationTask : public ReactorTask {  // Reactor task that cancels one outbound call.
+ public:
+  explicit CancellationTask(shared_ptr<OutboundCall> call)
+      : call_(std::move(call)) {}
+  // Hands the call to the reactor thread for cancellation, then frees this task.
+  void Run(ReactorThread* reactor) override {
+    reactor->CancelOutboundCall(call_);
+    delete this;
+  }
+  // The task never ran: only free it, leaving the call untouched.
+  void Abort(const Status& /*status*/) override {
+    delete this;
+  }
+
+ private:
+  shared_ptr<OutboundCall> call_;  // The call to cancel; shared ownership is held by the task.
+};
+
+void Reactor::QueueCancellation(const shared_ptr<OutboundCall>& call) {
+  ScheduleReactorTask(new CancellationTask(call));
+}
+
 void Reactor::ScheduleReactorTask(ReactorTask *task) {
   {
     std::unique_lock<LockType> l(lock_);

http://git-wip-us.apache.org/repos/asf/kudu/blob/0ec793e3/src/kudu/rpc/reactor.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/reactor.h b/src/kudu/rpc/reactor.h
index e9d131b..9ee0aa8 100644
--- a/src/kudu/rpc/reactor.h
+++ b/src/kudu/rpc/reactor.h
@@ -193,12 +193,19 @@ class ReactorThread {
 
  private:
   friend class AssignOutboundCallTask;
+  friend class CancellationTask;
   friend class RegisterConnectionTask;
   friend class DelayedTask;
 
   // Run the main event loop of the reactor.
   void RunThread();
 
+  // Find a connection to the given remote and return it in 'conn'.
+  // Returns true if a connection is found. Returns false otherwise.
+  bool FindConnection(const ConnectionId& conn_id,
+                      CredentialsPolicy cred_policy,
+                      scoped_refptr<Connection>* conn);
+
   // Find or create a new connection to the given remote.
   // If such a connection already exists, returns that, otherwise creates a new one.
   // May return a bad Status if the connect() call fails.
@@ -230,6 +237,13 @@ class ReactorThread {
   // If this fails, the call is marked failed and completed.
   void AssignOutboundCall(const std::shared_ptr<OutboundCall> &call);
 
+  // Cancel the outbound call. May update corresponding connection
+  // object to remove call from the CallAwaitingResponse object.
+  // Also mark the call as slated for cancellation so the callback
+  // may be invoked early if the RPC hasn't yet been sent or if it's
+  // waiting for a response from the remote.
+  void CancelOutboundCall(const std::shared_ptr<OutboundCall> &call);
+
   // Register a new connection.
   void RegisterConnection(scoped_refptr<Connection> conn);
 
@@ -313,6 +327,9 @@ class Reactor {
   // the call as failed.
   void QueueOutboundCall(const std::shared_ptr<OutboundCall> &call);
 
+  // Queue a new reactor task to cancel an outbound call.
+  void QueueCancellation(const std::shared_ptr<OutboundCall> &call);
+
   // Schedule the given task's Run() method to be called on the
   // reactor thread.
   // If the reactor shuts down before it is run, the Abort method will be

http://git-wip-us.apache.org/repos/asf/kudu/blob/0ec793e3/src/kudu/rpc/rpc-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h
index c28218b..c8c84ca 100644
--- a/src/kudu/rpc/rpc-test-base.h
+++ b/src/kudu/rpc/rpc-test-base.h
@@ -71,6 +71,8 @@ using kudu::rpc_test::SendTwoStringsRequestPB;
 using kudu::rpc_test::SendTwoStringsResponsePB;
 using kudu::rpc_test::SleepRequestPB;
 using kudu::rpc_test::SleepResponsePB;
+using kudu::rpc_test::SleepWithSidecarRequestPB;
+using kudu::rpc_test::SleepWithSidecarResponsePB;
 using kudu::rpc_test::TestInvalidResponseRequestPB;
 using kudu::rpc_test::TestInvalidResponseResponsePB;
 using kudu::rpc_test::WhoAmIRequestPB;
@@ -85,6 +87,7 @@ class GenericCalculatorService : public ServiceIf {
   static const char *kFullServiceName;
   static const char *kAddMethodName;
   static const char *kSleepMethodName;
+  static const char *kSleepWithSidecarMethodName;
   static const char *kPushTwoStringsMethodName;
   static const char *kSendTwoStringsMethodName;
   static const char *kAddExactlyOnce;
@@ -106,6 +109,8 @@ class GenericCalculatorService : public ServiceIf {
       DoAdd(incoming);
     } else if (incoming->remote_method().method_name() == kSleepMethodName) {
       DoSleep(incoming);
+    } else if (incoming->remote_method().method_name() == kSleepWithSidecarMethodName) {
+      DoSleepWithSidecar(incoming);
     } else if (incoming->remote_method().method_name() == kSendTwoStringsMethodName) {
       DoSendTwoStrings(incoming);
     } else if (incoming->remote_method().method_name() == kPushTwoStringsMethodName) {
@@ -206,6 +211,31 @@ class GenericCalculatorService : public ServiceIf {
     SleepResponsePB resp;
     incoming->RespondSuccess(resp);
   }
+
+  void DoSleepWithSidecar(InboundCall *incoming) {
+    Slice param(incoming->serialized_request());
+    SleepWithSidecarRequestPB req;
+    if (!req.ParseFromArray(param.data(), param.size())) {
+      incoming->RespondFailure(ErrorStatusPB::ERROR_INVALID_REQUEST,
+        Status::InvalidArgument("Couldn't parse pb",
+                                req.InitializationErrorString()));
+      return;
+    }
+    // Test handler: sleep for the requested time, then verify the inbound sidecar.
+    LOG(INFO) << "got call: " << SecureShortDebugString(req);
+    SleepFor(MonoDelta::FromMicroseconds(req.sleep_micros()));
+    // The sidecar must be exactly 'num_repetitions' copies of the 32-bit 'pattern'.
+    const uint32_t pattern = req.pattern();
+    const uint32_t num_repetitions = req.num_repetitions();
+    Slice sidecar;
+    CHECK_OK(incoming->GetInboundSidecar(req.sidecar_idx(), &sidecar));
+    CHECK_EQ(sidecar.size(), sizeof(uint32_t) * num_repetitions);
+    const uint32_t *data = reinterpret_cast<const uint32_t*>(sidecar.data());
+    for (uint32_t i = 0; i < num_repetitions; ++i) CHECK_EQ(data[i], pattern);
+    // Reply with the response type that pairs with SleepWithSidecarRequestPB.
+    SleepWithSidecarResponsePB resp;
+    incoming->RespondSuccess(resp);
+  }
 };
 
 class CalculatorService : public CalculatorServiceIf {
@@ -361,6 +391,7 @@ class CalculatorService : public CalculatorServiceIf {
 const char *GenericCalculatorService::kFullServiceName = "kudu.rpc.GenericCalculatorService";
 const char *GenericCalculatorService::kAddMethodName = "Add";
 const char *GenericCalculatorService::kSleepMethodName = "Sleep";
+const char *GenericCalculatorService::kSleepWithSidecarMethodName = "SleepWithSidecar";
 const char *GenericCalculatorService::kPushTwoStringsMethodName = "PushTwoStrings";
 const char *GenericCalculatorService::kSendTwoStringsMethodName = "SendTwoStrings";
 const char *GenericCalculatorService::kAddExactlyOnce = "AddExactlyOnce";
@@ -462,7 +493,7 @@ class RpcTestBase : public KuduTest {
     CHECK_EQ(0, second.compare(Slice(expected)));
   }
 
-  void DoTestOutgoingSidecar(const Proxy &p, int size1, int size2) {
+  Status DoTestOutgoingSidecar(const Proxy &p, int size1, int size2) {
     PushTwoStringsRequestPB request;
     RpcController controller;
 
@@ -478,12 +509,17 @@ class RpcTestBase : public KuduTest {
     request.set_sidecar2_idx(idx2);
 
     PushTwoStringsResponsePB resp;
-    CHECK_OK(p.SyncRequest(GenericCalculatorService::kPushTwoStringsMethodName,
-            request, &resp, &controller));
+    KUDU_RETURN_NOT_OK(p.SyncRequest(GenericCalculatorService::kPushTwoStringsMethodName,
+                                     request, &resp, &controller));
     CHECK_EQ(size1, resp.size1());
     CHECK_EQ(resp.data1(), s1);
     CHECK_EQ(size2, resp.size2());
     CHECK_EQ(resp.data2(), s2);
+    return Status::OK();
+  }
+
+  void DoTestOutgoingSidecarExpectOK(const Proxy &p, int size1, int size2) {
+    CHECK_OK(DoTestOutgoingSidecar(p, size1, size2));
   }
 
   void DoTestExpectTimeout(const Proxy& p,

http://git-wip-us.apache.org/repos/asf/kudu/blob/0ec793e3/src/kudu/rpc/rpc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index 2378892..38ad357 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -29,10 +29,12 @@
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/constants.h"
+#include "kudu/rpc/outbound_call.h"
 #include "kudu/rpc/serialization.h"
 #include "kudu/security/test/test_certs.h"
 #include "kudu/util/countdown_latch.h"
 #include "kudu/util/env.h"
+#include "kudu/util/random.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/test_util.h"
 
@@ -492,9 +494,9 @@ TEST_P(TestRpc, TestRpcSidecar) {
   // we can't write the whole response to the socket in a single call.
   DoTestSidecar(p, 3000 * 1024, 2000 * 1024);
 
-  DoTestOutgoingSidecar(p, 0, 0);
-  DoTestOutgoingSidecar(p, 123, 456);
-  DoTestOutgoingSidecar(p, 3000 * 1024, 2000 * 1024);
+  DoTestOutgoingSidecarExpectOK(p, 0, 0);
+  DoTestOutgoingSidecarExpectOK(p, 123, 456);
+  DoTestOutgoingSidecarExpectOK(p, 3000 * 1024, 2000 * 1024);
 }
 
 TEST_P(TestRpc, TestRpcSidecarLimits) {
@@ -886,5 +888,115 @@ TEST_P(TestRpc, TestApplicationFeatureFlagUnsupportedServer) {
   }
 }
 
+TEST_P(TestRpc, TestCancellation) {
+  // Set up server.
+  Sockaddr server_addr;
+  bool enable_ssl = GetParam();
+  StartTestServer(&server_addr, enable_ssl);
+
+  // Set up client.
+  LOG(INFO) << "Connecting to " << server_addr.ToString();
+  shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
+  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+  // Walk every OutboundCall state, injecting a cancellation at that state each time.
+  for (int i = OutboundCall::READY; i <= OutboundCall::FINISHED_SUCCESS; ++i) {
+    FLAGS_rpc_inject_cancellation_state = i;  // Read by OutboundCall::ShouldInjectCancellation().
+    switch (i) {
+      case OutboundCall::READY:
+      case OutboundCall::ON_OUTBOUND_QUEUE:
+      case OutboundCall::SENDING:
+      case OutboundCall::SENT:  // For these four states the call is expected to end up Aborted.
+        ASSERT_TRUE(DoTestOutgoingSidecar(p, 0, 0).IsAborted());
+        ASSERT_TRUE(DoTestOutgoingSidecar(p, 123, 456).IsAborted());
+        ASSERT_TRUE(DoTestOutgoingSidecar(p, 3000 * 1024, 2000 * 1024).IsAborted());
+        break;
+      case OutboundCall::NEGOTIATION_TIMED_OUT:
+      case OutboundCall::TIMED_OUT:
+        DoTestExpectTimeout(p, MonoDelta::FromMilliseconds(1000));
+        break;
+      case OutboundCall::CANCELLED:
+        break;  // No RPC is issued for this state.
+      case OutboundCall::FINISHED_NEGOTIATION_ERROR:
+      case OutboundCall::FINISHED_ERROR: {
+        AddRequestPB req;
+        req.set_x(1);
+        req.set_y(2);
+        AddResponsePB resp;
+        RpcController controller;
+        controller.RequireServerFeature(FeatureFlags::FOO);
+        controller.RequireServerFeature(99);
+        Status s = p.SyncRequest("Add", req, &resp, &controller);
+        ASSERT_TRUE(s.IsRemoteError());  // The error outcome must win over the injected cancellation.
+        break;
+      }
+      case OutboundCall::FINISHED_SUCCESS:
+        DoTestOutgoingSidecarExpectOK(p, 0, 0);
+        DoTestOutgoingSidecarExpectOK(p, 123, 456);
+        DoTestOutgoingSidecarExpectOK(p, 3000 * 1024, 2000 * 1024);
+        break;
+    }
+  }  // NOTE(review): the flag is not reset to -1 here; confirm the fixture restores flags.
+  client_messenger->Shutdown();
+}
+
+#define TEST_PAYLOAD_SIZE  (1 << 23)
+#define TEST_SLEEP_TIME_MS (500)
+
+static void SleepCallback(uint8_t* payload, CountDownLatch* latch) {
+  // Overwrites the payload which the sidecar is pointing to. The server
+  // checks if the payload matches the expected pattern to detect cases
+  // in which the payload is overwritten while it's being sent.
+  memset(payload, 0, TEST_PAYLOAD_SIZE);
+  latch->CountDown();
+}
+
+TEST_P(TestRpc, TestCancellationAsync) {
+  // Set up server.
+  Sockaddr server_addr;
+  bool enable_ssl = GetParam();
+  StartTestServer(&server_addr, enable_ssl);
+
+  // Set up client.
+  LOG(INFO) << "Connecting to " << server_addr.ToString();
+  shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
+  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+
+  RpcController controller;  // Reused for every iteration; Reset() is called at the end of each.
+
+  // The payload to be used during the RPC.
+  gscoped_array<uint8_t> payload(new uint8_t[TEST_PAYLOAD_SIZE]);
+
+  // Used to generate sleep time between invoking RPC and requesting cancellation.
+  Random rand(SeedRandom());
+
+  for (int i = 0; i < 10; ++i) {
+    SleepWithSidecarRequestPB req;
+    SleepWithSidecarResponsePB resp;
+
+    // Initialize the payload with non-zero pattern.
+    memset(payload.get(), 0xff, TEST_PAYLOAD_SIZE);
+    req.set_sleep_micros(TEST_SLEEP_TIME_MS);  // NOTE(review): a *_MS value is passed as micros.
+    req.set_pattern(0xffffffff);
+    req.set_num_repetitions(TEST_PAYLOAD_SIZE / sizeof(uint32_t));
+
+    int idx;
+    Slice s(payload.get(), TEST_PAYLOAD_SIZE);  // The sidecar points at 'payload'; it is not copied here.
+    CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(s), &idx));
+    req.set_sidecar_idx(idx);
+
+    CountDownLatch latch(1);
+    p.AsyncRequest(GenericCalculatorService::kSleepWithSidecarMethodName,
+                   req, &resp, &controller,
+                   boost::bind(SleepCallback, payload.get(), &latch));
+    // Sleep Uniform64(i * 30) microseconds before cancelling; skipped when i == 0.
+    if (i > 0) SleepFor(MonoDelta::FromMicroseconds(rand.Uniform64(i * 30)));
+    controller.Cancel();
+    latch.Wait();  // SleepCallback counts the latch down after zeroing the payload.
+    ASSERT_TRUE(controller.status().IsAborted() || controller.status().ok());
+    controller.Reset();
+  }
+  client_messenger->Shutdown();
+}
+
 } // namespace rpc
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/0ec793e3/src/kudu/rpc/rpc_controller.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_controller.cc b/src/kudu/rpc/rpc_controller.cc
index 505db22..9139cd4 100644
--- a/src/kudu/rpc/rpc_controller.cc
+++ b/src/kudu/rpc/rpc_controller.cc
@@ -23,6 +23,7 @@
 
 #include <glog/logging.h>
 
+#include "kudu/rpc/messenger.h"
 #include "kudu/rpc/outbound_call.h"
 #include "kudu/rpc/rpc_header.pb.h"
 
@@ -32,7 +33,7 @@ namespace kudu {
 namespace rpc {
 
 RpcController::RpcController()
-    : credentials_policy_(CredentialsPolicy::ANY_CREDENTIALS) {
+    : credentials_policy_(CredentialsPolicy::ANY_CREDENTIALS), messenger_(nullptr) {
   DVLOG(4) << "RpcController " << this << " constructed";
 }
 
@@ -63,6 +64,7 @@ void RpcController::Reset() {
   call_.reset();
   required_server_features_.clear();
   credentials_policy_ = CredentialsPolicy::ANY_CREDENTIALS;
+  messenger_ = nullptr;
 }
 
 bool RpcController::finished() const {
@@ -145,5 +147,11 @@ void RpcController::SetRequestParam(const google::protobuf::Message& req) {
   call_->SetRequestPayload(req, std::move(outbound_sidecars_));
 }
 
+void RpcController::Cancel() {
+  DCHECK(call_);  // Set by Proxy::AsyncRequest(); cleared again by Reset().
+  DCHECK(messenger_);  // Set alongside 'call_' via SetMessenger(); nulled by Reset().
+  messenger_->QueueCancellation(call_);
+}
+
 } // namespace rpc
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/0ec793e3/src/kudu/rpc/rpc_controller.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_controller.h b/src/kudu/rpc/rpc_controller.h
index ab611a8..bd3fbe3 100644
--- a/src/kudu/rpc/rpc_controller.h
+++ b/src/kudu/rpc/rpc_controller.h
@@ -40,6 +40,7 @@ namespace kudu {
 namespace rpc {
 
 class ErrorStatusPB;
+class Messenger;
 class OutboundCall;
 class RequestIdPB;
 class RpcSidecar;
@@ -223,6 +224,19 @@ class RpcController {
   // to this request.
   Status AddOutboundSidecar(std::unique_ptr<RpcSidecar> car, int* idx);
 
+  // Cancel the call associated with the RpcController. This function should only be
+  // called when there is an outstanding outbound call. It's always safe to call
+  // Cancel() after you've sent a call, so long as you haven't called Reset() yet.
+  // Caller is not responsible for synchronization between cancellation and the
+  // callback. (i.e. the callback may or may not be invoked yet when Cancel()
+  // is called).
+  //
+  // Cancellation is "best effort" - i.e. it's still possible the callback passed
+  // to the call will be fired with a success status. If cancellation succeeds,
+  // the callback will be invoked with an Aborted status. Cancellation is asynchronous
+  // so the callback will still be invoked from the reactor thread.
+  void Cancel();
+
  private:
   friend class OutboundCall;
   friend class Proxy;
@@ -231,6 +245,9 @@ class RpcController {
   // outbound_sidecars_ to call_ in preparation for serialization.
   void SetRequestParam(const google::protobuf::Message& req);
 
+  // Set the messenger which contains the reactor thread handling the outbound call.
+  void SetMessenger(Messenger* messenger) { messenger_ = messenger; }
+
   MonoDelta timeout_;
   std::unordered_set<uint32_t> required_server_features_;
 
@@ -243,6 +260,10 @@ class RpcController {
   // Ownership is transferred to OutboundCall once the call is sent.
   std::unique_ptr<RequestIdPB> request_id_;
 
+  // The messenger which contains the reactor thread for 'call_'.
+  // Set only when 'call_' is set.
+  Messenger* messenger_;
+
   // Once the call is sent, it is tracked here.
   std::shared_ptr<OutboundCall> call_;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/0ec793e3/src/kudu/rpc/rpc_introspection.proto
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_introspection.proto b/src/kudu/rpc/rpc_introspection.proto
index 9d2f9b5..5c4f4f1 100644
--- a/src/kudu/rpc/rpc_introspection.proto
+++ b/src/kudu/rpc/rpc_introspection.proto
@@ -42,6 +42,7 @@ message RpcCallInProgressPB {
     FINISHED_SUCCESS = 6;
     NEGOTIATION_TIMED_OUT = 7;
     FINISHED_NEGOTIATION_ERROR = 8;
+    CANCELLED = 9;
 
     // TODO(todd): add states for InboundCall
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/0ec793e3/src/kudu/rpc/rtest.proto
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rtest.proto b/src/kudu/rpc/rtest.proto
index 1ef5ca6..d212cef 100644
--- a/src/kudu/rpc/rtest.proto
+++ b/src/kudu/rpc/rtest.proto
@@ -55,6 +55,16 @@ message SleepRequestPB {
 message SleepResponsePB {
 }
 
+message SleepWithSidecarRequestPB {
+  required uint32 sleep_micros = 1;
+  required uint32 pattern = 2;
+  required uint32 num_repetitions = 3;
+  required uint32 sidecar_idx = 4;
+}
+
+message SleepWithSidecarResponsePB {
+}
+
 message SendTwoStringsRequestPB {
   required uint32 random_seed = 1;
   required uint64 size1 = 2;