You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/30 16:58:01 UTC

incubator-impala git commit: KUDU-2065: Support cancellation for outbound RPC call

Repository: incubator-impala
Updated Branches:
  refs/heads/master cc4816b3d -> c1c481504


KUDU-2065: Support cancellation for outbound RPC call

This change implements a new interface RpcController::Cancel()
which takes a RpcController as argument and cancels any
pending OutboundCall associated with it.

RpcController::Cancel() queues a cancellation task scheduled
on the reactor thread for that outbound call. Once the task
is run, it will cancel the outbound call right away if
the RPC hasn't started sending yet or if it has already
sent the request and is waiting for a response. If cancellation
happens when the RPC request is being sent, the RPC will
be cancelled only after the RPC has finished sending the
request. If the RPC is finished, the cancellation will
be a no-op.

Change-Id: Iaf53c5b113de10d573bd32fb9b2293572e806fbf
Reviewed-on: http://gerrit.cloudera.org:8080/7455
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>
Reviewed-on: http://gerrit.cloudera.org:8080/7743
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/c1c48150
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/c1c48150
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/c1c48150

Branch: refs/heads/master
Commit: c1c4815049ca3deab7465070b1207446cb3a1645
Parents: cc4816b
Author: Michael Ho <kw...@cloudera.com>
Authored: Thu Jul 13 11:49:02 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Aug 30 07:33:58 2017 +0000

----------------------------------------------------------------------
 be/src/kudu/rpc/connection.cc           |  56 +++++++++++--
 be/src/kudu/rpc/connection.h            |  12 ++-
 be/src/kudu/rpc/messenger.cc            |   5 ++
 be/src/kudu/rpc/messenger.h             |   3 +
 be/src/kudu/rpc/outbound_call.cc        |  66 ++++++++++++++-
 be/src/kudu/rpc/outbound_call.h         |  36 +++++++-
 be/src/kudu/rpc/proxy.cc                |   1 +
 be/src/kudu/rpc/reactor.cc              |  69 ++++++++++++++--
 be/src/kudu/rpc/reactor.h               |  17 ++++
 be/src/kudu/rpc/rpc-test-base.h         |  42 +++++++++-
 be/src/kudu/rpc/rpc-test.cc             | 118 ++++++++++++++++++++++++++-
 be/src/kudu/rpc/rpc_controller.cc       |  10 ++-
 be/src/kudu/rpc/rpc_controller.h        |  21 +++++
 be/src/kudu/rpc/rpc_introspection.proto |   1 +
 be/src/kudu/rpc/rtest.proto             |  10 +++
 15 files changed, 443 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c1c48150/be/src/kudu/rpc/connection.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/connection.cc b/be/src/kudu/rpc/connection.cc
index fc46d67..7a519e3 100644
--- a/be/src/kudu/rpc/connection.cc
+++ b/be/src/kudu/rpc/connection.cc
@@ -246,6 +246,9 @@ void Connection::HandleOutboundCallTimeout(CallAwaitingResponse *car) {
   car->call->SetTimedOut(negotiation_complete_ ? Phase::REMOTE_CALL
                                                : Phase::CONNECTION_NEGOTIATION);
 
+  // Test cancellation when 'car->call' is in 'TIMED_OUT' state
+  MaybeInjectCancellation(car->call);
+
   // Drop the reference to the call. If the original caller has moved on after
   // seeing the timeout, we no longer need to hold onto the allocated memory
   // from the request.
@@ -258,22 +261,41 @@ void Connection::HandleOutboundCallTimeout(CallAwaitingResponse *car) {
   // already timed out.
 }
 
+void Connection::CancelOutboundCall(const shared_ptr<OutboundCall> &call) {
+  CallAwaitingResponse* car = FindPtrOrNull(awaiting_response_, call->call_id());
+  if (car != nullptr) {
+    // car->call may be NULL if the call has timed out already.
+    DCHECK(!car->call || car->call.get() == call.get());
+    car->call.reset();
+  }
+}
+
+// Inject a cancellation when 'call' is in state 'FLAGS_rpc_inject_cancellation_state'.
+void inline Connection::MaybeInjectCancellation(const shared_ptr<OutboundCall> &call) {
+  if (PREDICT_FALSE(call->ShouldInjectCancellation())) {
+    reactor_thread_->reactor()->messenger()->QueueCancellation(call);
+  }
+}
+
 // Callbacks after sending a call on the wire.
 // This notifies the OutboundCall object to change its state to SENT once it
 // has been fully transmitted.
 struct CallTransferCallbacks : public TransferCallbacks {
  public:
-  explicit CallTransferCallbacks(shared_ptr<OutboundCall> call)
-      : call_(std::move(call)) {}
+  explicit CallTransferCallbacks(shared_ptr<OutboundCall> call,
+                                 Connection *conn)
+      : call_(std::move(call)), conn_(conn) {}
 
   virtual void NotifyTransferFinished() OVERRIDE {
     // TODO: would be better to cancel the transfer while it is still on the queue if we
     // timed out before the transfer started, but there is still a race in the case of
     // a partial send that we have to handle here
     if (call_->IsFinished()) {
-      DCHECK(call_->IsTimedOut());
+      DCHECK(call_->IsTimedOut() || call_->IsCancelled());
     } else {
       call_->SetSent();
+      // Test cancellation when 'call_' is in 'SENT' state.
+      conn_->MaybeInjectCancellation(call_);
     }
     delete this;
   }
@@ -286,6 +308,7 @@ struct CallTransferCallbacks : public TransferCallbacks {
 
  private:
   shared_ptr<OutboundCall> call_;
+  Connection* conn_;
 };
 
 void Connection::QueueOutboundCall(const shared_ptr<OutboundCall> &call) {
@@ -305,6 +328,9 @@ void Connection::QueueOutboundCall(const shared_ptr<OutboundCall> &call) {
   // yet assigned a call ID.
   DCHECK(!call->call_id_assigned());
 
+  // We shouldn't reach this point if 'call' was requested to be cancelled.
+  DCHECK(!call->cancellation_requested());
+
   // Assign the call ID.
   int32_t call_id = GetNextCallId();
   call->set_call_id(call_id);
@@ -320,6 +346,9 @@ void Connection::QueueOutboundCall(const shared_ptr<OutboundCall> &call) {
 
   call->SetQueued();
 
+  // Test cancellation when 'call' is in 'ON_OUTBOUND_QUEUE' state.
+  MaybeInjectCancellation(call);
+
   scoped_car car(car_pool_.make_scoped_ptr(car_pool_.Construct()));
   car->conn = this;
   car->call = call;
@@ -368,7 +397,7 @@ void Connection::QueueOutboundCall(const shared_ptr<OutboundCall> &call) {
     car->timeout_timer.start();
   }
 
-  TransferCallbacks *cb = new CallTransferCallbacks(call);
+  TransferCallbacks *cb = new CallTransferCallbacks(call, this);
   awaiting_response_[call_id] = car.release();
   QueueOutbound(gscoped_ptr<OutboundTransfer>(
       OutboundTransfer::CreateForCallRequest(call_id, slices_tmp_, cb)));
@@ -555,13 +584,17 @@ void Connection::HandleCallResponse(gscoped_ptr<InboundTransfer> transfer) {
   // The car->timeout_timer ev::timer will be stopped automatically by its destructor.
   scoped_car car(car_pool_.make_scoped_ptr(car_ptr));
 
-  if (PREDICT_FALSE(car->call.get() == nullptr)) {
+  if (PREDICT_FALSE(!car->call)) {
     // The call already failed due to a timeout.
-    VLOG(1) << "Got response to call id " << resp->call_id() << " after client already timed out";
+    VLOG(1) << "Got response to call id " << resp->call_id() << " after client "
+            << "already timed out or cancelled";
     return;
   }
 
   car->call->SetResponse(std::move(resp));
+
+  // Test cancellation when 'car->call' is in 'FINISHED_SUCCESS' or 'FINISHED_ERROR' state.
+  MaybeInjectCancellation(car->call);
 }
 
 void Connection::WriteHandler(ev::io &watcher, int revents) {
@@ -590,10 +623,10 @@ void Connection::WriteHandler(ev::io &watcher, int revents) {
       if (transfer->is_for_outbound_call()) {
         CallAwaitingResponse* car = FindOrDie(awaiting_response_, transfer->call_id());
         if (!car->call) {
-          // If the call has already timed out, then the 'call' field will have been nulled.
-          // In that case, we don't need to bother sending it.
+          // If the call has already timed out or has already been cancelled, the 'call'
+          // field would be set to NULL. In that case, don't bother sending it.
           outbound_transfers_.pop_front();
-          transfer->Abort(Status::Aborted("already timed out"));
+          transfer->Abort(Status::Aborted("already timed out or cancelled"));
           delete transfer;
           continue;
         }
@@ -610,12 +643,17 @@ void Connection::WriteHandler(ev::io &watcher, int revents) {
           transfer->Abort(s);
           car->call->SetFailed(s, negotiation_complete_ ? Phase::REMOTE_CALL
                                                         : Phase::CONNECTION_NEGOTIATION);
+          // Test cancellation when 'car->call' is in 'FINISHED_ERROR' state.
+          MaybeInjectCancellation(car->call);
           car->call.reset();
           delete transfer;
           continue;
         }
 
         car->call->SetSending();
+
+        // Test cancellation when 'car->call' is in 'SENDING' state.
+        MaybeInjectCancellation(car->call);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c1c48150/be/src/kudu/rpc/connection.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/connection.h b/be/src/kudu/rpc/connection.h
index 816c43c..3d23826 100644
--- a/be/src/kudu/rpc/connection.h
+++ b/be/src/kudu/rpc/connection.h
@@ -113,7 +113,8 @@ class Connection : public RefCountedThreadSafe<Connection> {
                 std::unique_ptr<ErrorStatusPB> rpc_error = {});
 
   // Queue a new call to be made. If the queueing fails, the call will be
-  // marked failed.
+  // marked failed. The caller is expected to check if 'call' has been cancelled
+  // before making the call.
   // Takes ownership of the 'call' object regardless of whether it succeeds or fails.
   void QueueOutboundCall(const std::shared_ptr<OutboundCall> &call);
 
@@ -122,6 +123,10 @@ class Connection : public RefCountedThreadSafe<Connection> {
   // This may be called from a non-reactor thread.
   void QueueResponseForCall(gscoped_ptr<InboundCall> call);
 
+  // Cancel an outbound call by dropping the reference to it held by its
+  // CallAwaitingResponse entry in 'awaiting_response_'.
+  void CancelOutboundCall(const std::shared_ptr<OutboundCall> &call);
+
   // The address of the remote end of the connection.
   const Sockaddr &remote() const { return remote_; }
 
@@ -216,6 +221,7 @@ class Connection : public RefCountedThreadSafe<Connection> {
  private:
   friend struct CallAwaitingResponse;
   friend class QueueTransferTask;
+  friend struct CallTransferCallbacks;
   friend struct ResponseTransferCallbacks;
 
   // A call which has been fully sent to the server, which we're waiting for
@@ -269,6 +275,10 @@ class Connection : public RefCountedThreadSafe<Connection> {
   // This must be called from the reactor thread.
   void QueueOutbound(gscoped_ptr<OutboundTransfer> transfer);
 
+  // Internal test function for injecting cancellation request when 'call'
+  // reaches state specified in 'FLAGS_rpc_inject_cancellation_state'.
+  void MaybeInjectCancellation(const std::shared_ptr<OutboundCall> &call);
+
   // The reactor thread that created this connection.
   ReactorThread* const reactor_thread_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c1c48150/be/src/kudu/rpc/messenger.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/messenger.cc b/be/src/kudu/rpc/messenger.cc
index 2884fba..74fb0f4 100644
--- a/be/src/kudu/rpc/messenger.cc
+++ b/be/src/kudu/rpc/messenger.cc
@@ -418,6 +418,11 @@ void Messenger::QueueInboundCall(gscoped_ptr<InboundCall> call) {
   WARN_NOT_OK((*service)->QueueInboundCall(std::move(call)), "Unable to handle RPC call");
 }
 
+void Messenger::QueueCancellation(const shared_ptr<OutboundCall> &call) {
+  Reactor *reactor = RemoteToReactor(call->conn_id().remote());
+  reactor->QueueCancellation(call);
+}
+
 void Messenger::RegisterInboundSocket(Socket *new_socket, const Sockaddr &remote) {
   Reactor *reactor = RemoteToReactor(remote);
   reactor->RegisterInboundSocket(new_socket, remote);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c1c48150/be/src/kudu/rpc/messenger.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/messenger.h b/be/src/kudu/rpc/messenger.h
index 9a8ebab..d860f90 100644
--- a/be/src/kudu/rpc/messenger.h
+++ b/be/src/kudu/rpc/messenger.h
@@ -193,6 +193,9 @@ class Messenger {
   // Enqueue a call for processing on the server.
   void QueueInboundCall(gscoped_ptr<InboundCall> call);
 
+  // Queue a cancellation for the given outbound call.
+  void QueueCancellation(const std::shared_ptr<OutboundCall> &call);
+
   // Take ownership of the socket via Socket::Release
   void RegisterInboundSocket(Socket *new_socket, const Sockaddr &remote);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c1c48150/be/src/kudu/rpc/outbound_call.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/outbound_call.cc b/be/src/kudu/rpc/outbound_call.cc
index af03f1c..aab29d3 100644
--- a/be/src/kudu/rpc/outbound_call.cc
+++ b/be/src/kudu/rpc/outbound_call.cc
@@ -46,6 +46,12 @@ DEFINE_int64(rpc_callback_max_cycles, 100 * 1000 * 1000,
 TAG_FLAG(rpc_callback_max_cycles, advanced);
 TAG_FLAG(rpc_callback_max_cycles, runtime);
 
+// Flag used in debug build for injecting cancellation at different code paths.
+DEFINE_int32(rpc_inject_cancellation_state, -1,
+             "If this flag is not -1, it is the state in which a cancellation request "
+             "will be injected. Should use values in OutboundCall::State only");
+TAG_FLAG(rpc_inject_cancellation_state, unsafe);
+
 using std::unique_ptr;
 
 namespace kudu {
@@ -70,7 +76,8 @@ OutboundCall::OutboundCall(const ConnectionId& conn_id,
       conn_id_(conn_id),
       callback_(std::move(callback)),
       controller_(DCHECK_NOTNULL(controller)),
-      response_(DCHECK_NOTNULL(response_storage)) {
+      response_(DCHECK_NOTNULL(response_storage)),
+      cancellation_requested_(false) {
   DVLOG(4) << "OutboundCall " << this << " constructed with state_: " << StateName(state_)
            << " and RPC timeout: "
            << (controller->timeout().Initialized() ? controller->timeout().ToString() : "none");
@@ -157,6 +164,8 @@ string OutboundCall::StateName(State state) {
       return "NEGOTIATION_TIMED_OUT";
     case TIMED_OUT:
       return "TIMED_OUT";
+    case CANCELLED:
+      return "CANCELLED";
     case FINISHED_NEGOTIATION_ERROR:
       return "FINISHED_NEGOTIATION_ERROR";
     case FINISHED_ERROR:
@@ -201,6 +210,9 @@ void OutboundCall::set_state_unlocked(State new_state) {
     case TIMED_OUT:
       DCHECK(state_ == SENT || state_ == ON_OUTBOUND_QUEUE || state_ == SENDING);
       break;
+    case CANCELLED:
+      DCHECK(state_ == READY || state_ == ON_OUTBOUND_QUEUE || state_ == SENT);
+      break;
     case FINISHED_SUCCESS:
       DCHECK_EQ(state_, SENT);
       break;
@@ -212,7 +224,31 @@ void OutboundCall::set_state_unlocked(State new_state) {
   state_ = new_state;
 }
 
+void OutboundCall::Cancel() {
+  cancellation_requested_ = true;
+  // No lock needed as it's called from reactor thread
+  switch (state_) {
+    case READY:
+    case ON_OUTBOUND_QUEUE:
+    case SENT: {
+      SetCancelled();
+      break;
+    }
+    case SENDING:
+    case NEGOTIATION_TIMED_OUT:
+    case TIMED_OUT:
+    case CANCELLED:
+    case FINISHED_NEGOTIATION_ERROR:
+    case FINISHED_ERROR:
+    case FINISHED_SUCCESS:
+      break;
+  }
+}
+
 void OutboundCall::CallCallback() {
+  // Clear references to outbound sidecars before invoking callback.
+  sidecars_.clear();
+
   int64_t start_cycles = CycleClock::Now();
   {
     SCOPED_WATCH_STACK(100);
@@ -283,6 +319,11 @@ void OutboundCall::SetSent() {
   // request_buf_ is also done being used here, but since it was allocated by
   // the caller thread, we would rather let that thread free it whenever it
   // deletes the RpcController.
+
+  // If cancellation was requested, it's now a good time to do the actual cancellation.
+  if (cancellation_requested()) {
+    SetCancelled();
+  }
 }
 
 void OutboundCall::SetFailed(const Status &status,
@@ -325,6 +366,20 @@ void OutboundCall::SetTimedOut(Phase phase) {
   CallCallback();
 }
 
+void OutboundCall::SetCancelled() {
+  DCHECK(!IsFinished());
+  {
+    std::lock_guard<simple_spinlock> l(lock_);
+    status_ = Status::Aborted(
+        Substitute("$0 RPC to $1 is cancelled in state $2",
+                   remote_method_.method_name(),
+                   conn_id_.remote().ToString(),
+                   StateName(state_)));
+    set_state_unlocked(CANCELLED);
+  }
+  CallCallback();
+}
+
 bool OutboundCall::IsTimedOut() const {
   std::lock_guard<simple_spinlock> l(lock_);
   switch (state_) {
@@ -336,6 +391,11 @@ bool OutboundCall::IsTimedOut() const {
   }
 }
 
+bool OutboundCall::IsCancelled() const {
+  std::lock_guard<simple_spinlock> l(lock_);
+  return state_ == CANCELLED;
+}
+
 bool OutboundCall::IsNegotiationError() const {
   std::lock_guard<simple_spinlock> l(lock_);
   switch (state_) {
@@ -357,6 +417,7 @@ bool OutboundCall::IsFinished() const {
       return false;
     case NEGOTIATION_TIMED_OUT:
     case TIMED_OUT:
+    case CANCELLED:
     case FINISHED_NEGOTIATION_ERROR:
     case FINISHED_ERROR:
     case FINISHED_SUCCESS:
@@ -397,6 +458,9 @@ void OutboundCall::DumpPB(const DumpRunningRpcsRequestPB& req,
     case TIMED_OUT:
       resp->set_state(RpcCallInProgressPB::TIMED_OUT);
       break;
+    case CANCELLED:
+      resp->set_state(RpcCallInProgressPB::CANCELLED);
+      break;
     case FINISHED_NEGOTIATION_ERROR:
       resp->set_state(RpcCallInProgressPB::FINISHED_NEGOTIATION_ERROR);
       break;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c1c48150/be/src/kudu/rpc/outbound_call.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/outbound_call.h b/be/src/kudu/rpc/outbound_call.h
index ebed9b5..ba2df27 100644
--- a/be/src/kudu/rpc/outbound_call.h
+++ b/be/src/kudu/rpc/outbound_call.h
@@ -38,6 +38,8 @@
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 
+DECLARE_int32(rpc_inject_cancellation_state);
+
 namespace google {
 namespace protobuf {
 class Message;
@@ -156,6 +158,13 @@ class OutboundCall {
   // is called first. This is called from the Reactor thread.
   Status SerializeTo(std::vector<Slice>* slices);
 
+  // Mark in the call that cancellation has been requested. If the call hasn't yet
+  // started sending or has finished sending the RPC request but is waiting for a
+  // response, cancel the RPC right away. Otherwise, wait until the RPC has finished
+  // sending before cancelling it. If the call is finished, it's a no-op.
+  // REQUIRES: must be called from the reactor thread.
+  void Cancel();
+
   // Callback after the call has been put on the outbound connection queue.
   void SetQueued();
 
@@ -181,6 +190,8 @@ class OutboundCall {
 
   bool IsNegotiationError() const;
 
+  bool IsCancelled() const;
+
   // Is the call finished?
   bool IsFinished() const;
 
@@ -215,8 +226,22 @@ class OutboundCall {
     return header_.call_id();
   }
 
+  // Returns true if cancellation has been requested. Must be called from
+  // reactor thread.
+  bool cancellation_requested() const {
+    return cancellation_requested_;
+  }
+
+  // Test function which returns true if a cancellation request should be injected
+  // at the current state.
+  bool ShouldInjectCancellation() const {
+    return FLAGS_rpc_inject_cancellation_state != -1 &&
+        FLAGS_rpc_inject_cancellation_state == state();
+  }
+
  private:
   friend class RpcController;
+  FRIEND_TEST(TestRpc, TestCancellation);
 
   // Various states the call propagates through.
   // NB: if adding another state, be sure to update OutboundCall::IsFinished()
@@ -228,6 +253,7 @@ class OutboundCall {
     SENT,
     NEGOTIATION_TIMED_OUT,
     TIMED_OUT,
+    CANCELLED,
     FINISHED_NEGOTIATION_ERROR,
     FINISHED_ERROR,
     FINISHED_SUCCESS
@@ -235,6 +261,9 @@ class OutboundCall {
 
   static std::string StateName(State state);
 
+  // Mark the call as cancelled. This also invokes the callback to notify the caller.
+  void SetCancelled();
+
   void set_state(State new_state);
   State state() const;
 
@@ -260,7 +289,9 @@ class OutboundCall {
   Status status_;
   gscoped_ptr<ErrorStatusPB> error_pb_;
 
-  // Call the user-provided callback.
+  // Call the user-provided callback. Note that entries in 'sidecars_' are cleared
+  // prior to invoking the callback so the client can assume that the call doesn't
+  // hold references to outbound sidecars.
   void CallCallback();
 
   // The RPC header.
@@ -295,6 +326,9 @@ class OutboundCall {
   // Total size in bytes of all sidecars in 'sidecars_'. Set in SetRequestPayload().
   int64_t sidecar_byte_size_ = -1;
 
+  // True if cancellation was requested on this call.
+  bool cancellation_requested_;
+
   DISALLOW_COPY_AND_ASSIGN(OutboundCall);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c1c48150/be/src/kudu/rpc/proxy.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/proxy.cc b/be/src/kudu/rpc/proxy.cc
index 45ad5dd..3ec907d 100644
--- a/be/src/kudu/rpc/proxy.cc
+++ b/be/src/kudu/rpc/proxy.cc
@@ -82,6 +82,7 @@ void Proxy::AsyncRequest(const string& method,
   controller->call_.reset(
       new OutboundCall(conn_id_, remote_method, response, controller, callback));
   controller->SetRequestParam(req);
+  controller->SetMessenger(messenger_.get());
 
   // If this fails to queue, the callback will get called immediately
   // and the controller will be in an ERROR state.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c1c48150/be/src/kudu/rpc/reactor.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/reactor.cc b/be/src/kudu/rpc/reactor.cc
index df4a661..f55cca0 100644
--- a/be/src/kudu/rpc/reactor.cc
+++ b/be/src/kudu/rpc/reactor.cc
@@ -244,8 +244,13 @@ void ReactorThread::RegisterConnection(scoped_refptr<Connection> conn) {
 
 void ReactorThread::AssignOutboundCall(const shared_ptr<OutboundCall>& call) {
   DCHECK(IsCurrentThread());
-  scoped_refptr<Connection> conn;
 
+  // Skip if the outbound call has been cancelled already.
+  if (PREDICT_FALSE(call->IsCancelled())) {
+    return;
+  }
+
+  scoped_refptr<Connection> conn;
   Status s = FindOrStartConnection(call->conn_id(),
                                    call->controller()->credentials_policy(),
                                    &conn);
@@ -257,6 +262,24 @@ void ReactorThread::AssignOutboundCall(const shared_ptr<OutboundCall>& call) {
   conn->QueueOutboundCall(call);
 }
 
+void ReactorThread::CancelOutboundCall(const shared_ptr<OutboundCall>& call) {
+  DCHECK(IsCurrentThread());
+
+  // If the callback has been invoked already, the cancellation is a no-op.
+  // The controller may be gone already if the callback has been invoked.
+  if (call->IsFinished()) {
+    return;
+  }
+
+  scoped_refptr<Connection> conn;
+  if (FindConnection(call->conn_id(),
+                     call->controller()->credentials_policy(),
+                     &conn)) {
+    conn->CancelOutboundCall(call);
+  }
+  call->Cancel();
+}
+
 //
 // Handles timer events.  The periodic timer:
 //
@@ -355,9 +378,9 @@ void ReactorThread::RunThread() {
   reactor_->messenger_.reset();
 }
 
-Status ReactorThread::FindOrStartConnection(const ConnectionId& conn_id,
-                                            CredentialsPolicy cred_policy,
-                                            scoped_refptr<Connection>* conn) {
+bool ReactorThread::FindConnection(const ConnectionId& conn_id,
+                                   CredentialsPolicy cred_policy,
+                                   scoped_refptr<Connection>* conn) {
   DCHECK(IsCurrentThread());
   const auto range = client_conns_.equal_range(conn_id);
   scoped_refptr<Connection> found_conn;
@@ -398,6 +421,16 @@ Status ReactorThread::FindOrStartConnection(const ConnectionId& conn_id,
   if (found_conn) {
     // Found matching not-to-be-shutdown connection: return it as the result.
     conn->swap(found_conn);
+    return true;
+  }
+  return false;
+}
+
+Status ReactorThread::FindOrStartConnection(const ConnectionId& conn_id,
+                                            CredentialsPolicy cred_policy,
+                                            scoped_refptr<Connection>* conn) {
+  DCHECK(IsCurrentThread());
+  if (FindConnection(conn_id, cred_policy, conn)) {
     return Status::OK();
   }
 
@@ -710,7 +743,7 @@ class AssignOutboundCallTask : public ReactorTask {
 
   void Abort(const Status& status) override {
     // It doesn't matter what is the actual phase of the OutboundCall: just set
-    // it to Phase::REMOTE_CALL to finilize the state of the call.
+    // it to Phase::REMOTE_CALL to finalize the state of the call.
     call_->SetFailed(status, OutboundCall::Phase::REMOTE_CALL);
     delete this;
   }
@@ -722,9 +755,35 @@ class AssignOutboundCallTask : public ReactorTask {
 void Reactor::QueueOutboundCall(const shared_ptr<OutboundCall>& call) {
   DVLOG(3) << name_ << ": queueing outbound call "
            << call->ToString() << " to remote " << call->conn_id().remote().ToString();
+  // Test cancellation when 'call' is in 'READY' state.
+  if (PREDICT_FALSE(call->ShouldInjectCancellation())) {
+    QueueCancellation(call);
+  }
   ScheduleReactorTask(new AssignOutboundCallTask(call));
 }
 
+class CancellationTask : public ReactorTask {
+ public:
+  explicit CancellationTask(shared_ptr<OutboundCall> call)
+      : call_(std::move(call)) {}
+
+  void Run(ReactorThread* reactor) override {
+    reactor->CancelOutboundCall(call_);
+    delete this;
+  }
+
+  void Abort(const Status& /*status*/) override {
+    delete this;
+  }
+
+ private:
+  shared_ptr<OutboundCall> call_;
+};
+
+void Reactor::QueueCancellation(const shared_ptr<OutboundCall>& call) {
+  ScheduleReactorTask(new CancellationTask(call));
+}
+
 void Reactor::ScheduleReactorTask(ReactorTask *task) {
   {
     std::unique_lock<LockType> l(lock_);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c1c48150/be/src/kudu/rpc/reactor.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/reactor.h b/be/src/kudu/rpc/reactor.h
index 37eedd4..dcdc8a2 100644
--- a/be/src/kudu/rpc/reactor.h
+++ b/be/src/kudu/rpc/reactor.h
@@ -194,12 +194,19 @@ class ReactorThread {
 
  private:
   friend class AssignOutboundCallTask;
+  friend class CancellationTask;
   friend class RegisterConnectionTask;
   friend class DelayedTask;
 
   // Run the main event loop of the reactor.
   void RunThread();
 
+  // Find a connection to the given remote and return it in 'conn'.
+  // Returns true if a connection is found. Returns false otherwise.
+  bool FindConnection(const ConnectionId& conn_id,
+                      CredentialsPolicy cred_policy,
+                      scoped_refptr<Connection>* conn);
+
   // Find or create a new connection to the given remote.
   // If such a connection already exists, returns that, otherwise creates a new one.
   // May return a bad Status if the connect() call fails.
@@ -231,6 +238,13 @@ class ReactorThread {
   // If this fails, the call is marked failed and completed.
   void AssignOutboundCall(const std::shared_ptr<OutboundCall> &call);
 
+  // Cancel the outbound call. May update corresponding connection
+  // object to remove call from the CallAwaitingResponse object.
+  // Also mark the call as slated for cancellation so the callback
+  // may be invoked early if the RPC hasn't yet been sent or if it's
+  // waiting for a response from the remote.
+  void CancelOutboundCall(const std::shared_ptr<OutboundCall> &call);
+
   // Register a new connection.
   void RegisterConnection(scoped_refptr<Connection> conn);
 
@@ -313,6 +327,9 @@ class Reactor {
   // the call as failed.
   void QueueOutboundCall(const std::shared_ptr<OutboundCall> &call);
 
+  // Queue a new reactor task to cancel an outbound call.
+  void QueueCancellation(const std::shared_ptr<OutboundCall> &call);
+
   // Schedule the given task's Run() method to be called on the
   // reactor thread.
   // If the reactor shuts down before it is run, the Abort method will be

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c1c48150/be/src/kudu/rpc/rpc-test-base.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc-test-base.h b/be/src/kudu/rpc/rpc-test-base.h
index c28218b..c8c84ca 100644
--- a/be/src/kudu/rpc/rpc-test-base.h
+++ b/be/src/kudu/rpc/rpc-test-base.h
@@ -71,6 +71,8 @@ using kudu::rpc_test::SendTwoStringsRequestPB;
 using kudu::rpc_test::SendTwoStringsResponsePB;
 using kudu::rpc_test::SleepRequestPB;
 using kudu::rpc_test::SleepResponsePB;
+using kudu::rpc_test::SleepWithSidecarRequestPB;
+using kudu::rpc_test::SleepWithSidecarResponsePB;
 using kudu::rpc_test::TestInvalidResponseRequestPB;
 using kudu::rpc_test::TestInvalidResponseResponsePB;
 using kudu::rpc_test::WhoAmIRequestPB;
@@ -85,6 +87,7 @@ class GenericCalculatorService : public ServiceIf {
   static const char *kFullServiceName;
   static const char *kAddMethodName;
   static const char *kSleepMethodName;
+  static const char *kSleepWithSidecarMethodName;
   static const char *kPushTwoStringsMethodName;
   static const char *kSendTwoStringsMethodName;
   static const char *kAddExactlyOnce;
@@ -106,6 +109,8 @@ class GenericCalculatorService : public ServiceIf {
       DoAdd(incoming);
     } else if (incoming->remote_method().method_name() == kSleepMethodName) {
       DoSleep(incoming);
+    } else if (incoming->remote_method().method_name() == kSleepWithSidecarMethodName) {
+      DoSleepWithSidecar(incoming);
     } else if (incoming->remote_method().method_name() == kSendTwoStringsMethodName) {
       DoSendTwoStrings(incoming);
     } else if (incoming->remote_method().method_name() == kPushTwoStringsMethodName) {
@@ -206,6 +211,31 @@ class GenericCalculatorService : public ServiceIf {
     SleepResponsePB resp;
     incoming->RespondSuccess(resp);
   }
+
+  void DoSleepWithSidecar(InboundCall *incoming) {
+    Slice param(incoming->serialized_request());
+    SleepWithSidecarRequestPB req;
+    if (!req.ParseFromArray(param.data(), param.size())) {
+      incoming->RespondFailure(ErrorStatusPB::ERROR_INVALID_REQUEST,
+        Status::InvalidArgument("Couldn't parse pb",
+                                req.InitializationErrorString()));
+      return;
+    }
+
+    LOG(INFO) << "got call: " << SecureShortDebugString(req);
+    SleepFor(MonoDelta::FromMicroseconds(req.sleep_micros()));
+
+    uint32 pattern = req.pattern();
+    uint32 num_repetitions = req.num_repetitions();
+    Slice sidecar;
+    CHECK_OK(incoming->GetInboundSidecar(req.sidecar_idx(), &sidecar));
+    CHECK_EQ(sidecar.size(), sizeof(uint32) * num_repetitions);
+    const uint32_t *data = reinterpret_cast<const uint32_t*>(sidecar.data());
+    for (int i = 0; i < num_repetitions; ++i) CHECK_EQ(data[i], pattern);
+
+    SleepResponsePB resp;
+    incoming->RespondSuccess(resp);
+  }
 };
 
 class CalculatorService : public CalculatorServiceIf {
@@ -361,6 +391,7 @@ class CalculatorService : public CalculatorServiceIf {
 const char *GenericCalculatorService::kFullServiceName = "kudu.rpc.GenericCalculatorService";
 const char *GenericCalculatorService::kAddMethodName = "Add";
 const char *GenericCalculatorService::kSleepMethodName = "Sleep";
+const char *GenericCalculatorService::kSleepWithSidecarMethodName = "SleepWithSidecar";
 const char *GenericCalculatorService::kPushTwoStringsMethodName = "PushTwoStrings";
 const char *GenericCalculatorService::kSendTwoStringsMethodName = "SendTwoStrings";
 const char *GenericCalculatorService::kAddExactlyOnce = "AddExactlyOnce";
@@ -462,7 +493,7 @@ class RpcTestBase : public KuduTest {
     CHECK_EQ(0, second.compare(Slice(expected)));
   }
 
-  void DoTestOutgoingSidecar(const Proxy &p, int size1, int size2) {
+  Status DoTestOutgoingSidecar(const Proxy &p, int size1, int size2) {
     PushTwoStringsRequestPB request;
     RpcController controller;
 
@@ -478,12 +509,17 @@ class RpcTestBase : public KuduTest {
     request.set_sidecar2_idx(idx2);
 
     PushTwoStringsResponsePB resp;
-    CHECK_OK(p.SyncRequest(GenericCalculatorService::kPushTwoStringsMethodName,
-            request, &resp, &controller));
+    KUDU_RETURN_NOT_OK(p.SyncRequest(GenericCalculatorService::kPushTwoStringsMethodName,
+                                     request, &resp, &controller));
     CHECK_EQ(size1, resp.size1());
     CHECK_EQ(resp.data1(), s1);
     CHECK_EQ(size2, resp.size2());
     CHECK_EQ(resp.data2(), s2);
+    return Status::OK();
+  }
+
+  void DoTestOutgoingSidecarExpectOK(const Proxy &p, int size1, int size2) {
+    CHECK_OK(DoTestOutgoingSidecar(p, size1, size2));
   }
 
   void DoTestExpectTimeout(const Proxy& p,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c1c48150/be/src/kudu/rpc/rpc-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc-test.cc b/be/src/kudu/rpc/rpc-test.cc
index 2378892..38ad357 100644
--- a/be/src/kudu/rpc/rpc-test.cc
+++ b/be/src/kudu/rpc/rpc-test.cc
@@ -29,10 +29,12 @@
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/constants.h"
+#include "kudu/rpc/outbound_call.h"
 #include "kudu/rpc/serialization.h"
 #include "kudu/security/test/test_certs.h"
 #include "kudu/util/countdown_latch.h"
 #include "kudu/util/env.h"
+#include "kudu/util/random.h"
 #include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/test_util.h"
 
@@ -492,9 +494,9 @@ TEST_P(TestRpc, TestRpcSidecar) {
   // we can't write the whole response to the socket in a single call.
   DoTestSidecar(p, 3000 * 1024, 2000 * 1024);
 
-  DoTestOutgoingSidecar(p, 0, 0);
-  DoTestOutgoingSidecar(p, 123, 456);
-  DoTestOutgoingSidecar(p, 3000 * 1024, 2000 * 1024);
+  DoTestOutgoingSidecarExpectOK(p, 0, 0);
+  DoTestOutgoingSidecarExpectOK(p, 123, 456);
+  DoTestOutgoingSidecarExpectOK(p, 3000 * 1024, 2000 * 1024);
 }
 
 TEST_P(TestRpc, TestRpcSidecarLimits) {
@@ -886,5 +888,115 @@ TEST_P(TestRpc, TestApplicationFeatureFlagUnsupportedServer) {
   }
 }
 
+TEST_P(TestRpc, TestCancellation) {
+  // Set up server.
+  Sockaddr server_addr;
+  bool enable_ssl = GetParam();
+  StartTestServer(&server_addr, enable_ssl);
+
+  // Set up client.
+  LOG(INFO) << "Connecting to " << server_addr.ToString();
+  shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
+  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+
+  for (int i = OutboundCall::READY; i <= OutboundCall::FINISHED_SUCCESS; ++i) {
+    FLAGS_rpc_inject_cancellation_state = i;
+    switch (i) {
+      case OutboundCall::READY:
+      case OutboundCall::ON_OUTBOUND_QUEUE:
+      case OutboundCall::SENDING:
+      case OutboundCall::SENT:
+        ASSERT_TRUE(DoTestOutgoingSidecar(p, 0, 0).IsAborted());
+        ASSERT_TRUE(DoTestOutgoingSidecar(p, 123, 456).IsAborted());
+        ASSERT_TRUE(DoTestOutgoingSidecar(p, 3000 * 1024, 2000 * 1024).IsAborted());
+        break;
+      case OutboundCall::NEGOTIATION_TIMED_OUT:
+      case OutboundCall::TIMED_OUT:
+        DoTestExpectTimeout(p, MonoDelta::FromMilliseconds(1000));
+        break;
+      case OutboundCall::CANCELLED:
+        break;
+      case OutboundCall::FINISHED_NEGOTIATION_ERROR:
+      case OutboundCall::FINISHED_ERROR: {
+        AddRequestPB req;
+        req.set_x(1);
+        req.set_y(2);
+        AddResponsePB resp;
+        RpcController controller;
+        controller.RequireServerFeature(FeatureFlags::FOO);
+        controller.RequireServerFeature(99);
+        Status s = p.SyncRequest("Add", req, &resp, &controller);
+        ASSERT_TRUE(s.IsRemoteError());
+        break;
+      }
+      case OutboundCall::FINISHED_SUCCESS:
+        DoTestOutgoingSidecarExpectOK(p, 0, 0);
+        DoTestOutgoingSidecarExpectOK(p, 123, 456);
+        DoTestOutgoingSidecarExpectOK(p, 3000 * 1024, 2000 * 1024);
+        break;
+    }
+  }
+  client_messenger->Shutdown();
+}
+
+#define TEST_PAYLOAD_SIZE  (1 << 23)
+#define TEST_SLEEP_TIME_MS (500)
+
+static void SleepCallback(uint8_t* payload, CountDownLatch* latch) {
+  // Overwrites the payload which the sidecar is pointing to. The server
+  // checks if the payload matches the expected pattern to detect cases
+  // in which the payload is overwritten while it's being sent.
+  memset(payload, 0, TEST_PAYLOAD_SIZE);
+  latch->CountDown();
+}
+
+TEST_P(TestRpc, TestCancellationAsync) {
+  // Set up server.
+  Sockaddr server_addr;
+  bool enable_ssl = GetParam();
+  StartTestServer(&server_addr, enable_ssl);
+
+  // Set up client.
+  LOG(INFO) << "Connecting to " << server_addr.ToString();
+  shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, enable_ssl));
+  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+
+  RpcController controller;
+
+  // The payload to be used during the RPC.
+  gscoped_array<uint8_t> payload(new uint8_t[TEST_PAYLOAD_SIZE]);
+
+  // Used to generate sleep time between invoking RPC and requesting cancellation.
+  Random rand(SeedRandom());
+
+  for (int i = 0; i < 10; ++i) {
+    SleepWithSidecarRequestPB req;
+    SleepWithSidecarResponsePB resp;
+
+    // Initialize the payload with a non-zero pattern.
+    memset(payload.get(), 0xff, TEST_PAYLOAD_SIZE);
+    req.set_sleep_micros(TEST_SLEEP_TIME_MS);
+    req.set_pattern(0xffffffff);
+    req.set_num_repetitions(TEST_PAYLOAD_SIZE / sizeof(uint32_t));
+
+    int idx;
+    Slice s(payload.get(), TEST_PAYLOAD_SIZE);
+    CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(s), &idx));
+    req.set_sidecar_idx(idx);
+
+    CountDownLatch latch(1);
+    p.AsyncRequest(GenericCalculatorService::kSleepWithSidecarMethodName,
+                   req, &resp, &controller,
+                   boost::bind(SleepCallback, payload.get(), &latch));
+    // Sleep for a while before cancelling the RPC.
+    if (i > 0) SleepFor(MonoDelta::FromMicroseconds(rand.Uniform64(i * 30)));
+    controller.Cancel();
+    latch.Wait();
+    ASSERT_TRUE(controller.status().IsAborted() || controller.status().ok());
+    controller.Reset();
+  }
+  client_messenger->Shutdown();
+}
+
 } // namespace rpc
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c1c48150/be/src/kudu/rpc/rpc_controller.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_controller.cc b/be/src/kudu/rpc/rpc_controller.cc
index 505db22..9139cd4 100644
--- a/be/src/kudu/rpc/rpc_controller.cc
+++ b/be/src/kudu/rpc/rpc_controller.cc
@@ -23,6 +23,7 @@
 
 #include <glog/logging.h>
 
+#include "kudu/rpc/messenger.h"
 #include "kudu/rpc/outbound_call.h"
 #include "kudu/rpc/rpc_header.pb.h"
 
@@ -32,7 +33,7 @@ namespace kudu {
 namespace rpc {
 
 RpcController::RpcController()
-    : credentials_policy_(CredentialsPolicy::ANY_CREDENTIALS) {
+    : credentials_policy_(CredentialsPolicy::ANY_CREDENTIALS), messenger_(nullptr) {
   DVLOG(4) << "RpcController " << this << " constructed";
 }
 
@@ -63,6 +64,7 @@ void RpcController::Reset() {
   call_.reset();
   required_server_features_.clear();
   credentials_policy_ = CredentialsPolicy::ANY_CREDENTIALS;
+  messenger_ = nullptr;
 }
 
 bool RpcController::finished() const {
@@ -145,5 +147,11 @@ void RpcController::SetRequestParam(const google::protobuf::Message& req) {
   call_->SetRequestPayload(req, std::move(outbound_sidecars_));
 }
 
+void RpcController::Cancel() {
+  DCHECK(call_);
+  DCHECK(messenger_);
+  messenger_->QueueCancellation(call_);
+}
+
 } // namespace rpc
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c1c48150/be/src/kudu/rpc/rpc_controller.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_controller.h b/be/src/kudu/rpc/rpc_controller.h
index ab611a8..bd3fbe3 100644
--- a/be/src/kudu/rpc/rpc_controller.h
+++ b/be/src/kudu/rpc/rpc_controller.h
@@ -40,6 +40,7 @@ namespace kudu {
 namespace rpc {
 
 class ErrorStatusPB;
+class Messenger;
 class OutboundCall;
 class RequestIdPB;
 class RpcSidecar;
@@ -223,6 +224,19 @@ class RpcController {
   // to this request.
   Status AddOutboundSidecar(std::unique_ptr<RpcSidecar> car, int* idx);
 
+  // Cancel the call associated with the RpcController. This function should only be
+  // called when there is an outstanding outbound call. It's always safe to call
+  // Cancel() after you've sent a call, so long as you haven't called Reset() yet.
+  // Caller is not responsible for synchronization between cancellation and the
+  // callback (i.e. the callback may or may not have been invoked yet when Cancel()
+  // is called).
+  //
+  // Cancellation is "best effort" - i.e. it's still possible the callback passed
+  // to the call will be fired with a success status. If cancellation succeeds,
+  // the callback will be invoked with an Aborted status. Cancellation is asynchronous
+  // so the callback will still be invoked from the reactor thread.
+  void Cancel();
+
  private:
   friend class OutboundCall;
   friend class Proxy;
@@ -231,6 +245,9 @@ class RpcController {
   // outbound_sidecars_ to call_ in preparation for serialization.
   void SetRequestParam(const google::protobuf::Message& req);
 
+  // Set the messenger which contains the reactor thread handling the outbound call.
+  void SetMessenger(Messenger* messenger) { messenger_ = messenger; }
+
   MonoDelta timeout_;
   std::unordered_set<uint32_t> required_server_features_;
 
@@ -243,6 +260,10 @@ class RpcController {
   // Ownership is transferred to OutboundCall once the call is sent.
   std::unique_ptr<RequestIdPB> request_id_;
 
+  // The messenger which contains the reactor thread for 'call_'.
+  // Set only when 'call_' is set.
+  Messenger* messenger_;
+
   // Once the call is sent, it is tracked here.
   std::shared_ptr<OutboundCall> call_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c1c48150/be/src/kudu/rpc/rpc_introspection.proto
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rpc_introspection.proto b/be/src/kudu/rpc/rpc_introspection.proto
index 9d2f9b5..5c4f4f1 100644
--- a/be/src/kudu/rpc/rpc_introspection.proto
+++ b/be/src/kudu/rpc/rpc_introspection.proto
@@ -42,6 +42,7 @@ message RpcCallInProgressPB {
     FINISHED_SUCCESS = 6;
     NEGOTIATION_TIMED_OUT = 7;
     FINISHED_NEGOTIATION_ERROR = 8;
+    CANCELLED = 9;
 
     // TODO(todd): add states for InboundCall
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c1c48150/be/src/kudu/rpc/rtest.proto
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/rtest.proto b/be/src/kudu/rpc/rtest.proto
index 1ef5ca6..d212cef 100644
--- a/be/src/kudu/rpc/rtest.proto
+++ b/be/src/kudu/rpc/rtest.proto
@@ -55,6 +55,16 @@ message SleepRequestPB {
 message SleepResponsePB {
 }
 
+message SleepWithSidecarRequestPB {
+  required uint32 sleep_micros = 1;
+  required uint32 pattern = 2;
+  required uint32 num_repetitions = 3;
+  required uint32 sidecar_idx = 4;
+}
+
+message SleepWithSidecarResponsePB {
+}
+
 message SendTwoStringsRequestPB {
   required uint32 random_seed = 1;
   required uint64 size1 = 2;