You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/17 03:14:27 UTC

[11/15] incubator-impala git commit: IMPALA-4669: [KRPC] Import RPC library from kudu@314c9d8

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/outbound_call.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/outbound_call.h b/be/src/kudu/rpc/outbound_call.h
new file mode 100644
index 0000000..ebed9b5
--- /dev/null
+++ b/be/src/kudu/rpc/outbound_call.h
@@ -0,0 +1,363 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_RPC_CLIENT_CALL_H
+#define KUDU_RPC_CLIENT_CALL_H
+
+#include <set>
+#include <string>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/rpc/constants.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/rpc_sidecar.h"
+#include "kudu/rpc/remote_method.h"
+#include "kudu/rpc/response_callback.h"
+#include "kudu/rpc/transfer.h"
+#include "kudu/rpc/user_credentials.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+
+namespace google {
+namespace protobuf {
+class Message;
+} // namespace protobuf
+} // namespace google
+
+namespace kudu {
+namespace rpc {
+
+class CallResponse;
+class Connection;
+class DumpRunningRpcsRequestPB;
+class InboundTransfer;
+class RpcCallInProgressPB;
+class RpcController;
+class RpcSidecar;
+
+// Used to key on Connection information: a remote address plus user credentials.
+// For use as a key in an unordered STL collection, use ConnectionIdHash and ConnectionIdEqual.
+// This class is copyable for STL compatibility, but not assignable (use CopyFrom() for that).
+class ConnectionId {
+ public:
+  ConnectionId();
+
+  // Copy constructor required for use with STL unordered_map.
+  ConnectionId(const ConnectionId& other);
+
+  // Convenience constructor taking both the remote address and the user credentials.
+  ConnectionId(const Sockaddr& remote, UserCredentials user_credentials);
+
+  // The remote address.
+  void set_remote(const Sockaddr& remote);
+  const Sockaddr& remote() const { return remote_; }
+
+  // The credentials of the user associated with this connection, if any.
+  void set_user_credentials(UserCredentials user_credentials);
+  const UserCredentials& user_credentials() const { return user_credentials_; }
+  UserCredentials* mutable_user_credentials() { return &user_credentials_; }
+
+  // Copy state from another object to this one (the replacement for assignment).
+  void CopyFrom(const ConnectionId& other);
+
+  // Returns a string representation of the object, not including the password field.
+  std::string ToString() const;
+
+  size_t HashCode() const;  // Must be kept in sync with Equals() (see the note on the fields).
+  bool Equals(const ConnectionId& other) const;
+
+ private:
+  // Remember to update HashCode() and Equals() when new fields are added.
+  Sockaddr remote_;
+  UserCredentials user_credentials_;
+
+  // Implementation of CopyFrom that can be shared with copy constructor.
+  void DoCopyFrom(const ConnectionId& other);
+
+  // Disable assignment operator (declared private; use CopyFrom() instead).
+  void operator=(const ConnectionId&);
+};
+
+class ConnectionIdHash {  // Hash functor for keying unordered STL collections on ConnectionId.
+ public:
+  std::size_t operator() (const ConnectionId& conn_id) const;  // NOTE(review): presumably conn_id.HashCode(); defined elsewhere.
+};
+
+class ConnectionIdEqual {  // Equality functor for keying unordered STL collections on ConnectionId.
+ public:
+  bool operator() (const ConnectionId& cid1, const ConnectionId& cid2) const;  // NOTE(review): presumably cid1.Equals(cid2); defined elsewhere.
+};
+
+// Tracks the status of a call on the client side.
+//
+// This is an internal-facing class -- clients interact with the
+// RpcController class.
+//
+// This is allocated by the Proxy when a call is first created,
+// then passed to the reactor thread to send on the wire. It's typically
+// kept using a shared_ptr because a call may terminate in any number
+// of different threads, making it tricky to enforce single ownership.
+class OutboundCall {
+ public:
+
+  // Phases of an outbound RPC. Making an outbound RPC might involve establishing
+  // a connection to the remote server first, and the actual call is made only
+  // once the connection to the server is established.
+  enum class Phase {
+    // The phase of connection negotiation between the caller and the callee.
+    CONNECTION_NEGOTIATION,
+
+    // The phase of sending a call over an already established connection.
+    REMOTE_CALL,
+  };
+
+  OutboundCall(const ConnectionId& conn_id, const RemoteMethod& remote_method,
+               google::protobuf::Message* response_storage,
+               RpcController* controller, ResponseCallback callback);
+
+  ~OutboundCall();
+
+  // Serialize the given request PB into this call's internal storage, and assume
+  // ownership of any sidecars that should accompany this request.
+  //
+  // Because the request data is fully serialized by this call, 'req' may be subsequently
+  // mutated with no ill effects.
+  void SetRequestPayload(const google::protobuf::Message& req,
+      std::vector<std::unique_ptr<RpcSidecar>>&& sidecars);
+
+  // Assign the call ID for this call. This is called from the reactor
+  // thread once a connection has been assigned. Must only be called once
+  // (enforced by a DCHECK in debug builds).
+  void set_call_id(int32_t call_id) {
+    DCHECK_EQ(header_.call_id(), kInvalidCallId) << "Already has a call ID";
+    header_.set_call_id(call_id);
+  }
+
+  // Serialize the call for the wire. Requires that SetRequestPayload()
+  // is called first. This is called from the Reactor thread.
+  Status SerializeTo(std::vector<Slice>* slices);
+
+  // Callback after the call has been put on the outbound connection queue.
+  void SetQueued();
+
+  // Update the call state to show that the request has started being sent
+  // on the socket.
+  void SetSending();
+
+  // Update the call state to show that the request has been sent.
+  void SetSent();
+
+  // Mark the call as failed. This also triggers the callback to notify
+  // the caller. If the call failed due to a remote error, then err_pb
+  // should be set to the error returned by the remote server. Takes
+  // ownership of 'err_pb'.
+  void SetFailed(const Status& status,
+                 Phase phase = Phase::REMOTE_CALL,
+                 ErrorStatusPB* err_pb = nullptr);
+
+  // Mark the call as timed out in the given phase. This also triggers the
+  // callback to notify the caller.
+  void SetTimedOut(Phase phase);
+  bool IsTimedOut() const;
+
+  bool IsNegotiationError() const;
+
+  // Is the call finished?
+  bool IsFinished() const;
+
+  // Fill in the call response.
+  void SetResponse(gscoped_ptr<CallResponse> resp);
+
+  const std::set<RpcFeatureFlag>& required_rpc_features() const {  // Features required to send this call.
+    return required_rpc_features_;
+  }
+
+  std::string ToString() const;
+
+  void DumpPB(const DumpRunningRpcsRequestPB& req, RpcCallInProgressPB* resp);
+
+  ////////////////////////////////////////////////////////////
+  // Getters
+  ////////////////////////////////////////////////////////////
+
+  const ConnectionId& conn_id() const { return conn_id_; }
+  const RemoteMethod& remote_method() const { return remote_method_; }
+  const ResponseCallback &callback() const { return callback_; }
+  RpcController* controller() { return controller_; }
+  const RpcController* controller() const { return controller_; }
+
+  // Return true if a call ID has been assigned to this call.
+  bool call_id_assigned() const {
+    return header_.call_id() != kInvalidCallId;
+  }
+
+  int32_t call_id() const {  // Only valid once a call ID has been assigned (DCHECKed).
+    DCHECK(call_id_assigned());
+    return header_.call_id();
+  }
+
+ private:
+  friend class RpcController;
+
+  // Various states the call propagates through.
+  // NB: if adding another state, be sure to update OutboundCall::IsFinished()
+  // and OutboundCall::StateName(State state) as well.
+  enum State {
+    READY = 0,
+    ON_OUTBOUND_QUEUE,
+    SENDING,
+    SENT,
+    NEGOTIATION_TIMED_OUT,
+    TIMED_OUT,
+    FINISHED_NEGOTIATION_ERROR,
+    FINISHED_ERROR,
+    FINISHED_SUCCESS
+  };
+
+  static std::string StateName(State state);
+
+  void set_state(State new_state);
+  State state() const;
+
+  // Same as set_state(), but requires that the caller already holds
+  // lock_.
+  void set_state_unlocked(State new_state);
+
+  // Return the current status of the call.
+  Status status() const;
+
+  // Time when the call was first initiated.
+  MonoTime start_time_;
+
+  // Return the error protobuf, if a remote error occurred.
+  // This will only be non-NULL if status().IsRemoteError().
+  const ErrorStatusPB* error_pb() const;
+
+  // Lock for the state_, status_ and error_pb_ fields, since they
+  // may be mutated by the reactor thread while the client thread
+  // reads them.
+  mutable simple_spinlock lock_;
+  State state_;
+  Status status_;
+  gscoped_ptr<ErrorStatusPB> error_pb_;
+
+  // Call the user-provided callback.
+  void CallCallback();
+
+  // The RPC header.
+  // Parts of this (eg the call ID) are only assigned once this call has been
+  // passed to the reactor thread and assigned a connection.
+  RequestHeader header_;
+
+  // The remote method being called.
+  RemoteMethod remote_method_;
+
+  // RPC-system features required to send this call.
+  std::set<RpcFeatureFlag> required_rpc_features_;
+
+  const ConnectionId conn_id_;
+  ResponseCallback callback_;
+  RpcController* controller_;
+
+  // Pointer for the protobuf where the response should be written.
+  google::protobuf::Message* response_;
+
+  // Buffers for storing segments of the wire-format request.
+  faststring header_buf_;
+  faststring request_buf_;
+
+  // Once a response has been received for this call, contains that response.
+  // Otherwise NULL.
+  gscoped_ptr<CallResponse> call_response_;
+
+  // All sidecars to be sent with this call.
+  std::vector<std::unique_ptr<RpcSidecar>> sidecars_;
+
+  // Total size in bytes of all sidecars in 'sidecars_'. Set in SetRequestPayload(); -1 until then.
+  int64_t sidecar_byte_size_ = -1;
+
+  DISALLOW_COPY_AND_ASSIGN(OutboundCall);
+};
+
+// A response to a call, on the client side.
+// Upon receiving a response, this is allocated in the reactor thread and filled
+// into the OutboundCall instance via OutboundCall::SetResponse.
+//
+// This may either be a success or error response.
+//
+// This class takes care of separating out the distinct payload slices sent
+// over: the serialized response body and the sidecar slices.
+class CallResponse {
+ public:
+  CallResponse();
+
+  // Parse the response received from a call. This must be called before any
+  // other methods on this object (the inline accessors below DCHECK this).
+  Status ParseFrom(gscoped_ptr<InboundTransfer> transfer);
+
+  // Return true if the call succeeded, i.e. the response header is not flagged as an error.
+  bool is_success() const {
+    DCHECK(parsed_);
+    return !header_.is_error();
+  }
+
+  // Return the call ID that this response is related to.
+  int32_t call_id() const {
+    DCHECK(parsed_);
+    return header_.call_id();
+  }
+
+  // Return the serialized response data. This is just the response "body" --
+  // either a serialized ErrorStatusPB, or the serialized user response protobuf.
+  const Slice &serialized_response() const {
+    DCHECK(parsed_);
+    return serialized_response_;
+  }
+
+  // See RpcController::GetSidecar()
+  Status GetSidecar(int idx, Slice* sidecar) const;
+
+ private:
+  // True once ParseFrom() is called.
+  bool parsed_;
+
+  // The parsed header.
+  ResponseHeader header_;
+
+  // The slice of data for the encoded protobuf response.
+  // This slice refers to memory allocated by transfer_
+  Slice serialized_response_;
+
+  // Slices of data for rpc sidecars. They point into memory owned by transfer_.
+  Slice sidecar_slices_[TransferLimits::kMaxSidecars];  // Fixed capacity: at most kMaxSidecars entries.
+
+  // The incoming transfer data - retained because serialized_response_
+  // and sidecar_slices_ refer into its data.
+  gscoped_ptr<InboundTransfer> transfer_;
+
+  DISALLOW_COPY_AND_ASSIGN(CallResponse);
+};
+
+} // namespace rpc
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/protoc-gen-krpc.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/protoc-gen-krpc.cc b/be/src/kudu/rpc/protoc-gen-krpc.cc
new file mode 100644
index 0000000..de41aa9
--- /dev/null
+++ b/be/src/kudu/rpc/protoc-gen-krpc.cc
@@ -0,0 +1,674 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+////////////////////////////////////////////////////////////////////////////////
+// Example usage:
+// protoc --plugin=protoc-gen-krpc --krpc_out . --proto_path . <file>.proto
+////////////////////////////////////////////////////////////////////////////////
+
+#include <ctype.h>
+
+#include <iostream>
+#include <map>
+#include <memory>
+#include <sstream>
+#include <string>
+
+#include <boost/optional.hpp>
+#include <glog/logging.h>
+#include <google/protobuf/compiler/code_generator.h>
+#include <google/protobuf/compiler/plugin.h>
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/descriptor.pb.h>
+#include <google/protobuf/io/printer.h>
+#include <google/protobuf/io/zero_copy_stream.h>
+#include <google/protobuf/stubs/common.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/numbers.h"
+#include "kudu/gutil/strings/split.h"
+#include "kudu/gutil/strings/stringpiece.h"
+#include "kudu/gutil/strings/strip.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/strings/util.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/util/status.h"
+#include "kudu/util/string_case.h"
+
+using boost::optional;
+using google::protobuf::FileDescriptor;
+using google::protobuf::io::Printer;
+using google::protobuf::MethodDescriptor;
+using google::protobuf::ServiceDescriptor;
+using std::map;
+using std::shared_ptr;
+using std::set;
+using std::string;
+using std::vector;
+
+namespace kudu {
+namespace rpc {
+
+namespace {
+
+// Resolves the authorization method name that applies to 'method': a method-level
+// 'authz_method' option wins, else the service-wide 'default_authz_method' is used.
+// Yields boost::none when neither option is set.
+optional<string> GetAuthzMethod(const MethodDescriptor& method) {
+  const auto& method_opts = method.options();
+  const auto& service_opts = method.service()->options();
+  if (method_opts.HasExtension(authz_method)) {
+    return optional<string>(method_opts.GetExtension(authz_method));
+  }
+  return service_opts.HasExtension(default_authz_method)
+      ? optional<string>(service_opts.GetExtension(default_authz_method))
+      : optional<string>();
+}
+
+} // anonymous namespace
+
+class Substituter {  // Interface for objects that contribute entries to a template substitution map.
+ public:
+  virtual ~Substituter() = default;
+  virtual void InitSubstitutionMap(map<string, string> *subs) const = 0;  // Adds entries to '*subs'.
+};
+
+// FileSubstitutions derives the generated file names and the file-level
+// substitution variables (paths, base name, namespaces) for one .proto file.
+class FileSubstitutions : public Substituter {
+ public:
+  static const std::string PROTO_EXTENSION;
+
+  // Populates the file-level substitution map from 'file'. Returns
+  // InvalidArgument if the file name does not end in PROTO_EXTENSION.
+  Status Init(const FileDescriptor *file) {
+    string path = file->name();
+    map_["path"] = path;
+
+    // If path = /foo/bar/baz_stuff.proto, path_no_extension_ = /foo/bar/baz_stuff
+    if (!TryStripSuffixString(path, PROTO_EXTENSION, &path_no_extension_)) {
+      return Status::InvalidArgument("file name " + path +
+                                     " did not end in " + PROTO_EXTENSION);
+    }
+    map_["path_no_extension"] = path_no_extension_;
+
+    // If path = /foo/bar/baz_stuff.proto, base = baz_stuff
+    string base;
+    GetBaseName(path_no_extension_, &base);
+    map_["base"] = base;
+
+    // If path = /foo/bar/baz_stuff.proto, camel_case = BazStuff
+    string camel_case;
+    SnakeToCamelCase(base, &camel_case);
+    map_["camel_case"] = camel_case;
+
+    // If path = /foo/bar/baz_stuff.proto, upper_case = BAZ_STUFF
+    string upper_case;
+    ToUpperCase(base, &upper_case);
+    map_["upper_case"] = upper_case;
+
+    map_["open_namespace"] = GenerateOpenNamespace(file->package());
+    map_["close_namespace"] = GenerateCloseNamespace(file->package());
+
+    return Status::OK();
+  }
+
+  // Copies every file-level entry into '*map', overwriting existing keys.
+  virtual void InitSubstitutionMap(map<string, string> *map) const OVERRIDE {
+    // The parameter name hides the 'map' type alias in this scope, hence 'auto'.
+    for (const auto &entry : map_) {
+      (*map)[entry.first] = entry.second;
+    }
+  }
+
+  // Names of the four generated files, derived from the .proto path.
+  std::string service_header() const { return path_no_extension_ + ".service.h"; }
+  std::string service() const { return path_no_extension_ + ".service.cc"; }
+  std::string proxy_header() const { return path_no_extension_ + ".proxy.h"; }
+  std::string proxy() const { return path_no_extension_ + ".proxy.cc"; }
+
+ private:
+  // Extract the last filename component.
+  static void GetBaseName(const string &path, string *base) {
+    const size_t last_slash = path.find_last_of("/");
+    *base = (last_slash == string::npos) ? path : path.substr(last_slash + 1);
+  }
+
+  // Returns one "namespace X {" line per component of the dotted package 'str'.
+  // Empty components are skipped so that a .proto without a package yields no
+  // namespace at all rather than an unnamed "namespace  {".
+  // NOTE(review): assumes strings::Split("", ".") yields one empty string -- confirm.
+  static string GenerateOpenNamespace(const string &str) {
+    vector<string> components = strings::Split(str, ".");
+    string out;
+    for (const string &c : components) {
+      if (c.empty()) continue;
+      out.append("namespace ").append(c).append(" {\n");
+    }
+    return out;
+  }
+
+  // Returns the matching closing braces, innermost namespace first. Empty
+  // components are skipped here as well, so that every opened namespace
+  // (and only those) gets closed.
+  static string GenerateCloseNamespace(const string &str) {
+    vector<string> components = strings::Split(str, ".");
+    string out;
+    for (auto c = components.crbegin(); c != components.crend(); ++c) {
+      if (c->empty()) continue;
+      out.append("} // namespace ").append(*c).append("\n");
+    }
+    return out;
+  }
+
+  // The .proto path with PROTO_EXTENSION stripped; set by Init().
+  std::string path_no_extension_;
+
+  // File-level substitution variables; populated by Init().
+  map<string, string> map_;
+};
+
+const std::string FileSubstitutions::PROTO_EXTENSION(".proto");
+
+// Per-method template variables (names, request/response C++ types, RPC options).
+class MethodSubstitutions : public Substituter {
+ public:
+  explicit MethodSubstitutions(const MethodDescriptor *method)
+    : method_(method) {}
+
+  virtual void InitSubstitutionMap(map<string, string> *map) const OVERRIDE {
+    // Request/response type names are made relative to the service's package when possible.
+    (*map)["rpc_name"] = method_->name();
+    (*map)["rpc_full_name"] = method_->full_name();
+    (*map)["rpc_full_name_plainchars"] =
+        StringReplace(method_->full_name(), ".", "_", true);
+    (*map)["request"] =
+        ReplaceNamespaceDelimiters(
+            StripNamespaceIfPossible(method_->service()->full_name(),
+                                     method_->input_type()->full_name()));
+    (*map)["response"] =
+        ReplaceNamespaceDelimiters(
+            StripNamespaceIfPossible(method_->service()->full_name(),
+                                     method_->output_type()->full_name()));
+    (*map)["metric_enum_key"] = strings::Substitute("kMetricIndex$0", method_->name());
+    bool track_result = static_cast<bool>(method_->options().GetExtension(track_rpc_result));
+    (*map)["track_result"] = track_result ? " true" : "false";
+    (*map)["authz_method"] = GetAuthzMethod(*method_).get_value_or("AuthorizeAllowAll");
+  }
+
+  // Strips the service's package prefix from 'arg_full_name' if the argument type lives in
+  // that package, otherwise leaves it fully qualified. Only the package prefix is removed,
+  // so a type in a sub-package or nested in another message keeps its remaining
+  // qualification (e.g. service "a.Svc", arg "a.b.Msg" -> "b.Msg", not "Msg").
+  static std::string StripNamespaceIfPossible(const std::string& service_full_name,
+                                              const std::string& arg_full_name) {
+    StringPiece service_package(service_full_name);
+    if (!service_package.contains(".")) {
+      return arg_full_name;
+    }
+    // remove the service name so that we are left with only the package, including
+    // the last '.' so that we account for different packages with the same prefix.
+    service_package.remove_suffix(service_package.length() -
+                                  service_package.find_last_of(".") - 1);
+    StringPiece argfqn(arg_full_name);
+    if (argfqn.starts_with(service_package)) {
+      argfqn.remove_prefix(service_package.length());
+    }
+    return argfqn.ToString();
+  }
+
+  static std::string ReplaceNamespaceDelimiters(const std::string& arg_full_name) {
+    return JoinStrings(strings::Split(arg_full_name, "."), "::");
+  }
+
+ private:
+  const MethodDescriptor *method_;
+};
+
+// Per-service template variables: short name, full name and method count.
+class ServiceSubstitutions : public Substituter {
+ public:
+  explicit ServiceSubstitutions(const ServiceDescriptor *service) : service_(service) {}
+
+  // Adds the service-level entries to '*map'.
+  // TODO: upgrade to protobuf 2.5.x and attach service comments
+  // to the generated service classes using the SourceLocation API.
+  virtual void InitSubstitutionMap(map<string, string> *map) const OVERRIDE {
+    auto &vars = *map;
+    vars["service_method_count"] = SimpleItoa(service_->method_count());
+    vars["full_service_name"] = service_->full_name();
+    vars["service_name"] = service_->name();
+  }
+
+ private:
+  const ServiceDescriptor *service_;
+};
+
+
+// A stack of Substituters (file -> service -> method); later entries override earlier keys.
+class SubstitutionContext {
+ public:
+  // Pushes 'sub' onto the stack; the context takes ownership of it.
+  void Push(const Substituter *sub) {
+    subs_.emplace_back(sub);
+  }
+
+  // Pushes the substitutions for a single RPC method.
+  void PushMethod(const MethodDescriptor *method) { Push(new MethodSubstitutions(method)); }
+
+  // Pushes the substitutions for a service.
+  void PushService(const ServiceDescriptor *service) { Push(new ServiceSubstitutions(service)); }
+
+  // Removes the most recently pushed substituter; the stack must not be empty.
+  void Pop() {
+    CHECK(!subs_.empty());
+    subs_.pop_back();
+  }
+
+  // Applies every substituter to '*subs', oldest first.
+  void InitSubstitutionMap(map<string, string> *subs) const {
+    for (auto it = subs_.begin(); it != subs_.end(); ++it) {
+      (*it)->InitSubstitutionMap(subs);
+    }
+  }
+
+ private:
+  vector<shared_ptr<const Substituter> > subs_;
+};
+
+
+
+class CodeGenerator : public ::google::protobuf::compiler::CodeGenerator {
+ public:
+  CodeGenerator() { }  // Empty body: nothing is initialized here.
+
+  ~CodeGenerator() { }  // Empty body: nothing is released here.
+
+  bool Generate(const google::protobuf::FileDescriptor *file,
+        const std::string &/* parameter */,
+        google::protobuf::compiler::GeneratorContext *gen_context,
+        std::string *error) const OVERRIDE {
+    auto name_info = new FileSubstitutions();
+    Status ret = name_info->Init(file);
+    if (!ret.ok()) {
+      *error = "name_info.Init failed: " + ret.ToString();
+      return false;
+    }
+
+    SubstitutionContext subs;
+    subs.Push(name_info);
+
+    gscoped_ptr<google::protobuf::io::ZeroCopyOutputStream> ih_output(
+        gen_context->Open(name_info->service_header()));
+    Printer ih_printer(ih_output.get(), '$');
+    GenerateServiceIfHeader(&ih_printer, &subs, file);
+
+    gscoped_ptr<google::protobuf::io::ZeroCopyOutputStream> i_output(
+        gen_context->Open(name_info->service()));
+    Printer i_printer(i_output.get(), '$');
+    GenerateServiceIf(&i_printer, &subs, file);
+
+    gscoped_ptr<google::protobuf::io::ZeroCopyOutputStream> ph_output(
+        gen_context->Open(name_info->proxy_header()));
+    Printer ph_printer(ph_output.get(), '$');
+    GenerateProxyHeader(&ph_printer, &subs, file);
+
+    gscoped_ptr<google::protobuf::io::ZeroCopyOutputStream> p_output(
+        gen_context->Open(name_info->proxy()));
+    Printer p_printer(p_output.get(), '$');
+    GenerateProxy(&p_printer, &subs, file);
+
+    return true;
+  }
+
+ private:
+  // Prints 'text' to 'printer', expanding its placeholders from the variables in 'sub'.
+  void Print(Printer *printer, const SubstitutionContext &sub, const char *text) const {
+    // The variable map is rebuilt on every call so it reflects the current stack.
+    map<string, string> vars;
+    sub.InitSubstitutionMap(&vars);
+    printer->Print(vars, text);
+  }
+
+  void GenerateServiceIfHeader(Printer *printer,  // Emits the service interface header text.
+                               SubstitutionContext *subs,
+                               const FileDescriptor *file) const {
+    Print(printer, *subs,
+      "// THIS FILE IS AUTOGENERATED FROM $path$\n"
+      "\n"
+      "#ifndef KUDU_RPC_$upper_case$_SERVICE_IF_DOT_H\n"
+      "#define KUDU_RPC_$upper_case$_SERVICE_IF_DOT_H\n"
+      "\n"
+      "#include \"$path_no_extension$.pb.h\"\n"
+      "\n"
+      "#include <functional>\n"
+      "#include <memory>\n"
+      "#include <string>\n"
+      "\n"
+      "#include \"kudu/rpc/rpc_header.pb.h\"\n"
+      "#include \"kudu/rpc/service_if.h\"\n"
+      "\n"
+      "namespace kudu {\n"
+      "class MetricEntity;\n"
+      "namespace rpc {\n"
+      "class Messenger;\n"
+      "class ResultTracker;\n"
+      "class RpcContext;\n"
+      "} // namespace rpc\n"
+      "} // namespace kudu\n"
+      "\n"
+      "$open_namespace$"
+      "\n"
+      );
+
+    for (int service_idx = 0; service_idx < file->service_count();  // One <Service>If class per service.
+         ++service_idx) {
+      const ServiceDescriptor *service = file->service(service_idx);
+      subs->PushService(service);  // Popped at the end of this iteration.
+
+      Print(printer, *subs,
+        "class $service_name$If : public ::kudu::rpc::GeneratedServiceIf {\n"
+        " public:\n"
+        "  explicit $service_name$If(const scoped_refptr<::kudu::MetricEntity>& entity,"
+            " const scoped_refptr<::kudu::rpc::ResultTracker>& result_tracker);\n"
+        "  virtual ~$service_name$If();\n"
+        "  std::string service_name() const override;\n"
+        "  static std::string static_service_name();\n"
+        "\n"
+        );
+
+      set<string> authz_methods;  // Distinct authz method names referenced by this service's methods.
+      for (int method_idx = 0; method_idx < service->method_count();  // One pure virtual handler per RPC.
+           ++method_idx) {
+        const MethodDescriptor *method = service->method(method_idx);
+        subs->PushMethod(method);
+
+        Print(printer, *subs,
+        "  virtual void $rpc_name$(const $request$ *req,\n"
+        "     $response$ *resp, ::kudu::rpc::RpcContext *context) = 0;\n"
+        );
+        subs->Pop();  // Method
+        if (auto m = GetAuthzMethod(*method)) {  // Empty when no authz method is configured.
+          authz_methods.insert(m.get());
+        }
+      }
+
+      if (!authz_methods.empty()) {
+        printer->Print(
+        "\n\n"
+        "  // Authorization methods\n"
+        "  // ---------------------\n\n");
+      }
+      for (const string& m : authz_methods) {  // Each distinct authz method is declared once.
+        printer->Print({ {"m", m} },
+        "  virtual bool $m$(const google::protobuf::Message* req,\n"
+        "     google::protobuf::Message* resp, ::kudu::rpc::RpcContext *context) = 0;\n");
+      }
+
+      Print(printer, *subs,
+        "\n"
+        "};\n"
+      );
+
+      subs->Pop(); // Service
+    }
+
+    Print(printer, *subs,
+      "\n"
+      "$close_namespace$\n"
+      "#endif\n");
+  }
+
+  void GenerateServiceIf(Printer *printer,  // Emits the service interface implementation text.
+                         SubstitutionContext *subs,
+                         const FileDescriptor *file) const {
+    Print(printer, *subs,
+      "// THIS FILE IS AUTOGENERATED FROM $path$\n"
+      "\n"
+      "#include \"$path_no_extension$.pb.h\"\n"
+      "#include \"$path_no_extension$.service.h\"\n"
+      "\n"
+      "#include <glog/logging.h>\n"
+      "\n"
+      "#include \"kudu/rpc/inbound_call.h\"\n"
+      "#include \"kudu/rpc/remote_method.h\"\n"
+      "#include \"kudu/rpc/rpc_context.h\"\n"
+      "#include \"kudu/rpc/service_if.h\"\n"
+      "#include \"kudu/util/metrics.h\"\n"
+      "\n");
+
+    // Define metric prototypes for each method in the service.
+    // These are printed before $open_namespace$, i.e. outside the package namespaces.
+    for (int service_idx = 0; service_idx < file->service_count();
+        ++service_idx) {
+      const ServiceDescriptor *service = file->service(service_idx);
+      subs->PushService(service);
+
+      for (int method_idx = 0; method_idx < service->method_count();
+          ++method_idx) {
+        const MethodDescriptor *method = service->method(method_idx);
+        subs->PushMethod(method);
+        Print(printer, *subs,
+          "METRIC_DEFINE_histogram(server, handler_latency_$rpc_full_name_plainchars$,\n"
+          "  \"$rpc_full_name$ RPC Time\",\n"
+          "  kudu::MetricUnit::kMicroseconds,\n"
+          "  \"Microseconds spent handling $rpc_full_name$() RPC requests\",\n"
+          "  60000000LU, 2);\n"
+          "\n");
+        subs->Pop();  // Method
+      }
+
+      subs->Pop();  // Service
+    }
+
+    Print(printer, *subs,
+      "using google::protobuf::Message;\n"
+      "using kudu::MetricEntity;\n"
+      "using kudu::rpc::ResultTracker;\n"
+      "using kudu::rpc::RpcContext;\n"
+      "using kudu::rpc::RpcMethodInfo;\n"
+      "using std::unique_ptr;\n"
+      "\n"
+      "$open_namespace$"
+      "\n");
+
+    for (int service_idx = 0; service_idx < file->service_count();  // Constructor, destructor and name accessors per service.
+         ++service_idx) {
+      const ServiceDescriptor *service = file->service(service_idx);
+      subs->PushService(service);
+
+      Print(printer, *subs,
+        "$service_name$If::$service_name$If(const scoped_refptr<MetricEntity>& entity,"
+            " const scoped_refptr<ResultTracker>& result_tracker) {\n"
+            "result_tracker_ = result_tracker;\n"
+      );
+      for (int method_idx = 0; method_idx < service->method_count();  // One RpcMethodInfo registration block per RPC.
+           ++method_idx) {
+        const MethodDescriptor *method = service->method(method_idx);
+        subs->PushMethod(method);
+
+        Print(printer, *subs,
+              "  {\n"
+              "    scoped_refptr<RpcMethodInfo> mi(new RpcMethodInfo());\n"
+              "    mi->req_prototype.reset(new $request$());\n"
+              "    mi->resp_prototype.reset(new $response$());\n"
+              "    mi->authz_method = [this](const Message* req, Message* resp,\n"
+              "                              RpcContext* ctx) {\n"
+              "      return this->$authz_method$(static_cast<const $request$*>(req),\n"
+              "                           static_cast<$response$*>(resp),\n"
+              "                           ctx);\n"
+              "    };\n"
+              "    mi->track_result = $track_result$;\n"
+              "    mi->handler_latency_histogram =\n"
+              "        METRIC_handler_latency_$rpc_full_name_plainchars$.Instantiate(entity);\n"
+              "    mi->func = [this](const Message* req, Message* resp, RpcContext* ctx) {\n"
+              "      this->$rpc_name$(static_cast<const $request$*>(req),\n"
+              "                       static_cast<$response$*>(resp),\n"
+              "                       ctx);\n"
+              "    };\n"
+              "    methods_by_name_[\"$rpc_name$\"] = std::move(mi);\n"
+              "  }\n");
+        subs->Pop();  // Method
+      }
+
+      Print(printer, *subs,
+        "}\n"
+        "\n"
+        "$service_name$If::~$service_name$If() {\n"
+        "}\n"
+        "\n"
+        "std::string $service_name$If::service_name() const {\n"
+        "  return \"$full_service_name$\";\n"
+        "}\n"
+        "std::string $service_name$If::static_service_name() {\n"
+        "  return \"$full_service_name$\";\n"
+        "}\n"
+        "\n"
+      );
+
+      subs->Pop();  // Service
+    }
+
+    Print(printer, *subs,
+      "$close_namespace$"
+      );
+  }
+
+  void GenerateProxyHeader(Printer *printer,  // Emits the client proxy header text.
+                           SubstitutionContext *subs,
+                           const FileDescriptor *file) const {
+    Print(printer, *subs,
+      "// THIS FILE IS AUTOGENERATED FROM $path$\n"
+      "\n"
+      "#ifndef KUDU_RPC_$upper_case$_PROXY_DOT_H\n"
+      "#define KUDU_RPC_$upper_case$_PROXY_DOT_H\n"
+      "\n"
+      "#include \"$path_no_extension$.pb.h\"\n"
+      "\n"
+      "#include \"kudu/rpc/proxy.h\"\n"
+      "#include \"kudu/util/status.h\"\n"
+      "\n"
+      "namespace kudu { class Sockaddr; }\n"
+      "namespace kudu { namespace rpc { class UserCredentials; } }\n"
+      "$open_namespace$"
+      "\n"
+      "\n"
+    );
+
+    for (int service_idx = 0; service_idx < file->service_count();  // One <Service>Proxy class per service.
+         ++service_idx) {
+      const ServiceDescriptor *service = file->service(service_idx);
+      subs->PushService(service);
+
+      Print(printer, *subs,
+        "class $service_name$Proxy : public ::kudu::rpc::Proxy {\n"
+        " public:\n"
+        "  $service_name$Proxy(const std::shared_ptr< ::kudu::rpc::Messenger>\n"
+        "                &messenger, const ::kudu::Sockaddr &sockaddr);\n"
+        "  ~$service_name$Proxy();\n"
+        "\n"
+        );
+
+      for (int method_idx = 0; method_idx < service->method_count();  // A synchronous and an Async declaration per RPC.
+           ++method_idx) {
+        const MethodDescriptor *method = service->method(method_idx);
+        subs->PushMethod(method);
+
+        Print(printer, *subs,
+        "\n"
+        "  ::kudu::Status $rpc_name$(const $request$ &req, $response$ *resp,\n"
+        "                          ::kudu::rpc::RpcController *controller);\n"
+        "  void $rpc_name$Async(const $request$ &req,\n"
+        "                       $response$ *response,\n"
+        "                       ::kudu::rpc::RpcController *controller,\n"
+        "                       const ::kudu::rpc::ResponseCallback &callback);\n"
+        );
+        subs->Pop();  // Method
+      }
+      Print(printer, *subs,
+      "};\n");
+      subs->Pop();  // Service
+    }
+    Print(printer, *subs,
+      "\n"
+      "$close_namespace$"
+      "\n"
+      "#endif\n"
+      );
+  }
+
+  // Writes the text of the generated proxy implementation file for 'file' to
+  // 'printer'.
+  //
+  // For every service this emits the '<Service>Proxy' constructor (which
+  // forwards the messenger, the remote address and the full service name to
+  // the Proxy base class) and destructor. For every RPC method it emits a
+  // synchronous wrapper that calls SyncRequest() and an '<Method>Async'
+  // wrapper that calls AsyncRequest(), both passing the method name as a
+  // string.
+  //
+  // 'subs' supplies the $variable$ substitutions. Each PushService() and
+  // PushMethod() below is paired with a Pop().
+  void GenerateProxy(Printer *printer,
+                     SubstitutionContext *subs,
+                     const FileDescriptor *file) const {
+    // File prologue: includes and the opening of the target namespace.
+    Print(printer, *subs,
+      "// THIS FILE IS AUTOGENERATED FROM $path$\n"
+      "\n"
+      "#include \"$path_no_extension$.proxy.h\"\n"
+      "\n"
+      "#include \"kudu/rpc/outbound_call.h\"\n"
+      "#include \"kudu/util/net/sockaddr.h\"\n"
+      "\n"
+      "$open_namespace$"
+      "\n"
+      );
+
+    for (int service_idx = 0; service_idx < file->service_count();
+         ++service_idx) {
+      const ServiceDescriptor *service = file->service(service_idx);
+      subs->PushService(service);
+      // Constructor and destructor of the generated proxy class.
+      Print(printer, *subs,
+        "$service_name$Proxy::$service_name$Proxy(\n"
+        "   const std::shared_ptr< ::kudu::rpc::Messenger> &messenger,\n"
+        "   const ::kudu::Sockaddr &remote)\n"
+        "  : Proxy(messenger, remote, \"$full_service_name$\") {\n"
+        "}\n"
+        "\n"
+        "$service_name$Proxy::~$service_name$Proxy() {\n"
+        "}\n"
+        "\n"
+        "\n");
+      // Synchronous and asynchronous wrappers for each RPC method.
+      for (int method_idx = 0; method_idx < service->method_count();
+           ++method_idx) {
+        const MethodDescriptor *method = service->method(method_idx);
+        subs->PushMethod(method);
+        Print(printer, *subs,
+        "::kudu::Status $service_name$Proxy::$rpc_name$(const $request$ &req, $response$ *resp,\n"
+        "                                     ::kudu::rpc::RpcController *controller) {\n"
+        "  return SyncRequest(\"$rpc_name$\", req, resp, controller);\n"
+        "}\n"
+        "\n"
+        "void $service_name$Proxy::$rpc_name$Async(const $request$ &req,\n"
+        "                     $response$ *resp, ::kudu::rpc::RpcController *controller,\n"
+        "                     const ::kudu::rpc::ResponseCallback &callback) {\n"
+        "  AsyncRequest(\"$rpc_name$\", req, resp, controller, callback);\n"
+        "}\n"
+        "\n");
+        subs->Pop();
+      }
+
+      subs->Pop();
+    }
+    Print(printer, *subs,
+      "$close_namespace$");
+  }
+};
+} // namespace rpc
+} // namespace kudu
+
+// Entry point of the protoc plugin: hands the command line and the KRPC code
+// generator to protobuf's PluginMain() and exits with its return value.
+int main(int argc, char *argv[]) {
+  kudu::rpc::CodeGenerator krpc_generator;
+  const int exit_code =
+      google::protobuf::compiler::PluginMain(argc, argv, &krpc_generator);
+  return exit_code;
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/proxy.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/proxy.cc b/be/src/kudu/rpc/proxy.cc
new file mode 100644
index 0000000..45ad5dd
--- /dev/null
+++ b/be/src/kudu/rpc/proxy.cc
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/proxy.h"
+
+#include <boost/bind.hpp>
+#include <glog/logging.h>
+#include <inttypes.h>
+#include <memory>
+#include <stdint.h>
+
+#include <iostream>
+#include <sstream>
+#include <vector>
+
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/outbound_call.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/remote_method.h"
+#include "kudu/rpc/response_callback.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/status.h"
+#include "kudu/util/user.h"
+
+using google::protobuf::Message;
+using std::string;
+using std::shared_ptr;
+
+namespace kudu {
+namespace rpc {
+
+// Builds a proxy for 'service_name' at 'remote' that sends its calls through
+// 'messenger'. A null messenger is fatal; an empty service name is rejected in
+// debug builds.
+Proxy::Proxy(std::shared_ptr<Messenger> messenger,
+             const Sockaddr& remote, string service_name)
+    : service_name_(std::move(service_name)),
+      messenger_(std::move(messenger)),
+      is_started_(false) {
+  CHECK(messenger_ != nullptr);
+  DCHECK(!service_name_.empty()) << "Proxy service name must not be blank";
+
+  // The real user defaults to whoever is currently logged in; the effective
+  // user and password stay blank. A failed lookup is only logged, in which
+  // case 'login_name' is used as GetLoggedInUser() left it.
+  string login_name;
+  Status lookup_status = GetLoggedInUser(&login_name);
+  if (!lookup_status.ok()) {
+    LOG(WARNING) << "Proxy for " << service_name_ << ": Unable to get logged-in user name: "
+        << lookup_status.ToString() << " before connecting to remote: " << remote.ToString();
+  }
+
+  conn_id_.set_remote(remote);
+  conn_id_.mutable_user_credentials()->set_real_user(login_name);
+}
+
+// Nothing to release explicitly; the members clean up after themselves.
+Proxy::~Proxy() = default;
+
+// Starts an asynchronous call of 'method' on this proxy's service.
+//
+// 'controller' must not already have a call attached (fatal otherwise). A new
+// OutboundCall is created for it, 'req' is handed to the controller via
+// SetRequestParam(), and the call is then queued on the messenger.
+void Proxy::AsyncRequest(const string& method,
+                         const google::protobuf::Message& req,
+                         google::protobuf::Message* response,
+                         RpcController* controller,
+                         const ResponseCallback& callback) const {
+  CHECK(!controller->call_) << "Controller should be reset";
+  // Record that a request has been issued; set_user_credentials() checks this
+  // flag and refuses to run once it is set.
+  base::subtle::NoBarrier_Store(&is_started_, true);
+  RemoteMethod remote_method(service_name_, method);
+  controller->call_.reset(
+      new OutboundCall(conn_id_, remote_method, response, controller, callback));
+  controller->SetRequestParam(req);
+
+  // If this fails to queue, the callback will get called immediately
+  // and the controller will be in an ERROR state.
+  messenger_->QueueOutboundCall(controller->call_);
+}
+
+
+// Blocking variant of AsyncRequest(): issues the call, waits until its
+// completion callback has fired, and returns the controller's status.
+Status Proxy::SyncRequest(const string& method,
+                          const google::protobuf::Message& req,
+                          google::protobuf::Message* resp,
+                          RpcController* controller) const {
+  // The completion callback releases this latch. Capturing it by reference is
+  // safe because this function does not return before Wait() has returned.
+  CountDownLatch done(1);
+  AsyncRequest(method, req, DCHECK_NOTNULL(resp), controller,
+               [&done]() { done.CountDown(); });
+
+  done.Wait();
+  return controller->status();
+}
+
+// Replaces the credentials stored in this proxy's connection id.
+//
+// Only legal before the first request: AsyncRequest() sets 'is_started_', and
+// calling this afterwards trips the CHECK below.
+void Proxy::set_user_credentials(const UserCredentials& user_credentials) {
+  CHECK(base::subtle::NoBarrier_Load(&is_started_) == false)
+    << "It is illegal to call set_user_credentials() after request processing has started";
+  conn_id_.set_user_credentials(user_credentials);
+}
+
+// Renders the proxy as "<service name>@<connection id>".
+std::string Proxy::ToString() const {
+  const std::string connection_desc = conn_id_.ToString();
+  return strings::Substitute("$0@$1", service_name_, connection_desc);
+}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/proxy.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/proxy.h b/be/src/kudu/rpc/proxy.h
new file mode 100644
index 0000000..ddbbe60
--- /dev/null
+++ b/be/src/kudu/rpc/proxy.h
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_RPC_PROXY_H
+#define KUDU_RPC_PROXY_H
+
+#include <memory>
+#include <string>
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/rpc/outbound_call.h"
+#include "kudu/rpc/response_callback.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/status.h"
+
+namespace google {
+namespace protobuf {
+class Message;
+} // namespace protobuf
+} // namespace google
+
+namespace kudu {
+namespace rpc {
+
+class Messenger;
+
+// Interface to send calls to a remote service.
+//
+// Proxy objects do not map one-to-one with TCP connections.  The underlying TCP
+// connection is not established until the first call, and may be torn down and
+// re-established as necessary by the messenger. Additionally, the messenger is
+// likely to multiplex many Proxy objects on the same connection.
+//
+// Proxy objects are thread-safe after initialization only.
+// Setters on the Proxy are not thread-safe, and calling a setter after any RPC
+// request has started will cause a fatal error.
+//
+// After initialization, multiple threads may make calls using the same proxy object.
+class Proxy {
+ public:
+  // Creates a proxy for the service named 'service_name' at the address
+  // 'remote'; calls are sent through 'messenger'.
+  //
+  // 'messenger' must not be null (fatal otherwise) and 'service_name' must not
+  // be empty (checked in debug builds). The real user of the proxy's
+  // credentials is initialized from the currently logged-in user.
+  Proxy(std::shared_ptr<Messenger> messenger, const Sockaddr& remote,
+        std::string service_name);
+  ~Proxy();
+
+  // Call a remote method asynchronously.
+  //
+  // Typically, users will not call this directly, but rather through
+  // a generated Proxy subclass.
+  //
+  // method: the method name to invoke on the remote server.
+  //
+  // req:  the request protobuf. This will be serialized immediately,
+  //       so the caller may free or otherwise mutate 'req' safely.
+  //
+  // resp: the response protobuf. This protobuf will be mutated upon
+  //       completion of the call. The RPC system does not take ownership
+  //       of this storage.
+  //
+  // NOTE: 'req' and 'resp' should be the appropriate protocol buffer implementation
+  // class corresponding to the parameter and result types of the service method
+  // defined in the service's '.proto' file.
+  //
+  // controller: the RpcController to associate with this call. Each call
+  //             must use a unique controller object. Does not take ownership.
+  //
+  // callback: the callback to invoke upon call completion. This callback may
+  //           be invoked before AsyncRequest() itself returns, or any time
+  //           thereafter. It may be invoked either on the caller's thread
+  //           or by an RPC IO thread, and thus should take care to not
+  //           block or perform any heavy CPU work.
+  void AsyncRequest(const std::string& method,
+                    const google::protobuf::Message& req,
+                    google::protobuf::Message* resp,
+                    RpcController* controller,
+                    const ResponseCallback& callback) const;
+
+  // The same as AsyncRequest(), except that the call blocks until the call
+  // finishes. If the call fails, returns a non-OK result.
+  Status SyncRequest(const std::string& method,
+                     const google::protobuf::Message& req,
+                     google::protobuf::Message* resp,
+                     RpcController* controller) const;
+
+  // Set the user credentials which should be used to log in.
+  //
+  // Must be called before the first request is issued through this proxy;
+  // calling it afterwards is a fatal error.
+  void set_user_credentials(const UserCredentials& user_credentials);
+
+  // Get the user credentials which should be used to log in.
+  const UserCredentials& user_credentials() const { return conn_id_.user_credentials(); }
+
+  // Returns a string of the form "<service name>@<connection id>".
+  std::string ToString() const;
+
+ private:
+  // Name of the remote service; combined with the method name for each call.
+  const std::string service_name_;
+  // Messenger on which outbound calls are queued.
+  std::shared_ptr<Messenger> messenger_;
+  // Remote address plus user credentials used for this proxy's calls.
+  ConnectionId conn_id_;
+  // Set once the first request has been issued. Mutable because the request
+  // methods that set it are const.
+  mutable Atomic32 is_started_;
+
+  DISALLOW_COPY_AND_ASSIGN(Proxy);
+};
+
+} // namespace rpc
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/reactor-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/reactor-test.cc b/be/src/kudu/rpc/reactor-test.cc
new file mode 100644
index 0000000..2faac2a
--- /dev/null
+++ b/be/src/kudu/rpc/reactor-test.cc
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/reactor.h"
+
+#include "kudu/rpc/rpc-test-base.h"
+#include "kudu/util/countdown_latch.h"
+
+using std::shared_ptr;
+
+namespace kudu {
+namespace rpc {
+
+// Fixture for tests that schedule callbacks on a messenger's reactor threads.
+// The callbacks below report back to the test body through 'latch_'.
+class ReactorTest : public RpcTestBase {
+ public:
+  // NOTE(review): the second CreateMessenger() argument is presumably the
+  // number of reactor threads -- confirm against RpcTestBase.
+  ReactorTest()
+    : messenger_(CreateMessenger("my_messenger", 4)),
+      latch_(1) {
+  }
+
+  // Callback that verifies 'status' carries the same status code as
+  // 'expected_status' (only the codes are compared, not the messages) and then
+  // counts down the latch.
+  void ScheduledTask(const Status& status, const Status& expected_status) {
+    CHECK_EQ(expected_status.CodeAsString(), status.CodeAsString());
+    latch_.CountDown();
+  }
+
+  // Callback that verifies 'status' is OK and that it is being run on
+  // 'thread', then counts down the latch.
+  void ScheduledTaskCheckThread(const Status& status, const Thread* thread) {
+    CHECK_OK(status);
+    CHECK_EQ(thread, Thread::current_thread());
+    latch_.CountDown();
+  }
+
+  // Callback that schedules ScheduledTaskCheckThread() with no delay, bound to
+  // the thread this callback itself runs on, and then counts down the latch.
+  // 'status' is not inspected.
+  void ScheduledTaskScheduleAgain(const Status& status) {
+    messenger_->ScheduleOnReactor(
+        boost::bind(&ReactorTest::ScheduledTaskCheckThread, this, _1,
+                    Thread::current_thread()),
+        MonoDelta::FromMilliseconds(0));
+    latch_.CountDown();
+  }
+
+ protected:
+  const shared_ptr<Messenger> messenger_;
+  // Counted down by the callbacks above; initialized to 1.
+  CountDownLatch latch_;
+};
+
+// A task scheduled with no delay is run and receives a status with the OK
+// code.
+TEST_F(ReactorTest, TestFunctionIsCalled) {
+  messenger_->ScheduleOnReactor(
+      boost::bind(&ReactorTest::ScheduledTask, this, _1, Status::OK()),
+      MonoDelta::FromSeconds(0));
+  latch_.Wait();
+}
+
+// A task scheduled 100ms in the future must not run before 100ms have passed.
+TEST_F(ReactorTest, TestFunctionIsCalledAtTheRightTime) {
+  const MonoTime start = MonoTime::Now();
+  messenger_->ScheduleOnReactor(
+      boost::bind(&ReactorTest::ScheduledTask, this, _1, Status::OK()),
+      MonoDelta::FromMilliseconds(100));
+  latch_.Wait();
+
+  // Time elapsed between scheduling the task and observing its completion.
+  const MonoDelta delta = MonoTime::Now() - start;
+  CHECK_GE(delta.ToMilliseconds(), 100);
+}
+
+// A task that is still pending when the messenger shuts down is nevertheless
+// invoked, with a status carrying the Aborted code.
+TEST_F(ReactorTest, TestFunctionIsCalledIfReactorShutdown) {
+  // ScheduledTask() compares status codes only, so the message of the
+  // expected status is irrelevant. The 60 second delay keeps the task pending
+  // until Shutdown() is called.
+  messenger_->ScheduleOnReactor(
+      boost::bind(&ReactorTest::ScheduledTask, this, _1,
+                  Status::Aborted("doesn't matter")),
+      MonoDelta::FromSeconds(60));
+  messenger_->Shutdown();
+  latch_.Wait();
+}
+
+// A task scheduled from within a reactor callback runs on the same thread as
+// the callback that scheduled it (checked in ScheduledTaskCheckThread()).
+TEST_F(ReactorTest, TestReschedulesOnSameReactorThread) {
+  // Our scheduled task will schedule yet another task.
+  // Both tasks count the latch down once, hence a count of two.
+  latch_.Reset(2);
+
+  messenger_->ScheduleOnReactor(
+      boost::bind(&ReactorTest::ScheduledTaskScheduleAgain, this, _1),
+      MonoDelta::FromSeconds(0));
+  latch_.Wait();
+  latch_.Wait();
+}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/reactor.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/reactor.cc b/be/src/kudu/rpc/reactor.cc
new file mode 100644
index 0000000..e235dd4
--- /dev/null
+++ b/be/src/kudu/rpc/reactor.cc
@@ -0,0 +1,750 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/reactor.h"
+
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <stdlib.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <memory>
+#include <mutex>
+#include <string>
+
+#include <boost/intrusive/list.hpp>
+#include <boost/optional.hpp>
+#include <ev++.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/client_negotiation.h"
+#include "kudu/rpc/connection.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/negotiation.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_introspection.pb.h"
+#include "kudu/rpc/server_negotiation.h"
+#include "kudu/rpc/transfer.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/debug/sanitizer_scopes.h"
+#include "kudu/util/errno.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread.h"
+#include "kudu/util/thread_restrictions.h"
+#include "kudu/util/threadpool.h"
+#include "kudu/util/trace.h"
+
+// When compiling on Mac OS X, use 'kqueue' instead of the default, 'select', for the event loop.
+// Otherwise we run into problems because 'select' can't handle connections when more than 1024
+// file descriptors are open by the process.
+//
+// These flags are passed to the libev loop in ReactorThread's constructor.
+#if defined(__APPLE__)
+static const int kDefaultLibEvFlags = ev::KQUEUE;
+#else
+static const int kDefaultLibEvFlags = ev::AUTO;
+#endif
+
+using std::string;
+using std::shared_ptr;
+using std::unique_ptr;
+using strings::Substitute;
+
+// Added to the current time in ReactorThread::StartConnectionNegotiation() to
+// form the deadline handed to the negotiation task.
+DEFINE_int64(rpc_negotiation_timeout_ms, 3000,
+             "Timeout for negotiating an RPC connection.");
+TAG_FLAG(rpc_negotiation_timeout_ms, advanced);
+TAG_FLAG(rpc_negotiation_timeout_ms, runtime);
+
+// Consulted in ReactorThread::FindOrStartConnection(): when set, existing
+// connections to the destination are never selected for a new call.
+DEFINE_bool(rpc_reopen_outbound_connections, false,
+            "Open a new connection to the server for every RPC call. "
+            "If not enabled, an already existing connection to a "
+            "server is reused upon making another call to the same server. "
+            "When this flag is enabled, an already existing _idle_ connection "
+            "to the server is closed upon making another RPC call which would "
+            "reuse the connection otherwise. "
+            "Used by tests only.");
+TAG_FLAG(rpc_reopen_outbound_connections, unsafe);
+TAG_FLAG(rpc_reopen_outbound_connections, runtime);
+
+namespace kudu {
+namespace rpc {
+
+namespace {
+// Builds the status reported while the reactor is shutting down: Aborted when
+// 'aborted' is true, ServiceUnavailable otherwise. Both carry the same message
+// and the ESHUTDOWN error code.
+Status ShutdownError(bool aborted) {
+  const char* msg = "reactor is shutting down";
+  if (aborted) {
+    return Status::Aborted(msg, "", ESHUTDOWN);
+  }
+  return Status::ServiceUnavailable(msg, "", ESHUTDOWN);
+}
+} // anonymous namespace
+
+// Initializes the per-thread state of 'reactor' from the settings in 'bld'.
+// No thread is started and no watchers are registered here; see Init().
+//
+// NOTE(review): 'last_unused_tcp_scan_' is initialized from 'cur_time_', which
+// relies on 'cur_time_' being declared first in the class -- confirm in
+// reactor.h.
+ReactorThread::ReactorThread(Reactor *reactor, const MessengerBuilder& bld)
+  : loop_(kDefaultLibEvFlags),
+    cur_time_(MonoTime::Now()),
+    last_unused_tcp_scan_(cur_time_),
+    reactor_(reactor),
+    connection_keepalive_time_(bld.connection_keepalive_time_),
+    coarse_timer_granularity_(bld.coarse_timer_granularity_),
+    total_client_conns_cnt_(0),
+    total_server_conns_cnt_(0) {
+}
+
+// Registers this reactor's async and timer watchers with the event loop and
+// starts the reactor thread, which executes RunThread().
+//
+// Must not be called once the thread exists (checked in debug builds).
+// Returns the status of creating the thread.
+Status ReactorThread::Init() {
+  DCHECK(thread_.get() == nullptr) << "Already started";
+  DVLOG(6) << "Called ReactorThread::Init()";
+  // Register to get async notifications in our event loop; they are delivered
+  // to AsyncHandler().
+  async_.set(loop_);
+  async_.set<ReactorThread, &ReactorThread::AsyncHandler>(this);
+  async_.start();
+
+  // Register the timer watcher.
+  // The timer is used for closing old TCP connections and applying
+  // backpressure. Both the initial delay and the repeat interval are the
+  // coarse timer granularity.
+  timer_.set(loop_);
+  timer_.set<ReactorThread, &ReactorThread::TimerHandler>(this); // NOLINT(*)
+  timer_.start(coarse_timer_granularity_.ToSeconds(),
+               coarse_timer_granularity_.ToSeconds());
+
+  // Create Reactor thread.
+  return kudu::Thread::Create("reactor", "rpc reactor", &ReactorThread::RunThread, this, &thread_);
+}
+
+// Asks the reactor thread to shut down by waking it up.
+//
+// The reactor's closing flag must already be set (fatal otherwise); the woken
+// thread sees it in AsyncHandler() and performs the actual teardown there.
+void ReactorThread::Shutdown() {
+  CHECK(reactor_->closing()) << "Should be called after setting closing_ flag";
+
+  VLOG(1) << name() << ": shutting down Reactor thread.";
+  WakeThread();
+}
+
+// Performs the teardown on the reactor thread itself: shuts down and forgets
+// all client and server connections (with a ServiceUnavailable status), aborts
+// all scheduled delayed tasks (with an Aborted status), and removes this
+// thread's OpenSSL error state.
+void ReactorThread::ShutdownInternal() {
+  DCHECK(IsCurrentThread());
+
+  // Tear down any outbound TCP connections.
+  Status service_unavailable = ShutdownError(false);
+  VLOG(1) << name() << ": tearing down outbound TCP connections...";
+  for (const auto& elem : client_conns_) {
+    const auto& conn = elem.second;
+    VLOG(1) << name() << ": shutting down " << conn->ToString();
+    conn->Shutdown(service_unavailable);
+  }
+  client_conns_.clear();
+
+  // Tear down any inbound TCP connections.
+  VLOG(1) << name() << ": tearing down inbound TCP connections...";
+  for (const auto& conn : server_conns_) {
+    VLOG(1) << name() << ": shutting down " << conn->ToString();
+    conn->Shutdown(service_unavailable);
+  }
+  server_conns_.clear();
+
+  // Abort any scheduled tasks.
+  //
+  // These won't be found in the ReactorThread's list of pending tasks
+  // because they've been "run" (that is, they've been scheduled).
+  Status aborted = ShutdownError(true); // aborted
+  for (DelayedTask* task : scheduled_tasks_) {
+    task->Abort(aborted); // should also free the task.
+  }
+  scheduled_tasks_.clear();
+
+  // Remove the OpenSSL thread state.
+  ERR_remove_thread_state(nullptr);
+}
+
+// ReactorTask needs no explicit setup or cleanup of its own.
+ReactorTask::ReactorTask() = default;
+ReactorTask::~ReactorTask() = default;
+
+// Fills 'metrics' with this reactor thread's connection counters. Must run on
+// the reactor thread. Always returns OK.
+Status ReactorThread::GetMetrics(ReactorMetrics* metrics) {
+  DCHECK(IsCurrentThread());
+
+  // Running totals of connections registered so far.
+  metrics->total_client_connections_ = total_client_conns_cnt_;
+  metrics->total_server_connections_ = total_server_conns_cnt_;
+
+  // Connections currently tracked by this thread.
+  metrics->num_client_connections_ = client_conns_.size();
+  metrics->num_server_connections_ = server_conns_.size();
+
+  return Status::OK();
+}
+
+// Appends one entry per connection of this reactor thread to 'resp': server
+// connections as inbound entries first, then client connections as outbound
+// entries. Stops at, and returns, the first DumpPB() failure.
+Status ReactorThread::DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
+                                      DumpRunningRpcsResponsePB* resp) {
+  DCHECK(IsCurrentThread());
+
+  for (const auto& inbound : server_conns_) {
+    RETURN_NOT_OK(inbound->DumpPB(req, resp->add_inbound_connections()));
+  }
+
+  for (const auto& id_and_conn : client_conns_) {
+    RETURN_NOT_OK(id_and_conn.second->DumpPB(req, resp->add_outbound_connections()));
+  }
+
+  return Status::OK();
+}
+
+// Signals the async watcher registered in Init(), whose callback is
+// AsyncHandler().
+void ReactorThread::WakeThread() {
+  // libev uses some lock-free synchronization, but doesn't have TSAN annotations.
+  // See http://lists.schmorp.de/pipermail/libev/2013q2/002178.html or KUDU-366
+  // for examples.
+  debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
+  async_.send();
+}
+
+// Handle async events.  These events are sent to the reactor by other
+// threads that want to bring something to our attention, like the fact that
+// we're shutting down, or the fact that there is a new outbound Transfer
+// ready to send.
+//
+// If the reactor is closing, this tears everything down and stops the event
+// loop; otherwise it drains the reactor's task queue and runs every task.
+void ReactorThread::AsyncHandler(ev::async& /*watcher*/, int /*revents*/) {
+  DCHECK(IsCurrentThread());
+
+  if (PREDICT_FALSE(reactor_->closing())) {
+    ShutdownInternal();
+    loop_.break_loop(); // break the event loop and terminate the thread
+    return;
+  }
+
+  boost::intrusive::list<ReactorTask> tasks;
+  reactor_->DrainTaskQueue(&tasks);
+
+  while (!tasks.empty()) {
+    ReactorTask& task = tasks.front();
+    // Unlink the task before running it.
+    // NOTE(review): presumably because Run() may destroy the task -- confirm.
+    tasks.pop_front();
+    task.Run(this);
+  }
+}
+
+// Starts negotiation for 'conn' and, if that could be started, adds it to the
+// list of server connections and bumps the server connection total.
+//
+// If negotiation cannot be started the error is logged and the connection is
+// destroyed instead of being registered.
+void ReactorThread::RegisterConnection(scoped_refptr<Connection> conn) {
+  DCHECK(IsCurrentThread());
+
+  Status s = StartConnectionNegotiation(conn);
+  if (PREDICT_FALSE(!s.ok())) {
+    LOG(ERROR) << "Server connection negotiation failed: " << s.ToString();
+    DestroyConnection(conn.get(), s);
+    return;
+  }
+  ++total_server_conns_cnt_;
+  server_conns_.emplace_back(std::move(conn));
+}
+
+// Routes 'call' to a client connection matching its connection id and
+// credentials policy, creating that connection when none is usable. If no
+// connection can be obtained the call is failed in the
+// CONNECTION_NEGOTIATION phase.
+void ReactorThread::AssignOutboundCall(const shared_ptr<OutboundCall>& call) {
+  DCHECK(IsCurrentThread());
+
+  scoped_refptr<Connection> target_conn;
+  Status lookup_status = FindOrStartConnection(call->conn_id(),
+                                               call->controller()->credentials_policy(),
+                                               &target_conn);
+  if (PREDICT_FALSE(!lookup_status.ok())) {
+    call->SetFailed(lookup_status, OutboundCall::Phase::CONNECTION_NEGOTIATION);
+    return;
+  }
+
+  target_conn->QueueOutboundCall(call);
+}
+
+//
+// Handles timer events.  The periodic timer (started in Init()):
+//
+// 1. updates ReactorThread::cur_time_
+// 2. scans for idle connections that should be closed; see
+//    ScanIdleConnections().
+//
+// If libev reports an error for the event, a warning is logged and nothing
+// else is done.
+//
+void ReactorThread::TimerHandler(ev::timer& /*watcher*/, int revents) {
+  DCHECK(IsCurrentThread());
+  if (EV_ERROR & revents) {
+    LOG(WARNING) << "Reactor " << name() << " got an error in "
+      "the timer handler.";
+    return;
+  }
+  cur_time_ = MonoTime::Now();
+
+  ScanIdleConnections();
+}
+
+// Associates 'watcher' with this reactor thread's event loop. The watcher is
+// not started here.
+void ReactorThread::RegisterTimeout(ev::timer *watcher) {
+  watcher->set(loop_);
+}
+
+// Closes connections that are no longer needed:
+//
+// * server connections that are idle and whose last activity is more than
+//   'connection_keepalive_time_' before 'cur_time_';
+// * client connections that are idle and have been scheduled for shutdown.
+//
+// Each such connection is shut down with a NetworkError status and removed
+// from its container.
+void ReactorThread::ScanIdleConnections() {
+  DCHECK(IsCurrentThread());
+  // Enforce TCP connection timeouts: server-side connections.
+  const auto server_conns_end = server_conns_.end();
+  uint64_t timed_out = 0;
+  for (auto it = server_conns_.begin(); it != server_conns_end; ) {
+    Connection* conn = it->get();
+    if (!conn->Idle()) {
+      VLOG(10) << "Connection " << conn->ToString() << " not idle";
+      ++it;
+      continue;
+    }
+
+    const MonoDelta connection_delta(cur_time_ - conn->last_activity_time());
+    if (connection_delta <= connection_keepalive_time_) {
+      ++it;
+      continue;
+    }
+
+    conn->Shutdown(Status::NetworkError(
+        Substitute("connection timed out after $0", connection_keepalive_time_.ToString())));
+    VLOG(1) << "Timing out connection " << conn->ToString() << " - it has been idle for "
+            << connection_delta.ToString();
+    ++timed_out;
+    // 'conn' is a raw pointer to the element being erased; do not use it after
+    // this line.
+    it = server_conns_.erase(it);
+  }
+
+  // Take care of idle client-side connections marked for shutdown.
+  uint64_t shutdown = 0;
+  for (auto it = client_conns_.begin(); it != client_conns_.end();) {
+    Connection* conn = it->second.get();
+    if (conn->scheduled_for_shutdown() && conn->Idle()) {
+      conn->Shutdown(Status::NetworkError(
+          "connection has been marked for shutdown"));
+      it = client_conns_.erase(it);
+      ++shutdown;
+    } else {
+      ++it;
+    }
+  }
+  // TODO(aserbin): clients may want to set their keepalive timeout for idle
+  //                but not scheduled for shutdown connections.
+
+  VLOG_IF(1, timed_out > 0) << name() << ": timed out " << timed_out << " TCP connections.";
+  VLOG_IF(1, shutdown > 0) << name() << ": shutdown " << shutdown << " TCP connections.";
+}
+
+// Returns the name of the owning Reactor.
+const std::string& ReactorThread::name() const {
+  return reactor_->name();
+}
+
+// Returns the cached current time, which is refreshed by TimerHandler() rather
+// than read from the clock on every call.
+MonoTime ReactorThread::cur_time() const {
+  return cur_time_;
+}
+
+// Returns the Reactor this thread was constructed with.
+Reactor *ReactorThread::reactor() {
+  return reactor_;
+}
+
+// Returns true if the calling thread is this reactor's own thread.
+bool ReactorThread::IsCurrentThread() const {
+  return thread_.get() == kudu::Thread::current_thread();
+}
+
+// Body of the reactor thread (started from Init()): marks the thread as not
+// allowed to wait or perform IO, runs the event loop until it is broken, and
+// finally drops the reactor's reference to the messenger.
+void ReactorThread::RunThread() {
+  ThreadRestrictions::SetWaitAllowed(false);
+  ThreadRestrictions::SetIOAllowed(false);
+  DVLOG(6) << "Calling ReactorThread::RunThread()...";
+  // Returns once AsyncHandler() breaks the loop during shutdown.
+  loop_.run(0);
+  VLOG(1) << name() << " thread exiting.";
+
+  // No longer need the messenger. This causes the messenger to
+  // get deleted when all the reactors exit.
+  reactor_->messenger_.reset();
+}
+
+// Finds a usable client connection for 'conn_id' that satisfies 'cred_policy',
+// or creates a new one and starts its negotiation.
+//
+// While scanning the existing connections for 'conn_id', every connection
+// that must not be reused is shut down and removed if it is idle, or marked
+// as scheduled for shutdown otherwise.
+//
+// On success '*conn' refers to the connection to use. A non-OK status is
+// returned if the socket cannot be created, the connect cannot be started, or
+// the negotiation task cannot be submitted. NOTE: in the last case '*conn' has
+// already been assigned the new connection, which is not added to
+// 'client_conns_'.
+Status ReactorThread::FindOrStartConnection(const ConnectionId& conn_id,
+                                            CredentialsPolicy cred_policy,
+                                            scoped_refptr<Connection>* conn) {
+  DCHECK(IsCurrentThread());
+  const auto range = client_conns_.equal_range(conn_id);
+  scoped_refptr<Connection> found_conn;
+  // The iterator is advanced inside the loop body because entries may be
+  // erased while iterating.
+  for (auto it = range.first; it != range.second;) {
+    const auto& c = it->second.get();
+    // * Do not use connections scheduled for shutdown to place new calls.
+    //
+    // * Do not use a connection with a non-compliant credentials policy.
+    //   Instead, open a new one, while marking the former as scheduled for
+    //   shutdown. This process converges: any connection that satisfies the
+    //   PRIMARY_CREDENTIALS policy automatically satisfies the ANY_CREDENTIALS
+    //   policy as well. The idea is to keep only one usable connection
+    //   identified by the specified 'conn_id'.
+    //
+    // * If the test-only 'one-connection-per-RPC' mode is enabled, connections
+    //   are re-established at every RPC call.
+    if (c->scheduled_for_shutdown() ||
+        !c->SatisfiesCredentialsPolicy(cred_policy) ||
+        PREDICT_FALSE(FLAGS_rpc_reopen_outbound_connections)) {
+      if (c->Idle()) {
+        // Shutdown idle connections to the target destination. Non-idle ones
+        // will be taken care of later by the idle connection scanner.
+        DCHECK_EQ(Connection::CLIENT, c->direction());
+        c->Shutdown(Status::NetworkError("connection is closed due to non-reuse policy"));
+        // erase() yields the next iterator, so skip the increment below.
+        it = client_conns_.erase(it);
+        continue;
+      }
+      c->set_scheduled_for_shutdown();
+    } else {
+      DCHECK(!found_conn);
+      found_conn = c;
+      // Appropriate connection is found; continue further to take care of the
+      // rest of connections to mark them for shutdown if they are not
+      // satisfying the policy.
+    }
+    ++it;
+  }
+  if (found_conn) {
+    // Found matching not-to-be-shutdown connection: return it as the result.
+    conn->swap(found_conn);
+    return Status::OK();
+  }
+
+  // No connection to this remote. Need to create one.
+  VLOG(2) << name() << " FindOrStartConnection: creating "
+          << "new connection for " << conn_id.remote().ToString();
+
+  // Create a new socket and start connecting to the remote.
+  Socket sock;
+  RETURN_NOT_OK(CreateClientSocket(&sock));
+  RETURN_NOT_OK(StartConnect(&sock, conn_id.remote()));
+
+  // Hand the descriptor over to a heap-allocated Socket for the connection.
+  unique_ptr<Socket> new_socket(new Socket(sock.Release()));
+
+  // Register the new connection in our map.
+  *conn = new Connection(
+      this, conn_id.remote(), std::move(new_socket), Connection::CLIENT, cred_policy);
+  (*conn)->set_local_user_credentials(conn_id.user_credentials());
+
+  // Kick off blocking client connection negotiation.
+  Status s = StartConnectionNegotiation(*conn);
+  if (s.IsIllegalState()) {
+    // Return a nicer error message to the user indicating -- if we just
+    // forward the status we'd get something generic like "ThreadPool is closing".
+    return Status::ServiceUnavailable("Client RPC Messenger shutting down");
+  }
+  // Propagate any other errors as-is.
+  RETURN_NOT_OK_PREPEND(s, "Unable to start connection negotiation thread");
+
+  // Insert into the client connection map to avoid duplicate connection requests.
+  client_conns_.emplace(conn_id, *conn);
+  ++total_client_conns_cnt_;
+
+  return Status::OK();
+}
+
+// Submits a Negotiation::RunNegotiation task for 'conn' to the messenger's
+// negotiation pool, with a deadline FLAGS_rpc_negotiation_timeout_ms from now.
+//
+// Returns the status of submitting the task. Must be called on the reactor
+// thread.
+Status ReactorThread::StartConnectionNegotiation(const scoped_refptr<Connection>& conn) {
+  DCHECK(IsCurrentThread());
+
+  // Negotiation has to be over by this point in time.
+  MonoTime deadline = MonoTime::Now() +
+      MonoDelta::FromMilliseconds(FLAGS_rpc_negotiation_timeout_ms);
+
+  // Adopt a fresh trace before emitting the TRACE message below.
+  scoped_refptr<Trace> negotiation_trace(new Trace());
+  ADOPT_TRACE(negotiation_trace.get());
+  TRACE("Submitting negotiation task for $0", conn->ToString());
+
+  // The authentication and encryption settings come from the messenger.
+  auto* messenger = reactor()->messenger();
+  auto authentication = messenger->authentication();
+  auto encryption = messenger->encryption();
+  return messenger->negotiation_pool()->SubmitClosure(
+      Bind(&Negotiation::RunNegotiation, conn, authentication, encryption, deadline));
+}
+
+// Finishes negotiation for 'conn'. Must be called on the reactor thread.
+//
+// If 'status' is not OK the connection is destroyed with that status and
+// 'rpc_error'. Otherwise the connection is switched to non-blocking mode,
+// marked as negotiated and registered with this thread's event loop.
+void ReactorThread::CompleteConnectionNegotiation(
+    const scoped_refptr<Connection>& conn,
+    const Status& status,
+    unique_ptr<ErrorStatusPB> rpc_error) {
+  DCHECK(IsCurrentThread());
+
+  // Negotiation itself failed: tear the connection down with that status.
+  if (PREDICT_FALSE(!status.ok())) {
+    DestroyConnection(conn.get(), status, std::move(rpc_error));
+    return;
+  }
+
+  // After negotiation the socket has to go back to non-blocking mode.
+  const Status nonblocking_status = conn->SetNonBlocking(true);
+  if (PREDICT_FALSE(!nonblocking_status.ok())) {
+    LOG(DFATAL) << "Unable to set connection to non-blocking mode: "
+                << nonblocking_status.ToString();
+    DestroyConnection(conn.get(), nonblocking_status, std::move(rpc_error));
+    return;
+  }
+
+  // The connection is usable now: hand it over to the event loop.
+  conn->MarkNegotiationComplete();
+  conn->EpollRegister(loop_);
+}
+
+// Initializes 'sock' with FLAG_NONBLOCKING and then enables the no-delay
+// option on it. The first failure is logged as a warning and returned.
+Status ReactorThread::CreateClientSocket(Socket *sock) {
+  Status status = sock->Init(Socket::FLAG_NONBLOCKING);
+  if (status.ok()) {
+    status = sock->SetNoDelay(true);
+  }
+
+  if (!status.ok()) {
+    LOG(WARNING)
+        << "failed to create an outbound connection because a new socket could not be created: "
+        << status.ToString();
+  }
+  return status;
+}
+
+// Starts connecting 'sock' to 'remote'.
+//
+// Returns OK both when connect() completes right away and when it reports a
+// temporary error or EINPROGRESS (the connect is still under way). Any other
+// failure is logged as a warning and returned to the caller.
+Status ReactorThread::StartConnect(Socket *sock, const Sockaddr& remote) {
+  const Status connect_status = sock->Connect(remote);
+  if (connect_status.ok()) {
+    VLOG(3) << "StartConnect: connect finished immediately for " << remote.ToString();
+    return Status::OK();
+  }
+
+  const int err = connect_status.posix_code();
+  const bool still_connecting =
+      Socket::IsTemporarySocketError(err) || err == EINPROGRESS;
+  if (!still_connecting) {
+    LOG(WARNING) << "Failed to create an outbound connection to " << remote.ToString()
+                 << " because connect() failed: " << connect_status.ToString();
+    return connect_status;
+  }
+
+  VLOG(3) << "StartConnect: connect in progress for " << remote.ToString();
+  return Status::OK();
+}
+
+// Shuts 'conn' down with 'conn_status' / 'rpc_error' and removes it from the
+// container that tracks it: client_conns_ for CLIENT connections, server_conns_
+// for SERVER ones. Must be called on the reactor thread.
+//
+// For a CLIENT connection it is a fatal error (CHECK) if no entry exists for
+// the connection's ConnectionId.
+void ReactorThread::DestroyConnection(Connection *conn,
+                                      const Status& conn_status,
+                                      unique_ptr<ErrorStatusPB> rpc_error) {
+  DCHECK(IsCurrentThread());
+
+  conn->Shutdown(conn_status, std::move(rpc_error));
+
+  // Drop the tracking entry that refers to this connection object.
+  if (conn->direction() == Connection::CLIENT) {
+    // client_conns_ is a multi-map: several connections may be filed under the
+    // same ConnectionId, so search the range for this exact object.
+    const ConnectionId conn_id(conn->remote(), conn->local_user_credentials());
+    const auto candidates = client_conns_.equal_range(conn_id);
+    CHECK(candidates.first != candidates.second)
+        << "Couldn't find connection " << conn->ToString();
+    for (auto it = candidates.first; it != candidates.second; ++it) {
+      if (it->second.get() == conn) {
+        client_conns_.erase(it);
+        break;
+      }
+    }
+  } else if (conn->direction() == Connection::SERVER) {
+    for (auto it = server_conns_.begin(); it != server_conns_.end(); ++it) {
+      if (it->get() == conn) {
+        server_conns_.erase(it);
+        break;
+      }
+    }
+  }
+}
+
+// Creates a task that will invoke 'func' after the delay 'when'. The delay
+// starts counting when Run() arms the timer, not at construction time.
+// 'thread_' stays null until Run() associates the task with a reactor thread.
+DelayedTask::DelayedTask(boost::function<void(const Status&)> func,
+                         MonoDelta when)
+    : func_(std::move(func)),
+      when_(when),
+      thread_(nullptr) {
+}
+
+// Arms the task's timer on the event loop of 'thread' and records the task in
+// that thread's set of scheduled tasks. The user function is not called here;
+// it runs from TimerHandler() or Abort().
+//
+// Must be called on 'thread' itself, and only once per task.
+void DelayedTask::Run(ReactorThread* thread) {
+  DCHECK(thread_ == nullptr) << "Task has already been scheduled";
+  DCHECK(thread->IsCurrentThread());
+
+  // Remember the owning reactor thread: TimerHandler() needs it later.
+  thread_ = thread;
+
+  // Bind the timer to the reactor's loop and to our handler, then start it
+  // with the configured delay and a repeat interval of 0.
+  timer_.set(thread->loop_);
+  timer_.set<DelayedTask, &DelayedTask::TimerHandler>(this);
+  const auto delay_secs = when_.ToSeconds();
+  timer_.start(delay_secs, 0);
+
+  thread->scheduled_tasks_.insert(this);
+}
+
+// Invokes the user function with 'abort_status' and then frees this task.
+// The task must not be used after this call returns.
+void DelayedTask::Abort(const Status& abort_status) {
+  func_(abort_status);
+  delete this;
+}
+
+// libev callback invoked for the task's timer. Removes the task from the
+// reactor thread's scheduled set, calls the user function (with OK, or with
+// Aborted if libev flagged EV_ERROR) and frees the task.
+void DelayedTask::TimerHandler(ev::timer& watcher, int revents) {
+  // The task frees itself below, so unregister it from the reactor first.
+  thread_->scheduled_tasks_.erase(this);
+
+  const bool timer_failed = (EV_ERROR & revents) != 0;
+  if (!timer_failed) {
+    func_(Status::OK());
+    delete this;
+    return;
+  }
+
+  string msg = "Delayed task got an error in its timer handler";
+  LOG(WARNING) << msg;
+  Abort(Status::Aborted(msg)); // Will delete 'this'.
+}
+
+// Constructs the reactor number 'index' of 'messenger'. The reactor's name is
+// the messenger's name followed by "_R" and the index, zero-padded to at least
+// three digits. 'bld' is passed on to the embedded ReactorThread.
+Reactor::Reactor(shared_ptr<Messenger> messenger,
+                 int index, const MessengerBuilder& bld)
+    : messenger_(std::move(messenger)),
+      // Note: this reads the member 'messenger_', not the moved-from parameter.
+      name_(StringPrintf("%s_R%03d", messenger_->name().c_str(), index)),
+      closing_(false),
+      thread_(this, bld) {
+}
+
+// Initializes the reactor by initializing its ReactorThread and returns the
+// resulting status.
+Status Reactor::Init() {
+  DVLOG(6) << "Called Reactor::Init()";
+  return thread_.Init();
+}
+
+// Shuts the reactor down: marks it as closing, shuts down the reactor thread
+// and aborts every task that is still queued. Calling it again is a no-op.
+void Reactor::Shutdown() {
+  // Only the first caller gets past this point.
+  {
+    std::lock_guard<LockType> guard(lock_);
+    if (closing_) {
+      return;
+    }
+    closing_ = true;
+  }
+
+  thread_.Shutdown();
+
+  // Whatever is still queued will never run, so abort it. Nothing new can be
+  // queued any more because ScheduleReactorTask() checks the closing_ flag
+  // that was set above.
+  const Status shutdown_status = ShutdownError(true);
+  while (!pending_tasks_.empty()) {
+    // Unlink the task before aborting it: Abort() may free the task.
+    ReactorTask& orphan = pending_tasks_.front();
+    pending_tasks_.pop_front();
+    orphan.Abort(shutdown_status);
+  }
+}
+
+// Shuts the reactor down on destruction. Shutdown() returns immediately if
+// the reactor is already closing.
+Reactor::~Reactor() {
+  Shutdown();
+}
+
+// Returns the reactor's name, which is fixed at construction time.
+const std::string& Reactor::name() const {
+  return name_;
+}
+
+// Returns whether Shutdown() has been started. Reads closing_ under lock_.
+bool Reactor::closing() const {
+  std::lock_guard<LockType> l(lock_);
+  return closing_;
+}
+
+// Task to call an arbitrary function within the reactor thread.
+//
+// Unlike the heap-allocated tasks in this file, this task never deletes
+// itself: whoever creates it owns it and collects the result through Wait().
+class RunFunctionTask : public ReactorTask {
+ public:
+  // 'f' is the function to run; the latch starts at 1 and is released by
+  // either Run() or Abort().
+  explicit RunFunctionTask(boost::function<Status()> f)
+      : function_(std::move(f)), latch_(1) {}
+
+  // Runs the function, stores its status and releases the waiter.
+  void Run(ReactorThread* /*reactor*/) override {
+    status_ = function_();
+    latch_.CountDown();
+  }
+  // The function is not run: stores 'status' instead and releases the waiter.
+  void Abort(const Status& status) override {
+    status_ = status;
+    latch_.CountDown();
+  }
+
+  // Wait until the function has completed, and return the Status
+  // returned by the function. If the task was aborted, the abort status is
+  // returned instead.
+  Status Wait() {
+    latch_.Wait();
+    return status_;
+  }
+
+ private:
+  // The function to run on the reactor thread.
+  boost::function<Status()> function_;
+  // Result of the function, or the abort status.
+  Status status_;
+  // Counted down once the task has been run or aborted.
+  CountDownLatch latch_;
+};
+
+// Runs ReactorThread::GetMetrics(metrics) on the reactor thread and waits for
+// it to finish. Returns the status produced by RunOnReactorThread().
+Status Reactor::GetMetrics(ReactorMetrics *metrics) {
+  return RunOnReactorThread(boost::bind(&ReactorThread::GetMetrics, &thread_, metrics));
+}
+
+// Schedules 'f' to run on the reactor thread and blocks the calling thread
+// until it has been run or aborted. Returns the status returned by 'f', or the
+// abort status if the task was aborted (e.g. because the reactor is closing).
+Status Reactor::RunOnReactorThread(const boost::function<Status()>& f) {
+  // The task lives on this stack frame; Wait() below keeps the frame alive
+  // until the task has been run or aborted.
+  RunFunctionTask task(f);
+  ScheduleReactorTask(&task);
+  return task.Wait();
+}
+
+// Runs ReactorThread::DumpRunningRpcs(req, resp) on the reactor thread and
+// waits for it to finish. 'req' is bound by reference and 'resp' by pointer.
+Status Reactor::DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
+                                DumpRunningRpcsResponsePB* resp) {
+  return RunOnReactorThread(boost::bind(&ReactorThread::DumpRunningRpcs, &thread_,
+                                        boost::ref(req), resp));
+}
+
+// Task which runs in the reactor thread to register a connection with it.
+// The task deletes itself at the end of both Run() and Abort().
+class RegisterConnectionTask : public ReactorTask {
+ public:
+  explicit RegisterConnectionTask(scoped_refptr<Connection> conn)
+      : conn_(std::move(conn)) {
+  }
+
+  // Hands the connection over to the reactor thread, then frees the task.
+  void Run(ReactorThread* reactor) override {
+    reactor->RegisterConnection(std::move(conn_));
+    delete this;
+  }
+
+  void Abort(const Status& /*status*/) override {
+    // We don't need to Shutdown the connection since it was never registered.
+    // This is only used for inbound connections, and inbound connections will
+    // never have any calls added to them until they've been registered.
+    delete this;
+  }
+
+ private:
+  // The connection to register.
+  scoped_refptr<Connection> conn_;
+};
+
+// Wraps the file descriptor released from 'socket' into a new SERVER
+// connection for 'remote' and schedules its registration on the reactor
+// thread.
+void Reactor::RegisterInboundSocket(Socket *socket, const Sockaddr& remote) {
+  VLOG(3) << name_ << ": new inbound connection to " << remote.ToString();
+
+  // Take the descriptor out of the caller's Socket object.
+  unique_ptr<Socket> owned_socket(new Socket(socket->Release()));
+  scoped_refptr<Connection> conn(
+      new Connection(&thread_, remote, std::move(owned_socket), Connection::SERVER));
+
+  // The task frees itself once it has been run or aborted.
+  ScheduleReactorTask(new RegisterConnectionTask(std::move(conn)));
+}
+
+// Task which runs in the reactor thread to assign an outbound call
+// to a connection. The task deletes itself at the end of both Run() and
+// Abort().
+class AssignOutboundCallTask : public ReactorTask {
+ public:
+  explicit AssignOutboundCallTask(shared_ptr<OutboundCall> call)
+      : call_(std::move(call)) {}
+
+  // Passes the call to the reactor thread for assignment, then frees the task.
+  void Run(ReactorThread* reactor) override {
+    reactor->AssignOutboundCall(call_);
+    delete this;
+  }
+
+  // Marks the call as failed with 'status', then frees the task.
+  void Abort(const Status& status) override {
+    // It doesn't matter what the actual phase of the OutboundCall is: just set
+    // it to Phase::REMOTE_CALL to finalize the state of the call.
+    call_->SetFailed(status, OutboundCall::Phase::REMOTE_CALL);
+    delete this;
+  }
+
+ private:
+  // The call to assign.
+  shared_ptr<OutboundCall> call_;
+};
+
+// Schedules 'call' to be assigned to a connection on the reactor thread.
+void Reactor::QueueOutboundCall(const shared_ptr<OutboundCall>& call) {
+  DVLOG(3) << name_ << ": queueing outbound call "
+           << call->ToString() << " to remote " << call->conn_id().remote().ToString();
+
+  // The task frees itself once it has been run or aborted.
+  ReactorTask* assign_task = new AssignOutboundCallTask(call);
+  ScheduleReactorTask(assign_task);
+}
+
+// Queues 'task' for the reactor thread and wakes the thread up. If the reactor
+// is already closing, the task is not queued; its Abort() is called right away
+// on the calling thread instead.
+void Reactor::ScheduleReactorTask(ReactorTask *task) {
+  bool rejected;
+  {
+    std::lock_guard<LockType> guard(lock_);
+    rejected = closing_;
+    if (!rejected) {
+      pending_tasks_.push_back(*task);
+    }
+  }
+
+  if (rejected) {
+    // The lock was released above: we guarantee the reactor lock is not taken
+    // when calling Abort().
+    task->Abort(ShutdownError(false));
+    return;
+  }
+  thread_.WakeThread();
+}
+
+// Swaps the contents of the pending task queue with 'tasks' under lock_.
+// Returns false, leaving 'tasks' untouched, if the reactor is closing.
+bool Reactor::DrainTaskQueue(boost::intrusive::list<ReactorTask> *tasks) { // NOLINT(*)
+  std::lock_guard<LockType> guard(lock_);
+  const bool can_drain = !closing_;
+  if (can_drain) {
+    tasks->swap(pending_tasks_);
+  }
+  return can_drain;
+}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/reactor.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/reactor.h b/be/src/kudu/rpc/reactor.h
new file mode 100644
index 0000000..37eedd4
--- /dev/null
+++ b/be/src/kudu/rpc/reactor.h
@@ -0,0 +1,370 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_RPC_REACTOR_H
+#define KUDU_RPC_REACTOR_H
+
+#include <stdint.h>
+
+#include <list>
+#include <map>
+#include <memory>
+#include <set>
+#include <string>
+
+#include <boost/function.hpp>
+#include <boost/intrusive/list.hpp>
+#include <boost/optional.hpp>
+#include <ev++.h>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/rpc/connection.h"
+#include "kudu/rpc/transfer.h"
+#include "kudu/util/thread.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class Socket;
+
+namespace rpc {
+
+typedef std::list<scoped_refptr<Connection>> conn_list_t;
+
+class DumpRunningRpcsRequestPB;
+class DumpRunningRpcsResponsePB;
+class Messenger;
+class MessengerBuilder;
+class Reactor;
+enum class CredentialsPolicy;
+
+// Simple metrics information from within a reactor.
+//
+// The struct declares no initializers for its fields; they hold meaningful
+// values only after they have been filled in (see Reactor::GetMetrics()).
+struct ReactorMetrics {
+  // Number of client RPC connections currently connected.
+  int32_t num_client_connections_;
+  // Number of server RPC connections currently connected.
+  int32_t num_server_connections_;
+
+  // Total number of client RPC connections opened during Reactor's lifetime.
+  uint64_t total_client_connections_;
+  // Total number of server RPC connections opened during Reactor's lifetime.
+  uint64_t total_server_connections_;
+};
+
+// A task which can be enqueued to run on the reactor thread.
+//
+// Tasks are linked into a boost::intrusive::list through the inherited list
+// hook, so a task object cannot be copied.
+class ReactorTask : public boost::intrusive::list_base_hook<> {
+ public:
+  ReactorTask();
+
+  // Run the task. 'reactor' is guaranteed to be the current thread.
+  virtual void Run(ReactorThread *reactor) = 0;
+
+  // Abort the task, in the case that the reactor shut down before the
+  // task could be processed. This may or may not run on the reactor thread
+  // itself.
+  //
+  // The Reactor guarantees that the Reactor lock is free when this
+  // method is called.
+  //
+  // The default implementation does nothing.
+  virtual void Abort(const Status &abort_status) {}
+
+  virtual ~ReactorTask();
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(ReactorTask);
+};
+
+// A ReactorTask that is scheduled to run at some point in the future.
+//
+// Semantically it works like RunFunctionTask with a few key differences:
+// 1. The user function is called during Abort. Put another way, the
+//    user function is _always_ invoked, even during reactor shutdown.
+// 2. To differentiate between Abort and non-Abort, the user function
+//    receives a Status as its first argument.
+//
+// The task deletes itself after the user function has been invoked, both when
+// the timer fires and when the task is aborted.
+class DelayedTask : public ReactorTask {
+ public:
+  // 'func' is the user function; 'when' is the delay before it is invoked.
+  DelayedTask(boost::function<void(const Status &)> func, MonoDelta when);
+
+  // Schedules the task for running later but doesn't actually run it yet.
+  void Run(ReactorThread* thread) override;
+
+  // Invokes the user function with 'abort_status' and deletes the task.
+  void Abort(const Status& abort_status) override;
+
+ private:
+  // libev callback for when the registered timer fires.
+  void TimerHandler(ev::timer& watcher, int revents);
+
+  // User function to invoke when timer fires or when task is aborted.
+  const boost::function<void(const Status&)> func_;
+
+  // Delay to apply to this task.
+  const MonoDelta when_;
+
+  // Link back to registering reactor thread. Null until Run() is invoked.
+  ReactorThread* thread_;
+
+  // libev timer. Set when Run() is invoked.
+  ev::timer timer_;
+};
+
+// A ReactorThread is a libev event handler thread which manages I/O
+// on a list of sockets.
+//
+// All methods in this class are _only_ called from the reactor thread itself
+// except where otherwise specified. New methods should DCHECK(IsCurrentThread())
+// to ensure this.
+class ReactorThread {
+ public:
+  friend class Connection;
+
+  // Client-side connection map. Multiple connections could be open to a remote
+  // server if multiple credential policies are used for individual RPCs.
+  //
+  // NOTE(review): std::unordered_multimap is declared in <unordered_map>, which
+  // this header does not include directly -- presumably it arrives through one
+  // of the project headers included above; consider including it explicitly.
+  typedef std::unordered_multimap<ConnectionId, scoped_refptr<Connection>,
+                                  ConnectionIdHash, ConnectionIdEqual>
+      conn_multimap_t;
+
+  ReactorThread(Reactor *reactor, const MessengerBuilder &bld);
+
+  // This may be called from another thread.
+  Status Init();
+
+  // Add any connections on this reactor thread into the given status dump.
+  Status DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
+                         DumpRunningRpcsResponsePB* resp);
+
+  // Block until the Reactor thread is shut down.
+  //
+  // This must be called from another thread.
+  void Shutdown();
+
+  // This method is thread-safe.
+  void WakeThread();
+
+  // libev callback for handling async notifications in our epoll thread.
+  void AsyncHandler(ev::async &watcher, int revents);
+
+  // libev callback for handling timer events in our epoll thread.
+  void TimerHandler(ev::timer &watcher, int revents);
+
+  // Register an epoll timer watcher with our event loop.
+  // Does not set a timeout or start it.
+  void RegisterTimeout(ev::timer *watcher);
+
+  // This may be called from another thread.
+  const std::string &name() const;
+
+  MonoTime cur_time() const;
+
+  // This may be called from another thread.
+  Reactor *reactor();
+
+  // Return true if this reactor thread is the thread currently
+  // running. Should be used in DCHECK assertions.
+  bool IsCurrentThread() const;
+
+  // Begin the process of connection negotiation.
+  // Must be called from the reactor thread.
+  Status StartConnectionNegotiation(const scoped_refptr<Connection>& conn);
+
+  // Transition back from negotiating to processing requests.
+  // Must be called from the reactor thread.
+  void CompleteConnectionNegotiation(const scoped_refptr<Connection>& conn,
+                                     const Status& status,
+                                     std::unique_ptr<ErrorStatusPB> rpc_error);
+
+  // Collect metrics.
+  // Must be called from the reactor thread.
+  Status GetMetrics(ReactorMetrics *metrics);
+
+ private:
+  friend class AssignOutboundCallTask;
+  friend class RegisterConnectionTask;
+  friend class DelayedTask;
+
+  // Run the main event loop of the reactor.
+  void RunThread();
+
+  // Find or create a new connection to the given remote.
+  // If such a connection already exists, returns that, otherwise creates a new one.
+  // May return a bad Status if the connect() call fails.
+  // The resulting connection object is managed internally by the reactor thread.
+  Status FindOrStartConnection(const ConnectionId& conn_id,
+                               CredentialsPolicy cred_policy,
+                               scoped_refptr<Connection>* conn);
+
+  // Shut down the given connection, removing it from the connection tracking
+  // structures of this reactor.
+  //
+  // The connection is not explicitly deleted -- scoped_refptr reference
+  // counting may hold on to the object after this, but callers should assume
+  // that it _may_ be deleted by this call.
+  void DestroyConnection(Connection *conn, const Status &conn_status,
+                         std::unique_ptr<ErrorStatusPB> rpc_error = {});
+
+  // Scan any open connections for idle ones that have been idle longer than
+  // connection_keepalive_time_.
+  void ScanIdleConnections();
+
+  // Create a new client socket (non-blocking, NODELAY)
+  static Status CreateClientSocket(Socket *sock);
+
+  // Initiate a new connection on the given socket.
+  static Status StartConnect(Socket *sock, const Sockaddr &remote);
+
+  // Assign a new outbound call to the appropriate connection object.
+  // If this fails, the call is marked failed and completed.
+  void AssignOutboundCall(const std::shared_ptr<OutboundCall> &call);
+
+  // Register a new connection.
+  void RegisterConnection(scoped_refptr<Connection> conn);
+
+  // Actually perform shutdown of the thread, tearing down any connections,
+  // etc. This is called from within the thread.
+  void ShutdownInternal();
+
+  scoped_refptr<kudu::Thread> thread_;
+
+  // our epoll object (or kqueue, etc).
+  ev::dynamic_loop loop_;
+
+  // Used by other threads to notify the reactor thread
+  ev::async async_;
+
+  // Handles the periodic timer.
+  ev::timer timer_;
+
+  // Scheduled (but not yet run) delayed tasks.
+  //
+  // Each task owns its own memory and must be freed by its TimerHandler and
+  // Abort members, provided it was allocated on the heap.
+  std::set<DelayedTask*> scheduled_tasks_;
+
+  // The current monotonic time.  Updated every coarse_timer_granularity_.
+  MonoTime cur_time_;
+
+  // last time we did TCP timeouts.
+  MonoTime last_unused_tcp_scan_;
+
+  // Multimap of ConnectionIds to Connection objects for outbound (client)
+  // connections.
+  conn_multimap_t client_conns_;
+
+  // List of current connections coming into the server.
+  conn_list_t server_conns_;
+
+  // The Reactor that owns this thread object.
+  Reactor *reactor_;
+
+  // If a connection has been idle for this much time, it is torn down.
+  const MonoDelta connection_keepalive_time_;
+
+  // Scan for idle connections on this granularity.
+  const MonoDelta coarse_timer_granularity_;
+
+  // Total number of client connections opened during Reactor's lifetime.
+  uint64_t total_client_conns_cnt_;
+
+  // Total number of server connections opened during Reactor's lifetime.
+  uint64_t total_server_conns_cnt_;
+};
+
+// A Reactor manages a ReactorThread. It owns the queue of tasks waiting to be
+// picked up by that thread and the 'closing' flag, both guarded by lock_.
+class Reactor {
+ public:
+  // 'index' is the number of this reactor within 'messenger'; it is used to
+  // build the reactor's name. 'bld' is passed on to the ReactorThread.
+  Reactor(std::shared_ptr<Messenger> messenger,
+          int index,
+          const MessengerBuilder &bld);
+
+  // Initializes the underlying ReactorThread.
+  Status Init();
+
+  // Block until the Reactor is shut down. Tasks that are still pending are
+  // aborted. Calling this more than once is harmless.
+  void Shutdown();
+
+  // Calls Shutdown().
+  ~Reactor();
+
+  const std::string &name() const;
+
+  // Collect metrics about the reactor.
+  Status GetMetrics(ReactorMetrics *metrics);
+
+  // Add any connections on this reactor thread into the given status dump.
+  Status DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
+                         DumpRunningRpcsResponsePB* resp);
+
+  // Queue a new incoming connection. Takes ownership of the underlying fd from
+  // 'socket', but not the Socket object itself.
+  // If the reactor is already shut down, takes care of closing the socket.
+  void RegisterInboundSocket(Socket *socket, const Sockaddr &remote);
+
+  // Queue a new call to be sent. If the reactor is already shut down, marks
+  // the call as failed.
+  void QueueOutboundCall(const std::shared_ptr<OutboundCall> &call);
+
+  // Schedule the given task's Run() method to be called on the
+  // reactor thread.
+  // If the reactor shuts down before it is run, the Abort method will be
+  // called.
+  // Does _not_ take ownership of 'task' -- the task should take care of
+  // deleting itself after running if it is allocated on the heap.
+  void ScheduleReactorTask(ReactorTask *task);
+
+  // Schedule 'f' to run on the reactor thread and block until it has been run
+  // or aborted. Returns the status returned by 'f', or the abort status.
+  Status RunOnReactorThread(const boost::function<Status()>& f);
+
+  // If the Reactor is closing, returns false.
+  // Otherwise, drains the pending_tasks_ queue into the provided list.
+  bool DrainTaskQueue(boost::intrusive::list<ReactorTask> *tasks);
+
+  // Returns the parent messenger (not owned by the caller).
+  Messenger *messenger() const {
+    return messenger_.get();
+  }
+
+  // Indicates whether the reactor is shutting down.
+  //
+  // This method is thread-safe.
+  bool closing() const;
+
+  // Is this reactor's thread the current thread?
+  bool IsCurrentThread() const {
+    return thread_.IsCurrentThread();
+  }
+
+ private:
+  friend class ReactorThread;
+  typedef simple_spinlock LockType;
+  // Protects closing_ and pending_tasks_.
+  mutable LockType lock_;
+
+  // parent messenger
+  std::shared_ptr<Messenger> messenger_;
+
+  // Name of the reactor; built from the messenger's name, which is why
+  // messenger_ is declared before this member.
+  const std::string name_;
+
+  // Whether the reactor is shutting down.
+  // Guarded by lock_.
+  bool closing_;
+
+  // Tasks to be run within the reactor thread.
+  // Guarded by lock_.
+  boost::intrusive::list<ReactorTask> pending_tasks_; // NOLINT(build/include_what_you_use)
+
+  // The thread object driven by this reactor.
+  ReactorThread thread_;
+
+  DISALLOW_COPY_AND_ASSIGN(Reactor);
+};
+
+} // namespace rpc
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/remote_method.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/remote_method.cc b/be/src/kudu/rpc/remote_method.cc
new file mode 100644
index 0000000..32ec40d
--- /dev/null
+++ b/be/src/kudu/rpc/remote_method.cc
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/remote_method.h"
+#include "kudu/rpc/rpc_header.pb.h"
+
+namespace kudu {
+namespace rpc {
+
+using strings::Substitute;
+
+// Builds a RemoteMethod from a service name and a method name.
+//
+// Both strings are taken by value and moved into the members, so passing an
+// rvalue does not copy. A top-level 'const' on a by-value parameter is not part
+// of the function's type, so this definition still matches a declaration that
+// spells the second parameter as 'const std::string'.
+RemoteMethod::RemoteMethod(std::string service_name,
+                           std::string method_name)
+    : service_name_(std::move(service_name)),
+      method_name_(std::move(method_name)) {}
+
+// Overwrites this object's service and method names with the ones stored in
+// 'pb'. In debug builds, asserts that 'pb' is fully initialized.
+void RemoteMethod::FromPB(const RemoteMethodPB& pb) {
+  DCHECK(pb.IsInitialized()) << "PB is uninitialized: " << pb.InitializationErrorString();
+  service_name_ = pb.service_name();
+  method_name_ = pb.method_name();
+}
+
+// Copies this object's service and method names into 'pb'.
+void RemoteMethod::ToPB(RemoteMethodPB* pb) const {
+  pb->set_service_name(service_name_);
+  pb->set_method_name(method_name_);
+}
+
+// Returns the method in "<service name>.<method name>" form.
+string RemoteMethod::ToString() const {
+  string full_name = service_name_;
+  full_name += ".";
+  full_name += method_name_;
+  return full_name;
+}
+
+} // namespace rpc
+} // namespace kudu