You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by jd...@apache.org on 2017/03/08 18:27:10 UTC

[2/2] kudu git commit: KUDU-1866: Add request-side sidecars

KUDU-1866: Add request-side sidecars

This patch adds sidecars to client requests. Using the same mechanism as
on the response-side, clients may attach slices to outbound requests
which do not pass through a serialization or copy before being pushed to
the network socket. On the server side, these sidecars may be read
directly from the underlying byte stream with the interposition of a
Protobuf wrapper.

The sidecars may be added to a request via RpcController and retrieved
via RpcContext (i.e. the reverse of the existing response-side
interface).

This patch adds a few tests to rpc-test, and all rpc-test tests pass.

Change-Id: I3d709edb2a22dc983f51b69d7660a39e8d8d6a09
Reviewed-on: http://gerrit.cloudera.org:8080/5908
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/72895966
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/72895966
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/72895966

Branch: refs/heads/master
Commit: 72895966c6458d6f33e68d53450d9bd43a2c57b1
Parents: 5566bc9
Author: Henry Robinson <he...@cloudera.com>
Authored: Thu Feb 2 20:31:33 2017 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Mar 8 00:49:34 2017 +0000

----------------------------------------------------------------------
 src/kudu/client/scanner-internal.cc        |  14 ++--
 src/kudu/rpc/CMakeLists.txt                |   2 +-
 src/kudu/rpc/inbound_call.cc               |  40 +++++++---
 src/kudu/rpc/inbound_call.h                |  14 +++-
 src/kudu/rpc/outbound_call.cc              |  74 ++++++++---------
 src/kudu/rpc/outbound_call.h               |  22 +++--
 src/kudu/rpc/proxy.cc                      |   2 +-
 src/kudu/rpc/rpc-test-base.h               |  70 ++++++++++++++--
 src/kudu/rpc/rpc-test.cc                   |  45 +++++++++++
 src/kudu/rpc/rpc_context.cc                |  10 ++-
 src/kudu/rpc/rpc_context.h                 |   6 +-
 src/kudu/rpc/rpc_controller.cc             |  20 ++++-
 src/kudu/rpc/rpc_controller.h              |  23 +++++-
 src/kudu/rpc/rpc_header.proto              |   5 ++
 src/kudu/rpc/rpc_sidecar.cc                | 102 ++++++++++++++++++++++++
 src/kudu/rpc/rpc_sidecar.h                 |  60 +++++++-------
 src/kudu/rpc/rtest.proto                   |  13 +++
 src/kudu/rpc/transfer.h                    |  14 +++-
 src/kudu/tserver/tablet_server-test-base.h |   6 +-
 src/kudu/tserver/tablet_service.cc         |  13 +--
 20 files changed, 429 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/client/scanner-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scanner-internal.cc b/src/kudu/client/scanner-internal.cc
index 07f93a7..c804046 100644
--- a/src/kudu/client/scanner-internal.cc
+++ b/src/kudu/client/scanner-internal.cc
@@ -505,16 +505,16 @@ Status KuduScanBatch::Data::Reset(RpcController* controller,
   // First, rewrite the relative addresses into absolute ones.
   if (PREDICT_FALSE(!resp_data_.has_rows_sidecar())) {
     return Status::Corruption("Server sent invalid response: no row data");
-  } else {
-    Status s = controller_.GetSidecar(resp_data_.rows_sidecar(), &direct_data_);
-    if (!s.ok()) {
-      return Status::Corruption("Server sent invalid response: row data "
-                                "sidecar index corrupt", s.ToString());
-    }
+  }
+
+  Status s = controller_.GetInboundSidecar(resp_data_.rows_sidecar(), &direct_data_);
+  if (!s.ok()) {
+    return Status::Corruption("Server sent invalid response: row data "
+        "sidecar index corrupt", s.ToString());
   }
 
   if (resp_data_.has_indirect_data_sidecar()) {
-    Status s = controller_.GetSidecar(resp_data_.indirect_data_sidecar(),
+    Status s = controller_.GetInboundSidecar(resp_data_.indirect_data_sidecar(),
                                       &indirect_data_);
     if (!s.ok()) {
       return Status::Corruption("Server sent invalid response: indirect data "

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/CMakeLists.txt b/src/kudu/rpc/CMakeLists.txt
index 19a7610..0cfe6e9 100644
--- a/src/kudu/rpc/CMakeLists.txt
+++ b/src/kudu/rpc/CMakeLists.txt
@@ -59,6 +59,7 @@ set(KRPC_SRCS
     rpc.cc
     rpc_context.cc
     rpc_controller.cc
+    rpc_sidecar.cc
     rpcz_store.cc
     sasl_common.cc
     sasl_helper.cc
@@ -125,4 +126,3 @@ ADD_KUDU_TEST(rpc-bench RUN_SERIAL true)
 ADD_KUDU_TEST(rpc-test)
 ADD_KUDU_TEST(rpc_stub-test)
 ADD_KUDU_TEST(service_queue-test)
-

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/inbound_call.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/inbound_call.cc b/src/kudu/rpc/inbound_call.cc
index 448fd70..03e7da4 100644
--- a/src/kudu/rpc/inbound_call.cc
+++ b/src/kudu/rpc/inbound_call.cc
@@ -33,9 +33,8 @@
 
 using google::protobuf::FieldDescriptor;
 using google::protobuf::io::CodedOutputStream;
-using google::protobuf::Message;
 using google::protobuf::MessageLite;
-using std::shared_ptr;
+using std::unique_ptr;
 using std::vector;
 using strings::Substitute;
 
@@ -44,7 +43,6 @@ namespace rpc {
 
 InboundCall::InboundCall(Connection* conn)
   : conn_(conn),
-    sidecars_deleter_(&sidecars_),
     trace_(new Trace),
     method_info_(nullptr) {
   RecordCallReceived();
@@ -67,6 +65,19 @@ Status InboundCall::ParseFrom(gscoped_ptr<InboundTransfer> transfer) {
   }
   remote_method_.FromPB(header_.remote_method());
 
+  if (header_.sidecar_offsets_size() > TransferLimits::kMaxSidecars) {
+    return Status::Corruption(strings::Substitute(
+            "Received $0 additional payload slices, expected at most %d",
+            header_.sidecar_offsets_size(), TransferLimits::kMaxSidecars));
+  }
+
+  RETURN_NOT_OK(RpcSidecar::ParseSidecars(
+          header_.sidecar_offsets(), serialized_request_, inbound_sidecar_slices_));
+  if (header_.sidecar_offsets_size() > 0) {
+    // Trim the request to just the message
+    serialized_request_ = Slice(serialized_request_.data(), header_.sidecar_offsets(0));
+  }
+
   // Retain the buffer that we have a view into.
   transfer_.swap(transfer);
   return Status::OK();
@@ -151,7 +162,7 @@ void InboundCall::SerializeResponseBuffer(const MessageLite& response,
   resp_hdr.set_call_id(header_.call_id());
   resp_hdr.set_is_error(!is_success);
   uint32_t absolute_sidecar_offset = protobuf_msg_size;
-  for (RpcSidecar* car : sidecars_) {
+  for (const unique_ptr<RpcSidecar>& car : outbound_sidecars_) {
     resp_hdr.add_sidecar_offsets(absolute_sidecar_offset);
     absolute_sidecar_offset += car->AsSlice().size();
   }
@@ -168,23 +179,23 @@ void InboundCall::SerializeResponseTo(vector<Slice>* slices) const {
   TRACE_EVENT0("rpc", "InboundCall::SerializeResponseTo");
   CHECK_GT(response_hdr_buf_.size(), 0);
   CHECK_GT(response_msg_buf_.size(), 0);
-  slices->reserve(slices->size() + 2 + sidecars_.size());
+  slices->reserve(slices->size() + 2 + outbound_sidecars_.size());
   slices->push_back(Slice(response_hdr_buf_));
   slices->push_back(Slice(response_msg_buf_));
-  for (RpcSidecar* car : sidecars_) {
+  for (const unique_ptr<RpcSidecar>& car : outbound_sidecars_) {
     slices->push_back(car->AsSlice());
   }
 }
 
-Status InboundCall::AddRpcSidecar(gscoped_ptr<RpcSidecar> car, int* idx) {
+Status InboundCall::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) {
   // Check that the number of sidecars does not exceed the number of payload
   // slices that are free (two are used up by the header and main message
   // protobufs).
-  if (sidecars_.size() + 2 > OutboundTransfer::kMaxPayloadSlices) {
+  if (outbound_sidecars_.size() > TransferLimits::kMaxSidecars) {
     return Status::ServiceUnavailable("All available sidecars already used");
   }
-  sidecars_.push_back(car.release());
-  *idx = sidecars_.size() - 1;
+  outbound_sidecars_.emplace_back(std::move(car));
+  *idx = outbound_sidecars_.size() - 1;
   return Status::OK();
 }
 
@@ -288,5 +299,14 @@ vector<uint32_t> InboundCall::GetRequiredFeatures() const {
   return features;
 }
 
+Status InboundCall::GetInboundSidecar(int idx, Slice* sidecar) const {
+  if (idx < 0 || idx >= header_.sidecar_offsets_size()) {
+    return Status::InvalidArgument(strings::Substitute(
+            "Index $0 does not reference a valid sidecar", idx));
+  }
+  *sidecar = inbound_sidecar_slices_[idx];
+  return Status::OK();
+}
+
 } // namespace rpc
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/inbound_call.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/inbound_call.h b/src/kudu/rpc/inbound_call.h
index 4f99dee..ea6eade 100644
--- a/src/kudu/rpc/inbound_call.h
+++ b/src/kudu/rpc/inbound_call.h
@@ -22,7 +22,6 @@
 #include <vector>
 
 #include "kudu/gutil/gscoped_ptr.h"
-#include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/rpc/remote_method.h"
@@ -124,7 +123,7 @@ class InboundCall {
   void SerializeResponseTo(std::vector<Slice>* slices) const;
 
   // See RpcContext::AddRpcSidecar()
-  Status AddRpcSidecar(gscoped_ptr<RpcSidecar> car, int* idx);
+  Status AddOutboundSidecar(std::unique_ptr<RpcSidecar> car, int* idx);
 
   std::string ToString() const;
 
@@ -187,6 +186,10 @@ class InboundCall {
   // the RPC.
   std::vector<uint32_t> GetRequiredFeatures() const;
 
+  // Get a sidecar sent as part of the request. If idx < 0 || idx > num sidecars - 1,
+  // returns an error.
+  Status GetInboundSidecar(int idx, Slice* sidecar) const;
+
  private:
   friend class RpczStore;
 
@@ -227,8 +230,11 @@ class InboundCall {
 
   // Vector of additional sidecars that are tacked on to the call's response
   // after serialization of the protobuf. See rpc/rpc_sidecar.h for more info.
-  std::vector<RpcSidecar*> sidecars_;
-  ElementDeleter sidecars_deleter_;
+  std::vector<std::unique_ptr<RpcSidecar>> outbound_sidecars_;
+
+  // Inbound sidecars from the request. The slices are views onto transfer_. There are as
+  // many slices as header_.sidecar_offsets_size().
+  Slice inbound_sidecar_slices_[TransferLimits::kMaxSidecars];
 
   // The trace buffer.
   scoped_refptr<Trace> trace_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/outbound_call.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/outbound_call.cc b/src/kudu/rpc/outbound_call.cc
index 19ec0ec..9b160a1 100644
--- a/src/kudu/rpc/outbound_call.cc
+++ b/src/kudu/rpc/outbound_call.cc
@@ -18,6 +18,7 @@
 #include <algorithm>
 #include <boost/functional/hash.hpp>
 #include <gflags/gflags.h>
+#include <memory>
 #include <mutex>
 #include <string>
 #include <unordered_set>
@@ -29,6 +30,7 @@
 #include "kudu/rpc/constants.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rpc_introspection.pb.h"
+#include "kudu/rpc/rpc_sidecar.h"
 #include "kudu/rpc/serialization.h"
 #include "kudu/rpc/transfer.h"
 #include "kudu/util/flag_tags.h"
@@ -44,6 +46,8 @@ DEFINE_int64(rpc_callback_max_cycles, 100 * 1000 * 1000,
 TAG_FLAG(rpc_callback_max_cycles, advanced);
 TAG_FLAG(rpc_callback_max_cycles, runtime);
 
+using std::unique_ptr;
+
 namespace kudu {
 namespace rpc {
 
@@ -88,10 +92,8 @@ OutboundCall::~OutboundCall() {
 }
 
 Status OutboundCall::SerializeTo(vector<Slice>* slices) {
-  size_t param_len = request_buf_.size();
-  if (PREDICT_FALSE(param_len == 0)) {
-    return Status::InvalidArgument("Must call SetRequestParam() before SerializeTo()");
-  }
+  DCHECK_LT(0, request_buf_.size())
+      << "Must call SetRequestPayload() before SerializeTo()";
 
   const MonoDelta &timeout = controller_->timeout();
   if (timeout.Initialized()) {
@@ -102,16 +104,32 @@ Status OutboundCall::SerializeTo(vector<Slice>* slices) {
     header_.add_required_feature_flags(feature);
   }
 
-  serialization::SerializeHeader(header_, param_len, &header_buf_);
+  DCHECK_LE(0, sidecar_byte_size_);
+  serialization::SerializeHeader(
+      header_, sidecar_byte_size_ + request_buf_.size(), &header_buf_);
 
-  // Return the concatenated packet.
   slices->push_back(Slice(header_buf_));
   slices->push_back(Slice(request_buf_));
+  for (const unique_ptr<RpcSidecar>& car : sidecars_) slices->push_back(car->AsSlice());
   return Status::OK();
 }
 
-void OutboundCall::SetRequestParam(const Message& message) {
-  serialization::SerializeMessage(message, &request_buf_);
+void OutboundCall::SetRequestPayload(const Message& req,
+    vector<unique_ptr<RpcSidecar>>&& sidecars) {
+  DCHECK_EQ(-1, sidecar_byte_size_);
+
+  sidecars_ = move(sidecars);
+
+  // Compute total size of sidecar payload so that extra space can be reserved as part of
+  // the request body.
+  uint32_t message_size = req.ByteSize();
+  sidecar_byte_size_ = 0;
+  for (const unique_ptr<RpcSidecar>& car: sidecars_) {
+    header_.add_sidecar_offsets(sidecar_byte_size_ + message_size);
+    sidecar_byte_size_ += car->AsSlice().size();
+  }
+
+  serialization::SerializeMessage(req, &request_buf_, sidecar_byte_size_, true);
 }
 
 Status OutboundCall::status() const {
@@ -432,44 +450,16 @@ Status CallResponse::GetSidecar(int idx, Slice* sidecar) const {
 
 Status CallResponse::ParseFrom(gscoped_ptr<InboundTransfer> transfer) {
   CHECK(!parsed_);
-  Slice entire_message;
   RETURN_NOT_OK(serialization::ParseMessage(transfer->data(), &header_,
-                                            &entire_message));
+                                            &serialized_response_));
 
   // Use information from header to extract the payload slices.
-  int last = header_.sidecar_offsets_size() - 1;
+  RETURN_NOT_OK(RpcSidecar::ParseSidecars(header_.sidecar_offsets(),
+          serialized_response_, sidecar_slices_));
 
-  if (last >= OutboundTransfer::kMaxPayloadSlices) {
-    return Status::Corruption(strings::Substitute(
-        "Received $0 additional payload slices, expected at most %d",
-        last, OutboundTransfer::kMaxPayloadSlices));
-  }
-
-  if (last >= 0) {
-    serialized_response_ = Slice(entire_message.data(),
-                                 header_.sidecar_offsets(0));
-    for (int i = 0; i < last; ++i) {
-      uint32_t next_offset = header_.sidecar_offsets(i);
-      int32_t len = header_.sidecar_offsets(i + 1) - next_offset;
-      if (next_offset + len > entire_message.size() || len < 0) {
-        return Status::Corruption(strings::Substitute(
-            "Invalid sidecar offsets; sidecar $0 apparently starts at $1,"
-            " has length $2, but the entire message has length $3",
-            i, next_offset, len, entire_message.size()));
-      }
-      sidecar_slices_[i] = Slice(entire_message.data() + next_offset, len);
-    }
-    uint32_t next_offset = header_.sidecar_offsets(last);
-    if (next_offset > entire_message.size()) {
-        return Status::Corruption(strings::Substitute(
-            "Invalid sidecar offsets; the last sidecar ($0) apparently starts "
-            "at $1, but the entire message has length $3",
-            last, next_offset, entire_message.size()));
-    }
-    sidecar_slices_[last] = Slice(entire_message.data() + next_offset,
-                                  entire_message.size() - next_offset);
-  } else {
-    serialized_response_ = entire_message;
+  if (header_.sidecar_offsets_size() > 0) {
+    serialized_response_ =
+        Slice(serialized_response_.data(), header_.sidecar_offsets(0));
   }
 
   transfer_.swap(transfer);

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/outbound_call.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/outbound_call.h b/src/kudu/rpc/outbound_call.h
index fa599fd..87ca39a 100644
--- a/src/kudu/rpc/outbound_call.h
+++ b/src/kudu/rpc/outbound_call.h
@@ -27,6 +27,7 @@
 #include "kudu/gutil/macros.h"
 #include "kudu/rpc/constants.h"
 #include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/rpc_sidecar.h"
 #include "kudu/rpc/remote_method.h"
 #include "kudu/rpc/response_callback.h"
 #include "kudu/rpc/transfer.h"
@@ -52,6 +53,7 @@ class DumpRunningRpcsRequestPB;
 class InboundTransfer;
 class RpcCallInProgressPB;
 class RpcController;
+class RpcSidecar;
 
 
 // Used to key on Connection information.
@@ -124,11 +126,13 @@ class OutboundCall {
 
   ~OutboundCall();
 
-  // Serialize the given request PB into this call's internal storage.
+  // Serialize the given request PB into this call's internal storage, and assume
+  // ownership of any sidecars that should accompany this request.
   //
-  // Because the data is fully serialized by this call, 'req' may be
-  // subsequently mutated with no ill effects.
-  void SetRequestParam(const google::protobuf::Message& req);
+  // Because the request data is fully serialized by this call, 'req' may be subsequently
+  // mutated with no ill effects.
+  void SetRequestPayload(const google::protobuf::Message& req,
+      std::vector<std::unique_ptr<RpcSidecar>>&& sidecars);
 
   // Assign the call ID for this call. This is called from the reactor
   // thread once a connection has been assigned. Must only be called once.
@@ -137,7 +141,7 @@ class OutboundCall {
     header_.set_call_id(call_id);
   }
 
-  // Serialize the call for the wire. Requires that SetRequestParam()
+  // Serialize the call for the wire. Requires that SetRequestPayload()
   // is called first. This is called from the Reactor thread.
   Status SerializeTo(std::vector<Slice>* slices);
 
@@ -269,6 +273,12 @@ class OutboundCall {
   // Otherwise NULL.
   gscoped_ptr<CallResponse> call_response_;
 
+  // All sidecars to be sent with this call.
+  std::vector<std::unique_ptr<RpcSidecar>> sidecars_;
+
+  // Total size in bytes of all sidecars in 'sidecars_'. Set in SetRequestPayload().
+  int64_t sidecar_byte_size_ = -1;
+
   DISALLOW_COPY_AND_ASSIGN(OutboundCall);
 };
 
@@ -322,7 +332,7 @@ class CallResponse {
   Slice serialized_response_;
 
   // Slices of data for rpc sidecars. They point into memory owned by transfer_.
-  Slice sidecar_slices_[OutboundTransfer::kMaxPayloadSlices];
+  Slice sidecar_slices_[TransferLimits::kMaxSidecars];
 
   // The incoming transfer data - retained because serialized_response_
   // and sidecar_slices_ refer into its data.

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/proxy.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/proxy.cc b/src/kudu/rpc/proxy.cc
index 206aac3..077af58 100644
--- a/src/kudu/rpc/proxy.cc
+++ b/src/kudu/rpc/proxy.cc
@@ -81,7 +81,7 @@ void Proxy::AsyncRequest(const string& method,
   RemoteMethod remote_method(service_name_, method);
   OutboundCall* call = new OutboundCall(conn_id_, remote_method, response, controller, callback);
   controller->call_.reset(call);
-  call->SetRequestParam(req);
+  controller->SetRequestParam(req);
 
   // If this fails to queue, the callback will get called immediately
   // and the controller will be in an ERROR state.

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rpc-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h
index 35a19f2..75ef792 100644
--- a/src/kudu/rpc/rpc-test-base.h
+++ b/src/kudu/rpc/rpc-test-base.h
@@ -65,6 +65,8 @@ using kudu::rpc_test::ExactlyOnceResponsePB;
 using kudu::rpc_test::FeatureFlags;
 using kudu::rpc_test::PanicRequestPB;
 using kudu::rpc_test::PanicResponsePB;
+using kudu::rpc_test::PushTwoStringsRequestPB;
+using kudu::rpc_test::PushTwoStringsResponsePB;
 using kudu::rpc_test::SendTwoStringsRequestPB;
 using kudu::rpc_test::SendTwoStringsResponsePB;
 using kudu::rpc_test::SleepRequestPB;
@@ -83,6 +85,7 @@ class GenericCalculatorService : public ServiceIf {
   static const char *kFullServiceName;
   static const char *kAddMethodName;
   static const char *kSleepMethodName;
+  static const char *kPushTwoStringsMethodName;
   static const char *kSendTwoStringsMethodName;
   static const char *kAddExactlyOnce;
 
@@ -105,6 +108,8 @@ class GenericCalculatorService : public ServiceIf {
       DoSleep(incoming);
     } else if (incoming->remote_method().method_name() == kSendTwoStringsMethodName) {
       DoSendTwoStrings(incoming);
+    } else if (incoming->remote_method().method_name() == kPushTwoStringsMethodName) {
+      DoPushTwoStrings(incoming);
     } else {
       incoming->RespondFailure(ErrorStatusPB::ERROR_NO_SUCH_METHOD,
                                Status::InvalidArgument("bad method"));
@@ -134,8 +139,8 @@ class GenericCalculatorService : public ServiceIf {
       LOG(FATAL) << "couldn't parse: " << param.ToDebugString();
     }
 
-    gscoped_ptr<faststring> first(new faststring);
-    gscoped_ptr<faststring> second(new faststring);
+    std::unique_ptr<faststring> first(new faststring);
+    std::unique_ptr<faststring> second(new faststring);
 
     Random r(req.random_seed());
     first->resize(req.size1());
@@ -146,16 +151,42 @@ class GenericCalculatorService : public ServiceIf {
 
     SendTwoStringsResponsePB resp;
     int idx1, idx2;
-    CHECK_OK(incoming->AddRpcSidecar(
-        make_gscoped_ptr(new RpcSidecar(std::move(first))), &idx1));
-    CHECK_OK(incoming->AddRpcSidecar(
-        make_gscoped_ptr(new RpcSidecar(std::move(second))), &idx2));
+    CHECK_OK(incoming->AddOutboundSidecar(
+            RpcSidecar::FromFaststring(std::move(first)), &idx1));
+    CHECK_OK(incoming->AddOutboundSidecar(
+            RpcSidecar::FromFaststring(std::move(second)), &idx2));
     resp.set_sidecar1(idx1);
     resp.set_sidecar2(idx2);
 
     incoming->RespondSuccess(resp);
   }
 
+  void DoPushTwoStrings(InboundCall* incoming) {
+    Slice param(incoming->serialized_request());
+    PushTwoStringsRequestPB req;
+    if (!req.ParseFromArray(param.data(), param.size())) {
+      LOG(FATAL) << "couldn't parse: " << param.ToDebugString();
+    }
+
+    Slice sidecar1;
+    CHECK_OK(incoming->GetInboundSidecar(req.sidecar1_idx(), &sidecar1));
+
+    Slice sidecar2;
+    CHECK_OK(incoming->GetInboundSidecar(req.sidecar2_idx(), &sidecar2));
+
+    // Check that reading non-existant sidecars doesn't work.
+    Slice tmp;
+    CHECK(!incoming->GetInboundSidecar(req.sidecar2_idx() + 2, &tmp).ok());
+
+    PushTwoStringsResponsePB resp;
+    resp.set_size1(sidecar1.size());
+    resp.set_data1(reinterpret_cast<const char*>(sidecar1.data()), sidecar1.size());
+    resp.set_size2(sidecar2.size());
+    resp.set_data2(reinterpret_cast<const char*>(sidecar2.data()), sidecar2.size());
+
+    incoming->RespondSuccess(resp);
+  }
+
   void DoSleep(InboundCall *incoming) {
     Slice param(incoming->serialized_request());
     SleepRequestPB req;
@@ -326,6 +357,7 @@ class CalculatorService : public CalculatorServiceIf {
 const char *GenericCalculatorService::kFullServiceName = "kudu.rpc.GenericCalculatorService";
 const char *GenericCalculatorService::kAddMethodName = "Add";
 const char *GenericCalculatorService::kSleepMethodName = "Sleep";
+const char *GenericCalculatorService::kPushTwoStringsMethodName = "PushTwoStrings";
 const char *GenericCalculatorService::kSendTwoStringsMethodName = "SendTwoStrings";
 const char *GenericCalculatorService::kAddExactlyOnce = "AddExactlyOnce";
 
@@ -425,6 +457,30 @@ class RpcTestBase : public KuduTest {
     CHECK_EQ(0, second.compare(Slice(expected)));
   }
 
+  void DoTestOutgoingSidecar(const Proxy &p, int size1, int size2) {
+    PushTwoStringsRequestPB request;
+    RpcController controller;
+
+    int idx1;
+    string s1(size1, 'a');
+    CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s1)), &idx1));
+
+    int idx2;
+    string s2(size2, 'b');
+    CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s2)), &idx2));
+
+    request.set_sidecar1_idx(idx1);
+    request.set_sidecar2_idx(idx2);
+
+    PushTwoStringsResponsePB resp;
+    CHECK_OK(p.SyncRequest(GenericCalculatorService::kPushTwoStringsMethodName,
+            request, &resp, &controller));
+    CHECK_EQ(size1, resp.size1());
+    CHECK_EQ(resp.data1(), s1);
+    CHECK_EQ(size2, resp.size2());
+    CHECK_EQ(resp.data2(), s2);
+  }
+
   void DoTestExpectTimeout(const Proxy &p, const MonoDelta &timeout) {
     SleepRequestPB req;
     SleepResponsePB resp;
@@ -476,7 +532,7 @@ class RpcTestBase : public KuduTest {
   static Slice GetSidecarPointer(const RpcController& controller, int idx,
                                  int expected_size) {
     Slice sidecar;
-    CHECK_OK(controller.GetSidecar(idx, &sidecar));
+    CHECK_OK(controller.GetInboundSidecar(idx, &sidecar));
     CHECK_EQ(expected_size, sidecar.size());
     return Slice(sidecar.data(), expected_size);
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rpc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index e18d07c..d707a50 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -294,6 +294,51 @@ TEST_P(TestRpc, TestRpcSidecar) {
   // Test some larger sidecars to verify that we properly handle the case where
   // we can't write the whole response to the socket in a single call.
   DoTestSidecar(p, 3000 * 1024, 2000 * 1024);
+
+  DoTestOutgoingSidecar(p, 0, 0);
+  DoTestOutgoingSidecar(p, 123, 456);
+  DoTestOutgoingSidecar(p, 3000 * 1024, 2000 * 1024);
+}
+
+TEST_P(TestRpc, TestRpcSidecarLimits) {
+  {
+    // Test that the limits on the number of sidecars is respected.
+    RpcController controller;
+    string s = "foo";
+    int idx;
+    for (int i = 0; i < TransferLimits::kMaxSidecars; ++i) {
+      CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s)), &idx));
+    }
+
+    CHECK(!controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s)), &idx).ok());
+  }
+
+  {
+    // Test that the payload may not exceed --rpc_max_message_size.
+    // Set up server.
+    Sockaddr server_addr;
+    bool enable_ssl = GetParam();
+    StartTestServer(&server_addr, enable_ssl);
+
+    // Set up client.
+    shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, GetParam()));
+    Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+
+    RpcController controller;
+    string s(FLAGS_rpc_max_message_size + 1, 'a');
+    int idx;
+    CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s)), &idx));
+
+    PushTwoStringsRequestPB request;
+    request.set_sidecar1_idx(idx);
+    request.set_sidecar2_idx(idx);
+    PushTwoStringsResponsePB resp;
+    Status status = p.SyncRequest(GenericCalculatorService::kPushTwoStringsMethodName,
+        request, &resp, &controller);
+    ASSERT_TRUE(status.IsNetworkError()) << "Unexpected error: " << status.ToString();
+    // Remote responds to extra-large payloads by closing the connection.
+    ASSERT_STR_CONTAINS(status.ToString(), "Connection reset by peer");
+  }
 }
 
 // Test that timeouts are properly handled.

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rpc_context.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_context.cc b/src/kudu/rpc/rpc_context.cc
index a0e634c..e93e093 100644
--- a/src/kudu/rpc/rpc_context.cc
+++ b/src/kudu/rpc/rpc_context.cc
@@ -17,6 +17,7 @@
 
 #include "kudu/rpc/rpc_context.h"
 
+#include <memory>
 #include <ostream>
 #include <sstream>
 
@@ -32,6 +33,7 @@
 #include "kudu/util/trace.h"
 
 using google::protobuf::Message;
+using std::unique_ptr;
 
 namespace kudu {
 namespace rpc {
@@ -141,8 +143,12 @@ const rpc::RequestIdPB* RpcContext::request_id() const {
   return call_->header().has_request_id() ? &call_->header().request_id() : nullptr;
 }
 
-Status RpcContext::AddRpcSidecar(gscoped_ptr<RpcSidecar> car, int* idx) {
-  return call_->AddRpcSidecar(std::move(car), idx);
+Status RpcContext::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) {
+  return call_->AddOutboundSidecar(std::move(car), idx);
+}
+
+Status RpcContext::GetInboundSidecar(int idx, Slice* slice) {
+  return call_->GetInboundSidecar(idx, slice);
 }
 
 const RemoteUser& RpcContext::remote_user() const {

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rpc_context.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_context.h b/src/kudu/rpc/rpc_context.h
index b95a9ce..12e8907 100644
--- a/src/kudu/rpc/rpc_context.h
+++ b/src/kudu/rpc/rpc_context.h
@@ -153,7 +153,11 @@ class RpcContext {
   // Upon success, writes the index of the sidecar (necessary to be retrieved
   // later) to 'idx'. Call may fail if all sidecars have already been used
   // by the RPC response.
-  Status AddRpcSidecar(gscoped_ptr<RpcSidecar> car, int* idx);
+  Status AddOutboundSidecar(std::unique_ptr<RpcSidecar> car, int* idx);
+
+  // Fills 'sidecar' with a sidecar sent by the client. Returns an error if 'idx' is out
+  // of bounds.
+  Status GetInboundSidecar(int idx, Slice* slice);
 
   // Return the identity of remote user who made this call.
   const RemoteUser& remote_user() const;

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rpc_controller.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_controller.cc b/src/kudu/rpc/rpc_controller.cc
index adaf5ce..5e5cbc3 100644
--- a/src/kudu/rpc/rpc_controller.cc
+++ b/src/kudu/rpc/rpc_controller.cc
@@ -19,11 +19,14 @@
 
 #include <algorithm>
 #include <glog/logging.h>
+#include <memory>
 #include <mutex>
 
 #include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/rpc/outbound_call.h"
 
+using std::unique_ptr;
+
 namespace kudu { namespace rpc {
 
 RpcController::RpcController() {
@@ -43,6 +46,7 @@ void RpcController::Swap(RpcController* other) {
     CHECK(other->finished());
   }
 
+  std::swap(outbound_sidecars_, other->outbound_sidecars_);
   std::swap(timeout_, other->timeout_);
   std::swap(call_, other->call_);
 }
@@ -77,7 +81,7 @@ const ErrorStatusPB* RpcController::error_response() const {
   return nullptr;
 }
 
-Status RpcController::GetSidecar(int idx, Slice* sidecar) const {
+Status RpcController::GetInboundSidecar(int idx, Slice* sidecar) const {
   return call_->call_response_->GetSidecar(idx, sidecar);
 }
 
@@ -114,5 +118,19 @@ MonoDelta RpcController::timeout() const {
   return timeout_;
 }
 
+Status RpcController::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) {
+  if (outbound_sidecars_.size() >= TransferLimits::kMaxSidecars) {
+    return Status::RuntimeError("All available sidecars already used");
+  }
+  outbound_sidecars_.emplace_back(std::move(car));
+  *idx = outbound_sidecars_.size() - 1;
+  return Status::OK();
+}
+
+void RpcController::SetRequestParam(const google::protobuf::Message& req) {
+  DCHECK(call_ != nullptr);
+  call_->SetRequestPayload(req, std::move(outbound_sidecars_));
+}
+
 } // namespace rpc
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rpc_controller.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_controller.h b/src/kudu/rpc/rpc_controller.h
index cce1ff2..6d521d0 100644
--- a/src/kudu/rpc/rpc_controller.h
+++ b/src/kudu/rpc/rpc_controller.h
@@ -21,11 +21,20 @@
 #include <glog/logging.h>
 #include <memory>
 #include <unordered_set>
+#include <vector>
 
 #include "kudu/gutil/macros.h"
+#include "kudu/gutil/stl_util.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/status.h"
+
+namespace google {
+namespace protobuf {
+class Message;
+} // namespace protobuf
+} // namespace google
+
 namespace kudu {
 
 namespace rpc {
@@ -33,6 +42,7 @@ namespace rpc {
 class ErrorStatusPB;
 class OutboundCall;
 class RequestIdPB;
+class RpcSidecar;
 
 // Controller for managing properties of a single RPC call, on the client side.
 //
@@ -177,12 +187,21 @@ class RpcController {
   // been Reset().
   //
   // May fail if index is invalid.
-  Status GetSidecar(int idx, Slice* sidecar) const;
+  Status GetInboundSidecar(int idx, Slice* sidecar) const;
+
+  // Adds a sidecar to the outbound request. The index of the sidecar is written to
+  // 'idx'. Returns an error if TransferLimits::kMaxSidecars have already been added
+  // to this request.
+  Status AddOutboundSidecar(std::unique_ptr<RpcSidecar> car, int* idx);
 
  private:
   friend class OutboundCall;
   friend class Proxy;
 
+  // Set the outbound call_'s request parameter, and transfer ownership of
+  // outbound_sidecars_ to call_ in preparation for serialization.
+  void SetRequestParam(const google::protobuf::Message& req);
+
   MonoDelta timeout_;
   std::unordered_set<uint32_t> required_server_features_;
 
@@ -195,6 +214,8 @@ class RpcController {
   // Once the call is sent, it is tracked here.
   std::shared_ptr<OutboundCall> call_;
 
+  std::vector<std::unique_ptr<RpcSidecar>> outbound_sidecars_;
+
   DISALLOW_COPY_AND_ASSIGN(RpcController);
 };
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rpc_header.proto
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_header.proto b/src/kudu/rpc/rpc_header.proto
index 6721c44..a6c9728 100644
--- a/src/kudu/rpc/rpc_header.proto
+++ b/src/kudu/rpc/rpc_header.proto
@@ -257,6 +257,11 @@ message RequestHeader {
   // Optional for requests that are naturally idempotent or to maintain compatibility with
   // older clients for requests that are not.
   optional RequestIdPB request_id = 15;
+
+  // Byte offsets for side cars in the main body of the request message.
+  // These offsets are counted AFTER the message header, i.e., offset 0
+  // is the first byte after the bytes for this protobuf.
+  repeated uint32 sidecar_offsets = 16;
 }
 
 message ResponseHeader {

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rpc_sidecar.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_sidecar.cc b/src/kudu/rpc/rpc_sidecar.cc
new file mode 100644
index 0000000..580c6eb
--- /dev/null
+++ b/src/kudu/rpc/rpc_sidecar.cc
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/rpc_sidecar.h"
+
+#include "kudu/util/status.h"
+#include "kudu/rpc/transfer.h"
+#include "kudu/gutil/strings/substitute.h"
+
+using std::unique_ptr;
+
+namespace kudu {
+namespace rpc {
+
+// Sidecar that simply wraps a Slice. The data associated with the slice is therefore not
+// owned by this class, and it's the caller's responsibility to ensure it has a lifetime
+// at least as long as this sidecar.
+class SliceSidecar : public RpcSidecar {
+ public:
+  explicit SliceSidecar(Slice slice) : slice_(slice) { }
+  Slice AsSlice() const override { return slice_; }
+
+ private:
+  const Slice slice_;
+};
+
+class FaststringSidecar : public RpcSidecar {
+ public:
+  explicit FaststringSidecar(unique_ptr<faststring> data) : data_(std::move(data)) { }
+  Slice AsSlice() const override { return *data_; }
+
+ private:
+  const unique_ptr<faststring> data_;
+};
+
+unique_ptr<RpcSidecar> RpcSidecar::FromFaststring(unique_ptr<faststring> data) {
+  return unique_ptr<RpcSidecar>(new FaststringSidecar(std::move(data)));
+}
+
+unique_ptr<RpcSidecar> RpcSidecar::FromSlice(Slice slice) {
+  return unique_ptr<RpcSidecar>(new SliceSidecar(slice));
+}
+
+
+Status RpcSidecar::ParseSidecars(
+    const ::google::protobuf::RepeatedField<::google::protobuf::uint32>& offsets,
+    Slice buffer, Slice* sidecars) {
+  if (offsets.size() == 0) return Status::OK();
+
+  int last = offsets.size() - 1;
+  if (last >= TransferLimits::kMaxSidecars) {
+    return Status::Corruption(strings::Substitute(
+            "Received $0 additional payload slices, expected at most %d",
+            last, TransferLimits::kMaxSidecars));
+  }
+
+  for (int i = 0; i < last; ++i) {
+    int64_t cur_offset = offsets.Get(i);
+    int64_t next_offset = offsets.Get(i + 1);
+    if (next_offset > buffer.size()) {
+      return Status::Corruption(strings::Substitute(
+              "Invalid sidecar offsets; sidecar $0 apparently starts at $1,"
+              " has length $2, but the entire message has length $3",
+              i, cur_offset, (next_offset - cur_offset), buffer.size()));
+    }
+    if (next_offset < cur_offset) {
+      return Status::Corruption(strings::Substitute(
+              "Invalid sidecar offsets; sidecar $0 apparently starts at $1,"
+              " but ends before that at offset $1.", i, cur_offset, next_offset));
+    }
+
+    sidecars[i] = Slice(buffer.data() + cur_offset, next_offset - cur_offset);
+  }
+
+  int64_t cur_offset = offsets.Get(last);
+  if (cur_offset > buffer.size()) {
+    return Status::Corruption(strings::Substitute("Invalid sidecar offsets: sidecar $0 "
+            "starts at offset $1after message ends (message length $2).", last,
+            cur_offset, buffer.size()));
+  }
+  sidecars[last] = Slice(buffer.data() + cur_offset, buffer.size() - cur_offset);
+
+  return Status::OK();
+}
+
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rpc_sidecar.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_sidecar.h b/src/kudu/rpc/rpc_sidecar.h
index da7e00f..00d6e4b 100644
--- a/src/kudu/rpc/rpc_sidecar.h
+++ b/src/kudu/rpc/rpc_sidecar.h
@@ -17,50 +17,48 @@
 #ifndef KUDU_RPC_RPC_SIDECAR_H
 #define KUDU_RPC_RPC_SIDECAR_H
 
-#include "kudu/gutil/gscoped_ptr.h"
+#include <google/protobuf/repeated_field.h>
+#include <memory>
+
 #include "kudu/util/faststring.h"
 #include "kudu/util/slice.h"
 
 namespace kudu {
 namespace rpc {
 
-// An RpcSidecar is a mechanism which allows replies to RPCs
-// to reference blocks of data without extra copies. In other words,
-// whenever a protobuf would have a large field where additional copies
-// become expensive, one may opt instead to use an RpcSidecar.
-//
-// The RpcSidecar saves on an additional copy to/from the protobuf on both the
-// server and client side. The InboundCall class accepts RpcSidecars, ignorant
-// of the form that the sidecar's data is kept in, requiring only that it can
-// be represented as a Slice. Data is then immediately copied from the
-// Slice returned from AsSlice() to the socket that is responding to the original
-// RPC.
+// An RpcSidecar is a mechanism which allows replies to RPCs to reference blocks of data
+// without extra copies. In other words, whenever a protobuf would have a large field
+// where additional copies become expensive, one may opt instead to use an RpcSidecar.
 //
-// In order to distinguish between separate sidecars, whenever a sidecar is
-// added to the RPC response on the server side, an index for that sidecar is
-// returned. This index must then in some way (i.e., via protobuf) be
-// communicated to the client side.
+// The RpcSidecar saves on an additional copy to/from the protobuf on both the server and
+// client side. Both Inbound- and OutboundCall classes accept sidecars to be sent to the
+// client and server respectively. They are ignorant of the sidecar's format, requiring
+// only that it can be represented as a Slice. Data is copied from the Slice returned from
+// AsSlice() to the socket that is responding to the original RPC. The slice should remain
+// valid for as long as the call it is attached to takes to complete.
 //
-// After receiving the RPC response on the client side, OutboundCall decodes
-// the original message along with the separate sidecars by using a list
-// of sidecar byte offsets that was sent in the message header.
+// In order to distinguish between separate sidecars, whenever a sidecar is added to the
+// RPC response on the server side, an index for that sidecar is returned. This index must
+// then in some way (i.e., via protobuf) be communicated to the recipient.
 //
-// After reconstructing the array of sidecars, the OutboundCall (through
-// RpcController's interface) is able to offer retrieval of the sidecar data
-// through the same indices that were returned by InboundCall (or indirectly
-// through the RpcContext wrapper) on the client side.
+// After reconstructing the array of sidecars, servers and clients may retrieve the
+// sidecar data through the RpcContext or RpcController interfaces respectively.
 class RpcSidecar {
  public:
-  // Generates a sidecar with the parameter faststring as its data.
-  explicit RpcSidecar(gscoped_ptr<faststring> data) : data_(std::move(data)) {}
+  static std::unique_ptr<RpcSidecar> FromFaststring(std::unique_ptr<faststring> data);
+  static std::unique_ptr<RpcSidecar> FromSlice(Slice slice);
 
-  // Returns a Slice representation of the sidecar's data.
-  Slice AsSlice() const { return *data_; }
+  // Utility method to parse a series of sidecar slices into 'sidecars' from 'buffer' and
+  // a set of offsets. 'sidecars' must have length >= TransferLimits::kMaxSidecars, and
+  // will be filled from index 0.
+  // TODO(henryr): Consider a vector instead here if there's no perf. impact.
+  static Status ParseSidecars(
+      const ::google::protobuf::RepeatedField<::google::protobuf::uint32>& offsets,
+      Slice buffer, Slice* sidecars);
 
- private:
-  const gscoped_ptr<faststring> data_;
-
-  DISALLOW_COPY_AND_ASSIGN(RpcSidecar);
+  // Returns a Slice representation of the sidecar's data.
+  virtual Slice AsSlice() const = 0;
+  virtual ~RpcSidecar() { }
 };
 
 } // namespace rpc

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rtest.proto
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rtest.proto b/src/kudu/rpc/rtest.proto
index df307a6..c52b535 100644
--- a/src/kudu/rpc/rtest.proto
+++ b/src/kudu/rpc/rtest.proto
@@ -65,6 +65,19 @@ message SendTwoStringsResponsePB {
   required uint32 sidecar2 = 2;
 }
 
+// Push two strings to the server as part of the request, in sidecars.
+message PushTwoStringsRequestPB {
+  required uint32 sidecar1_idx = 1;
+  required uint32 sidecar2_idx = 2;
+}
+
+message PushTwoStringsResponsePB {
+  required uint32 size1 = 1;
+  required string data1 = 2;
+  required uint32 size2 = 3;
+  required string data2 = 4;
+}
+
 message EchoRequestPB {
   required string data = 1;
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/transfer.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/transfer.h b/src/kudu/rpc/transfer.h
index 7fb6b10..671347a 100644
--- a/src/kudu/rpc/transfer.h
+++ b/src/kudu/rpc/transfer.h
@@ -46,6 +46,16 @@ namespace rpc {
 class Messenger;
 struct TransferCallbacks;
 
+class TransferLimits {
+ public:
+  enum {
+    kMaxSidecars = 10,
+    kMaxPayloadSlices = kMaxSidecars + 2 // (header + msg)
+  };
+
+  DISALLOW_IMPLICIT_CONSTRUCTORS(TransferLimits);
+};
+
 // This class is used internally by the RPC layer to represent an inbound
 // transfer in progress.
 //
@@ -94,8 +104,6 @@ class InboundTransfer {
 // Upon completion of the transfer, a callback is triggered.
 class OutboundTransfer : public boost::intrusive::list_base_hook<> {
  public:
-  enum { kMaxPayloadSlices = 10 };
-
   // Factory methods for creating transfers associated with call requests
   // or responses. The 'payload' slices will be concatenated and
   // written to the socket. When the transfer completes or errors, the
@@ -159,7 +167,7 @@ class OutboundTransfer : public boost::intrusive::list_base_hook<> {
 
   // Slices to send. Uses an array here instead of a vector to avoid an expensive
   // vector construction (improved performance a couple percent).
-  Slice payload_slices_[kMaxPayloadSlices];
+  Slice payload_slices_[TransferLimits::kMaxPayloadSlices];
   size_t n_payload_slices_;
 
   // The current slice that is being sent.

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/tserver/tablet_server-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server-test-base.h b/src/kudu/tserver/tablet_server-test-base.h
index 22edce7..6ebf0e3 100644
--- a/src/kudu/tserver/tablet_server-test-base.h
+++ b/src/kudu/tserver/tablet_server-test-base.h
@@ -312,10 +312,10 @@ class TabletServerTestBase : public KuduTest {
                                  vector<string>* results) {
     RowwiseRowBlockPB* rrpb = resp.mutable_data();
     Slice direct, indirect; // sidecar data buffers
-    ASSERT_OK(rpc.GetSidecar(rrpb->rows_sidecar(), &direct));
+    ASSERT_OK(rpc.GetInboundSidecar(rrpb->rows_sidecar(), &direct));
     if (rrpb->has_indirect_data_sidecar()) {
-      ASSERT_OK(rpc.GetSidecar(rrpb->indirect_data_sidecar(),
-                               &indirect));
+      ASSERT_OK(rpc.GetInboundSidecar(rrpb->indirect_data_sidecar(),
+              &indirect));
     }
     vector<const uint8_t*> rows;
     ASSERT_OK(ExtractRowsFromRowBlockPB(projection, *rrpb,

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 69ee7ab..c9e7d79 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -118,6 +118,7 @@ using kudu::consensus::StartTabletCopyResponsePB;
 using kudu::consensus::VoteRequestPB;
 using kudu::consensus::VoteResponsePB;
 using kudu::rpc::RpcContext;
+using kudu::rpc::RpcSidecar;
 using kudu::server::ServerBase;
 using kudu::tablet::AlterSchemaTransactionState;
 using kudu::tablet::Tablet;
@@ -1073,8 +1074,8 @@ void TabletServiceImpl::Scan(const ScanRequestPB* req,
   }
 
   size_t batch_size_bytes = GetMaxBatchSizeBytesHint(req);
-  gscoped_ptr<faststring> rows_data(new faststring(batch_size_bytes * 11 / 10));
-  gscoped_ptr<faststring> indirect_data(new faststring(batch_size_bytes * 11 / 10));
+  unique_ptr<faststring> rows_data(new faststring(batch_size_bytes * 11 / 10));
+  unique_ptr<faststring> indirect_data(new faststring(batch_size_bytes * 11 / 10));
   RowwiseRowBlockPB data;
   ScanResultCopier collector(&data, rows_data.get(), indirect_data.get());
 
@@ -1123,15 +1124,15 @@ void TabletServiceImpl::Scan(const ScanRequestPB* req,
 
     // Add sidecar data to context and record the returned indices.
     int rows_idx;
-    CHECK_OK(context->AddRpcSidecar(make_gscoped_ptr(
-        new rpc::RpcSidecar(std::move(rows_data))), &rows_idx));
+    CHECK_OK(context->AddOutboundSidecar(RpcSidecar::FromFaststring((std::move(rows_data))),
+            &rows_idx));
     resp->mutable_data()->set_rows_sidecar(rows_idx);
 
     // Add indirect data as a sidecar, if applicable.
     if (indirect_data->size() > 0) {
       int indirect_idx;
-      CHECK_OK(context->AddRpcSidecar(make_gscoped_ptr(
-          new rpc::RpcSidecar(std::move(indirect_data))), &indirect_idx));
+      CHECK_OK(context->AddOutboundSidecar(RpcSidecar::FromFaststring(
+          std::move(indirect_data)), &indirect_idx));
       resp->mutable_data()->set_indirect_data_sidecar(indirect_idx);
     }