You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:56 UTC
[44/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from
kudu@334ecafd
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/inbound_call.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/inbound_call.cc b/be/src/kudu/rpc/inbound_call.cc
new file mode 100644
index 0000000..6920071
--- /dev/null
+++ b/be/src/kudu/rpc/inbound_call.cc
@@ -0,0 +1,345 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/inbound_call.h"
+
+#include <cstdint>
+#include <memory>
+#include <ostream>
+
+#include <glog/logging.h>
+#include <google/protobuf/message.h>
+#include <google/protobuf/message_lite.h>
+
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/connection.h"
+#include "kudu/rpc/rpc_introspection.pb.h"
+#include "kudu/rpc/rpc_sidecar.h"
+#include "kudu/rpc/rpcz_store.h"
+#include "kudu/rpc/serialization.h"
+#include "kudu/rpc/service_if.h"
+#include "kudu/rpc/transfer.h"
+#include "kudu/util/debug/trace_event.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/trace.h"
+
+namespace google {
+namespace protobuf {
+class FieldDescriptor;
+}
+}
+
+using google::protobuf::FieldDescriptor;
+using google::protobuf::Message;
+using google::protobuf::MessageLite;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace rpc {
+
+InboundCall::InboundCall(Connection* conn)
+ : conn_(conn),
+ trace_(new Trace),
+ method_info_(nullptr),
+ deadline_(MonoTime::Max()) {
+ RecordCallReceived();
+}
+
+InboundCall::~InboundCall() {}
+
+Status InboundCall::ParseFrom(gscoped_ptr<InboundTransfer> transfer) {
+ TRACE_EVENT_FLOW_BEGIN0("rpc", "InboundCall", this);
+ TRACE_EVENT0("rpc", "InboundCall::ParseFrom");
+ RETURN_NOT_OK(serialization::ParseMessage(transfer->data(), &header_, &serialized_request_));
+
+ // Adopt the service/method info from the header as soon as it's available.
+ if (PREDICT_FALSE(!header_.has_remote_method())) {
+ return Status::Corruption("Non-connection context request header must specify remote_method");
+ }
+ if (PREDICT_FALSE(!header_.remote_method().IsInitialized())) {
+ return Status::Corruption("remote_method in request header is not initialized",
+ header_.remote_method().InitializationErrorString());
+ }
+ remote_method_.FromPB(header_.remote_method());
+
+ // Compute and cache the call deadline; an absent or zero timeout leaves it unchanged.
+ if (header_.has_timeout_millis() && header_.timeout_millis() != 0) {
+ deadline_ = timing_.time_received + MonoDelta::FromMilliseconds(header_.timeout_millis());
+ }
+
+ if (header_.sidecar_offsets_size() > TransferLimits::kMaxSidecars) {
+ return Status::Corruption(strings::Substitute(
+ "Received $0 additional payload slices, expected at most $1",
+ header_.sidecar_offsets_size(), TransferLimits::kMaxSidecars));
+ }
+
+ RETURN_NOT_OK(RpcSidecar::ParseSidecars(
+ header_.sidecar_offsets(), serialized_request_, inbound_sidecar_slices_));
+ if (header_.sidecar_offsets_size() > 0) {
+ // Trim the request to just the message: it ends where the first sidecar begins.
+ serialized_request_ = Slice(serialized_request_.data(), header_.sidecar_offsets(0));
+ }
+
+ // Retain the buffer that we have a view into.
+ transfer_.swap(transfer);
+ return Status::OK();
+}
+
+void InboundCall::RespondSuccess(const MessageLite& response) {
+ TRACE_EVENT0("rpc", "InboundCall::RespondSuccess");
+ Respond(response, true);
+}
+
+void InboundCall::RespondUnsupportedFeature(const vector<uint32_t>& unsupported_features) {
+ TRACE_EVENT0("rpc", "InboundCall::RespondUnsupportedFeature");
+ ErrorStatusPB err;
+ err.set_message("unsupported feature flags");
+ err.set_code(ErrorStatusPB::ERROR_INVALID_REQUEST);
+ for (uint32_t feature : unsupported_features) {
+ err.add_unsupported_feature_flags(feature);
+ }
+
+ Respond(err, false);
+}
+
+void InboundCall::RespondFailure(ErrorStatusPB::RpcErrorCodePB error_code,
+ const Status& status) {
+ TRACE_EVENT0("rpc", "InboundCall::RespondFailure");
+ ErrorStatusPB err;
+ err.set_message(status.ToString());
+ err.set_code(error_code);
+
+ Respond(err, false);
+}
+
+void InboundCall::RespondApplicationError(int error_ext_id, const std::string& message,
+ const MessageLite& app_error_pb) {
+ ErrorStatusPB err;
+ ApplicationErrorToPB(error_ext_id, message, app_error_pb, &err);
+ Respond(err, false);
+}
+
+void InboundCall::ApplicationErrorToPB(int error_ext_id, const std::string& message,
+ const google::protobuf::MessageLite& app_error_pb,
+ ErrorStatusPB* err) {
+ err->set_message(message);
+ const FieldDescriptor* app_error_field =
+ err->GetReflection()->FindKnownExtensionByNumber(error_ext_id);
+ if (app_error_field != nullptr) {
+ err->GetReflection()->MutableMessage(err, app_error_field)->CheckTypeAndMergeFrom(app_error_pb);
+ } else {
+ LOG(DFATAL) << "Unable to find application error extension ID " << error_ext_id
+ << " (message=" << message << ")";
+ }
+}
+
+void InboundCall::Respond(const MessageLite& response,
+ bool is_success) {
+ TRACE_EVENT_FLOW_END0("rpc", "InboundCall", this);
+ SerializeResponseBuffer(response, is_success);
+
+ TRACE_EVENT_ASYNC_END1("rpc", "InboundCall", this,
+ "method", remote_method_.method_name());
+ TRACE_TO(trace_, "Queueing $0 response", is_success ? "success" : "failure");
+ RecordHandlingCompleted();
+ conn_->rpcz_store()->AddCall(this);
+ conn_->QueueResponseForCall(gscoped_ptr<InboundCall>(this));
+}
+
+void InboundCall::SerializeResponseBuffer(const MessageLite& response,
+ bool is_success) {
+ if (PREDICT_FALSE(!response.IsInitialized())) {
+ LOG(ERROR) << "Invalid RPC response for " << ToString()
+ << ": protobuf missing required fields: "
+ << response.InitializationErrorString();
+ // Send it along anyway -- the client will also notice the missing fields
+ // and produce an error on the other side, but this will at least
+ // make it clear on both sides of the RPC connection what kind of error
+ // happened.
+ }
+
+ uint32_t protobuf_msg_size = response.ByteSize();
+
+ ResponseHeader resp_hdr;
+ resp_hdr.set_call_id(header_.call_id());
+ resp_hdr.set_is_error(!is_success);
+ int32_t sidecar_byte_size = 0;
+ for (const unique_ptr<RpcSidecar>& car : outbound_sidecars_) {
+ resp_hdr.add_sidecar_offsets(sidecar_byte_size + protobuf_msg_size);
+ int32_t sidecar_bytes = car->AsSlice().size();
+ DCHECK_LE(sidecar_byte_size, TransferLimits::kMaxTotalSidecarBytes - sidecar_bytes);
+ sidecar_byte_size += sidecar_bytes;
+ }
+
+ serialization::SerializeMessage(response, &response_msg_buf_,
+ sidecar_byte_size, true);
+ int64_t main_msg_size = sidecar_byte_size + response_msg_buf_.size();
+ serialization::SerializeHeader(resp_hdr, main_msg_size,
+ &response_hdr_buf_);
+}
+
+size_t InboundCall::SerializeResponseTo(TransferPayload* slices) const {
+ TRACE_EVENT0("rpc", "InboundCall::SerializeResponseTo");
+ DCHECK_GT(response_hdr_buf_.size(), 0);
+ DCHECK_GT(response_msg_buf_.size(), 0);
+ size_t n_slices = 2 + outbound_sidecars_.size();
+ DCHECK_LE(n_slices, slices->size());
+ auto slice_iter = slices->begin();
+ *slice_iter++ = Slice(response_hdr_buf_);
+ *slice_iter++ = Slice(response_msg_buf_);
+ for (auto& sidecar : outbound_sidecars_) {
+ *slice_iter++ = sidecar->AsSlice();
+ }
+ DCHECK_EQ(slice_iter - slices->begin(), n_slices);
+ return n_slices;
+}
+
+Status InboundCall::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) {
+ // Reject the sidecar once kMaxSidecars are already attached: only that many
+ // payload slices are free (two are used up by the header and main message
+ // protobufs), so admitting one more would exceed the limit.
+ if (outbound_sidecars_.size() >= TransferLimits::kMaxSidecars) {
+ return Status::ServiceUnavailable("All available sidecars already used");
+ }
+ int64_t sidecar_bytes = car->AsSlice().size();
+ if (outbound_sidecars_total_bytes_ >
+ TransferLimits::kMaxTotalSidecarBytes - sidecar_bytes) {
+ return Status::RuntimeError(Substitute("Total size of sidecars $0 would exceed limit $1",
+ static_cast<int64_t>(outbound_sidecars_total_bytes_) + sidecar_bytes,
+ TransferLimits::kMaxTotalSidecarBytes));
+ }
+
+ outbound_sidecars_.emplace_back(std::move(car));
+ outbound_sidecars_total_bytes_ += sidecar_bytes;
+ DCHECK_GE(outbound_sidecars_total_bytes_, 0);
+ *idx = outbound_sidecars_.size() - 1;
+ return Status::OK();
+}
+
+string InboundCall::ToString() const {
+ if (header_.has_request_id()) {
+ return Substitute("Call $0 from $1 (ReqId={client: $2, seq_no=$3, attempt_no=$4})",
+ remote_method_.ToString(),
+ conn_->remote().ToString(),
+ header_.request_id().client_id(),
+ header_.request_id().seq_no(),
+ header_.request_id().attempt_no());
+ }
+ return Substitute("Call $0 from $1 (request call id $2)",
+ remote_method_.ToString(),
+ conn_->remote().ToString(),
+ header_.call_id());
+}
+
+void InboundCall::DumpPB(const DumpRunningRpcsRequestPB& req,
+ RpcCallInProgressPB* resp) {
+ resp->mutable_header()->CopyFrom(header_);
+ if (req.include_traces() && trace_) {
+ resp->set_trace_buffer(trace_->DumpToString());
+ }
+ resp->set_micros_elapsed((MonoTime::Now() - timing_.time_received)
+ .ToMicroseconds());
+}
+
+const RemoteUser& InboundCall::remote_user() const {
+ return conn_->remote_user();
+}
+
+const Sockaddr& InboundCall::remote_address() const {
+ return conn_->remote();
+}
+
+const scoped_refptr<Connection>& InboundCall::connection() const {
+ return conn_;
+}
+
+Trace* InboundCall::trace() {
+ return trace_.get();
+}
+
+void InboundCall::RecordCallReceived() {
+ TRACE_EVENT_ASYNC_BEGIN0("rpc", "InboundCall", this);
+ DCHECK(!timing_.time_received.Initialized()); // Protect against multiple calls.
+ timing_.time_received = MonoTime::Now();
+}
+
+void InboundCall::RecordHandlingStarted(Histogram* incoming_queue_time) {
+ DCHECK(incoming_queue_time != nullptr);
+ DCHECK(!timing_.time_handled.Initialized()); // Protect against multiple calls.
+ timing_.time_handled = MonoTime::Now();
+ incoming_queue_time->Increment(
+ (timing_.time_handled - timing_.time_received).ToMicroseconds());
+}
+
+void InboundCall::RecordHandlingCompleted() {
+ DCHECK(!timing_.time_completed.Initialized()); // Protect against multiple calls.
+ timing_.time_completed = MonoTime::Now();
+
+ if (!timing_.time_handled.Initialized()) {
+ // Sometimes we respond to a call before we begin handling it (e.g. due to queue
+ // overflow, etc). These cases should not be counted against the histogram.
+ return;
+ }
+
+ if (method_info_) {
+ method_info_->handler_latency_histogram->Increment(
+ (timing_.time_completed - timing_.time_handled).ToMicroseconds());
+ }
+}
+
+bool InboundCall::ClientTimedOut() const {
+ return MonoTime::Now() >= deadline_;
+}
+
+MonoTime InboundCall::GetTimeReceived() const {
+ return timing_.time_received;
+}
+
+vector<uint32_t> InboundCall::GetRequiredFeatures() const {
+ vector<uint32_t> features;
+ for (uint32_t feature : header_.required_feature_flags()) {
+ features.push_back(feature);
+ }
+ return features;
+}
+
+Status InboundCall::GetInboundSidecar(int idx, Slice* sidecar) const {
+ DCHECK(transfer_) << "Sidecars have been discarded";
+ if (idx < 0 || idx >= header_.sidecar_offsets_size()) {
+ return Status::InvalidArgument(strings::Substitute(
+ "Index $0 does not reference a valid sidecar", idx));
+ }
+ *sidecar = inbound_sidecar_slices_[idx];
+ return Status::OK();
+}
+
+void InboundCall::DiscardTransfer() {
+ transfer_.reset();
+}
+
+size_t InboundCall::GetTransferSize() {
+ if (!transfer_) return 0;
+ return transfer_->data().size();
+}
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/inbound_call.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/inbound_call.h b/be/src/kudu/rpc/inbound_call.h
new file mode 100644
index 0000000..0db4c37
--- /dev/null
+++ b/be/src/kudu/rpc/inbound_call.h
@@ -0,0 +1,286 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_RPC_INBOUND_CALL_H
+#define KUDU_RPC_INBOUND_CALL_H
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/rpc/remote_method.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/service_if.h"
+#include "kudu/rpc/transfer.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+
+namespace google {
+namespace protobuf {
+class MessageLite;
+} // namespace protobuf
+} // namespace google
+
+namespace kudu {
+
+class Histogram;
+class Sockaddr;
+class Trace;
+
+namespace rpc {
+
+class Connection;
+class DumpRunningRpcsRequestPB;
+class RemoteUser;
+class RpcCallInProgressPB;
+class RpcSidecar;
+
+struct InboundCallTiming {
+ MonoTime time_received; // Time the call was first accepted.
+ MonoTime time_handled; // Time the call handler was kicked off.
+ MonoTime time_completed; // Time the call handler completed.
+
+ MonoDelta TotalDuration() const {
+ return time_completed - time_received;
+ }
+};
+
+// Inbound call on server
+class InboundCall {
+ public:
+ explicit InboundCall(Connection* conn);
+ ~InboundCall();
+
+ // Parse an inbound call message.
+ //
+ // This only deserializes the call header, populating the 'header_' and
+ // 'serialized_request_' member variables. The actual call parameter is
+ // not deserialized, as this may be CPU-expensive, and this is called
+ // from the reactor thread.
+ Status ParseFrom(gscoped_ptr<InboundTransfer> transfer);
+
+ // Return the serialized request parameter protobuf.
+ const Slice& serialized_request() const {
+ DCHECK(transfer_) << "Transfer discarded before parameter parsing";
+ return serialized_request_;
+ }
+
+ const RemoteMethod& remote_method() const {
+ return remote_method_;
+ }
+
+ const int32_t call_id() const {
+ return header_.call_id();
+ }
+
+ // Serializes 'response' into the InboundCall's internal buffer, and marks
+ // the call as a success. Enqueues the response back to the connection
+ // that made the call.
+ //
+ // This method deletes the InboundCall object, so no further calls may be
+ // made after this one.
+ void RespondSuccess(const google::protobuf::MessageLite& response);
+
+ // Serializes a failure response into the internal buffer, marking the
+ // call as a failure. Enqueues the response back to the connection that
+ // made the call.
+ //
+ // This method deletes the InboundCall object, so no further calls may be
+ // made after this one.
+ void RespondFailure(ErrorStatusPB::RpcErrorCodePB error_code,
+ const Status &status);
+
+ void RespondUnsupportedFeature(const std::vector<uint32_t>& unsupported_features);
+
+ void RespondApplicationError(int error_ext_id, const std::string& message,
+ const google::protobuf::MessageLite& app_error_pb);
+
+ // Convert an application error extension to an ErrorStatusPB.
+ // These ErrorStatusPB objects are what are returned in application error responses.
+ static void ApplicationErrorToPB(int error_ext_id, const std::string& message,
+ const google::protobuf::MessageLite& app_error_pb,
+ ErrorStatusPB* err);
+
+ // Serialize the response packet for the finished call into 'slices'.
+ // The resulting slices refer to memory in this object.
+ // Returns the number of slices in the serialized response.
+ size_t SerializeResponseTo(TransferPayload* slices) const;
+
+ // See RpcContext::AddRpcSidecar()
+ Status AddOutboundSidecar(std::unique_ptr<RpcSidecar> car, int* idx);
+
+ std::string ToString() const;
+
+ void DumpPB(const DumpRunningRpcsRequestPB& req, RpcCallInProgressPB* resp);
+
+ const RemoteUser& remote_user() const;
+
+ const Sockaddr& remote_address() const;
+
+ const scoped_refptr<Connection>& connection() const;
+
+ Trace* trace();
+
+ const InboundCallTiming& timing() const {
+ return timing_;
+ }
+
+ const RequestHeader& header() const {
+ return header_;
+ }
+
+ // Associate this call with a particular method that will be invoked
+ // by the service.
+ void set_method_info(scoped_refptr<RpcMethodInfo> info) {
+ method_info_ = std::move(info);
+ }
+
+ // Return the method associated with this call. This is set just before
+ // the call is enqueued onto the service queue, and therefore may be
+ // 'nullptr' for much of the lifecycle of a call.
+ RpcMethodInfo* method_info() {
+ return method_info_.get();
+ }
+
+ // When this InboundCall was received (instantiated).
+ // Should only be called once on a given instance.
+ // Not thread-safe. Should only be called by the current "owner" thread.
+ void RecordCallReceived();
+
+ // When RPC call Handle() was called on the server side.
+ // Updates the Histogram with time elapsed since the call was received,
+ // and should only be called once on a given instance.
+ // Not thread-safe. Should only be called by the current "owner" thread.
+ void RecordHandlingStarted(Histogram* incoming_queue_time);
+
+ // Return true if the deadline set by the client has already elapsed.
+ // In this case, the server may stop processing the call, since the
+ // call response will be ignored anyway.
+ bool ClientTimedOut() const;
+
+ // Return an upper bound on the client timeout deadline. This does not
+ // account for transmission delays between the client and the server.
+ // If the client did not specify a deadline, returns MonoTime::Max().
+ MonoTime GetClientDeadline() const {
+ return deadline_;
+ }
+
+ // Return the time when this call was received.
+ MonoTime GetTimeReceived() const;
+
+ // Returns the set of application-specific feature flags required to service
+ // the RPC.
+ std::vector<uint32_t> GetRequiredFeatures() const;
+
+ // Get a sidecar sent as part of the request. If idx < 0 || idx > num sidecars - 1,
+ // returns an error.
+ Status GetInboundSidecar(int idx, Slice* sidecar) const;
+
+ // Releases the buffer that contains the request + sidecar data. It is an error to
+ // access sidecars or serialized_request() after this method is called.
+ void DiscardTransfer();
+
+ // Returns the size of the transfer buffer that backs this call. If the transfer does
+ // not exist (e.g. GetTransferSize() is called after DiscardTransfer()), returns 0.
+ size_t GetTransferSize();
+
+ private:
+ friend class RpczStore;
+
+ // Serialize and queue the response.
+ void Respond(const google::protobuf::MessageLite& response,
+ bool is_success);
+
+ // Serialize a response message for either success or failure. If it is a success,
+ // 'response' should be the user-defined response type for the call. If it is a
+ // failure, 'response' should be an ErrorStatusPB instance.
+ void SerializeResponseBuffer(const google::protobuf::MessageLite& response,
+ bool is_success);
+
+ // When RPC call Handle() completed execution on the server side.
+ // Updates the Histogram with time elapsed since the call was started,
+ // and should only be called once on a given instance.
+ // Not thread-safe. Should only be called by the current "owner" thread.
+ void RecordHandlingCompleted();
+
+ // The connection on which this inbound call arrived.
+ scoped_refptr<Connection> conn_;
+
+ // The header of the incoming call. Set by ParseFrom()
+ RequestHeader header_;
+
+ // The serialized bytes of the request param protobuf. Set by ParseFrom().
+ // This references memory held by 'transfer_'.
+ Slice serialized_request_;
+
+ // The transfer that produced the call.
+ // This is kept around because it retains the memory referred to
+ // by 'serialized_request_' above.
+ gscoped_ptr<InboundTransfer> transfer_;
+
+ // The buffers for serialized response. Set by SerializeResponseBuffer().
+ faststring response_hdr_buf_;
+ faststring response_msg_buf_;
+
+ // Vector of additional sidecars that are tacked on to the call's response
+ // after serialization of the protobuf. See rpc/rpc_sidecar.h for more info.
+ std::vector<std::unique_ptr<RpcSidecar>> outbound_sidecars_;
+
+ // Total size of sidecars in outbound_sidecars_. This is limited to a maximum
+ // of TransferLimits::kMaxTotalSidecarBytes.
+ int32_t outbound_sidecars_total_bytes_ = 0;
+
+ // Inbound sidecars from the request. The slices are views onto transfer_. There are as
+ // many slices as header_.sidecar_offsets_size().
+ Slice inbound_sidecar_slices_[TransferLimits::kMaxSidecars];
+
+ // The trace buffer.
+ scoped_refptr<Trace> trace_;
+
+ // Timing information related to this RPC call.
+ InboundCallTiming timing_;
+
+ // Proto service this call belongs to. Used for routing.
+ // This field is filled in when the inbound request header is parsed.
+ RemoteMethod remote_method_;
+
+ // After the method has been looked up within the service, this is filled in
+ // to point to the information about this method. Acts as a pointer back to
+ // per-method info such as tracing.
+ scoped_refptr<RpcMethodInfo> method_info_;
+
+ // A time at which the client will time out, or MonoTime::Max if the
+ // client did not pass a timeout.
+ MonoTime deadline_;
+
+ DISALLOW_COPY_AND_ASSIGN(InboundCall);
+};
+
+} // namespace rpc
+} // namespace kudu
+
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/messenger.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/messenger.cc b/be/src/kudu/rpc/messenger.cc
new file mode 100644
index 0000000..17ac0c5
--- /dev/null
+++ b/be/src/kudu/rpc/messenger.cc
@@ -0,0 +1,502 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/messenger.h"
+
+#include <cstdlib>
+#include <functional>
+#include <mutex>
+#include <ostream>
+#include <string>
+#include <type_traits>
+#include <utility>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/acceptor_pool.h"
+#include "kudu/rpc/connection_id.h"
+#include "kudu/rpc/inbound_call.h"
+#include "kudu/rpc/outbound_call.h"
+#include "kudu/rpc/reactor.h"
+#include "kudu/rpc/remote_method.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/rpc_service.h"
+#include "kudu/rpc/rpcz_store.h"
+#include "kudu/rpc/sasl_common.h"
+#include "kudu/rpc/server_negotiation.h"
+#include "kudu/rpc/service_if.h"
+#include "kudu/security/openssl_util.h"
+#include "kudu/security/tls_context.h"
+#include "kudu/security/token_verifier.h"
+#include "kudu/util/flags.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread_restrictions.h"
+#include "kudu/util/threadpool.h"
+
+using std::string;
+using std::shared_ptr;
+using std::make_shared;
+using strings::Substitute;
+
+namespace boost {
+template <typename Signature> class function;
+}
+
+namespace kudu {
+namespace rpc {
+
+MessengerBuilder::MessengerBuilder(std::string name)
+ : name_(std::move(name)),
+ connection_keepalive_time_(MonoDelta::FromMilliseconds(65000)),
+ num_reactors_(4),
+ min_negotiation_threads_(0),
+ max_negotiation_threads_(4),
+ coarse_timer_granularity_(MonoDelta::FromMilliseconds(100)),
+ rpc_negotiation_timeout_ms_(3000),
+ sasl_proto_name_("kudu"),
+ rpc_authentication_("optional"),
+ rpc_encryption_("optional"),
+ rpc_tls_ciphers_(kudu::security::SecurityDefaults::kDefaultTlsCiphers),
+ rpc_tls_min_protocol_(kudu::security::SecurityDefaults::kDefaultTlsMinVersion),
+ enable_inbound_tls_(false),
+ reuseport_(false) {
+}
+
+MessengerBuilder& MessengerBuilder::set_connection_keepalive_time(const MonoDelta &keepalive) {
+ connection_keepalive_time_ = keepalive;
+ return *this;
+}
+
+MessengerBuilder& MessengerBuilder::set_num_reactors(int num_reactors) {
+ num_reactors_ = num_reactors;
+ return *this;
+}
+
+MessengerBuilder& MessengerBuilder::set_min_negotiation_threads(int min_negotiation_threads) {
+ min_negotiation_threads_ = min_negotiation_threads;
+ return *this;
+}
+
+MessengerBuilder& MessengerBuilder::set_max_negotiation_threads(int max_negotiation_threads) {
+ max_negotiation_threads_ = max_negotiation_threads;
+ return *this;
+}
+
+MessengerBuilder& MessengerBuilder::set_coarse_timer_granularity(const MonoDelta &granularity) {
+ coarse_timer_granularity_ = granularity;
+ return *this;
+}
+
+MessengerBuilder &MessengerBuilder::set_metric_entity(
+ const scoped_refptr<MetricEntity>& metric_entity) {
+ metric_entity_ = metric_entity;
+ return *this;
+}
+
+MessengerBuilder &MessengerBuilder::set_connection_keep_alive_time(int32_t time_in_ms) {
+ connection_keepalive_time_ = MonoDelta::FromMilliseconds(time_in_ms);
+ return *this;
+}
+
+MessengerBuilder &MessengerBuilder::set_rpc_negotiation_timeout_ms(int64_t time_in_ms) {
+ rpc_negotiation_timeout_ms_ = time_in_ms;
+ return *this;
+}
+
+MessengerBuilder &MessengerBuilder::set_sasl_proto_name(const std::string& sasl_proto_name) {
+ sasl_proto_name_ = sasl_proto_name;
+ return *this;
+}
+
+MessengerBuilder &MessengerBuilder::set_rpc_authentication(const std::string& rpc_authentication) {
+ rpc_authentication_ = rpc_authentication;
+ return *this;
+}
+
+MessengerBuilder &MessengerBuilder::set_rpc_encryption(const std::string& rpc_encryption) {
+ rpc_encryption_ = rpc_encryption;
+ return *this;
+}
+
+MessengerBuilder &MessengerBuilder::set_rpc_tls_ciphers(const std::string& rpc_tls_ciphers) {
+ rpc_tls_ciphers_ = rpc_tls_ciphers;
+ return *this;
+}
+
+MessengerBuilder &MessengerBuilder::set_rpc_tls_min_protocol(
+ const std::string& rpc_tls_min_protocol) {
+ rpc_tls_min_protocol_ = rpc_tls_min_protocol;
+ return *this;
+}
+
+MessengerBuilder& MessengerBuilder::set_epki_cert_key_files(
+ const std::string& cert, const std::string& private_key) {
+ rpc_certificate_file_ = cert;
+ rpc_private_key_file_ = private_key;
+ return *this;
+}
+
+MessengerBuilder& MessengerBuilder::set_epki_certificate_authority_file(const std::string& ca) {
+ rpc_ca_certificate_file_ = ca;
+ return *this;
+}
+
+MessengerBuilder& MessengerBuilder::set_epki_private_password_key_cmd(const std::string& cmd) {
+ rpc_private_key_password_cmd_ = cmd;
+ return *this;
+}
+
+MessengerBuilder& MessengerBuilder::set_keytab_file(const std::string& keytab_file) {
+ keytab_file_ = keytab_file;
+ return *this;
+}
+
+MessengerBuilder& MessengerBuilder::enable_inbound_tls() {
+ enable_inbound_tls_ = true;
+ return *this;
+}
+
+MessengerBuilder& MessengerBuilder::set_reuseport() {
+ reuseport_ = true;
+ return *this;
+}
+
+Status MessengerBuilder::Build(shared_ptr<Messenger> *msgr) {
+ // Initialize SASL library before we start making requests
+ RETURN_NOT_OK(SaslInit(!keytab_file_.empty()));
+
+ Messenger* new_msgr(new Messenger(*this));
+
+ auto cleanup = MakeScopedCleanup([&] () {
+ new_msgr->AllExternalReferencesDropped();
+ });
+
+ RETURN_NOT_OK(ParseTriState("--rpc_authentication",
+ rpc_authentication_,
+ &new_msgr->authentication_));
+
+ RETURN_NOT_OK(ParseTriState("--rpc_encryption",
+ rpc_encryption_,
+ &new_msgr->encryption_));
+
+ RETURN_NOT_OK(new_msgr->Init());
+ if (new_msgr->encryption_ != RpcEncryption::DISABLED && enable_inbound_tls_) {
+ auto* tls_context = new_msgr->mutable_tls_context();
+
+ if (!rpc_certificate_file_.empty()) {
+ CHECK(!rpc_private_key_file_.empty());
+ CHECK(!rpc_ca_certificate_file_.empty());
+
+ // TODO(KUDU-1920): should we try and enforce that the server
+ // is in the subject or alt names of the cert?
+ RETURN_NOT_OK(tls_context->LoadCertificateAuthority(rpc_ca_certificate_file_));
+ if (rpc_private_key_password_cmd_.empty()) {
+ RETURN_NOT_OK(tls_context->LoadCertificateAndKey(rpc_certificate_file_,
+ rpc_private_key_file_));
+ } else {
+ RETURN_NOT_OK(tls_context->LoadCertificateAndPasswordProtectedKey(
+ rpc_certificate_file_, rpc_private_key_file_,
+ [&](){
+ string ret;
+ WARN_NOT_OK(security::GetPasswordFromShellCommand(
+ rpc_private_key_password_cmd_, &ret),
+ "could not get RPC password from configured command");
+ return ret;
+ }
+ ));
+ }
+ } else {
+ RETURN_NOT_OK(tls_context->GenerateSelfSignedCertAndKey());
+ }
+ }
+
+ // See docs on Messenger::retain_self_ for info about this odd hack.
+ cleanup.cancel();
+ *msgr = shared_ptr<Messenger>(new_msgr, std::mem_fn(&Messenger::AllExternalReferencesDropped));
+ return Status::OK();
+}
+
+// See comment on Messenger::retain_self_ member.
+void Messenger::AllExternalReferencesDropped() {
+ // The last external ref may have been dropped in the context of a task
+ // running on a reactor thread. If that's the case, a SYNC shutdown here
+ // would deadlock.
+ //
+ // If a SYNC shutdown is desired, Shutdown() should be called explicitly.
+ ShutdownInternal(ShutdownMode::ASYNC);
+
+ CHECK(retain_self_.get());
+ // If we have no more external references, then we no longer
+ // need to retain ourself. We'll destruct as soon as all our
+ // internal-facing references are dropped (ie those from reactor
+ // threads).
+ retain_self_.reset();
+}
+
+void Messenger::Shutdown() {
+ ShutdownInternal(ShutdownMode::SYNC);
+}
+
+// Common shutdown path used by Shutdown() (SYNC) and
+// AllExternalReferencesDropped() (ASYNC). 'mode' is forwarded to every
+// reactor. Only the first call does any work; later calls see closing_ set
+// and return immediately.
+void Messenger::ShutdownInternal(ShutdownMode mode) {
+  if (mode == ShutdownMode::SYNC) {
+    ThreadRestrictions::AssertWaitAllowed();
+  }
+
+  // Blocking is tolerated from here on because we are shutting down.
+  //
+  // TODO(adar): this ought to be removed (i.e. if ASYNC, waiting should be
+  // forbidden, and if SYNC, we already asserted above), but that's not
+  // possible while shutting down thread and acceptor pools still involves
+  // joining threads.
+  ThreadRestrictions::ScopedAllowWait allow_wait;
+
+  acceptor_vec_t detached_pools;
+  RpcServicesMap detached_services;
+  {
+    std::lock_guard<percpu_rwlock> l(lock_);
+    if (closing_) {
+      // A previous call already shut this messenger down.
+      return;
+    }
+    VLOG(1) << "shutting down messenger " << name_;
+    closing_ = true;
+
+    // Take the registered state out of the messenger so that it can be torn
+    // down after the lock has been released.
+    detached_services = std::move(rpc_services_);
+    detached_pools = std::move(acceptor_pools_);
+  }
+
+  // From here on lock_ is no longer held.
+  detached_services.clear();
+  for (const auto& acceptor : detached_pools) {
+    acceptor->Shutdown();
+  }
+
+  // The negotiation pools have to go down before the reactors: the reactors
+  // close the Connection sockets, which may race against the negotiation
+  // threads' blocking reads & writes.
+  client_negotiation_pool_->Shutdown();
+  server_negotiation_pool_->Shutdown();
+
+  for (Reactor* r : reactors_) {
+    r->Shutdown(mode);
+  }
+}
+
+// Binds a new listening socket to 'accept_addr', wraps it in an AcceptorPool,
+// records the pool in acceptor_pools_ and hands it back through '*pool'.
+// Any failing step returns its error Status to the caller.
+Status Messenger::AddAcceptorPool(const Sockaddr &accept_addr,
+                                  shared_ptr<AcceptorPool>* pool) {
+  // When a keytab is configured we expect to need Kerberos, so verify the
+  // setup before listening. That way errors surface at startup rather than
+  // when the first client connection arrives.
+  if (!keytab_file_.empty()) {
+    RETURN_NOT_OK_PREPEND(ServerNegotiation::PreflightCheckGSSAPI(sasl_proto_name()),
+                          "GSSAPI/Kerberos not properly configured");
+  }
+
+  Socket listener;
+  RETURN_NOT_OK(listener.Init(0));
+  RETURN_NOT_OK(listener.SetReuseAddr(true));
+  if (reuseport_) {
+    RETURN_NOT_OK(listener.SetReusePort(true));
+  }
+  RETURN_NOT_OK(listener.Bind(accept_addr));
+
+  // Read the address back from the socket after binding.
+  Sockaddr bound_addr;
+  RETURN_NOT_OK(listener.GetSocketAddress(&bound_addr));
+  shared_ptr<AcceptorPool> new_pool = make_shared<AcceptorPool>(this, &listener, bound_addr);
+
+  std::lock_guard<percpu_rwlock> l(lock_);
+  acceptor_pools_.push_back(new_pool);
+  pool->swap(new_pool);
+  return Status::OK();
+}
+
+// Registers 'service' under 'service_name' so that it can handle inbound
+// requests. Returns AlreadyPresent if the name is already taken.
+Status Messenger::RegisterService(const string& service_name,
+                                  const scoped_refptr<RpcService>& service) {
+  DCHECK(service);
+  std::lock_guard<percpu_rwlock> l(lock_);
+  if (!InsertIfNotPresent(&rpc_services_, service_name, service)) {
+    return Status::AlreadyPresent("This service is already present");
+  }
+  return Status::OK();
+}
+
+// Removes every registered service. The map is moved into a local while the
+// lock is held, so the local is destroyed only after the lock is released.
+void Messenger::UnregisterAllServices() {
+  RpcServicesMap removed;
+  {
+    std::lock_guard<percpu_rwlock> l(lock_);
+    removed = std::move(rpc_services_);
+  }
+  // 'removed' goes out of scope here, outside of the lock.
+}
+
+// Removes the service registered under 'service_name'. Returns
+// ServiceUnavailable if no service with that name is registered.
+Status Messenger::UnregisterService(const string& service_name) {
+  scoped_refptr<RpcService> removed;
+  {
+    std::lock_guard<percpu_rwlock> l(lock_);
+    removed = EraseKeyReturnValuePtr(&rpc_services_, service_name);
+    if (!removed) {
+      return Status::ServiceUnavailable(Substitute(
+          "service $0 not registered on $1", service_name, name_));
+    }
+  }
+  // 'removed' drops its reference when it goes out of scope, after the lock
+  // has been released.
+  return Status::OK();
+}
+
+// Hands 'call' to the reactor selected by the call's remote address.
+void Messenger::QueueOutboundCall(const shared_ptr<OutboundCall> &call) {
+  RemoteToReactor(call->conn_id().remote())->QueueOutboundCall(call);
+}
+
+// Dispatches an inbound call to the RpcService registered under the call's
+// service name. If there is no such service, the call is failed with
+// ERROR_NO_SUCH_SERVICE instead.
+void Messenger::QueueInboundCall(gscoped_ptr<InboundCall> call) {
+  shared_lock<rw_spinlock> l(lock_.get_lock());
+  scoped_refptr<RpcService>* svc = FindOrNull(rpc_services_,
+                                              call->remote_method().service_name());
+  if (PREDICT_FALSE(!svc)) {
+    const Status not_registered = Status::ServiceUnavailable(
+        Substitute("service $0 not registered on $1",
+                   call->remote_method().service_name(), name_));
+    LOG(INFO) << not_registered.ToString();
+    // The gscoped_ptr gives up ownership before the failure response is sent.
+    call.release()->RespondFailure(ErrorStatusPB::ERROR_NO_SUCH_SERVICE, not_registered);
+    return;
+  }
+
+  call->set_method_info((*svc)->LookupMethod(call->remote_method()));
+
+  // The RpcService will respond to the client on success or failure.
+  WARN_NOT_OK((*svc)->QueueInboundCall(std::move(call)), "Unable to handle RPC call");
+}
+
+// Forwards the cancellation of 'call' to the reactor selected by the call's
+// remote address.
+void Messenger::QueueCancellation(const shared_ptr<OutboundCall> &call) {
+  RemoteToReactor(call->conn_id().remote())->QueueCancellation(call);
+}
+
+// Passes a newly accepted socket to the reactor selected by 'remote'.
+void Messenger::RegisterInboundSocket(Socket *new_socket, const Sockaddr &remote) {
+  RemoteToReactor(remote)->RegisterInboundSocket(new_socket, remote);
+}
+
+// Copies the relevant settings out of the builder, creates the reactors and
+// builds the client and server negotiation thread pools. Authentication and
+// encryption both start out as REQUIRED.
+Messenger::Messenger(const MessengerBuilder &bld)
+  : name_(bld.name_),
+    closing_(false),
+    authentication_(RpcAuthentication::REQUIRED),
+    encryption_(RpcEncryption::REQUIRED),
+    tls_context_(new security::TlsContext(bld.rpc_tls_ciphers_, bld.rpc_tls_min_protocol_)),
+    token_verifier_(new security::TokenVerifier()),
+    rpcz_store_(new RpczStore()),
+    metric_entity_(bld.metric_entity_),
+    rpc_negotiation_timeout_ms_(bld.rpc_negotiation_timeout_ms_),
+    sasl_proto_name_(bld.sasl_proto_name_),
+    keytab_file_(bld.keytab_file_),
+    reuseport_(bld.reuseport_),
+    retain_self_(this) {
+  // Each reactor is handed the self-retaining shared_ptr and its own index.
+  for (int reactor_idx = 0; reactor_idx < bld.num_reactors_; ++reactor_idx) {
+    reactors_.push_back(new Reactor(retain_self_, reactor_idx, bld));
+  }
+
+  // Both negotiation pools use the same min/max thread settings.
+  CHECK_OK(ThreadPoolBuilder("client-negotiator")
+               .set_min_threads(bld.min_negotiation_threads_)
+               .set_max_threads(bld.max_negotiation_threads_)
+               .Build(&client_negotiation_pool_));
+  CHECK_OK(ThreadPoolBuilder("server-negotiator")
+               .set_min_threads(bld.min_negotiation_threads_)
+               .set_max_threads(bld.max_negotiation_threads_)
+               .Build(&server_negotiation_pool_));
+}
+
+// Deletes the reactors. The messenger must already have been shut down
+// (closing_ set) by the time it is destroyed; otherwise the CHECK aborts.
+Messenger::~Messenger() {
+  std::lock_guard<percpu_rwlock> l(lock_);
+  CHECK(closing_) << "Should have already shut down";
+  STLDeleteElements(&reactors_);
+}
+
+// Picks the reactor responsible for 'remote' by hashing the address.
+// This is just a static partitioning; we could get a lot fancier with
+// assigning Sockaddrs to Reactors.
+Reactor* Messenger::RemoteToReactor(const Sockaddr &remote) {
+  const uint32_t addr_hash = remote.HashCode();
+  const int slot = addr_hash % reactors_.size();
+  return reactors_[slot];
+}
+
+// Initializes the TLS context and then every reactor, returning the first
+// error encountered.
+Status Messenger::Init() {
+  RETURN_NOT_OK(tls_context_->Init());
+  for (Reactor* reactor : reactors_) {
+    RETURN_NOT_OK(reactor->Init());
+  }
+  return Status::OK();
+}
+
+// Asks each reactor in turn to dump its running RPCs into 'resp', stopping
+// at the first reactor that reports an error. The shared side of lock_ is
+// held for the duration.
+Status Messenger::DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
+                                  DumpRunningRpcsResponsePB* resp) {
+  shared_lock<rw_spinlock> l(lock_.get_lock());
+  for (Reactor* r : reactors_) {
+    RETURN_NOT_OK(r->DumpRunningRpcs(req, resp));
+  }
+  return Status::OK();
+}
+
+// Schedules 'func' as a DelayedTask with delay 'when' on one of the reactors.
+// A reactor that reports the calling thread as its own is preferred;
+// otherwise one is picked with rand().
+void Messenger::ScheduleOnReactor(const boost::function<void(const Status&)>& func,
+                                  MonoDelta when) {
+  DCHECK(!reactors_.empty());
+
+  // If we're already running on a reactor thread, reuse it.
+  Reactor* target = nullptr;
+  for (Reactor* candidate : reactors_) {
+    if (candidate->IsCurrentThread()) {
+      target = candidate;
+    }
+  }
+
+  // Not running on a reactor thread, pick one at random.
+  if (target == nullptr) {
+    target = reactors_[rand() % reactors_.size()];
+  }
+
+  target->ScheduleReactorTask(new DelayedTask(func, when));
+}
+
+// Looks up the service registered under 'service_name'. Returns a null
+// scoped_refptr if there is none. The lock is only held for the lookup.
+const scoped_refptr<RpcService> Messenger::rpc_service(const string& service_name) const {
+  scoped_refptr<RpcService> found;
+  bool registered;
+  {
+    shared_lock<rw_spinlock> l(lock_.get_lock());
+    registered = FindCopy(rpc_services_, service_name, &found);
+  }
+  if (!registered) {
+    return scoped_refptr<RpcService>(nullptr);
+  }
+  return found;
+}
+
+// Returns the negotiation thread pool that matches the connection direction.
+// An unrecognized direction trips the DCHECK and yields nullptr.
+ThreadPool* Messenger::negotiation_pool(Connection::Direction dir) {
+  switch (dir) {
+    case Connection::CLIENT:
+      return client_negotiation_pool_.get();
+    case Connection::SERVER:
+      return server_negotiation_pool_.get();
+  }
+  DCHECK(false) << "Unknown Connection::Direction value: " << dir;
+  return nullptr;
+}
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/messenger.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/messenger.h b/be/src/kudu/rpc/messenger.h
new file mode 100644
index 0000000..64a804b
--- /dev/null
+++ b/be/src/kudu/rpc/messenger.h
@@ -0,0 +1,460 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_RPC_MESSENGER_H
+#define KUDU_RPC_MESSENGER_H
+
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <gtest/gtest_prod.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/rpc/connection.h"
+#include "kudu/security/security_flags.h"
+#include "kudu/security/token.pb.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/status.h"
+
+namespace boost {
+template <typename Signature>
+class function;
+}
+
+namespace kudu {
+
+class Socket;
+class ThreadPool;
+
+namespace security {
+class TlsContext;
+class TokenVerifier;
+}
+
+namespace rpc {
+
+using security::RpcAuthentication;
+using security::RpcEncryption;
+
+class AcceptorPool;
+class DumpRunningRpcsRequestPB;
+class DumpRunningRpcsResponsePB;
+class InboundCall;
+class Messenger;
+class OutboundCall;
+class Reactor;
+class RpcService;
+class RpczStore;
+
+// Small value type that carries the Sockaddr an acceptor pool is bound to.
+struct AcceptorPoolInfo {
+  explicit AcceptorPoolInfo(Sockaddr bind_address)
+      : bind_address_(bind_address) {}
+
+  // Returns a copy of the stored bind address.
+  Sockaddr bind_address() const { return bind_address_; }
+
+ private:
+  Sockaddr bind_address_;
+};
+
+// Used to construct a Messenger.
+class MessengerBuilder {
+ public:
+ friend class Messenger;
+ friend class ReactorThread;
+
+ explicit MessengerBuilder(std::string name);
+
+ // Set the length of time we will keep a TCP connection alive with no traffic.
+ MessengerBuilder &set_connection_keepalive_time(const MonoDelta &keepalive);
+
+ // Set the number of reactor threads that will be used for sending and
+ // receiving.
+ MessengerBuilder &set_num_reactors(int num_reactors);
+
+ // Set the minimum number of connection-negotiation threads that will be used
+ // to handle the blocking connection-negotiation step.
+ MessengerBuilder &set_min_negotiation_threads(int min_negotiation_threads);
+
+ // Set the maximum number of connection-negotiation threads that will be used
+ // to handle the blocking connection-negotiation step.
+ MessengerBuilder &set_max_negotiation_threads(int max_negotiation_threads);
+
+ // Set the granularity with which connections are checked for keepalive.
+ MessengerBuilder &set_coarse_timer_granularity(const MonoDelta &granularity);
+
+ // Set metric entity for use by RPC systems.
+ MessengerBuilder &set_metric_entity(const scoped_refptr<MetricEntity>& metric_entity);
+
+ // Set the time in milliseconds after which an idle connection from a client will be
+ // disconnected by the server.
+ MessengerBuilder &set_connection_keep_alive_time(int32_t time_in_ms);
+
+ // Set the timeout for negotiating an RPC connection.
+ MessengerBuilder &set_rpc_negotiation_timeout_ms(int64_t time_in_ms);
+
+ // Set the SASL protocol name that is used for the SASL negotiation.
+ MessengerBuilder &set_sasl_proto_name(const std::string& sasl_proto_name);
+
+ // Set the state of authentication required. If 'optional', authentication will be used when
+ // the remote end supports it. If 'required', connections which are not able to authenticate
+ // (because the remote end lacks support) are rejected.
+ MessengerBuilder &set_rpc_authentication(const std::string& rpc_authentication);
+
+ // Set the state of encryption required. If 'optional', encryption will be used when the
+ // remote end supports it. If 'required', connections which are not able to use encryption
+ // (because the remote end lacks support) are rejected. If 'disabled', encryption will not
+ // be used, and RPC authentication (--rpc_authentication) must be disabled as well.
+ MessengerBuilder &set_rpc_encryption(const std::string& rpc_encryption);
+
+ // Set the cipher suite preferences to use for TLS-secured RPC connections. Uses the OpenSSL
+ // cipher preference list format. See man (1) ciphers for more information.
+ MessengerBuilder &set_rpc_tls_ciphers(const std::string& rpc_tls_ciphers);
+
+ // Set the minimum protocol version to allow when securing RPC connections with TLS. May be
+ // one of 'TLSv1', 'TLSv1.1', or 'TLSv1.2'.
+ MessengerBuilder &set_rpc_tls_min_protocol(const std::string& rpc_tls_min_protocol);
+
+ // Set the TLS server certificate and private key files paths. If this is set in conjunction
+ // with enable_inbound_tls(), internal PKI will not be used for encrypted communication and
+ // external PKI will be used instead.
+ MessengerBuilder &set_epki_cert_key_files(
+ const std::string& cert, const std::string& private_key);
+
+ // Set the TLS Certificate Authority file path. Must always be set with set_epki_cert_key_files().
+ // If this is set in conjunction with enable_inbound_tls(), internal PKI will not be used for
+ // encrypted communication and external PKI will be used instead.
+ MessengerBuilder &set_epki_certificate_authority_file(const std::string& ca);
+
+ // Set a Unix command whose output returns the password used to decrypt the RPC server's private
+ // key file specified via set_epki_cert_key_files(). If the .PEM key file is not
+ // password-protected, this flag does not need to be set. Trailing whitespace will be trimmed
+ // before it is used to decrypt the private key.
+ MessengerBuilder &set_epki_private_password_key_cmd(const std::string& cmd);
+
+ // Set the path to the Kerberos Keytab file for this server.
+ MessengerBuilder &set_keytab_file(const std::string& keytab_file);
+
+ // Configure the messenger to enable TLS encryption on inbound connections.
+ MessengerBuilder& enable_inbound_tls();
+
+ // Configure the messenger to set the SO_REUSEPORT socket option.
+ MessengerBuilder& set_reuseport();
+
+ Status Build(std::shared_ptr<Messenger> *msgr);
+
+ private:
+ const std::string name_;
+ MonoDelta connection_keepalive_time_;
+ int num_reactors_;
+ int min_negotiation_threads_;
+ int max_negotiation_threads_;
+ MonoDelta coarse_timer_granularity_;
+ scoped_refptr<MetricEntity> metric_entity_;
+ int64_t rpc_negotiation_timeout_ms_;
+ std::string sasl_proto_name_;
+ std::string rpc_authentication_;
+ std::string rpc_encryption_;
+ std::string rpc_tls_ciphers_;
+ std::string rpc_tls_min_protocol_;
+ std::string rpc_certificate_file_;
+ std::string rpc_private_key_file_;
+ std::string rpc_ca_certificate_file_;
+ std::string rpc_private_key_password_cmd_;
+ std::string keytab_file_;
+ bool enable_inbound_tls_;
+ bool reuseport_;
+};
+
+// A Messenger is a container for the reactor threads which run event loops
+// for the RPC services. If the process is a server, a Messenger can also have
+// one or more attached AcceptorPools which accept RPC connections. In this case,
+// calls received over the connection are enqueued into the messenger's service_queue
+// for processing by a ServicePool.
+//
+// Users do not typically interact with the Messenger directly except to create
+// one as a singleton, and then make calls using Proxy objects.
+//
+// See rpc-test.cc and rpc-bench.cc for example usages.
+class Messenger {
+ public:
+ friend class MessengerBuilder;
+ friend class Proxy;
+ friend class Reactor;
+ friend class ReactorThread;
+ typedef std::vector<std::shared_ptr<AcceptorPool> > acceptor_vec_t;
+ typedef std::unordered_map<std::string, scoped_refptr<RpcService> > RpcServicesMap;
+
+ static const uint64_t UNKNOWN_CALL_ID = 0;
+
+ ~Messenger();
+
+ // Stops all communication and prevents further use. If called explicitly,
+ // also waits for outstanding tasks running on reactor threads to finish,
+ // which means it may not be called from a reactor task.
+ //
+ // It's not required to call this -- dropping the last shared_ptr provided
+ // from MessengerBuilder::Build automatically triggers an asynchronous shutdown.
+ void Shutdown();
+
+ // Add a new acceptor pool listening to the given accept address.
+ // You can create any number of acceptor pools you want, including none.
+ //
+ // The created pool is returned in *pool. The Messenger also retains
+ // a reference to the pool, so the caller may safely drop this reference
+ // and the pool will remain live.
+ //
+ // NOTE: the returned pool is not initially started. You must call
+ // pool->Start(...) to begin accepting connections.
+ //
+ // If Kerberos is enabled, this also runs a pre-flight check that makes
+ // sure the environment is appropriately configured to authenticate
+ // clients via Kerberos. If not, this returns a RuntimeError.
+ Status AddAcceptorPool(const Sockaddr &accept_addr,
+ std::shared_ptr<AcceptorPool>* pool);
+
+ // Register a new RpcService to handle inbound requests.
+ //
+ // Returns an error if a service with the same name is already registered.
+ Status RegisterService(const std::string& service_name,
+ const scoped_refptr<RpcService>& service);
+
+ // Unregister an RpcService by name.
+ //
+ // Returns an error if no service with this name can be found.
+ Status UnregisterService(const std::string& service_name);
+
+ // Unregisters all RPC services.
+ void UnregisterAllServices();
+
+ // Queue a call for transmission. This will pick the appropriate reactor,
+ // and enqueue a task on that reactor to assign and send the call.
+ void QueueOutboundCall(const std::shared_ptr<OutboundCall> &call);
+
+ // Enqueue a call for processing on the server.
+ void QueueInboundCall(gscoped_ptr<InboundCall> call);
+
+ // Queue a cancellation for the given outbound call.
+ void QueueCancellation(const std::shared_ptr<OutboundCall> &call);
+
+ // Take ownership of the socket via Socket::Release
+ void RegisterInboundSocket(Socket *new_socket, const Sockaddr &remote);
+
+ // Dump the current RPCs into the given protobuf.
+ Status DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
+ DumpRunningRpcsResponsePB* resp);
+
+ // Run 'func' on a reactor thread after 'when' time elapses.
+ //
+ // The status argument conveys whether 'func' was run correctly (i.e.
+ // after the elapsed time) or not.
+ void ScheduleOnReactor(const boost::function<void(const Status&)>& func,
+ MonoDelta when);
+
+ const security::TlsContext& tls_context() const { return *tls_context_; }
+ security::TlsContext* mutable_tls_context() { return tls_context_.get(); }
+
+ const security::TokenVerifier& token_verifier() const { return *token_verifier_; }
+ security::TokenVerifier* mutable_token_verifier() { return token_verifier_.get(); }
+ std::shared_ptr<security::TokenVerifier> shared_token_verifier() const {
+ return token_verifier_;
+ }
+
+ boost::optional<security::SignedTokenPB> authn_token() const {
+ std::lock_guard<simple_spinlock> l(authn_token_lock_);
+ return authn_token_;
+ }
+ void set_authn_token(const security::SignedTokenPB& token) {
+ std::lock_guard<simple_spinlock> l(authn_token_lock_);
+ authn_token_ = token;
+ }
+
+ RpcAuthentication authentication() const { return authentication_; }
+ RpcEncryption encryption() const { return encryption_; }
+
+ ThreadPool* negotiation_pool(Connection::Direction dir);
+
+ RpczStore* rpcz_store() { return rpcz_store_.get(); }
+
+ int num_reactors() const { return reactors_.size(); }
+
+ const std::string& name() const {
+ return name_;
+ }
+
+ bool closing() const {
+ shared_lock<rw_spinlock> l(lock_.get_lock());
+ return closing_;
+ }
+
+ scoped_refptr<MetricEntity> metric_entity() const { return metric_entity_; }
+
+ const int64_t rpc_negotiation_timeout_ms() const { return rpc_negotiation_timeout_ms_; }
+
+ const std::string& sasl_proto_name() const {
+ return sasl_proto_name_;
+ }
+
+ const std::string& keytab_file() const { return keytab_file_; }
+
+ const scoped_refptr<RpcService> rpc_service(const std::string& service_name) const;
+
+ private:
+ FRIEND_TEST(TestRpc, TestConnectionKeepalive);
+ FRIEND_TEST(TestRpc, TestConnectionAlwaysKeepalive);
+ FRIEND_TEST(TestRpc, TestClientConnectionsMetrics);
+ FRIEND_TEST(TestRpc, TestCredentialsPolicy);
+ FRIEND_TEST(TestRpc, TestReopenOutboundConnections);
+
+ explicit Messenger(const MessengerBuilder &bld);
+
+ Reactor* RemoteToReactor(const Sockaddr &remote);
+ Status Init();
+ void RunTimeoutThread();
+ void UpdateCurTime();
+
+ // Shuts down the messenger.
+ //
+ // Depending on 'mode', may or may not wait on any outstanding reactor tasks.
+ enum class ShutdownMode {
+ SYNC,
+ ASYNC,
+ };
+ void ShutdownInternal(ShutdownMode mode);
+
+ // Called by external-facing shared_ptr when the user no longer holds
+ // any references. See 'retain_self_' for more info.
+ void AllExternalReferencesDropped();
+
+ const std::string name_;
+
+ // Protects closing_, acceptor_pools_, rpc_services_.
+ mutable percpu_rwlock lock_;
+
+ bool closing_;
+
+ // Whether to require authentication and encryption on the connections managed
+ // by this messenger.
+ // TODO(KUDU-1928): scope these to individual proxies, so that messengers can be
+ // reused by different clients.
+ RpcAuthentication authentication_;
+ RpcEncryption encryption_;
+
+ // Pools which are listening on behalf of this messenger.
+ // Note that the user may have called Shutdown() on one of these
+ // pools, so even though we retain the reference, it may no longer
+ // be listening.
+ acceptor_vec_t acceptor_pools_;
+
+ // RPC services that handle inbound requests.
+ RpcServicesMap rpc_services_;
+
+ std::vector<Reactor*> reactors_;
+
+ // Separate client and server negotiation pools to avoid possibility of distributed
+ // deadlock. See KUDU-2041.
+ gscoped_ptr<ThreadPool> client_negotiation_pool_;
+ gscoped_ptr<ThreadPool> server_negotiation_pool_;
+
+ std::unique_ptr<security::TlsContext> tls_context_;
+
+ // A TokenVerifier, which can verify client provided authentication tokens.
+ std::shared_ptr<security::TokenVerifier> token_verifier_;
+
+ // An optional token, which can be used to authenticate to a server.
+ mutable simple_spinlock authn_token_lock_;
+ boost::optional<security::SignedTokenPB> authn_token_;
+
+ std::unique_ptr<RpczStore> rpcz_store_;
+
+ scoped_refptr<MetricEntity> metric_entity_;
+
+ // Timeout in milliseconds after which an incomplete connection negotiation will time out.
+ const int64_t rpc_negotiation_timeout_ms_;
+
+ // The SASL protocol name that is used for the SASL negotiation.
+ const std::string sasl_proto_name_;
+
+ // Path to the Kerberos Keytab file for this server.
+ const std::string keytab_file_;
+
+ // Whether to set SO_REUSEPORT on the listening sockets.
+ bool reuseport_;
+
+ // The ownership of the Messenger object is somewhat subtle. The pointer graph
+ // looks like this:
+ //
+ // [User Code ] | [ Internal code ]
+ // |
+ // shared_ptr[1] |
+ // | |
+ // v
+ // Messenger <------------ shared_ptr[2] --- Reactor
+ // ^ | ------------- bare pointer --> Reactor
+ // \__/
+ // shared_ptr[2]
+ // (retain_self_)
+ //
+ // shared_ptr[1] instances use Messenger::AllExternalReferencesDropped()
+ // as a deleter.
+ // shared_ptr[2] are "traditional" shared_ptrs which call 'delete' on the
+ // object.
+ //
+ // The teardown sequence is as follows:
+ // Option 1): User calls "Shutdown()" explicitly:
+ // - Messenger::Shutdown tells Reactors to shut down.
+ // - When each reactor thread finishes, it drops its shared_ptr[2].
+ // - the Messenger::retain_self_ instance remains, keeping the Messenger
+ // alive.
+ // - Before returning, Messenger::Shutdown waits for Reactors to shut down.
+ // - The user eventually drops its shared_ptr[1], which calls
+ // Messenger::AllExternalReferencesDropped. This drops retain_self_
+ // and results in object destruction.
+ // Option 2): User drops all of its shared_ptr[1] references
+ // - Though the Reactors still reference the Messenger, AllExternalReferencesDropped
+ // will get called, which triggers an ASYNC shutdown (ShutdownInternal).
+ // - AllExternalReferencesDropped drops retain_self_, so the only remaining
+ // references are from Reactor threads. But the reactor threads are shutting down.
+ // - When the last Reactor thread dies, there will be no more shared_ptr[2] references
+ // and the Messenger will be destroyed.
+ //
+ // The main goal of all of this confusion is that when using option 2, the
+ // reactor threads need to be able to shut down asynchronously, and we need
+ // to keep the Messenger alive until they do so. If normal shared_ptrs were
+ // handed out to users, the Messenger destructor may be forced to Join() the
+ // reactor threads, which deadlocks if the user destructs the Messenger from
+ // within a Reactor thread itself.
+ std::shared_ptr<Messenger> retain_self_;
+
+ DISALLOW_COPY_AND_ASSIGN(Messenger);
+};
+
+} // namespace rpc
+} // namespace kudu
+
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/mt-rpc-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/mt-rpc-test.cc b/be/src/kudu/rpc/mt-rpc-test.cc
new file mode 100644
index 0000000..7427850
--- /dev/null
+++ b/be/src/kudu/rpc/mt-rpc-test.cc
@@ -0,0 +1,318 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/rpc-test-base.h"
+
+#include <cstddef>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/acceptor_pool.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/proxy.h"
+#include "kudu/rpc/rpc_service.h"
+#include "kudu/rpc/service_if.h"
+#include "kudu/rpc/service_pool.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/status.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/thread.h"
+
+
+METRIC_DECLARE_counter(rpc_connections_accepted);
+METRIC_DECLARE_counter(rpcs_queue_overflow);
+
+using std::string;
+using std::shared_ptr;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+namespace rpc {
+
+// Test fixture whose helpers issue calculator-service calls against a server;
+// the tests below run them on separate threads.
+class MultiThreadedRpcTest : public RpcTestBase {
+ public:
+  // Makes a single call to 'method_name' through a freshly created client
+  // messenger, stores the outcome in '*result' and then counts down 'latch'.
+  void SingleCall(Sockaddr server_addr, const char* method_name,
+                  Status* result, CountDownLatch* latch) {
+    LOG(INFO) << "Connecting to " << server_addr.ToString();
+    shared_ptr<Messenger> messenger;
+    CHECK_OK(CreateMessenger("ClientSC", &messenger));
+    Proxy proxy(messenger, server_addr, server_addr.host(),
+                GenericCalculatorService::static_service_name());
+    *result = DoTestSyncCall(proxy, method_name);
+    latch->CountDown();
+  }
+
+  // Creates its own client messenger and then makes calls until one fails;
+  // see HammerServerWithMessenger().
+  void HammerServer(Sockaddr server_addr, const char* method_name,
+                    Status* last_result) {
+    shared_ptr<Messenger> messenger;
+    CHECK_OK(CreateMessenger("ClientHS", &messenger));
+    HammerServerWithMessenger(server_addr, method_name, last_result, messenger);
+  }
+
+  // Calls 'method_name' in a loop through 'messenger' and returns on the
+  // first failure, leaving the failing status in '*last_result'.
+  void HammerServerWithMessenger(
+      Sockaddr server_addr, const char* method_name, Status* last_result,
+      const shared_ptr<Messenger>& messenger) {
+    LOG(INFO) << "Connecting to " << server_addr.ToString();
+    Proxy proxy(messenger, server_addr, server_addr.host(),
+                GenericCalculatorService::static_service_name());
+
+    // 'num_calls' counts every attempt, including the one that fails.
+    for (int num_calls = 1; ; ++num_calls) {
+      Status s = DoTestSyncCall(proxy, method_name);
+      if (s.ok()) {
+        continue;
+      }
+      LOG(INFO) << "Call failed. Shutting down client thread. Ran " << num_calls << " calls: "
+                << s.ToString();
+      *last_result = s;
+      return;
+    }
+  }
+};
+
+// Joins 'thread' and asserts that '*status' reads like a failure caused by
+// the server going away ("Service unavailable" or "Network error").
+static void AssertShutdown(kudu::Thread* thread, const Status* status) {
+  ASSERT_OK(ThreadJoiner(thread).warn_every_ms(500).Join());
+  const string msg = status->ToString();
+  ASSERT_TRUE(msg.find("Service unavailable") != string::npos ||
+              msg.find("Network error") != string::npos)
+      << "Status is actually: " << msg;
+}
+
+// Hammer the server from several threads and shut it down underneath them.
+// The only expectation is that no CHECK fires and that every client thread
+// ends with a shutdown-style error.
+TEST_F(MultiThreadedRpcTest, TestShutdownDuringService) {
+  // Bring up the server that the client threads will talk to.
+  Sockaddr server_addr;
+  ASSERT_OK(StartTestServer(&server_addr));
+
+  const int kNumThreads = 4;
+  scoped_refptr<kudu::Thread> threads[kNumThreads];
+  Status statuses[kNumThreads];
+  for (int t = 0; t < kNumThreads; ++t) {
+    ASSERT_OK(kudu::Thread::Create("test", Substitute("t$0", t),
+        &MultiThreadedRpcTest::HammerServer, this, server_addr,
+        GenericCalculatorService::kAddMethodName, &statuses[t], &threads[t]));
+  }
+
+  // Give the client threads a moment to get going.
+  SleepFor(MonoDelta::FromMilliseconds(50));
+
+  // Take the server down while the calls are in flight.
+  ASSERT_OK(server_messenger_->UnregisterService(service_name_));
+  service_pool_->Shutdown();
+  server_messenger_->Shutdown();
+
+  for (int t = 0; t < kNumThreads; ++t) {
+    AssertShutdown(threads[t].get(), &statuses[t]);
+  }
+}
+
+// Shut the client messenger down right as a thread is about to start a new
+// connection. Regression test for KUDU-104.
+TEST_F(MultiThreadedRpcTest, TestShutdownClientWhileCallsPending) {
+  // Bring up the server.
+  Sockaddr server_addr;
+  ASSERT_OK(StartTestServer(&server_addr));
+
+  shared_ptr<Messenger> client_messenger;
+  ASSERT_OK(CreateMessenger("Client", &client_messenger));
+
+  // Start a thread that keeps calling the server through 'client_messenger'.
+  scoped_refptr<kudu::Thread> hammer_thread;
+  Status status;
+  ASSERT_OK(kudu::Thread::Create("test", "test",
+      &MultiThreadedRpcTest::HammerServerWithMessenger, this, server_addr,
+      GenericCalculatorService::kAddMethodName, &status, client_messenger, &hammer_thread));
+
+  // Shut down the messenger after a very brief sleep. This often will race so that the
+  // call gets submitted to the messenger before shutdown, but the negotiation won't have
+  // started yet. In a debug build this fails about half the time without the bug fix.
+  // See KUDU-104.
+  SleepFor(MonoDelta::FromMicroseconds(10));
+  client_messenger->Shutdown();
+  client_messenger.reset();
+
+  ASSERT_OK(ThreadJoiner(hammer_thread.get()).warn_every_ms(500).Join());
+  ASSERT_TRUE(status.IsAborted() ||
+              status.IsServiceUnavailable());
+
+  // The failure must name one of the known shutdown paths.
+  string msg = status.ToString();
+  SCOPED_TRACE(msg);
+  ASSERT_TRUE(msg.find("Client RPC Messenger shutting down") != string::npos ||
+              msg.find("reactor is shutting down") != string::npos ||
+              msg.find("Unable to start connection negotiation thread") != string::npos)
+      << "Status is actually: " << msg;
+}
+
+// A ServicePool whose Init() does nothing, which leaves the service queue
+// full.
+class BogusServicePool : public ServicePool {
+ public:
+  BogusServicePool(gscoped_ptr<ServiceIf> service,
+                   const scoped_refptr<MetricEntity>& metric_entity,
+                   size_t service_queue_length)
+      : ServicePool(std::move(service), metric_entity, service_queue_length) {}
+
+  // Intentionally a no-op that reports success.
+  virtual Status Init(int num_threads) OVERRIDE {
+    return Status::OK();
+  }
+};
+
+// Classifies '*status' by the text of its string form and bumps the matching counter:
+// '*backpressure' when the service queue was full, '*shutdown' when the message mentions
+// a shutdown or an EOF from the remote end. Any other message is reported as a fatal
+// gtest failure.
+void IncrementBackpressureOrShutdown(const Status* status, int* backpressure, int* shutdown) {
+  const string text = status->ToString();
+  const auto mentions = [&text](const char* needle) {
+    return text.find(needle) != string::npos;
+  };
+
+  if (mentions("service queue is full")) {
+    ++*backpressure;
+    return;
+  }
+  if (mentions("shutting down") || mentions("got EOF from remote")) {
+    ++*shutdown;
+    return;
+  }
+  FAIL() << "Unexpected status message: " << text;
+}
+
+// Test that we get a Service Unavailable error when we max out the incoming RPC service queue.
+TEST_F(MultiThreadedRpcTest, TestBlowOutServiceQueue) {
+ const size_t kMaxConcurrency = 2;
+
+ MessengerBuilder bld("messenger1");
+ bld.set_num_reactors(kMaxConcurrency);
+ bld.set_metric_entity(metric_entity_);
+ CHECK_OK(bld.Build(&server_messenger_));
+
+ shared_ptr<AcceptorPool> pool;
+ ASSERT_OK(server_messenger_->AddAcceptorPool(Sockaddr(), &pool));
+ ASSERT_OK(pool->Start(kMaxConcurrency));
+ Sockaddr server_addr = pool->bind_address();
+
+ gscoped_ptr<ServiceIf> service(new GenericCalculatorService());
+ service_name_ = service->service_name();
+ service_pool_ = new BogusServicePool(std::move(service),
+ server_messenger_->metric_entity(),
+ kMaxConcurrency);
+ ASSERT_OK(service_pool_->Init(n_worker_threads_));
+ server_messenger_->RegisterService(service_name_, service_pool_);
+
+ scoped_refptr<kudu::Thread> threads[3];
+ Status status[3];
+ CountDownLatch latch(1);
+ for (int i = 0; i < 3; i++) {
+ ASSERT_OK(kudu::Thread::Create("test", strings::Substitute("t$0", i),
+ &MultiThreadedRpcTest::SingleCall, this, server_addr,
+ GenericCalculatorService::kAddMethodName, &status[i], &latch, &threads[i]));
+ }
+
+ // One should immediately fail due to backpressure. The latch is only initialized
+ // to wait for the first of three threads to finish.
+ latch.Wait();
+
+ // The rest would time out after 10 sec, but we help them along.
+ ASSERT_OK(server_messenger_->UnregisterService(service_name_));
+ service_pool_->Shutdown();
+ server_messenger_->Shutdown();
+
+ for (const auto& thread : threads) {
+ ASSERT_OK(ThreadJoiner(thread.get()).warn_every_ms(500).Join());
+ }
+
+ // Verify that one error was due to backpressure.
+ int errors_backpressure = 0;
+ int errors_shutdown = 0;
+
+ for (const auto& s : status) {
+ IncrementBackpressureOrShutdown(&s, &errors_backpressure, &errors_shutdown);
+ }
+
+ ASSERT_EQ(1, errors_backpressure);
+ ASSERT_EQ(2, errors_shutdown);
+
+ // Check that RPC queue overflow metric is 1
+ Counter *rpcs_queue_overflow =
+ METRIC_rpcs_queue_overflow.Instantiate(server_messenger_->metric_entity()).get();
+ ASSERT_EQ(1, rpcs_queue_overflow->value());
+}
+
+// Repeatedly opens a TCP connection to 'addr' and closes it again. Returns once a
+// connect attempt fails with a network error; any other connect failure, or a failure
+// to initialize or close the socket, trips a CHECK.
+static void HammerServerWithTCPConns(const Sockaddr& addr) {
+  for (;;) {
+    Socket conn;
+    CHECK_OK(conn.Init(0));
+
+    Status connect_status;
+    LOG_SLOW_EXECUTION(INFO, 100, "Connect took long") {
+      connect_status = conn.Connect(addr);
+    }
+
+    if (connect_status.ok()) {
+      CHECK_OK(conn.Close());
+      continue;
+    }
+
+    // The connect failed: only a network error is an acceptable reason to stop.
+    CHECK(connect_status.IsNetworkError())
+        << "Unexpected error: " << connect_status.ToString();
+    return;
+  }
+}
+
+// Regression test for KUDU-128: shuts the server down while new TCP connections are
+// still coming in.
+TEST_F(MultiThreadedRpcTest, TestShutdownWithIncomingConnections) {
+  const int kNumHammerThreads = 8;
+
+  // Bring up the test server.
+  Sockaddr server_addr;
+  ASSERT_OK(StartTestServer(&server_addr));
+
+  // Spawn threads that do nothing but hammer the server with TCP connections.
+  vector<scoped_refptr<kudu::Thread>> hammer_threads;
+  for (int i = 0; i < kNumHammerThreads; i++) {
+    scoped_refptr<kudu::Thread> hammer;
+    CHECK_OK(kudu::Thread::Create("test", strings::Substitute("t$0", i),
+        &HammerServerWithTCPConns, server_addr, &hammer));
+    hammer_threads.push_back(hammer);
+  }
+
+  // Poll the accepted-connections counter until the server has actually accepted at
+  // least one connection from the hammer threads.
+  scoped_refptr<Counter> accepted_conns =
+      METRIC_rpc_connections_accepted.Instantiate(server_messenger_->metric_entity());
+  while (accepted_conns->value() == 0) {
+    SleepFor(MonoDelta::FromMicroseconds(100));
+  }
+
+  // Shut everything down while new connections are still appearing.
+  ASSERT_OK(server_messenger_->UnregisterService(service_name_));
+  service_pool_->Shutdown();
+  server_messenger_->Shutdown();
+
+  for (const auto& hammer : hammer_threads) {
+    ASSERT_OK(ThreadJoiner(hammer.get()).warn_every_ms(500).Join());
+  }
+}
+
+} // namespace rpc
+} // namespace kudu
+