You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/17 03:14:22 UTC
[06/15] incubator-impala git commit: IMPALA-4669: [KRPC] Import RPC
library from kudu@314c9d8
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/service_if.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/service_if.cc b/be/src/kudu/rpc/service_if.cc
new file mode 100644
index 0000000..39e9ab5
--- /dev/null
+++ b/be/src/kudu/rpc/service_if.cc
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/service_if.h"
+
+#include <memory>
+#include <string>
+#include <google/protobuf/descriptor.pb.h>
+
+#include "kudu/gutil/strings/substitute.h"
+
+#include "kudu/rpc/connection.h"
+#include "kudu/rpc/inbound_call.h"
+#include "kudu/rpc/rpc_context.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/util/flag_tags.h"
+
+// TODO: remove this flag once exactly-once semantics have been fully cluster-tested.
+// Although the feature is on by default, the flag is kept so that, if any issues
+// are discovered in 0.10.0, there is an easy workaround to disable the feature.
+DEFINE_bool(enable_exactly_once, true, "Whether to enable exactly once semantics.");
+TAG_FLAG(enable_exactly_once, hidden);
+
+using google::protobuf::Message;
+using std::string;
+using std::unique_ptr;
+using strings::Substitute;
+
+namespace kudu {
+namespace rpc {
+
+ServiceIf::~ServiceIf() {  // Intentionally empty: nothing is released here.
+}
+
+void ServiceIf::Shutdown() {  // Default implementation is a no-op.
+}
+
+bool ServiceIf::SupportsFeature(uint32_t feature) const {
+ return false;  // Default: every application feature flag is reported as unsupported.
+}
+
+bool ServiceIf::ParseParam(InboundCall *call, google::protobuf::Message *message) {
+ Slice param(call->serialized_request());  // Serialized request bytes to parse into 'message'.
+ if (PREDICT_FALSE(!message->ParseFromArray(param.data(), param.size()))) {
+ string err = Substitute("invalid parameter for call $0: missing fields: $1",
+ call->remote_method().ToString(),
+ message->InitializationErrorString().c_str());
+ LOG(WARNING) << err;
+ call->RespondFailure(ErrorStatusPB::ERROR_INVALID_REQUEST,
+ Status::InvalidArgument(err));
+ return false;  // 'call' has already been failed with ERROR_INVALID_REQUEST above.
+ }
+ return true;  // ParseFromArray() succeeded on 'message'.
+}
+
+// Responds with ERROR_NO_SUCH_METHOD. Address lookups are best-effort: a CHECK_OK
+// here would let a client that already disconnected crash the whole server.
+void ServiceIf::RespondBadMethod(InboundCall *call) {
+  Sockaddr local_addr;  // Only used for the message; "<unknown>" if the lookup fails.
+  const Status addr_status = call->connection()->socket()->GetSocketAddress(&local_addr);
+  string err = Substitute("Call on service $0 received at $1 from $2 with an "
+                          "invalid method name: $3",
+                          call->remote_method().service_name(),
+                          addr_status.ok() ? local_addr.ToString() : "<unknown>",
+                          call->remote_address().ToString(),
+                          call->remote_method().method_name());
+  LOG(WARNING) << err;
+  call->RespondFailure(ErrorStatusPB::ERROR_NO_SUCH_METHOD,
+                       Status::InvalidArgument(err));
+}
+
+GeneratedServiceIf::~GeneratedServiceIf() {  // Intentionally empty.
+}
+
+
+void GeneratedServiceIf::Handle(InboundCall *call) {  // Dispatches 'call' to its method.
+  const RpcMethodInfo* method_info = call->method_info();
+  if (!method_info) {
+    RespondBadMethod(call);
+    return;
+  }
+  unique_ptr<Message> req(method_info->req_prototype->New());
+  if (PREDICT_FALSE(!ParseParam(call, req.get()))) {
+    return;  // ParseParam() has already failed the call.
+  }
+  Message* resp = method_info->resp_prototype->New();
+
+  bool track_result = call->header().has_request_id()
+                      && method_info->track_result
+                      && FLAGS_enable_exactly_once;
+  RpcContext* ctx = new RpcContext(call,  // Not freed in this function; see notes below.
+                                   req.release(),
+                                   resp,
+                                   track_result ? result_tracker_ : nullptr);
+  if (!method_info->authz_method(ctx->request_pb(), resp, ctx)) {
+    // The authz_method itself should have responded to the RPC.
+    return;
+  }
+
+  if (track_result) {
+    // The header's request id is passed straight through; no local copy is needed.
+    ResultTracker::RpcState state = ctx->result_tracker()->TrackRpc(
+        call->header().request_id(),
+        resp,
+        ctx);
+    switch (state) {
+      case ResultTracker::NEW:
+        // Fall out of the 'if' statement to the normal path.
+        break;
+      case ResultTracker::COMPLETED:
+      case ResultTracker::IN_PROGRESS:
+      case ResultTracker::STALE:
+        // ResultTracker has already responded to the RPC and deleted
+        // 'ctx'.
+        return;
+      default:
+        LOG(FATAL) << "Unknown state: " << state;
+    }
+  }
+  method_info->func(ctx->request_pb(), resp, ctx);
+}
+
+
+RpcMethodInfo* GeneratedServiceIf::LookupMethod(const RemoteMethod& method) {
+  // Debug builds verify that the call is addressed to this service.
+  DCHECK_EQ(method.service_name(), service_name());
+  const auto entry = methods_by_name_.find(method.method_name());
+  const bool unknown = PREDICT_FALSE(entry == methods_by_name_.end());
+  // An unregistered method name yields nullptr rather than an error.
+  return unknown ? nullptr : entry->second.get();
+}
+
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/service_if.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/service_if.h b/be/src/kudu/rpc/service_if.h
new file mode 100644
index 0000000..a3722c6
--- /dev/null
+++ b/be/src/kudu/rpc/service_if.h
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_RPC_SERVICE_IF_H
+#define KUDU_RPC_SERVICE_IF_H
+
+#include <unordered_map>
+#include <string>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/rpc/result_tracker.h"
+
+namespace google {
+namespace protobuf {
+class Message;
+}
+}
+
+namespace kudu {
+
+class Histogram;
+
+namespace rpc {
+
+class InboundCall;
+class RemoteMethod;
+class RpcContext;
+class ServiceIf;
+
+// Generated services define an instance of this class for each
+// method that they implement. The generic server code implemented
+// by GeneratedServiceIf looks up the RpcMethodInfo in order to handle
+// each RPC.
+struct RpcMethodInfo : public RefCountedThreadSafe<RpcMethodInfo> {
+  // Prototype protobufs for requests and responses.
+  // These are empty protobufs which are cloned in order to provide an
+  // instance for each request.
+  std::unique_ptr<google::protobuf::Message> req_prototype;
+  std::unique_ptr<google::protobuf::Message> resp_prototype;
+
+  scoped_refptr<Histogram> handler_latency_histogram;  // Not used by the generic handler.
+
+  // Whether we should track this method's result, using ResultTracker.
+  bool track_result = false;  // Defaulted so a registrant that omits it reads as "off".
+
+  // The authorization function for this RPC. If this function
+  // returns false, the RPC has already been handled (i.e. rejected)
+  // by the authorization function.
+  std::function<bool(const google::protobuf::Message* req,
+                     google::protobuf::Message* resp,
+                     RpcContext* ctx)> authz_method;
+
+  // The actual function to be called.
+  std::function<void(const google::protobuf::Message* req,
+                     google::protobuf::Message* resp,
+                     RpcContext* ctx)> func;
+};
+
+// Handles incoming messages that initiate an RPC.
+class ServiceIf {
+ public:
+ virtual ~ServiceIf();
+ virtual void Handle(InboundCall* incoming) = 0;  // Processes one inbound call.
+ virtual void Shutdown();  // The base implementation does nothing.
+ virtual std::string service_name() const = 0;
+
+ // The service should return true if it supports the provided application-specific
+ // feature flag. The base implementation reports every feature as unsupported.
+ virtual bool SupportsFeature(uint32_t feature) const;
+
+ // Look up the method being requested by the remote call.
+ //
+ // If this returns nullptr, then certain functionality like
+ // metrics collection will not be performed for this call.
+ virtual RpcMethodInfo* LookupMethod(const RemoteMethod& method) {
+ return nullptr;
+ }
+
+ // Default authorization method, which just allows all RPCs.
+ //
+ // See docs/design-docs/rpc.md for details on how to add custom
+ // authorization checks to a service.
+ bool AuthorizeAllowAll(const google::protobuf::Message* /*req*/,
+ google::protobuf::Message* /*resp*/,
+ RpcContext* /*ctx*/) {
+ return true;
+ }
+
+ protected:
+ bool ParseParam(InboundCall* call, google::protobuf::Message* message);  // false: call failed.
+ void RespondBadMethod(InboundCall* call);  // Fails 'call' with ERROR_NO_SUCH_METHOD.
+};
+
+
+// Base class for code-generated service classes.
+class GeneratedServiceIf : public ServiceIf {
+ public:
+ virtual ~GeneratedServiceIf();
+
+ // Looks up the appropriate method in 'methods_by_name_' and executes
+ // it on the current thread.
+ //
+ // If no such method is found, responds with an error.
+ void Handle(InboundCall* incoming) override;
+
+ RpcMethodInfo* LookupMethod(const RemoteMethod& method) override;  // nullptr if unknown.
+
+ protected:
+ // For each method, stores the relevant information about how to handle the
+ // call. Methods are inserted by the constructor of the generated subclass.
+ // After construction, this map is accessed by multiple threads and therefore
+ // must not be modified.
+ std::unordered_map<std::string, scoped_refptr<RpcMethodInfo>> methods_by_name_;
+
+ // The result tracker for this service's methods. Handle() only consults it for
+ // calls whose results are tracked.
+ scoped_refptr<ResultTracker> result_tracker_;
+};
+
+} // namespace rpc
+} // namespace kudu
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/service_pool.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/service_pool.cc b/be/src/kudu/rpc/service_pool.cc
new file mode 100644
index 0000000..1a23ca9
--- /dev/null
+++ b/be/src/kudu/rpc/service_pool.cc
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/service_pool.h"
+
+#include <glog/logging.h>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/inbound_call.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/service_if.h"
+#include "kudu/rpc/service_queue.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread.h"
+#include "kudu/util/trace.h"
+
+using std::shared_ptr;
+using strings::Substitute;
+
+METRIC_DEFINE_histogram(server, rpc_incoming_queue_time,
+ "RPC Queue Time",
+ kudu::MetricUnit::kMicroseconds,
+ "Number of microseconds incoming RPC requests spend in the worker queue",
+ 60000000LU, 3);  // 60000000 us = 60 s; presumably max value, then precision digits -- confirm.
+
+METRIC_DEFINE_counter(server, rpcs_timed_out_in_queue,
+ "RPC Queue Timeouts",
+ kudu::MetricUnit::kRequests,
+ "Number of RPCs whose timeout elapsed while waiting "
+ "in the service queue, and thus were not processed.");
+
+METRIC_DEFINE_counter(server, rpcs_queue_overflow,
+ "RPC Queue Overflows",
+ kudu::MetricUnit::kRequests,
+ "Number of RPCs dropped because the service queue "
+ "was full.");
+
+namespace kudu {
+namespace rpc {
+
+ServicePool::ServicePool(gscoped_ptr<ServiceIf> service,
+ const scoped_refptr<MetricEntity>& entity,
+ size_t service_queue_length)
+ : service_(std::move(service)),
+ service_queue_(service_queue_length),  // Maximum number of calls the queue will hold.
+ incoming_queue_time_(METRIC_rpc_incoming_queue_time.Instantiate(entity)),
+ rpcs_timed_out_in_queue_(METRIC_rpcs_timed_out_in_queue.Instantiate(entity)),
+ rpcs_queue_overflow_(METRIC_rpcs_queue_overflow.Instantiate(entity)),
+ closing_(false) {  // No worker threads are started here; that happens in Init().
+}
+
+ServicePool::~ServicePool() {
+ Shutdown();  // After an earlier Shutdown(), 'closing_' makes the join/drain part a no-op.
+}
+
+Status ServicePool::Init(int num_threads) {
+ for (int i = 0; i < num_threads; i++) {  // Each worker thread runs RunThread().
+ scoped_refptr<kudu::Thread> new_thread;
+ CHECK_OK(kudu::Thread::Create("service pool", "rpc worker",
+ &ServicePool::RunThread, this, &new_thread));
+ threads_.push_back(new_thread);  // Kept so that Shutdown() can join the thread.
+ }
+ return Status::OK();  // Thread creation failure is fatal (CHECK_OK), so only OK is returned.
+}
+
+void ServicePool::Shutdown() {
+ service_queue_.Shutdown();
+ // Only the first caller gets past this guard to join the workers and drain the queue.
+ MutexLock lock(shutdown_lock_);
+ if (closing_) return;
+ closing_ = true;
+ // TODO: Use a proper thread pool implementation.
+ for (scoped_refptr<kudu::Thread>& thread : threads_) {
+ CHECK_OK(ThreadJoiner(thread.get()).Join());
+ }
+
+ // Now we must drain the service queue, failing every call that is still waiting in it.
+ Status status = Status::ServiceUnavailable("Service is shutting down");
+ std::unique_ptr<InboundCall> incoming;
+ while (service_queue_.BlockingGet(&incoming)) {  // Returns false once the queue is empty.
+ incoming.release()->RespondFailure(ErrorStatusPB::FATAL_SERVER_SHUTTING_DOWN, status);
+ }
+
+ service_->Shutdown();
+}
+
+void ServicePool::RejectTooBusy(InboundCall* c) {
+ string err_msg =
+ Substitute("$0 request on $1 from $2 dropped due to backpressure. "
+ "The service queue is full; it has $3 items.",
+ c->remote_method().method_name(),
+ service_->service_name(),
+ c->remote_address().ToString(),
+ service_queue_.max_size());
+ rpcs_queue_overflow_->Increment();  // Every rejected call is counted.
+ KLOG_EVERY_N_SECS(WARNING, 1) << err_msg;
+ c->RespondFailure(ErrorStatusPB::ERROR_SERVER_TOO_BUSY,
+ Status::ServiceUnavailable(err_msg));
+ DLOG(INFO) << err_msg << " Contents of service queue:\n"  // 'c' is not touched after responding.
+ << service_queue_.ToString();
+}
+
+RpcMethodInfo* ServicePool::LookupMethod(const RemoteMethod& method) {
+ return service_->LookupMethod(method);  // Delegates to the wrapped service.
+}
+
+Status ServicePool::QueueInboundCall(gscoped_ptr<InboundCall> call) {
+ InboundCall* c = call.release();  // From here on the call is handled through a raw pointer.
+ // Collect every feature the call requires that the service does not support.
+ vector<uint32_t> unsupported_features;
+ for (uint32_t feature : c->GetRequiredFeatures()) {
+ if (!service_->SupportsFeature(feature)) {
+ unsupported_features.push_back(feature);
+ }
+ }
+
+ if (!unsupported_features.empty()) {
+ c->RespondUnsupportedFeature(unsupported_features);
+ return Status::NotSupported("call requires unsupported application feature flags",
+ JoinMapped(unsupported_features,
+ [] (uint32_t flag) { return std::to_string(flag); },
+ ", "));
+ }
+
+ TRACE_TO(c->trace(), "Inserting onto call queue");
+
+ // Queue message on service queue; a full queue either rejects 'c' or evicts another call.
+ boost::optional<InboundCall*> evicted;
+ auto queue_status = service_queue_.Put(c, &evicted);
+ if (queue_status == QUEUE_FULL) {
+ RejectTooBusy(c);
+ return Status::OK();  // The rejection went to the client; it is not reported as an error here.
+ }
+
+ if (PREDICT_FALSE(evicted != boost::none)) {
+ RejectTooBusy(*evicted);  // The evicted call, not 'c', is the one rejected.
+ }
+
+ if (PREDICT_TRUE(queue_status == QUEUE_SUCCESS)) {
+ // NB: do not do anything with 'c' after it is successfully queued --
+ // a service thread may have already dequeued it, processed it, and
+ // responded by this point, in which case the pointer would be invalid.
+ return Status::OK();
+ }
+
+ Status status = Status::OK();
+ if (queue_status == QUEUE_SHUTDOWN) {
+ status = Status::ServiceUnavailable("Service is shutting down");
+ c->RespondFailure(ErrorStatusPB::FATAL_SERVER_SHUTTING_DOWN, status);
+ } else {
+ status = Status::RuntimeError(Substitute("Unknown error from BlockingQueue: $0", queue_status));
+ c->RespondFailure(ErrorStatusPB::FATAL_UNKNOWN, status);
+ }
+ return status;
+}
+
+void ServicePool::RunThread() {  // Worker loop: handles queued calls until BlockingGet() fails.
+ while (true) {
+ std::unique_ptr<InboundCall> incoming;
+ if (!service_queue_.BlockingGet(&incoming)) {
+ VLOG(1) << "ServicePool: messenger shutting down.";
+ return;
+ }
+
+ incoming->RecordHandlingStarted(incoming_queue_time_);
+ ADOPT_TRACE(incoming->trace());
+
+ if (PREDICT_FALSE(incoming->ClientTimedOut())) {
+ TRACE_TO(incoming->trace(), "Skipping call since client already timed out");
+ rpcs_timed_out_in_queue_->Increment();
+
+ // Respond as a failure, even though the client will probably ignore
+ // the response anyway.
+ incoming->RespondFailure(
+ ErrorStatusPB::ERROR_SERVER_TOO_BUSY,
+ Status::TimedOut("Call waited in the queue past client deadline"));
+
+ // Must release since RespondFailure above ends up taking ownership
+ // of the object.
+ ignore_result(incoming.release());
+ continue;  // Go back for the next queued call; the service never sees this one.
+ }
+
+ TRACE_TO(incoming->trace(), "Handling call");
+
+ // Release the InboundCall pointer -- when the call is responded to,
+ // it will get deleted at that point.
+ service_->Handle(incoming.release());
+ }
+}
+
+const string ServicePool::service_name() const {
+ return service_->service_name();  // Name reported by the wrapped service.
+}
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/service_pool.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/service_pool.h b/be/src/kudu/rpc/service_pool.h
new file mode 100644
index 0000000..70611c8
--- /dev/null
+++ b/be/src/kudu/rpc/service_pool.h
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_SERVICE_POOL_H
+#define KUDU_SERVICE_POOL_H
+
+#include <string>
+#include <vector>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/rpc/rpc_service.h"
+#include "kudu/rpc/service_queue.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/thread.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class Counter;
+class Histogram;
+class MetricEntity;
+class Socket;
+
+namespace rpc {
+
+class Messenger;
+class ServiceIf;
+
+// A pool of threads that handle new incoming RPC calls.
+// Also includes a queue that calls get pushed onto for handling by the pool.
+class ServicePool : public RpcService {
+ public:
+ ServicePool(gscoped_ptr<ServiceIf> service,
+ const scoped_refptr<MetricEntity>& metric_entity,
+ size_t service_queue_length);
+ virtual ~ServicePool();
+
+ // Start up the thread pool with 'num_threads' worker threads.
+ virtual Status Init(int num_threads);
+
+ // Shut down the queue and the thread pool, failing any calls that are still queued.
+ virtual void Shutdown();
+
+ RpcMethodInfo* LookupMethod(const RemoteMethod& method) override;
+ // Takes over 'call' and either queues it or responds to it with a failure.
+ virtual Status QueueInboundCall(gscoped_ptr<InboundCall> call) OVERRIDE;
+
+ const Counter* RpcsTimedOutInQueueMetricForTests() const {
+ return rpcs_timed_out_in_queue_.get();
+ }
+
+ const Histogram* IncomingQueueTimeMetricForTests() const {
+ return incoming_queue_time_.get();
+ }
+
+ const Counter* RpcsQueueOverflowMetric() const {
+ return rpcs_queue_overflow_.get();
+ }
+
+ const std::string service_name() const;  // Name of the wrapped service.
+
+ private:
+ void RunThread();  // Body of each worker thread.
+ void RejectTooBusy(InboundCall* c);  // Fails 'c' with ERROR_SERVER_TOO_BUSY.
+
+ gscoped_ptr<ServiceIf> service_;
+ std::vector<scoped_refptr<kudu::Thread> > threads_;
+ LifoServiceQueue service_queue_;
+ scoped_refptr<Histogram> incoming_queue_time_;
+ scoped_refptr<Counter> rpcs_timed_out_in_queue_;
+ scoped_refptr<Counter> rpcs_queue_overflow_;
+
+ mutable Mutex shutdown_lock_;
+ bool closing_;  // Set by the first Shutdown() call, under 'shutdown_lock_'.
+
+ DISALLOW_COPY_AND_ASSIGN(ServicePool);
+};
+
+} // namespace rpc
+} // namespace kudu
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/service_queue-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/service_queue-test.cc b/be/src/kudu/rpc/service_queue-test.cc
new file mode 100644
index 0000000..0bcbd12
--- /dev/null
+++ b/be/src/kudu/rpc/service_queue-test.cc
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#include <atomic>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include "kudu/rpc/service_queue.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/test_util.h"
+
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+
+DEFINE_int32(num_producers, 4,
+ "Number of producer threads");
+
+DEFINE_int32(num_consumers, 20,
+ "Number of consumer threads");
+
+DEFINE_int32(max_queue_size, 50,
+ "Max queue length");
+
+namespace kudu {
+namespace rpc {
+
+static std::atomic<uint32_t> inprogress;  // +1 before each Put() attempt, -1 per dequeued call.
+
+static std::atomic<uint32_t> total;  // Number of calls dequeued by the consumer threads.
+
+template <typename Queue>
+void ProducerThread(Queue* queue) {
+ int max_inprogress = FLAGS_max_queue_size - FLAGS_num_producers;  // Throttle threshold.
+ while (true) {
+ while (inprogress > max_inprogress) {  // Spin until the consumers catch up.
+ base::subtle::PauseCPU();
+ }
+ inprogress++;
+ InboundCall* call = new InboundCall(nullptr);
+ boost::optional<InboundCall*> evicted;
+ auto status = queue->Put(call, &evicted);
+ if (status == QUEUE_FULL) {
+ LOG(INFO) << "queue full: producer exiting";
+ delete call;
+ break;
+ }
+ // Put() may have evicted another queued call to make room; that call is freed here.
+ if (PREDICT_FALSE(evicted != boost::none)) {
+ LOG(INFO) << "call evicted: producer exiting";
+ delete evicted.get();
+ break;
+ }
+
+ if (PREDICT_TRUE(status == QUEUE_SHUTDOWN)) {  // The queue did not take 'call'.
+ delete call;
+ break;
+ }
+ }
+}
+
+template <typename Queue>
+void ConsumerThread(Queue* queue) {
+  // Keep dequeuing until BlockingGet() returns false; each call is counted in the
+  // shared counters and then destroyed.
+  for (unique_ptr<InboundCall> dequeued; queue->BlockingGet(&dequeued); dequeued.reset()) {
+    --inprogress;
+    ++total;
+  }
+}
+
+TEST(TestServiceQueue, LifoServiceQueuePerf) {
+ LifoServiceQueue queue(FLAGS_max_queue_size);
+ vector<std::thread> producers;
+ vector<std::thread> consumers;
+
+ for (int i = 0; i < FLAGS_num_producers; i++) {
+ producers.emplace_back(&ProducerThread<LifoServiceQueue>, &queue);
+ }
+
+ for (int i = 0; i < FLAGS_num_consumers; i++) {
+ consumers.emplace_back(&ConsumerThread<LifoServiceQueue>, &queue);
+ }
+
+ int seconds = AllowSlowTests() ? 10 : 1;  // Measurement duration.
+ uint64_t total_sample = 0;
+ uint64_t total_queue_len = 0;
+ uint64_t total_idle_workers = 0;
+ Stopwatch sw(Stopwatch::ALL_THREADS);
+ sw.start();
+ int32_t before = total;
+ // Sample the queue statistics every 20ms while the producers and consumers run.
+ for (int i = 0; i < seconds * 50; i++) {
+ SleepFor(MonoDelta::FromMilliseconds(20));
+ total_sample++;
+ total_queue_len += queue.estimated_queue_length();
+ total_idle_workers += queue.estimated_idle_worker_count();
+ }
+
+ sw.stop();
+ int32_t delta = total - before;  // Calls dequeued during the measured interval.
+ // Shut the queue down so the worker threads can finish, then join them all.
+ queue.Shutdown();
+ for (int i = 0; i < FLAGS_num_producers; i++) {
+ producers[i].join();
+ }
+ for (int i = 0; i < FLAGS_num_consumers; i++) {
+ consumers[i].join();
+ }
+
+ float reqs_per_second = static_cast<float>(delta / sw.elapsed().wall_seconds());
+ float user_cpu_micros_per_req = static_cast<float>(sw.elapsed().user / 1000.0 / delta);
+ float sys_cpu_micros_per_req = static_cast<float>(sw.elapsed().system / 1000.0 / delta);
+
+ LOG(INFO) << "Reqs/sec: " << (int32_t)reqs_per_second;
+ LOG(INFO) << "User CPU per req: " << user_cpu_micros_per_req << "us";
+ LOG(INFO) << "Sys CPU per req: " << sys_cpu_micros_per_req << "us";
+ LOG(INFO) << "Avg rpc queue length: " << total_queue_len / static_cast<double>(total_sample);
+ LOG(INFO) << "Avg idle workers: " << total_idle_workers / static_cast<double>(total_sample);
+}
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/service_queue.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/service_queue.cc b/be/src/kudu/rpc/service_queue.cc
new file mode 100644
index 0000000..9b938f6
--- /dev/null
+++ b/be/src/kudu/rpc/service_queue.cc
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/service_queue.h"
+
+#include <mutex>
+
+#include "kudu/util/logging.h"
+
+namespace kudu {
+namespace rpc {
+
+__thread LifoServiceQueue::ConsumerState* LifoServiceQueue::tl_consumer_ = nullptr;
+
+LifoServiceQueue::LifoServiceQueue(int max_size)
+ : shutdown_(false),
+ max_queue_size_(max_size) {
+ CHECK_GT(max_queue_size_, 0);  // A queue without any capacity is rejected outright.
+}
+
+LifoServiceQueue::~LifoServiceQueue() {  // Debug builds assert the queue was drained first.
+ DCHECK(queue_.empty())
+ << "ServiceQueue holds bare pointers at destruction time";
+}
+
+bool LifoServiceQueue::BlockingGet(std::unique_ptr<InboundCall>* out) {
+ auto consumer = tl_consumer_;  // Thread-local consumer state, created on first use below.
+ if (PREDICT_FALSE(!consumer)) {
+ consumer = tl_consumer_ = new ConsumerState(this);
+ std::lock_guard<simple_spinlock> l(lock_);
+ consumers_.emplace_back(consumer);
+ }
+ // Take a queued call if there is one; otherwise register as waiting and wait for a Post().
+ while (true) {
+ {
+ std::lock_guard<simple_spinlock> l(lock_);
+ if (!queue_.empty()) {
+ auto it = queue_.begin();
+ out->reset(*it);
+ queue_.erase(it);
+ return true;
+ }
+ if (PREDICT_FALSE(shutdown_)) {  // Checked only when the queue is empty, so it drains first.
+ return false;
+ }
+ consumer->DCheckBoundInstance(this);
+ waiting_consumers_.push_back(consumer);
+ }
+ InboundCall* call = consumer->Wait();  // Presumably blocks until Put()/Shutdown() calls Post().
+ if (call != nullptr) {
+ out->reset(call);
+ return true;
+ }
+ // if call == nullptr, this means we are shutting down the queue.
+ // Loop back around and re-check 'shutdown_'.
+ }
+}
+
+QueueStatus LifoServiceQueue::Put(InboundCall* call,
+ boost::optional<InboundCall*>* evicted) {
+ std::unique_lock<simple_spinlock> l(lock_);
+ if (PREDICT_FALSE(shutdown_)) {
+ return QUEUE_SHUTDOWN;
+ }
+ // Consumers only wait while the queue is empty, so both cannot be non-empty at once.
+ DCHECK(!(waiting_consumers_.size() > 0 && queue_.size() > 0));
+
+ // fast path: hand the call directly to a waiting consumer without queuing it.
+ if (queue_.empty() && waiting_consumers_.size() > 0) {
+ auto consumer = waiting_consumers_[waiting_consumers_.size() - 1];  // Last to start waiting.
+ waiting_consumers_.pop_back();
+ // Notifying the condition variable (and waking up the consumer thread) takes
+ // time, so do it outside the spinlock scope.
+ l.unlock();
+ consumer->Post(call);
+ return QUEUE_SUCCESS;
+ }
+
+ if (PREDICT_FALSE(queue_.size() >= max_queue_size_)) {
+ // eviction: the queue is full, so either 'call' or the last queued element must go.
+ DCHECK_EQ(queue_.size(), max_queue_size_);
+ auto it = queue_.end();
+ --it;
+ if (DeadlineLess(*it, call)) {  // The last element orders before 'call': reject 'call'.
+ return QUEUE_FULL;
+ }
+
+ *evicted = *it;
+ queue_.erase(it);
+ }
+
+ queue_.insert(call);
+ return QUEUE_SUCCESS;
+}
+
+void LifoServiceQueue::Shutdown() {
+ std::lock_guard<simple_spinlock> l(lock_);
+ shutdown_ = true;  // Subsequent Put() calls return QUEUE_SHUTDOWN.
+
+ // Post a nullptr to wake up any consumers which are waiting.
+ for (auto* cs : waiting_consumers_) {
+ cs->Post(nullptr);
+ }
+ waiting_consumers_.clear();
+}
+
+bool LifoServiceQueue::empty() const {
+ std::lock_guard<simple_spinlock> l(lock_);
+ return queue_.empty();  // Snapshot taken under the lock; it may be stale once returned.
+}
+
+int LifoServiceQueue::max_size() const {
+ return max_queue_size_;  // The capacity passed to the constructor.
+}
+
+std::string LifoServiceQueue::ToString() const {
+  // The dump is built under the lock: one line per queued call, in queue order.
+  std::lock_guard<simple_spinlock> guard(lock_);
+  std::string dump;
+  for (const auto* queued_call : queue_) {
+    dump += queued_call->ToString();
+    dump += "\n";
+  }
+  return dump;
+}
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/service_queue.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/service_queue.h b/be/src/kudu/rpc/service_queue.h
new file mode 100644
index 0000000..d68576f
--- /dev/null
+++ b/be/src/kudu/rpc/service_queue.h
@@ -0,0 +1,215 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_SERVICE_QUEUE_H
+#define KUDU_UTIL_SERVICE_QUEUE_H
+
+#include <boost/optional.hpp>
+#include <memory>
+#include <string>
+#include <set>
+#include <vector>
+
+#include "kudu/rpc/inbound_call.h"
+#include "kudu/util/condition_variable.h"
+#include "kudu/util/mutex.h"
+
+namespace kudu {
+namespace rpc {
+
+// Return values for ServiceQueue::Put()
+enum QueueStatus {
+ QUEUE_SUCCESS = 0,  // The call was queued or handed directly to a waiting consumer.
+ QUEUE_SHUTDOWN = 1,  // Shutdown() had already been called; the call was not accepted.
+ QUEUE_FULL = 2  // The queue was full and the new call sorted after every queued call.
+};
+
+// Blocking queue used for passing inbound RPC calls to the service handler pool.
+// Calls are dequeued in 'earliest-deadline first' order. The queue also maintains a
+// bounded number of calls. If the queue overflows, then calls with deadlines farthest
+// in the future are evicted.
+//
+// When calls do not provide deadlines, the RPC layer considers their deadline to
+// be infinitely in the future. This means that any call that does have a deadline
+// can evict any call that does not have a deadline. This incentivizes clients to
+// provide accurate deadlines for their calls.
+//
+// In order to improve concurrent throughput, this class uses a LIFO design:
+// Each consumer thread has its own lock and condition variable. If a
+// consumer arrives and there is no work available in the queue, it will not
+// wait on the queue lock, but rather push its own 'ConsumerState' object
+// to the 'waiting_consumers_' stack. When work arrives, if there are waiting
+// consumers, the top consumer is popped from the stack and woken up.
+//
+// This design has a few advantages over the basic BlockingQueue:
+// - the worker who was most recently busy is the one which will be selected for
+// new work. This gives an opportunity for the worker to be scheduled again
+// without going to sleep, and also keeps CPU cache and allocator caches hot.
+// - in the common case that there are enough workers to fully service the incoming
+// work rate, the queue implementation itself is never used. Thus, we can
+// have a priority queue without paying extra for it in the common case.
+//
+// NOTE: because of the use of thread-local consumer records, once a consumer
+// thread accesses one LifoServiceQueue, it becomes "bound" to that queue and
+// must never access any other instance.
+class LifoServiceQueue {
+ public:
+ explicit LifoServiceQueue(int max_size);
+
+ ~LifoServiceQueue();
+
+ // Get an element from the queue. Returns false if we were shut down prior to
+ // getting the element.
+ bool BlockingGet(std::unique_ptr<InboundCall>* out);
+
+ // Add a new call to the queue, or hand it directly to a waiting consumer if there is one.
+ // Returns:
+ // - QUEUE_SHUTDOWN if Shutdown() has already been called.
+ // - QUEUE_FULL if the queue is full and 'call' has a later deadline than any
+ // RPC already in the queue.
+ // - QUEUE_SUCCESS if 'call' was enqueued or handed off to a waiting consumer.
+ //
+ // In the case of a 'QUEUE_SUCCESS' response, the new element may have bumped
+ // another call out of the queue. In that case, *evicted will be set to the
+ // call that was bumped. Otherwise *evicted is left untouched.
+ QueueStatus Put(InboundCall* call, boost::optional<InboundCall*>* evicted);
+
+ // Shut down the queue.
+ // When a blocking queue is shut down, no more elements can be added to it,
+ // and Put() will return QUEUE_SHUTDOWN.
+ // Existing elements will drain out of it, and then BlockingGet will start
+ // returning false.
+ void Shutdown();
+
+ bool empty() const;  // True if no calls are currently queued (acquires lock_).
+
+ int max_size() const;  // Returns max_queue_size_.
+
+ std::string ToString() const;  // One line per queued call, in queue order (acquires lock_).
+
+ // Return an estimate of the current queue length. Reads queue_ without taking lock_, hence only an estimate.
+ int estimated_queue_length() const {
+ ANNOTATE_IGNORE_READS_BEGIN();
+ // The C++ standard says that std::multiset::size must be constant time,
+ // so this method won't try to traverse any actual nodes of the underlying
+ // RB tree. Investigation of the libstdcxx implementation confirms that
+ // size() is a simple field access of the _Rb_tree structure.
+ int ret = queue_.size();
+ ANNOTATE_IGNORE_READS_END();
+ return ret;
+ }
+
+ // Return an estimate of the number of idle threads currently awaiting work. Reads waiting_consumers_ without taking lock_.
+ int estimated_idle_worker_count() const {
+ ANNOTATE_IGNORE_READS_BEGIN();
+ // Size of a vector is a simple field access so this is safe.
+ int ret = waiting_consumers_.size();
+ ANNOTATE_IGNORE_READS_END();
+ return ret;
+ }
+
+ private:
+ // Comparison function which orders calls by their deadlines (earliest first).
+ static bool DeadlineLess(const InboundCall* a,
+ const InboundCall* b) {
+ auto time_a = a->GetClientDeadline();
+ auto time_b = b->GetClientDeadline();
+ if (time_a == time_b) {
+ // If two calls have the same deadline (most likely because neither one specified
+ // one) then we should order them by arrival order.
+ time_a = a->GetTimeReceived();
+ time_b = b->GetTimeReceived();
+ }
+ return time_a < time_b;
+ }
+
+ // Struct functor wrapper for DeadlineLess, used as the comparator of queue_.
+ struct DeadlineLessStruct {
+ bool operator()(const InboundCall* a, const InboundCall* b) const {
+ return DeadlineLess(a, b);
+ }
+ };
+
+ // The thread-local record corresponding to a single consumer thread.
+ // Threads push this record onto the waiting_consumers_ stack when
+ // they are awaiting work. Producers pop the top waiting consumer and
+ // post work using Post().
+ class ConsumerState {
+ public:
+ explicit ConsumerState(LifoServiceQueue* queue) :
+ cond_(&lock_),
+ call_(nullptr),
+ should_wake_(false),
+ bound_queue_(queue) {
+ }
+
+ void Post(InboundCall* call) {  // Hands 'call' (nullptr on shutdown) to this consumer and wakes it.
+ DCHECK(call_ == nullptr);
+ MutexLock l(lock_);
+ call_ = call;
+ should_wake_ = true;
+ cond_.Signal();
+ }
+
+ InboundCall* Wait() {  // Blocks until Post() has been called, then returns the posted call and resets the record.
+ MutexLock l(lock_);
+ while (should_wake_ == false) {  // re-check the flag after every wakeup
+ cond_.Wait();
+ }
+ should_wake_ = false;
+ InboundCall* ret = call_;
+ call_ = nullptr;
+ return ret;
+ }
+
+ void DCheckBoundInstance(LifoServiceQueue* q) {  // Debug check that 'q' is the queue this record was created for.
+ DCHECK_EQ(q, bound_queue_);
+ }
+
+ private:
+ Mutex lock_;  // Held by Post() and Wait() while they update call_ and should_wake_.
+ ConditionVariable cond_;  // Signalled by Post(); waited on by Wait().
+ InboundCall* call_;  // Call handed over by the last Post(); nullptr once Wait() has consumed it.
+ bool should_wake_;  // True between a Post() and the Wait() that consumes it.
+
+ // For the purpose of assertions, tracks the LifoServiceQueue instance that
+ // this consumer is reading from.
+ LifoServiceQueue* bound_queue_;
+ };
+
+ static __thread ConsumerState* tl_consumer_;  // Per-thread (__thread) pointer to a consumer record.
+
+ mutable simple_spinlock lock_;  // Taken by Put(), Shutdown(), empty() and ToString().
+ bool shutdown_;  // Set by Shutdown(); makes Put() return QUEUE_SHUTDOWN.
+ int max_queue_size_;  // Capacity at which Put() starts evicting or rejecting calls.
+
+ // Stack of consumer threads which are currently waiting for work.
+ std::vector<ConsumerState*> waiting_consumers_;
+
+ // The actual queue. Work is only added to the queue when there were no
+ // consumers available for a "direct hand-off".
+ std::multiset<InboundCall*, DeadlineLessStruct> queue_;
+
+ // The total set of consumers who have ever accessed this queue.
+ std::vector<std::unique_ptr<ConsumerState>> consumers_;
+
+ DISALLOW_COPY_AND_ASSIGN(LifoServiceQueue);
+};
+
+} // namespace rpc
+} // namespace kudu
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/transfer.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/transfer.cc b/be/src/kudu/rpc/transfer.cc
new file mode 100644
index 0000000..d24e94d
--- /dev/null
+++ b/be/src/kudu/rpc/transfer.cc
@@ -0,0 +1,264 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/transfer.h"
+
+#include <stdint.h>
+
+#include <iostream>
+#include <sstream>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/endian.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/constants.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+
+DEFINE_int32(rpc_max_message_size, (50 * 1024 * 1024),  // default: 50 MiB, expressed in bytes
+ "The maximum size of a message that any RPC that the server will accept. "
+ "Must be at least 1MB.");  // the lower bound is enforced by ValidateMaxMessageSize()
+TAG_FLAG(rpc_max_message_size, advanced);
+TAG_FLAG(rpc_max_message_size, runtime);
+
+// Flag validator for rpc_max_message_size: accepts any value of at least 1MB
+// and otherwise logs an error naming the offending flag.
+static bool ValidateMaxMessageSize(const char* flagname, int32_t value) {
+  const int32_t kMinAllowedBytes = 1 * 1024 * 1024;
+  if (value >= kMinAllowedBytes) {
+    return true;
+  }
+  LOG(ERROR) << flagname << " must be at least 1MB.";
+  return false;
+}
+// Registers the validator at static-initialization time; the variable exists
+// only to give the registration call a place to run.
+static bool dummy = google::RegisterFlagValidator(
+    &FLAGS_rpc_max_message_size, &ValidateMaxMessageSize);
+
+namespace kudu {
+namespace rpc {
+
+using std::ostringstream;
+using std::set;
+using std::string;
+using strings::Substitute;
+
+#define RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status) \
+ if (PREDICT_FALSE(!status.ok())) { \
+ if (Socket::IsTemporarySocketError(status.posix_code())) { \
+ return Status::OK(); /* EAGAIN, etc. */ \
+ } \
+ return status; \
+ }
+
+// Out-of-line definition of the virtual destructor; there is nothing to release.
+TransferCallbacks::~TransferCallbacks() {
+}
+
+// A fresh transfer expects only the length prefix: the buffer is sized to hold
+// exactly that until ReceiveBuffer() has decoded the real frame length.
+InboundTransfer::InboundTransfer()
+    : total_length_(kMsgLengthPrefixLength), cur_offset_(0) {
+  buf_.resize(kMsgLengthPrefixLength);
+}
+
+Status InboundTransfer::ReceiveBuffer(Socket &socket) {  // Reads what the socket currently offers; OK is also returned on partial progress.
+ if (cur_offset_ < kMsgLengthPrefixLength) {
+ // Phase 1: receive the int32 length prefix (may take several calls).
+ int32_t rem = kMsgLengthPrefixLength - cur_offset_;
+ int32_t nread;
+ Status status = socket.Recv(&buf_[cur_offset_], rem, &nread);
+ RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status);
+ if (nread == 0) {  // nothing was read this time; the caller tries again later
+ return Status::OK();
+ }
+ DCHECK_GE(nread, 0);
+ cur_offset_ += nread;
+ if (cur_offset_ < kMsgLengthPrefixLength) {
+ // If we still don't have the full length prefix, we can't continue
+ // reading yet.
+ return Status::OK();
+ }
+ // Since we only read 'rem' bytes above, we should now have exactly
+ // the length prefix in our buffer and no more.
+ DCHECK_EQ(cur_offset_, kMsgLengthPrefixLength);
+
+ // The length prefix doesn't include its own 4 bytes, so we have to
+ // add that back in.
+ total_length_ = NetworkByteOrder::Load32(&buf_[0]) + kMsgLengthPrefixLength;
+ if (total_length_ > FLAGS_rpc_max_message_size) {  // reject oversized frames before growing the buffer
+ return Status::NetworkError(Substitute(
+ "RPC frame had a length of $0, but we only support messages up to $1 bytes "
+ "long.", total_length_, FLAGS_rpc_max_message_size));
+ }
+ if (total_length_ <= kMsgLengthPrefixLength) {  // a frame must carry at least one byte after the prefix
+ return Status::NetworkError(Substitute("RPC frame had invalid length of $0",
+ total_length_));
+ }
+ buf_.resize(total_length_);
+
+ // Fall through to receive the message body, which is likely to be already
+ // available on the socket.
+ }
+
+ // Phase 2: receive the message body (everything after the length prefix).
+ int32_t nread;
+ int32_t rem = total_length_ - cur_offset_;
+ Status status = socket.Recv(&buf_[cur_offset_], rem, &nread);
+ RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status);
+ cur_offset_ += nread;  // TransferFinished() becomes true once this reaches total_length_
+
+ return Status::OK();
+}
+
+// True once at least one byte of the frame has been received.
+bool InboundTransfer::TransferStarted() const {
+  const bool nothing_received = (cur_offset_ == 0);
+  return !nothing_received;
+}
+
+// True when the receive offset has reached the expected total frame length.
+bool InboundTransfer::TransferFinished() const {
+  const bool all_received = (total_length_ == cur_offset_);
+  return all_received;
+}
+
+// Formats the receive progress as "<received>/<expected> bytes received".
+string InboundTransfer::StatusAsString() const {
+  const int32_t received = cur_offset_;
+  const int32_t expected = total_length_;
+  return Substitute("$0/$1 bytes received", received, expected);
+}
+
+// Allocates a transfer for the outgoing call request identified by 'call_id'
+// and returns it as a raw pointer.
+OutboundTransfer* OutboundTransfer::CreateForCallRequest(
+    int32_t call_id,
+    const std::vector<Slice> &payload,
+    TransferCallbacks *callbacks) {
+  OutboundTransfer* request_transfer =
+      new OutboundTransfer(call_id, payload, callbacks);
+  return request_transfer;
+}
+
+// Allocates a transfer for a call response. Responses carry no call ID of
+// their own, so the transfer is tagged with kInvalidCallId.
+OutboundTransfer* OutboundTransfer::CreateForCallResponse(
+    const std::vector<Slice> &payload,
+    TransferCallbacks *callbacks) {
+  OutboundTransfer* response_transfer =
+      new OutboundTransfer(kInvalidCallId, payload, callbacks);
+  return response_transfer;
+}
+
+
+// Copies the 'payload' slices into the fixed-size payload_slices_ array and
+// positions the send cursor at the start of the first slice. Only the Slice
+// objects are copied here.
+//
+// call_id:   ID of the outbound call, or kInvalidCallId for a call response.
+// payload:   slices to send; must be non-empty and must not exceed the
+//            capacity of payload_slices_ (both enforced with CHECKs).
+// callbacks: stored as a raw pointer and notified on completion or abort.
+OutboundTransfer::OutboundTransfer(int32_t call_id,
+                                   const std::vector<Slice> &payload,
+                                   TransferCallbacks *callbacks)
+  : cur_slice_idx_(0),
+    cur_offset_in_slice_(0),
+    callbacks_(callbacks),
+    call_id_(call_id),
+    aborted_(false) {
+  CHECK(!payload.empty());
+
+  n_payload_slices_ = payload.size();
+  // Must hold before the copy below: payload_slices_ is a fixed-size array.
+  CHECK_LE(n_payload_slices_, arraysize(payload_slices_));
+  // Unsigned index: avoids comparing a signed int against a size_t bound.
+  for (size_t i = 0; i < n_payload_slices_; i++) {
+    payload_slices_[i] = payload[i];
+  }
+}
+
+// A transfer destroyed before it finished, and without an explicit Abort(),
+// still reports the abort to its callbacks.
+OutboundTransfer::~OutboundTransfer() {
+  if (TransferFinished() || aborted_) {
+    return;
+  }
+  callbacks_->NotifyTransferAborted(
+      Status::RuntimeError("RPC transfer destroyed before it finished sending"));
+}
+
+// Reports 'status' through NotifyTransferAborted() and marks the transfer as
+// aborted so that the destructor does not notify a second time.
+void OutboundTransfer::Abort(const Status &status) {
+  // Preconditions: a transfer is aborted at most once, and never after it
+  // has finished sending.
+  CHECK(!aborted_) << "Already aborted";
+  CHECK(!TransferFinished()) << "Cannot abort a finished transfer";
+
+  callbacks_->NotifyTransferAborted(status);
+  aborted_ = true;
+}
+
+// Writes as much of the remaining payload as the socket accepts in a single
+// Writev() call and advances the (slice index, offset-in-slice) cursor by the
+// number of bytes written.
+//
+// Returns OK on progress and also when the socket is temporarily unwritable
+// (EAGAIN, etc.); any other socket error is returned to the caller. Calls
+// NotifyTransferFinished() once the last slice has been fully written.
+Status OutboundTransfer::SendBuffer(Socket &socket) {
+  CHECK_LT(cur_slice_idx_, n_payload_slices_);
+
+  // The constructor CHECKs that the payload fits in payload_slices_, so a
+  // fixed-size array of kMaxPayloadSlices entries always suffices. This avoids
+  // a variable-length array, which is a compiler extension in C++.
+  int n_iovecs = n_payload_slices_ - cur_slice_idx_;
+  DCHECK_LE(n_iovecs, static_cast<int>(TransferLimits::kMaxPayloadSlices));
+  struct iovec iovec[TransferLimits::kMaxPayloadSlices];
+  {
+    // Only the first remaining slice can be partially sent already; every
+    // later slice starts at offset 0.
+    int offset_in_slice = cur_offset_in_slice_;
+    for (int i = 0; i < n_iovecs; i++) {
+      Slice &slice = payload_slices_[cur_slice_idx_ + i];
+      iovec[i].iov_base = slice.mutable_data() + offset_in_slice;
+      iovec[i].iov_len = slice.size() - offset_in_slice;
+
+      offset_in_slice = 0;
+    }
+  }
+
+  int32_t written;
+  Status status = socket.Writev(iovec, n_iovecs, &written);
+  RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status);
+
+  // Adjust our accounting of current writer position.
+  for (int i = cur_slice_idx_; i < n_payload_slices_; i++) {
+    Slice &slice = payload_slices_[i];
+    int rem_in_slice = slice.size() - cur_offset_in_slice_;
+    DCHECK_GE(rem_in_slice, 0);
+
+    if (written >= rem_in_slice) {
+      // Used up this entire slice, advance to the next slice.
+      cur_slice_idx_++;
+      cur_offset_in_slice_ = 0;
+      written -= rem_in_slice;
+    } else {
+      // Partially used up this slice, just advance the offset within it.
+      cur_offset_in_slice_ += written;
+      break;
+    }
+  }
+
+  if (cur_slice_idx_ == n_payload_slices_) {
+    callbacks_->NotifyTransferFinished();
+    DCHECK_EQ(0, cur_offset_in_slice_);
+  } else {
+    DCHECK_LT(cur_slice_idx_, n_payload_slices_);
+    DCHECK_LT(cur_offset_in_slice_, payload_slices_[cur_slice_idx_].size());
+  }
+
+  return Status::OK();
+}
+
+// True once the send cursor has moved off the very first byte of the payload.
+bool OutboundTransfer::TransferStarted() const {
+  const bool at_beginning = (cur_offset_in_slice_ == 0 && cur_slice_idx_ == 0);
+  return !at_beginning;
+}
+
+// True when the send cursor has moved past the last payload slice.
+bool OutboundTransfer::TransferFinished() const {
+  if (cur_slice_idx_ != n_payload_slices_) {
+    return false;
+  }
+  DCHECK_EQ(0, cur_offset_in_slice_);  // sanity check
+  return true;
+}
+
+// Returns the debug rendering of every payload slice, concatenated in order,
+// or the redaction message when redaction is enabled.
+string OutboundTransfer::HexDump() const {
+  if (KUDU_SHOULD_REDACT()) {
+    return kRedactionMessage;
+  }
+
+  string dump;
+  for (size_t idx = 0; idx < n_payload_slices_; ++idx) {
+    dump += payload_slices_[idx].ToDebugString();
+  }
+  return dump;
+}
+
+// Sums the sizes of all payload slices, including bytes already sent.
+int32_t OutboundTransfer::TotalLength() const {
+  int32_t total_bytes = 0;
+  for (size_t idx = 0; idx < n_payload_slices_; ++idx) {
+    total_bytes += payload_slices_[idx].size();
+  }
+  return total_bytes;
+}
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/transfer.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/transfer.h b/be/src/kudu/rpc/transfer.h
new file mode 100644
index 0000000..671347a
--- /dev/null
+++ b/be/src/kudu/rpc/transfer.h
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_RPC_TRANSFER_H
+#define KUDU_RPC_TRANSFER_H
+
+#include <boost/intrusive/list.hpp>
+#include <gflags/gflags.h>
+#include <set>
+#include <stdint.h>
+#include <string>
+#include <vector>
+
+#include "kudu/rpc/constants.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/status.h"
+
+DECLARE_int32(rpc_max_message_size);
+
+namespace google {
+namespace protobuf {
+class Message;
+} // namespace protobuf
+} // namespace google
+
+namespace kudu {
+
+class Socket;
+
+namespace rpc {
+
+class Messenger;
+struct TransferCallbacks;
+
+class TransferLimits {  // Holder for compile-time limits on the shape of a transfer.
+ public:
+ enum {
+ kMaxSidecars = 10,
+ kMaxPayloadSlices = kMaxSidecars + 2 // (header + msg) plus one slice per sidecar; sizes OutboundTransfer::payload_slices_
+ };
+
+ DISALLOW_IMPLICIT_CONSTRUCTORS(TransferLimits);
+};
+
+// This class is used internally by the RPC layer to represent an inbound
+// transfer in progress.
+//
+// Inbound Transfer objects are created by a Connection receiving data. When the
+// message is fully received, it is either parsed as a call, or a call response,
+// and the InboundTransfer object itself is handed off.
+class InboundTransfer {
+ public:
+
+ InboundTransfer();
+
+ // Read from the socket into our buffer: first the length prefix, then the message body. Returns OK on partial progress.
+ Status ReceiveBuffer(Socket &socket);
+
+ // Return true if any bytes have yet been received.
+ bool TransferStarted() const;
+
+ // Return true if the entire transfer has been received.
+ bool TransferFinished() const;
+
+ Slice data() const {  // View over the whole receive buffer.
+ return Slice(buf_);
+ }
+
+ // Return a string indicating the status of this transfer (number of bytes received, etc)
+ // suitable for logging.
+ std::string StatusAsString() const;
+
+ private:
+
+ Status ProcessInboundHeader();  // NOTE(review): no definition appears in transfer.cc; confirm whether this is still needed.
+
+ faststring buf_;  // Receive buffer; resized to total_length_ once the length prefix has been decoded.
+
+ int32_t total_length_;  // Expected frame size in bytes, including the length prefix.
+ int32_t cur_offset_;  // Number of bytes received into buf_ so far.
+
+ DISALLOW_COPY_AND_ASSIGN(InboundTransfer);
+};
+
+// When the connection wants to send data, it creates an OutboundTransfer object
+// to encompass it. This sits on a queue within the Connection, so that each time
+// the Connection wakes up with a writable socket, it consumes more bytes off
+// the next pending transfer in the queue.
+//
+// Upon completion of the transfer, a callback is triggered.
+class OutboundTransfer : public boost::intrusive::list_base_hook<> {
+ public:
+ // Factory methods for creating transfers associated with call requests
+ // or responses. The 'payload' slices will be concatenated and
+ // written to the socket. When the transfer completes or errors, the
+ // appropriate method of 'callbacks' is invoked.
+ //
+ // Does not take ownership of the callbacks object or the underlying
+ // memory of the slices. The slices must remain valid until the callback
+ // is triggered.
+ //
+ // NOTE: 'payload' is currently restricted to a maximum of kMaxPayloadSlices
+ // slices.
+ // ------------------------------------------------------------
+
+ // Create an outbound transfer for a call request.
+ static OutboundTransfer* CreateForCallRequest(int32_t call_id,
+ const std::vector<Slice> &payload,
+ TransferCallbacks *callbacks);
+
+ // Create an outbound transfer for a call response.
+ // See above for details.
+ static OutboundTransfer* CreateForCallResponse(const std::vector<Slice> &payload,
+ TransferCallbacks *callbacks);
+
+ // Destruct the transfer. A transfer object should never be deallocated
+ // before it has either (a) finished transferring, or (b) been Abort()ed.
+ ~OutboundTransfer();
+
+ // Abort the current transfer, with the given status.
+ // This triggers TransferCallbacks::NotifyTransferAborted.
+ void Abort(const Status &status);
+
+ // Send as much of the remaining payload as the socket accepts; returns OK on partial progress.
+ Status SendBuffer(Socket &socket);
+
+ // Return true if any bytes have yet been sent.
+ bool TransferStarted() const;
+
+ // Return true if the entire transfer has been sent.
+ bool TransferFinished() const;
+
+ // Return the total number of bytes to be sent (including those already sent)
+ int32_t TotalLength() const;
+
+ std::string HexDump() const;  // Debug rendering of all payload slices, or the redaction message when redaction is enabled.
+
+ bool is_for_outbound_call() const {  // False for transfers created by CreateForCallResponse().
+ return call_id_ != kInvalidCallId;
+ }
+
+ // Returns the call ID for a transfer associated with an outbound
+ // call. Must not be called for call responses.
+ int32_t call_id() const {
+ DCHECK_NE(call_id_, kInvalidCallId);
+ return call_id_;
+ }
+
+ private:
+ OutboundTransfer(int32_t call_id,
+ const std::vector<Slice> &payload,
+ TransferCallbacks *callbacks);
+
+ // Slices to send. Uses an array here instead of a vector to avoid an expensive
+ // vector construction (improved performance a couple percent).
+ Slice payload_slices_[TransferLimits::kMaxPayloadSlices];
+ size_t n_payload_slices_;  // Number of entries of payload_slices_ that are in use.
+
+ // The current slice that is being sent.
+ int32_t cur_slice_idx_;
+ // The number of bytes in the above slice which has already been sent.
+ int32_t cur_offset_in_slice_;
+
+ TransferCallbacks *callbacks_;  // Not owned; notified when the transfer finishes or is aborted.
+
+ // In the case of outbound calls, the associated call ID.
+ // In the case of call responses, kInvalidCallId
+ int32_t call_id_;
+
+ bool aborted_;  // Set by Abort(); suppresses the abort notification in the destructor.
+
+ DISALLOW_COPY_AND_ASSIGN(OutboundTransfer);
+};
+
+// Callbacks made after a transfer completes.
+struct TransferCallbacks {  // Interface through which an OutboundTransfer reports its outcome.
+ public:
+ virtual ~TransferCallbacks();
+
+ // The transfer finished successfully. Invoked by OutboundTransfer::SendBuffer() once every slice has been written.
+ virtual void NotifyTransferFinished() = 0;
+
+ // The transfer was aborted (e.g because the connection died or an error occurred). Invoked by OutboundTransfer::Abort(), or by its destructor if the transfer was neither finished nor aborted.
+ virtual void NotifyTransferAborted(const Status &status) = 0;
+};
+
+} // namespace rpc
+} // namespace kudu
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/user_credentials.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/user_credentials.cc b/be/src/kudu/rpc/user_credentials.cc
new file mode 100644
index 0000000..fdc3ac2
--- /dev/null
+++ b/be/src/kudu/rpc/user_credentials.cc
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/user_credentials.h"
+
+#include <string>
+
+#include <boost/functional/hash.hpp>
+
+#include "kudu/gutil/strings/substitute.h"
+
+using std::string;
+
+namespace kudu {
+namespace rpc {
+
+// A real user counts as present when the stored name is non-empty.
+bool UserCredentials::has_real_user() const {
+  const bool is_unset = real_user_.empty();
+  return !is_unset;
+}
+
+// Replaces the stored real user name with a copy of 'real_user'.
+void UserCredentials::set_real_user(const string& real_user) {
+  real_user_.assign(real_user);
+}
+
+// Renders the credentials as "{real_user=<name>}"; the real user name is the
+// only field included.
+string UserCredentials::ToString() const {
+  const string& user = real_user_;
+  return strings::Substitute("{real_user=$0}", user);
+}
+
+// Hashes the real user name into a zero seed; credentials without a real
+// user hash to 0.
+size_t UserCredentials::HashCode() const {
+  if (!has_real_user()) {
+    return 0;
+  }
+  size_t seed = 0;
+  boost::hash_combine(seed, real_user());
+  return seed;
+}
+
+// Two credentials are equal when their real user names match.
+bool UserCredentials::Equals(const UserCredentials& other) const {
+  const bool same_user = (other.real_user() == real_user());
+  return same_user;
+}
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/user_credentials.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/user_credentials.h b/be/src/kudu/rpc/user_credentials.h
new file mode 100644
index 0000000..56af70a
--- /dev/null
+++ b/be/src/kudu/rpc/user_credentials.h
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <string>
+
+namespace kudu {
+namespace rpc {
+
+// Client-side user credentials. Currently this is more-or-less a simple wrapper
+// around a username string. However, we anticipate moving more credentials such as
+// tokens into a per-Proxy structure rather than Messenger-wide, and this will
+// be the place to store them.
+class UserCredentials {
+ public:
+ // Real user. has_real_user() is true when the stored name is non-empty.
+ bool has_real_user() const;
+ void set_real_user(const std::string& real_user);
+ const std::string& real_user() const { return real_user_; }
+
+ // Returns a string representation of the object, of the form "{real_user=<name>}".
+ std::string ToString() const;
+
+ std::size_t HashCode() const;  // Hash of the real user name; 0 when no real user is set.
+ bool Equals(const UserCredentials& other) const;  // True when both objects hold the same real user name.
+
+ private:
+ // Remember to update HashCode() and Equals() when new fields are added.
+ std::string real_user_;
+};
+
+} // namespace rpc
+} // namespace kudu