You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by jd...@apache.org on 2017/03/08 18:27:10 UTC
[2/2] kudu git commit: KUDU-1866: Add request-side sidecars
KUDU-1866: Add request-side sidecars
This patch adds sidecars to client requests. Using the same mechanism as
on the response-side, clients may attach slices to outbound requests
which do not pass through a serialization or copy before being pushed to
the network socket. On the server side, these sidecars may be read
directly from the underlying byte stream without the interposition of a
Protobuf wrapper.
The sidecars may be added to a request via RpcController and retrieved
via RpcContext (i.e. the reverse of the existing response-side
interface).
This patch adds a few tests to rpc-test, and all rpc-test tests pass.
Change-Id: I3d709edb2a22dc983f51b69d7660a39e8d8d6a09
Reviewed-on: http://gerrit.cloudera.org:8080/5908
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/72895966
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/72895966
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/72895966
Branch: refs/heads/master
Commit: 72895966c6458d6f33e68d53450d9bd43a2c57b1
Parents: 5566bc9
Author: Henry Robinson <he...@cloudera.com>
Authored: Thu Feb 2 20:31:33 2017 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Mar 8 00:49:34 2017 +0000
----------------------------------------------------------------------
src/kudu/client/scanner-internal.cc | 14 ++--
src/kudu/rpc/CMakeLists.txt | 2 +-
src/kudu/rpc/inbound_call.cc | 40 +++++++---
src/kudu/rpc/inbound_call.h | 14 +++-
src/kudu/rpc/outbound_call.cc | 74 ++++++++---------
src/kudu/rpc/outbound_call.h | 22 +++--
src/kudu/rpc/proxy.cc | 2 +-
src/kudu/rpc/rpc-test-base.h | 70 ++++++++++++++--
src/kudu/rpc/rpc-test.cc | 45 +++++++++++
src/kudu/rpc/rpc_context.cc | 10 ++-
src/kudu/rpc/rpc_context.h | 6 +-
src/kudu/rpc/rpc_controller.cc | 20 ++++-
src/kudu/rpc/rpc_controller.h | 23 +++++-
src/kudu/rpc/rpc_header.proto | 5 ++
src/kudu/rpc/rpc_sidecar.cc | 102 ++++++++++++++++++++++++
src/kudu/rpc/rpc_sidecar.h | 60 +++++++-------
src/kudu/rpc/rtest.proto | 13 +++
src/kudu/rpc/transfer.h | 14 +++-
src/kudu/tserver/tablet_server-test-base.h | 6 +-
src/kudu/tserver/tablet_service.cc | 13 +--
20 files changed, 429 insertions(+), 126 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/client/scanner-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scanner-internal.cc b/src/kudu/client/scanner-internal.cc
index 07f93a7..c804046 100644
--- a/src/kudu/client/scanner-internal.cc
+++ b/src/kudu/client/scanner-internal.cc
@@ -505,16 +505,16 @@ Status KuduScanBatch::Data::Reset(RpcController* controller,
// First, rewrite the relative addresses into absolute ones.
if (PREDICT_FALSE(!resp_data_.has_rows_sidecar())) {
return Status::Corruption("Server sent invalid response: no row data");
- } else {
- Status s = controller_.GetSidecar(resp_data_.rows_sidecar(), &direct_data_);
- if (!s.ok()) {
- return Status::Corruption("Server sent invalid response: row data "
- "sidecar index corrupt", s.ToString());
- }
+ }
+
+ Status s = controller_.GetInboundSidecar(resp_data_.rows_sidecar(), &direct_data_);
+ if (!s.ok()) {
+ return Status::Corruption("Server sent invalid response: row data "
+ "sidecar index corrupt", s.ToString());
}
if (resp_data_.has_indirect_data_sidecar()) {
- Status s = controller_.GetSidecar(resp_data_.indirect_data_sidecar(),
+ Status s = controller_.GetInboundSidecar(resp_data_.indirect_data_sidecar(),
&indirect_data_);
if (!s.ok()) {
return Status::Corruption("Server sent invalid response: indirect data "
http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/CMakeLists.txt b/src/kudu/rpc/CMakeLists.txt
index 19a7610..0cfe6e9 100644
--- a/src/kudu/rpc/CMakeLists.txt
+++ b/src/kudu/rpc/CMakeLists.txt
@@ -59,6 +59,7 @@ set(KRPC_SRCS
rpc.cc
rpc_context.cc
rpc_controller.cc
+ rpc_sidecar.cc
rpcz_store.cc
sasl_common.cc
sasl_helper.cc
@@ -125,4 +126,3 @@ ADD_KUDU_TEST(rpc-bench RUN_SERIAL true)
ADD_KUDU_TEST(rpc-test)
ADD_KUDU_TEST(rpc_stub-test)
ADD_KUDU_TEST(service_queue-test)
-
http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/inbound_call.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/inbound_call.cc b/src/kudu/rpc/inbound_call.cc
index 448fd70..03e7da4 100644
--- a/src/kudu/rpc/inbound_call.cc
+++ b/src/kudu/rpc/inbound_call.cc
@@ -33,9 +33,8 @@
using google::protobuf::FieldDescriptor;
using google::protobuf::io::CodedOutputStream;
-using google::protobuf::Message;
using google::protobuf::MessageLite;
-using std::shared_ptr;
+using std::unique_ptr;
using std::vector;
using strings::Substitute;
@@ -44,7 +43,6 @@ namespace rpc {
InboundCall::InboundCall(Connection* conn)
: conn_(conn),
- sidecars_deleter_(&sidecars_),
trace_(new Trace),
method_info_(nullptr) {
RecordCallReceived();
@@ -67,6 +65,19 @@ Status InboundCall::ParseFrom(gscoped_ptr<InboundTransfer> transfer) {
}
remote_method_.FromPB(header_.remote_method());
+ if (header_.sidecar_offsets_size() > TransferLimits::kMaxSidecars) {
+ return Status::Corruption(strings::Substitute(
+ "Received $0 additional payload slices, expected at most %d",
+ header_.sidecar_offsets_size(), TransferLimits::kMaxSidecars));
+ }
+
+ RETURN_NOT_OK(RpcSidecar::ParseSidecars(
+ header_.sidecar_offsets(), serialized_request_, inbound_sidecar_slices_));
+ if (header_.sidecar_offsets_size() > 0) {
+ // Trim the request to just the message
+ serialized_request_ = Slice(serialized_request_.data(), header_.sidecar_offsets(0));
+ }
+
// Retain the buffer that we have a view into.
transfer_.swap(transfer);
return Status::OK();
@@ -151,7 +162,7 @@ void InboundCall::SerializeResponseBuffer(const MessageLite& response,
resp_hdr.set_call_id(header_.call_id());
resp_hdr.set_is_error(!is_success);
uint32_t absolute_sidecar_offset = protobuf_msg_size;
- for (RpcSidecar* car : sidecars_) {
+ for (const unique_ptr<RpcSidecar>& car : outbound_sidecars_) {
resp_hdr.add_sidecar_offsets(absolute_sidecar_offset);
absolute_sidecar_offset += car->AsSlice().size();
}
@@ -168,23 +179,23 @@ void InboundCall::SerializeResponseTo(vector<Slice>* slices) const {
TRACE_EVENT0("rpc", "InboundCall::SerializeResponseTo");
CHECK_GT(response_hdr_buf_.size(), 0);
CHECK_GT(response_msg_buf_.size(), 0);
- slices->reserve(slices->size() + 2 + sidecars_.size());
+ slices->reserve(slices->size() + 2 + outbound_sidecars_.size());
slices->push_back(Slice(response_hdr_buf_));
slices->push_back(Slice(response_msg_buf_));
- for (RpcSidecar* car : sidecars_) {
+ for (const unique_ptr<RpcSidecar>& car : outbound_sidecars_) {
slices->push_back(car->AsSlice());
}
}
-Status InboundCall::AddRpcSidecar(gscoped_ptr<RpcSidecar> car, int* idx) {
+Status InboundCall::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) {
// Check that the number of sidecars does not exceed the number of payload
// slices that are free (two are used up by the header and main message
// protobufs).
- if (sidecars_.size() + 2 > OutboundTransfer::kMaxPayloadSlices) {
+ if (outbound_sidecars_.size() > TransferLimits::kMaxSidecars) {
return Status::ServiceUnavailable("All available sidecars already used");
}
- sidecars_.push_back(car.release());
- *idx = sidecars_.size() - 1;
+ outbound_sidecars_.emplace_back(std::move(car));
+ *idx = outbound_sidecars_.size() - 1;
return Status::OK();
}
@@ -288,5 +299,14 @@ vector<uint32_t> InboundCall::GetRequiredFeatures() const {
return features;
}
+Status InboundCall::GetInboundSidecar(int idx, Slice* sidecar) const {
+ if (idx < 0 || idx >= header_.sidecar_offsets_size()) {
+ return Status::InvalidArgument(strings::Substitute(
+ "Index $0 does not reference a valid sidecar", idx));
+ }
+ *sidecar = inbound_sidecar_slices_[idx];
+ return Status::OK();
+}
+
} // namespace rpc
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/inbound_call.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/inbound_call.h b/src/kudu/rpc/inbound_call.h
index 4f99dee..ea6eade 100644
--- a/src/kudu/rpc/inbound_call.h
+++ b/src/kudu/rpc/inbound_call.h
@@ -22,7 +22,6 @@
#include <vector>
#include "kudu/gutil/gscoped_ptr.h"
-#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/rpc/remote_method.h"
@@ -124,7 +123,7 @@ class InboundCall {
void SerializeResponseTo(std::vector<Slice>* slices) const;
// See RpcContext::AddRpcSidecar()
- Status AddRpcSidecar(gscoped_ptr<RpcSidecar> car, int* idx);
+ Status AddOutboundSidecar(std::unique_ptr<RpcSidecar> car, int* idx);
std::string ToString() const;
@@ -187,6 +186,10 @@ class InboundCall {
// the RPC.
std::vector<uint32_t> GetRequiredFeatures() const;
+ // Get a sidecar sent as part of the request. If idx < 0 || idx > num sidecars - 1,
+ // returns an error.
+ Status GetInboundSidecar(int idx, Slice* sidecar) const;
+
private:
friend class RpczStore;
@@ -227,8 +230,11 @@ class InboundCall {
// Vector of additional sidecars that are tacked on to the call's response
// after serialization of the protobuf. See rpc/rpc_sidecar.h for more info.
- std::vector<RpcSidecar*> sidecars_;
- ElementDeleter sidecars_deleter_;
+ std::vector<std::unique_ptr<RpcSidecar>> outbound_sidecars_;
+
+ // Inbound sidecars from the request. The slices are views onto transfer_. There are as
+ // many slices as header_.sidecar_offsets_size().
+ Slice inbound_sidecar_slices_[TransferLimits::kMaxSidecars];
// The trace buffer.
scoped_refptr<Trace> trace_;
http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/outbound_call.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/outbound_call.cc b/src/kudu/rpc/outbound_call.cc
index 19ec0ec..9b160a1 100644
--- a/src/kudu/rpc/outbound_call.cc
+++ b/src/kudu/rpc/outbound_call.cc
@@ -18,6 +18,7 @@
#include <algorithm>
#include <boost/functional/hash.hpp>
#include <gflags/gflags.h>
+#include <memory>
#include <mutex>
#include <string>
#include <unordered_set>
@@ -29,6 +30,7 @@
#include "kudu/rpc/constants.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/rpc/rpc_introspection.pb.h"
+#include "kudu/rpc/rpc_sidecar.h"
#include "kudu/rpc/serialization.h"
#include "kudu/rpc/transfer.h"
#include "kudu/util/flag_tags.h"
@@ -44,6 +46,8 @@ DEFINE_int64(rpc_callback_max_cycles, 100 * 1000 * 1000,
TAG_FLAG(rpc_callback_max_cycles, advanced);
TAG_FLAG(rpc_callback_max_cycles, runtime);
+using std::unique_ptr;
+
namespace kudu {
namespace rpc {
@@ -88,10 +92,8 @@ OutboundCall::~OutboundCall() {
}
Status OutboundCall::SerializeTo(vector<Slice>* slices) {
- size_t param_len = request_buf_.size();
- if (PREDICT_FALSE(param_len == 0)) {
- return Status::InvalidArgument("Must call SetRequestParam() before SerializeTo()");
- }
+ DCHECK_LT(0, request_buf_.size())
+ << "Must call SetRequestPayload() before SerializeTo()";
const MonoDelta &timeout = controller_->timeout();
if (timeout.Initialized()) {
@@ -102,16 +104,32 @@ Status OutboundCall::SerializeTo(vector<Slice>* slices) {
header_.add_required_feature_flags(feature);
}
- serialization::SerializeHeader(header_, param_len, &header_buf_);
+ DCHECK_LE(0, sidecar_byte_size_);
+ serialization::SerializeHeader(
+ header_, sidecar_byte_size_ + request_buf_.size(), &header_buf_);
- // Return the concatenated packet.
slices->push_back(Slice(header_buf_));
slices->push_back(Slice(request_buf_));
+ for (const unique_ptr<RpcSidecar>& car : sidecars_) slices->push_back(car->AsSlice());
return Status::OK();
}
-void OutboundCall::SetRequestParam(const Message& message) {
- serialization::SerializeMessage(message, &request_buf_);
+void OutboundCall::SetRequestPayload(const Message& req,
+ vector<unique_ptr<RpcSidecar>>&& sidecars) {
+ DCHECK_EQ(-1, sidecar_byte_size_);
+
+ sidecars_ = move(sidecars);
+
+ // Compute total size of sidecar payload so that extra space can be reserved as part of
+ // the request body.
+ uint32_t message_size = req.ByteSize();
+ sidecar_byte_size_ = 0;
+ for (const unique_ptr<RpcSidecar>& car: sidecars_) {
+ header_.add_sidecar_offsets(sidecar_byte_size_ + message_size);
+ sidecar_byte_size_ += car->AsSlice().size();
+ }
+
+ serialization::SerializeMessage(req, &request_buf_, sidecar_byte_size_, true);
}
Status OutboundCall::status() const {
@@ -432,44 +450,16 @@ Status CallResponse::GetSidecar(int idx, Slice* sidecar) const {
Status CallResponse::ParseFrom(gscoped_ptr<InboundTransfer> transfer) {
CHECK(!parsed_);
- Slice entire_message;
RETURN_NOT_OK(serialization::ParseMessage(transfer->data(), &header_,
- &entire_message));
+ &serialized_response_));
// Use information from header to extract the payload slices.
- int last = header_.sidecar_offsets_size() - 1;
+ RETURN_NOT_OK(RpcSidecar::ParseSidecars(header_.sidecar_offsets(),
+ serialized_response_, sidecar_slices_));
- if (last >= OutboundTransfer::kMaxPayloadSlices) {
- return Status::Corruption(strings::Substitute(
- "Received $0 additional payload slices, expected at most %d",
- last, OutboundTransfer::kMaxPayloadSlices));
- }
-
- if (last >= 0) {
- serialized_response_ = Slice(entire_message.data(),
- header_.sidecar_offsets(0));
- for (int i = 0; i < last; ++i) {
- uint32_t next_offset = header_.sidecar_offsets(i);
- int32_t len = header_.sidecar_offsets(i + 1) - next_offset;
- if (next_offset + len > entire_message.size() || len < 0) {
- return Status::Corruption(strings::Substitute(
- "Invalid sidecar offsets; sidecar $0 apparently starts at $1,"
- " has length $2, but the entire message has length $3",
- i, next_offset, len, entire_message.size()));
- }
- sidecar_slices_[i] = Slice(entire_message.data() + next_offset, len);
- }
- uint32_t next_offset = header_.sidecar_offsets(last);
- if (next_offset > entire_message.size()) {
- return Status::Corruption(strings::Substitute(
- "Invalid sidecar offsets; the last sidecar ($0) apparently starts "
- "at $1, but the entire message has length $3",
- last, next_offset, entire_message.size()));
- }
- sidecar_slices_[last] = Slice(entire_message.data() + next_offset,
- entire_message.size() - next_offset);
- } else {
- serialized_response_ = entire_message;
+ if (header_.sidecar_offsets_size() > 0) {
+ serialized_response_ =
+ Slice(serialized_response_.data(), header_.sidecar_offsets(0));
}
transfer_.swap(transfer);
http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/outbound_call.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/outbound_call.h b/src/kudu/rpc/outbound_call.h
index fa599fd..87ca39a 100644
--- a/src/kudu/rpc/outbound_call.h
+++ b/src/kudu/rpc/outbound_call.h
@@ -27,6 +27,7 @@
#include "kudu/gutil/macros.h"
#include "kudu/rpc/constants.h"
#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/rpc_sidecar.h"
#include "kudu/rpc/remote_method.h"
#include "kudu/rpc/response_callback.h"
#include "kudu/rpc/transfer.h"
@@ -52,6 +53,7 @@ class DumpRunningRpcsRequestPB;
class InboundTransfer;
class RpcCallInProgressPB;
class RpcController;
+class RpcSidecar;
// Used to key on Connection information.
@@ -124,11 +126,13 @@ class OutboundCall {
~OutboundCall();
- // Serialize the given request PB into this call's internal storage.
+ // Serialize the given request PB into this call's internal storage, and assume
+ // ownership of any sidecars that should accompany this request.
//
- // Because the data is fully serialized by this call, 'req' may be
- // subsequently mutated with no ill effects.
- void SetRequestParam(const google::protobuf::Message& req);
+ // Because the request data is fully serialized by this call, 'req' may be subsequently
+ // mutated with no ill effects.
+ void SetRequestPayload(const google::protobuf::Message& req,
+ std::vector<std::unique_ptr<RpcSidecar>>&& sidecars);
// Assign the call ID for this call. This is called from the reactor
// thread once a connection has been assigned. Must only be called once.
@@ -137,7 +141,7 @@ class OutboundCall {
header_.set_call_id(call_id);
}
- // Serialize the call for the wire. Requires that SetRequestParam()
+ // Serialize the call for the wire. Requires that SetRequestPayload()
// is called first. This is called from the Reactor thread.
Status SerializeTo(std::vector<Slice>* slices);
@@ -269,6 +273,12 @@ class OutboundCall {
// Otherwise NULL.
gscoped_ptr<CallResponse> call_response_;
+ // All sidecars to be sent with this call.
+ std::vector<std::unique_ptr<RpcSidecar>> sidecars_;
+
+ // Total size in bytes of all sidecars in 'sidecars_'. Set in SetRequestPayload().
+ int64_t sidecar_byte_size_ = -1;
+
DISALLOW_COPY_AND_ASSIGN(OutboundCall);
};
@@ -322,7 +332,7 @@ class CallResponse {
Slice serialized_response_;
// Slices of data for rpc sidecars. They point into memory owned by transfer_.
- Slice sidecar_slices_[OutboundTransfer::kMaxPayloadSlices];
+ Slice sidecar_slices_[TransferLimits::kMaxSidecars];
// The incoming transfer data - retained because serialized_response_
// and sidecar_slices_ refer into its data.
http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/proxy.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/proxy.cc b/src/kudu/rpc/proxy.cc
index 206aac3..077af58 100644
--- a/src/kudu/rpc/proxy.cc
+++ b/src/kudu/rpc/proxy.cc
@@ -81,7 +81,7 @@ void Proxy::AsyncRequest(const string& method,
RemoteMethod remote_method(service_name_, method);
OutboundCall* call = new OutboundCall(conn_id_, remote_method, response, controller, callback);
controller->call_.reset(call);
- call->SetRequestParam(req);
+ controller->SetRequestParam(req);
// If this fails to queue, the callback will get called immediately
// and the controller will be in an ERROR state.
http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rpc-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h
index 35a19f2..75ef792 100644
--- a/src/kudu/rpc/rpc-test-base.h
+++ b/src/kudu/rpc/rpc-test-base.h
@@ -65,6 +65,8 @@ using kudu::rpc_test::ExactlyOnceResponsePB;
using kudu::rpc_test::FeatureFlags;
using kudu::rpc_test::PanicRequestPB;
using kudu::rpc_test::PanicResponsePB;
+using kudu::rpc_test::PushTwoStringsRequestPB;
+using kudu::rpc_test::PushTwoStringsResponsePB;
using kudu::rpc_test::SendTwoStringsRequestPB;
using kudu::rpc_test::SendTwoStringsResponsePB;
using kudu::rpc_test::SleepRequestPB;
@@ -83,6 +85,7 @@ class GenericCalculatorService : public ServiceIf {
static const char *kFullServiceName;
static const char *kAddMethodName;
static const char *kSleepMethodName;
+ static const char *kPushTwoStringsMethodName;
static const char *kSendTwoStringsMethodName;
static const char *kAddExactlyOnce;
@@ -105,6 +108,8 @@ class GenericCalculatorService : public ServiceIf {
DoSleep(incoming);
} else if (incoming->remote_method().method_name() == kSendTwoStringsMethodName) {
DoSendTwoStrings(incoming);
+ } else if (incoming->remote_method().method_name() == kPushTwoStringsMethodName) {
+ DoPushTwoStrings(incoming);
} else {
incoming->RespondFailure(ErrorStatusPB::ERROR_NO_SUCH_METHOD,
Status::InvalidArgument("bad method"));
@@ -134,8 +139,8 @@ class GenericCalculatorService : public ServiceIf {
LOG(FATAL) << "couldn't parse: " << param.ToDebugString();
}
- gscoped_ptr<faststring> first(new faststring);
- gscoped_ptr<faststring> second(new faststring);
+ std::unique_ptr<faststring> first(new faststring);
+ std::unique_ptr<faststring> second(new faststring);
Random r(req.random_seed());
first->resize(req.size1());
@@ -146,16 +151,42 @@ class GenericCalculatorService : public ServiceIf {
SendTwoStringsResponsePB resp;
int idx1, idx2;
- CHECK_OK(incoming->AddRpcSidecar(
- make_gscoped_ptr(new RpcSidecar(std::move(first))), &idx1));
- CHECK_OK(incoming->AddRpcSidecar(
- make_gscoped_ptr(new RpcSidecar(std::move(second))), &idx2));
+ CHECK_OK(incoming->AddOutboundSidecar(
+ RpcSidecar::FromFaststring(std::move(first)), &idx1));
+ CHECK_OK(incoming->AddOutboundSidecar(
+ RpcSidecar::FromFaststring(std::move(second)), &idx2));
resp.set_sidecar1(idx1);
resp.set_sidecar2(idx2);
incoming->RespondSuccess(resp);
}
+ void DoPushTwoStrings(InboundCall* incoming) {
+ Slice param(incoming->serialized_request());
+ PushTwoStringsRequestPB req;
+ if (!req.ParseFromArray(param.data(), param.size())) {
+ LOG(FATAL) << "couldn't parse: " << param.ToDebugString();
+ }
+
+ Slice sidecar1;
+ CHECK_OK(incoming->GetInboundSidecar(req.sidecar1_idx(), &sidecar1));
+
+ Slice sidecar2;
+ CHECK_OK(incoming->GetInboundSidecar(req.sidecar2_idx(), &sidecar2));
+
+ // Check that reading non-existent sidecars doesn't work.
+ Slice tmp;
+ CHECK(!incoming->GetInboundSidecar(req.sidecar2_idx() + 2, &tmp).ok());
+
+ PushTwoStringsResponsePB resp;
+ resp.set_size1(sidecar1.size());
+ resp.set_data1(reinterpret_cast<const char*>(sidecar1.data()), sidecar1.size());
+ resp.set_size2(sidecar2.size());
+ resp.set_data2(reinterpret_cast<const char*>(sidecar2.data()), sidecar2.size());
+
+ incoming->RespondSuccess(resp);
+ }
+
void DoSleep(InboundCall *incoming) {
Slice param(incoming->serialized_request());
SleepRequestPB req;
@@ -326,6 +357,7 @@ class CalculatorService : public CalculatorServiceIf {
const char *GenericCalculatorService::kFullServiceName = "kudu.rpc.GenericCalculatorService";
const char *GenericCalculatorService::kAddMethodName = "Add";
const char *GenericCalculatorService::kSleepMethodName = "Sleep";
+const char *GenericCalculatorService::kPushTwoStringsMethodName = "PushTwoStrings";
const char *GenericCalculatorService::kSendTwoStringsMethodName = "SendTwoStrings";
const char *GenericCalculatorService::kAddExactlyOnce = "AddExactlyOnce";
@@ -425,6 +457,30 @@ class RpcTestBase : public KuduTest {
CHECK_EQ(0, second.compare(Slice(expected)));
}
+ void DoTestOutgoingSidecar(const Proxy &p, int size1, int size2) {
+ PushTwoStringsRequestPB request;
+ RpcController controller;
+
+ int idx1;
+ string s1(size1, 'a');
+ CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s1)), &idx1));
+
+ int idx2;
+ string s2(size2, 'b');
+ CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s2)), &idx2));
+
+ request.set_sidecar1_idx(idx1);
+ request.set_sidecar2_idx(idx2);
+
+ PushTwoStringsResponsePB resp;
+ CHECK_OK(p.SyncRequest(GenericCalculatorService::kPushTwoStringsMethodName,
+ request, &resp, &controller));
+ CHECK_EQ(size1, resp.size1());
+ CHECK_EQ(resp.data1(), s1);
+ CHECK_EQ(size2, resp.size2());
+ CHECK_EQ(resp.data2(), s2);
+ }
+
void DoTestExpectTimeout(const Proxy &p, const MonoDelta &timeout) {
SleepRequestPB req;
SleepResponsePB resp;
@@ -476,7 +532,7 @@ class RpcTestBase : public KuduTest {
static Slice GetSidecarPointer(const RpcController& controller, int idx,
int expected_size) {
Slice sidecar;
- CHECK_OK(controller.GetSidecar(idx, &sidecar));
+ CHECK_OK(controller.GetInboundSidecar(idx, &sidecar));
CHECK_EQ(expected_size, sidecar.size());
return Slice(sidecar.data(), expected_size);
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rpc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index e18d07c..d707a50 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -294,6 +294,51 @@ TEST_P(TestRpc, TestRpcSidecar) {
// Test some larger sidecars to verify that we properly handle the case where
// we can't write the whole response to the socket in a single call.
DoTestSidecar(p, 3000 * 1024, 2000 * 1024);
+
+ DoTestOutgoingSidecar(p, 0, 0);
+ DoTestOutgoingSidecar(p, 123, 456);
+ DoTestOutgoingSidecar(p, 3000 * 1024, 2000 * 1024);
+}
+
+TEST_P(TestRpc, TestRpcSidecarLimits) {
+ {
+ // Test that the limit on the number of sidecars is respected.
+ RpcController controller;
+ string s = "foo";
+ int idx;
+ for (int i = 0; i < TransferLimits::kMaxSidecars; ++i) {
+ CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s)), &idx));
+ }
+
+ CHECK(!controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s)), &idx).ok());
+ }
+
+ {
+ // Test that the payload may not exceed --rpc_max_message_size.
+ // Set up server.
+ Sockaddr server_addr;
+ bool enable_ssl = GetParam();
+ StartTestServer(&server_addr, enable_ssl);
+
+ // Set up client.
+ shared_ptr<Messenger> client_messenger(CreateMessenger("Client", 1, GetParam()));
+ Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());
+
+ RpcController controller;
+ string s(FLAGS_rpc_max_message_size + 1, 'a');
+ int idx;
+ CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s)), &idx));
+
+ PushTwoStringsRequestPB request;
+ request.set_sidecar1_idx(idx);
+ request.set_sidecar2_idx(idx);
+ PushTwoStringsResponsePB resp;
+ Status status = p.SyncRequest(GenericCalculatorService::kPushTwoStringsMethodName,
+ request, &resp, &controller);
+ ASSERT_TRUE(status.IsNetworkError()) << "Unexpected error: " << status.ToString();
+ // Remote responds to extra-large payloads by closing the connection.
+ ASSERT_STR_CONTAINS(status.ToString(), "Connection reset by peer");
+ }
}
// Test that timeouts are properly handled.
http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rpc_context.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_context.cc b/src/kudu/rpc/rpc_context.cc
index a0e634c..e93e093 100644
--- a/src/kudu/rpc/rpc_context.cc
+++ b/src/kudu/rpc/rpc_context.cc
@@ -17,6 +17,7 @@
#include "kudu/rpc/rpc_context.h"
+#include <memory>
#include <ostream>
#include <sstream>
@@ -32,6 +33,7 @@
#include "kudu/util/trace.h"
using google::protobuf::Message;
+using std::unique_ptr;
namespace kudu {
namespace rpc {
@@ -141,8 +143,12 @@ const rpc::RequestIdPB* RpcContext::request_id() const {
return call_->header().has_request_id() ? &call_->header().request_id() : nullptr;
}
-Status RpcContext::AddRpcSidecar(gscoped_ptr<RpcSidecar> car, int* idx) {
- return call_->AddRpcSidecar(std::move(car), idx);
+Status RpcContext::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) {
+ return call_->AddOutboundSidecar(std::move(car), idx);
+}
+
+Status RpcContext::GetInboundSidecar(int idx, Slice* slice) {
+ return call_->GetInboundSidecar(idx, slice);
}
const RemoteUser& RpcContext::remote_user() const {
http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rpc_context.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_context.h b/src/kudu/rpc/rpc_context.h
index b95a9ce..12e8907 100644
--- a/src/kudu/rpc/rpc_context.h
+++ b/src/kudu/rpc/rpc_context.h
@@ -153,7 +153,11 @@ class RpcContext {
// Upon success, writes the index of the sidecar (necessary to be retrieved
// later) to 'idx'. Call may fail if all sidecars have already been used
// by the RPC response.
- Status AddRpcSidecar(gscoped_ptr<RpcSidecar> car, int* idx);
+ Status AddOutboundSidecar(std::unique_ptr<RpcSidecar> car, int* idx);
+
+ // Fills 'slice' with a sidecar sent by the client. Returns an error if 'idx' is out
+ // of bounds.
+ Status GetInboundSidecar(int idx, Slice* slice);
// Return the identity of remote user who made this call.
const RemoteUser& remote_user() const;
http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rpc_controller.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_controller.cc b/src/kudu/rpc/rpc_controller.cc
index adaf5ce..5e5cbc3 100644
--- a/src/kudu/rpc/rpc_controller.cc
+++ b/src/kudu/rpc/rpc_controller.cc
@@ -19,11 +19,14 @@
#include <algorithm>
#include <glog/logging.h>
+#include <memory>
#include <mutex>
#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/rpc/outbound_call.h"
+using std::unique_ptr;
+
namespace kudu { namespace rpc {
RpcController::RpcController() {
@@ -43,6 +46,7 @@ void RpcController::Swap(RpcController* other) {
CHECK(other->finished());
}
+ std::swap(outbound_sidecars_, other->outbound_sidecars_);
std::swap(timeout_, other->timeout_);
std::swap(call_, other->call_);
}
@@ -77,7 +81,7 @@ const ErrorStatusPB* RpcController::error_response() const {
return nullptr;
}
-Status RpcController::GetSidecar(int idx, Slice* sidecar) const {
+Status RpcController::GetInboundSidecar(int idx, Slice* sidecar) const {
return call_->call_response_->GetSidecar(idx, sidecar);
}
@@ -114,5 +118,19 @@ MonoDelta RpcController::timeout() const {
return timeout_;
}
+Status RpcController::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) {
+ if (outbound_sidecars_.size() >= TransferLimits::kMaxSidecars) {
+ return Status::RuntimeError("All available sidecars already used");
+ }
+ outbound_sidecars_.emplace_back(std::move(car));
+ *idx = outbound_sidecars_.size() - 1;
+ return Status::OK();
+}
+
+void RpcController::SetRequestParam(const google::protobuf::Message& req) {
+ DCHECK(call_ != nullptr);
+ call_->SetRequestPayload(req, std::move(outbound_sidecars_));
+}
+
} // namespace rpc
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rpc_controller.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_controller.h b/src/kudu/rpc/rpc_controller.h
index cce1ff2..6d521d0 100644
--- a/src/kudu/rpc/rpc_controller.h
+++ b/src/kudu/rpc/rpc_controller.h
@@ -21,11 +21,20 @@
#include <glog/logging.h>
#include <memory>
#include <unordered_set>
+#include <vector>
#include "kudu/gutil/macros.h"
+#include "kudu/gutil/stl_util.h"
#include "kudu/util/locks.h"
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
+
+namespace google {
+namespace protobuf {
+class Message;
+} // namespace protobuf
+} // namespace google
+
namespace kudu {
namespace rpc {
@@ -33,6 +42,7 @@ namespace rpc {
class ErrorStatusPB;
class OutboundCall;
class RequestIdPB;
+class RpcSidecar;
// Controller for managing properties of a single RPC call, on the client side.
//
@@ -177,12 +187,21 @@ class RpcController {
// been Reset().
//
// May fail if index is invalid.
- Status GetSidecar(int idx, Slice* sidecar) const;
+ Status GetInboundSidecar(int idx, Slice* sidecar) const;
+
+ // Adds a sidecar to the outbound request. The index of the sidecar is written to
+ // 'idx'. Returns an error if TransferLimits::kMaxSidecars have already been added
+ // to this request.
+ Status AddOutboundSidecar(std::unique_ptr<RpcSidecar> car, int* idx);
private:
friend class OutboundCall;
friend class Proxy;
+ // Set the outbound call_'s request parameter, and transfer ownership of
+ // outbound_sidecars_ to call_ in preparation for serialization.
+ void SetRequestParam(const google::protobuf::Message& req);
+
MonoDelta timeout_;
std::unordered_set<uint32_t> required_server_features_;
@@ -195,6 +214,8 @@ class RpcController {
// Once the call is sent, it is tracked here.
std::shared_ptr<OutboundCall> call_;
+ std::vector<std::unique_ptr<RpcSidecar>> outbound_sidecars_;
+
DISALLOW_COPY_AND_ASSIGN(RpcController);
};
http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rpc_header.proto
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_header.proto b/src/kudu/rpc/rpc_header.proto
index 6721c44..a6c9728 100644
--- a/src/kudu/rpc/rpc_header.proto
+++ b/src/kudu/rpc/rpc_header.proto
@@ -257,6 +257,11 @@ message RequestHeader {
// Optional for requests that are naturally idempotent or to maintain compatibility with
// older clients for requests that are not.
optional RequestIdPB request_id = 15;
+
+ // Byte offsets for side cars in the main body of the request message.
+ // These offsets are counted AFTER the message header, i.e., offset 0
+ // is the first byte after the bytes for this protobuf.
+ repeated uint32 sidecar_offsets = 16;
}
message ResponseHeader {
http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rpc_sidecar.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_sidecar.cc b/src/kudu/rpc/rpc_sidecar.cc
new file mode 100644
index 0000000..580c6eb
--- /dev/null
+++ b/src/kudu/rpc/rpc_sidecar.cc
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/rpc_sidecar.h"
+
+#include "kudu/util/status.h"
+#include "kudu/rpc/transfer.h"
+#include "kudu/gutil/strings/substitute.h"
+
+using std::unique_ptr;
+
+namespace kudu {
+namespace rpc {
+
+// A non-owning sidecar: it holds a Slice and hands the very same Slice back from
+// AsSlice(). The bytes the slice refers to belong to the caller, who must keep them
+// alive for at least as long as this sidecar exists.
+class SliceSidecar : public RpcSidecar {
+ public:
+  explicit SliceSidecar(Slice slice)
+      : wrapped_(slice) {
+  }
+
+  // Returns the wrapped slice unchanged.
+  Slice AsSlice() const override {
+    return wrapped_;
+  }
+
+ private:
+  // View over caller-owned data; never reassigned after construction.
+  const Slice wrapped_;
+};
+
+// An owning sidecar: it takes ownership of a faststring and exposes that string's
+// contents through AsSlice(). The buffer is released when the sidecar is destroyed.
+class FaststringSidecar : public RpcSidecar {
+ public:
+  explicit FaststringSidecar(unique_ptr<faststring> data)
+      : owned_(std::move(data)) {
+  }
+
+  // Returns a slice over the owned faststring.
+  Slice AsSlice() const override {
+    return *owned_;
+  }
+
+ private:
+  // The buffer backing this sidecar; the pointer is never reseated.
+  const unique_ptr<faststring> owned_;
+};
+
+// Builds a sidecar that owns 'data' (a FaststringSidecar) and returns it through the
+// RpcSidecar interface.
+unique_ptr<RpcSidecar> RpcSidecar::FromFaststring(unique_ptr<faststring> data) {
+  unique_ptr<RpcSidecar> sidecar(new FaststringSidecar(std::move(data)));
+  return sidecar;
+}
+
+// Builds a sidecar that merely refers to 'slice' (a SliceSidecar) and returns it
+// through the RpcSidecar interface. The data behind 'slice' is not copied.
+unique_ptr<RpcSidecar> RpcSidecar::FromSlice(Slice slice) {
+  unique_ptr<RpcSidecar> sidecar(new SliceSidecar(slice));
+  return sidecar;
+}
+
+
+// Splits 'buffer' into sidecar slices at the byte positions listed in 'offsets' and
+// stores them in 'sidecars', filling from index 0.
+//
+// Sidecar i covers [offsets[i], offsets[i + 1]); the final sidecar extends from its
+// offset to the end of 'buffer'. The slices written point into 'buffer'; nothing is
+// copied.
+//
+// Returns OK and leaves 'sidecars' untouched when 'offsets' is empty. Returns
+// Corruption when more than TransferLimits::kMaxSidecars offsets are supplied, when an
+// offset lies past the end of 'buffer', or when an offset is smaller than the one
+// before it. On error, 'sidecars' may have been partially filled.
+Status RpcSidecar::ParseSidecars(
+    const ::google::protobuf::RepeatedField<::google::protobuf::uint32>& offsets,
+    Slice buffer, Slice* sidecars) {
+  if (offsets.size() == 0) return Status::OK();
+
+  int last = offsets.size() - 1;
+  if (last >= TransferLimits::kMaxSidecars) {
+    // Report how many offsets were received, not the index of the last one.
+    return Status::Corruption(strings::Substitute(
+        "Received $0 additional payload slices, expected at most $1",
+        offsets.size(), TransferLimits::kMaxSidecars));
+  }
+
+  // Every sidecar except the final one is delimited by the offset that follows it.
+  for (int i = 0; i < last; ++i) {
+    // Offsets are uint32 on the wire; widen them so the length arithmetic below is
+    // done in a signed 64-bit type.
+    int64_t cur_offset = offsets.Get(i);
+    int64_t next_offset = offsets.Get(i + 1);
+    if (next_offset > buffer.size()) {
+      return Status::Corruption(strings::Substitute(
+          "Invalid sidecar offsets; sidecar $0 apparently starts at $1,"
+          " has length $2, but the entire message has length $3",
+          i, cur_offset, (next_offset - cur_offset), buffer.size()));
+    }
+    if (next_offset < cur_offset) {
+      return Status::Corruption(strings::Substitute(
+          "Invalid sidecar offsets; sidecar $0 apparently starts at $1,"
+          " but ends before that at offset $2.", i, cur_offset, next_offset));
+    }
+
+    sidecars[i] = Slice(buffer.data() + cur_offset, next_offset - cur_offset);
+  }
+
+  // The final sidecar has no following offset: it runs to the end of the buffer.
+  int64_t cur_offset = offsets.Get(last);
+  if (cur_offset > buffer.size()) {
+    return Status::Corruption(strings::Substitute("Invalid sidecar offsets: sidecar $0 "
+        "starts at offset $1 after message ends (message length $2).", last,
+        cur_offset, buffer.size()));
+  }
+  sidecars[last] = Slice(buffer.data() + cur_offset, buffer.size() - cur_offset);
+
+  return Status::OK();
+}
+
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rpc_sidecar.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rpc_sidecar.h b/src/kudu/rpc/rpc_sidecar.h
index da7e00f..00d6e4b 100644
--- a/src/kudu/rpc/rpc_sidecar.h
+++ b/src/kudu/rpc/rpc_sidecar.h
@@ -17,50 +17,48 @@
#ifndef KUDU_RPC_RPC_SIDECAR_H
#define KUDU_RPC_RPC_SIDECAR_H
-#include "kudu/gutil/gscoped_ptr.h"
+#include <google/protobuf/repeated_field.h>
+#include <memory>
+
#include "kudu/util/faststring.h"
#include "kudu/util/slice.h"
namespace kudu {
namespace rpc {
-// An RpcSidecar is a mechanism which allows replies to RPCs
-// to reference blocks of data without extra copies. In other words,
-// whenever a protobuf would have a large field where additional copies
-// become expensive, one may opt instead to use an RpcSidecar.
-//
-// The RpcSidecar saves on an additional copy to/from the protobuf on both the
-// server and client side. The InboundCall class accepts RpcSidecars, ignorant
-// of the form that the sidecar's data is kept in, requiring only that it can
-// be represented as a Slice. Data is then immediately copied from the
-// Slice returned from AsSlice() to the socket that is responding to the original
-// RPC.
+// An RpcSidecar is a mechanism which allows RPC requests and replies to reference blocks
+// of data without extra copies. In other words, whenever a protobuf would have a large
+// field where additional copies become expensive, one may opt instead to use an RpcSidecar.
//
-// In order to distinguish between separate sidecars, whenever a sidecar is
-// added to the RPC response on the server side, an index for that sidecar is
-// returned. This index must then in some way (i.e., via protobuf) be
-// communicated to the client side.
+// The RpcSidecar saves on an additional copy to/from the protobuf on both the server and
+// client side. Both Inbound- and OutboundCall classes accept sidecars to be sent to the
+// client and server respectively. They are ignorant of the sidecar's format, requiring
+// only that it can be represented as a Slice. Data is copied from the Slice returned from
+// AsSlice() to the socket that is responding to the original RPC. The slice should remain
+// valid for as long as the call it is attached to takes to complete.
//
-// After receiving the RPC response on the client side, OutboundCall decodes
-// the original message along with the separate sidecars by using a list
-// of sidecar byte offsets that was sent in the message header.
+// In order to distinguish between separate sidecars, whenever a sidecar is added to an
+// RPC request or response, an index for that sidecar is returned. This index must then
+// in some way (e.g., via a protobuf field) be communicated to the recipient.
//
-// After reconstructing the array of sidecars, the OutboundCall (through
-// RpcController's interface) is able to offer retrieval of the sidecar data
-// through the same indices that were returned by InboundCall (or indirectly
-// through the RpcContext wrapper) on the client side.
+// After reconstructing the array of sidecars, servers and clients may retrieve the
+// sidecar data through the RpcContext or RpcController interfaces respectively.
class RpcSidecar {
public:
- // Generates a sidecar with the parameter faststring as its data.
- explicit RpcSidecar(gscoped_ptr<faststring> data) : data_(std::move(data)) {}
+ // Creates a sidecar that takes ownership of 'data' and exposes its contents via
+ // AsSlice().
+ static std::unique_ptr<RpcSidecar> FromFaststring(std::unique_ptr<faststring> data);
+ // Creates a sidecar that refers to 'slice' without owning the underlying data. The
+ // caller must keep that data alive at least as long as the returned sidecar.
+ static std::unique_ptr<RpcSidecar> FromSlice(Slice slice);
- // Returns a Slice representation of the sidecar's data.
- Slice AsSlice() const { return *data_; }
+ // Utility method to parse a series of sidecar slices into 'sidecars' from 'buffer' and
+ // a set of offsets. 'sidecars' must have length >= TransferLimits::kMaxSidecars, and
+ // will be filled from index 0.
+ // Returns a Corruption status if there are more than TransferLimits::kMaxSidecars
+ // offsets, or if the offsets are inconsistent with each other or with the size of
+ // 'buffer'.
+ // TODO(henryr): Consider a vector instead here if there's no perf. impact.
+ static Status ParseSidecars(
+ const ::google::protobuf::RepeatedField<::google::protobuf::uint32>& offsets,
+ Slice buffer, Slice* sidecars);
- private:
- const gscoped_ptr<faststring> data_;
-
- DISALLOW_COPY_AND_ASSIGN(RpcSidecar);
+ // Returns a Slice representation of the sidecar's data.
+ virtual Slice AsSlice() const = 0;
+ // Virtual so that concrete sidecars can be destroyed through an RpcSidecar pointer.
+ virtual ~RpcSidecar() { }
};
} // namespace rpc
http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/rtest.proto
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/rtest.proto b/src/kudu/rpc/rtest.proto
index df307a6..c52b535 100644
--- a/src/kudu/rpc/rtest.proto
+++ b/src/kudu/rpc/rtest.proto
@@ -65,6 +65,19 @@ message SendTwoStringsResponsePB {
required uint32 sidecar2 = 2;
}
+// Push two strings to the server as part of the request, in sidecars.
+// NOTE(review): the two fields presumably carry the indices returned when the
+// sidecars were attached to the outbound request -- confirm against the test caller.
+message PushTwoStringsRequestPB {
+ // Index of the request sidecar holding the first string.
+ required uint32 sidecar1_idx = 1;
+ // Index of the request sidecar holding the second string.
+ required uint32 sidecar2_idx = 2;
+}
+
+// Response to a PushTwoStringsRequestPB.
+// NOTE(review): presumably the server reports back the size and contents of each
+// sidecar it received so the test can verify them -- confirm against the handler.
+message PushTwoStringsResponsePB {
+ // Size and data reported for the first string.
+ required uint32 size1 = 1;
+ required string data1 = 2;
+ // Size and data reported for the second string.
+ required uint32 size2 = 3;
+ required string data2 = 4;
+}
+
message EchoRequestPB {
required string data = 1;
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/rpc/transfer.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/transfer.h b/src/kudu/rpc/transfer.h
index 7fb6b10..671347a 100644
--- a/src/kudu/rpc/transfer.h
+++ b/src/kudu/rpc/transfer.h
@@ -46,6 +46,16 @@ namespace rpc {
class Messenger;
struct TransferCallbacks;
+// Holder for compile-time limits on the number of slices in a single transfer.
+// NOTE(review): DISALLOW_IMPLICIT_CONSTRUCTORS presumably makes this class
+// non-instantiable, so it acts purely as a scope for the constants -- confirm
+// against kudu/gutil/macros.h.
+class TransferLimits {
+ public:
+ enum {
+ // Maximum number of sidecars that may accompany a single call.
+ kMaxSidecars = 10,
+ // Maximum number of payload slices in one transfer: every sidecar plus the
+ // header and the main message.
+ kMaxPayloadSlices = kMaxSidecars + 2 // (header + msg)
+ };
+
+ DISALLOW_IMPLICIT_CONSTRUCTORS(TransferLimits);
+};
+
// This class is used internally by the RPC layer to represent an inbound
// transfer in progress.
//
@@ -94,8 +104,6 @@ class InboundTransfer {
// Upon completion of the transfer, a callback is triggered.
class OutboundTransfer : public boost::intrusive::list_base_hook<> {
public:
- enum { kMaxPayloadSlices = 10 };
-
// Factory methods for creating transfers associated with call requests
// or responses. The 'payload' slices will be concatenated and
// written to the socket. When the transfer completes or errors, the
@@ -159,7 +167,7 @@ class OutboundTransfer : public boost::intrusive::list_base_hook<> {
// Slices to send. Uses an array here instead of a vector to avoid an expensive
// vector construction (improved performance a couple percent).
- Slice payload_slices_[kMaxPayloadSlices];
+ Slice payload_slices_[TransferLimits::kMaxPayloadSlices];
size_t n_payload_slices_;
// The current slice that is being sent.
http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/tserver/tablet_server-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server-test-base.h b/src/kudu/tserver/tablet_server-test-base.h
index 22edce7..6ebf0e3 100644
--- a/src/kudu/tserver/tablet_server-test-base.h
+++ b/src/kudu/tserver/tablet_server-test-base.h
@@ -312,10 +312,10 @@ class TabletServerTestBase : public KuduTest {
vector<string>* results) {
RowwiseRowBlockPB* rrpb = resp.mutable_data();
Slice direct, indirect; // sidecar data buffers
- ASSERT_OK(rpc.GetSidecar(rrpb->rows_sidecar(), &direct));
+ ASSERT_OK(rpc.GetInboundSidecar(rrpb->rows_sidecar(), &direct));
if (rrpb->has_indirect_data_sidecar()) {
- ASSERT_OK(rpc.GetSidecar(rrpb->indirect_data_sidecar(),
- &indirect));
+ ASSERT_OK(rpc.GetInboundSidecar(rrpb->indirect_data_sidecar(),
+ &indirect));
}
vector<const uint8_t*> rows;
ASSERT_OK(ExtractRowsFromRowBlockPB(projection, *rrpb,
http://git-wip-us.apache.org/repos/asf/kudu/blob/72895966/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 69ee7ab..c9e7d79 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -118,6 +118,7 @@ using kudu::consensus::StartTabletCopyResponsePB;
using kudu::consensus::VoteRequestPB;
using kudu::consensus::VoteResponsePB;
using kudu::rpc::RpcContext;
+using kudu::rpc::RpcSidecar;
using kudu::server::ServerBase;
using kudu::tablet::AlterSchemaTransactionState;
using kudu::tablet::Tablet;
@@ -1073,8 +1074,8 @@ void TabletServiceImpl::Scan(const ScanRequestPB* req,
}
size_t batch_size_bytes = GetMaxBatchSizeBytesHint(req);
- gscoped_ptr<faststring> rows_data(new faststring(batch_size_bytes * 11 / 10));
- gscoped_ptr<faststring> indirect_data(new faststring(batch_size_bytes * 11 / 10));
+ unique_ptr<faststring> rows_data(new faststring(batch_size_bytes * 11 / 10));
+ unique_ptr<faststring> indirect_data(new faststring(batch_size_bytes * 11 / 10));
RowwiseRowBlockPB data;
ScanResultCopier collector(&data, rows_data.get(), indirect_data.get());
@@ -1123,15 +1124,15 @@ void TabletServiceImpl::Scan(const ScanRequestPB* req,
// Add sidecar data to context and record the returned indices.
int rows_idx;
- CHECK_OK(context->AddRpcSidecar(make_gscoped_ptr(
- new rpc::RpcSidecar(std::move(rows_data))), &rows_idx));
+ CHECK_OK(context->AddOutboundSidecar(RpcSidecar::FromFaststring((std::move(rows_data))),
+ &rows_idx));
resp->mutable_data()->set_rows_sidecar(rows_idx);
// Add indirect data as a sidecar, if applicable.
if (indirect_data->size() > 0) {
int indirect_idx;
- CHECK_OK(context->AddRpcSidecar(make_gscoped_ptr(
- new rpc::RpcSidecar(std::move(indirect_data))), &indirect_idx));
+ CHECK_OK(context->AddOutboundSidecar(RpcSidecar::FromFaststring(
+ std::move(indirect_data)), &indirect_idx));
resp->mutable_data()->set_indirect_data_sidecar(indirect_idx);
}