You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/17 03:14:27 UTC
[11/15] incubator-impala git commit: IMPALA-4669: [KRPC] Import RPC
library from kudu@314c9d8
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/outbound_call.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/outbound_call.h b/be/src/kudu/rpc/outbound_call.h
new file mode 100644
index 0000000..ebed9b5
--- /dev/null
+++ b/be/src/kudu/rpc/outbound_call.h
@@ -0,0 +1,363 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_RPC_CLIENT_CALL_H
+#define KUDU_RPC_CLIENT_CALL_H
+
+#include <set>
+#include <string>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/rpc/constants.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/rpc_sidecar.h"
+#include "kudu/rpc/remote_method.h"
+#include "kudu/rpc/response_callback.h"
+#include "kudu/rpc/transfer.h"
+#include "kudu/rpc/user_credentials.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+
+namespace google {
+namespace protobuf {
+class Message;
+} // namespace protobuf
+} // namespace google
+
+namespace kudu {
+namespace rpc {
+
+class CallResponse;
+class Connection;
+class DumpRunningRpcsRequestPB;
+class InboundTransfer;
+class RpcCallInProgressPB;
+class RpcController;
+class RpcSidecar;
+
+// Used to key on Connection information.
+// For use as a key in an unordered STL collection, use ConnectionIdHash and ConnectionIdEqual.
+// This class is copyable for STL compatibility, but not assignable (use CopyFrom() for that).
+class ConnectionId {
+ public:
+ ConnectionId();
+
+ // Copy constructor required for use with STL unordered_map.
+ ConnectionId(const ConnectionId& other);
+
+ // Convenience constructor.
+ ConnectionId(const Sockaddr& remote, UserCredentials user_credentials);
+
+ // The remote address.
+ void set_remote(const Sockaddr& remote);
+ const Sockaddr& remote() const { return remote_; }
+
+ // The credentials of the user associated with this connection, if any.
+ void set_user_credentials(UserCredentials user_credentials);
+ const UserCredentials& user_credentials() const { return user_credentials_; }
+ UserCredentials* mutable_user_credentials() { return &user_credentials_; }
+
+ // Copy state from another object to this one.
+ void CopyFrom(const ConnectionId& other);
+
+ // Returns a string representation of the object, not including the password field.
+ std::string ToString() const;
+
+ size_t HashCode() const;
+ bool Equals(const ConnectionId& other) const;
+
+ private:
+ // Remember to update HashCode() and Equals() when new fields are added.
+ Sockaddr remote_;
+ UserCredentials user_credentials_;
+
+ // Implementation of CopyFrom that can be shared with copy constructor.
+ void DoCopyFrom(const ConnectionId& other);
+
+ // Disable assignment operator.
+ void operator=(const ConnectionId&);
+};
+
+class ConnectionIdHash {
+ public:
+ std::size_t operator() (const ConnectionId& conn_id) const;
+};
+
+class ConnectionIdEqual {
+ public:
+ bool operator() (const ConnectionId& cid1, const ConnectionId& cid2) const;
+};
+
+// Tracks the status of a call on the client side.
+//
+// This is an internal-facing class -- clients interact with the
+// RpcController class.
+//
+// This is allocated by the Proxy when a call is first created,
+// then passed to the reactor thread to send on the wire. It's typically
+// kept using a shared_ptr because a call may terminate in any number
+// of different threads, making it tricky to enforce single ownership.
+class OutboundCall {
+ public:
+
+ // Phases of an outbound RPC. Making an outbound RPC might involve establishing
+ // a connection to the remote server first, and the actual call is made only
+ // once the connection to the server is established.
+ enum class Phase {
+ // The phase of connection negotiation between the caller and the callee.
+ CONNECTION_NEGOTIATION,
+
+ // The phase of sending a call over already established connection.
+ REMOTE_CALL,
+ };
+
+ OutboundCall(const ConnectionId& conn_id, const RemoteMethod& remote_method,
+ google::protobuf::Message* response_storage,
+ RpcController* controller, ResponseCallback callback);
+
+ ~OutboundCall();
+
+ // Serialize the given request PB into this call's internal storage, and assume
+ // ownership of any sidecars that should accompany this request.
+ //
+ // Because the request data is fully serialized by this call, 'req' may be subsequently
+ // mutated with no ill effects.
+ void SetRequestPayload(const google::protobuf::Message& req,
+ std::vector<std::unique_ptr<RpcSidecar>>&& sidecars);
+
+ // Assign the call ID for this call. This is called from the reactor
+ // thread once a connection has been assigned. Must only be called once.
+ void set_call_id(int32_t call_id) {
+ DCHECK_EQ(header_.call_id(), kInvalidCallId) << "Already has a call ID";
+ header_.set_call_id(call_id);
+ }
+
+ // Serialize the call for the wire. Requires that SetRequestPayload()
+ // is called first. This is called from the Reactor thread.
+ Status SerializeTo(std::vector<Slice>* slices);
+
+ // Callback after the call has been put on the outbound connection queue.
+ void SetQueued();
+
+ // Update the call state to show that the request has started being sent
+ // on the socket.
+ void SetSending();
+
+ // Update the call state to show that the request has been sent.
+ void SetSent();
+
+ // Mark the call as failed. This also triggers the callback to notify
+ // the caller. If the call failed due to a remote error, then err_pb
+ // should be set to the error returned by the remote server. Takes
+ // ownership of 'err_pb'.
+ void SetFailed(const Status& status,
+ Phase phase = Phase::REMOTE_CALL,
+ ErrorStatusPB* err_pb = nullptr);
+
+ // Mark the call as timed out. This also triggers the callback to notify
+ // the caller.
+ void SetTimedOut(Phase phase);
+ bool IsTimedOut() const;
+
+ bool IsNegotiationError() const;
+
+ // Is the call finished?
+ bool IsFinished() const;
+
+ // Fill in the call response.
+ void SetResponse(gscoped_ptr<CallResponse> resp);
+
+ const std::set<RpcFeatureFlag>& required_rpc_features() const {
+ return required_rpc_features_;
+ }
+
+ std::string ToString() const;
+
+ void DumpPB(const DumpRunningRpcsRequestPB& req, RpcCallInProgressPB* resp);
+
+ ////////////////////////////////////////////////////////////
+ // Getters
+ ////////////////////////////////////////////////////////////
+
+ const ConnectionId& conn_id() const { return conn_id_; }
+ const RemoteMethod& remote_method() const { return remote_method_; }
+ const ResponseCallback &callback() const { return callback_; }
+ RpcController* controller() { return controller_; }
+ const RpcController* controller() const { return controller_; }
+
+ // Return true if a call ID has been assigned to this call.
+ bool call_id_assigned() const {
+ return header_.call_id() != kInvalidCallId;
+ }
+
+ int32_t call_id() const {
+ DCHECK(call_id_assigned());
+ return header_.call_id();
+ }
+
+ private:
+ friend class RpcController;
+
+ // Various states the call propagates through.
+ // NB: if adding another state, be sure to update OutboundCall::IsFinished()
+ // and OutboundCall::StateName(State state) as well.
+ enum State {
+ READY = 0,
+ ON_OUTBOUND_QUEUE,
+ SENDING,
+ SENT,
+ NEGOTIATION_TIMED_OUT,
+ TIMED_OUT,
+ FINISHED_NEGOTIATION_ERROR,
+ FINISHED_ERROR,
+ FINISHED_SUCCESS
+ };
+
+ static std::string StateName(State state);
+
+ void set_state(State new_state);
+ State state() const;
+
+ // Same as set_state, but requires that the caller already holds
+ // lock_
+ void set_state_unlocked(State new_state);
+
+ // return current status
+ Status status() const;
+
+ // Time when the call was first initiatied.
+ MonoTime start_time_;
+
+ // Return the error protobuf, if a remote error occurred.
+ // This will only be non-NULL if status().IsRemoteError().
+ const ErrorStatusPB* error_pb() const;
+
+ // Lock for state_ status_, error_pb_ fields, since they
+ // may be mutated by the reactor thread while the client thread
+ // reads them.
+ mutable simple_spinlock lock_;
+ State state_;
+ Status status_;
+ gscoped_ptr<ErrorStatusPB> error_pb_;
+
+ // Call the user-provided callback.
+ void CallCallback();
+
+ // The RPC header.
+ // Parts of this (eg the call ID) are only assigned once this call has been
+ // passed to the reactor thread and assigned a connection.
+ RequestHeader header_;
+
+ // The remote method being called.
+ RemoteMethod remote_method_;
+
+ // RPC-system features required to send this call.
+ std::set<RpcFeatureFlag> required_rpc_features_;
+
+ const ConnectionId conn_id_;
+ ResponseCallback callback_;
+ RpcController* controller_;
+
+ // Pointer for the protobuf where the response should be written.
+ google::protobuf::Message* response_;
+
+ // Buffers for storing segments of the wire-format request.
+ faststring header_buf_;
+ faststring request_buf_;
+
+ // Once a response has been received for this call, contains that response.
+ // Otherwise NULL.
+ gscoped_ptr<CallResponse> call_response_;
+
+ // All sidecars to be sent with this call.
+ std::vector<std::unique_ptr<RpcSidecar>> sidecars_;
+
+ // Total size in bytes of all sidecars in 'sidecars_'. Set in SetRequestPayload().
+ int64_t sidecar_byte_size_ = -1;
+
+ DISALLOW_COPY_AND_ASSIGN(OutboundCall);
+};
+
+// A response to a call, on the client side.
+// Upon receiving a response, this is allocated in the reactor thread and filled
+// into the OutboundCall instance via OutboundCall::SetResponse.
+//
+// This may either be a success or error response.
+//
+// This class takes care of separating out the distinct payload slices sent
+// over.
+class CallResponse {
+ public:
+ CallResponse();
+
+ // Parse the response received from a call. This must be called before any
+ // other methods on this object.
+ Status ParseFrom(gscoped_ptr<InboundTransfer> transfer);
+
+ // Return true if the call succeeded.
+ bool is_success() const {
+ DCHECK(parsed_);
+ return !header_.is_error();
+ }
+
+ // Return the call ID that this response is related to.
+ int32_t call_id() const {
+ DCHECK(parsed_);
+ return header_.call_id();
+ }
+
+ // Return the serialized response data. This is just the response "body" --
+ // either a serialized ErrorStatusPB, or the serialized user response protobuf.
+ const Slice &serialized_response() const {
+ DCHECK(parsed_);
+ return serialized_response_;
+ }
+
+ // See RpcController::GetSidecar()
+ Status GetSidecar(int idx, Slice* sidecar) const;
+
+ private:
+ // True once ParseFrom() is called.
+ bool parsed_;
+
+ // The parsed header.
+ ResponseHeader header_;
+
+ // The slice of data for the encoded protobuf response.
+ // This slice refers to memory allocated by transfer_
+ Slice serialized_response_;
+
+ // Slices of data for rpc sidecars. They point into memory owned by transfer_.
+ Slice sidecar_slices_[TransferLimits::kMaxSidecars];
+
+ // The incoming transfer data - retained because serialized_response_
+ // and sidecar_slices_ refer into its data.
+ gscoped_ptr<InboundTransfer> transfer_;
+
+ DISALLOW_COPY_AND_ASSIGN(CallResponse);
+};
+
+} // namespace rpc
+} // namespace kudu
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/protoc-gen-krpc.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/protoc-gen-krpc.cc b/be/src/kudu/rpc/protoc-gen-krpc.cc
new file mode 100644
index 0000000..de41aa9
--- /dev/null
+++ b/be/src/kudu/rpc/protoc-gen-krpc.cc
@@ -0,0 +1,674 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+////////////////////////////////////////////////////////////////////////////////
+// Example usage:
+// protoc --plugin=protoc-gen-krpc --krpc_out . --proto_path . <file>.proto
+////////////////////////////////////////////////////////////////////////////////
+
+#include <ctype.h>
+
+#include <iostream>
+#include <map>
+#include <memory>
+#include <sstream>
+#include <string>
+
+#include <boost/optional.hpp>
+#include <glog/logging.h>
+#include <google/protobuf/compiler/code_generator.h>
+#include <google/protobuf/compiler/plugin.h>
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/descriptor.pb.h>
+#include <google/protobuf/io/printer.h>
+#include <google/protobuf/io/zero_copy_stream.h>
+#include <google/protobuf/stubs/common.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/numbers.h"
+#include "kudu/gutil/strings/split.h"
+#include "kudu/gutil/strings/stringpiece.h"
+#include "kudu/gutil/strings/strip.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/strings/util.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/util/status.h"
+#include "kudu/util/string_case.h"
+
+using boost::optional;
+using google::protobuf::FileDescriptor;
+using google::protobuf::io::Printer;
+using google::protobuf::MethodDescriptor;
+using google::protobuf::ServiceDescriptor;
+using std::map;
+using std::shared_ptr;
+using std::set;
+using std::string;
+using std::vector;
+
+namespace kudu {
+namespace rpc {
+
+namespace {
+
+// Return the name of the authorization method specified for this
+// RPC method, or boost::none if none is specified.
+//
+// This handles fallback to the service-wide default.
+optional<string> GetAuthzMethod(const MethodDescriptor& method) {
+ if (method.options().HasExtension(authz_method)) {
+ return method.options().GetExtension(authz_method);
+ }
+ if (method.service()->options().HasExtension(default_authz_method)) {
+ return method.service()->options().GetExtension(default_authz_method);
+ }
+ return boost::none;
+}
+
+} // anonymous namespace
+
+class Substituter {
+ public:
+ virtual ~Substituter() {}
+ virtual void InitSubstitutionMap(map<string, string> *map) const = 0;
+};
+
+// NameInfo contains information about the output names.
+class FileSubstitutions : public Substituter {
+ public:
+ static const std::string PROTO_EXTENSION;
+
+ Status Init(const FileDescriptor *file) {
+ string path = file->name();
+ map_["path"] = path;
+
+ // Initialize path_
+ // If path = /foo/bar/baz_stuff.proto, path_ = /foo/bar/baz_stuff
+ if (!TryStripSuffixString(path, PROTO_EXTENSION, &path_no_extension_)) {
+ return Status::InvalidArgument("file name " + path +
+ " did not end in " + PROTO_EXTENSION);
+ }
+ map_["path_no_extension"] = path_no_extension_;
+
+ // If path = /foo/bar/baz_stuff.proto, base_ = baz_stuff
+ string base;
+ GetBaseName(path_no_extension_, &base);
+ map_["base"] = base;
+
+ // If path = /foo/bar/baz_stuff.proto, camel_case_ = BazStuff
+ string camel_case;
+ SnakeToCamelCase(base, &camel_case);
+ map_["camel_case"] = camel_case;
+
+ // If path = /foo/bar/baz_stuff.proto, upper_case_ = BAZ_STUFF
+ string upper_case;
+ ToUpperCase(base, &upper_case);
+ map_["upper_case"] = upper_case;
+
+ map_["open_namespace"] = GenerateOpenNamespace(file->package());
+ map_["close_namespace"] = GenerateCloseNamespace(file->package());
+
+ return Status::OK();
+ }
+
+ virtual void InitSubstitutionMap(map<string, string> *map) const OVERRIDE {
+ typedef std::map<string, string>::value_type kv_pair;
+ for (const kv_pair &pair : map_) {
+ (*map)[pair.first] = pair.second;
+ }
+ }
+
+ std::string service_header() const {
+ return path_no_extension_ + ".service.h";
+ }
+
+ std::string service() const {
+ return path_no_extension_ + ".service.cc";
+ }
+
+ std::string proxy_header() const {
+ return path_no_extension_ + ".proxy.h";
+ }
+
+ std::string proxy() const {
+ return path_no_extension_ + ".proxy.cc";
+ }
+
+ private:
+ // Extract the last filename component.
+ static void GetBaseName(const string &path,
+ string *base) {
+ size_t last_slash = path.find_last_of("/");
+ if (last_slash != string::npos) {
+ *base = path.substr(last_slash + 1);
+ } else {
+ *base = path;
+ }
+ }
+
+ static string GenerateOpenNamespace(const string &str) {
+ vector<string> components = strings::Split(str, ".");
+ string out;
+ for (const string &c : components) {
+ out.append("namespace ").append(c).append(" {\n");
+ }
+ return out;
+ }
+
+ static string GenerateCloseNamespace(const string &str) {
+ vector<string> components = strings::Split(str, ".");
+ string out;
+ for (auto c = components.crbegin(); c != components.crend(); c++) {
+ out.append("} // namespace ").append(*c).append("\n");
+ }
+ return out;
+ }
+
+ std::string path_no_extension_;
+ map<string, string> map_;
+};
+
+const std::string FileSubstitutions::PROTO_EXTENSION(".proto");
+
+class MethodSubstitutions : public Substituter {
+ public:
+ explicit MethodSubstitutions(const MethodDescriptor *method)
+ : method_(method) {
+ }
+
+ virtual void InitSubstitutionMap(map<string, string> *map) const OVERRIDE {
+
+ (*map)["rpc_name"] = method_->name();
+ (*map)["rpc_full_name"] = method_->full_name();
+ (*map)["rpc_full_name_plainchars"] =
+ StringReplace(method_->full_name(), ".", "_", true);
+ (*map)["request"] =
+ ReplaceNamespaceDelimiters(
+ StripNamespaceIfPossible(method_->service()->full_name(),
+ method_->input_type()->full_name()));
+ (*map)["response"] =
+ ReplaceNamespaceDelimiters(
+ StripNamespaceIfPossible(method_->service()->full_name(),
+ method_->output_type()->full_name()));
+ (*map)["metric_enum_key"] = strings::Substitute("kMetricIndex$0", method_->name());
+ bool track_result = static_cast<bool>(method_->options().GetExtension(track_rpc_result));
+ (*map)["track_result"] = track_result ? " true" : "false";
+ (*map)["authz_method"] = GetAuthzMethod(*method_).get_value_or("AuthorizeAllowAll");
+ }
+
+ // Strips the package from method arguments if they are in the same package as
+ // the service, otherwise leaves them so that we can have fully qualified
+ // namespaces for method arguments.
+ static std::string StripNamespaceIfPossible(const std::string& service_full_name,
+ const std::string& arg_full_name) {
+ StringPiece service_package(service_full_name);
+ if (!service_package.contains(".")) {
+ return arg_full_name;
+ }
+ // remove the service name so that we are left with only the package, including
+ // the last '.' so that we account for different packages with the same prefix.
+ service_package.remove_suffix(service_package.length() -
+ service_package.find_last_of(".") - 1);
+
+ StringPiece argfqn(arg_full_name);
+ if (argfqn.starts_with(service_package)) {
+ argfqn.remove_prefix(argfqn.find_last_of(".") + 1);
+ }
+ return argfqn.ToString();
+ }
+
+ static std::string ReplaceNamespaceDelimiters(const std::string& arg_full_name) {
+ return JoinStrings(strings::Split(arg_full_name, "."), "::");
+ }
+
+ private:
+ const MethodDescriptor *method_;
+};
+
+class ServiceSubstitutions : public Substituter {
+ public:
+ explicit ServiceSubstitutions(const ServiceDescriptor *service)
+ : service_(service)
+ {}
+
+ virtual void InitSubstitutionMap(map<string, string> *map) const OVERRIDE {
+ (*map)["service_name"] = service_->name();
+ (*map)["full_service_name"] = service_->full_name();
+ (*map)["service_method_count"] = SimpleItoa(service_->method_count());
+
+ // TODO: upgrade to protobuf 2.5.x and attach service comments
+ // to the generated service classes using the SourceLocation API.
+ }
+
+ private:
+ const ServiceDescriptor *service_;
+};
+
+
+class SubstitutionContext {
+ public:
+ // Takes ownership of the substituter
+ void Push(const Substituter *sub) {
+ subs_.push_back(shared_ptr<const Substituter>(sub));
+ }
+
+ void PushMethod(const MethodDescriptor *method) {
+ Push(new MethodSubstitutions(method));
+ }
+
+ void PushService(const ServiceDescriptor *service) {
+ Push(new ServiceSubstitutions(service));
+ }
+
+ void Pop() {
+ CHECK(!subs_.empty());
+ subs_.pop_back();
+ }
+
+ void InitSubstitutionMap(map<string, string> *subs) const {
+ for (const shared_ptr<const Substituter> &sub : subs_) {
+ sub->InitSubstitutionMap(subs);
+ }
+ }
+
+ private:
+ vector<shared_ptr<const Substituter> > subs_;
+};
+
+
+
+class CodeGenerator : public ::google::protobuf::compiler::CodeGenerator {
+ public:
+ CodeGenerator() { }
+
+ ~CodeGenerator() { }
+
+ bool Generate(const google::protobuf::FileDescriptor *file,
+ const std::string &/* parameter */,
+ google::protobuf::compiler::GeneratorContext *gen_context,
+ std::string *error) const OVERRIDE {
+ auto name_info = new FileSubstitutions();
+ Status ret = name_info->Init(file);
+ if (!ret.ok()) {
+ *error = "name_info.Init failed: " + ret.ToString();
+ return false;
+ }
+
+ SubstitutionContext subs;
+ subs.Push(name_info);
+
+ gscoped_ptr<google::protobuf::io::ZeroCopyOutputStream> ih_output(
+ gen_context->Open(name_info->service_header()));
+ Printer ih_printer(ih_output.get(), '$');
+ GenerateServiceIfHeader(&ih_printer, &subs, file);
+
+ gscoped_ptr<google::protobuf::io::ZeroCopyOutputStream> i_output(
+ gen_context->Open(name_info->service()));
+ Printer i_printer(i_output.get(), '$');
+ GenerateServiceIf(&i_printer, &subs, file);
+
+ gscoped_ptr<google::protobuf::io::ZeroCopyOutputStream> ph_output(
+ gen_context->Open(name_info->proxy_header()));
+ Printer ph_printer(ph_output.get(), '$');
+ GenerateProxyHeader(&ph_printer, &subs, file);
+
+ gscoped_ptr<google::protobuf::io::ZeroCopyOutputStream> p_output(
+ gen_context->Open(name_info->proxy()));
+ Printer p_printer(p_output.get(), '$');
+ GenerateProxy(&p_printer, &subs, file);
+
+ return true;
+ }
+
+ private:
+ void Print(Printer *printer,
+ const SubstitutionContext &sub,
+ const char *text) const {
+ map<string, string> subs;
+ sub.InitSubstitutionMap(&subs);
+ printer->Print(subs, text);
+ }
+
+ void GenerateServiceIfHeader(Printer *printer,
+ SubstitutionContext *subs,
+ const FileDescriptor *file) const {
+ Print(printer, *subs,
+ "// THIS FILE IS AUTOGENERATED FROM $path$\n"
+ "\n"
+ "#ifndef KUDU_RPC_$upper_case$_SERVICE_IF_DOT_H\n"
+ "#define KUDU_RPC_$upper_case$_SERVICE_IF_DOT_H\n"
+ "\n"
+ "#include \"$path_no_extension$.pb.h\"\n"
+ "\n"
+ "#include <functional>\n"
+ "#include <memory>\n"
+ "#include <string>\n"
+ "\n"
+ "#include \"kudu/rpc/rpc_header.pb.h\"\n"
+ "#include \"kudu/rpc/service_if.h\"\n"
+ "\n"
+ "namespace kudu {\n"
+ "class MetricEntity;\n"
+ "namespace rpc {\n"
+ "class Messenger;\n"
+ "class ResultTracker;\n"
+ "class RpcContext;\n"
+ "} // namespace rpc\n"
+ "} // namespace kudu\n"
+ "\n"
+ "$open_namespace$"
+ "\n"
+ );
+
+ for (int service_idx = 0; service_idx < file->service_count();
+ ++service_idx) {
+ const ServiceDescriptor *service = file->service(service_idx);
+ subs->PushService(service);
+
+ Print(printer, *subs,
+ "class $service_name$If : public ::kudu::rpc::GeneratedServiceIf {\n"
+ " public:\n"
+ " explicit $service_name$If(const scoped_refptr<::kudu::MetricEntity>& entity,"
+ " const scoped_refptr<::kudu::rpc::ResultTracker>& result_tracker);\n"
+ " virtual ~$service_name$If();\n"
+ " std::string service_name() const override;\n"
+ " static std::string static_service_name();\n"
+ "\n"
+ );
+
+ set<string> authz_methods;
+ for (int method_idx = 0; method_idx < service->method_count();
+ ++method_idx) {
+ const MethodDescriptor *method = service->method(method_idx);
+ subs->PushMethod(method);
+
+ Print(printer, *subs,
+ " virtual void $rpc_name$(const $request$ *req,\n"
+ " $response$ *resp, ::kudu::rpc::RpcContext *context) = 0;\n"
+ );
+ subs->Pop();
+ if (auto m = GetAuthzMethod(*method)) {
+ authz_methods.insert(m.get());
+ }
+ }
+
+ if (!authz_methods.empty()) {
+ printer->Print(
+ "\n\n"
+ " // Authorization methods\n"
+ " // ---------------------\n\n");
+ }
+ for (const string& m : authz_methods) {
+ printer->Print({ {"m", m} },
+ " virtual bool $m$(const google::protobuf::Message* req,\n"
+ " google::protobuf::Message* resp, ::kudu::rpc::RpcContext *context) = 0;\n");
+ }
+
+ Print(printer, *subs,
+ "\n"
+ "};\n"
+ );
+
+ subs->Pop(); // Service
+ }
+
+ Print(printer, *subs,
+ "\n"
+ "$close_namespace$\n"
+ "#endif\n");
+ }
+
+ void GenerateServiceIf(Printer *printer,
+ SubstitutionContext *subs,
+ const FileDescriptor *file) const {
+ Print(printer, *subs,
+ "// THIS FILE IS AUTOGENERATED FROM $path$\n"
+ "\n"
+ "#include \"$path_no_extension$.pb.h\"\n"
+ "#include \"$path_no_extension$.service.h\"\n"
+ "\n"
+ "#include <glog/logging.h>\n"
+ "\n"
+ "#include \"kudu/rpc/inbound_call.h\"\n"
+ "#include \"kudu/rpc/remote_method.h\"\n"
+ "#include \"kudu/rpc/rpc_context.h\"\n"
+ "#include \"kudu/rpc/service_if.h\"\n"
+ "#include \"kudu/util/metrics.h\"\n"
+ "\n");
+
+ // Define metric prototypes for each method in the service.
+ for (int service_idx = 0; service_idx < file->service_count();
+ ++service_idx) {
+ const ServiceDescriptor *service = file->service(service_idx);
+ subs->PushService(service);
+
+ for (int method_idx = 0; method_idx < service->method_count();
+ ++method_idx) {
+ const MethodDescriptor *method = service->method(method_idx);
+ subs->PushMethod(method);
+ Print(printer, *subs,
+ "METRIC_DEFINE_histogram(server, handler_latency_$rpc_full_name_plainchars$,\n"
+ " \"$rpc_full_name$ RPC Time\",\n"
+ " kudu::MetricUnit::kMicroseconds,\n"
+ " \"Microseconds spent handling $rpc_full_name$() RPC requests\",\n"
+ " 60000000LU, 2);\n"
+ "\n");
+ subs->Pop();
+ }
+
+ subs->Pop();
+ }
+
+ Print(printer, *subs,
+ "using google::protobuf::Message;\n"
+ "using kudu::MetricEntity;\n"
+ "using kudu::rpc::ResultTracker;\n"
+ "using kudu::rpc::RpcContext;\n"
+ "using kudu::rpc::RpcMethodInfo;\n"
+ "using std::unique_ptr;\n"
+ "\n"
+ "$open_namespace$"
+ "\n");
+
+ for (int service_idx = 0; service_idx < file->service_count();
+ ++service_idx) {
+ const ServiceDescriptor *service = file->service(service_idx);
+ subs->PushService(service);
+
+ Print(printer, *subs,
+ "$service_name$If::$service_name$If(const scoped_refptr<MetricEntity>& entity,"
+ " const scoped_refptr<ResultTracker>& result_tracker) {\n"
+ "result_tracker_ = result_tracker;\n"
+ );
+ for (int method_idx = 0; method_idx < service->method_count();
+ ++method_idx) {
+ const MethodDescriptor *method = service->method(method_idx);
+ subs->PushMethod(method);
+
+ Print(printer, *subs,
+ " {\n"
+ " scoped_refptr<RpcMethodInfo> mi(new RpcMethodInfo());\n"
+ " mi->req_prototype.reset(new $request$());\n"
+ " mi->resp_prototype.reset(new $response$());\n"
+ " mi->authz_method = [this](const Message* req, Message* resp,\n"
+ " RpcContext* ctx) {\n"
+ " return this->$authz_method$(static_cast<const $request$*>(req),\n"
+ " static_cast<$response$*>(resp),\n"
+ " ctx);\n"
+ " };\n"
+ " mi->track_result = $track_result$;\n"
+ " mi->handler_latency_histogram =\n"
+ " METRIC_handler_latency_$rpc_full_name_plainchars$.Instantiate(entity);\n"
+ " mi->func = [this](const Message* req, Message* resp, RpcContext* ctx) {\n"
+ " this->$rpc_name$(static_cast<const $request$*>(req),\n"
+ " static_cast<$response$*>(resp),\n"
+ " ctx);\n"
+ " };\n"
+ " methods_by_name_[\"$rpc_name$\"] = std::move(mi);\n"
+ " }\n");
+ subs->Pop();
+ }
+
+ Print(printer, *subs,
+ "}\n"
+ "\n"
+ "$service_name$If::~$service_name$If() {\n"
+ "}\n"
+ "\n"
+ "std::string $service_name$If::service_name() const {\n"
+ " return \"$full_service_name$\";\n"
+ "}\n"
+ "std::string $service_name$If::static_service_name() {\n"
+ " return \"$full_service_name$\";\n"
+ "}\n"
+ "\n"
+ );
+
+ subs->Pop();
+ }
+
+ Print(printer, *subs,
+ "$close_namespace$"
+ );
+ }
+
+ void GenerateProxyHeader(Printer *printer,
+ SubstitutionContext *subs,
+ const FileDescriptor *file) const {
+ Print(printer, *subs,
+ "// THIS FILE IS AUTOGENERATED FROM $path$\n"
+ "\n"
+ "#ifndef KUDU_RPC_$upper_case$_PROXY_DOT_H\n"
+ "#define KUDU_RPC_$upper_case$_PROXY_DOT_H\n"
+ "\n"
+ "#include \"$path_no_extension$.pb.h\"\n"
+ "\n"
+ "#include \"kudu/rpc/proxy.h\"\n"
+ "#include \"kudu/util/status.h\"\n"
+ "\n"
+ "namespace kudu { class Sockaddr; }\n"
+ "namespace kudu { namespace rpc { class UserCredentials; } }\n"
+ "$open_namespace$"
+ "\n"
+ "\n"
+ );
+
+ for (int service_idx = 0; service_idx < file->service_count();
+ ++service_idx) {
+ const ServiceDescriptor *service = file->service(service_idx);
+ subs->PushService(service);
+
+ Print(printer, *subs,
+ "class $service_name$Proxy : public ::kudu::rpc::Proxy {\n"
+ " public:\n"
+ " $service_name$Proxy(const std::shared_ptr< ::kudu::rpc::Messenger>\n"
+ " &messenger, const ::kudu::Sockaddr &sockaddr);\n"
+ " ~$service_name$Proxy();\n"
+ "\n"
+ );
+
+ for (int method_idx = 0; method_idx < service->method_count();
+ ++method_idx) {
+ const MethodDescriptor *method = service->method(method_idx);
+ subs->PushMethod(method);
+
+ Print(printer, *subs,
+ "\n"
+ " ::kudu::Status $rpc_name$(const $request$ &req, $response$ *resp,\n"
+ " ::kudu::rpc::RpcController *controller);\n"
+ " void $rpc_name$Async(const $request$ &req,\n"
+ " $response$ *response,\n"
+ " ::kudu::rpc::RpcController *controller,\n"
+ " const ::kudu::rpc::ResponseCallback &callback);\n"
+ );
+ subs->Pop();
+ }
+ Print(printer, *subs,
+ "};\n");
+ subs->Pop();
+ }
+ Print(printer, *subs,
+ "\n"
+ "$close_namespace$"
+ "\n"
+ "#endif\n"
+ );
+ }
+
+ void GenerateProxy(Printer *printer,
+ SubstitutionContext *subs,
+ const FileDescriptor *file) const {
+ Print(printer, *subs,
+ "// THIS FILE IS AUTOGENERATED FROM $path$\n"
+ "\n"
+ "#include \"$path_no_extension$.proxy.h\"\n"
+ "\n"
+ "#include \"kudu/rpc/outbound_call.h\"\n"
+ "#include \"kudu/util/net/sockaddr.h\"\n"
+ "\n"
+ "$open_namespace$"
+ "\n"
+ );
+
+ for (int service_idx = 0; service_idx < file->service_count();
+ ++service_idx) {
+ const ServiceDescriptor *service = file->service(service_idx);
+ subs->PushService(service);
+ Print(printer, *subs,
+ "$service_name$Proxy::$service_name$Proxy(\n"
+ " const std::shared_ptr< ::kudu::rpc::Messenger> &messenger,\n"
+ " const ::kudu::Sockaddr &remote)\n"
+ " : Proxy(messenger, remote, \"$full_service_name$\") {\n"
+ "}\n"
+ "\n"
+ "$service_name$Proxy::~$service_name$Proxy() {\n"
+ "}\n"
+ "\n"
+ "\n");
+ for (int method_idx = 0; method_idx < service->method_count();
+ ++method_idx) {
+ const MethodDescriptor *method = service->method(method_idx);
+ subs->PushMethod(method);
+ Print(printer, *subs,
+ "::kudu::Status $service_name$Proxy::$rpc_name$(const $request$ &req, $response$ *resp,\n"
+ " ::kudu::rpc::RpcController *controller) {\n"
+ " return SyncRequest(\"$rpc_name$\", req, resp, controller);\n"
+ "}\n"
+ "\n"
+ "void $service_name$Proxy::$rpc_name$Async(const $request$ &req,\n"
+ " $response$ *resp, ::kudu::rpc::RpcController *controller,\n"
+ " const ::kudu::rpc::ResponseCallback &callback) {\n"
+ " AsyncRequest(\"$rpc_name$\", req, resp, controller, callback);\n"
+ "}\n"
+ "\n");
+ subs->Pop();
+ }
+
+ subs->Pop();
+ }
+ Print(printer, *subs,
+ "$close_namespace$");
+ }
+};
+} // namespace rpc
+} // namespace kudu
+
+int main(int argc, char *argv[]) {
+ kudu::rpc::CodeGenerator generator;
+ return google::protobuf::compiler::PluginMain(argc, argv, &generator);
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/proxy.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/proxy.cc b/be/src/kudu/rpc/proxy.cc
new file mode 100644
index 0000000..45ad5dd
--- /dev/null
+++ b/be/src/kudu/rpc/proxy.cc
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/proxy.h"
+
+#include <boost/bind.hpp>
+#include <glog/logging.h>
+#include <inttypes.h>
+#include <memory>
+#include <stdint.h>
+
+#include <iostream>
+#include <sstream>
+#include <vector>
+
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/outbound_call.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/remote_method.h"
+#include "kudu/rpc/response_callback.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/status.h"
+#include "kudu/util/user.h"
+
+using google::protobuf::Message;
+using std::string;
+using std::shared_ptr;
+
+namespace kudu {
+namespace rpc {
+
+Proxy::Proxy(std::shared_ptr<Messenger> messenger,
+ const Sockaddr& remote, string service_name)
+ : service_name_(std::move(service_name)),
+ messenger_(std::move(messenger)),
+ is_started_(false) {
+ CHECK(messenger_ != nullptr);
+ DCHECK(!service_name_.empty()) << "Proxy service name must not be blank";
+
+ // By default, we set the real user to the currently logged-in user.
+ // Effective user and password remain blank.
+ string real_user;
+ Status s = GetLoggedInUser(&real_user);
+ if (!s.ok()) {
+ LOG(WARNING) << "Proxy for " << service_name_ << ": Unable to get logged-in user name: "
+ << s.ToString() << " before connecting to remote: " << remote.ToString();
+ }
+
+ conn_id_.set_remote(remote);
+ conn_id_.mutable_user_credentials()->set_real_user(real_user);
+}
+
+Proxy::~Proxy() {
+}
+
+void Proxy::AsyncRequest(const string& method,
+ const google::protobuf::Message& req,
+ google::protobuf::Message* response,
+ RpcController* controller,
+ const ResponseCallback& callback) const {
+ CHECK(!controller->call_) << "Controller should be reset";
+ base::subtle::NoBarrier_Store(&is_started_, true);
+ RemoteMethod remote_method(service_name_, method);
+ controller->call_.reset(
+ new OutboundCall(conn_id_, remote_method, response, controller, callback));
+ controller->SetRequestParam(req);
+
+ // If this fails to queue, the callback will get called immediately
+ // and the controller will be in an ERROR state.
+ messenger_->QueueOutboundCall(controller->call_);
+}
+
+
+Status Proxy::SyncRequest(const string& method,
+ const google::protobuf::Message& req,
+ google::protobuf::Message* resp,
+ RpcController* controller) const {
+ CountDownLatch latch(1);
+ AsyncRequest(method, req, DCHECK_NOTNULL(resp), controller,
+ boost::bind(&CountDownLatch::CountDown, boost::ref(latch)));
+
+ latch.Wait();
+ return controller->status();
+}
+
+void Proxy::set_user_credentials(const UserCredentials& user_credentials) {
+ CHECK(base::subtle::NoBarrier_Load(&is_started_) == false)
+ << "It is illegal to call set_user_credentials() after request processing has started";
+ conn_id_.set_user_credentials(user_credentials);
+}
+
+std::string Proxy::ToString() const {
+ return strings::Substitute("$0@$1", service_name_, conn_id_.ToString());
+}
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/proxy.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/proxy.h b/be/src/kudu/rpc/proxy.h
new file mode 100644
index 0000000..ddbbe60
--- /dev/null
+++ b/be/src/kudu/rpc/proxy.h
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_RPC_PROXY_H
+#define KUDU_RPC_PROXY_H
+
+#include <memory>
+#include <string>
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/rpc/outbound_call.h"
+#include "kudu/rpc/response_callback.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/status.h"
+
+namespace google {
+namespace protobuf {
+class Message;
+} // namespace protobuf
+} // namespace google
+
+namespace kudu {
+namespace rpc {
+
+class Messenger;
+
+// Interface to send calls to a remote service.
+//
+// Proxy objects do not map one-to-one with TCP connections. The underlying TCP
+// connection is not established until the first call, and may be torn down and
+// re-established as necessary by the messenger. Additionally, the messenger is
+// likely to multiplex many Proxy objects on the same connection.
+//
+// Proxy objects are thread-safe after initialization only.
+// Setters on the Proxy are not thread-safe, and calling a setter after any RPC
+// request has started will cause a fatal error.
+//
+// After initialization, multiple threads may make calls using the same proxy object.
+class Proxy {
+ public:
+ Proxy(std::shared_ptr<Messenger> messenger, const Sockaddr& remote,
+ std::string service_name);
+ ~Proxy();
+
+ // Call a remote method asynchronously.
+ //
+ // Typically, users will not call this directly, but rather through
+ // a generated Proxy subclass.
+ //
+ // method: the method name to invoke on the remote server.
+ //
+ // req: the request protobuf. This will be serialized immediately,
+ // so the caller may free or otherwise mutate 'req' safely.
+ //
+ // resp: the response protobuf. This protobuf will be mutated upon
+ // completion of the call. The RPC system does not take ownership
+ // of this storage.
+ //
+ // NOTE: 'req' and 'resp' should be the appropriate protocol buffer implementation
+ // class corresponding to the parameter and result types of the service method
+ // defined in the service's '.proto' file.
+ //
+ // controller: the RpcController to associate with this call. Each call
+ // must use a unique controller object. Does not take ownership.
+ //
+ // callback: the callback to invoke upon call completion. This callback may
+ // be invoked before AsyncRequest() itself returns, or any time
+ // thereafter. It may be invoked either on the caller's thread
+ // or by an RPC IO thread, and thus should take care to not
+ // block or perform any heavy CPU work.
+ void AsyncRequest(const std::string& method,
+ const google::protobuf::Message& req,
+ google::protobuf::Message* resp,
+ RpcController* controller,
+ const ResponseCallback& callback) const;
+
+ // The same as AsyncRequest(), except that the call blocks until the call
+ // finishes. If the call fails, returns a non-OK result.
+ Status SyncRequest(const std::string& method,
+ const google::protobuf::Message& req,
+ google::protobuf::Message* resp,
+ RpcController* controller) const;
+
+ // Set the user credentials which should be used to log in.
+ void set_user_credentials(const UserCredentials& user_credentials);
+
+ // Get the user credentials which should be used to log in.
+ const UserCredentials& user_credentials() const { return conn_id_.user_credentials(); }
+
+ std::string ToString() const;
+
+ private:
+ const std::string service_name_;
+ std::shared_ptr<Messenger> messenger_;
+ ConnectionId conn_id_;
+ mutable Atomic32 is_started_;
+
+ DISALLOW_COPY_AND_ASSIGN(Proxy);
+};
+
+} // namespace rpc
+} // namespace kudu
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/reactor-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/reactor-test.cc b/be/src/kudu/rpc/reactor-test.cc
new file mode 100644
index 0000000..2faac2a
--- /dev/null
+++ b/be/src/kudu/rpc/reactor-test.cc
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/reactor.h"
+
+#include "kudu/rpc/rpc-test-base.h"
+#include "kudu/util/countdown_latch.h"
+
+using std::shared_ptr;
+
+namespace kudu {
+namespace rpc {
+
+class ReactorTest : public RpcTestBase {
+ public:
+ ReactorTest()
+ : messenger_(CreateMessenger("my_messenger", 4)),
+ latch_(1) {
+ }
+
+ void ScheduledTask(const Status& status, const Status& expected_status) {
+ CHECK_EQ(expected_status.CodeAsString(), status.CodeAsString());
+ latch_.CountDown();
+ }
+
+ void ScheduledTaskCheckThread(const Status& status, const Thread* thread) {
+ CHECK_OK(status);
+ CHECK_EQ(thread, Thread::current_thread());
+ latch_.CountDown();
+ }
+
+ void ScheduledTaskScheduleAgain(const Status& status) {
+ messenger_->ScheduleOnReactor(
+ boost::bind(&ReactorTest::ScheduledTaskCheckThread, this, _1,
+ Thread::current_thread()),
+ MonoDelta::FromMilliseconds(0));
+ latch_.CountDown();
+ }
+
+ protected:
+ const shared_ptr<Messenger> messenger_;
+ CountDownLatch latch_;
+};
+
+TEST_F(ReactorTest, TestFunctionIsCalled) {
+ messenger_->ScheduleOnReactor(
+ boost::bind(&ReactorTest::ScheduledTask, this, _1, Status::OK()),
+ MonoDelta::FromSeconds(0));
+ latch_.Wait();
+}
+
+TEST_F(ReactorTest, TestFunctionIsCalledAtTheRightTime) {
+ MonoTime before = MonoTime::Now();
+ messenger_->ScheduleOnReactor(
+ boost::bind(&ReactorTest::ScheduledTask, this, _1, Status::OK()),
+ MonoDelta::FromMilliseconds(100));
+ latch_.Wait();
+ MonoTime after = MonoTime::Now();
+ MonoDelta delta = after - before;
+ CHECK_GE(delta.ToMilliseconds(), 100);
+}
+
+TEST_F(ReactorTest, TestFunctionIsCalledIfReactorShutdown) {
+ messenger_->ScheduleOnReactor(
+ boost::bind(&ReactorTest::ScheduledTask, this, _1,
+ Status::Aborted("doesn't matter")),
+ MonoDelta::FromSeconds(60));
+ messenger_->Shutdown();
+ latch_.Wait();
+}
+
+TEST_F(ReactorTest, TestReschedulesOnSameReactorThread) {
+ // Our scheduled task will schedule yet another task.
+ latch_.Reset(2);
+
+ messenger_->ScheduleOnReactor(
+ boost::bind(&ReactorTest::ScheduledTaskScheduleAgain, this, _1),
+ MonoDelta::FromSeconds(0));
+ latch_.Wait();
+ latch_.Wait();
+}
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/reactor.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/reactor.cc b/be/src/kudu/rpc/reactor.cc
new file mode 100644
index 0000000..e235dd4
--- /dev/null
+++ b/be/src/kudu/rpc/reactor.cc
@@ -0,0 +1,750 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/reactor.h"
+
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <stdlib.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <memory>
+#include <mutex>
+#include <string>
+
+#include <boost/intrusive/list.hpp>
+#include <boost/optional.hpp>
+#include <ev++.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/client_negotiation.h"
+#include "kudu/rpc/connection.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/negotiation.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_introspection.pb.h"
+#include "kudu/rpc/server_negotiation.h"
+#include "kudu/rpc/transfer.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/debug/sanitizer_scopes.h"
+#include "kudu/util/errno.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread.h"
+#include "kudu/util/thread_restrictions.h"
+#include "kudu/util/threadpool.h"
+#include "kudu/util/trace.h"
+
+// When compiling on Mac OS X, use 'kqueue' instead of the default, 'select', for the event loop.
+// Otherwise we run into problems because 'select' can't handle connections when more than 1024
+// file descriptors are open by the process.
+#if defined(__APPLE__)
+static const int kDefaultLibEvFlags = ev::KQUEUE;
+#else
+static const int kDefaultLibEvFlags = ev::AUTO;
+#endif
+
+using std::string;
+using std::shared_ptr;
+using std::unique_ptr;
+using strings::Substitute;
+
+DEFINE_int64(rpc_negotiation_timeout_ms, 3000,
+ "Timeout for negotiating an RPC connection.");
+TAG_FLAG(rpc_negotiation_timeout_ms, advanced);
+TAG_FLAG(rpc_negotiation_timeout_ms, runtime);
+
+DEFINE_bool(rpc_reopen_outbound_connections, false,
+ "Open a new connection to the server for every RPC call. "
+ "If not enabled, an already existing connection to a "
+ "server is reused upon making another call to the same server. "
+ "When this flag is enabled, an already existing _idle_ connection "
+ "to the server is closed upon making another RPC call which would "
+ "reuse the connection otherwise. "
+ "Used by tests only.");
+TAG_FLAG(rpc_reopen_outbound_connections, unsafe);
+TAG_FLAG(rpc_reopen_outbound_connections, runtime);
+
+namespace kudu {
+namespace rpc {
+
+namespace {
+Status ShutdownError(bool aborted) {
+ const char* msg = "reactor is shutting down";
+ return aborted ?
+ Status::Aborted(msg, "", ESHUTDOWN) :
+ Status::ServiceUnavailable(msg, "", ESHUTDOWN);
+}
+} // anonymous namespace
+
+ReactorThread::ReactorThread(Reactor *reactor, const MessengerBuilder& bld)
+ : loop_(kDefaultLibEvFlags),
+ cur_time_(MonoTime::Now()),
+ last_unused_tcp_scan_(cur_time_),
+ reactor_(reactor),
+ connection_keepalive_time_(bld.connection_keepalive_time_),
+ coarse_timer_granularity_(bld.coarse_timer_granularity_),
+ total_client_conns_cnt_(0),
+ total_server_conns_cnt_(0) {
+}
+
+Status ReactorThread::Init() {
+ DCHECK(thread_.get() == nullptr) << "Already started";
+ DVLOG(6) << "Called ReactorThread::Init()";
+ // Register to get async notifications in our epoll loop.
+ async_.set(loop_);
+ async_.set<ReactorThread, &ReactorThread::AsyncHandler>(this);
+ async_.start();
+
+ // Register the timer watcher.
+ // The timer is used for closing old TCP connections and applying
+ // backpressure.
+ timer_.set(loop_);
+ timer_.set<ReactorThread, &ReactorThread::TimerHandler>(this); // NOLINT(*)
+ timer_.start(coarse_timer_granularity_.ToSeconds(),
+ coarse_timer_granularity_.ToSeconds());
+
+ // Create Reactor thread.
+ return kudu::Thread::Create("reactor", "rpc reactor", &ReactorThread::RunThread, this, &thread_);
+}
+
+void ReactorThread::Shutdown() {
+ CHECK(reactor_->closing()) << "Should be called after setting closing_ flag";
+
+ VLOG(1) << name() << ": shutting down Reactor thread.";
+ WakeThread();
+}
+
+void ReactorThread::ShutdownInternal() {
+ DCHECK(IsCurrentThread());
+
+ // Tear down any outbound TCP connections.
+ Status service_unavailable = ShutdownError(false);
+ VLOG(1) << name() << ": tearing down outbound TCP connections...";
+ for (const auto& elem : client_conns_) {
+ const auto& conn = elem.second;
+ VLOG(1) << name() << ": shutting down " << conn->ToString();
+ conn->Shutdown(service_unavailable);
+ }
+ client_conns_.clear();
+
+ // Tear down any inbound TCP connections.
+ VLOG(1) << name() << ": tearing down inbound TCP connections...";
+ for (const auto& conn : server_conns_) {
+ VLOG(1) << name() << ": shutting down " << conn->ToString();
+ conn->Shutdown(service_unavailable);
+ }
+ server_conns_.clear();
+
+ // Abort any scheduled tasks.
+ //
+ // These won't be found in the ReactorThread's list of pending tasks
+ // because they've been "run" (that is, they've been scheduled).
+ Status aborted = ShutdownError(true); // aborted
+ for (DelayedTask* task : scheduled_tasks_) {
+ task->Abort(aborted); // should also free the task.
+ }
+ scheduled_tasks_.clear();
+
+ // Remove the OpenSSL thread state.
+ ERR_remove_thread_state(nullptr);
+}
+
+ReactorTask::ReactorTask() {
+}
+ReactorTask::~ReactorTask() {
+}
+
+Status ReactorThread::GetMetrics(ReactorMetrics* metrics) {
+ DCHECK(IsCurrentThread());
+ metrics->num_client_connections_ = client_conns_.size();
+ metrics->num_server_connections_ = server_conns_.size();
+ metrics->total_client_connections_ = total_client_conns_cnt_;
+ metrics->total_server_connections_ = total_server_conns_cnt_;
+ return Status::OK();
+}
+
+Status ReactorThread::DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
+ DumpRunningRpcsResponsePB* resp) {
+ DCHECK(IsCurrentThread());
+ for (const scoped_refptr<Connection>& conn : server_conns_) {
+ RETURN_NOT_OK(conn->DumpPB(req, resp->add_inbound_connections()));
+ }
+ for (const conn_multimap_t::value_type& entry : client_conns_) {
+ Connection* conn = entry.second.get();
+ RETURN_NOT_OK(conn->DumpPB(req, resp->add_outbound_connections()));
+ }
+ return Status::OK();
+}
+
+void ReactorThread::WakeThread() {
+ // libev uses some lock-free synchronization, but doesn't have TSAN annotations.
+ // See http://lists.schmorp.de/pipermail/libev/2013q2/002178.html or KUDU-366
+ // for examples.
+ debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
+ async_.send();
+}
+
+// Handle async events. These events are sent to the reactor by other
+// threads that want to bring something to our attention, like the fact that
+// we're shutting down, or the fact that there is a new outbound Transfer
+// ready to send.
+void ReactorThread::AsyncHandler(ev::async& /*watcher*/, int /*revents*/) {
+ DCHECK(IsCurrentThread());
+
+ if (PREDICT_FALSE(reactor_->closing())) {
+ ShutdownInternal();
+ loop_.break_loop(); // break the epoll loop and terminate the thread
+ return;
+ }
+
+ boost::intrusive::list<ReactorTask> tasks;
+ reactor_->DrainTaskQueue(&tasks);
+
+ while (!tasks.empty()) {
+ ReactorTask& task = tasks.front();
+ tasks.pop_front();
+ task.Run(this);
+ }
+}
+
+void ReactorThread::RegisterConnection(scoped_refptr<Connection> conn) {
+ DCHECK(IsCurrentThread());
+
+ Status s = StartConnectionNegotiation(conn);
+ if (PREDICT_FALSE(!s.ok())) {
+ LOG(ERROR) << "Server connection negotiation failed: " << s.ToString();
+ DestroyConnection(conn.get(), s);
+ return;
+ }
+ ++total_server_conns_cnt_;
+ server_conns_.emplace_back(std::move(conn));
+}
+
+void ReactorThread::AssignOutboundCall(const shared_ptr<OutboundCall>& call) {
+ DCHECK(IsCurrentThread());
+ scoped_refptr<Connection> conn;
+
+ Status s = FindOrStartConnection(call->conn_id(),
+ call->controller()->credentials_policy(),
+ &conn);
+ if (PREDICT_FALSE(!s.ok())) {
+ call->SetFailed(s, OutboundCall::Phase::CONNECTION_NEGOTIATION);
+ return;
+ }
+
+ conn->QueueOutboundCall(call);
+}
+
+//
+// Handles timer events. The periodic timer:
+//
+// 1. updates Reactor::cur_time_
+// 2. every tcp_conn_timeo_ seconds, close down connections older than
+// tcp_conn_timeo_ seconds.
+//
+void ReactorThread::TimerHandler(ev::timer& /*watcher*/, int revents) {
+ DCHECK(IsCurrentThread());
+ if (EV_ERROR & revents) {
+ LOG(WARNING) << "Reactor " << name() << " got an error in "
+ "the timer handler.";
+ return;
+ }
+ cur_time_ = MonoTime::Now();
+
+ ScanIdleConnections();
+}
+
+void ReactorThread::RegisterTimeout(ev::timer *watcher) {
+ watcher->set(loop_);
+}
+
+void ReactorThread::ScanIdleConnections() {
+ DCHECK(IsCurrentThread());
+ // Enforce TCP connection timeouts: server-side connections.
+ const auto server_conns_end = server_conns_.end();
+ uint64_t timed_out = 0;
+ for (auto it = server_conns_.begin(); it != server_conns_end; ) {
+ Connection* conn = it->get();
+ if (!conn->Idle()) {
+ VLOG(10) << "Connection " << conn->ToString() << " not idle";
+ ++it;
+ continue;
+ }
+
+ const MonoDelta connection_delta(cur_time_ - conn->last_activity_time());
+ if (connection_delta <= connection_keepalive_time_) {
+ ++it;
+ continue;
+ }
+
+ conn->Shutdown(Status::NetworkError(
+ Substitute("connection timed out after $0", connection_keepalive_time_.ToString())));
+ VLOG(1) << "Timing out connection " << conn->ToString() << " - it has been idle for "
+ << connection_delta.ToString();
+ ++timed_out;
+ it = server_conns_.erase(it);
+ }
+
+ // Take care of idle client-side connections marked for shutdown.
+ uint64_t shutdown = 0;
+ for (auto it = client_conns_.begin(); it != client_conns_.end();) {
+ Connection* conn = it->second.get();
+ if (conn->scheduled_for_shutdown() && conn->Idle()) {
+ conn->Shutdown(Status::NetworkError(
+ "connection has been marked for shutdown"));
+ it = client_conns_.erase(it);
+ ++shutdown;
+ } else {
+ ++it;
+ }
+ }
+ // TODO(aserbin): clients may want to set their keepalive timeout for idle
+ // but not scheduled for shutdown connections.
+
+ VLOG_IF(1, timed_out > 0) << name() << ": timed out " << timed_out << " TCP connections.";
+ VLOG_IF(1, shutdown > 0) << name() << ": shutdown " << shutdown << " TCP connections.";
+}
+
+const std::string& ReactorThread::name() const {
+ return reactor_->name();
+}
+
+MonoTime ReactorThread::cur_time() const {
+ return cur_time_;
+}
+
+Reactor *ReactorThread::reactor() {
+ return reactor_;
+}
+
+bool ReactorThread::IsCurrentThread() const {
+ return thread_.get() == kudu::Thread::current_thread();
+}
+
+void ReactorThread::RunThread() {
+ ThreadRestrictions::SetWaitAllowed(false);
+ ThreadRestrictions::SetIOAllowed(false);
+ DVLOG(6) << "Calling ReactorThread::RunThread()...";
+ loop_.run(0);
+ VLOG(1) << name() << " thread exiting.";
+
+ // No longer need the messenger. This causes the messenger to
+ // get deleted when all the reactors exit.
+ reactor_->messenger_.reset();
+}
+
+Status ReactorThread::FindOrStartConnection(const ConnectionId& conn_id,
+ CredentialsPolicy cred_policy,
+ scoped_refptr<Connection>* conn) {
+ DCHECK(IsCurrentThread());
+ const auto range = client_conns_.equal_range(conn_id);
+ scoped_refptr<Connection> found_conn;
+ for (auto it = range.first; it != range.second;) {
+ const auto& c = it->second.get();
+ // * Do not use connections scheduled for shutdown to place new calls.
+ //
+ // * Do not use a connection with a non-compliant credentials policy.
+ // Instead, open a new one, while marking the former as scheduled for
+ // shutdown. This process converges: any connection that satisfies the
+ // PRIMARY_CREDENTIALS policy automatically satisfies the ANY_CREDENTIALS
+ // policy as well. The idea is to keep only one usable connection
+ // identified by the specified 'conn_id'.
+ //
+ // * If the test-only 'one-connection-per-RPC' mode is enabled, connections
+ // are re-established at every RPC call.
+ if (c->scheduled_for_shutdown() ||
+ !c->SatisfiesCredentialsPolicy(cred_policy) ||
+ PREDICT_FALSE(FLAGS_rpc_reopen_outbound_connections)) {
+ if (c->Idle()) {
+ // Shutdown idle connections to the target destination. Non-idle ones
+ // will be taken care of later by the idle connection scanner.
+ DCHECK_EQ(Connection::CLIENT, c->direction());
+ c->Shutdown(Status::NetworkError("connection is closed due to non-reuse policy"));
+ it = client_conns_.erase(it);
+ continue;
+ }
+ c->set_scheduled_for_shutdown();
+ } else {
+ DCHECK(!found_conn);
+ found_conn = c;
+ // Appropriate connection is found; continue further to take care of the
+ // rest of connections to mark them for shutdown if they are not
+ // satisfying the policy.
+ }
+ ++it;
+ }
+ if (found_conn) {
+ // Found matching not-to-be-shutdown connection: return it as the result.
+ conn->swap(found_conn);
+ return Status::OK();
+ }
+
+ // No connection to this remote. Need to create one.
+ VLOG(2) << name() << " FindOrStartConnection: creating "
+ << "new connection for " << conn_id.remote().ToString();
+
+ // Create a new socket and start connecting to the remote.
+ Socket sock;
+ RETURN_NOT_OK(CreateClientSocket(&sock));
+ RETURN_NOT_OK(StartConnect(&sock, conn_id.remote()));
+
+ unique_ptr<Socket> new_socket(new Socket(sock.Release()));
+
+ // Register the new connection in our map.
+ *conn = new Connection(
+ this, conn_id.remote(), std::move(new_socket), Connection::CLIENT, cred_policy);
+ (*conn)->set_local_user_credentials(conn_id.user_credentials());
+
+ // Kick off blocking client connection negotiation.
+ Status s = StartConnectionNegotiation(*conn);
+ if (s.IsIllegalState()) {
+ // Return a nicer error message to the user indicating -- if we just
+ // forward the status we'd get something generic like "ThreadPool is closing".
+ return Status::ServiceUnavailable("Client RPC Messenger shutting down");
+ }
+ // Propagate any other errors as-is.
+ RETURN_NOT_OK_PREPEND(s, "Unable to start connection negotiation thread");
+
+ // Insert into the client connection map to avoid duplicate connection requests.
+ client_conns_.emplace(conn_id, *conn);
+ ++total_client_conns_cnt_;
+
+ return Status::OK();
+}
+
+Status ReactorThread::StartConnectionNegotiation(const scoped_refptr<Connection>& conn) {
+ DCHECK(IsCurrentThread());
+
+ // Set a limit on how long the server will negotiate with a new client.
+ MonoTime deadline = MonoTime::Now() +
+ MonoDelta::FromMilliseconds(FLAGS_rpc_negotiation_timeout_ms);
+
+ scoped_refptr<Trace> trace(new Trace());
+ ADOPT_TRACE(trace.get());
+ TRACE("Submitting negotiation task for $0", conn->ToString());
+ auto authentication = reactor()->messenger()->authentication();
+ auto encryption = reactor()->messenger()->encryption();
+ RETURN_NOT_OK(reactor()->messenger()->negotiation_pool()->SubmitClosure(
+ Bind(&Negotiation::RunNegotiation, conn, authentication, encryption, deadline)));
+ return Status::OK();
+}
+
+void ReactorThread::CompleteConnectionNegotiation(
+ const scoped_refptr<Connection>& conn,
+ const Status& status,
+ unique_ptr<ErrorStatusPB> rpc_error) {
+ DCHECK(IsCurrentThread());
+ if (PREDICT_FALSE(!status.ok())) {
+ DestroyConnection(conn.get(), status, std::move(rpc_error));
+ return;
+ }
+
+ // Switch the socket back to non-blocking mode after negotiation.
+ Status s = conn->SetNonBlocking(true);
+ if (PREDICT_FALSE(!s.ok())) {
+ LOG(DFATAL) << "Unable to set connection to non-blocking mode: " << s.ToString();
+ DestroyConnection(conn.get(), s, std::move(rpc_error));
+ return;
+ }
+
+ conn->MarkNegotiationComplete();
+ conn->EpollRegister(loop_);
+}
+
+Status ReactorThread::CreateClientSocket(Socket *sock) {
+ Status ret = sock->Init(Socket::FLAG_NONBLOCKING);
+ if (ret.ok()) {
+ ret = sock->SetNoDelay(true);
+ }
+ LOG_IF(WARNING, !ret.ok())
+ << "failed to create an outbound connection because a new socket could not be created: "
+ << ret.ToString();
+ return ret;
+}
+
+Status ReactorThread::StartConnect(Socket *sock, const Sockaddr& remote) {
+ const Status ret = sock->Connect(remote);
+ if (ret.ok()) {
+ VLOG(3) << "StartConnect: connect finished immediately for " << remote.ToString();
+ return Status::OK();
+ }
+
+ int posix_code = ret.posix_code();
+ if (Socket::IsTemporarySocketError(posix_code) || posix_code == EINPROGRESS) {
+ VLOG(3) << "StartConnect: connect in progress for " << remote.ToString();
+ return Status::OK();
+ }
+
+ LOG(WARNING) << "Failed to create an outbound connection to " << remote.ToString()
+ << " because connect() failed: " << ret.ToString();
+ return ret;
+}
+
+void ReactorThread::DestroyConnection(Connection *conn,
+ const Status& conn_status,
+ unique_ptr<ErrorStatusPB> rpc_error) {
+ DCHECK(IsCurrentThread());
+
+ conn->Shutdown(conn_status, std::move(rpc_error));
+
+ // Unlink connection from lists.
+ if (conn->direction() == Connection::CLIENT) {
+ ConnectionId conn_id(conn->remote(), conn->local_user_credentials());
+ const auto range = client_conns_.equal_range(conn_id);
+ CHECK(range.first != range.second) << "Couldn't find connection " << conn->ToString();
+ // The client_conns_ container is a multi-map.
+ for (auto it = range.first; it != range.second;) {
+ if (it->second.get() == conn) {
+ it = client_conns_.erase(it);
+ break;
+ }
+ ++it;
+ }
+ } else if (conn->direction() == Connection::SERVER) {
+ auto it = server_conns_.begin();
+ while (it != server_conns_.end()) {
+ if ((*it).get() == conn) {
+ server_conns_.erase(it);
+ break;
+ }
+ ++it;
+ }
+ }
+}
+
+DelayedTask::DelayedTask(boost::function<void(const Status&)> func,
+ MonoDelta when)
+ : func_(std::move(func)),
+ when_(when),
+ thread_(nullptr) {
+}
+
+void DelayedTask::Run(ReactorThread* thread) {
+ DCHECK(thread_ == nullptr) << "Task has already been scheduled";
+ DCHECK(thread->IsCurrentThread());
+
+ // Schedule the task to run later.
+ thread_ = thread;
+ timer_.set(thread->loop_);
+ timer_.set<DelayedTask, &DelayedTask::TimerHandler>(this);
+ timer_.start(when_.ToSeconds(), // after
+ 0); // repeat
+ thread_->scheduled_tasks_.insert(this);
+}
+
+void DelayedTask::Abort(const Status& abort_status) {
+ func_(abort_status);
+ delete this;
+}
+
+void DelayedTask::TimerHandler(ev::timer& watcher, int revents) {
+ // We will free this task's memory.
+ thread_->scheduled_tasks_.erase(this);
+
+ if (EV_ERROR & revents) {
+ string msg = "Delayed task got an error in its timer handler";
+ LOG(WARNING) << msg;
+ Abort(Status::Aborted(msg)); // Will delete 'this'.
+ } else {
+ func_(Status::OK());
+ delete this;
+ }
+}
+
+Reactor::Reactor(shared_ptr<Messenger> messenger,
+ int index, const MessengerBuilder& bld)
+ : messenger_(std::move(messenger)),
+ name_(StringPrintf("%s_R%03d", messenger_->name().c_str(), index)),
+ closing_(false),
+ thread_(this, bld) {
+}
+
+Status Reactor::Init() {
+ DVLOG(6) << "Called Reactor::Init()";
+ return thread_.Init();
+}
+
+void Reactor::Shutdown() {
+ {
+ std::lock_guard<LockType> l(lock_);
+ if (closing_) {
+ return;
+ }
+ closing_ = true;
+ }
+
+ thread_.Shutdown();
+
+ // Abort all pending tasks. No new tasks can get scheduled after this
+ // because ScheduleReactorTask() tests the closing_ flag set above.
+ Status aborted = ShutdownError(true);
+ while (!pending_tasks_.empty()) {
+ ReactorTask& task = pending_tasks_.front();
+ pending_tasks_.pop_front();
+ task.Abort(aborted);
+ }
+}
+
+Reactor::~Reactor() {
+ Shutdown();
+}
+
+const std::string& Reactor::name() const {
+ return name_;
+}
+
+bool Reactor::closing() const {
+ std::lock_guard<LockType> l(lock_);
+ return closing_;
+}
+
+// Task to call an arbitrary function within the reactor thread.
+class RunFunctionTask : public ReactorTask {
+ public:
+ explicit RunFunctionTask(boost::function<Status()> f)
+ : function_(std::move(f)), latch_(1) {}
+
+ void Run(ReactorThread* /*reactor*/) override {
+ status_ = function_();
+ latch_.CountDown();
+ }
+ void Abort(const Status& status) override {
+ status_ = status;
+ latch_.CountDown();
+ }
+
+ // Wait until the function has completed, and return the Status
+ // returned by the function.
+ Status Wait() {
+ latch_.Wait();
+ return status_;
+ }
+
+ private:
+ boost::function<Status()> function_;
+ Status status_;
+ CountDownLatch latch_;
+};
+
+Status Reactor::GetMetrics(ReactorMetrics *metrics) {
+ return RunOnReactorThread(boost::bind(&ReactorThread::GetMetrics, &thread_, metrics));
+}
+
+Status Reactor::RunOnReactorThread(const boost::function<Status()>& f) {
+ RunFunctionTask task(f);
+ ScheduleReactorTask(&task);
+ return task.Wait();
+}
+
+Status Reactor::DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
+ DumpRunningRpcsResponsePB* resp) {
+ return RunOnReactorThread(boost::bind(&ReactorThread::DumpRunningRpcs, &thread_,
+ boost::ref(req), resp));
+}
+
+class RegisterConnectionTask : public ReactorTask {
+ public:
+ explicit RegisterConnectionTask(scoped_refptr<Connection> conn)
+ : conn_(std::move(conn)) {
+ }
+
+ void Run(ReactorThread* reactor) override {
+ reactor->RegisterConnection(std::move(conn_));
+ delete this;
+ }
+
+ void Abort(const Status& /*status*/) override {
+ // We don't need to Shutdown the connection since it was never registered.
+ // This is only used for inbound connections, and inbound connections will
+ // never have any calls added to them until they've been registered.
+ delete this;
+ }
+
+ private:
+ scoped_refptr<Connection> conn_;
+};
+
+void Reactor::RegisterInboundSocket(Socket *socket, const Sockaddr& remote) {
+ VLOG(3) << name_ << ": new inbound connection to " << remote.ToString();
+ unique_ptr<Socket> new_socket(new Socket(socket->Release()));
+ auto task = new RegisterConnectionTask(
+ new Connection(&thread_, remote, std::move(new_socket), Connection::SERVER));
+ ScheduleReactorTask(task);
+}
+
+// Task which runs in the reactor thread to assign an outbound call
+// to a connection.
+class AssignOutboundCallTask : public ReactorTask {
+ public:
+ explicit AssignOutboundCallTask(shared_ptr<OutboundCall> call)
+ : call_(std::move(call)) {}
+
+ void Run(ReactorThread* reactor) override {
+ reactor->AssignOutboundCall(call_);
+ delete this;
+ }
+
+ void Abort(const Status& status) override {
+ // It doesn't matter what is the actual phase of the OutboundCall: just set
+ // it to Phase::REMOTE_CALL to finilize the state of the call.
+ call_->SetFailed(status, OutboundCall::Phase::REMOTE_CALL);
+ delete this;
+ }
+
+ private:
+ shared_ptr<OutboundCall> call_;
+};
+
+void Reactor::QueueOutboundCall(const shared_ptr<OutboundCall>& call) {
+ DVLOG(3) << name_ << ": queueing outbound call "
+ << call->ToString() << " to remote " << call->conn_id().remote().ToString();
+ ScheduleReactorTask(new AssignOutboundCallTask(call));
+}
+
+void Reactor::ScheduleReactorTask(ReactorTask *task) {
+ {
+ std::unique_lock<LockType> l(lock_);
+ if (closing_) {
+ // We guarantee the reactor lock is not taken when calling Abort().
+ l.unlock();
+ task->Abort(ShutdownError(false));
+ return;
+ }
+ pending_tasks_.push_back(*task);
+ }
+ thread_.WakeThread();
+}
+
+bool Reactor::DrainTaskQueue(boost::intrusive::list<ReactorTask> *tasks) { // NOLINT(*)
+ std::lock_guard<LockType> l(lock_);
+ if (closing_) {
+ return false;
+ }
+ tasks->swap(pending_tasks_);
+ return true;
+}
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/reactor.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/reactor.h b/be/src/kudu/rpc/reactor.h
new file mode 100644
index 0000000..37eedd4
--- /dev/null
+++ b/be/src/kudu/rpc/reactor.h
@@ -0,0 +1,370 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_RPC_REACTOR_H
+#define KUDU_RPC_REACTOR_H
+
+#include <stdint.h>
+
+#include <list>
+#include <map>
+#include <memory>
+#include <set>
+#include <string>
+
+#include <boost/function.hpp>
+#include <boost/intrusive/list.hpp>
+#include <boost/optional.hpp>
+#include <ev++.h>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/rpc/connection.h"
+#include "kudu/rpc/transfer.h"
+#include "kudu/util/thread.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class Socket;
+
+namespace rpc {
+
+typedef std::list<scoped_refptr<Connection>> conn_list_t;
+
+class DumpRunningRpcsRequestPB;
+class DumpRunningRpcsResponsePB;
+class Messenger;
+class MessengerBuilder;
+class Reactor;
+enum class CredentialsPolicy;
+
+// Simple metrics information from within a reactor.
+struct ReactorMetrics {
+ // Number of client RPC connections currently connected.
+ int32_t num_client_connections_;
+ // Number of server RPC connections currently connected.
+ int32_t num_server_connections_;
+
+ // Total number of client RPC connections opened during Reactor's lifetime.
+ uint64_t total_client_connections_;
+ // Total number of server RPC connections opened during Reactor's lifetime.
+ uint64_t total_server_connections_;
+};
+
+// A task which can be enqueued to run on the reactor thread.
+class ReactorTask : public boost::intrusive::list_base_hook<> {
+ public:
+ ReactorTask();
+
+ // Run the task. 'reactor' is guaranteed to be the current thread.
+ virtual void Run(ReactorThread *reactor) = 0;
+
+ // Abort the task, in the case that the reactor shut down before the
+ // task could be processed. This may or may not run on the reactor thread
+ // itself.
+ //
+ // The Reactor guarantees that the Reactor lock is free when this
+ // method is called.
+ virtual void Abort(const Status &abort_status) {}
+
+ virtual ~ReactorTask();
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(ReactorTask);
+};
+
+// A ReactorTask that is scheduled to run at some point in the future.
+//
+// Semantically it works like RunFunctionTask with a few key differences:
+// 1. The user function is called during Abort. Put another way, the
+// user function is _always_ invoked, even during reactor shutdown.
+// 2. To differentiate between Abort and non-Abort, the user function
+// receives a Status as its first argument.
+class DelayedTask : public ReactorTask {
+ public:
+ DelayedTask(boost::function<void(const Status &)> func, MonoDelta when);
+
+ // Schedules the task for running later but doesn't actually run it yet.
+ void Run(ReactorThread* thread) override;
+
+ // Behaves like ReactorTask::Abort.
+ void Abort(const Status& abort_status) override;
+
+ private:
+ // libev callback for when the registered timer fires.
+ void TimerHandler(ev::timer& watcher, int revents);
+
+ // User function to invoke when timer fires or when task is aborted.
+ const boost::function<void(const Status&)> func_;
+
+ // Delay to apply to this task.
+ const MonoDelta when_;
+
+ // Link back to registering reactor thread.
+ ReactorThread* thread_;
+
+ // libev timer. Set when Run() is invoked.
+ ev::timer timer_;
+};
+
+// A ReactorThread is a libev event handler thread which manages I/O
+// on a list of sockets.
+//
+// All methods in this class are _only_ called from the reactor thread itself
+// except where otherwise specified. New methods should DCHECK(IsCurrentThread())
+// to ensure this.
+class ReactorThread {
+ public:
+ friend class Connection;
+
+ // Client-side connection map. Multiple connections could be open to a remote
+ // server if multiple credential policies are used for individual RPCs.
+ typedef std::unordered_multimap<ConnectionId, scoped_refptr<Connection>,
+ ConnectionIdHash, ConnectionIdEqual>
+ conn_multimap_t;
+
+ ReactorThread(Reactor *reactor, const MessengerBuilder &bld);
+
+ // This may be called from another thread.
+ Status Init();
+
+ // Add any connections on this reactor thread into the given status dump.
+ Status DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
+ DumpRunningRpcsResponsePB* resp);
+
+ // Block until the Reactor thread is shut down
+ //
+ // This must be called from another thread.
+ void Shutdown();
+
+ // This method is thread-safe.
+ void WakeThread();
+
+ // libev callback for handling async notifications in our epoll thread.
+ void AsyncHandler(ev::async &watcher, int revents);
+
+ // libev callback for handling timer events in our epoll thread.
+ void TimerHandler(ev::timer &watcher, int revents);
+
+ // Register an epoll timer watcher with our event loop.
+ // Does not set a timeout or start it.
+ void RegisterTimeout(ev::timer *watcher);
+
+ // This may be called from another thread.
+ const std::string &name() const;
+
+ MonoTime cur_time() const;
+
+ // This may be called from another thread.
+ Reactor *reactor();
+
+ // Return true if this reactor thread is the thread currently
+ // running. Should be used in DCHECK assertions.
+ bool IsCurrentThread() const;
+
+ // Begin the process of connection negotiation.
+ // Must be called from the reactor thread.
+ Status StartConnectionNegotiation(const scoped_refptr<Connection>& conn);
+
+ // Transition back from negotiating to processing requests.
+ // Must be called from the reactor thread.
+ void CompleteConnectionNegotiation(const scoped_refptr<Connection>& conn,
+ const Status& status,
+ std::unique_ptr<ErrorStatusPB> rpc_error);
+
+ // Collect metrics.
+ // Must be called from the reactor thread.
+ Status GetMetrics(ReactorMetrics *metrics);
+
+ private:
+ friend class AssignOutboundCallTask;
+ friend class RegisterConnectionTask;
+ friend class DelayedTask;
+
+ // Run the main event loop of the reactor.
+ void RunThread();
+
+ // Find or create a new connection to the given remote.
+ // If such a connection already exists, returns that, otherwise creates a new one.
+ // May return a bad Status if the connect() call fails.
+ // The resulting connection object is managed internally by the reactor thread.
+ Status FindOrStartConnection(const ConnectionId& conn_id,
+ CredentialsPolicy cred_policy,
+ scoped_refptr<Connection>* conn);
+
+ // Shut down the given connection, removing it from the connection tracking
+ // structures of this reactor.
+ //
+ // The connection is not explicitly deleted -- shared_ptr reference counting
+ // may hold on to the object after this, but callers should assume that it
+ // _may_ be deleted by this call.
+ void DestroyConnection(Connection *conn, const Status &conn_status,
+ std::unique_ptr<ErrorStatusPB> rpc_error = {});
+
+ // Scan any open connections for idle ones that have been idle longer than
+ // connection_keepalive_time_
+ void ScanIdleConnections();
+
+ // Create a new client socket (non-blocking, NODELAY)
+ static Status CreateClientSocket(Socket *sock);
+
+ // Initiate a new connection on the given socket.
+ static Status StartConnect(Socket *sock, const Sockaddr &remote);
+
+ // Assign a new outbound call to the appropriate connection object.
+ // If this fails, the call is marked failed and completed.
+ void AssignOutboundCall(const std::shared_ptr<OutboundCall> &call);
+
+ // Register a new connection.
+ void RegisterConnection(scoped_refptr<Connection> conn);
+
+ // Actually perform shutdown of the thread, tearing down any connections,
+ // etc. This is called from within the thread.
+ void ShutdownInternal();
+
+ scoped_refptr<kudu::Thread> thread_;
+
+ // our epoll object (or kqueue, etc).
+ ev::dynamic_loop loop_;
+
+ // Used by other threads to notify the reactor thread
+ ev::async async_;
+
+ // Handles the periodic timer.
+ ev::timer timer_;
+
+ // Scheduled (but not yet run) delayed tasks.
+ //
+ // Each task owns its own memory and must be freed by its TaskRun and
+ // Abort members, provided it was allocated on the heap.
+ std::set<DelayedTask*> scheduled_tasks_;
+
+ // The current monotonic time. Updated every coarse_timer_granularity_secs_.
+ MonoTime cur_time_;
+
+ // last time we did TCP timeouts.
+ MonoTime last_unused_tcp_scan_;
+
+ // Map of sockaddrs to Connection objects for outbound (client) connections.
+ conn_multimap_t client_conns_;
+
+ // List of current connections coming into the server.
+ conn_list_t server_conns_;
+
+ Reactor *reactor_;
+
+ // If a connection has been idle for this much time, it is torn down.
+ const MonoDelta connection_keepalive_time_;
+
+ // Scan for idle connections on this granularity.
+ const MonoDelta coarse_timer_granularity_;
+
+ // Total number of client connections opened during Reactor's lifetime.
+ uint64_t total_client_conns_cnt_;
+
+ // Total number of server connections opened during Reactor's lifetime.
+ uint64_t total_server_conns_cnt_;
+};
+
+// A Reactor manages a ReactorThread
+class Reactor {
+ public:
+ Reactor(std::shared_ptr<Messenger> messenger,
+ int index,
+ const MessengerBuilder &bld);
+ Status Init();
+
+ // Block until the Reactor is shut down
+ void Shutdown();
+
+ ~Reactor();
+
+ const std::string &name() const;
+
+ // Collect metrics about the reactor.
+ Status GetMetrics(ReactorMetrics *metrics);
+
+ // Add any connections on this reactor thread into the given status dump.
+ Status DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
+ DumpRunningRpcsResponsePB* resp);
+
+ // Queue a new incoming connection. Takes ownership of the underlying fd from
+ // 'socket', but not the Socket object itself.
+ // If the reactor is already shut down, takes care of closing the socket.
+ void RegisterInboundSocket(Socket *socket, const Sockaddr &remote);
+
+ // Queue a new call to be sent. If the reactor is already shut down, marks
+ // the call as failed.
+ void QueueOutboundCall(const std::shared_ptr<OutboundCall> &call);
+
+ // Schedule the given task's Run() method to be called on the
+ // reactor thread.
+ // If the reactor shuts down before it is run, the Abort method will be
+ // called.
+ // Does _not_ take ownership of 'task' -- the task should take care of
+ // deleting itself after running if it is allocated on the heap.
+ void ScheduleReactorTask(ReactorTask *task);
+
+ Status RunOnReactorThread(const boost::function<Status()>& f);
+
+ // If the Reactor is closing, returns false.
+ // Otherwise, drains the pending_tasks_ queue into the provided list.
+ bool DrainTaskQueue(boost::intrusive::list<ReactorTask> *tasks);
+
+ Messenger *messenger() const {
+ return messenger_.get();
+ }
+
+ // Indicates whether the reactor is shutting down.
+ //
+ // This method is thread-safe.
+ bool closing() const;
+
+ // Is this reactor's thread the current thread?
+ bool IsCurrentThread() const {
+ return thread_.IsCurrentThread();
+ }
+
+ private:
+ friend class ReactorThread;
+ typedef simple_spinlock LockType;
+ mutable LockType lock_;
+
+ // parent messenger
+ std::shared_ptr<Messenger> messenger_;
+
+ const std::string name_;
+
+ // Whether the reactor is shutting down.
+ // Guarded by lock_.
+ bool closing_;
+
+ // Tasks to be run within the reactor thread.
+ // Guarded by lock_.
+ boost::intrusive::list<ReactorTask> pending_tasks_; // NOLINT(build/include_what_you_use)
+
+ ReactorThread thread_;
+
+ DISALLOW_COPY_AND_ASSIGN(Reactor);
+};
+
+} // namespace rpc
+} // namespace kudu
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/remote_method.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/remote_method.cc b/be/src/kudu/rpc/remote_method.cc
new file mode 100644
index 0000000..32ec40d
--- /dev/null
+++ b/be/src/kudu/rpc/remote_method.cc
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/remote_method.h"
+#include "kudu/rpc/rpc_header.pb.h"
+
+namespace kudu {
+namespace rpc {
+
+using strings::Substitute;
+
+RemoteMethod::RemoteMethod(std::string service_name,
+ const std::string method_name)
+ : service_name_(std::move(service_name)), method_name_(method_name) {}
+
+void RemoteMethod::FromPB(const RemoteMethodPB& pb) {
+ DCHECK(pb.IsInitialized()) << "PB is uninitialized: " << pb.InitializationErrorString();
+ service_name_ = pb.service_name();
+ method_name_ = pb.method_name();
+}
+
+void RemoteMethod::ToPB(RemoteMethodPB* pb) const {
+ pb->set_service_name(service_name_);
+ pb->set_method_name(method_name_);
+}
+
+string RemoteMethod::ToString() const {
+ return Substitute("$0.$1", service_name_, method_name_);
+}
+
+} // namespace rpc
+} // namespace kudu