Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:56 UTC

[44/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from kudu@334ecafd

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/inbound_call.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/inbound_call.cc b/be/src/kudu/rpc/inbound_call.cc
new file mode 100644
index 0000000..6920071
--- /dev/null
+++ b/be/src/kudu/rpc/inbound_call.cc
@@ -0,0 +1,345 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/inbound_call.h"
+
+#include <cstdint>
+#include <memory>
+#include <ostream>
+
+#include <glog/logging.h>
+#include <google/protobuf/message.h>
+#include <google/protobuf/message_lite.h>
+
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/connection.h"
+#include "kudu/rpc/rpc_introspection.pb.h"
+#include "kudu/rpc/rpc_sidecar.h"
+#include "kudu/rpc/rpcz_store.h"
+#include "kudu/rpc/serialization.h"
+#include "kudu/rpc/service_if.h"
+#include "kudu/rpc/transfer.h"
+#include "kudu/util/debug/trace_event.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/trace.h"
+
+namespace google {
+namespace protobuf {
+class FieldDescriptor;
+}
+}
+
+using google::protobuf::FieldDescriptor;
+using google::protobuf::Message;
+using google::protobuf::MessageLite;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace rpc {
+
+InboundCall::InboundCall(Connection* conn)
+  : conn_(conn),
+    trace_(new Trace),
+    method_info_(nullptr),
+    deadline_(MonoTime::Max()) {
+  RecordCallReceived();
+}
+
+InboundCall::~InboundCall() {}
+
+Status InboundCall::ParseFrom(gscoped_ptr<InboundTransfer> transfer) {
+  TRACE_EVENT_FLOW_BEGIN0("rpc", "InboundCall", this);
+  TRACE_EVENT0("rpc", "InboundCall::ParseFrom");
+  RETURN_NOT_OK(serialization::ParseMessage(transfer->data(), &header_, &serialized_request_));
+
+  // Adopt the service/method info from the header as soon as it's available.
+  if (PREDICT_FALSE(!header_.has_remote_method())) {
+    return Status::Corruption("Non-connection context request header must specify remote_method");
+  }
+  if (PREDICT_FALSE(!header_.remote_method().IsInitialized())) {
+    return Status::Corruption("remote_method in request header is not initialized",
+                              header_.remote_method().InitializationErrorString());
+  }
+  remote_method_.FromPB(header_.remote_method());
+
+  // Compute and cache the call deadline.
+  if (header_.has_timeout_millis() && header_.timeout_millis() != 0) {
+    deadline_ = timing_.time_received + MonoDelta::FromMilliseconds(header_.timeout_millis());
+  }
+
+  if (header_.sidecar_offsets_size() > TransferLimits::kMaxSidecars) {
+    return Status::Corruption(strings::Substitute(
+            "Received $0 additional payload slices, expected at most %d",
+            header_.sidecar_offsets_size(), TransferLimits::kMaxSidecars));
+  }
+
+  RETURN_NOT_OK(RpcSidecar::ParseSidecars(
+          header_.sidecar_offsets(), serialized_request_, inbound_sidecar_slices_));
+  if (header_.sidecar_offsets_size() > 0) {
+    // Trim the request to just the message
+    serialized_request_ = Slice(serialized_request_.data(), header_.sidecar_offsets(0));
+  }
+
+  // Retain the buffer that we have a view into.
+  transfer_.swap(transfer);
+  return Status::OK();
+}
+
+void InboundCall::RespondSuccess(const MessageLite& response) {
+  TRACE_EVENT0("rpc", "InboundCall::RespondSuccess");
+  Respond(response, true);
+}
+
+void InboundCall::RespondUnsupportedFeature(const vector<uint32_t>& unsupported_features) {
+  TRACE_EVENT0("rpc", "InboundCall::RespondUnsupportedFeature");
+  ErrorStatusPB err;
+  err.set_message("unsupported feature flags");
+  err.set_code(ErrorStatusPB::ERROR_INVALID_REQUEST);
+  for (uint32_t feature : unsupported_features) {
+    err.add_unsupported_feature_flags(feature);
+  }
+
+  Respond(err, false);
+}
+
+void InboundCall::RespondFailure(ErrorStatusPB::RpcErrorCodePB error_code,
+                                 const Status& status) {
+  TRACE_EVENT0("rpc", "InboundCall::RespondFailure");
+  ErrorStatusPB err;
+  err.set_message(status.ToString());
+  err.set_code(error_code);
+
+  Respond(err, false);
+}
+
+void InboundCall::RespondApplicationError(int error_ext_id, const std::string& message,
+                                          const MessageLite& app_error_pb) {
+  ErrorStatusPB err;
+  ApplicationErrorToPB(error_ext_id, message, app_error_pb, &err);
+  Respond(err, false);
+}
+
+void InboundCall::ApplicationErrorToPB(int error_ext_id, const std::string& message,
+                                       const google::protobuf::MessageLite& app_error_pb,
+                                       ErrorStatusPB* err) {
+  err->set_message(message);
+  const FieldDescriptor* app_error_field =
+    err->GetReflection()->FindKnownExtensionByNumber(error_ext_id);
+  if (app_error_field != nullptr) {
+    err->GetReflection()->MutableMessage(err, app_error_field)->CheckTypeAndMergeFrom(app_error_pb);
+  } else {
+    LOG(DFATAL) << "Unable to find application error extension ID " << error_ext_id
+                << " (message=" << message << ")";
+  }
+}
+
+void InboundCall::Respond(const MessageLite& response,
+                          bool is_success) {
+  TRACE_EVENT_FLOW_END0("rpc", "InboundCall", this);
+  SerializeResponseBuffer(response, is_success);
+
+  TRACE_EVENT_ASYNC_END1("rpc", "InboundCall", this,
+                         "method", remote_method_.method_name());
+  TRACE_TO(trace_, "Queueing $0 response", is_success ? "success" : "failure");
+  RecordHandlingCompleted();
+  conn_->rpcz_store()->AddCall(this);
+  conn_->QueueResponseForCall(gscoped_ptr<InboundCall>(this));
+}
+
+void InboundCall::SerializeResponseBuffer(const MessageLite& response,
+                                          bool is_success) {
+  if (PREDICT_FALSE(!response.IsInitialized())) {
+    LOG(ERROR) << "Invalid RPC response for " << ToString()
+               << ": protobuf missing required fields: "
+               << response.InitializationErrorString();
+    // Send it along anyway -- the client will also notice the missing fields
+    // and produce an error on the other side, but this will at least
+    // make it clear on both sides of the RPC connection what kind of error
+    // happened.
+  }
+
+  uint32_t protobuf_msg_size = response.ByteSize();
+
+  ResponseHeader resp_hdr;
+  resp_hdr.set_call_id(header_.call_id());
+  resp_hdr.set_is_error(!is_success);
+  int32_t sidecar_byte_size = 0;
+  for (const unique_ptr<RpcSidecar>& car : outbound_sidecars_) {
+    resp_hdr.add_sidecar_offsets(sidecar_byte_size + protobuf_msg_size);
+    int32_t sidecar_bytes = car->AsSlice().size();
+    DCHECK_LE(sidecar_byte_size, TransferLimits::kMaxTotalSidecarBytes - sidecar_bytes);
+    sidecar_byte_size += sidecar_bytes;
+  }
+
+  serialization::SerializeMessage(response, &response_msg_buf_,
+                                  sidecar_byte_size, true);
+  int64_t main_msg_size = sidecar_byte_size + response_msg_buf_.size();
+  serialization::SerializeHeader(resp_hdr, main_msg_size,
+                                 &response_hdr_buf_);
+}
+
+size_t InboundCall::SerializeResponseTo(TransferPayload* slices) const {
+  TRACE_EVENT0("rpc", "InboundCall::SerializeResponseTo");
+  DCHECK_GT(response_hdr_buf_.size(), 0);
+  DCHECK_GT(response_msg_buf_.size(), 0);
+  size_t n_slices = 2 + outbound_sidecars_.size();
+  DCHECK_LE(n_slices, slices->size());
+  auto slice_iter = slices->begin();
+  *slice_iter++ = Slice(response_hdr_buf_);
+  *slice_iter++ = Slice(response_msg_buf_);
+  for (auto& sidecar : outbound_sidecars_) {
+    *slice_iter++ = sidecar->AsSlice();
+  }
+  DCHECK_EQ(slice_iter - slices->begin(), n_slices);
+  return n_slices;
+}
+
+Status InboundCall::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) {
+  // Check that the number of sidecars does not exceed the number of payload
+  // slices that are free (two are used up by the header and main message
+  // protobufs).
+  if (outbound_sidecars_.size() >= TransferLimits::kMaxSidecars) {
+    return Status::ServiceUnavailable("All available sidecars already used");
+  }
+  int64_t sidecar_bytes = car->AsSlice().size();
+  if (outbound_sidecars_total_bytes_ >
+      TransferLimits::kMaxTotalSidecarBytes - sidecar_bytes) {
+    return Status::RuntimeError(Substitute("Total size of sidecars $0 would exceed limit $1",
+        static_cast<int64_t>(outbound_sidecars_total_bytes_) + sidecar_bytes,
+        TransferLimits::kMaxTotalSidecarBytes));
+  }
+
+  outbound_sidecars_.emplace_back(std::move(car));
+  outbound_sidecars_total_bytes_ += sidecar_bytes;
+  DCHECK_GE(outbound_sidecars_total_bytes_, 0);
+  *idx = outbound_sidecars_.size() - 1;
+  return Status::OK();
+}
+
+string InboundCall::ToString() const {
+  if (header_.has_request_id()) {
+    return Substitute("Call $0 from $1 (ReqId={client: $2, seq_no=$3, attempt_no=$4})",
+                      remote_method_.ToString(),
+                      conn_->remote().ToString(),
+                      header_.request_id().client_id(),
+                      header_.request_id().seq_no(),
+                      header_.request_id().attempt_no());
+  }
+  return Substitute("Call $0 from $1 (request call id $2)",
+                      remote_method_.ToString(),
+                      conn_->remote().ToString(),
+                      header_.call_id());
+}
+
+void InboundCall::DumpPB(const DumpRunningRpcsRequestPB& req,
+                         RpcCallInProgressPB* resp) {
+  resp->mutable_header()->CopyFrom(header_);
+  if (req.include_traces() && trace_) {
+    resp->set_trace_buffer(trace_->DumpToString());
+  }
+  resp->set_micros_elapsed((MonoTime::Now() - timing_.time_received)
+                           .ToMicroseconds());
+}
+
+const RemoteUser& InboundCall::remote_user() const {
+  return conn_->remote_user();
+}
+
+const Sockaddr& InboundCall::remote_address() const {
+  return conn_->remote();
+}
+
+const scoped_refptr<Connection>& InboundCall::connection() const {
+  return conn_;
+}
+
+Trace* InboundCall::trace() {
+  return trace_.get();
+}
+
+void InboundCall::RecordCallReceived() {
+  TRACE_EVENT_ASYNC_BEGIN0("rpc", "InboundCall", this);
+  DCHECK(!timing_.time_received.Initialized());  // Protect against multiple calls.
+  timing_.time_received = MonoTime::Now();
+}
+
+void InboundCall::RecordHandlingStarted(Histogram* incoming_queue_time) {
+  DCHECK(incoming_queue_time != nullptr);
+  DCHECK(!timing_.time_handled.Initialized());  // Protect against multiple calls.
+  timing_.time_handled = MonoTime::Now();
+  incoming_queue_time->Increment(
+      (timing_.time_handled - timing_.time_received).ToMicroseconds());
+}
+
+void InboundCall::RecordHandlingCompleted() {
+  DCHECK(!timing_.time_completed.Initialized());  // Protect against multiple calls.
+  timing_.time_completed = MonoTime::Now();
+
+  if (!timing_.time_handled.Initialized()) {
+    // Sometimes we respond to a call before we begin handling it (e.g. due to queue
+    // overflow, etc). These cases should not be counted against the histogram.
+    return;
+  }
+
+  if (method_info_) {
+    method_info_->handler_latency_histogram->Increment(
+        (timing_.time_completed - timing_.time_handled).ToMicroseconds());
+  }
+}
+
+bool InboundCall::ClientTimedOut() const {
+  return MonoTime::Now() >= deadline_;
+}
+
+MonoTime InboundCall::GetTimeReceived() const {
+  return timing_.time_received;
+}
+
+vector<uint32_t> InboundCall::GetRequiredFeatures() const {
+  vector<uint32_t> features;
+  for (uint32_t feature : header_.required_feature_flags()) {
+    features.push_back(feature);
+  }
+  return features;
+}
+
+Status InboundCall::GetInboundSidecar(int idx, Slice* sidecar) const {
+  DCHECK(transfer_) << "Sidecars have been discarded";
+  if (idx < 0 || idx >= header_.sidecar_offsets_size()) {
+    return Status::InvalidArgument(strings::Substitute(
+            "Index $0 does not reference a valid sidecar", idx));
+  }
+  *sidecar = inbound_sidecar_slices_[idx];
+  return Status::OK();
+}
+
+void InboundCall::DiscardTransfer() {
+  transfer_.reset();
+}
+
+size_t InboundCall::GetTransferSize() {
+  if (!transfer_) return 0;
+  return transfer_->data().size();
+}
+
+} // namespace rpc
+} // namespace kudu
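
A note on AddOutboundSidecar() above: the byte-limit check is written as
'outbound_sidecars_total_bytes_ > kMaxTotalSidecarBytes - sidecar_bytes'
rather than the more obvious 'total + bytes > limit', so the comparison
itself can never overflow. A minimal standalone sketch of the same pattern
(the names here are illustrative, not part of the Kudu API):

    #include <cstdint>
    #include <iostream>

    // Overflow-safe capacity check: accept 'extra' more bytes only if the
    // new total stays within 'limit'. Rearranged as 'current > limit - extra'
    // so the arithmetic never wraps, assuming 0 <= current and 0 <= extra.
    bool WouldExceedLimit(int64_t current, int64_t extra, int64_t limit) {
      return extra > limit || current > limit - extra;
    }

    int main() {
      const int64_t limit = 100;
      std::cout << WouldExceedLimit(90, 15, limit) << "\n";  // 1: reject
      std::cout << WouldExceedLimit(90, 10, limit) << "\n";  // 0: accept
      return 0;
    }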

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/inbound_call.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/inbound_call.h b/be/src/kudu/rpc/inbound_call.h
new file mode 100644
index 0000000..0db4c37
--- /dev/null
+++ b/be/src/kudu/rpc/inbound_call.h
@@ -0,0 +1,286 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_RPC_INBOUND_CALL_H
+#define KUDU_RPC_INBOUND_CALL_H
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/rpc/remote_method.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/service_if.h"
+#include "kudu/rpc/transfer.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+
+namespace google {
+namespace protobuf {
+class MessageLite;
+} // namespace protobuf
+} // namespace google
+
+namespace kudu {
+
+class Histogram;
+class Sockaddr;
+class Trace;
+
+namespace rpc {
+
+class Connection;
+class DumpRunningRpcsRequestPB;
+class RemoteUser;
+class RpcCallInProgressPB;
+class RpcSidecar;
+
+struct InboundCallTiming {
+  MonoTime time_received;   // Time the call was first accepted.
+  MonoTime time_handled;    // Time the call handler was kicked off.
+  MonoTime time_completed;  // Time the call handler completed.
+
+  MonoDelta TotalDuration() const {
+    return time_completed - time_received;
+  }
+};
+
+// Inbound call on server
+class InboundCall {
+ public:
+  explicit InboundCall(Connection* conn);
+  ~InboundCall();
+
+  // Parse an inbound call message.
+  //
+  // This only deserializes the call header, populating the 'header_' and
+  // 'serialized_request_' member variables. The actual call parameter is
+  // not deserialized, as this may be CPU-expensive, and this is called
+  // from the reactor thread.
+  Status ParseFrom(gscoped_ptr<InboundTransfer> transfer);
+
+  // Return the serialized request parameter protobuf.
+  const Slice& serialized_request() const {
+    DCHECK(transfer_) << "Transfer discarded before parameter parsing";
+    return serialized_request_;
+  }
+
+  const RemoteMethod& remote_method() const {
+    return remote_method_;
+  }
+
+  int32_t call_id() const {
+    return header_.call_id();
+  }
+
+  // Serializes 'response' into the InboundCall's internal buffer, and marks
+  // the call as a success. Enqueues the response back to the connection
+  // that made the call.
+  //
+  // This method deletes the InboundCall object, so no further calls may be
+  // made after this one.
+  void RespondSuccess(const google::protobuf::MessageLite& response);
+
+  // Serializes a failure response into the internal buffer, marking the
+  // call as a failure. Enqueues the response back to the connection that
+  // made the call.
+  //
+  // This method deletes the InboundCall object, so no further calls may be
+  // made after this one.
+  void RespondFailure(ErrorStatusPB::RpcErrorCodePB error_code,
+                      const Status &status);
+
+  void RespondUnsupportedFeature(const std::vector<uint32_t>& unsupported_features);
+
+  void RespondApplicationError(int error_ext_id, const std::string& message,
+                               const google::protobuf::MessageLite& app_error_pb);
+
+  // Convert an application error extension to an ErrorStatusPB.
+  // These ErrorStatusPB objects are what are returned in application error responses.
+  static void ApplicationErrorToPB(int error_ext_id, const std::string& message,
+                                   const google::protobuf::MessageLite& app_error_pb,
+                                   ErrorStatusPB* err);
+
+  // Serialize the response packet for the finished call into 'slices'.
+  // The resulting slices refer to memory in this object.
+  // Returns the number of slices in the serialized response.
+  size_t SerializeResponseTo(TransferPayload* slices) const;
+
+  // See RpcContext::AddRpcSidecar()
+  Status AddOutboundSidecar(std::unique_ptr<RpcSidecar> car, int* idx);
+
+  std::string ToString() const;
+
+  void DumpPB(const DumpRunningRpcsRequestPB& req, RpcCallInProgressPB* resp);
+
+  const RemoteUser& remote_user() const;
+
+  const Sockaddr& remote_address() const;
+
+  const scoped_refptr<Connection>& connection() const;
+
+  Trace* trace();
+
+  const InboundCallTiming& timing() const {
+    return timing_;
+  }
+
+  const RequestHeader& header() const {
+    return header_;
+  }
+
+  // Associate this call with a particular method that will be invoked
+  // by the service.
+  void set_method_info(scoped_refptr<RpcMethodInfo> info) {
+    method_info_ = std::move(info);
+  }
+
+  // Return the method associated with this call. This is set just before
+  // the call is enqueued onto the service queue, and therefore may be
+  // 'nullptr' for much of the lifecycle of a call.
+  RpcMethodInfo* method_info() {
+    return method_info_.get();
+  }
+
+  // When this InboundCall was received (instantiated).
+  // Should only be called once on a given instance.
+  // Not thread-safe. Should only be called by the current "owner" thread.
+  void RecordCallReceived();
+
+  // When RPC call Handle() was called on the server side.
+  // Updates the Histogram with time elapsed since the call was received,
+  // and should only be called once on a given instance.
+  // Not thread-safe. Should only be called by the current "owner" thread.
+  void RecordHandlingStarted(Histogram* incoming_queue_time);
+
+  // Return true if the deadline set by the client has already elapsed.
+  // In this case, the server may stop processing the call, since the
+  // call response will be ignored anyway.
+  bool ClientTimedOut() const;
+
+  // Return an upper bound on the client timeout deadline. This does not
+  // account for transmission delays between the client and the server.
+  // If the client did not specify a deadline, returns MonoTime::Max().
+  MonoTime GetClientDeadline() const {
+    return deadline_;
+  }
+
+  // Return the time when this call was received.
+  MonoTime GetTimeReceived() const;
+
+  // Returns the set of application-specific feature flags required to service
+  // the RPC.
+  std::vector<uint32_t> GetRequiredFeatures() const;
+
+  // Get a sidecar sent as part of the request. If idx < 0 || idx > num sidecars - 1,
+  // returns an error.
+  Status GetInboundSidecar(int idx, Slice* sidecar) const;
+
+  // Releases the buffer that contains the request + sidecar data. It is an error to
+  // access sidecars or serialized_request() after this method is called.
+  void DiscardTransfer();
+
+  // Returns the size of the transfer buffer that backs this call. If the transfer does
+  // not exist (e.g. GetTransferSize() is called after DiscardTransfer()), returns 0.
+  size_t GetTransferSize();
+
+ private:
+  friend class RpczStore;
+
+  // Serialize and queue the response.
+  void Respond(const google::protobuf::MessageLite& response,
+               bool is_success);
+
+  // Serialize a response message for either success or failure. If it is a success,
+  // 'response' should be the user-defined response type for the call. If it is a
+  // failure, 'response' should be an ErrorStatusPB instance.
+  void SerializeResponseBuffer(const google::protobuf::MessageLite& response,
+                               bool is_success);
+
+  // When RPC call Handle() completed execution on the server side.
+  // Updates the Histogram with time elapsed since the call was started,
+  // and should only be called once on a given instance.
+  // Not thread-safe. Should only be called by the current "owner" thread.
+  void RecordHandlingCompleted();
+
+  // The connection on which this inbound call arrived.
+  scoped_refptr<Connection> conn_;
+
+  // The header of the incoming call. Set by ParseFrom()
+  RequestHeader header_;
+
+  // The serialized bytes of the request param protobuf. Set by ParseFrom().
+  // This references memory held by 'transfer_'.
+  Slice serialized_request_;
+
+  // The transfer that produced the call.
+  // This is kept around because it retains the memory referred to
+  // by 'serialized_request_' above.
+  gscoped_ptr<InboundTransfer> transfer_;
+
+  // The buffers for serialized response. Set by SerializeResponseBuffer().
+  faststring response_hdr_buf_;
+  faststring response_msg_buf_;
+
+  // Vector of additional sidecars that are tacked on to the call's response
+  // after serialization of the protobuf. See rpc/rpc_sidecar.h for more info.
+  std::vector<std::unique_ptr<RpcSidecar>> outbound_sidecars_;
+
+  // Total size of sidecars in outbound_sidecars_. This is limited to a maximum
+  // of TransferLimits::kMaxTotalSidecarBytes.
+  int32_t outbound_sidecars_total_bytes_ = 0;
+
+  // Inbound sidecars from the request. The slices are views onto transfer_. There are as
+  // many slices as header_.sidecar_offsets_size().
+  Slice inbound_sidecar_slices_[TransferLimits::kMaxSidecars];
+
+  // The trace buffer.
+  scoped_refptr<Trace> trace_;
+
+  // Timing information related to this RPC call.
+  InboundCallTiming timing_;
+
+  // Proto service this call belongs to. Used for routing.
+  // This field is filled in when the inbound request header is parsed.
+  RemoteMethod remote_method_;
+
+  // After the method has been looked up within the service, this is filled in
+  // to point to the information about this method. Acts as a pointer back to
+  // per-method info such as tracing.
+  scoped_refptr<RpcMethodInfo> method_info_;
+
+  // A time at which the client will time out, or MonoTime::Max if the
+  // client did not pass a timeout.
+  MonoTime deadline_;
+
+  DISALLOW_COPY_AND_ASSIGN(InboundCall);
+};
+
+} // namespace rpc
+} // namespace kudu
+
+#endif
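
The InboundCallTiming struct above backs the two latency metrics recorded in
inbound_call.cc: queue time (time_handled - time_received) feeds the
incoming_queue_time histogram, and handler latency (time_completed -
time_handled) feeds the per-method histogram. A self-contained sketch of the
same bookkeeping using std::chrono (illustrative only; the real code uses
Kudu's MonoTime):

    #include <chrono>
    #include <iostream>
    #include <thread>

    using Clock = std::chrono::steady_clock;

    // Mirrors InboundCallTiming: three timestamps captured as a call moves
    // through receive -> dequeue/handle -> complete.
    struct CallTiming {
      Clock::time_point received;
      Clock::time_point handled;
      Clock::time_point completed;
    };

    int main() {
      CallTiming t;
      t.received = Clock::now();   // cf. RecordCallReceived()
      std::this_thread::sleep_for(std::chrono::milliseconds(5));   // queued
      t.handled = Clock::now();    // cf. RecordHandlingStarted()
      std::this_thread::sleep_for(std::chrono::milliseconds(10));  // handled
      t.completed = Clock::now();  // cf. RecordHandlingCompleted()

      auto us = [](Clock::duration d) {
        return std::chrono::duration_cast<std::chrono::microseconds>(d).count();
      };
      std::cout << "queue time:   " << us(t.handled - t.received) << " us\n";
      std::cout << "handler time: " << us(t.completed - t.handled) << " us\n";
      std::cout << "total:        " << us(t.completed - t.received) << " us\n";
      return 0;
    }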

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/messenger.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/messenger.cc b/be/src/kudu/rpc/messenger.cc
new file mode 100644
index 0000000..17ac0c5
--- /dev/null
+++ b/be/src/kudu/rpc/messenger.cc
@@ -0,0 +1,502 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/messenger.h"
+
+#include <cstdlib>
+#include <functional>
+#include <mutex>
+#include <ostream>
+#include <string>
+#include <type_traits>
+#include <utility>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/acceptor_pool.h"
+#include "kudu/rpc/connection_id.h"
+#include "kudu/rpc/inbound_call.h"
+#include "kudu/rpc/outbound_call.h"
+#include "kudu/rpc/reactor.h"
+#include "kudu/rpc/remote_method.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/rpc_service.h"
+#include "kudu/rpc/rpcz_store.h"
+#include "kudu/rpc/sasl_common.h"
+#include "kudu/rpc/server_negotiation.h"
+#include "kudu/rpc/service_if.h"
+#include "kudu/security/openssl_util.h"
+#include "kudu/security/tls_context.h"
+#include "kudu/security/token_verifier.h"
+#include "kudu/util/flags.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread_restrictions.h"
+#include "kudu/util/threadpool.h"
+
+using std::string;
+using std::shared_ptr;
+using std::make_shared;
+using strings::Substitute;
+
+namespace boost {
+template <typename Signature> class function;
+}
+
+namespace kudu {
+namespace rpc {
+
+MessengerBuilder::MessengerBuilder(std::string name)
+    : name_(std::move(name)),
+      connection_keepalive_time_(MonoDelta::FromMilliseconds(65000)),
+      num_reactors_(4),
+      min_negotiation_threads_(0),
+      max_negotiation_threads_(4),
+      coarse_timer_granularity_(MonoDelta::FromMilliseconds(100)),
+      rpc_negotiation_timeout_ms_(3000),
+      sasl_proto_name_("kudu"),
+      rpc_authentication_("optional"),
+      rpc_encryption_("optional"),
+      rpc_tls_ciphers_(kudu::security::SecurityDefaults::kDefaultTlsCiphers),
+      rpc_tls_min_protocol_(kudu::security::SecurityDefaults::kDefaultTlsMinVersion),
+      enable_inbound_tls_(false),
+      reuseport_(false) {
+}
+
+MessengerBuilder& MessengerBuilder::set_connection_keepalive_time(const MonoDelta &keepalive) {
+  connection_keepalive_time_ = keepalive;
+  return *this;
+}
+
+MessengerBuilder& MessengerBuilder::set_num_reactors(int num_reactors) {
+  num_reactors_ = num_reactors;
+  return *this;
+}
+
+MessengerBuilder& MessengerBuilder::set_min_negotiation_threads(int min_negotiation_threads) {
+  min_negotiation_threads_ = min_negotiation_threads;
+  return *this;
+}
+
+MessengerBuilder& MessengerBuilder::set_max_negotiation_threads(int max_negotiation_threads) {
+  max_negotiation_threads_ = max_negotiation_threads;
+  return *this;
+}
+
+MessengerBuilder& MessengerBuilder::set_coarse_timer_granularity(const MonoDelta &granularity) {
+  coarse_timer_granularity_ = granularity;
+  return *this;
+}
+
+MessengerBuilder &MessengerBuilder::set_metric_entity(
+    const scoped_refptr<MetricEntity>& metric_entity) {
+  metric_entity_ = metric_entity;
+  return *this;
+}
+
+MessengerBuilder &MessengerBuilder::set_connection_keep_alive_time(int32_t time_in_ms) {
+  connection_keepalive_time_ = MonoDelta::FromMilliseconds(time_in_ms);
+  return *this;
+}
+
+MessengerBuilder &MessengerBuilder::set_rpc_negotiation_timeout_ms(int64_t time_in_ms) {
+  rpc_negotiation_timeout_ms_ = time_in_ms;
+  return *this;
+}
+
+MessengerBuilder &MessengerBuilder::set_sasl_proto_name(const std::string& sasl_proto_name) {
+  sasl_proto_name_ = sasl_proto_name;
+  return *this;
+}
+
+MessengerBuilder &MessengerBuilder::set_rpc_authentication(const std::string& rpc_authentication) {
+  rpc_authentication_ = rpc_authentication;
+  return *this;
+}
+
+MessengerBuilder &MessengerBuilder::set_rpc_encryption(const std::string& rpc_encryption) {
+  rpc_encryption_ = rpc_encryption;
+  return *this;
+}
+
+MessengerBuilder &MessengerBuilder::set_rpc_tls_ciphers(const std::string& rpc_tls_ciphers) {
+  rpc_tls_ciphers_ = rpc_tls_ciphers;
+  return *this;
+}
+
+MessengerBuilder &MessengerBuilder::set_rpc_tls_min_protocol(
+    const std::string& rpc_tls_min_protocol) {
+  rpc_tls_min_protocol_ = rpc_tls_min_protocol;
+  return *this;
+}
+
+MessengerBuilder& MessengerBuilder::set_epki_cert_key_files(
+    const std::string& cert, const std::string& private_key) {
+  rpc_certificate_file_ = cert;
+  rpc_private_key_file_ = private_key;
+  return *this;
+}
+
+MessengerBuilder& MessengerBuilder::set_epki_certificate_authority_file(const std::string& ca) {
+  rpc_ca_certificate_file_ = ca;
+  return *this;
+}
+
+MessengerBuilder& MessengerBuilder::set_epki_private_password_key_cmd(const std::string& cmd) {
+  rpc_private_key_password_cmd_ = cmd;
+  return *this;
+}
+
+MessengerBuilder& MessengerBuilder::set_keytab_file(const std::string& keytab_file) {
+  keytab_file_ = keytab_file;
+  return *this;
+}
+
+MessengerBuilder& MessengerBuilder::enable_inbound_tls() {
+  enable_inbound_tls_ = true;
+  return *this;
+}
+
+MessengerBuilder& MessengerBuilder::set_reuseport() {
+  reuseport_ = true;
+  return *this;
+}
+
+Status MessengerBuilder::Build(shared_ptr<Messenger> *msgr) {
+  // Initialize SASL library before we start making requests
+  RETURN_NOT_OK(SaslInit(!keytab_file_.empty()));
+
+  Messenger* new_msgr(new Messenger(*this));
+
+  auto cleanup = MakeScopedCleanup([&] () {
+      new_msgr->AllExternalReferencesDropped();
+  });
+
+  RETURN_NOT_OK(ParseTriState("--rpc_authentication",
+                              rpc_authentication_,
+                              &new_msgr->authentication_));
+
+  RETURN_NOT_OK(ParseTriState("--rpc_encryption",
+                              rpc_encryption_,
+                              &new_msgr->encryption_));
+
+  RETURN_NOT_OK(new_msgr->Init());
+  if (new_msgr->encryption_ != RpcEncryption::DISABLED && enable_inbound_tls_) {
+    auto* tls_context = new_msgr->mutable_tls_context();
+
+    if (!rpc_certificate_file_.empty()) {
+      CHECK(!rpc_private_key_file_.empty());
+      CHECK(!rpc_ca_certificate_file_.empty());
+
+      // TODO(KUDU-1920): should we try and enforce that the server
+      // is in the subject or alt names of the cert?
+      RETURN_NOT_OK(tls_context->LoadCertificateAuthority(rpc_ca_certificate_file_));
+      if (rpc_private_key_password_cmd_.empty()) {
+        RETURN_NOT_OK(tls_context->LoadCertificateAndKey(rpc_certificate_file_,
+                                                         rpc_private_key_file_));
+      } else {
+        RETURN_NOT_OK(tls_context->LoadCertificateAndPasswordProtectedKey(
+            rpc_certificate_file_, rpc_private_key_file_,
+            [&](){
+              string ret;
+              WARN_NOT_OK(security::GetPasswordFromShellCommand(
+                  rpc_private_key_password_cmd_, &ret),
+                  "could not get RPC password from configured command");
+              return ret;
+            }
+        ));
+      }
+    } else {
+      RETURN_NOT_OK(tls_context->GenerateSelfSignedCertAndKey());
+    }
+  }
+
+  // See docs on Messenger::retain_self_ for info about this odd hack.
+  cleanup.cancel();
+  *msgr = shared_ptr<Messenger>(new_msgr, std::mem_fn(&Messenger::AllExternalReferencesDropped));
+  return Status::OK();
+}
+
+// See comment on Messenger::retain_self_ member.
+void Messenger::AllExternalReferencesDropped() {
+  // The last external ref may have been dropped in the context of a task
+  // running on a reactor thread. If that's the case, a SYNC shutdown here
+  // would deadlock.
+  //
+  // If a SYNC shutdown is desired, Shutdown() should be called explicitly.
+  ShutdownInternal(ShutdownMode::ASYNC);
+
+  CHECK(retain_self_.get());
+  // If we have no more external references, then we no longer
+  // need to retain ourself. We'll destruct as soon as all our
+  // internal-facing references are dropped (ie those from reactor
+  // threads).
+  retain_self_.reset();
+}
+
+void Messenger::Shutdown() {
+  ShutdownInternal(ShutdownMode::SYNC);
+}
+
+void Messenger::ShutdownInternal(ShutdownMode mode) {
+  if (mode == ShutdownMode::SYNC) {
+    ThreadRestrictions::AssertWaitAllowed();
+  }
+
+  // Since we're shutting down, it's OK to block.
+  //
+  // TODO(adar): this ought to be removed (i.e. if ASYNC, waiting should be
+  // forbidden, and if SYNC, we already asserted above), but that's not
+  // possible while shutting down thread and acceptor pools still involves
+  // joining threads.
+  ThreadRestrictions::ScopedAllowWait allow_wait;
+
+  acceptor_vec_t pools_to_shutdown;
+  RpcServicesMap services_to_release;
+  {
+    std::lock_guard<percpu_rwlock> guard(lock_);
+    if (closing_) {
+      return;
+    }
+    VLOG(1) << "shutting down messenger " << name_;
+    closing_ = true;
+
+    services_to_release = std::move(rpc_services_);
+    pools_to_shutdown = std::move(acceptor_pools_);
+  }
+
+  // Destroy state outside of the lock.
+  services_to_release.clear();
+  for (const auto& p : pools_to_shutdown) {
+    p->Shutdown();
+  }
+
+  // Need to shut down negotiation pool before the reactors, since the
+  // reactors close the Connection sockets, and may race against the negotiation
+  // threads' blocking reads & writes.
+  client_negotiation_pool_->Shutdown();
+  server_negotiation_pool_->Shutdown();
+
+  for (Reactor* reactor : reactors_) {
+    reactor->Shutdown(mode);
+  }
+}
+
+Status Messenger::AddAcceptorPool(const Sockaddr &accept_addr,
+                                  shared_ptr<AcceptorPool>* pool) {
+  // Before listening, if we expect to require Kerberos, we want to verify
+  // that everything is set up correctly. This way we'll generate errors on
+  // startup rather than later on when we first receive a client connection.
+  if (!keytab_file_.empty()) {
+    RETURN_NOT_OK_PREPEND(ServerNegotiation::PreflightCheckGSSAPI(sasl_proto_name()),
+                          "GSSAPI/Kerberos not properly configured");
+  }
+
+  Socket sock;
+  RETURN_NOT_OK(sock.Init(0));
+  RETURN_NOT_OK(sock.SetReuseAddr(true));
+  if (reuseport_) {
+    RETURN_NOT_OK(sock.SetReusePort(true));
+  }
+  RETURN_NOT_OK(sock.Bind(accept_addr));
+  Sockaddr remote;
+  RETURN_NOT_OK(sock.GetSocketAddress(&remote));
+  auto acceptor_pool(make_shared<AcceptorPool>(this, &sock, remote));
+
+  std::lock_guard<percpu_rwlock> guard(lock_);
+  acceptor_pools_.push_back(acceptor_pool);
+  pool->swap(acceptor_pool);
+  return Status::OK();
+}
+
+// Register a new RpcService to handle inbound requests.
+Status Messenger::RegisterService(const string& service_name,
+                                  const scoped_refptr<RpcService>& service) {
+  DCHECK(service);
+  std::lock_guard<percpu_rwlock> guard(lock_);
+  if (InsertIfNotPresent(&rpc_services_, service_name, service)) {
+    return Status::OK();
+  } else {
+    return Status::AlreadyPresent("This service is already present");
+  }
+}
+
+void Messenger::UnregisterAllServices() {
+  RpcServicesMap to_release;
+  {
+    std::lock_guard<percpu_rwlock> guard(lock_);
+    to_release = std::move(rpc_services_);
+  }
+  // Release the map outside of the lock.
+}
+
+Status Messenger::UnregisterService(const string& service_name) {
+  scoped_refptr<RpcService> to_release;
+  {
+    std::lock_guard<percpu_rwlock> guard(lock_);
+    to_release = EraseKeyReturnValuePtr(&rpc_services_, service_name);
+    if (!to_release) {
+      return Status::ServiceUnavailable(Substitute(
+          "service $0 not registered on $1", service_name, name_));
+    }
+  }
+  // Release the service outside of the lock.
+  return Status::OK();
+}
+
+void Messenger::QueueOutboundCall(const shared_ptr<OutboundCall> &call) {
+  Reactor *reactor = RemoteToReactor(call->conn_id().remote());
+  reactor->QueueOutboundCall(call);
+}
+
+void Messenger::QueueInboundCall(gscoped_ptr<InboundCall> call) {
+  shared_lock<rw_spinlock> guard(lock_.get_lock());
+  scoped_refptr<RpcService>* service = FindOrNull(rpc_services_,
+                                                  call->remote_method().service_name());
+  if (PREDICT_FALSE(!service)) {
+    Status s = Status::ServiceUnavailable(Substitute("service $0 not registered on $1",
+                                                     call->remote_method().service_name(), name_));
+    LOG(INFO) << s.ToString();
+    call.release()->RespondFailure(ErrorStatusPB::ERROR_NO_SUCH_SERVICE, s);
+    return;
+  }
+
+  call->set_method_info((*service)->LookupMethod(call->remote_method()));
+
+  // The RpcService will respond to the client on success or failure.
+  WARN_NOT_OK((*service)->QueueInboundCall(std::move(call)), "Unable to handle RPC call");
+}
+
+void Messenger::QueueCancellation(const shared_ptr<OutboundCall> &call) {
+  Reactor *reactor = RemoteToReactor(call->conn_id().remote());
+  reactor->QueueCancellation(call);
+}
+
+void Messenger::RegisterInboundSocket(Socket *new_socket, const Sockaddr &remote) {
+  Reactor *reactor = RemoteToReactor(remote);
+  reactor->RegisterInboundSocket(new_socket, remote);
+}
+
+Messenger::Messenger(const MessengerBuilder &bld)
+  : name_(bld.name_),
+    closing_(false),
+    authentication_(RpcAuthentication::REQUIRED),
+    encryption_(RpcEncryption::REQUIRED),
+    tls_context_(new security::TlsContext(bld.rpc_tls_ciphers_, bld.rpc_tls_min_protocol_)),
+    token_verifier_(new security::TokenVerifier()),
+    rpcz_store_(new RpczStore()),
+    metric_entity_(bld.metric_entity_),
+    rpc_negotiation_timeout_ms_(bld.rpc_negotiation_timeout_ms_),
+    sasl_proto_name_(bld.sasl_proto_name_),
+    keytab_file_(bld.keytab_file_),
+    reuseport_(bld.reuseport_),
+    retain_self_(this) {
+  for (int i = 0; i < bld.num_reactors_; i++) {
+    reactors_.push_back(new Reactor(retain_self_, i, bld));
+  }
+  CHECK_OK(ThreadPoolBuilder("client-negotiator")
+      .set_min_threads(bld.min_negotiation_threads_)
+      .set_max_threads(bld.max_negotiation_threads_)
+      .Build(&client_negotiation_pool_));
+  CHECK_OK(ThreadPoolBuilder("server-negotiator")
+      .set_min_threads(bld.min_negotiation_threads_)
+      .set_max_threads(bld.max_negotiation_threads_)
+      .Build(&server_negotiation_pool_));
+}
+
+Messenger::~Messenger() {
+  std::lock_guard<percpu_rwlock> guard(lock_);
+  CHECK(closing_) << "Should have already shut down";
+  STLDeleteElements(&reactors_);
+}
+
+Reactor* Messenger::RemoteToReactor(const Sockaddr &remote) {
+  uint32_t hashCode = remote.HashCode();
+  int reactor_idx = hashCode % reactors_.size();
+  // This is just a static partitioning; we could get a lot
+  // fancier with assigning Sockaddrs to Reactors.
+  return reactors_[reactor_idx];
+}
+
+Status Messenger::Init() {
+  RETURN_NOT_OK(tls_context_->Init());
+  for (Reactor* r : reactors_) {
+    RETURN_NOT_OK(r->Init());
+  }
+
+  return Status::OK();
+}
+
+Status Messenger::DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
+                                  DumpRunningRpcsResponsePB* resp) {
+  shared_lock<rw_spinlock> guard(lock_.get_lock());
+  for (Reactor* reactor : reactors_) {
+    RETURN_NOT_OK(reactor->DumpRunningRpcs(req, resp));
+  }
+  return Status::OK();
+}
+
+void Messenger::ScheduleOnReactor(const boost::function<void(const Status&)>& func,
+                                  MonoDelta when) {
+  DCHECK(!reactors_.empty());
+
+  // If we're already running on a reactor thread, reuse it.
+  Reactor* chosen = nullptr;
+  for (Reactor* r : reactors_) {
+    if (r->IsCurrentThread()) {
+      chosen = r;
+    }
+  }
+  if (chosen == nullptr) {
+    // Not running on a reactor thread, pick one at random.
+    chosen = reactors_[rand() % reactors_.size()];
+  }
+
+  DelayedTask* task = new DelayedTask(func, when);
+  chosen->ScheduleReactorTask(task);
+}
+
+const scoped_refptr<RpcService> Messenger::rpc_service(const string& service_name) const {
+  scoped_refptr<RpcService> service;
+  {
+    shared_lock<rw_spinlock> guard(lock_.get_lock());
+    if (!FindCopy(rpc_services_, service_name, &service)) {
+      return scoped_refptr<RpcService>(nullptr);
+    }
+  }
+  return service;
+}
+
+ThreadPool* Messenger::negotiation_pool(Connection::Direction dir) {
+  switch (dir) {
+    case Connection::CLIENT: return client_negotiation_pool_.get();
+    case Connection::SERVER: return server_negotiation_pool_.get();
+  }
+  DCHECK(false) << "Unknown Connection::Direction value: " << dir;
+  return nullptr;
+}
+
+} // namespace rpc
+} // namespace kudu
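
For orientation, the server-side setup implied by the code above composes as
follows. This is a hedged sketch using only APIs visible in this patch;
'bind_addr', 'service_name' and 'service' are assumed to be supplied by the
caller, and AcceptorPool::Start()'s signature lives in acceptor_pool.h, which
is not part of this hunk:

    #include "kudu/rpc/messenger.h"

    kudu::Status StartRpcServer(
        const kudu::Sockaddr& bind_addr,
        const std::string& service_name,
        const scoped_refptr<kudu::rpc::RpcService>& service,
        std::shared_ptr<kudu::rpc::Messenger>* messenger) {
      // Build the messenger; setters chain since each returns the builder.
      kudu::rpc::MessengerBuilder builder("example-messenger");
      builder.set_num_reactors(4)
             .set_max_negotiation_threads(4);
      RETURN_NOT_OK(builder.Build(messenger));

      // Route inbound calls for 'service_name' to 'service'.
      RETURN_NOT_OK((*messenger)->RegisterService(service_name, service));

      // Listen on 'bind_addr'. Note the pool is created stopped; the caller
      // must still invoke pool->Start(...) before connections are accepted.
      std::shared_ptr<kudu::rpc::AcceptorPool> pool;
      RETURN_NOT_OK((*messenger)->AddAcceptorPool(bind_addr, &pool));
      return kudu::Status::OK();
    }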

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/messenger.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/messenger.h b/be/src/kudu/rpc/messenger.h
new file mode 100644
index 0000000..64a804b
--- /dev/null
+++ b/be/src/kudu/rpc/messenger.h
@@ -0,0 +1,460 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_RPC_MESSENGER_H
+#define KUDU_RPC_MESSENGER_H
+
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <gtest/gtest_prod.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/rpc/connection.h"
+#include "kudu/security/security_flags.h"
+#include "kudu/security/token.pb.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/status.h"
+
+namespace boost {
+template <typename Signature>
+class function;
+}
+
+namespace kudu {
+
+class Socket;
+class ThreadPool;
+
+namespace security {
+class TlsContext;
+class TokenVerifier;
+}
+
+namespace rpc {
+
+using security::RpcAuthentication;
+using security::RpcEncryption;
+
+class AcceptorPool;
+class DumpRunningRpcsRequestPB;
+class DumpRunningRpcsResponsePB;
+class InboundCall;
+class Messenger;
+class OutboundCall;
+class Reactor;
+class RpcService;
+class RpczStore;
+
+struct AcceptorPoolInfo {
+ public:
+  explicit AcceptorPoolInfo(Sockaddr bind_address)
+      : bind_address_(bind_address) {}
+
+  Sockaddr bind_address() const {
+    return bind_address_;
+  }
+
+ private:
+  Sockaddr bind_address_;
+};
+
+// Used to construct a Messenger.
+class MessengerBuilder {
+ public:
+  friend class Messenger;
+  friend class ReactorThread;
+
+  explicit MessengerBuilder(std::string name);
+
+  // Set the length of time we will keep a TCP connection alive with no traffic.
+  MessengerBuilder &set_connection_keepalive_time(const MonoDelta &keepalive);
+
+  // Set the number of reactor threads that will be used for sending and
+  // receiving.
+  MessengerBuilder &set_num_reactors(int num_reactors);
+
+  // Set the minimum number of connection-negotiation threads that will be used
+  // to handle the blocking connection-negotiation step.
+  MessengerBuilder &set_min_negotiation_threads(int min_negotiation_threads);
+
+  // Set the maximum number of connection-negotiation threads that will be used
+  // to handle the blocking connection-negotiation step.
+  MessengerBuilder &set_max_negotiation_threads(int max_negotiation_threads);
+
+  // Set the granularity with which connections are checked for keepalive.
+  MessengerBuilder &set_coarse_timer_granularity(const MonoDelta &granularity);
+
+  // Set metric entity for use by RPC systems.
+  MessengerBuilder &set_metric_entity(const scoped_refptr<MetricEntity>& metric_entity);
+
+  // Set the time in milliseconds after which an idle connection from a client will be
+  // disconnected by the server.
+  MessengerBuilder &set_connection_keep_alive_time(int32_t time_in_ms);
+
+  // Set the timeout for negotiating an RPC connection.
+  MessengerBuilder &set_rpc_negotiation_timeout_ms(int64_t time_in_ms);
+
+  // Set the SASL protocol name that is used for the SASL negotiation.
+  MessengerBuilder &set_sasl_proto_name(const std::string& sasl_proto_name);
+
+  // Set whether authentication is required. If 'optional', authentication will be used when
+  // the remote end supports it. If 'required', connections which are not able to authenticate
+  // (because the remote end lacks support) are rejected.
+  MessengerBuilder &set_rpc_authentication(const std::string& rpc_authentication);
+
+  // Set whether encryption is required. If 'optional', encryption will be used when the
+  // remote end supports it. If 'required', connections which are not able to use encryption
+  // (because the remote end lacks support) are rejected. If 'disabled', encryption will not
+  // be used, and RPC authentication (--rpc_authentication) must also be disabled.
+  MessengerBuilder &set_rpc_encryption(const std::string& rpc_encryption);
+
+  // Set the cipher suite preferences to use for TLS-secured RPC connections. Uses the OpenSSL
+  // cipher preference list format. See man (1) ciphers for more information.
+  MessengerBuilder &set_rpc_tls_ciphers(const std::string& rpc_tls_ciphers);
+
+  // Set the minimum protocol version to allow when securing RPC connections with TLS. May be
+  // one of 'TLSv1', 'TLSv1.1', or 'TLSv1.2'.
+  MessengerBuilder &set_rpc_tls_min_protocol(const std::string& rpc_tls_min_protocol);
+
+  // Set the TLS server certificate and private key files paths. If this is set in conjunction
+  // with enable_inbound_tls(), internal PKI will not be used for encrypted communication and
+  // external PKI will be used instead.
+  MessengerBuilder &set_epki_cert_key_files(
+      const std::string& cert, const std::string& private_key);
+
+  // Set the TLS Certificate Authority file path. Must always be set with set_epki_cert_key_files().
+  // If this is set in conjunction with enable_inbound_tls(), internal PKI will not be used for
+  // encrypted communication and external PKI will be used instead.
+  MessengerBuilder &set_epki_certificate_authority_file(const std::string& ca);
+
+  // Set a Unix command whose output returns the password used to decrypt the RPC server's private
+  // key file specified via set_epki_cert_key_files(). If the .PEM key file is not
+  // password-protected, this flag does not need to be set. Trailing whitespace will be trimmed
+  // before it is used to decrypt the private key.
+  MessengerBuilder &set_epki_private_password_key_cmd(const std::string& cmd);
+
+  // Set the path to the Kerberos Keytab file for this server.
+  MessengerBuilder &set_keytab_file(const std::string& keytab_file);
+
+  // Configure the messenger to enable TLS encryption on inbound connections.
+  MessengerBuilder& enable_inbound_tls();
+
+  // Configure the messenger to set the SO_REUSEPORT socket option.
+  MessengerBuilder& set_reuseport();
+
+  Status Build(std::shared_ptr<Messenger> *msgr);
+
+ private:
+  const std::string name_;
+  MonoDelta connection_keepalive_time_;
+  int num_reactors_;
+  int min_negotiation_threads_;
+  int max_negotiation_threads_;
+  MonoDelta coarse_timer_granularity_;
+  scoped_refptr<MetricEntity> metric_entity_;
+  int64_t rpc_negotiation_timeout_ms_;
+  std::string sasl_proto_name_;
+  std::string rpc_authentication_;
+  std::string rpc_encryption_;
+  std::string rpc_tls_ciphers_;
+  std::string rpc_tls_min_protocol_;
+  std::string rpc_certificate_file_;
+  std::string rpc_private_key_file_;
+  std::string rpc_ca_certificate_file_;
+  std::string rpc_private_key_password_cmd_;
+  std::string keytab_file_;
+  bool enable_inbound_tls_;
+  bool reuseport_;
+};
+
+// A Messenger is a container for the reactor threads which run event loops
+// for the RPC services. If the process is a server, a Messenger can also have
+// one or more attached AcceptorPools which accept RPC connections. In this case,
+// calls received over the connection are enqueued into the messenger's service_queue
+// for processing by a ServicePool.
+//
+// Users do not typically interact with the Messenger directly except to create
+// one as a singleton, and then make calls using Proxy objects.
+//
+// See rpc-test.cc and rpc-bench.cc for example usages.
+class Messenger {
+ public:
+  friend class MessengerBuilder;
+  friend class Proxy;
+  friend class Reactor;
+  friend class ReactorThread;
+  typedef std::vector<std::shared_ptr<AcceptorPool> > acceptor_vec_t;
+  typedef std::unordered_map<std::string, scoped_refptr<RpcService> > RpcServicesMap;
+
+  static const uint64_t UNKNOWN_CALL_ID = 0;
+
+  ~Messenger();
+
+  // Stops all communication and prevents further use. If called explicitly,
+  // also waits for outstanding tasks running on reactor threads to finish,
+  // which means it may not be called from a reactor task.
+  //
+  // It's not required to call this -- dropping the shared_ptr provided
+  // from MessengerBuilder::Build will automatically call this method.
+  void Shutdown();
+
+  // Add a new acceptor pool listening to the given accept address.
+  // You can create any number of acceptor pools you want, including none.
+  //
+  // The created pool is returned in *pool. The Messenger also retains
+  // a reference to the pool, so the caller may safely drop this reference
+  // and the pool will remain live.
+  //
+  // NOTE: the returned pool is not initially started. You must call
+  // pool->Start(...) to begin accepting connections.
+  //
+  // If Kerberos is enabled, this also runs a pre-flight check that makes
+  // sure the environment is appropriately configured to authenticate
+  // clients via Kerberos. If not, this returns a RuntimeError.
+  Status AddAcceptorPool(const Sockaddr &accept_addr,
+                         std::shared_ptr<AcceptorPool>* pool);
+
+  // Register a new RpcService to handle inbound requests.
+  //
+  // Returns an error if a service with the same name is already registered.
+  Status RegisterService(const std::string& service_name,
+                         const scoped_refptr<RpcService>& service);
+
+  // Unregister an RpcService by name.
+  //
+  // Returns an error if no service with this name can be found.
+  Status UnregisterService(const std::string& service_name);
+
+  // Unregisters all RPC services.
+  void UnregisterAllServices();
+
+  // Queue a call for transmission. This will pick the appropriate reactor,
+  // and enqueue a task on that reactor to assign and send the call.
+  void QueueOutboundCall(const std::shared_ptr<OutboundCall> &call);
+
+  // Enqueue a call for processing on the server.
+  void QueueInboundCall(gscoped_ptr<InboundCall> call);
+
+  // Queue a cancellation for the given outbound call.
+  void QueueCancellation(const std::shared_ptr<OutboundCall> &call);
+
+  // Take ownership of the socket via Socket::Release
+  void RegisterInboundSocket(Socket *new_socket, const Sockaddr &remote);
+
+  // Dump the current RPCs into the given protobuf.
+  Status DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
+                         DumpRunningRpcsResponsePB* resp);
+
+  // Run 'func' on a reactor thread after 'when' time elapses.
+  //
+  // The status argument conveys whether 'func' was run correctly (i.e.
+  // after the elapsed time) or not.
+  void ScheduleOnReactor(const boost::function<void(const Status&)>& func,
+                         MonoDelta when);
+
+  const security::TlsContext& tls_context() const { return *tls_context_; }
+  security::TlsContext* mutable_tls_context() { return tls_context_.get(); }
+
+  const security::TokenVerifier& token_verifier() const { return *token_verifier_; }
+  security::TokenVerifier* mutable_token_verifier() { return token_verifier_.get(); }
+  std::shared_ptr<security::TokenVerifier> shared_token_verifier() const {
+    return token_verifier_;
+  }
+
+  boost::optional<security::SignedTokenPB> authn_token() const {
+    std::lock_guard<simple_spinlock> l(authn_token_lock_);
+    return authn_token_;
+  }
+  void set_authn_token(const security::SignedTokenPB& token) {
+    std::lock_guard<simple_spinlock> l(authn_token_lock_);
+    authn_token_ = token;
+  }
+
+  RpcAuthentication authentication() const { return authentication_; }
+  RpcEncryption encryption() const { return encryption_; }
+
+  ThreadPool* negotiation_pool(Connection::Direction dir);
+
+  RpczStore* rpcz_store() { return rpcz_store_.get(); }
+
+  int num_reactors() const { return reactors_.size(); }
+
+  const std::string& name() const {
+    return name_;
+  }
+
+  bool closing() const {
+    shared_lock<rw_spinlock> l(lock_.get_lock());
+    return closing_;
+  }
+
+  scoped_refptr<MetricEntity> metric_entity() const { return metric_entity_; }
+
+  int64_t rpc_negotiation_timeout_ms() const { return rpc_negotiation_timeout_ms_; }
+
+  const std::string& sasl_proto_name() const {
+    return sasl_proto_name_;
+  }
+
+  const std::string& keytab_file() const { return keytab_file_; }
+
+  const scoped_refptr<RpcService> rpc_service(const std::string& service_name) const;
+
+ private:
+  FRIEND_TEST(TestRpc, TestConnectionKeepalive);
+  FRIEND_TEST(TestRpc, TestConnectionAlwaysKeepalive);
+  FRIEND_TEST(TestRpc, TestClientConnectionsMetrics);
+  FRIEND_TEST(TestRpc, TestCredentialsPolicy);
+  FRIEND_TEST(TestRpc, TestReopenOutboundConnections);
+
+  explicit Messenger(const MessengerBuilder &bld);
+
+  Reactor* RemoteToReactor(const Sockaddr &remote);
+  Status Init();
+  void RunTimeoutThread();
+  void UpdateCurTime();
+
+  // Shuts down the messenger.
+  //
+  // Depending on 'mode', may or may not wait on any outstanding reactor tasks.
+  enum class ShutdownMode {
+    SYNC,
+    ASYNC,
+  };
+  void ShutdownInternal(ShutdownMode mode);
+
+  // Called by external-facing shared_ptr when the user no longer holds
+  // any references. See 'retain_self_' for more info.
+  void AllExternalReferencesDropped();
+
+  const std::string name_;
+
+  // Protects closing_, acceptor_pools_, rpc_services_.
+  mutable percpu_rwlock lock_;
+
+  bool closing_;
+
+  // Whether to require authentication and encryption on the connections managed
+  // by this messenger.
+  // TODO(KUDU-1928): scope these to individual proxies, so that messengers can be
+  // reused by different clients.
+  RpcAuthentication authentication_;
+  RpcEncryption encryption_;
+
+  // Pools which are listening on behalf of this messenger.
+  // Note that the user may have called Shutdown() on one of these
+  // pools, so even though we retain the reference, it may no longer
+  // be listening.
+  acceptor_vec_t acceptor_pools_;
+
+  // RPC services that handle inbound requests.
+  RpcServicesMap rpc_services_;
+
+  std::vector<Reactor*> reactors_;
+
+  // Separate client and server negotiation pools to avoid possibility of distributed
+  // deadlock. See KUDU-2041.
+  gscoped_ptr<ThreadPool> client_negotiation_pool_;
+  gscoped_ptr<ThreadPool> server_negotiation_pool_;
+
+  std::unique_ptr<security::TlsContext> tls_context_;
+
+  // A TokenVerifier, which can verify client-provided authentication tokens.
+  std::shared_ptr<security::TokenVerifier> token_verifier_;
+
+  // An optional token, which can be used to authenticate to a server.
+  mutable simple_spinlock authn_token_lock_;
+  boost::optional<security::SignedTokenPB> authn_token_;
+
+  std::unique_ptr<RpczStore> rpcz_store_;
+
+  scoped_refptr<MetricEntity> metric_entity_;
+
+  // Timeout, in milliseconds, after which an incomplete connection negotiation is aborted.
+  const int64_t rpc_negotiation_timeout_ms_;
+
+  // The SASL protocol name that is used for the SASL negotiation.
+  const std::string sasl_proto_name_;
+
+  // Path to the Kerberos Keytab file for this server.
+  const std::string keytab_file_;
+
+  // Whether to set SO_REUSEPORT on the listening sockets.
+  bool reuseport_;
+
+  // The ownership of the Messenger object is somewhat subtle. The pointer graph
+  // looks like this:
+  //
+  //    [User Code ]             |      [ Internal code ]
+  //                             |
+  //     shared_ptr[1]           |
+  //         |                   |
+  //         v
+  //      Messenger    <------------ shared_ptr[2] --- Reactor
+  //       ^    |      ------------- bare pointer  --> Reactor
+  //        \__/
+  //     shared_ptr[2]
+  //     (retain_self_)
+  //
+  // shared_ptr[1] instances use Messenger::AllExternalReferencesDropped()
+  //   as a deleter.
+  // shared_ptr[2] are "traditional" shared_ptrs which call 'delete' on the
+  //   object.
+  //
+  // The teardown sequence is as follows:
+  // Option 1): User calls "Shutdown()" explicitly:
+  //  - Messenger::Shutdown tells Reactors to shut down.
+  //  - When each reactor thread finishes, it drops its shared_ptr[2].
+  //  - the Messenger::retain_self_ instance remains, keeping the Messenger
+  //    alive.
+  //  - Before returning, Messenger::Shutdown waits for Reactors to shut down.
+  //  - The user eventually drops its shared_ptr[1], which calls
+  //    Messenger::AllExternalReferencesDropped. This drops retain_self_
+  //    and results in object destruction.
+  // Option 2): User drops all of its shared_ptr[1] references
+  //  - Though the Reactors still reference the Messenger, AllExternalReferencesDropped
+  //    will get called, which triggers Messenger::Shutdown.
+  //  - AllExternalReferencesDropped drops retain_self_, so the only remaining
+  //    references are from Reactor threads. But the reactor threads are shutting down.
+  //  - When the last Reactor thread dies, there will be no more shared_ptr[2] references
+  //    and the Messenger will be destroyed.
+  //
+  // The main goal of all of this confusion is that when using option 2, the
+  // reactor threads need to be able to shut down asynchronously, and we need
+  // to keep the Messenger alive until they do so. If normal shared_ptrs were
+  // handed out to users, the Messenger destructor may be forced to Join() the
+  // reactor threads, which deadlocks if the user destructs the Messenger from
+  // within a Reactor thread itself.
+  std::shared_ptr<Messenger> retain_self_;
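+  // A minimal sketch of how a shared_ptr[1] with the custom deleter described
+  // above could be constructed (illustrative; the real code lives in
+  // MessengerBuilder::Build):
+  //
+  //   Messenger* m = new Messenger(builder);
+  //   std::shared_ptr<Messenger> external(
+  //       m, std::mem_fn(&Messenger::AllExternalReferencesDropped));
+  //
+  // Dropping the last such 'external' copy then invokes
+  // AllExternalReferencesDropped() instead of 'delete'.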
+
+  DISALLOW_COPY_AND_ASSIGN(Messenger);
+};
+
+} // namespace rpc
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/mt-rpc-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/mt-rpc-test.cc b/be/src/kudu/rpc/mt-rpc-test.cc
new file mode 100644
index 0000000..7427850
--- /dev/null
+++ b/be/src/kudu/rpc/mt-rpc-test.cc
@@ -0,0 +1,318 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/rpc-test-base.h"
+
+#include <cstddef>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/acceptor_pool.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/proxy.h"
+#include "kudu/rpc/rpc_service.h"
+#include "kudu/rpc/service_if.h"
+#include "kudu/rpc/service_pool.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/status.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/thread.h"
+
+METRIC_DECLARE_counter(rpc_connections_accepted);
+METRIC_DECLARE_counter(rpcs_queue_overflow);
+
+using std::string;
+using std::shared_ptr;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace rpc {
+
+class MultiThreadedRpcTest : public RpcTestBase {
+ public:
+  // Make a single RPC call.
+  void SingleCall(Sockaddr server_addr, const char* method_name,
+                  Status* result, CountDownLatch* latch) {
+    LOG(INFO) << "Connecting to " << server_addr.ToString();
+    shared_ptr<Messenger> client_messenger;
+    CHECK_OK(CreateMessenger("ClientSC", &client_messenger));
+    Proxy p(client_messenger, server_addr, server_addr.host(),
+            GenericCalculatorService::static_service_name());
+    *result = DoTestSyncCall(p, method_name);
+    latch->CountDown();
+  }
+
+  // Make RPC calls until we see a failure.
+  void HammerServer(Sockaddr server_addr, const char* method_name,
+                    Status* last_result) {
+    shared_ptr<Messenger> client_messenger;
+    CHECK_OK(CreateMessenger("ClientHS", &client_messenger));
+    HammerServerWithMessenger(server_addr, method_name, last_result, client_messenger);
+  }
+
+  void HammerServerWithMessenger(
+      Sockaddr server_addr, const char* method_name, Status* last_result,
+      const shared_ptr<Messenger>& messenger) {
+    LOG(INFO) << "Connecting to " << server_addr.ToString();
+    Proxy p(messenger, server_addr, server_addr.host(),
+            GenericCalculatorService::static_service_name());
+
+    int i = 0;
+    while (true) {
+      i++;
+      Status s = DoTestSyncCall(p, method_name);
+      if (!s.ok()) {
+        // Return on first failure.
+        LOG(INFO) << "Call failed. Shutting down client thread. Ran " << i << " calls: "
+            << s.ToString();
+        *last_result = s;
+        return;
+      }
+    }
+  }
+};
+
+static void AssertShutdown(kudu::Thread* thread, const Status* status) {
+  ASSERT_OK(ThreadJoiner(thread).warn_every_ms(500).Join());
+  string msg = status->ToString();
+  ASSERT_TRUE(msg.find("Service unavailable") != string::npos ||
+              msg.find("Network error") != string::npos)
+              << "Status is actually: " << msg;
+}
+
+// Test making several concurrent RPC calls while shutting down.
+// Simply verify that we don't hit any CHECK errors.
+TEST_F(MultiThreadedRpcTest, TestShutdownDuringService) {
+  // Set up server.
+  Sockaddr server_addr;
+  ASSERT_OK(StartTestServer(&server_addr));
+
+  const int kNumThreads = 4;
+  scoped_refptr<kudu::Thread> threads[kNumThreads];
+  Status statuses[kNumThreads];
+  for (int i = 0; i < kNumThreads; i++) {
+    ASSERT_OK(kudu::Thread::Create("test", strings::Substitute("t$0", i),
+      &MultiThreadedRpcTest::HammerServer, this, server_addr,
+      GenericCalculatorService::kAddMethodName, &statuses[i], &threads[i]));
+  }
+
+  SleepFor(MonoDelta::FromMilliseconds(50));
+
+  // Shut down server.
+  ASSERT_OK(server_messenger_->UnregisterService(service_name_));
+  service_pool_->Shutdown();
+  server_messenger_->Shutdown();
+
+  for (int i = 0; i < kNumThreads; i++) {
+    AssertShutdown(threads[i].get(), &statuses[i]);
+  }
+}
+
+// Test shutting down the client messenger exactly as a thread is about to start
+// a new connection. This is a regression test for KUDU-104.
+TEST_F(MultiThreadedRpcTest, TestShutdownClientWhileCallsPending) {
+  // Set up server.
+  Sockaddr server_addr;
+  ASSERT_OK(StartTestServer(&server_addr));
+
+  shared_ptr<Messenger> client_messenger;
+  ASSERT_OK(CreateMessenger("Client", &client_messenger));
+
+  scoped_refptr<kudu::Thread> thread;
+  Status status;
+  ASSERT_OK(kudu::Thread::Create("test", "test",
+      &MultiThreadedRpcTest::HammerServerWithMessenger, this, server_addr,
+      GenericCalculatorService::kAddMethodName, &status, client_messenger, &thread));
+
+  // Shut down the messenger after a very brief sleep. This usually races such that
+  // the call is submitted to the messenger before shutdown, but negotiation has not
+  // yet started. In a debug build this fails about half the time without the bug fix.
+  // See KUDU-104.
+  SleepFor(MonoDelta::FromMicroseconds(10));
+  client_messenger->Shutdown();
+  client_messenger.reset();
+
+  ASSERT_OK(ThreadJoiner(thread.get()).warn_every_ms(500).Join());
+  ASSERT_TRUE(status.IsAborted() ||
+              status.IsServiceUnavailable());
+  string msg = status.ToString();
+  SCOPED_TRACE(msg);
+  ASSERT_TRUE(msg.find("Client RPC Messenger shutting down") != string::npos ||
+              msg.find("reactor is shutting down") != string::npos ||
+              msg.find("Unable to start connection negotiation thread") != string::npos)
+              << "Status is actually: " << msg;
+}
+
+// This bogus service pool never starts any worker threads, so its service queue fills up and stays full.
+class BogusServicePool : public ServicePool {
+ public:
+  BogusServicePool(gscoped_ptr<ServiceIf> service,
+                   const scoped_refptr<MetricEntity>& metric_entity,
+                   size_t service_queue_length)
+    : ServicePool(std::move(service), metric_entity, service_queue_length) {
+  }
+  virtual Status Init(int num_threads) OVERRIDE {
+    // Do nothing
+    return Status::OK();
+  }
+};
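+// With no worker threads, nothing ever drains the service queue: once
+// 'service_queue_length' requests are queued, the next arrival is rejected
+// with a "service queue is full" error and increments the rpcs_queue_overflow
+// metric, both of which the test below asserts.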
+
+void IncrementBackpressureOrShutdown(const Status* status, int* backpressure, int* shutdown) {
+  string msg = status->ToString();
+  if (msg.find("service queue is full") != string::npos) {
+    ++(*backpressure);
+  } else if (msg.find("shutting down") != string::npos) {
+    ++(*shutdown);
+  } else if (msg.find("got EOF from remote") != string::npos) {
+    ++(*shutdown);
+  } else {
+    FAIL() << "Unexpected status message: " << msg;
+  }
+}
+
+// Test that we get a Service Unavailable error when we max out the incoming RPC service queue.
+TEST_F(MultiThreadedRpcTest, TestBlowOutServiceQueue) {
+  const size_t kMaxConcurrency = 2;
+
+  MessengerBuilder bld("messenger1");
+  bld.set_num_reactors(kMaxConcurrency);
+  bld.set_metric_entity(metric_entity_);
+  CHECK_OK(bld.Build(&server_messenger_));
+
+  shared_ptr<AcceptorPool> pool;
+  ASSERT_OK(server_messenger_->AddAcceptorPool(Sockaddr(), &pool));
+  ASSERT_OK(pool->Start(kMaxConcurrency));
+  Sockaddr server_addr = pool->bind_address();
+
+  gscoped_ptr<ServiceIf> service(new GenericCalculatorService());
+  service_name_ = service->service_name();
+  service_pool_ = new BogusServicePool(std::move(service),
+                                       server_messenger_->metric_entity(),
+                                       kMaxConcurrency);
+  ASSERT_OK(service_pool_->Init(n_worker_threads_));
+  server_messenger_->RegisterService(service_name_, service_pool_);
+
+  scoped_refptr<kudu::Thread> threads[3];
+  Status status[3];
+  CountDownLatch latch(1);
+  for (int i = 0; i < 3; i++) {
+    ASSERT_OK(kudu::Thread::Create("test", strings::Substitute("t$0", i),
+      &MultiThreadedRpcTest::SingleCall, this, server_addr,
+      GenericCalculatorService::kAddMethodName, &status[i], &latch, &threads[i]));
+  }
+
+  // One should immediately fail due to backpressure. The latch is only initialized
+  // to wait for the first of three threads to finish.
+  latch.Wait();
+
+  // The rest would time out after 10 seconds, but we help them along by shutting down the server.
+  ASSERT_OK(server_messenger_->UnregisterService(service_name_));
+  service_pool_->Shutdown();
+  server_messenger_->Shutdown();
+
+  for (const auto& thread : threads) {
+    ASSERT_OK(ThreadJoiner(thread.get()).warn_every_ms(500).Join());
+  }
+
+  // Verify that one error was due to backpressure.
+  int errors_backpressure = 0;
+  int errors_shutdown = 0;
+
+  for (const auto& s : status) {
+    IncrementBackpressureOrShutdown(&s, &errors_backpressure, &errors_shutdown);
+  }
+
+  ASSERT_EQ(1, errors_backpressure);
+  ASSERT_EQ(2, errors_shutdown);
+
+  // Check that the RPC queue overflow metric is exactly 1.
+  Counter *rpcs_queue_overflow =
+    METRIC_rpcs_queue_overflow.Instantiate(server_messenger_->metric_entity()).get();
+  ASSERT_EQ(1, rpcs_queue_overflow->value());
+}
+
+static void HammerServerWithTCPConns(const Sockaddr& addr) {
+  while (true) {
+    Socket socket;
+    CHECK_OK(socket.Init(0));
+    Status s;
+    LOG_SLOW_EXECUTION(INFO, 100, "Connect took long") {
+      s = socket.Connect(addr);
+    }
+    if (!s.ok()) {
+      CHECK(s.IsNetworkError()) << "Unexpected error: " << s.ToString();
+      return;
+    }
+    CHECK_OK(socket.Close());
+  }
+}
+
+// Regression test for KUDU-128.
+// Test that shuts down the server while new TCP connections are incoming.
+TEST_F(MultiThreadedRpcTest, TestShutdownWithIncomingConnections) {
+  // Set up server.
+  Sockaddr server_addr;
+  ASSERT_OK(StartTestServer(&server_addr));
+
+  // Start a number of threads which just hammer the server with TCP connections.
+  vector<scoped_refptr<kudu::Thread>> threads;
+  for (int i = 0; i < 8; i++) {
+    scoped_refptr<kudu::Thread> new_thread;
+    CHECK_OK(kudu::Thread::Create("test", strings::Substitute("t$0", i),
+        &HammerServerWithTCPConns, server_addr, &new_thread));
+    threads.push_back(new_thread);
+  }
+
+  // Sleep until the server has started to actually accept some connections from the
+  // test threads.
+  scoped_refptr<Counter> conns_accepted =
+    METRIC_rpc_connections_accepted.Instantiate(server_messenger_->metric_entity());
+  while (conns_accepted->value() == 0) {
+    SleepFor(MonoDelta::FromMicroseconds(100));
+  }
+
+  // Shutdown while there are still new connections appearing.
+  ASSERT_OK(server_messenger_->UnregisterService(service_name_));
+  service_pool_->Shutdown();
+  server_messenger_->Shutdown();
+
+  for (scoped_refptr<kudu::Thread>& t : threads) {
+    ASSERT_OK(ThreadJoiner(t.get()).warn_every_ms(500).Join());
+  }
+}
+
+} // namespace rpc
+} // namespace kudu
+