You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/17 03:14:22 UTC

[06/15] incubator-impala git commit: IMPALA-4669: [KRPC] Import RPC library from kudu@314c9d8

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/service_if.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/service_if.cc b/be/src/kudu/rpc/service_if.cc
new file mode 100644
index 0000000..39e9ab5
--- /dev/null
+++ b/be/src/kudu/rpc/service_if.cc
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/service_if.h"
+
+#include <memory>
+#include <string>
+#include <google/protobuf/descriptor.pb.h>
+
+#include "kudu/gutil/strings/substitute.h"
+
+#include "kudu/rpc/connection.h"
+#include "kudu/rpc/inbound_call.h"
+#include "kudu/rpc/rpc_context.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/util/flag_tags.h"
+
+// TODO: remove this flag once exactly-once semantics have been fully
+// cluster-tested. Although the feature is on by default, the flag is kept so
+// that, should any issues be discovered in 0.10.0, there is an easy
+// workaround to disable the feature.
+DEFINE_bool(enable_exactly_once, true, "Whether to enable exactly once semantics.");
+TAG_FLAG(enable_exactly_once, hidden);
+
+using google::protobuf::Message;
+using std::string;
+using std::unique_ptr;
+using strings::Substitute;
+
+namespace kudu {
+namespace rpc {
+
+// Out-of-line definition of the virtual destructor; there is nothing to
+// release.
+ServiceIf::~ServiceIf() = default;
+
+// Default implementation: nothing to shut down. The method is declared
+// virtual in the header, so subclasses may override it.
+void ServiceIf::Shutdown() {
+}
+
+// Default implementation: no application-specific feature flag is supported,
+// whatever its value.
+bool ServiceIf::SupportsFeature(uint32_t /*feature*/) const {
+  return false;
+}
+
+// Parses the serialized request carried by 'call' into 'message'.
+//
+// Returns true on success. On failure, logs a warning, responds to the call
+// with ERROR_INVALID_REQUEST and returns false; the caller must not respond
+// again.
+bool ServiceIf::ParseParam(InboundCall *call, google::protobuf::Message *message) {
+  const Slice serialized(call->serialized_request());
+  const bool parsed = message->ParseFromArray(serialized.data(), serialized.size());
+  if (PREDICT_FALSE(!parsed)) {
+    const string err = Substitute("invalid parameter for call $0: missing fields: $1",
+                                  call->remote_method().ToString(),
+                                  message->InitializationErrorString().c_str());
+    LOG(WARNING) << err;
+    call->RespondFailure(ErrorStatusPB::ERROR_INVALID_REQUEST,
+                         Status::InvalidArgument(err));
+    return false;
+  }
+  return true;
+}
+
+// Responds to 'call' with ERROR_NO_SUCH_METHOD and logs a warning naming the
+// service, the local and remote socket addresses, and the offending method.
+//
+// The addresses are only used for the diagnostic message. If either lookup
+// fails (presumably possible when the peer has already gone away), a
+// placeholder is used instead of aborting the process: rejecting a bad
+// request must not take the server down.
+void ServiceIf::RespondBadMethod(InboundCall *call) {
+  Sockaddr local_addr, remote_addr;
+  string local_str = "<unknown>";
+  string remote_str = "<unknown>";
+
+  if (call->connection()->socket()->GetSocketAddress(&local_addr).ok()) {
+    local_str = local_addr.ToString();
+  }
+  if (call->connection()->socket()->GetPeerAddress(&remote_addr).ok()) {
+    remote_str = remote_addr.ToString();
+  }
+  string err = Substitute("Call on service $0 received at $1 from $2 with an "
+                          "invalid method name: $3",
+                          call->remote_method().service_name(),
+                          local_str,
+                          remote_str,
+                          call->remote_method().method_name());
+  LOG(WARNING) << err;
+  call->RespondFailure(ErrorStatusPB::ERROR_NO_SUCH_METHOD,
+                       Status::InvalidArgument(err));
+}
+
+// Out-of-line definition of the virtual destructor; members clean up after
+// themselves.
+GeneratedServiceIf::~GeneratedServiceIf() = default;
+
+
+// Dispatches 'call' to the handler registered for its method.
+//
+// Steps, in order:
+//   1. If the call carries no RpcMethodInfo, respond with "no such method".
+//   2. Parse the request protobuf; ParseParam() responds itself on failure.
+//   3. Build an RpcContext from the call, the request and a fresh response.
+//   4. Run the method's authorization hook, which responds itself when it
+//      rejects the call.
+//   5. If result tracking applies, register the RPC with the ResultTracker
+//      and stop unless the tracker reports it as NEW.
+//   6. Invoke the method's handler function.
+void GeneratedServiceIf::Handle(InboundCall *call) {
+  const RpcMethodInfo* method_info = call->method_info();
+  if (!method_info) {
+    RespondBadMethod(call);
+    return;
+  }
+  unique_ptr<Message> req(method_info->req_prototype->New());
+  if (PREDICT_FALSE(!ParseParam(call, req.get()))) {
+    return;
+  }
+  Message* resp = method_info->resp_prototype->New();
+
+  // Results are tracked only when the client supplied a request id, the
+  // method opted in, and the feature flag is enabled.
+  bool track_result = call->header().has_request_id()
+                      && method_info->track_result
+                      && FLAGS_enable_exactly_once;
+  // The request is released into the context here; from this point on only
+  // the context (not this function) frees 'req' and 'resp'.
+  RpcContext* ctx = new RpcContext(call,
+                                   req.release(),
+                                   resp,
+                                   track_result ? result_tracker_ : nullptr);
+  if (!method_info->authz_method(ctx->request_pb(), resp, ctx)) {
+    // The authz_method itself should have responded to the RPC.
+    return;
+  }
+
+  if (track_result) {
+    ResultTracker::RpcState state = ctx->result_tracker()->TrackRpc(
+        call->header().request_id(),
+        resp,
+        ctx);
+    switch (state) {
+      case ResultTracker::NEW:
+        // Fall out of the 'if' statement to the normal path.
+        break;
+      case ResultTracker::COMPLETED:
+      case ResultTracker::IN_PROGRESS:
+      case ResultTracker::STALE:
+        // ResultTracker has already responded to the RPC and deleted
+        // 'ctx'.
+        return;
+      default:
+        LOG(FATAL) << "Unknown state: " << state;
+    }
+  }
+  method_info->func(ctx->request_pb(), resp, ctx);
+}
+
+
+// Returns the RpcMethodInfo registered under the method name of 'method', or
+// nullptr when this service has no such method. 'method' must name this
+// service (checked in debug builds).
+RpcMethodInfo* GeneratedServiceIf::LookupMethod(const RemoteMethod& method) {
+  DCHECK_EQ(method.service_name(), service_name());
+  auto entry = methods_by_name_.find(method.method_name());
+  const bool found = (entry != methods_by_name_.end());
+  return PREDICT_FALSE(!found) ? nullptr : entry->second.get();
+}
+
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/service_if.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/service_if.h b/be/src/kudu/rpc/service_if.h
new file mode 100644
index 0000000..a3722c6
--- /dev/null
+++ b/be/src/kudu/rpc/service_if.h
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_RPC_SERVICE_IF_H
+#define KUDU_RPC_SERVICE_IF_H
+
+#include <unordered_map>
+#include <string>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/rpc/result_tracker.h"
+
+namespace google {
+namespace protobuf {
+class Message;
+}
+}
+
+namespace kudu {
+
+class Histogram;
+
+namespace rpc {
+
+class InboundCall;
+class RemoteMethod;
+class RpcContext;
+class ServiceIf;
+
+// Generated services define an instance of this class for each
+// method that they implement. The generic server code implemented
+// by GeneratedServiceIf looks up the RpcMethodInfo in order to handle
+// each RPC.
+struct RpcMethodInfo : public RefCountedThreadSafe<RpcMethodInfo> {
+  // Prototype protobufs for requests and responses.
+  // These are empty protobufs which are cloned in order to provide an
+  // instance for each request.
+  std::unique_ptr<google::protobuf::Message> req_prototype;
+  std::unique_ptr<google::protobuf::Message> resp_prototype;
+
+  // Histogram associated with this method's handler latency.
+  scoped_refptr<Histogram> handler_latency_histogram;
+
+  // Whether we should track this method's result, using ResultTracker.
+  // Defaults to false so that an instance whose creator never sets the field
+  // is not read uninitialized by the dispatch code.
+  bool track_result = false;
+
+  // The authorization function for this RPC. If this function
+  // returns false, the RPC has already been handled (i.e. rejected)
+  // by the authorization function.
+  std::function<bool(const google::protobuf::Message* req,
+                     google::protobuf::Message* resp,
+                     RpcContext* ctx)> authz_method;
+
+  // The actual function to be called.
+  std::function<void(const google::protobuf::Message* req,
+                     google::protobuf::Message* resp,
+                     RpcContext* ctx)> func;
+};
+
+// Handles incoming messages that initiate an RPC.
+//
+// Abstract interface: concrete services must implement Handle() and
+// service_name(); the other virtual methods have default implementations.
+class ServiceIf {
+ public:
+  virtual ~ServiceIf();
+  // Handles the given inbound call.
+  virtual void Handle(InboundCall* incoming) = 0;
+  // Shuts the service down. The default implementation does nothing.
+  virtual void Shutdown();
+  // Returns the name of this service.
+  virtual std::string service_name() const = 0;
+
+  // The service should return true if it supports the provided application
+  // specific feature flag. The default implementation returns false.
+  virtual bool SupportsFeature(uint32_t feature) const;
+
+  // Look up the method being requested by the remote call.
+  //
+  // If this returns nullptr, then certain functionality like
+  // metrics collection will not be performed for this call.
+  virtual RpcMethodInfo* LookupMethod(const RemoteMethod& method) {
+    return nullptr;
+  }
+
+  // Default authorization method, which just allows all RPCs.
+  //
+  // See docs/design-docs/rpc.md for details on how to add custom
+  // authorization checks to a service.
+  bool AuthorizeAllowAll(const google::protobuf::Message* /*req*/,
+                         google::protobuf::Message* /*resp*/,
+                         RpcContext* /*ctx*/) {
+    return true;
+  }
+
+ protected:
+  // Parses the call's serialized request into 'message'. Returns false, after
+  // responding to the call with an error, if parsing fails.
+  bool ParseParam(InboundCall* call, google::protobuf::Message* message);
+  // Responds to the call with a "no such method" error.
+  void RespondBadMethod(InboundCall* call);
+};
+
+
+// Base class for code-generated service classes.
+//
+// Provides the generic dispatch logic (Handle) and method lookup on top of
+// the per-method RpcMethodInfo entries registered by the generated subclass.
+class GeneratedServiceIf : public ServiceIf {
+ public:
+  virtual ~GeneratedServiceIf();
+
+  // Looks up the appropriate method in 'methods_by_name_' and executes
+  // it on the current thread.
+  //
+  // If no such method is found, responds with an error.
+  void Handle(InboundCall* incoming) override;
+
+  // Returns the entry of 'methods_by_name_' keyed by the method name of
+  // 'method', or nullptr if there is none.
+  RpcMethodInfo* LookupMethod(const RemoteMethod& method) override;
+
+ protected:
+  // For each method, stores the relevant information about how to handle the
+  // call. Methods are inserted by the constructor of the generated subclass.
+  // After construction, this map is accessed by multiple threads and therefore
+  // must not be modified.
+  std::unordered_map<std::string, scoped_refptr<RpcMethodInfo>> methods_by_name_;
+
+  // The result tracker for this service's methods.
+  scoped_refptr<ResultTracker> result_tracker_;
+};
+
+} // namespace rpc
+} // namespace kudu
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/service_pool.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/service_pool.cc b/be/src/kudu/rpc/service_pool.cc
new file mode 100644
index 0000000..1a23ca9
--- /dev/null
+++ b/be/src/kudu/rpc/service_pool.cc
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/service_pool.h"
+
+#include <glog/logging.h>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/inbound_call.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/service_if.h"
+#include "kudu/rpc/service_queue.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread.h"
+#include "kudu/util/trace.h"
+
+using std::shared_ptr;
+using strings::Substitute;
+
+// Metric prototypes used by ServicePool. Each one is instantiated against the
+// pool's MetricEntity in the ServicePool constructor.
+
+// Time each inbound call spent waiting in the service queue.
+METRIC_DEFINE_histogram(server, rpc_incoming_queue_time,
+                        "RPC Queue Time",
+                        kudu::MetricUnit::kMicroseconds,
+                        "Number of microseconds incoming RPC requests spend in the worker queue",
+                        60000000LU, 3);
+
+// Incremented by a worker thread when it dequeues a call whose client has
+// already timed out.
+METRIC_DEFINE_counter(server, rpcs_timed_out_in_queue,
+                      "RPC Queue Timeouts",
+                      kudu::MetricUnit::kRequests,
+                      "Number of RPCs whose timeout elapsed while waiting "
+                      "in the service queue, and thus were not processed.");
+
+// Incremented each time a call is rejected because the queue is full.
+METRIC_DEFINE_counter(server, rpcs_queue_overflow,
+                      "RPC Queue Overflows",
+                      kudu::MetricUnit::kRequests,
+                      "Number of RPCs dropped because the service queue "
+                      "was full.");
+
+namespace kudu {
+namespace rpc {
+
+// Takes ownership of 'service', sizes the service queue to
+// 'service_queue_length' and instantiates the pool's metrics against
+// 'entity'. No worker threads are started here; see Init().
+ServicePool::ServicePool(gscoped_ptr<ServiceIf> service,
+                         const scoped_refptr<MetricEntity>& entity,
+                         size_t service_queue_length)
+  : service_(std::move(service)),
+    service_queue_(service_queue_length),
+    incoming_queue_time_(METRIC_rpc_incoming_queue_time.Instantiate(entity)),
+    rpcs_timed_out_in_queue_(METRIC_rpcs_timed_out_in_queue.Instantiate(entity)),
+    rpcs_queue_overflow_(METRIC_rpcs_queue_overflow.Instantiate(entity)),
+    closing_(false) {
+}
+
+// Shuts the pool down on destruction. Shutdown() returns early if the pool
+// has already been shut down, so an explicit earlier call is harmless.
+ServicePool::~ServicePool() {
+  Shutdown();
+}
+
+// Starts 'num_threads' worker threads, each running RunThread().
+//
+// Returns OK once all threads have been created. If creating a thread fails,
+// the error is returned to the caller rather than aborting the process; any
+// threads created before the failure stay registered in 'threads_' and are
+// joined by Shutdown().
+Status ServicePool::Init(int num_threads) {
+  for (int i = 0; i < num_threads; i++) {
+    scoped_refptr<kudu::Thread> new_thread;
+    RETURN_NOT_OK(kudu::Thread::Create("service pool", "rpc worker",
+        &ServicePool::RunThread, this, &new_thread));
+    threads_.push_back(new_thread);
+  }
+  return Status::OK();
+}
+
+// Shuts down the queue, joins the worker threads, fails every call still
+// queued, and finally shuts down the wrapped service. Only the first call
+// does the work after the queue shutdown; later calls return early.
+void ServicePool::Shutdown() {
+  service_queue_.Shutdown();
+
+  MutexLock lock(shutdown_lock_);
+  if (closing_) {
+    return;
+  }
+  closing_ = true;
+
+  // TODO: Use a proper thread pool implementation.
+  for (const scoped_refptr<kudu::Thread>& worker : threads_) {
+    CHECK_OK(ThreadJoiner(worker.get()).Join());
+  }
+
+  // With the workers gone, whatever is still queued has to be drained and
+  // answered here. The released pointer is handed over to RespondFailure().
+  const Status shutdown_status = Status::ServiceUnavailable("Service is shutting down");
+  for (std::unique_ptr<InboundCall> pending; service_queue_.BlockingGet(&pending); ) {
+    pending.release()->RespondFailure(ErrorStatusPB::FATAL_SERVER_SHUTTING_DOWN,
+                                      shutdown_status);
+  }
+
+  service_->Shutdown();
+}
+
+// Rejects 'c' with ERROR_SERVER_TOO_BUSY because the service queue is full:
+// bumps the overflow counter, logs a rate-limited warning and, in debug
+// builds, dumps the queue contents. 'c' must not be touched after the
+// response has been sent.
+void ServicePool::RejectTooBusy(InboundCall* c) {
+  const string backpressure_msg =
+      Substitute("$0 request on $1 from $2 dropped due to backpressure. "
+                 "The service queue is full; it has $3 items.",
+                 c->remote_method().method_name(),
+                 service_->service_name(),
+                 c->remote_address().ToString(),
+                 service_queue_.max_size());
+  rpcs_queue_overflow_->Increment();
+  KLOG_EVERY_N_SECS(WARNING, 1) << backpressure_msg;
+
+  const Status too_busy = Status::ServiceUnavailable(backpressure_msg);
+  c->RespondFailure(ErrorStatusPB::ERROR_SERVER_TOO_BUSY, too_busy);
+
+  DLOG(INFO) << backpressure_msg << " Contents of service queue:\n"
+             << service_queue_.ToString();
+}
+
+// Forwards the lookup to the wrapped service.
+RpcMethodInfo* ServicePool::LookupMethod(const RemoteMethod& method) {
+  return service_->LookupMethod(method);
+}
+
+// Places 'call' on the service queue for a worker thread to pick up.
+//
+// The call is always responded to by someone: by this function on any
+// failure, otherwise by whoever handles it after dequeueing. Returns:
+//   - NotSupported if the call requires feature flags the service lacks;
+//   - OK if the call was queued, or was rejected because the queue is full;
+//   - ServiceUnavailable if the queue has been shut down;
+//   - RuntimeError for any other queue status.
+Status ServicePool::QueueInboundCall(gscoped_ptr<InboundCall> call) {
+  InboundCall* c = call.release();
+
+  // Collect every required feature the service does not support.
+  vector<uint32_t> missing_features;
+  for (uint32_t feature : c->GetRequiredFeatures()) {
+    if (service_->SupportsFeature(feature)) {
+      continue;
+    }
+    missing_features.push_back(feature);
+  }
+
+  if (!missing_features.empty()) {
+    c->RespondUnsupportedFeature(missing_features);
+    return Status::NotSupported("call requires unsupported application feature flags",
+                                JoinMapped(missing_features,
+                                           [] (uint32_t flag) { return std::to_string(flag); },
+                                           ", "));
+  }
+
+  TRACE_TO(c->trace(), "Inserting onto call queue");
+
+  // Queue message on service queue
+  boost::optional<InboundCall*> evicted;
+  const auto put_result = service_queue_.Put(c, &evicted);
+  if (put_result == QUEUE_FULL) {
+    RejectTooBusy(c);
+    return Status::OK();
+  }
+
+  // The new call may have displaced a queued one; reject the displaced call.
+  if (PREDICT_FALSE(evicted != boost::none)) {
+    RejectTooBusy(*evicted);
+  }
+
+  if (PREDICT_TRUE(put_result == QUEUE_SUCCESS)) {
+    // NB: do not do anything with 'c' after it is successfully queued --
+    // a service thread may have already dequeued it, processed it, and
+    // responded by this point, in which case the pointer would be invalid.
+    return Status::OK();
+  }
+
+  if (put_result == QUEUE_SHUTDOWN) {
+    Status shutting_down = Status::ServiceUnavailable("Service is shutting down");
+    c->RespondFailure(ErrorStatusPB::FATAL_SERVER_SHUTTING_DOWN, shutting_down);
+    return shutting_down;
+  }
+
+  Status unknown = Status::RuntimeError(
+      Substitute("Unknown error from BlockingQueue: $0", put_result));
+  c->RespondFailure(ErrorStatusPB::FATAL_UNKNOWN, unknown);
+  return unknown;
+}
+
+// Body of each worker thread: repeatedly takes a call off the service queue
+// and hands it to the service, until the queue reports shutdown. Calls whose
+// client has already timed out are failed without being handled.
+void ServicePool::RunThread() {
+  for (;;) {
+    std::unique_ptr<InboundCall> next_call;
+    const bool got_call = service_queue_.BlockingGet(&next_call);
+    if (!got_call) {
+      VLOG(1) << "ServicePool: messenger shutting down.";
+      return;
+    }
+
+    next_call->RecordHandlingStarted(incoming_queue_time_);
+    ADOPT_TRACE(next_call->trace());
+
+    if (PREDICT_TRUE(!next_call->ClientTimedOut())) {
+      TRACE_TO(next_call->trace(), "Handling call");
+
+      // Release the InboundCall pointer -- when the call is responded to,
+      // it will get deleted at that point.
+      service_->Handle(next_call.release());
+      continue;
+    }
+
+    TRACE_TO(next_call->trace(), "Skipping call since client already timed out");
+    rpcs_timed_out_in_queue_->Increment();
+
+    // Respond as a failure, even though the client will probably ignore
+    // the response anyway.
+    next_call->RespondFailure(
+      ErrorStatusPB::ERROR_SERVER_TOO_BUSY,
+      Status::TimedOut("Call waited in the queue past client deadline"));
+
+    // Must release since RespondFailure above ends up taking ownership
+    // of the object.
+    ignore_result(next_call.release());
+  }
+}
+
+// Returns the name reported by the wrapped service.
+const string ServicePool::service_name() const {
+  return service_->service_name();
+}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/service_pool.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/service_pool.h b/be/src/kudu/rpc/service_pool.h
new file mode 100644
index 0000000..70611c8
--- /dev/null
+++ b/be/src/kudu/rpc/service_pool.h
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_SERVICE_POOL_H
+#define KUDU_SERVICE_POOL_H
+
+#include <string>
+#include <vector>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/rpc/rpc_service.h"
+#include "kudu/rpc/service_queue.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/thread.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class Counter;
+class Histogram;
+class MetricEntity;
+class Socket;
+
+namespace rpc {
+
+class Messenger;
+class ServiceIf;
+
+// A pool of threads that handle new incoming RPC calls.
+// Also includes a queue that calls get pushed onto for handling by the pool.
+class ServicePool : public RpcService {
+ public:
+  // Takes ownership of 'service'. Metrics are instantiated against
+  // 'metric_entity', and the queue is sized to 'service_queue_length'.
+  ServicePool(gscoped_ptr<ServiceIf> service,
+              const scoped_refptr<MetricEntity>& metric_entity,
+              size_t service_queue_length);
+  // Calls Shutdown().
+  virtual ~ServicePool();
+
+  // Start up the thread pool.
+  virtual Status Init(int num_threads);
+
+  // Shut down the queue and the thread pool.
+  virtual void Shutdown();
+
+  // Forwards the lookup to the wrapped service.
+  RpcMethodInfo* LookupMethod(const RemoteMethod& method) override;
+
+  // Puts 'call' on the service queue. If the call cannot be queued (missing
+  // feature support, full queue, or shutdown), it is responded to with an
+  // error instead.
+  virtual Status QueueInboundCall(gscoped_ptr<InboundCall> call) OVERRIDE;
+
+  // Accessors for the pool's metrics.
+  const Counter* RpcsTimedOutInQueueMetricForTests() const {
+    return rpcs_timed_out_in_queue_.get();
+  }
+
+  const Histogram* IncomingQueueTimeMetricForTests() const {
+    return incoming_queue_time_.get();
+  }
+
+  const Counter* RpcsQueueOverflowMetric() const {
+    return rpcs_queue_overflow_.get();
+  }
+
+  // Returns the name of the wrapped service.
+  const std::string service_name() const;
+
+ private:
+  // Worker thread main loop.
+  void RunThread();
+  // Responds to 'c' with a "server too busy" error and updates the overflow
+  // metric.
+  void RejectTooBusy(InboundCall* c);
+
+  // The service whose calls this pool handles.
+  gscoped_ptr<ServiceIf> service_;
+  // Worker threads created by Init() and joined by Shutdown().
+  std::vector<scoped_refptr<kudu::Thread> > threads_;
+  // Queue of calls waiting for a worker thread.
+  LifoServiceQueue service_queue_;
+  scoped_refptr<Histogram> incoming_queue_time_;
+  scoped_refptr<Counter> rpcs_timed_out_in_queue_;
+  scoped_refptr<Counter> rpcs_queue_overflow_;
+
+  // Held by Shutdown() while it checks and sets 'closing_' and tears the
+  // pool down.
+  mutable Mutex shutdown_lock_;
+  // Set once Shutdown() has begun, so that later calls return early.
+  bool closing_;
+
+  DISALLOW_COPY_AND_ASSIGN(ServicePool);
+};
+
+} // namespace rpc
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/service_queue-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/service_queue-test.cc b/be/src/kudu/rpc/service_queue-test.cc
new file mode 100644
index 0000000..0bcbd12
--- /dev/null
+++ b/be/src/kudu/rpc/service_queue-test.cc
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#include <atomic>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+#include <memory>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include "kudu/rpc/service_queue.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/test_util.h"
+
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+
+// Knobs for the LifoServiceQueuePerf test below: how many producer and
+// consumer threads to start, and the capacity of the queue under test.
+DEFINE_int32(num_producers, 4,
+             "Number of producer threads");
+
+DEFINE_int32(num_consumers, 20,
+             "Number of consumer threads");
+
+DEFINE_int32(max_queue_size, 50,
+             "Max queue length");
+
+namespace kudu {
+namespace rpc {
+
+// Number of calls currently in flight: incremented by a producer before each
+// Put() and decremented by a consumer for each call it dequeues. Producers
+// spin while this exceeds their limit.
+static std::atomic<uint32_t> inprogress;
+
+// Total number of calls dequeued by consumers so far.
+static std::atomic<uint32_t> total;
+
+// Producer loop for the perf test: keeps putting freshly allocated calls onto
+// 'queue', throttled so that at most (max_queue_size - num_producers) calls
+// are in flight before another one is added.
+//
+// The loop exits when the queue reports QUEUE_FULL, evicts a call, or has
+// been shut down. On each of those paths one call is deleted here instead of
+// being consumed, so 'inprogress' is decremented to keep the counter in step
+// with the number of live calls.
+template <typename Queue>
+void ProducerThread(Queue* queue) {
+  int max_inprogress = FLAGS_max_queue_size - FLAGS_num_producers;
+  while (true) {
+    // Spin until enough in-flight calls have been consumed.
+    while (inprogress > max_inprogress) {
+      base::subtle::PauseCPU();
+    }
+    inprogress++;
+    InboundCall* call = new InboundCall(nullptr);
+    boost::optional<InboundCall*> evicted;
+    auto status = queue->Put(call, &evicted);
+    if (status == QUEUE_FULL) {
+      LOG(INFO) << "queue full: producer exiting";
+      delete call;
+      inprogress--;
+      break;
+    }
+
+    if (PREDICT_FALSE(evicted != boost::none)) {
+      LOG(INFO) << "call evicted: producer exiting";
+      delete evicted.get();
+      inprogress--;
+      break;
+    }
+
+    // Shutdown happens once, at the end of the test, so it is the unlikely
+    // branch.
+    if (PREDICT_FALSE(status == QUEUE_SHUTDOWN)) {
+      delete call;
+      inprogress--;
+      break;
+    }
+  }
+}
+
+// Consumer loop for the perf test: dequeues calls until 'queue' reports
+// shutdown, updating the in-flight and total counters and then destroying
+// each call.
+template <typename Queue>
+void ConsumerThread(Queue* queue) {
+  for (unique_ptr<InboundCall> dequeued; queue->BlockingGet(&dequeued); dequeued.reset()) {
+    inprogress--;
+    total++;
+  }
+}
+
+// Throughput test: runs producers and consumers against a LifoServiceQueue
+// for a fixed wall-clock period, sampling queue length and idle-worker count
+// every 20ms, then logs requests/sec, CPU cost per request and the sampled
+// averages.
+TEST(TestServiceQueue, LifoServiceQueuePerf) {
+  LifoServiceQueue queue(FLAGS_max_queue_size);
+
+  vector<std::thread> producers;
+  for (int i = 0; i < FLAGS_num_producers; i++) {
+    producers.emplace_back(&ProducerThread<LifoServiceQueue>, &queue);
+  }
+
+  vector<std::thread> consumers;
+  for (int i = 0; i < FLAGS_num_consumers; i++) {
+    consumers.emplace_back(&ConsumerThread<LifoServiceQueue>, &queue);
+  }
+
+  const int seconds = AllowSlowTests() ? 10 : 1;
+  const int num_samples = seconds * 50;
+  uint64_t total_sample = 0;
+  uint64_t total_queue_len = 0;
+  uint64_t total_idle_workers = 0;
+  Stopwatch sw(Stopwatch::ALL_THREADS);
+  sw.start();
+  int32_t before = total;
+
+  for (int sample = 0; sample < num_samples; sample++) {
+    SleepFor(MonoDelta::FromMilliseconds(20));
+    total_sample++;
+    total_queue_len += queue.estimated_queue_length();
+    total_idle_workers += queue.estimated_idle_worker_count();
+  }
+
+  sw.stop();
+  int32_t delta = total - before;
+
+  // Shutting the queue down lets every producer and consumer leave its loop.
+  queue.Shutdown();
+  for (std::thread& producer : producers) {
+    producer.join();
+  }
+  for (std::thread& consumer : consumers) {
+    consumer.join();
+  }
+
+  float reqs_per_second = static_cast<float>(delta / sw.elapsed().wall_seconds());
+  float user_cpu_micros_per_req = static_cast<float>(sw.elapsed().user / 1000.0 / delta);
+  float sys_cpu_micros_per_req = static_cast<float>(sw.elapsed().system / 1000.0 / delta);
+  const double avg_queue_len = total_queue_len / static_cast<double>(total_sample);
+  const double avg_idle_workers = total_idle_workers / static_cast<double>(total_sample);
+
+  LOG(INFO) << "Reqs/sec:         " << static_cast<int32_t>(reqs_per_second);
+  LOG(INFO) << "User CPU per req: " << user_cpu_micros_per_req << "us";
+  LOG(INFO) << "Sys CPU per req:  " << sys_cpu_micros_per_req << "us";
+  LOG(INFO) << "Avg rpc queue length: " << avg_queue_len;
+  LOG(INFO) << "Avg idle workers:     " << avg_idle_workers;
+}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/service_queue.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/service_queue.cc b/be/src/kudu/rpc/service_queue.cc
new file mode 100644
index 0000000..9b938f6
--- /dev/null
+++ b/be/src/kudu/rpc/service_queue.cc
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/service_queue.h"
+
+#include <mutex>
+
+#include "kudu/util/logging.h"
+
+namespace kudu {
+namespace rpc {
+
+// Per-thread consumer state, created lazily by the first BlockingGet() call
+// made on each thread.
+__thread LifoServiceQueue::ConsumerState* LifoServiceQueue::tl_consumer_ = nullptr;
+
+// Creates a queue, not yet shut down, holding at most 'max_size' calls.
+// The stored maximum must be greater than zero (CHECK-enforced).
+LifoServiceQueue::LifoServiceQueue(int max_size)
+   : shutdown_(false),
+     max_queue_size_(max_size) {
+  CHECK_GT(max_queue_size_, 0);
+}
+
+// The queue stores bare InboundCall pointers and does not delete them here,
+// so it must already be empty when destroyed (checked in debug builds).
+LifoServiceQueue::~LifoServiceQueue() {
+  DCHECK(queue_.empty())
+      << "ServiceQueue holds bare pointers at destruction time";
+}
+
+// Blocks until a call is available or the queue has been shut down.
+//
+// Returns true and stores the call in 'out' on success. Returns false once
+// the queue is shut down and empty: queued calls are still handed out after
+// shutdown, because the queue is checked before the shutdown flag.
+bool LifoServiceQueue::BlockingGet(std::unique_ptr<InboundCall>* out) {
+  // Lazily create this thread's consumer state on first use and register it
+  // with the queue.
+  auto consumer = tl_consumer_;
+  if (PREDICT_FALSE(!consumer)) {
+    consumer = tl_consumer_ = new ConsumerState(this);
+    std::lock_guard<simple_spinlock> l(lock_);
+    consumers_.emplace_back(consumer);
+  }
+
+  while (true) {
+    {
+      std::lock_guard<simple_spinlock> l(lock_);
+      // Take the first queued call if there is one.
+      if (!queue_.empty()) {
+        auto it = queue_.begin();
+        out->reset(*it);
+        queue_.erase(it);
+        return true;
+      }
+      if (PREDICT_FALSE(shutdown_)) {
+        return false;
+      }
+      // Nothing queued: register as a waiter so that Put() can hand a call
+      // to this thread directly.
+      consumer->DCheckBoundInstance(this);
+      waiting_consumers_.push_back(consumer);
+    }
+    // Wait outside the lock for a call (or a shutdown wake-up) to be posted.
+    InboundCall* call = consumer->Wait();
+    if (call != nullptr) {
+      out->reset(call);
+      return true;
+    }
+    // if call == nullptr, this means we are shutting down the queue.
+    // Loop back around and re-check 'shutdown_'.
+  }
+}
+
+// Offers 'call' to the queue.
+//
+// Returns QUEUE_SHUTDOWN if the queue has been shut down. If a consumer is
+// waiting and nothing is queued, the call is handed to the most recently
+// parked consumer. Otherwise the call is queued; when the queue is already at
+// capacity, either the last queued call is evicted (stored in '*evicted') to
+// make room, or QUEUE_FULL is returned when DeadlineLess() orders that last
+// call before the new one.
+QueueStatus LifoServiceQueue::Put(InboundCall* call,
+                                  boost::optional<InboundCall*>* evicted) {
+  std::unique_lock<simple_spinlock> guard(lock_);
+  if (PREDICT_FALSE(shutdown_)) {
+    return QUEUE_SHUTDOWN;
+  }
+
+  // Waiting consumers and queued calls never coexist.
+  DCHECK(waiting_consumers_.empty() || queue_.empty());
+
+  // fast path
+  if (queue_.empty() && !waiting_consumers_.empty()) {
+    auto* idle_consumer = waiting_consumers_.back();
+    waiting_consumers_.pop_back();
+    // Notify condition var(and wake up consumer thread) takes time,
+    // so put it out of spinlock scope.
+    guard.unlock();
+    idle_consumer->Post(call);
+    return QUEUE_SUCCESS;
+  }
+
+  if (PREDICT_FALSE(queue_.size() >= max_queue_size_)) {
+    // eviction
+    DCHECK_EQ(queue_.size(), max_queue_size_);
+    auto last_queued = queue_.end();
+    --last_queued;
+    if (DeadlineLess(*last_queued, call)) {
+      return QUEUE_FULL;
+    }
+
+    *evicted = *last_queued;
+    queue_.erase(last_queued);
+  }
+
+  queue_.insert(call);
+  return QUEUE_SUCCESS;
+}
+
+// Marks the queue as shut down and wakes every parked consumer.
+void LifoServiceQueue::Shutdown() {
+  std::lock_guard<simple_spinlock> guard(lock_);
+  shutdown_ = true;
+
+  // A posted nullptr tells a waiting consumer to wake up and re-check
+  // 'shutdown_'.
+  for (ConsumerState* waiter : waiting_consumers_) {
+    waiter->Post(nullptr);
+  }
+  waiting_consumers_.clear();
+}
+
+// Returns whether no calls are currently queued. The result is a snapshot
+// taken under the lock and may be stale by the time the caller uses it.
+bool LifoServiceQueue::empty() const {
+  std::lock_guard<simple_spinlock> l(lock_);
+  return queue_.empty();
+}
+
+// Returns the maximum queue size given at construction. Read without taking
+// the lock.
+int LifoServiceQueue::max_size() const {
+  return max_queue_size_;
+}
+
+// Returns a dump of the queued calls: one ToString() line per call, in queue
+// order, built while holding the lock.
+std::string LifoServiceQueue::ToString() const {
+  std::string dump;
+
+  std::lock_guard<simple_spinlock> guard(lock_);
+  for (const InboundCall* queued : queue_) {
+    dump += queued->ToString();
+    dump += "\n";
+  }
+  return dump;
+}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/service_queue.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/service_queue.h b/be/src/kudu/rpc/service_queue.h
new file mode 100644
index 0000000..d68576f
--- /dev/null
+++ b/be/src/kudu/rpc/service_queue.h
@@ -0,0 +1,215 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_SERVICE_QUEUE_H
+#define KUDU_UTIL_SERVICE_QUEUE_H
+
+#include <boost/optional.hpp>
+#include <memory>
+#include <string>
+#include <set>
+#include <vector>
+
+#include "kudu/rpc/inbound_call.h"
+#include "kudu/util/condition_variable.h"
+#include "kudu/util/mutex.h"
+
+namespace kudu {
+namespace rpc {
+
+// Return values for ServiceQueue::Put()
+enum QueueStatus {
+  QUEUE_SUCCESS = 0,   // The call was accepted.
+  QUEUE_SHUTDOWN = 1,  // Shutdown() had already been called.
+  QUEUE_FULL = 2       // The queue was at capacity and the new call was turned away.
+};
+
+// Blocking queue used for passing inbound RPC calls to the service handler pool.
+// Calls are dequeued in 'earliest-deadline first' order. The queue also maintains a
+// bounded number of calls. If the queue overflows, then calls with deadlines farthest
+// in the future are evicted.
+//
+// When calls do not provide deadlines, the RPC layer considers their deadline to
+// be infinitely in the future. This means that any call that does have a deadline
+// can evict any call that does not have a deadline. This incentivizes clients to
+// provide accurate deadlines for their calls.
+//
+// In order to improve concurrent throughput, this class uses a LIFO design:
+// Each consumer thread has its own lock and condition variable. If a
+// consumer arrives and there is no work available in the queue, it will not
+// wait on the queue lock, but rather push its own 'ConsumerState' object
+// to the 'waiting_consumers_' stack. When work arrives, if there are waiting
+// consumers, the top consumer is popped from the stack and woken up.
+//
+// This design has a few advantages over the basic BlockingQueue:
+// - the worker who was most recently busy is the one which will be selected for
+//   new work. This gives an opportunity for the worker to be scheduled again
+//   without going to sleep, and also keeps CPU cache and allocator caches hot.
+// - in the common case that there are enough workers to fully service the incoming
+//   work rate, the queue implementation itself is never used. Thus, we can
+//   have a priority queue without paying extra for it in the common case.
+//
+// NOTE: because of the use of thread-local consumer records, once a consumer
+// thread accesses one LifoServiceQueue, it becomes "bound" to that queue and
+// must never access any other instance.
+class LifoServiceQueue {
+ public:
+  explicit LifoServiceQueue(int max_size);
+
+  ~LifoServiceQueue();
+
+  // Get an element from the queue.  Returns false if we were shut down prior to
+  // getting the element.
+  bool BlockingGet(std::unique_ptr<InboundCall>* out);
+
+  // Add a new call to the queue.
+  // Returns:
+  // - QUEUE_SHUTDOWN if Shutdown() has already been called.
+  // - QUEUE_FULL if the queue is full and 'call' has a later deadline than any
+  //   RPC already in the queue.
+  // - QUEUE_SUCCESS if 'call' was enqueued or handed directly to a waiting consumer.
+  //
+  // In the case of a 'QUEUE_SUCCESS' response, the new element may have bumped
+  // another call out of the queue. In that case, *evicted will be set to the
+  // call that was bumped.
+  QueueStatus Put(InboundCall* call, boost::optional<InboundCall*>* evicted);
+
+  // Shut down the queue.
+  // When a blocking queue is shut down, no more elements can be added to it,
+  // and Put() will return QUEUE_SHUTDOWN.
+  // Existing elements will drain out of it, and then BlockingGet will start
+  // returning false.
+  void Shutdown();
+
+  bool empty() const;  // True if no call is sitting in queue_ (takes lock_).
+
+  int max_size() const;  // The configured max_queue_size_.
+
+  std::string ToString() const;  // One line of text per queued call.
+
+  // Return an estimate of the current queue length (read without taking lock_).
+  int estimated_queue_length() const {
+    ANNOTATE_IGNORE_READS_BEGIN();
+    // The C++ standard says that std::multiset::size must be constant time,
+    // so this method won't try to traverse any actual nodes of the underlying
+    // RB tree. Investigation of the libstdcxx implementation confirms that
+    // size() is a simple field access of the _Rb_tree structure.
+    int ret = queue_.size();
+    ANNOTATE_IGNORE_READS_END();
+    return ret;
+  }
+
+  // Return an estimate of the number of idle threads currently awaiting work
+  int estimated_idle_worker_count() const {  // (also read without taking lock_).
+    ANNOTATE_IGNORE_READS_BEGIN();
+    // Size of a vector is a simple field access so this is safe.
+    int ret = waiting_consumers_.size();
+    ANNOTATE_IGNORE_READS_END();
+    return ret;
+  }
+
+ private:
+  // Comparison function which orders calls by their deadlines.
+  static bool DeadlineLess(const InboundCall* a,
+                           const InboundCall* b) {
+    auto time_a = a->GetClientDeadline();
+    auto time_b = b->GetClientDeadline();
+    if (time_a == time_b) {
+      // If two calls have the same deadline (most likely because neither one specified
+      // one) then we should order them by arrival order.
+      time_a = a->GetTimeReceived();
+      time_b = b->GetTimeReceived();
+    }
+    return time_a < time_b;  // Earlier deadline (or earlier arrival on a tie) sorts first.
+  }
+
+  // Struct functor wrapper for DeadlineLess, used as the comparator type of queue_.
+  struct DeadlineLessStruct {
+    bool operator()(const InboundCall* a, const InboundCall* b) const {
+      return DeadlineLess(a, b);
+    }
+  };
+
+  // The thread-local record corresponding to a single consumer thread.
+  // Threads push this record onto the waiting_consumers_ stack when
+  // they are awaiting work. Producers pop the top waiting consumer and
+  // post work using Post().
+  class ConsumerState {
+   public:
+    explicit ConsumerState(LifoServiceQueue* queue) :
+        cond_(&lock_),
+        call_(nullptr),
+        should_wake_(false),
+        bound_queue_(queue) {
+    }
+
+    void Post(InboundCall* call) {  // Hands 'call' (possibly nullptr) to the waiting thread.
+      DCHECK(call_ == nullptr);  // NOTE(review): call_ is read here before lock_ is taken.
+      MutexLock l(lock_);
+      call_ = call;
+      should_wake_ = true;  // The condition that Wait() loops on.
+      cond_.Signal();
+    }
+
+    InboundCall* Wait() {  // Blocks until Post() has run, then returns the posted call.
+      MutexLock l(lock_);
+      while (should_wake_ == false) {  // Re-checked after every wakeup of cond_.
+        cond_.Wait();
+      }
+      should_wake_ = false;  // Reset so that the next Wait() needs a new Post().
+      InboundCall* ret = call_;
+      call_ = nullptr;  // Empty the slot again; Post() DCHECKs that it is empty.
+      return ret;
+    }
+
+    void DCheckBoundInstance(LifoServiceQueue* q) {  // Debug check against bound_queue_.
+      DCHECK_EQ(q, bound_queue_);
+    }
+
+   private:
+    Mutex lock_;  // Guards call_ and should_wake_ in Post() and Wait().
+    ConditionVariable cond_;  // Constructed on lock_; signalled by Post().
+    InboundCall* call_;  // The posted call, or nullptr when nothing is posted.
+    bool should_wake_;  // True between a Post() and the Wait() that consumes it.
+
+    // For the purpose of assertions, tracks the LifoServiceQueue instance that
+    // this consumer is reading from.
+    LifoServiceQueue* bound_queue_;
+  };
+
+  static __thread ConsumerState* tl_consumer_;  // The calling thread's consumer record.
+
+  mutable simple_spinlock lock_;  // Taken by Put(), Shutdown(), empty() and ToString().
+  bool shutdown_;  // Set by Shutdown(); makes Put() return QUEUE_SHUTDOWN.
+  int max_queue_size_;  // Capacity limit applied by Put(); returned by max_size().
+
+  // Stack of consumer threads which are currently waiting for work.
+  std::vector<ConsumerState*> waiting_consumers_;
+
+  // The actual queue. Work is only added to the queue when there were no
+  // consumers available for a "direct hand-off".
+  std::multiset<InboundCall*, DeadlineLessStruct> queue_;
+
+  // The total set of consumers who have ever accessed this queue.
+  std::vector<std::unique_ptr<ConsumerState>> consumers_;
+
+  DISALLOW_COPY_AND_ASSIGN(LifoServiceQueue);
+};
+
+} // namespace rpc
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/transfer.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/transfer.cc b/be/src/kudu/rpc/transfer.cc
new file mode 100644
index 0000000..d24e94d
--- /dev/null
+++ b/be/src/kudu/rpc/transfer.cc
@@ -0,0 +1,264 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/transfer.h"
+
+#include <stdint.h>
+
+#include <iostream>
+#include <sstream>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/endian.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/constants.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+
+DEFINE_int32(rpc_max_message_size, (50 * 1024 * 1024),  // Default: 50 MiB.
+             "The maximum size of a message that any RPC that the server will accept. "
+             "Must be at least 1MB.");  // The floor is enforced by ValidateMaxMessageSize().
+TAG_FLAG(rpc_max_message_size, advanced);
+TAG_FLAG(rpc_max_message_size, runtime);  // NOTE(review): presumably changeable at runtime.
+
+static bool ValidateMaxMessageSize(const char* flagname, int32_t value) {
+  const bool large_enough = value >= 1 * 1024 * 1024;  // Lower bound: 1 MiB.
+  if (!large_enough) {
+    LOG(ERROR) << flagname << " must be at least 1MB.";
+  }
+  return large_enough;  // A false result tells the flag library to reject the value.
+}
+static bool dummy = google::RegisterFlagValidator(  // Registration runs during static init;
+    &FLAGS_rpc_max_message_size, &ValidateMaxMessageSize);  // 'dummy' only holds the result.
+
+namespace kudu {
+namespace rpc {
+
+using std::ostringstream;
+using std::set;
+using std::string;
+using strings::Substitute;
+
+#define RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status) /* expands to a bare 'if' statement */ \
+  if (PREDICT_FALSE(!status.ok())) {                            \
+    if (Socket::IsTemporarySocketError(status.posix_code())) {  \
+      return Status::OK(); /* EAGAIN, etc: not a failure */     \
+    }                                                           \
+    return status; /* any other error goes to the caller */     \
+  }
+
+TransferCallbacks::~TransferCallbacks()  // Out-of-line definition of the virtual destructor.
+{}
+
+InboundTransfer::InboundTransfer()  // Starts out expecting only the length prefix.
+  : total_length_(kMsgLengthPrefixLength),
+    cur_offset_(0) {
+  buf_.resize(kMsgLengthPrefixLength);  // Room for the prefix; grown once the length is known.
+}
+
+Status InboundTransfer::ReceiveBuffer(Socket &socket) {  // Reads what the socket has for us.
+  if (cur_offset_ < kMsgLengthPrefixLength) {
+    // Phase 1: receive the int32 length prefix (possibly over several calls).
+    int32_t rem = kMsgLengthPrefixLength - cur_offset_;
+    int32_t nread;
+    Status status = socket.Recv(&buf_[cur_offset_], rem, &nread);
+    RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status);  // Temporary errors return OK from here.
+    if (nread == 0) {  // Nothing was read this time; no progress to record.
+      return Status::OK();
+    }
+    DCHECK_GE(nread, 0);
+    cur_offset_ += nread;
+    if (cur_offset_ < kMsgLengthPrefixLength) {
+      // If we still don't have the full length prefix, we can't continue
+      // reading yet.
+      return Status::OK();
+    }
+    // Since we only read 'rem' bytes above, we should now have exactly
+    // the length prefix in our buffer and no more.
+    DCHECK_EQ(cur_offset_, kMsgLengthPrefixLength);
+
+    // The length prefix doesn't include its own 4 bytes, so we have to
+    // add that back in.
+    total_length_ = NetworkByteOrder::Load32(&buf_[0]) + kMsgLengthPrefixLength;
+    if (total_length_ > FLAGS_rpc_max_message_size) {  // Reject oversized frames.
+      return Status::NetworkError(Substitute(
+          "RPC frame had a length of $0, but we only support messages up to $1 bytes "
+          "long.", total_length_, FLAGS_rpc_max_message_size));
+    }
+    if (total_length_ <= kMsgLengthPrefixLength) {  // A frame needs at least one body byte.
+      return Status::NetworkError(Substitute("RPC frame had invalid length of $0",
+                                             total_length_));
+    }
+    buf_.resize(total_length_);  // Make room for the prefix plus the whole body.
+
+    // Fall through to receive the message body, which is likely to be already
+    // available on the socket.
+  }
+
+  // Phase 2: receive (more of) the message body.
+  int32_t nread;
+  int32_t rem = total_length_ - cur_offset_;
+  Status status = socket.Recv(&buf_[cur_offset_], rem, &nread);
+  RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status);
+  cur_offset_ += nread;  // TransferFinished() turns true once this reaches total_length_.
+
+  return Status::OK();
+}
+
+bool InboundTransfer::TransferStarted() const {  // True once at least one byte was received.
+  return cur_offset_ != 0;
+}
+
+bool InboundTransfer::TransferFinished() const {  // True once prefix and body are complete.
+  return cur_offset_ == total_length_;  // total_length_ includes the length prefix.
+}
+
+string InboundTransfer::StatusAsString() const {  // Progress text, e.g. for log messages.
+  return Substitute("$0/$1 bytes received", cur_offset_, total_length_);
+}
+
+OutboundTransfer* OutboundTransfer::CreateForCallRequest(  // Returns a new'd transfer.
+    int32_t call_id,
+    const std::vector<Slice> &payload,
+    TransferCallbacks *callbacks) {
+  return new OutboundTransfer(call_id, payload, callbacks);
+}
+
+OutboundTransfer* OutboundTransfer::CreateForCallResponse(const std::vector<Slice> &payload,
+                                                          TransferCallbacks *callbacks) {
+  return new OutboundTransfer(kInvalidCallId, payload, callbacks);  // Responses have no call ID.
+}
+
+
+OutboundTransfer::OutboundTransfer(int32_t call_id,
+                                   const std::vector<Slice> &payload,
+                                   TransferCallbacks *callbacks)
+  : cur_slice_idx_(0),
+    cur_offset_in_slice_(0),
+    callbacks_(callbacks),
+    call_id_(call_id),
+    aborted_(false) {
+  CHECK(!payload.empty());  // A transfer always has at least one slice to send.
+  // Copy the slices into the fixed-size member array after checking that they fit.
+  n_payload_slices_ = payload.size();
+  CHECK_LE(n_payload_slices_, arraysize(payload_slices_));
+  for (size_t i = 0; i < n_payload_slices_; i++) {  // size_t: same type as payload.size().
+    payload_slices_[i] = payload[i];
+  }
+}
+
+OutboundTransfer::~OutboundTransfer() {
+  if (!TransferFinished() && !aborted_) {  // Neither completed nor explicitly Abort()ed.
+    callbacks_->NotifyTransferAborted(  // The callbacks still get told about the outcome.
+      Status::RuntimeError("RPC transfer destroyed before it finished sending"));
+  }
+}
+
+void OutboundTransfer::Abort(const Status &status) {  // Reports 'status' via the callbacks.
+  CHECK(!aborted_) << "Already aborted";
+  CHECK(!TransferFinished()) << "Cannot abort a finished transfer";
+  callbacks_->NotifyTransferAborted(status);
+  aborted_ = true;  // Keeps the destructor from notifying a second time.
+}
+
+Status OutboundTransfer::SendBuffer(Socket &socket) {
+  // Writes as much of the unsent payload as one Writev() call accepts and advances
+  // the send position. A temporary socket error (EAGAIN, etc.) also returns OK.
+  CHECK_LT(cur_slice_idx_, n_payload_slices_);
+
+  // One iovec per unsent slice. The array has a fixed capacity (instead of being a
+  // non-standard variable-length array); payload_slices_ has the same capacity.
+  int n_iovecs = n_payload_slices_ - cur_slice_idx_;
+  struct iovec iovec[TransferLimits::kMaxPayloadSlices];
+  int offset_in_slice = cur_offset_in_slice_;
+  for (int i = 0; i < n_iovecs; i++) {
+    Slice &slice = payload_slices_[cur_slice_idx_ + i];
+    iovec[i].iov_base = slice.mutable_data() + offset_in_slice;
+    iovec[i].iov_len = slice.size() - offset_in_slice;
+    offset_in_slice = 0;  // Only the first unsent slice can be partially sent already.
+  }
+
+  int32_t written;
+  Status status = socket.Writev(iovec, n_iovecs, &written);
+  RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status);
+
+  // Adjust our accounting of current writer position.
+  for (int i = cur_slice_idx_; i < n_payload_slices_; i++) {
+    Slice &slice = payload_slices_[i];
+    int rem_in_slice = slice.size() - cur_offset_in_slice_;
+    DCHECK_GE(rem_in_slice, 0);
+    if (written >= rem_in_slice) {
+      // Used up this entire slice, advance to the next slice.
+      cur_slice_idx_++;
+      cur_offset_in_slice_ = 0;
+      written -= rem_in_slice;
+    } else {
+      // Partially used up this slice, just advance the offset within it.
+      cur_offset_in_slice_ += written;
+      break;
+    }
+  }
+
+  if (cur_slice_idx_ == n_payload_slices_) {
+    callbacks_->NotifyTransferFinished();
+    DCHECK_EQ(0, cur_offset_in_slice_);
+  } else {
+    DCHECK_LT(cur_slice_idx_, n_payload_slices_);
+    DCHECK_LT(cur_offset_in_slice_, payload_slices_[cur_slice_idx_].size());
+  }
+
+  return Status::OK();
+}
+
+bool OutboundTransfer::TransferStarted() const {  // True once the send position has moved.
+  return cur_offset_in_slice_ != 0 || cur_slice_idx_ != 0;
+}
+
+bool OutboundTransfer::TransferFinished() const {  // True once every slice was fully sent.
+  if (cur_slice_idx_ == n_payload_slices_) {  // The index is one past the last slice.
+    DCHECK_EQ(0, cur_offset_in_slice_); // sanity check
+    return true;
+  }
+  return false;
+}
+
+string OutboundTransfer::HexDump() const {  // Debug rendering of every payload slice.
+  if (KUDU_SHOULD_REDACT()) {
+    return kRedactionMessage;  // Payload contents are withheld when redaction is on.
+  }
+
+  string ret;
+  for (size_t i = 0; i < n_payload_slices_; i++) {  // size_t matches n_payload_slices_.
+    ret.append(payload_slices_[i].ToDebugString());
+  }
+  return ret;
+}
+
+int32_t OutboundTransfer::TotalLength() const {  // Sum of all payload slice sizes, in bytes.
+  int32_t ret = 0;
+  for (size_t i = 0; i < n_payload_slices_; i++) {  // size_t matches n_payload_slices_.
+    ret += payload_slices_[i].size();
+  }
+  return ret;
+}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/transfer.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/transfer.h b/be/src/kudu/rpc/transfer.h
new file mode 100644
index 0000000..671347a
--- /dev/null
+++ b/be/src/kudu/rpc/transfer.h
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_RPC_TRANSFER_H
+#define KUDU_RPC_TRANSFER_H
+
+#include <boost/intrusive/list.hpp>
+#include <gflags/gflags.h>
+#include <set>
+#include <stdint.h>
+#include <string>
+#include <vector>
+
+#include "kudu/rpc/constants.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/status.h"
+
+DECLARE_int32(rpc_max_message_size);
+
+namespace google {
+namespace protobuf {
+class Message;
+} // namespace protobuf
+} // namespace google
+
+namespace kudu {
+
+class Socket;
+
+namespace rpc {
+
+class Messenger;
+struct TransferCallbacks;
+
+class TransferLimits {  // Compile-time limits on the slices that make up one transfer.
+ public:
+  enum {
+    kMaxSidecars = 10,
+    kMaxPayloadSlices = kMaxSidecars + 2 // Sidecars plus two more slices (header + msg).
+  };
+
+  DISALLOW_IMPLICIT_CONSTRUCTORS(TransferLimits);  // Holds constants only.
+};
+
+// This class is used internally by the RPC layer to represent an inbound
+// transfer in progress.
+//
+// Inbound Transfer objects are created by a Connection receiving data. When the
+// message is fully received, it is either parsed as a call, or a call response,
+// and the InboundTransfer object itself is handed off.
+class InboundTransfer {
+ public:
+
+  InboundTransfer();
+
+  // Read from the socket into our buffer: first the length prefix, then the body.
+  Status ReceiveBuffer(Socket &socket);
+
+  // Return true if any bytes have yet been received.
+  bool TransferStarted() const;
+
+  // Return true if the entire transfer has been received.
+  bool TransferFinished() const;
+
+  Slice data() const {  // A Slice over the receive buffer (length prefix included).
+    return Slice(buf_);
+  }
+
+  // Return a string indicating the status of this transfer (number of bytes received, etc)
+  // suitable for logging.
+  std::string StatusAsString() const;
+
+ private:
+
+  Status ProcessInboundHeader();  // NOTE(review): no definition appears in transfer.cc.
+
+  faststring buf_;  // Receive buffer; resized once the frame length is known.
+
+  int32_t total_length_;  // Expected frame size in bytes, including the length prefix.
+  int32_t cur_offset_;  // Number of bytes received into buf_ so far.
+
+  DISALLOW_COPY_AND_ASSIGN(InboundTransfer);
+};
+
+// When the connection wants to send data, it creates an OutboundTransfer object
+// to encompass it. This sits on a queue within the Connection, so that each time
+// the Connection wakes up with a writable socket, it consumes more bytes off
+// the next pending transfer in the queue.
+//
+// Upon completion of the transfer, a callback is triggered.
+class OutboundTransfer : public boost::intrusive::list_base_hook<> {
+ public:
+  // Factory methods for creating transfers associated with call requests
+  // or responses. The 'payload' slices will be concatenated and
+  // written to the socket. When the transfer completes or errors, the
+  // appropriate method of 'callbacks' is invoked.
+  //
+  // Does not take ownership of the callbacks object or the underlying
+  // memory of the slices. The slices must remain valid until the callback
+  // is triggered.
+  //
+  // NOTE: 'payload' is currently restricted to a maximum of kMaxPayloadSlices
+  // slices.
+  // ------------------------------------------------------------
+
+  // Create an outbound transfer for a call request.
+  static OutboundTransfer* CreateForCallRequest(int32_t call_id,
+                                                const std::vector<Slice> &payload,
+                                                TransferCallbacks *callbacks);
+
+  // Create an outbound transfer for a call response.
+  // See above for details.
+  static OutboundTransfer* CreateForCallResponse(const std::vector<Slice> &payload,
+                                                 TransferCallbacks *callbacks);
+
+  // Destruct the transfer. A transfer object should never be deallocated
+  // before it has either (a) finished transferring, or (b) been Abort()ed.
+  ~OutboundTransfer();
+
+  // Abort the current transfer, with the given status.
+  // This triggers TransferCallbacks::NotifyTransferAborted.
+  void Abort(const Status &status);
+
+  // Send from our buffers into the socket. May send only part of the payload;
+  Status SendBuffer(Socket &socket);  // check TransferFinished() afterwards.
+
+  // Return true if any bytes have yet been sent.
+  bool TransferStarted() const;
+
+  // Return true if the entire transfer has been sent.
+  bool TransferFinished() const;
+
+  // Return the total number of bytes to be sent (including those already sent)
+  int32_t TotalLength() const;
+
+  std::string HexDump() const;  // Debug dump of the payload, subject to redaction.
+
+  bool is_for_outbound_call() const {  // True for call requests, false for call responses.
+    return call_id_ != kInvalidCallId;
+  }
+
+  // Returns the call ID for a transfer associated with an outbound
+  // call. Must not be called for call responses.
+  int32_t call_id() const {
+    DCHECK_NE(call_id_, kInvalidCallId);
+    return call_id_;
+  }
+
+ private:
+  OutboundTransfer(int32_t call_id,
+                   const std::vector<Slice> &payload,
+                   TransferCallbacks *callbacks);
+
+  // Slices to send. Uses an array here instead of a vector to avoid an expensive
+  // vector construction (improved performance a couple percent).
+  Slice payload_slices_[TransferLimits::kMaxPayloadSlices];
+  size_t n_payload_slices_;  // Number of valid entries in payload_slices_.
+
+  // The current slice that is being sent.
+  int32_t cur_slice_idx_;
+  // The number of bytes in the above slice which has already been sent.
+  int32_t cur_offset_in_slice_;
+
+  TransferCallbacks *callbacks_;  // Not owned; notified on completion or abort.
+
+  // In the case of outbound calls, the associated call ID.
+  // In the case of call responses, kInvalidCallId
+  int32_t call_id_;
+
+  bool aborted_;  // Set by Abort().
+
+  DISALLOW_COPY_AND_ASSIGN(OutboundTransfer);
+};
+
+// Callbacks made after a transfer completes.
+struct TransferCallbacks {  // Interface through which a transfer reports its outcome.
+ public:
+  virtual ~TransferCallbacks();  // Defined out of line in transfer.cc.
+
+  // The transfer finished successfully (all payload slices were written).
+  virtual void NotifyTransferFinished() = 0;
+
+  // The transfer was aborted (e.g. because the connection died or an error occurred).
+  virtual void NotifyTransferAborted(const Status &status) = 0;
+};
+
+} // namespace rpc
+} // namespace kudu
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/user_credentials.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/user_credentials.cc b/be/src/kudu/rpc/user_credentials.cc
new file mode 100644
index 0000000..fdc3ac2
--- /dev/null
+++ b/be/src/kudu/rpc/user_credentials.cc
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/user_credentials.h"
+
+#include <string>
+
+#include <boost/functional/hash.hpp>
+
+#include "kudu/gutil/strings/substitute.h"
+
+using std::string;
+
+namespace kudu {
+namespace rpc {
+
+bool UserCredentials::has_real_user() const {  // A real user counts as set when non-empty.
+  return !real_user_.empty();
+}
+
+void UserCredentials::set_real_user(const string& real_user) {  // Stores a copy of the name.
+  real_user_ = real_user;
+}
+
+string UserCredentials::ToString() const {
+  // Prints the real user, which is the only field this class currently holds.
+  return strings::Substitute("{real_user=$0}", real_user_);
+}
+
+size_t UserCredentials::HashCode() const {
+  size_t hash = 0;  // Stays 0 when no real user is set.
+  if (!has_real_user()) return hash;
+  // Otherwise mix the user name into the zero seed.
+  boost::hash_combine(hash, real_user());
+  return hash;
+}
+
+bool UserCredentials::Equals(const UserCredentials& other) const {  // Compares real users only.
+  return real_user() == other.real_user();
+}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/user_credentials.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/user_credentials.h b/be/src/kudu/rpc/user_credentials.h
new file mode 100644
index 0000000..56af70a
--- /dev/null
+++ b/be/src/kudu/rpc/user_credentials.h
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <string>
+
+namespace kudu {
+namespace rpc {
+
+// Client-side user credentials. Currently this is more-or-less a simple wrapper
+// around a username string. However, we anticipate moving more credentials such as
+// tokens into a per-Proxy structure rather than Messenger-wide, and this will
+// be the place to store them.
+class UserCredentials {
+ public:
+  // Real user. An empty name means that no real user has been set.
+  bool has_real_user() const;
+  void set_real_user(const std::string& real_user);
+  const std::string& real_user() const { return real_user_; }
+
+  // Returns a string representation of the object.
+  std::string ToString() const;
+
+  std::size_t HashCode() const;  // Hash over the real user; 0 when none is set.
+  bool Equals(const UserCredentials& other) const;  // True when the real users match.
+
+ private:
+  // Remember to update HashCode() and Equals() when new fields are added.
+  std::string real_user_;
+};
+
+} // namespace rpc
+} // namespace kudu