You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2021/10/08 00:48:02 UTC

[kudu] 01/03: [rpc] allow reuse of outbound request buffers when retrying

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 4fe61a26ad5bbb0a41bb7be002a6b32d9c0ea916
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Tue Sep 28 16:00:30 2021 -0700

    [rpc] allow reuse of outbound request buffers when retrying
    
    This patch fixes a heap-use-after-free bug in the address-re-resolving
    proxy. The bug is caused by the fact that requests may be stack allocated,
    are serialized once when making the OutboundCall, and are then thrown away
    once the (typically) user-provided callback has been called.
    
    This patch splits out the state that may be used by retries (i.e. the
    serialized request, header, and sidecars) into its own class, and passes
    this around when retrying.
    
    There are some methods that don't appear to be used in this codebase,
    but I opted to keep existing behavior because they seem to be used in
    other codebases that share library code. Specifically, I don't see usage
    of RpcController::AddOutboundSidecar(), but this does seem to be used in
    Impala[1].
    
    1. https://github.com/apache/impala/blob/b28da054f3595bb92873433211438306fc22fbc7/be/src/rpc/sidecar-util.h#L48
    
    Change-Id: I118f3559c4647bdd996617443bd371a041711295
    Reviewed-on: http://gerrit.cloudera.org:8080/17892
    Tested-by: Andrew Wong <aw...@cloudera.com>
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
 src/kudu/rpc/mt-rpc-test.cc    |  6 +--
 src/kudu/rpc/outbound_call.cc  | 83 ++++++++++++++++++++++++++----------
 src/kudu/rpc/outbound_call.h   | 95 ++++++++++++++++++++++++++++++++----------
 src/kudu/rpc/proxy-test.cc     | 48 +++++++++++++++++++++
 src/kudu/rpc/proxy.cc          | 54 ++++++++++++++++--------
 src/kudu/rpc/proxy.h           |  8 ++--
 src/kudu/rpc/rpc-test-base.h   | 29 ++++++-------
 src/kudu/rpc/rpc_controller.cc | 14 +++++++
 src/kudu/rpc/rpc_controller.h  | 17 ++++++++
 9 files changed, 272 insertions(+), 82 deletions(-)

diff --git a/src/kudu/rpc/mt-rpc-test.cc b/src/kudu/rpc/mt-rpc-test.cc
index 3496e26..773eb39 100644
--- a/src/kudu/rpc/mt-rpc-test.cc
+++ b/src/kudu/rpc/mt-rpc-test.cc
@@ -64,7 +64,7 @@ namespace rpc {
 class MultiThreadedRpcTest : public RpcTestBase {
  public:
   // Make a single RPC call.
-  void SingleCall(Sockaddr server_addr, const char* method_name,
+  void SingleCall(Sockaddr server_addr, const string& method_name,
                   Status* result, CountDownLatch* latch) {
     LOG(INFO) << "Connecting to " << server_addr.ToString();
     shared_ptr<Messenger> client_messenger;
@@ -76,7 +76,7 @@ class MultiThreadedRpcTest : public RpcTestBase {
   }
 
   // Make RPC calls until we see a failure.
-  void HammerServer(Sockaddr server_addr, const char* method_name,
+  void HammerServer(Sockaddr server_addr, const string& method_name,
                     Status* last_result) {
     shared_ptr<Messenger> client_messenger;
     CHECK_OK(CreateMessenger("ClientHS", &client_messenger));
@@ -84,7 +84,7 @@ class MultiThreadedRpcTest : public RpcTestBase {
   }
 
   void HammerServerWithMessenger(
-      Sockaddr server_addr, const char* method_name, Status* last_result,
+      Sockaddr server_addr, const string& method_name, Status* last_result,
       const shared_ptr<Messenger>& messenger) {
     LOG(INFO) << "Connecting to " << server_addr.ToString();
     Proxy p(messenger, server_addr, server_addr.host(),
diff --git a/src/kudu/rpc/outbound_call.cc b/src/kudu/rpc/outbound_call.cc
index e2f1fd3..151195f 100644
--- a/src/kudu/rpc/outbound_call.cc
+++ b/src/kudu/rpc/outbound_call.cc
@@ -81,21 +81,24 @@ static const double kMicrosPerSecond = 1000000.0;
 
 OutboundCall::OutboundCall(const ConnectionId& conn_id,
                            const RemoteMethod& remote_method,
+                           unique_ptr<RequestPayload> payload,
+                           CallbackBehavior cb_behavior,
                            google::protobuf::Message* response_storage,
                            RpcController* controller,
                            ResponseCallback callback)
-    : state_(READY),
+    : cb_behavior_(cb_behavior),
+      state_(READY),
       remote_method_(remote_method),
       conn_id_(conn_id),
       callback_(std::move(callback)),
       controller_(DCHECK_NOTNULL(controller)),
       response_(DCHECK_NOTNULL(response_storage)),
+      payload_(std::move(payload)),
       cancellation_requested_(false) {
-  DVLOG(4) << "OutboundCall " << this << " constructed with state_: " << StateName(state_)
-           << " and RPC timeout: "
-           << (controller->timeout().Initialized() ? controller->timeout().ToString() : "none");
-  header_.set_call_id(kInvalidCallId);
-  remote_method.ToPB(header_.mutable_remote_method());
+  DVLOG(4) << Substitute("OutboundCall $0 constructed with the state_: $1 and RPC timeout: $2",
+      this, StateName(state_),
+      (controller->timeout().Initialized() ? controller->timeout().ToString() : "none"));
+  payload_->header_.set_call_id(kInvalidCallId);
   start_time_ = MonoTime::Now();
 
   if (!controller_->required_server_features().empty()) {
@@ -103,41 +106,66 @@ OutboundCall::OutboundCall(const ConnectionId& conn_id,
   }
 
   if (controller_->request_id_) {
-    header_.set_allocated_request_id(controller_->request_id_.release());
+    payload_->header_.set_allocated_request_id(controller_->request_id_.release());
   }
 }
 
+
+OutboundCall::OutboundCall(const ConnectionId& conn_id,
+                           const RemoteMethod& remote_method,
+                           google::protobuf::Message* response_storage,
+                           RpcController* controller,
+                           ResponseCallback callback)
+  : OutboundCall(conn_id, remote_method, std::make_unique<RequestPayload>(remote_method),
+                 CallbackBehavior::kFreeSidecars, response_storage, controller,
+                 std::move(callback)) {
+}
+
 OutboundCall::~OutboundCall() {
   DCHECK(IsFinished());
   DVLOG(4) << "OutboundCall " << this << " destroyed with state_: " << StateName(state_);
 }
 
 void OutboundCall::SerializeTo(TransferPayload* slices) {
-  DCHECK_LT(0, request_buf_.size())
+  DCHECK_LT(0, payload_->request_buf_.size())
       << "Must call SetRequestPayload() before SerializeTo()";
 
-  const MonoDelta &timeout = controller_->timeout();
-  if (timeout.Initialized()) {
-    header_.set_timeout_millis(timeout.ToMilliseconds());
+  if (controller_->timeout().Initialized()) {
+    payload_->header_.set_timeout_millis(controller_->timeout().ToMilliseconds());
   }
 
   for (uint32_t feature : controller_->required_server_features()) {
-    header_.add_required_feature_flags(feature);
+    payload_->header_.add_required_feature_flags(feature);
   }
 
-  DCHECK_LE(0, sidecar_byte_size_);
+  DCHECK_LE(0, payload_->sidecar_byte_size_);
   serialization::SerializeHeader(
-      header_, sidecar_byte_size_ + request_buf_.size(), &header_buf_);
+      payload_->header_, payload_->sidecar_byte_size_ + payload_->request_buf_.size(),
+      &payload_->header_buf_);
 
   slices->clear();
-  slices->push_back(header_buf_);
-  slices->push_back(request_buf_);
-  for (auto& sidecar : sidecars_) {
+  slices->push_back(payload_->header_buf_);
+  slices->push_back(payload_->request_buf_);
+  for (auto& sidecar : payload_->sidecars_) {
     sidecar->AppendSlices(slices);
   }
 }
 
-void OutboundCall::SetRequestPayload(const Message& req,
+RequestPayload::RequestPayload(const RemoteMethod& remote_method) {
+  header_.set_call_id(kInvalidCallId);
+  remote_method.ToPB(header_.mutable_remote_method());
+}
+
+unique_ptr<RequestPayload> RequestPayload::CreateRequestPayload(
+    const RemoteMethod& remote_method,
+    const Message& req,
+    vector<unique_ptr<RpcSidecar>>&& sidecars) {
+  auto payload = std::make_unique<RequestPayload>(remote_method);
+  payload->PopulateRequestPayload(req, std::move(sidecars));
+  return payload;
+}
+
+void RequestPayload::PopulateRequestPayload(const Message& req,
     vector<unique_ptr<RpcSidecar>>&& sidecars) {
   DCHECK_EQ(-1, sidecar_byte_size_);
 
@@ -159,6 +187,11 @@ void OutboundCall::SetRequestPayload(const Message& req,
   serialization::SerializeMessage(req, &request_buf_, sidecar_byte_size_, true);
 }
 
+void OutboundCall::SetRequestPayload(const Message& req,
+                                     vector<unique_ptr<RpcSidecar>>&& sidecars) {
+  DCHECK_NOTNULL(payload_)->PopulateRequestPayload(req, std::move(sidecars));
+}
+
 Status OutboundCall::status() const {
   std::lock_guard<simple_spinlock> l(lock_);
   return status_;
@@ -266,7 +299,9 @@ void OutboundCall::Cancel() {
 
 void OutboundCall::CallCallback() {
   // Clear references to outbound sidecars before invoking callback.
-  sidecars_.clear();
+  if (cb_behavior_ == CallbackBehavior::kFreeSidecars) {
+    FreeSidecars();
+  }
 
   int64_t start_cycles = CycleClock::Now();
   {
@@ -317,6 +352,10 @@ void OutboundCall::SetResponse(unique_ptr<CallResponse> resp) {
   }
 }
 
+unique_ptr<RequestPayload> OutboundCall::ReleaseRequestPayload() {
+  return std::move(payload_);
+}
+
 void OutboundCall::SetQueued() {
   set_state(ON_OUTBOUND_QUEUE);
 }
@@ -333,9 +372,9 @@ void OutboundCall::SetSent() {
   // behavior is a lot more efficient if memory is freed from the same thread
   // which allocated it -- this lets it keep to thread-local operations instead
   // of taking a mutex to put memory back on the global freelist.
-  delete [] header_buf_.release();
+  delete [] payload_->header_buf_.release();
 
-  // request_buf_ is also done being used here, but since it was allocated by
+  // payload_ is also done being used here, but since it was allocated by
   // the caller thread, we would rather let that thread free it whenever it
   // deletes the RpcController.
 
@@ -452,7 +491,7 @@ string OutboundCall::ToString() const {
 void OutboundCall::DumpPB(const DumpConnectionsRequestPB& req,
                           RpcCallInProgressPB* resp) {
   std::lock_guard<simple_spinlock> l(lock_);
-  resp->mutable_header()->CopyFrom(header_);
+  resp->mutable_header()->CopyFrom(payload_->header_);
   resp->set_micros_elapsed((MonoTime::Now() - start_time_).ToMicroseconds());
 
   switch (state_) {
diff --git a/src/kudu/rpc/outbound_call.h b/src/kudu/rpc/outbound_call.h
index 74db5ca..620a28b 100644
--- a/src/kudu/rpc/outbound_call.h
+++ b/src/kudu/rpc/outbound_call.h
@@ -57,6 +57,43 @@ class DumpConnectionsRequestPB;
 class RpcCallInProgressPB;
 class RpcController;
 
+// Encapsulates the request payload being sent by a call.
+class RequestPayload {
+ public:
+  // Creates a payload for the given remote method, serializing the given
+  // request, taking ownership of the sidecars, and populating the header as
+  // necessary.
+  static std::unique_ptr<RequestPayload> CreateRequestPayload(
+      const RemoteMethod& remote_method,
+      const google::protobuf::Message& req,
+      std::vector<std::unique_ptr<RpcSidecar>>&& sidecars);
+
+  // Creates an "empty" payload for the given remote method. Callers should
+  // also call PopulateRequestPayload() to form a usable payload.
+  explicit RequestPayload(const RemoteMethod& remote_method);
+
+  // Serializes the given 'req' and takes ownership of 'sidecars', populating
+  // the header as necessary.
+  void PopulateRequestPayload(const google::protobuf::Message& req,
+      std::vector<std::unique_ptr<RpcSidecar>>&& sidecars);
+ private:
+  friend class OutboundCall;
+
+  // The RPC header.
+  // Parts of this (eg the call ID) are only assigned once this request has
+  // been passed to the reactor thread and assigned a connection. Calls should
+  // re-assign the call ID if this payload is used in multiple calls (e.g.
+  // retries after re-resolving the address).
+  RequestHeader header_;
+  faststring header_buf_;
+  faststring request_buf_;
+  std::vector<std::unique_ptr<RpcSidecar>> sidecars_;
+
+  // Total size in bytes of all sidecars in 'sidecars_'. Set in PopulateRequestPayload().
+  // This cannot exceed TransferLimits::kMaxTotalSidecarBytes.
+  int32_t sidecar_byte_size_ = -1;
+};
+
 // Tracks the status of a call on the client side.
 //
 // This is an internal-facing class -- clients interact with the
@@ -80,6 +117,20 @@ class OutboundCall {
     REMOTE_CALL,
   };
 
+  // Behavior when running the callback with regard to freeing resources. Some
+  // callers may expect the call itself to free sidecars upon completion, while
+  // others may attempt to reuse the sidecars in another call attempt upon
+  // failure.
+  enum class CallbackBehavior {
+    kFreeSidecars,
+    kDontFreeSidecars,
+  };
+
+  OutboundCall(const ConnectionId& conn_id, const RemoteMethod& remote_method,
+               std::unique_ptr<RequestPayload> req_payload, CallbackBehavior cb_behavior,
+               google::protobuf::Message* response_storage,
+               RpcController* controller, ResponseCallback callback);
+
   OutboundCall(const ConnectionId& conn_id, const RemoteMethod& remote_method,
                google::protobuf::Message* response_storage,
                RpcController* controller, ResponseCallback callback);
@@ -97,8 +148,8 @@ class OutboundCall {
   // Assign the call ID for this call. This is called from the reactor
   // thread once a connection has been assigned. Must only be called once.
   void set_call_id(int32_t call_id) {
-    DCHECK_EQ(header_.call_id(), kInvalidCallId) << "Already has a call ID";
-    header_.set_call_id(call_id);
+    DCHECK_EQ(payload_->header_.call_id(), kInvalidCallId) << "Already has a call ID";
+    payload_->header_.set_call_id(call_id);
   }
 
   // Serialize the call for the wire. Requires that SetRequestPayload()
@@ -148,6 +199,12 @@ class OutboundCall {
     return required_rpc_features_;
   }
 
+  std::unique_ptr<RequestPayload> ReleaseRequestPayload();
+
+  void FreeSidecars() {
+    DCHECK_NOTNULL(payload_)->sidecars_.clear();
+  }
+
   std::string ToString() const;
 
   void DumpPB(const DumpConnectionsRequestPB& req, RpcCallInProgressPB* resp);
@@ -164,12 +221,12 @@ class OutboundCall {
 
   // Return true if a call ID has been assigned to this call.
   bool call_id_assigned() const {
-    return header_.call_id() != kInvalidCallId;
+    return payload_->header_.call_id() != kInvalidCallId;
   }
 
   int32_t call_id() const {
     DCHECK(call_id_assigned());
-    return header_.call_id();
+    return payload_->header_.call_id();
   }
 
   // Returns true if cancellation has been requested. Must be called from
@@ -227,6 +284,16 @@ class OutboundCall {
   // This will only be non-NULL if status().IsRemoteError().
   const ErrorStatusPB* error_pb() const;
 
+  // Call the user-provided callback. If 'cb_behavior_' is kFreeSidecars, the
+  // payload's sidecars are freed prior to invoking the callback, so the client
+  // can assume that the call doesn't hold references to outbound sidecars.
+  void CallCallback();
+
+  // The behavior defining whether to free sidecars upon calling the callback.
+  // Certain callbacks may prefer freeing the sidecars manually from within the
+  // callback.
+  const CallbackBehavior cb_behavior_;
+
   // Lock for state_ status_, error_pb_ fields, since they
   // may be mutated by the reactor thread while the client thread
   // reads them.
@@ -235,16 +302,6 @@ class OutboundCall {
   Status status_;
   std::unique_ptr<ErrorStatusPB> error_pb_;
 
-  // Call the user-provided callback. Note that entries in 'sidecars_' are cleared
-  // prior to invoking the callback so the client can assume that the call doesn't
-  // hold references to outbound sidecars.
-  void CallCallback();
-
-  // The RPC header.
-  // Parts of this (eg the call ID) are only assigned once this call has been
-  // passed to the reactor thread and assigned a connection.
-  RequestHeader header_;
-
   // The remote method being called.
   RemoteMethod remote_method_;
 
@@ -259,20 +316,12 @@ class OutboundCall {
   google::protobuf::Message* response_;
 
   // Buffers for storing segments of the wire-format request.
-  faststring header_buf_;
-  faststring request_buf_;
+  std::unique_ptr<RequestPayload> payload_;
 
   // Once a response has been received for this call, contains that response.
   // Otherwise NULL.
   std::unique_ptr<CallResponse> call_response_;
 
-  // All sidecars to be sent with this call.
-  std::vector<std::unique_ptr<RpcSidecar>> sidecars_;
-
-  // Total size in bytes of all sidecars in 'sidecars_'. Set in SetRequestPayload().
-  // This cannot exceed TransferLimits::kMaxTotalSidecarBytes.
-  int32_t sidecar_byte_size_ = -1;
-
   // True if cancellation was requested on this call.
   bool cancellation_requested_;
 
diff --git a/src/kudu/rpc/proxy-test.cc b/src/kudu/rpc/proxy-test.cc
index 34b626c..c5ad52b 100644
--- a/src/kudu/rpc/proxy-test.cc
+++ b/src/kudu/rpc/proxy-test.cc
@@ -18,6 +18,7 @@
 #include "kudu/rpc/proxy.h"
 
 #include <cstdint>
+#include <functional>
 #include <memory>
 #include <string>
 #include <thread>
@@ -29,6 +30,7 @@
 
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/response_callback.h"
 #include "kudu/rpc/rpc-test-base.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rtest.pb.h"
@@ -37,6 +39,7 @@
 #include "kudu/util/net/dns_resolver.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
+#include "kudu/util/notification.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
@@ -60,6 +63,13 @@ constexpr uint16_t kPort = 1111;
 constexpr const char* kFakeHost = "fakehost";
 const HostPort kFakeHostPort(kFakeHost, kPort);
 
+void SendRequestAsync(const ResponseCallback& cb,
+                      Proxy* p, RpcController* controller, SleepResponsePB* resp) {
+  SleepRequestPB req;
+  req.set_sleep_micros(100 * 1000); // 100ms
+  p->AsyncRequest(GenericCalculatorService::kSleepMethodName, req, resp, controller, cb);
+}
+
 Status SendRequest(Proxy* p) {
   SleepRequestPB req;
   req.set_sleep_micros(100 * 1000); // 100ms
@@ -74,6 +84,44 @@ Status SendRequest(Proxy* p) {
 class RpcProxyTest : public RpcTestBase {
 };
 
+TEST_F(RpcProxyTest, TestProxyRetriesWhenRequestLeavesScope) {
+  DnsResolver dns_resolver(1, 1024 * 1024);
+  shared_ptr<Messenger> client_messenger;
+  ASSERT_OK(CreateMessenger("client_messenger", &client_messenger));
+
+  // We're providing a fake hostport to encourage retries of DNS resolution,
+  // which will attempt to send the request payload after the request protobuf
+  // has already left scope.
+  Proxy p(client_messenger, kFakeHostPort, &dns_resolver,
+          CalculatorService::static_service_name());
+  p.Init();
+
+  SleepResponsePB resp;
+  {
+    Notification note;
+    RpcController controller;
+    controller.set_timeout(MonoDelta::FromMilliseconds(10000));
+    SendRequestAsync([&note] () { note.Notify(); }, &p, &controller, &resp);
+    note.WaitForNotification();
+    Status s = controller.status();
+    ASSERT_TRUE(s.IsNetworkError()) << s.ToString();
+  }
+
+  // Now try a successful call.
+  string ip = GetBindIpForDaemon(/*index*/2, kDefaultBindMode);
+  Sockaddr addr;
+  ASSERT_OK(addr.ParseString(ip, kPort));
+  ASSERT_OK(StartTestServerWithGeneratedCode(&addr));
+  FLAGS_dns_addr_resolution_override = Substitute("$0=$1", kFakeHost, addr.ToString());
+
+  Notification note;
+  RpcController controller;
+  controller.set_timeout(MonoDelta::FromMilliseconds(10000));
+  SendRequestAsync([&note] () { note.Notify(); }, &p, &controller, &resp);
+  note.WaitForNotification();
+  ASSERT_OK(controller.status());
+}
+
 // Test that proxies initialized with a DnsResolver return errors when
 // receiving a non-transient error.
 TEST_F(RpcProxyTest, TestProxyReturnsOnNonTransientError) {
diff --git a/src/kudu/rpc/proxy.cc b/src/kudu/rpc/proxy.cc
index 8d810b8..fbda5de 100644
--- a/src/kudu/rpc/proxy.cc
+++ b/src/kudu/rpc/proxy.cc
@@ -33,6 +33,7 @@
 #include "kudu/rpc/response_callback.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/user_credentials.h"
+#include "kudu/util/kernel_stack_watchdog.h"
 #include "kudu/util/net/dns_resolver.h"
 #include "kudu/util/net/net_util.h"
 #include "kudu/util/net/sockaddr.h"
@@ -132,16 +133,16 @@ Proxy::~Proxy() {
 }
 
 void Proxy::EnqueueRequest(const string& method,
-                           const google::protobuf::Message& req,
+                           unique_ptr<RequestPayload> req_payload,
                            google::protobuf::Message* response,
                            RpcController* controller,
-                           const ResponseCallback& callback) const {
+                           const ResponseCallback& callback,
+                           OutboundCall::CallbackBehavior cb_behavior) const {
   ConnectionId connection = conn_id();
   DCHECK(connection.remote().is_initialized());
-  RemoteMethod remote_method(service_name_, method);
   controller->call_.reset(
-      new OutboundCall(connection, remote_method, response, controller, callback));
-  controller->SetRequestParam(req);
+      new OutboundCall(connection, {service_name_, method}, std::move(req_payload),
+                       cb_behavior, response, controller, callback));
   controller->SetMessenger(messenger_.get());
 
   // If this fails to queue, the callback will get called immediately
@@ -150,14 +151,16 @@ void Proxy::EnqueueRequest(const string& method,
 }
 
 void Proxy::RefreshDnsAndEnqueueRequest(const std::string& method,
-                                        const google::protobuf::Message& req,
+                                        unique_ptr<RequestPayload> req_payload,
                                         google::protobuf::Message* response,
                                         RpcController* controller,
                                         const ResponseCallback& callback) {
   DCHECK(!controller->call_);
   vector<Sockaddr>* addrs = new vector<Sockaddr>();
   DCHECK_NOTNULL(dns_resolver_)->RefreshAddressesAsync(hp_, addrs,
-      [this, &req, &method, callback, response, controller, addrs] (const Status& s) {
+      [this, req_raw = req_payload.release(),
+       &method, callback, response, controller, addrs] (const Status& s) mutable {
+    unique_ptr<RequestPayload> req_payload(req_raw);
     unique_ptr<vector<Sockaddr>> unique_addrs(addrs);
     // If we fail to resolve the address, treat the call as failed.
     if (!s.ok() || addrs->empty()) {
@@ -178,7 +181,10 @@ void Proxy::RefreshDnsAndEnqueueRequest(const std::string& method,
       std::lock_guard<simple_spinlock> l(lock_);
       conn_id_.set_remote(*addr);
     }
-    EnqueueRequest(method, req, response, controller, callback);
+    // NOTE: we don't expect the user-provided callback to free sidecars, so
+    // make sure the outbound call frees them for us.
+    EnqueueRequest(method, std::move(req_payload), response, controller, callback,
+                   OutboundCall::CallbackBehavior::kFreeSidecars);
   });
 }
 
@@ -189,8 +195,16 @@ void Proxy::AsyncRequest(const string& method,
                          const ResponseCallback& callback) {
   CHECK(!controller->call_) << "Controller should be reset";
   base::subtle::NoBarrier_Store(&is_started_, true);
+  // TODO(awong): it would be great if we didn't have to heap allocate the
+  // payload.
+  auto req_payload = RequestPayload::CreateRequestPayload(
+      RemoteMethod{service_name_, method},
+      req, controller->ReleaseOutboundSidecars());
   if (!dns_resolver_) {
-    EnqueueRequest(method, req, response, controller, callback);
+    // NOTE: we don't expect the user-provided callback to free sidecars, so
+    // make sure the outbound call frees them for us.
+    EnqueueRequest(method, std::move(req_payload), response, controller, callback,
+                   OutboundCall::CallbackBehavior::kFreeSidecars);
     return;
   }
 
@@ -202,26 +216,32 @@ void Proxy::AsyncRequest(const string& method,
     remote_initialized = conn_id_.remote().is_initialized();
   }
   if (!remote_initialized) {
-    RefreshDnsAndEnqueueRequest(method, req, response, controller, callback);
+    RefreshDnsAndEnqueueRequest(method, std::move(req_payload), response, controller, callback);
     return;
   }
 
-  // Otherwise, just enqueue the request, but retry if there's a network error,
-  // since it's possible the physical address of the host was changed. We only
-  // retry once more before calling the callback.
-  auto refresh_dns_and_cb = [this, &req, &method,
-                             callback, response, controller] () {
+  // Otherwise, just enqueue the request, but retry if there's an error, since
+  // it's possible the physical address of the host was changed. We only retry
+  // once more before calling the callback.
+  auto refresh_dns_and_cb = [this, &method, callback, response, controller] () {
     // TODO(awong): we should be more specific here -- consider having the RPC
     // layer set a flag in the controller that warrants a retry.
     if (PREDICT_FALSE(!controller->status().ok())) {
+      auto req_payload = controller->ReleaseRequestPayload();
       controller->Reset();
-      RefreshDnsAndEnqueueRequest(method, req, response, controller, callback);
+      RefreshDnsAndEnqueueRequest(method, std::move(req_payload), response, controller, callback);
       return;
     }
     // For any other status, OK or otherwise, just run the callback.
+    controller->FreeOutboundSidecars();
+    SCOPED_WATCH_STACK(100);
     callback();
   };
-  EnqueueRequest(method, req, response, controller, refresh_dns_and_cb);
+  // Since we may end up using the request payload in the event of a retry,
+  // ensure the outbound call doesn't free the sidecars, and instead free
+  // manually from within our callback.
+  EnqueueRequest(method, std::move(req_payload), response, controller, refresh_dns_and_cb,
+                 OutboundCall::CallbackBehavior::kDontFreeSidecars);
 }
 
 
diff --git a/src/kudu/rpc/proxy.h b/src/kudu/rpc/proxy.h
index b0bc512..cb82205 100644
--- a/src/kudu/rpc/proxy.h
+++ b/src/kudu/rpc/proxy.h
@@ -24,6 +24,7 @@
 #include "kudu/gutil/atomicops.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/rpc/connection_id.h"
+#include "kudu/rpc/outbound_call.h"
 #include "kudu/rpc/response_callback.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/net/net_util.h"
@@ -148,7 +149,7 @@ class Proxy {
   // Asynchronously refreshes the DNS, enqueueing the given request upon
   // success, or failing the call and calling the callback upon failure.
   void RefreshDnsAndEnqueueRequest(const std::string& method,
-                                   const google::protobuf::Message& req,
+                                   std::unique_ptr<RequestPayload> req_payload,
                                    google::protobuf::Message* response,
                                    RpcController* controller,
                                    const ResponseCallback& callback);
@@ -156,10 +157,11 @@ class Proxy {
   // Queues the given request as an outbound call using the given messenger,
   // controller, and response.
   void EnqueueRequest(const std::string& method,
-                      const google::protobuf::Message& req,
+                      std::unique_ptr<RequestPayload> req_payload,
                       google::protobuf::Message* response,
                       RpcController* controller,
-                      const ResponseCallback& callback) const;
+                      const ResponseCallback& callback,
+                      OutboundCall::CallbackBehavior cb_behavior) const;
 
   // Returns a single Sockaddr from the 'addrs', logging a warning if there is
   // more than one to choose from.
diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h
index c32423e..0aa46ce 100644
--- a/src/kudu/rpc/rpc-test-base.h
+++ b/src/kudu/rpc/rpc-test-base.h
@@ -91,12 +91,12 @@ namespace rpc {
 class GenericCalculatorService : public ServiceIf {
  public:
   static const std::string kFullServiceName;
-  static const char *kAddMethodName;
-  static const char *kSleepMethodName;
-  static const char *kSleepWithSidecarMethodName;
-  static const char *kPushStringsMethodName;
-  static const char *kSendTwoStringsMethodName;
-  static const char *kAddExactlyOnce;
+  static const std::string kAddMethodName;
+  static const std::string kSleepMethodName;
+  static const std::string kSleepWithSidecarMethodName;
+  static const std::string kPushStringsMethodName;
+  static const std::string kSendTwoStringsMethodName;
+  static const std::string kAddExactlyOnce;
 
   static const char* kFirstString;
   static const char* kSecondString;
@@ -400,13 +400,14 @@ class CalculatorService : public CalculatorServiceIf {
 
 };
 
-const std::string GenericCalculatorService::kFullServiceName = "kudu.rpc.GenericCalculatorService";
-const char *GenericCalculatorService::kAddMethodName = "Add";
-const char *GenericCalculatorService::kSleepMethodName = "Sleep";
-const char *GenericCalculatorService::kSleepWithSidecarMethodName = "SleepWithSidecar";
-const char *GenericCalculatorService::kPushStringsMethodName = "PushStrings";
-const char *GenericCalculatorService::kSendTwoStringsMethodName = "SendTwoStrings";
-const char *GenericCalculatorService::kAddExactlyOnce = "AddExactlyOnce";
+const std::string GenericCalculatorService::kFullServiceName =
+    "kudu.rpc.GenericCalculatorService";
+const std::string GenericCalculatorService::kAddMethodName = "Add";
+const std::string GenericCalculatorService::kSleepMethodName = "Sleep";
+const std::string GenericCalculatorService::kSleepWithSidecarMethodName = "SleepWithSidecar";
+const std::string GenericCalculatorService::kPushStringsMethodName = "PushStrings";
+const std::string GenericCalculatorService::kSendTwoStringsMethodName = "SendTwoStrings";
+const std::string GenericCalculatorService::kAddExactlyOnce = "AddExactlyOnce";
 
 const char *GenericCalculatorService::kFirstString =
     "1111111111111111111111111111111111111111111111111111111111";
@@ -468,7 +469,7 @@ class RpcTestBase : public KuduTest {
     return bld.Build(messenger);
   }
 
-  static Status DoTestSyncCall(Proxy* p, const char *method,
+  static Status DoTestSyncCall(Proxy* p, const std::string& method,
                                CredentialsPolicy policy = CredentialsPolicy::ANY_CREDENTIALS) {
     AddRequestPB req;
     req.set_x(rand());
diff --git a/src/kudu/rpc/rpc_controller.cc b/src/kudu/rpc/rpc_controller.cc
index d0e07dd..6730e7f 100644
--- a/src/kudu/rpc/rpc_controller.cc
+++ b/src/kudu/rpc/rpc_controller.cc
@@ -168,6 +168,20 @@ void RpcController::SetRequestParam(const google::protobuf::Message& req) {
   call_->SetRequestPayload(req, std::move(outbound_sidecars_));
 }
 
+void RpcController::FreeOutboundSidecars() {
+  outbound_sidecars_total_bytes_ = 0;
+  call_->FreeSidecars();
+}
+
+std::vector<unique_ptr<RpcSidecar>> RpcController::ReleaseOutboundSidecars() {
+  outbound_sidecars_total_bytes_ = 0;
+  return std::move(outbound_sidecars_);
+}
+
+unique_ptr<RequestPayload> RpcController::ReleaseRequestPayload() {
+  return DCHECK_NOTNULL(call_)->ReleaseRequestPayload();
+}
+
 void RpcController::Cancel() {
   DCHECK(call_);
   DCHECK(messenger_);
diff --git a/src/kudu/rpc/rpc_controller.h b/src/kudu/rpc/rpc_controller.h
index a8c0af5..49bd527 100644
--- a/src/kudu/rpc/rpc_controller.h
+++ b/src/kudu/rpc/rpc_controller.h
@@ -42,6 +42,7 @@ class ErrorStatusPB;
 class Messenger;
 class OutboundCall;
 class RequestIdPB;
+class RequestPayload;
 class RpcSidecar;
 
 // Authentication credentials policy for outbound RPCs. Some RPC methods
@@ -141,6 +142,21 @@ class RpcController {
   // in some cases even when sent to multiple servers, enabling exactly once semantics.
   void SetRequestIdPB(std::unique_ptr<RequestIdPB> request_id);
 
+  // Releases the outbound sidecars added to this controller. This is useful if
+  // callers want to create a request payload for a request.
+  std::vector<std::unique_ptr<RpcSidecar>> ReleaseOutboundSidecars();
+
+  // Frees the outbound sidecars added to this controller. OutboundCalls may
+  // call this before running the user-provided callback to ensure the state is
+  // freed before the callback is run. However, certain usages of OutboundCall
+  // warrant delaying the freeing until the callback itself runs.
+  void FreeOutboundSidecars();
+
+  // Releases the request payload owned by this controller. This is useful if
+  // callers want to reuse a request payload in another attempt of a call, as
+  // it allows callers to transfer ownership to a new outbound call.
+  std::unique_ptr<RequestPayload> ReleaseRequestPayload();
+
   // Returns whether a request id has been set on RPC header.
   bool has_request_id() const;
 
@@ -273,6 +289,7 @@ class RpcController {
   // Once the call is sent, it is tracked here.
   std::shared_ptr<OutboundCall> call_;
 
+  // Owned by the controller until released and taken by a call.
   std::vector<std::unique_ptr<RpcSidecar>> outbound_sidecars_;
 
   // Total size of sidecars in outbound_sidecars_. This is limited to a maximum