You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by jd...@apache.org on 2017/03/08 18:27:09 UTC

[1/2] kudu git commit: java: avoid spewing ClosedChannelException on client-initiated disconnects

Repository: kudu
Updated Branches:
  refs/heads/master f24307db4 -> 72895966c


java: avoid spewing ClosedChannelException on client-initiated disconnects

When submitting an MR job, the submitting program opens a client
instance for a short amount of time, just to fetch an authentication
token. When it then calls 'close()', any TabletClient connections that
the client may still have open get disconnected, and that was causing an
exception to be logged like follows:

17/03/07 13:44:51 ERROR client.TabletClient: [Peer ] Unexpected exception from downstream on [id: 0x06a6a87b, /172.31.112.110:41188 :> kudu-security-1.gce.cloudera.com/172.31.112.110:7051]
java.nio.channels.ClosedChannelException
        at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioWorker.cleanUpWriteBuffer(AbstractNioWorker.java:433)
        at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromUserCode(AbstractNioWorker.java:128)
        at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:84)
        at org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:779)
        at org.apache.kudu.client.shaded.org.jboss.netty.channel.Channels.write(Channels.java:725)
        at org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.oneone.OneToOneEncoder.doEncode(OneToOneEncoder.java:71)
        at org.apache.kudu.client.shaded.org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:59)
        at org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591)
        at org.apache.kudu.client.shaded.org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:582)
        at org.apache.kudu.client.shaded.org.jboss.netty.channel.Channels.write(Channels.java:704)
        at org.apache.kudu.client.shaded.org.jboss.netty.channel.Channels.write(Channels.java:671)
        at org.apache.kudu.client.Negotiator.sendSaslMessage(Negotiator.java:218)
        at org.apache.kudu.client.Negotiator.sendTunneledTls(Negotiator.java:515)
        at org.apache.kudu.client.Negotiator.sendPendingOutboundTls(Negotiator.java:505)
        at org.apache.kudu.client.Negotiator.handleTlsMessage(Negotiator.java:451)
        at org.apache.kudu.client.Negotiator.handleResponse(Negotiator.java:250)
        at org.apache.kudu.client.Negotiator.messageReceived(Negotiator.java:229)

This patch adds a flag in TabletClient which keeps track of the fact that
the disconnection was requested rather than unexpected, and in that case
avoids logging anything.

A new test triggers the same connection, captures logs, and makes sure
that no exceptions are in the logs.

Change-Id: I4e940d821c7d3f670c5a6b7407385952dc9debfc
Reviewed-on: http://gerrit.cloudera.org:8080/6303
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/5566bc90
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/5566bc90
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/5566bc90

Branch: refs/heads/master
Commit: 5566bc902ef94a9ce825f7c2f939cf87428941df
Parents: f24307d
Author: Todd Lipcon <to...@apache.org>
Authored: Tue Mar 7 15:26:10 2017 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Mar 8 00:34:29 2017 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/TabletClient.java    | 13 ++++
 .../org/apache/kudu/client/TestKuduClient.java  | 25 +++++++
 .../apache/kudu/util/CapturingLogAppender.java  | 79 ++++++++++++++++++++
 3 files changed, 117 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/5566bc90/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
index 4d44406..e26e883 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/TabletClient.java
@@ -26,6 +26,7 @@
 
 package org.apache.kudu.client;
 
+import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -144,6 +145,13 @@ public class TabletClient extends SimpleChannelUpstreamHandler {
 
   private final ServerInfo serverInfo;
 
+  /**
+   * Set to true when the client initiates a disconnect. The channelDisconnected
+   * event handler then knows not to log any warning about unexpected disconnection
+   * from the peer.
+   */
+  private volatile boolean closedByClient;
+
   public TabletClient(AsyncKuduClient client, ServerInfo serverInfo) {
     this.kuduClient = client;
     this.socketReadTimeoutMs = client.getDefaultSocketReadTimeoutMs();
@@ -293,6 +301,7 @@ public class TabletClient extends SimpleChannelUpstreamHandler {
     // added to a ChannelPipeline, which synchronously fires the channelOpen()
     // event.
     Preconditions.checkNotNull(chan);
+    closedByClient = true;
     return Channels.disconnect(chan);
   }
 
@@ -689,6 +698,10 @@ public class TabletClient extends SimpleChannelUpstreamHandler {
           " ignore this if we're shutting down", e);
     } else if (e instanceof ReadTimeoutException) {
       LOG.debug(getPeerUuidLoggingString() + "Encountered a read timeout, will close the channel");
+    } else if (e instanceof ClosedChannelException) {
+      if (!closedByClient) {
+        LOG.info(getPeerUuidLoggingString() + "Lost connection to peer");
+      }
     } else {
       LOG.error(getPeerUuidLoggingString() + "Unexpected exception from downstream on " + c, e);
     }

http://git-wip-us.apache.org/repos/asf/kudu/blob/5566bc90/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
index 488d7ea..6d3be3e 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
@@ -24,6 +24,7 @@ import static org.apache.kudu.client.RowResult.timestampToString;
 import static org.junit.Assert.*;
 import static org.junit.matchers.JUnitMatchers.containsString;
 
+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -34,12 +35,15 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
+import org.apache.kudu.util.CapturingLogAppender;
+import org.apache.log4j.AppenderSkeleton;
 
 public class TestKuduClient extends BaseKuduTest {
   private static final Logger LOG = LoggerFactory.getLogger(TestKuduClient.class);
@@ -786,6 +790,25 @@ public class TestKuduClient extends BaseKuduTest {
     assertEquals(1, countRowsInScan(scanner));
   }
 
+  /**
+   * Regression test for some log spew which occurred in short-lived client instances which
+   * had outbound connections.
+   */
+  @Test(timeout = 100000)
+  public void testCloseShortlyAfterOpen() throws Exception {
+    CapturingLogAppender cla = new CapturingLogAppender();
+    try (Closeable c = cla.attach()) {
+      try (KuduClient localClient = new KuduClient.KuduClientBuilder(masterAddresses).build()) {
+        // Force the client to connect to the masters.
+        localClient.exportAuthenticationCredentials();
+      }
+      // Wait a little bit since the "channel disconnected" exceptions could come
+      // from threads that don't get synchronously joined by client.close().
+      Thread.sleep(500);
+    }
+    assertFalse(cla.getAppendedText(), cla.getAppendedText().contains("Exception"));
+  }
+
   @Test(timeout = 100000)
   public void testCustomNioExecutor() throws Exception {
     long startTime = System.nanoTime();
@@ -829,4 +852,6 @@ public class TestKuduClient extends BaseKuduTest {
   public void testNoDefaultPartitioning() throws Exception {
     syncClient.createTable(tableName, basicSchema, new CreateTableOptions());
   }
+
+
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/5566bc90/java/kudu-client/src/test/java/org/apache/kudu/util/CapturingLogAppender.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/util/CapturingLogAppender.java b/java/kudu-client/src/test/java/org/apache/kudu/util/CapturingLogAppender.java
new file mode 100644
index 0000000..3d2d5b6
--- /dev/null
+++ b/java/kudu-client/src/test/java/org/apache/kudu/util/CapturingLogAppender.java
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.util;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Layout;
+import org.apache.log4j.Logger;
+import org.apache.log4j.SimpleLayout;
+import org.apache.log4j.spi.LoggingEvent;
+
+import com.google.common.base.Throwables;
+
+/**
+ * Test utility which wraps Log4j and captures all messages logged
+ * while it is attached. This can be useful for asserting that a particular
+ * message is (or is not) logged.
+ */
+public class CapturingLogAppender extends AppenderSkeleton {
+  private StringBuilder appended = new StringBuilder();
+  private static final Layout layout = new SimpleLayout();
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public boolean requiresLayout() {
+    return false;
+  }
+
+  @Override
+  protected void append(LoggingEvent event) {
+    appended.append(layout.format(event));
+    if (event.getThrowableInformation() != null) {
+      appended.append(Throwables.getStackTraceAsString(
+          event.getThrowableInformation().getThrowable())).append("\n");
+    }
+  }
+
+  public String getAppendedText() {
+    return appended.toString();
+  }
+
+  /**
+   * Temporarily attach the capturing appender to the Log4j root logger.
+   * This can be used in a 'try-with-resources' block:
+   * <code>
+   *   try (Closeable c = capturer.attach()) {
+   *     ...
+   *   }
+   * </code>
+   */
+  public Closeable attach() {
+    Logger.getRootLogger().addAppender(this);
+    return new Closeable() {
+      @Override
+      public void close() throws IOException {
+        Logger.getRootLogger().removeAppender(CapturingLogAppender.this);
+      }
+    };
+  }
+}


[2/2] kudu git commit: KUDU-1866: Add request-side sidecars

Posted by jd...@apache.org.
KUDU-1866: Add request-side sidecars

This patch adds sidecars to client requests. Using the same mechanism as
on the response-side, clients may attach slices to outbound requests
which do not pass through a serialization or copy before being pushed to
the network socket. On the server side, these sidecars may be read
directly from the underlying byte stream with the interposition of a
Protobuf wrapper.

The sidecars may be added to a request via RpcController and retrieved
via RpcContext (i.e. the reverse of the existing response-side
interface).

This patch adds a few tests to rpc-test, and all rpc-test tests pass.

Change-Id: I3d709edb2a22dc983f51b69d7660a39e8d8d6a09
Reviewed-on: http://gerrit.cloudera.org:8080/5908
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/72895966
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/72895966
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/72895966

Branch: refs/heads/master
Commit: 72895966c6458d6f33e68d53450d9bd43a2c57b1
Parents: 5566bc9
Author: Henry Robinson <he...@cloudera.com>
Authored: Thu Feb 2 20:31:33 2017 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Mar 8 00:49:34 2017 +0000

----------------------------------------------------------------------
 src/kudu/client/scanner-internal.cc        |  14 ++--
 src/kudu/rpc/CMakeLists.txt                |   2 +-
 src/kudu/rpc/inbound_call.cc               |  40 +++++++---
 src/kudu/rpc/inbound_call.h                |  14 +++-
 src/kudu/rpc/outbound_call.cc              |  74 ++++++++---------
 src/kudu/rpc/outbound_call.h               |  22 +++--
 src/kudu/rpc/proxy.cc                      |   2 +-
 src/kudu/rpc/rpc-test-base.h               |  70 ++++++++++++++--
 src/kudu/rpc/rpc-test.cc                   |  45 +++++++++++
 src/kudu/rpc/rpc_context.cc                |  10 ++-
 src/kudu/rpc/rpc_context.h                 |   6 +-
 src/kudu/rpc/rpc_controller.cc             |  20 ++++-
 src/kudu/rpc/rpc_controller.h              |  23 +++++-
 src/kudu/rpc/rpc_header.proto              |   5 ++
 src/kudu/rpc/rpc_sidecar.cc                | 102 ++++++++++++++++++++++++
 src/kudu/rpc/rpc_sidecar.h                 |  60 +++++++-------
 src/kudu/rpc/rtest.proto                   |  13 +++
 src/kudu/rpc/transfer.h                    |  14 +++-
 src/kudu/tserver/tablet_server-test-base.h |   6 +-
 src/kudu/tserver/tablet_service.cc         |  13 +--
 20 files changed, 429 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/client/scanner-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scanner-internal.cc b/src/kudu/client/scanner-internal.cc
index 07f93a7..c804046 100644
--- a/src/kudu/client/scanner-internal.cc
+++ b/src/kudu/client/scanner-internal.cc
@@ -505,16 +505,16 @@ Status KuduScanBatch::Data::Reset(RpcController* controller,
   // First, rewrite the relative addresses into absolute ones.
   if (PREDICT_FALSE(!resp_data_.has_rows_sidecar())) {
     return Status::Corruption("Server sent invalid response: no row data");
-  } else {
-    Status s = controller_.GetSidecar(resp_data_.rows_sidecar(), &direct_data_);
-    if (!s.ok()) {
-      return Status::Corruption("Server sent invalid response: row data "
-                                "sidecar index corrupt", s.ToString());
-    }
+  }
+
+  Status s = controller_.GetInboundSidecar(resp_data_.rows_sidecar(), &direct_data_);
+  if (!s.ok()) {
+    return Status::Corruption("Server sent invalid response: row data "
+        "sidecar index corrupt", s.ToString());
   }
 
   if (resp_data_.has_indirect_data_sidecar()) {
-    Status s = controller_.GetSidecar(resp_data_.indirect_data_sidecar(),
+    Status s = controller_.GetInboundSidecar(resp_data_.indirect_data_sidecar(),
                                       &indirect_data_);
     if (!s.ok()) {
       return Status::Corruption("Server sent invalid response: indirect data "

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/CMakeLists.txt b/src/kudu/rpc/CMakeLists.txt
index 19a7610..0cfe6e9 100644
--- a/src/kudu/rpc/CMakeLists.txt
+++ b/src/kudu/rpc/CMakeLists.txt
@@ -59,6 +59,7 @@ set(KRPC_SRCS
     rpc.cc
     rpc_context.cc
     rpc_controller.cc
+    rpc_sidecar.cc
     rpcz_store.cc
     sasl_common.cc
     sasl_helper.cc
@@ -125,4 +126,3 @@ ADD_KUDU_TEST(rpc-bench RUN_SERIAL true)
 ADD_KUDU_TEST(rpc-test)
 ADD_KUDU_TEST(rpc_stub-test)
 ADD_KUDU_TEST(service_queue-test)
-

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/inbound_call.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/inbound_call.cc b/src/kudu/rpc/inbound_call.cc
index 448fd70..03e7da4 100644
--- a/src/kudu/rpc/inbound_call.cc
+++ b/src/kudu/rpc/inbound_call.cc
@@ -33,9 +33,8 @@
 
 using google::protobuf::FieldDescriptor;
 using google::protobuf::io::CodedOutputStream;
-using google::protobuf::Message;
 using google::protobuf::MessageLite;
-using std::shared_ptr;
+using std::unique_ptr;
 using std::vector;
 using strings::Substitute;
 
@@ -44,7 +43,6 @@ namespace rpc {
 
 InboundCall::InboundCall(Connection* conn)
   : conn_(conn),
-    sidecars_deleter_(&sidecars_),
     trace_(new Trace),
     method_info_(nullptr) {
   RecordCallReceived();
@@ -67,6 +65,19 @@ Status InboundCall::ParseFrom(gscoped_ptr<InboundTransfer> transfer) {
   }
   remote_method_.FromPB(header_.remote_method());
 
+  if (header_.sidecar_offsets_size() > TransferLimits::kMaxSidecars) {
+    return Status::Corruption(strings::Substitute(
+            "Received $0 additional payload slices, expected at most %d",
+            header_.sidecar_offsets_size(), TransferLimits::kMaxSidecars));
+  }
+
+  RETURN_NOT_OK(RpcSidecar::ParseSidecars(
+          header_.sidecar_offsets(), serialized_request_, inbound_sidecar_slices_));
+  if (header_.sidecar_offsets_size() > 0) {
+    // Trim the request to just the message
+    serialized_request_ = Slice(serialized_request_.data(), header_.sidecar_offsets(0));
+  }
+
   // Retain the buffer that we have a view into.
   transfer_.swap(transfer);
   return Status::OK();
@@ -151,7 +162,7 @@ void InboundCall::SerializeResponseBuffer(const MessageLite& response,
   resp_hdr.set_call_id(header_.call_id());
   resp_hdr.set_is_error(!is_success);
   uint32_t absolute_sidecar_offset = protobuf_msg_size;
-  for (RpcSidecar* car : sidecars_) {
+  for (const unique_ptr<RpcSidecar>& car : outbound_sidecars_) {
     resp_hdr.add_sidecar_offsets(absolute_sidecar_offset);
     absolute_sidecar_offset += car->AsSlice().size();
   }
@@ -168,23 +179,23 @@ void InboundCall::SerializeResponseTo(vector<Slice>* slices) const {
   TRACE_EVENT0("rpc", "InboundCall::SerializeResponseTo");
   CHECK_GT(response_hdr_buf_.size(), 0);
   CHECK_GT(response_msg_buf_.size(), 0);
-  slices->reserve(slices->size() + 2 + sidecars_.size());
+  slices->reserve(slices->size() + 2 + outbound_sidecars_.size());
   slices->push_back(Slice(response_hdr_buf_));
   slices->push_back(Slice(response_msg_buf_));
-  for (RpcSidecar* car : sidecars_) {
+  for (const unique_ptr<RpcSidecar>& car : outbound_sidecars_) {
     slices->push_back(car->AsSlice());
   }
 }
 
-Status InboundCall::AddRpcSidecar(gscoped_ptr<RpcSidecar> car, int* idx) {
+Status InboundCall::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) {
   // Check that the number of sidecars does not exceed the number of payload
   // slices that are free (two are used up by the header and main message
   // protobufs).
-  if (sidecars_.size() + 2 > OutboundTransfer::kMaxPayloadSlices) {
+  if (outbound_sidecars_.size() > TransferLimits::kMaxSidecars) {
     return Status::ServiceUnavailable("All available sidecars already used");
   }
-  sidecars_.push_back(car.release());
-  *idx = sidecars_.size() - 1;
+  outbound_sidecars_.emplace_back(std::move(car));
+  *idx = outbound_sidecars_.size() - 1;
   return Status::OK();
 }
 
@@ -288,5 +299,14 @@ vector<uint32_t> InboundCall::GetRequiredFeatures() const {
   return features;
 }
 
+Status InboundCall::GetInboundSidecar(int idx, Slice* sidecar) const {
+  if (idx < 0 || idx >= header_.sidecar_offsets_size()) {
+    return Status::InvalidArgument(strings::Substitute(
+            "Index $0 does not reference a valid sidecar", idx));
+  }
+  *sidecar = inbound_sidecar_slices_[idx];
+  return Status::OK();
+}
+
 } // namespace rpc
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/inbound_call.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/inbound_call.h b/src/kudu/rpc/inbound_call.h
index 4f99dee..ea6eade 100644
--- a/src/kudu/rpc/inbound_call.h
+++ b/src/kudu/rpc/inbound_call.h
@@ -22,7 +22,6 @@
 #include <vector>
 
 #include "kudu/gutil/gscoped_ptr.h"
-#include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/rpc/remote_method.h"
@@ -124,7 +123,7 @@ class InboundCall {
   void SerializeResponseTo(std::vector<Slice>* slices) const;
 
   // See RpcContext::AddRpcSidecar()
-  Status AddRpcSidecar(gscoped_ptr<RpcSidecar> car, int* idx);
+  Status AddOutboundSidecar(std::unique_ptr<RpcSidecar> car, int* idx);
 
   std::string ToString() const;
 
@@ -187,6 +186,10 @@ class InboundCall {
   // the RPC.
   std::vector<uint32_t> GetRequiredFeatures() const;
 
+  // Get a sidecar sent as part of the request. If idx < 0 || idx > num sidecars - 1,
+  // returns an error.
+  Status GetInboundSidecar(int idx, Slice* sidecar) const;
+
  private:
   friend class RpczStore;
 
@@ -227,8 +230,11 @@ class InboundCall {
 
   // Vector of additional sidecars that are tacked on to the call's response
   // after serialization of the protobuf. See rpc/rpc_sidecar.h for more info.
-  std::vector<RpcSidecar*> sidecars_;
-  ElementDeleter sidecars_deleter_;
+  std::vector<std::unique_ptr<RpcSidecar>> outbound_sidecars_;
+
+  // Inbound sidecars from the request. The slices are views onto transfer_. There are as
+  // many slices as header_.sidecar_offsets_size().
+  Slice inbound_sidecar_slices_[TransferLimits::kMaxSidecars];
 
   // The trace buffer.
   scoped_refptr<Trace> trace_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/outbound_call.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/outbound_call.cc b/src/kudu/rpc/outbound_call.cc
index 19ec0ec..9b160a1 100644
--- a/src/kudu/rpc/outbound_call.cc
+++ b/src/kudu/rpc/outbound_call.cc
@@ -18,6 +18,7 @@
 #include <algorithm>
 #include <boost/functional/hash.hpp>
 #include <gflags/gflags.h>
+#include <memory>
 #include <mutex>
 #include <string>
 #include <unordered_set>
@@ -29,6 +30,7 @@
 #include "kudu/rpc/constants.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rpc_introspection.pb.h"
+#include "kudu/rpc/rpc_sidecar.h"
 #include "kudu/rpc/serialization.h"
 #include "kudu/rpc/transfer.h"
 #include "kudu/util/flag_tags.h"
@@ -44,6 +46,8 @@ DEFINE_int64(rpc_callback_max_cycles, 100 * 1000 * 1000,
 TAG_FLAG(rpc_callback_max_cycles, advanced);
 TAG_FLAG(rpc_callback_max_cycles, runtime);
 
+using std::unique_ptr;
+
 namespace kudu {
 namespace rpc {
 
@@ -88,10 +92,8 @@ OutboundCall::~OutboundCall() {
 }
 
 Status OutboundCall::SerializeTo(vector<Slice>* slices) {
-  size_t param_len = request_buf_.size();
-  if (PREDICT_FALSE(param_len == 0)) {
-    return Status::InvalidArgument("Must call SetRequestParam() before SerializeTo()");
-  }
+  DCHECK_LT(0, request_buf_.size())
+      << "Must call SetRequestPayload() before SerializeTo()";
 
   const MonoDelta &timeout = controller_->timeout();
   if (timeout.Initialized()) {
@@ -102,16 +104,32 @@ Status OutboundCall::SerializeTo(vector<Slice>* slices) {
     header_.add_required_feature_flags(feature);
   }
 
-  serialization::SerializeHeader(header_, param_len, &header_buf_);
+  DCHECK_LE(0, sidecar_byte_size_);
+  serialization::SerializeHeader(
+      header_, sidecar_byte_size_ + request_buf_.size(), &header_buf_);
 
-  // Return the concatenated packet.
   slices->push_back(Slice(header_buf_));
   slices->push_back(Slice(request_buf_));
+  for (const unique_ptr<RpcSidecar>& car : sidecars_) slices->push_back(car->AsSlice());
   return Status::OK();
 }
 
-void OutboundCall::SetRequestParam(const Message& message) {
-  serialization::SerializeMessage(message, &request_buf_);
+void OutboundCall::SetRequestPayload(const Message& req,
+    vector<unique_ptr<RpcSidecar>>&& sidecars) {
+  DCHECK_EQ(-1, sidecar_byte_size_);
+
+  sidecars_ = move(sidecars);
+
+  // Compute total size of sidecar payload so that extra space can be reserved as part of
+  // the request body.
+  uint32_t message_size = req.ByteSize();
+  sidecar_byte_size_ = 0;
+  for (const unique_ptr<RpcSidecar>& car: sidecars_) {
+    header_.add_sidecar_offsets(sidecar_byte_size_ + message_size);
+    sidecar_byte_size_ += car->AsSlice().size();
+  }
+
+  serialization::SerializeMessage(req, &request_buf_, sidecar_byte_size_, true);
 }
 
 Status OutboundCall::status() const {
@@ -432,44 +450,16 @@ Status CallResponse::GetSidecar(int idx, Slice* sidecar) const {
 
 Status CallResponse::ParseFrom(gscoped_ptr<InboundTransfer> transfer) {
   CHECK(!parsed_);
-  Slice entire_message;
   RETURN_NOT_OK(serialization::ParseMessage(transfer->data(), &header_,
-                                            &entire_message));
+                                            &serialized_response_));
 
   // Use information from header to extract the payload slices.
-  int last = header_.sidecar_offsets_size() - 1;
+  RETURN_NOT_OK(RpcSidecar::ParseSidecars(header_.sidecar_offsets(),
+          serialized_response_, sidecar_slices_));
 
-  if (last >= OutboundTransfer::kMaxPayloadSlices) {
-    return Status::Corruption(strings::Substitute(
-        "Received $0 additional payload slices, expected at most %d",
-        last, OutboundTransfer::kMaxPayloadSlices));
-  }
-
-  if (last >= 0) {
-    serialized_response_ = Slice(entire_message.data(),
-                                 header_.sidecar_offsets(0));
-    for (int i = 0; i < last; ++i) {
-      uint32_t next_offset = header_.sidecar_offsets(i);
-      int32_t len = header_.sidecar_offsets(i + 1) - next_offset;
-      if (next_offset + len > entire_message.size() || len < 0) {
-        return Status::Corruption(strings::Substitute(
-            "Invalid sidecar offsets; sidecar $0 apparently starts at $1,"
-            " has length $2, but the entire message has length $3",
-            i, next_offset, len, entire_message.size()));
-      }
-      sidecar_slices_[i] = Slice(entire_message.data() + next_offset, len);
-    }
-    uint32_t next_offset = header_.sidecar_offsets(last);
-    if (next_offset > entire_message.size()) {
-        return Status::Corruption(strings::Substitute(
-            "Invalid sidecar offsets; the last sidecar ($0) apparently starts "
-            "at $1, but the entire message has length $3",
-            last, next_offset, entire_message.size()));
-    }
-    sidecar_slices_[last] = Slice(entire_message.data() + next_offset,
-                                  entire_message.size() - next_offset);
-  } else {
-    serialized_response_ = entire_message;
+  if (header_.sidecar_offsets_size() > 0) {
+    serialized_response_ =
+        Slice(serialized_response_.data(), header_.sidecar_offsets(0));
   }
 
   transfer_.swap(transfer);

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/outbound_call.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/outbound_call.h b/src/kudu/rpc/outbound_call.h
index fa599fd..87ca39a 100644
--- a/src/kudu/rpc/outbound_call.h
+++ b/src/kudu/rpc/outbound_call.h
@@ -27,6 +27,7 @@
 #include "kudu/gutil/macros.h"
 #include "kudu/rpc/constants.h"
 #include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/rpc_sidecar.h"
 #include "kudu/rpc/remote_method.h"
 #include "kudu/rpc/response_callback.h"
 #include "kudu/rpc/transfer.h"
@@ -52,6 +53,7 @@ class DumpRunningRpcsRequestPB;
 class InboundTransfer;
 class RpcCallInProgressPB;
 class RpcController;
+class RpcSidecar;
 
 
 // Used to key on Connection information.
@@ -124,11 +126,13 @@ class OutboundCall {
 
   ~OutboundCall();
 
-  // Serialize the given request PB into this call's internal storage.
+  // Serialize the given request PB into this call's internal storage, and assume
+  // ownership of any sidecars that should accompany this request.
   //
-  // Because the data is fully serialized by this call, 'req' may be
-  // subsequently mutated with no ill effects.
-  void SetRequestParam(const google::protobuf::Message& req);
+  // Because the request data is fully serialized by this call, 'req' may be subsequently
+  // mutated with no ill effects.
+  void SetRequestPayload(const google::protobuf::Message& req,
+      std::vector<std::unique_ptr<RpcSidecar>>&& sidecars);
 
   // Assign the call ID for this call. This is called from the reactor
   // thread once a connection has been assigned. Must only be called once.
@@ -137,7 +141,7 @@ class OutboundCall {
     header_.set_call_id(call_id);
   }
 
-  // Serialize the call for the wire. Requires that SetRequestParam()
+  // Serialize the call for the wire. Requires that SetRequestPayload()
   // is called first. This is called from the Reactor thread.
   Status SerializeTo(std::vector<Slice>* slices);
 
@@ -269,6 +273,12 @@ class OutboundCall {
   // Otherwise NULL.
   gscoped_ptr<CallResponse> call_response_;
 
+  // All sidecars to be sent with this call.
+  std::vector<std::unique_ptr<RpcSidecar>> sidecars_;
+
+  // Total size in bytes of all sidecars in 'sidecars_'. Set in SetRequestPayload().
+  int64_t sidecar_byte_size_ = -1;
+
   DISALLOW_COPY_AND_ASSIGN(OutboundCall);
 };
 
@@ -322,7 +332,7 @@ class CallResponse {
   Slice serialized_response_;
 
   // Slices of data for rpc sidecars. They point into memory owned by transfer_.
-  Slice sidecar_slices_[OutboundTransfer::kMaxPayloadSlices];
+  Slice sidecar_slices_[TransferLimits::kMaxSidecars];
 
   // The incoming transfer data - retained because serialized_response_
   // and sidecar_slices_ refer into its data.

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/proxy.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/proxy.cc b/src/kudu/rpc/proxy.cc
index 206aac3..077af58 100644
--- a/src/kudu/rpc/proxy.cc
+++ b/src/kudu/rpc/proxy.cc
@@ -81,7 +81,7 @@ void Proxy::AsyncRequest(const string& method,
   RemoteMethod remote_method(service_name_, method);
   OutboundCall* call = new OutboundCall(conn_id_, remote_method, response, controller, callback);
   controller->call_.reset(call);
-  call->SetRequestParam(req);
+  controller->SetRequestParam(req);
 
   // If this fails to queue, the callback will get called immediately
   // and the controller will be in an ERROR state.

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rpc-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h
index 35a19f2..75ef792 100644
--- a/src/kudu/rpc/rpc-test-base.h
+++ b/src/kudu/rpc/rpc-test-base.h
@@ -65,6 +65,8 @@ using kudu::rpc_test::ExactlyOnceResponsePB;
 using kudu::rpc_test::FeatureFlags;
 using kudu::rpc_test::PanicRequestPB;
 using kudu::rpc_test::PanicResponsePB;
+using kudu::rpc_test::PushTwoStringsRequestPB;
+using kudu::rpc_test::PushTwoStringsResponsePB;
 using kudu::rpc_test::SendTwoStringsRequestPB;
 using kudu::rpc_test::SendTwoStringsResponsePB;
 using kudu::rpc_test::SleepRequestPB;
@@ -83,6 +85,7 @@ class GenericCalculatorService : public ServiceIf {
   static const char *kFullServiceName;
   static const char *kAddMethodName;
   static const char *kSleepMethodName;
+  static const char *kPushTwoStringsMethodName;
   static const char *kSendTwoStringsMethodName;
   static const char *kAddExactlyOnce;
 
@@ -105,6 +108,8 @@ class GenericCalculatorService : public ServiceIf {
       DoSleep(incoming);
     } else if (incoming->remote_method().method_name() == kSendTwoStringsMethodName) {
       DoSendTwoStrings(incoming);
+    } else if (incoming->remote_method().method_name() == kPushTwoStringsMethodName) {
+      DoPushTwoStrings(incoming);
     } else {
       incoming->RespondFailure(ErrorStatusPB::ERROR_NO_SUCH_METHOD,
                                Status::InvalidArgument("bad method"));
@@ -134,8 +139,8 @@ class GenericCalculatorService : public ServiceIf {
       LOG(FATAL) << "couldn't parse: " << param.ToDebugString();
     }
 
-    gscoped_ptr<faststring> first(new faststring);
-    gscoped_ptr<faststring> second(new faststring);
+    std::unique_ptr<faststring> first(new faststring);
+    std::unique_ptr<faststring> second(new faststring);
 
     Random r(req.random_seed());
     first->resize(req.size1());
@@ -146,16 +151,42 @@ class GenericCalculatorService : public ServiceIf {
 
     SendTwoStringsResponsePB resp;
     int idx1, idx2;
-    CHECK_OK(incoming->AddRpcSidecar(
-        make_gscoped_ptr(new RpcSidecar(std::move(first))), &idx1));
-    CHECK_OK(incoming->AddRpcSidecar(
-        make_gscoped_ptr(new RpcSidecar(std::move(second))), &idx2));
+    CHECK_OK(incoming->AddOutboundSidecar(
+            RpcSidecar::FromFaststring(std::move(first)), &idx1));
+    CHECK_OK(incoming->AddOutboundSidecar(
+            RpcSidecar::FromFaststring(std::move(second)), &idx2));
     resp.set_sidecar1(idx1);
     resp.set_sidecar2(idx2);
 
     incoming->RespondSuccess(resp);
   }
 
+  void DoPushTwoStrings(InboundCall* incoming) {
+    Slice param(incoming->serialized_request());
+    PushTwoStringsRequestPB req;
+    if (!req.ParseFromArray(param.data(), param.size())) {
+      LOG(FATAL) << "couldn't parse: " << param.ToDebugString();
+    }
+
+    Slice sidecar1;
+    CHECK_OK(incoming->GetInboundSidecar(req.sidecar1_idx(), &sidecar1));
+
+    Slice sidecar2;
+    CHECK_OK(incoming->GetInboundSidecar(req.sidecar2_idx(), &sidecar2));
+
+    // Check that reading non-existant sidecars doesn't work.
+    Slice tmp;
+    CHECK(!incoming->GetInboundSidecar(req.sidecar2_idx() + 2, &tmp).ok());
+
+    PushTwoStringsResponsePB resp;
+    resp.set_size1(sidecar1.size());
+    resp.set_data1(reinterpret_cast<const char*>(sidecar1.data()), sidecar1.size());
+    resp.set_size2(sidecar2.size());
+    resp.set_data2(reinterpret_cast<const char*>(sidecar2.data()), sidecar2.size());
+
+    incoming->RespondSuccess(resp);
+  }
+
   void DoSleep(InboundCall *incoming) {
     Slice param(incoming->serialized_request());
     SleepRequestPB req;
@@ -326,6 +357,7 @@ class CalculatorService : public CalculatorServiceIf {
 const char *GenericCalculatorService::kFullServiceName = "kudu.rpc.GenericCalculatorService";
 const char *GenericCalculatorService::kAddMethodName = "Add";
 const char *GenericCalculatorService::kSleepMethodName = "Sleep";
+const char *GenericCalculatorService::kPushTwoStringsMethodName = "PushTwoStrings";
 const char *GenericCalculatorService::kSendTwoStringsMethodName = "SendTwoStrings";
 const char *GenericCalculatorService::kAddExactlyOnce = "AddExactlyOnce";
 
@@ -425,6 +457,30 @@ class RpcTestBase : public KuduTest {
     CHECK_EQ(0, second.compare(Slice(expected)));
   }
 
+  void DoTestOutgoingSidecar(const Proxy &p, int size1, int size2) {
+    PushTwoStringsRequestPB request;
+    RpcController controller;
+
+    int idx1;
+    string s1(size1, 'a');
+    CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s1)), &idx1));
+
+    int idx2;
+    string s2(size2, 'b');
+    CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s2)), &idx2));
+
+    request.set_sidecar1_idx(idx1);
+    request.set_sidecar2_idx(idx2);
+
+    PushTwoStringsResponsePB resp;
+    CHECK_OK(p.SyncRequest(GenericCalculatorService::kPushTwoStringsMethodName,
+            request, &resp, &controller));
+    CHECK_EQ(size1, resp.size1());
+    CHECK_EQ(resp.data1(), s1);
+    CHECK_EQ(size2, resp.size2());
+    CHECK_EQ(resp.data2(), s2);
+  }
+
   void DoTestExpectTimeout(const Proxy &p, const MonoDelta &timeout) {
     SleepRequestPB req;
     SleepResponsePB resp;
@@ -476,7 +532,7 @@ class RpcTestBase : public KuduTest {
   static Slice GetSidecarPointer(const RpcController& controller, int idx,
                                  int expected_size) {
     Slice sidecar;
-    CHECK_OK(controller.GetSidecar(idx, &sidecar));
+    CHECK_OK(controller.GetInboundSidecar(idx, &sidecar));
     CHECK_EQ(expected_size, sidecar.size());
     return Slice(sidecar.data(), expected_size);
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rpc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index e18d07c..d707a50 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -294,6 +294,51 @@ TEST_P(TestRpc, TestRpcSidecar) {
   // Test some larger sidecars to verify that we properly handle the case where
   // we can't write the whole response to the socket in a single call.
   DoTestSidecar(p, 3000 * 1024, 2000 * 1024);
+
+  DoTestOutgoingSidecar(p, 0, 0);
+  DoTestOutgoingSidecar(p, 123, 456);
+  DoTestOutgoingSidecar(p, 3000 * 1024, 2000 * 1024);
+}
+
+TEST_P(TestRpc, TestRpcSidecarLimits) {
+  {
+    // Test that the limits on the number of sidecars is respected.
+    RpcController controller;
+    string s = "foo";
+    int idx;
+    for (int i = 0; i < TransferLimits::kMaxSidecars; ++i) {
+      CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s)), &idx));
+    }
+
+    CHECK(!controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s)), &idx).ok());
+  }
+
+  {
+    // Test that the payload may not exceed --rpc_max_message_size.
+    // Set up server.
+    Sockaddr server_addr;
+    bool enable_ssl = GetParam();
+    StartTestServer(&server_addr, enable_ssl);
+
+    // Set up client.
+    shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, GetParam()));
+    Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+
+    RpcController controller;
+    string s(FLAGS_rpc_max_message_size + 1, 'a');
+    int idx;
+    CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s)), &idx));
+
+    PushTwoStringsRequestPB request;
+    request.set_sidecar1_idx(idx);
+    request.set_sidecar2_idx(idx);
+    PushTwoStringsResponsePB resp;
+    Status status = p.SyncRequest(GenericCalculatorService::kPushTwoStringsMethodName,
+        request, &resp, &controller);
+    ASSERT_TRUE(status.IsNetworkError()) << "Unexpected error: " << status.ToString();
+    // Remote responds to extra-large payloads by closing the connection.
+    ASSERT_STR_CONTAINS(status.ToString(), "Connection reset by peer");
+  }
 }
 
 // Test that timeouts are properly handled.

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rpc_context.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_context.cc b/src/kudu/rpc/rpc_context.cc
index a0e634c..e93e093 100644
--- a/src/kudu/rpc/rpc_context.cc
+++ b/src/kudu/rpc/rpc_context.cc
@@ -17,6 +17,7 @@
 
 #include "kudu/rpc/rpc_context.h"
 
+#include <memory>
 #include <ostream>
 #include <sstream>
 
@@ -32,6 +33,7 @@
 #include "kudu/util/trace.h"
 
 using google::protobuf::Message;
+using std::unique_ptr;
 
 namespace kudu {
 namespace rpc {
@@ -141,8 +143,12 @@ const rpc::RequestIdPB* RpcContext::request_id() const {
   return call_->header().has_request_id() ? &call_->header().request_id() : nullptr;
 }
 
-Status RpcContext::AddRpcSidecar(gscoped_ptr<RpcSidecar> car, int* idx) {
-  return call_->AddRpcSidecar(std::move(car), idx);
+Status RpcContext::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) {
+  return call_->AddOutboundSidecar(std::move(car), idx);
+}
+
+Status RpcContext::GetInboundSidecar(int idx, Slice* slice) {
+  return call_->GetInboundSidecar(idx, slice);
 }
 
 const RemoteUser& RpcContext::remote_user() const {

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rpc_context.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_context.h b/src/kudu/rpc/rpc_context.h
index b95a9ce..12e8907 100644
--- a/src/kudu/rpc/rpc_context.h
+++ b/src/kudu/rpc/rpc_context.h
@@ -153,7 +153,11 @@ class RpcContext {
   // Upon success, writes the index of the sidecar (necessary to be retrieved
   // later) to 'idx'. Call may fail if all sidecars have already been used
   // by the RPC response.
-  Status AddRpcSidecar(gscoped_ptr<RpcSidecar> car, int* idx);
+  Status AddOutboundSidecar(std::unique_ptr<RpcSidecar> car, int* idx);
+
+  // Fills 'sidecar' with a sidecar sent by the client. Returns an error if 'idx' is out
+  // of bounds.
+  Status GetInboundSidecar(int idx, Slice* slice);
 
   // Return the identity of remote user who made this call.
   const RemoteUser& remote_user() const;

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rpc_controller.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_controller.cc b/src/kudu/rpc/rpc_controller.cc
index adaf5ce..5e5cbc3 100644
--- a/src/kudu/rpc/rpc_controller.cc
+++ b/src/kudu/rpc/rpc_controller.cc
@@ -19,11 +19,14 @@
 
 #include <algorithm>
 #include <glog/logging.h>
+#include <memory>
 #include <mutex>
 
 #include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/rpc/outbound_call.h"
 
+using std::unique_ptr;
+
 namespace kudu { namespace rpc {
 
 RpcController::RpcController() {
@@ -43,6 +46,7 @@ void RpcController::Swap(RpcController* other) {
     CHECK(other->finished());
   }
 
+  std::swap(outbound_sidecars_, other->outbound_sidecars_);
   std::swap(timeout_, other->timeout_);
   std::swap(call_, other->call_);
 }
@@ -77,7 +81,7 @@ const ErrorStatusPB* RpcController::error_response() const {
   return nullptr;
 }
 
-Status RpcController::GetSidecar(int idx, Slice* sidecar) const {
+Status RpcController::GetInboundSidecar(int idx, Slice* sidecar) const {
   return call_->call_response_->GetSidecar(idx, sidecar);
 }
 
@@ -114,5 +118,19 @@ MonoDelta RpcController::timeout() const {
   return timeout_;
 }
 
+Status RpcController::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) {
+  if (outbound_sidecars_.size() >= TransferLimits::kMaxSidecars) {
+    return Status::RuntimeError("All available sidecars already used");
+  }
+  outbound_sidecars_.emplace_back(std::move(car));
+  *idx = outbound_sidecars_.size() - 1;
+  return Status::OK();
+}
+
+void RpcController::SetRequestParam(const google::protobuf::Message& req) {
+  DCHECK(call_ != nullptr);
+  call_->SetRequestPayload(req, std::move(outbound_sidecars_));
+}
+
 } // namespace rpc
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rpc_controller.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_controller.h b/src/kudu/rpc/rpc_controller.h
index cce1ff2..6d521d0 100644
--- a/src/kudu/rpc/rpc_controller.h
+++ b/src/kudu/rpc/rpc_controller.h
@@ -21,11 +21,20 @@
 #include <glog/logging.h>
 #include <memory>
 #include <unordered_set>
+#include <vector>
 
 #include "kudu/gutil/macros.h"
+#include "kudu/gutil/stl_util.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/status.h"
+
+namespace google {
+namespace protobuf {
+class Message;
+} // namespace protobuf
+} // namespace google
+
 namespace kudu {
 
 namespace rpc {
@@ -33,6 +42,7 @@ namespace rpc {
 class ErrorStatusPB;
 class OutboundCall;
 class RequestIdPB;
+class RpcSidecar;
 
 // Controller for managing properties of a single RPC call, on the client side.
 //
@@ -177,12 +187,21 @@ class RpcController {
   // been Reset().
   //
   // May fail if index is invalid.
-  Status GetSidecar(int idx, Slice* sidecar) const;
+  Status GetInboundSidecar(int idx, Slice* sidecar) const;
+
+  // Adds a sidecar to the outbound request. The index of the sidecar is written to
+  // 'idx'. Returns an error if TransferLimits::kMaxSidecars have already been added
+  // to this request.
+  Status AddOutboundSidecar(std::unique_ptr<RpcSidecar> car, int* idx);
 
  private:
   friend class OutboundCall;
   friend class Proxy;
 
+  // Set the outbound call_'s request parameter, and transfer ownership of
+  // outbound_sidecars_ to call_ in preparation for serialization.
+  void SetRequestParam(const google::protobuf::Message& req);
+
   MonoDelta timeout_;
   std::unordered_set<uint32_t> required_server_features_;
 
@@ -195,6 +214,8 @@ class RpcController {
   // Once the call is sent, it is tracked here.
   std::shared_ptr<OutboundCall> call_;
 
+  std::vector<std::unique_ptr<RpcSidecar>> outbound_sidecars_;
+
   DISALLOW_COPY_AND_ASSIGN(RpcController);
 };
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rpc_header.proto
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_header.proto b/src/kudu/rpc/rpc_header.proto
index 6721c44..a6c9728 100644
--- a/src/kudu/rpc/rpc_header.proto
+++ b/src/kudu/rpc/rpc_header.proto
@@ -257,6 +257,11 @@ message RequestHeader {
   // Optional for requests that are naturally idempotent or to maintain compatibility with
   // older clients for requests that are not.
   optional RequestIdPB request_id = 15;
+
+  // Byte offsets for side cars in the main body of the request message.
+  // These offsets are counted AFTER the message header, i.e., offset 0
+  // is the first byte after the bytes for this protobuf.
+  repeated uint32 sidecar_offsets = 16;
 }
 
 message ResponseHeader {

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rpc_sidecar.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_sidecar.cc b/src/kudu/rpc/rpc_sidecar.cc
new file mode 100644
index 0000000..580c6eb
--- /dev/null
+++ b/src/kudu/rpc/rpc_sidecar.cc
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/rpc_sidecar.h"
+
+#include "kudu/util/status.h"
+#include "kudu/rpc/transfer.h"
+#include "kudu/gutil/strings/substitute.h"
+
+using std::unique_ptr;
+
+namespace kudu {
+namespace rpc {
+
+// Sidecar that simply wraps a Slice. The data associated with the slice is therefore not
+// owned by this class, and it's the caller's responsibility to ensure it has a lifetime
+// at least as long as this sidecar.
+class SliceSidecar : public RpcSidecar {
+ public:
+  explicit SliceSidecar(Slice slice) : slice_(slice) { }
+  Slice AsSlice() const override { return slice_; }
+
+ private:
+  const Slice slice_;
+};
+
+class FaststringSidecar : public RpcSidecar {
+ public:
+  explicit FaststringSidecar(unique_ptr<faststring> data) : data_(std::move(data)) { }
+  Slice AsSlice() const override { return *data_; }
+
+ private:
+  const unique_ptr<faststring> data_;
+};
+
+unique_ptr<RpcSidecar> RpcSidecar::FromFaststring(unique_ptr<faststring> data) {
+  return unique_ptr<RpcSidecar>(new FaststringSidecar(std::move(data)));
+}
+
+unique_ptr<RpcSidecar> RpcSidecar::FromSlice(Slice slice) {
+  return unique_ptr<RpcSidecar>(new SliceSidecar(slice));
+}
+
+
+Status RpcSidecar::ParseSidecars(
+    const ::google::protobuf::RepeatedField<::google::protobuf::uint32>& offsets,
+    Slice buffer, Slice* sidecars) {
+  if (offsets.size() == 0) return Status::OK();
+
+  int last = offsets.size() - 1;
+  if (last >= TransferLimits::kMaxSidecars) {
+    return Status::Corruption(strings::Substitute(
+            "Received $0 additional payload slices, expected at most %d",
+            last, TransferLimits::kMaxSidecars));
+  }
+
+  for (int i = 0; i < last; ++i) {
+    int64_t cur_offset = offsets.Get(i);
+    int64_t next_offset = offsets.Get(i + 1);
+    if (next_offset > buffer.size()) {
+      return Status::Corruption(strings::Substitute(
+              "Invalid sidecar offsets; sidecar $0 apparently starts at $1,"
+              " has length $2, but the entire message has length $3",
+              i, cur_offset, (next_offset - cur_offset), buffer.size()));
+    }
+    if (next_offset < cur_offset) {
+      return Status::Corruption(strings::Substitute(
+              "Invalid sidecar offsets; sidecar $0 apparently starts at $1,"
+              " but ends before that at offset $1.", i, cur_offset, next_offset));
+    }
+
+    sidecars[i] = Slice(buffer.data() + cur_offset, next_offset - cur_offset);
+  }
+
+  int64_t cur_offset = offsets.Get(last);
+  if (cur_offset > buffer.size()) {
+    return Status::Corruption(strings::Substitute("Invalid sidecar offsets: sidecar $0 "
+            "starts at offset $1after message ends (message length $2).", last,
+            cur_offset, buffer.size()));
+  }
+  sidecars[last] = Slice(buffer.data() + cur_offset, buffer.size() - cur_offset);
+
+  return Status::OK();
+}
+
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rpc_sidecar.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_sidecar.h b/src/kudu/rpc/rpc_sidecar.h
index da7e00f..00d6e4b 100644
--- a/src/kudu/rpc/rpc_sidecar.h
+++ b/src/kudu/rpc/rpc_sidecar.h
@@ -17,50 +17,48 @@
 #ifndef KUDU_RPC_RPC_SIDECAR_H
 #define KUDU_RPC_RPC_SIDECAR_H
 
-#include "kudu/gutil/gscoped_ptr.h"
+#include <google/protobuf/repeated_field.h>
+#include <memory>
+
 #include "kudu/util/faststring.h"
 #include "kudu/util/slice.h"
 
 namespace kudu {
 namespace rpc {
 
-// An RpcSidecar is a mechanism which allows replies to RPCs
-// to reference blocks of data without extra copies. In other words,
-// whenever a protobuf would have a large field where additional copies
-// become expensive, one may opt instead to use an RpcSidecar.
-//
-// The RpcSidecar saves on an additional copy to/from the protobuf on both the
-// server and client side. The InboundCall class accepts RpcSidecars, ignorant
-// of the form that the sidecar's data is kept in, requiring only that it can
-// be represented as a Slice. Data is then immediately copied from the
-// Slice returned from AsSlice() to the socket that is responding to the original
-// RPC.
+// An RpcSidecar is a mechanism which allows replies to RPCs to reference blocks of data
+// without extra copies. In other words, whenever a protobuf would have a large field
+// where additional copies become expensive, one may opt instead to use an RpcSidecar.
 //
-// In order to distinguish between separate sidecars, whenever a sidecar is
-// added to the RPC response on the server side, an index for that sidecar is
-// returned. This index must then in some way (i.e., via protobuf) be
-// communicated to the client side.
+// The RpcSidecar saves on an additional copy to/from the protobuf on both the server and
+// client side. Both Inbound- and OutboundCall classes accept sidecars to be sent to the
+// client and server respectively. They are ignorant of the sidecar's format, requiring
+// only that it can be represented as a Slice. Data is copied from the Slice returned from
+// AsSlice() to the socket that is responding to the original RPC. The slice should remain
+// valid for as long as the call it is attached to takes to complete.
 //
-// After receiving the RPC response on the client side, OutboundCall decodes
-// the original message along with the separate sidecars by using a list
-// of sidecar byte offsets that was sent in the message header.
+// In order to distinguish between separate sidecars, whenever a sidecar is added to the
+// RPC response on the server side, an index for that sidecar is returned. This index must
+// then in some way (i.e., via protobuf) be communicated to the recipient.
 //
-// After reconstructing the array of sidecars, the OutboundCall (through
-// RpcController's interface) is able to offer retrieval of the sidecar data
-// through the same indices that were returned by InboundCall (or indirectly
-// through the RpcContext wrapper) on the client side.
+// After reconstructing the array of sidecars, servers and clients may retrieve the
+// sidecar data through the RpcContext or RpcController interfaces respectively.
 class RpcSidecar {
  public:
-  // Generates a sidecar with the parameter faststring as its data.
-  explicit RpcSidecar(gscoped_ptr<faststring> data) : data_(std::move(data)) {}
+  static std::unique_ptr<RpcSidecar> FromFaststring(std::unique_ptr<faststring> data);
+  static std::unique_ptr<RpcSidecar> FromSlice(Slice slice);
 
-  // Returns a Slice representation of the sidecar's data.
-  Slice AsSlice() const { return *data_; }
+  // Utility method to parse a series of sidecar slices into 'sidecars' from 'buffer' and
+  // a set of offsets. 'sidecars' must have length >= TransferLimits::kMaxSidecars, and
+  // will be filled from index 0.
+  // TODO(henryr): Consider a vector instead here if there's no perf. impact.
+  static Status ParseSidecars(
+      const ::google::protobuf::RepeatedField<::google::protobuf::uint32>& offsets,
+      Slice buffer, Slice* sidecars);
 
- private:
-  const gscoped_ptr<faststring> data_;
-
-  DISALLOW_COPY_AND_ASSIGN(RpcSidecar);
+  // Returns a Slice representation of the sidecar's data.
+  virtual Slice AsSlice() const = 0;
+  virtual ~RpcSidecar() { }
 };
 
 } // namespace rpc

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rtest.proto
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rtest.proto b/src/kudu/rpc/rtest.proto
index df307a6..c52b535 100644
--- a/src/kudu/rpc/rtest.proto
+++ b/src/kudu/rpc/rtest.proto
@@ -65,6 +65,19 @@ message SendTwoStringsResponsePB {
   required uint32 sidecar2 = 2;
 }
 
+// Push two strings to the server as part of the request, in sidecars.
+message PushTwoStringsRequestPB {
+  required uint32 sidecar1_idx = 1;
+  required uint32 sidecar2_idx = 2;
+}
+
+message PushTwoStringsResponsePB {
+  required uint32 size1 = 1;
+  required string data1 = 2;
+  required uint32 size2 = 3;
+  required string data2 = 4;
+}
+
 message EchoRequestPB {
   required string data = 1;
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/transfer.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/transfer.h b/src/kudu/rpc/transfer.h
index 7fb6b10..671347a 100644
--- a/src/kudu/rpc/transfer.h
+++ b/src/kudu/rpc/transfer.h
@@ -46,6 +46,16 @@ namespace rpc {
 class Messenger;
 struct TransferCallbacks;
 
+class TransferLimits {
+ public:
+  enum {
+    kMaxSidecars = 10,
+    kMaxPayloadSlices = kMaxSidecars + 2 // (header + msg)
+  };
+
+  DISALLOW_IMPLICIT_CONSTRUCTORS(TransferLimits);
+};
+
 // This class is used internally by the RPC layer to represent an inbound
 // transfer in progress.
 //
@@ -94,8 +104,6 @@ class InboundTransfer {
 // Upon completion of the transfer, a callback is triggered.
 class OutboundTransfer : public boost::intrusive::list_base_hook<> {
  public:
-  enum { kMaxPayloadSlices = 10 };
-
   // Factory methods for creating transfers associated with call requests
   // or responses. The 'payload' slices will be concatenated and
   // written to the socket. When the transfer completes or errors, the
@@ -159,7 +167,7 @@ class OutboundTransfer : public boost::intrusive::list_base_hook<> {
 
   // Slices to send. Uses an array here instead of a vector to avoid an expensive
   // vector construction (improved performance a couple percent).
-  Slice payload_slices_[kMaxPayloadSlices];
+  Slice payload_slices_[TransferLimits::kMaxPayloadSlices];
   size_t n_payload_slices_;
 
   // The current slice that is being sent.

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/tserver/tablet_server-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server-test-base.h b/src/kudu/tserver/tablet_server-test-base.h
index 22edce7..6ebf0e3 100644
--- a/src/kudu/tserver/tablet_server-test-base.h
+++ b/src/kudu/tserver/tablet_server-test-base.h
@@ -312,10 +312,10 @@ class TabletServerTestBase : public KuduTest {
                                  vector<string>* results) {
     RowwiseRowBlockPB* rrpb = resp.mutable_data();
     Slice direct, indirect; // sidecar data buffers
-    ASSERT_OK(rpc.GetSidecar(rrpb->rows_sidecar(), &direct));
+    ASSERT_OK(rpc.GetInboundSidecar(rrpb->rows_sidecar(), &direct));
     if (rrpb->has_indirect_data_sidecar()) {
-      ASSERT_OK(rpc.GetSidecar(rrpb->indirect_data_sidecar(),
-                               &indirect));
+      ASSERT_OK(rpc.GetInboundSidecar(rrpb->indirect_data_sidecar(),
+              &indirect));
     }
     vector<const uint8_t*> rows;
     ASSERT_OK(ExtractRowsFromRowBlockPB(projection, *rrpb,

http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 69ee7ab..c9e7d79 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -118,6 +118,7 @@ using kudu::consensus::StartTabletCopyResponsePB;
 using kudu::consensus::VoteRequestPB;
 using kudu::consensus::VoteResponsePB;
 using kudu::rpc::RpcContext;
+using kudu::rpc::RpcSidecar;
 using kudu::server::ServerBase;
 using kudu::tablet::AlterSchemaTransactionState;
 using kudu::tablet::Tablet;
@@ -1073,8 +1074,8 @@ void TabletServiceImpl::Scan(const ScanRequestPB* req,
   }
 
   size_t batch_size_bytes = GetMaxBatchSizeBytesHint(req);
-  gscoped_ptr<faststring> rows_data(new faststring(batch_size_bytes * 11 / 10));
-  gscoped_ptr<faststring> indirect_data(new faststring(batch_size_bytes * 11 / 10));
+  unique_ptr<faststring> rows_data(new faststring(batch_size_bytes * 11 / 10));
+  unique_ptr<faststring> indirect_data(new faststring(batch_size_bytes * 11 / 10));
   RowwiseRowBlockPB data;
   ScanResultCopier collector(&data, rows_data.get(), indirect_data.get());
 
@@ -1123,15 +1124,15 @@ void TabletServiceImpl::Scan(const ScanRequestPB* req,
 
     // Add sidecar data to context and record the returned indices.
     int rows_idx;
-    CHECK_OK(context->AddRpcSidecar(make_gscoped_ptr(
-        new rpc::RpcSidecar(std::move(rows_data))), &rows_idx));
+    CHECK_OK(context->AddOutboundSidecar(RpcSidecar::FromFaststring((std::move(rows_data))),
+            &rows_idx));
     resp->mutable_data()->set_rows_sidecar(rows_idx);
 
     // Add indirect data as a sidecar, if applicable.
     if (indirect_data->size() > 0) {
       int indirect_idx;
-      CHECK_OK(context->AddRpcSidecar(make_gscoped_ptr(
-          new rpc::RpcSidecar(std::move(indirect_data))), &indirect_idx));
+      CHECK_OK(context->AddOutboundSidecar(RpcSidecar::FromFaststring(
+          std::move(indirect_data)), &indirect_idx));
       resp->mutable_data()->set_indirect_data_sidecar(indirect_idx);
     }