You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:53 UTC

[41/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from kudu@334ecafd

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/reactor.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/reactor.cc b/be/src/kudu/rpc/reactor.cc
new file mode 100644
index 0000000..c1832ef
--- /dev/null
+++ b/be/src/kudu/rpc/reactor.cc
@@ -0,0 +1,918 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/reactor.h"
+
+#include <cerrno>
+#include <memory>
+#include <mutex>
+#include <ostream>
+#include <string>
+#include <utility>
+
+#include <boost/bind.hpp> // IWYU pragma: keep
+#include <boost/intrusive/list.hpp>
+#include <boost/optional.hpp>
+#include <ev++.h>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/bind.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/client_negotiation.h"
+#include "kudu/rpc/connection.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/negotiation.h"
+#include "kudu/rpc/outbound_call.h"
+#include "kudu/rpc/rpc_introspection.pb.h"
+#include "kudu/rpc/server_negotiation.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/debug/sanitizer_scopes.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread.h"
+#include "kudu/util/thread_restrictions.h"
+#include "kudu/util/threadpool.h"
+#include "kudu/util/trace.h"
+
+// libev's default backend on Mac OS X is 'select', which cannot handle
+// connections once the process has more than 1024 file descriptors open, so
+// request the 'kqueue' backend there. Elsewhere, let libev choose (ev::AUTO).
+static const int kDefaultLibEvFlags =
+#if defined(__APPLE__)
+    ev::KQUEUE;
+#else
+    ev::AUTO;
+#endif
+
+using std::string;
+using std::shared_ptr;
+using std::unique_ptr;
+using strings::Substitute;
+
+// Test-only switch consulted by ReactorThread::FindConnection(): when true, an
+// existing outbound connection is never handed out for a new call. Idle ones
+// are closed right away and busy ones are marked for shutdown, so every call
+// ends up on a fresh connection. Tagged 'unsafe' and 'runtime' (presumably:
+// not for production use, and changeable while the process runs -- see
+// kudu/util/flag_tags.h).
+DEFINE_bool(rpc_reopen_outbound_connections, false,
+            "Open a new connection to the server for every RPC call. "
+            "If not enabled, an already existing connection to a "
+            "server is reused upon making another call to the same server. "
+            "When this flag is enabled, an already existing _idle_ connection "
+            "to the server is closed upon making another RPC call which would "
+            "reuse the connection otherwise. "
+            "Used by tests only.");
+TAG_FLAG(rpc_reopen_outbound_connections, unsafe);
+TAG_FLAG(rpc_reopen_outbound_connections, runtime);
+
+// Per-reactor load: the percentage of each coarse-timer interval that the
+// reactor thread spent outside libev's poll call. Sampled by
+// ReactorThread::TimerHandler(). The two trailing arguments are presumably the
+// highest trackable value (100) and the number of significant digits (2);
+// TODO confirm against METRIC_DEFINE_histogram in kudu/util/metrics.h.
+METRIC_DEFINE_histogram(server, reactor_load_percent,
+                        "Reactor Thread Load Percentage",
+                        kudu::MetricUnit::kUnits,
+                        "The percentage of time that the reactor is busy "
+                        "(not blocked awaiting network activity). If this metric "
+                        "shows significant samples near 100%, increasing the "
+                        "number of reactors may be beneficial.", 100, 2);
+
+// Wall-clock time, in microseconds, spent running each batch of pending libev
+// callbacks. Recorded by ReactorThread::InvokePendingCb(). Trailing arguments
+// are interpreted as above (presumably max value = 1,000,000 us, 2 significant
+// digits; TODO confirm).
+METRIC_DEFINE_histogram(server, reactor_active_latency_us,
+                        "Reactor Thread Active Latency",
+                        kudu::MetricUnit::kMicroseconds,
+                        "Histogram of the wall clock time for reactor thread wake-ups. "
+                        "The reactor thread is responsible for all network I/O and "
+                        "therefore outliers in this latency histogram directly contribute "
+                        "to the latency of both inbound and outbound RPCs.",
+                        1000000, 2);
+
+namespace kudu {
+namespace rpc {
+
+namespace {
+// Builds the Status handed to work that cannot proceed because the reactor is
+// shutting down. The status is Aborted when 'aborted' is true and
+// ServiceUnavailable otherwise. Both variants carry the ESHUTDOWN posix code.
+Status ShutdownError(bool aborted) {
+  const char* const msg = "reactor is shutting down";
+  if (aborted) {
+    return Status::Aborted(msg, "", ESHUTDOWN);
+  }
+  return Status::ServiceUnavailable(msg, "", ESHUTDOWN);
+}
+
+// Fatal-error hook installed into libev for errors such as running out of file
+// descriptors. libev gives the caller no way to receive these errors and
+// expects this callback to abort, which PLOG(FATAL) does.
+//
+// Used instead of libev's built-in handler so the message goes through the
+// FATAL log rather than plain stderr, which may be lost when the process runs
+// as a daemon.
+void LibevSysErr(const char* msg) throw() {
+  PLOG(FATAL) << "LibEV fatal error: " << msg;
+}
+
+// Process-wide libev setup: installs LibevSysErr as libev's syserr callback.
+void DoInitLibEv() {
+  ev::set_syserr_cb(LibevSysErr);
+}
+
+} // anonymous namespace
+
+ReactorThread::ReactorThread(Reactor *reactor, const MessengerBuilder& bld)
+  : loop_(kDefaultLibEvFlags),
+    cur_time_(MonoTime::Now()),
+    last_unused_tcp_scan_(cur_time_),
+    reactor_(reactor),
+    connection_keepalive_time_(bld.connection_keepalive_time_),
+    coarse_timer_granularity_(bld.coarse_timer_granularity_),
+    total_client_conns_cnt_(0),
+    total_server_conns_cnt_(0) {
+  // Histograms are instantiated only when the builder supplies a metric
+  // entity. Otherwise they are left unset, and InvokePendingCb() and
+  // TimerHandler() check them before recording anything.
+  const auto& entity = bld.metric_entity_;
+  if (!entity) {
+    return;
+  }
+  invoke_us_histogram_ = METRIC_reactor_active_latency_us.Instantiate(entity);
+  load_percent_histogram_ = METRIC_reactor_load_percent.Instantiate(entity);
+}
+
+Status ReactorThread::Init() {
+  DCHECK(thread_.get() == nullptr) << "Already started";
+  DVLOG(6) << "Called ReactorThread::Init()";
+
+  // Async watcher: lets other threads wake this loop (WakeThread() signals it
+  // and AsyncHandler() runs in response).
+  async_.set(loop_);
+  async_.set<ReactorThread, &ReactorThread::AsyncHandler>(this); // NOLINT(*)
+  async_.start();
+
+  // Periodic timer, firing every 'coarse_timer_granularity_'. TimerHandler()
+  // uses it to refresh the cached time, sample load and close idle connections.
+  const auto tick_secs = coarse_timer_granularity_.ToSeconds();
+  timer_.set(loop_);
+  timer_.set<ReactorThread, &ReactorThread::TimerHandler>(this); // NOLINT(*)
+  timer_.start(tick_secs, tick_secs);
+
+  // Loop-level hooks have no ev++ wrappers, so go through the C API. The
+  // userdata pointer is how the static callbacks get back to this object.
+  ev_set_userdata(loop_, this);
+  ev_set_loop_release_cb(loop_, &ReactorThread::AboutToPollCb, &ReactorThread::PollCompleteCb);
+  ev_set_invoke_pending_cb(loop_, &ReactorThread::InvokePendingCb);
+
+  // Spawn the thread that will run the event loop (RunThread()).
+  return kudu::Thread::Create("reactor", "rpc reactor", &ReactorThread::RunThread, this, &thread_);
+}
+
+void ReactorThread::InvokePendingCb(struct ev_loop* loop) {
+  // Time the dispatch of all pending watcher callbacks. CycleClock is used
+  // instead of MonoTime because it is cheaper and this hook runs very often.
+  const int64_t begin_cycles = CycleClock::Now();
+  ev_invoke_pending(loop);
+  const int64_t elapsed_cycles = CycleClock::Now() - begin_cycles;
+
+  ReactorThread* const self = static_cast<ReactorThread*>(ev_userdata(loop));
+  if (!self->invoke_us_histogram_) {
+    return;
+  }
+  // Convert cycles to microseconds before recording.
+  self->invoke_us_histogram_->Increment(elapsed_cycles * 1000000 / base::CyclesPerSecond());
+}
+
+void ReactorThread::AboutToPollCb(struct ev_loop* loop) noexcept {
+  // Invoked right before libev blocks in its poll call: remember the cycle
+  // count so PollCompleteCb() can measure how long the poll took.
+  ReactorThread* const self = static_cast<ReactorThread*>(ev_userdata(loop));
+  self->cycle_clock_before_poll_ = CycleClock::Now();
+}
+
+void ReactorThread::PollCompleteCb(struct ev_loop* loop) noexcept {
+  // Read the clock before anything else so the measured poll time is tight.
+  const int64_t after_poll = CycleClock::Now();
+
+  ReactorThread* const self = static_cast<ReactorThread*>(ev_userdata(loop));
+  DCHECK_NE(self->cycle_clock_before_poll_, -1)
+      << "PollCompleteCb called without corresponding AboutToPollCb";
+
+  // Add the time spent blocked in poll to the running total, then reset the
+  // marker to -1 so an unmatched call trips the DCHECK above next time.
+  const int64_t blocked_cycles = after_poll - self->cycle_clock_before_poll_;
+  self->total_poll_cycles_ += blocked_cycles;
+  self->cycle_clock_before_poll_ = -1;
+}
+
+void ReactorThread::Shutdown(Messenger::ShutdownMode mode) {
+  CHECK(reactor_->closing()) << "Should be called after setting closing_ flag";
+
+  VLOG(1) << name() << ": shutting down Reactor thread.";
+  // Wake the loop. AsyncHandler() sees closing() and performs the teardown.
+  WakeThread();
+
+  if (mode != Messenger::ShutdownMode::SYNC) {
+    return;
+  }
+  // SYNC mode: wait for the reactor thread to exit. Join() returns a bad
+  // status when asked to join the currently running thread, which CHECK_OK
+  // turns into a crash.
+  CHECK_OK(ThreadJoiner(thread_.get()).Join());
+}
+
+void ReactorThread::ShutdownInternal() {
+  DCHECK(IsCurrentThread());
+
+  // Outbound connections are shut down with a ServiceUnavailable status.
+  Status unavailable = ShutdownError(false);
+  VLOG(1) << name() << ": tearing down outbound TCP connections...";
+  for (const auto& id_and_conn : client_conns_) {
+    const auto& client_conn = id_and_conn.second;
+    VLOG(1) << name() << ": shutting down " << client_conn->ToString();
+    client_conn->Shutdown(unavailable);
+  }
+  client_conns_.clear();
+
+  // Inbound connections get the same status.
+  VLOG(1) << name() << ": tearing down inbound TCP connections...";
+  for (const auto& server_conn : server_conns_) {
+    VLOG(1) << name() << ": shutting down " << server_conn->ToString();
+    server_conn->Shutdown(unavailable);
+  }
+  server_conns_.clear();
+
+  // Delayed tasks whose timers are already armed live in scheduled_tasks_, not
+  // in the Reactor's pending queue, so they must be aborted here. Each
+  // DelayedTask::Abort() runs the task's callback and deletes the task.
+  Status aborted = ShutdownError(true);
+  while (!scheduled_tasks_.empty()) {
+    DelayedTask& task = scheduled_tasks_.front();
+    scheduled_tasks_.pop_front();
+    task.Abort(aborted);
+  }
+
+  // Drop OpenSSL's per-thread error state. Since OpenSSL 1.1 this call [1] is
+  // a no-op, so it is only compiled for older versions.
+  //
+  // 1. https://www.openssl.org/docs/man1.1.0/crypto/ERR_remove_thread_state.html
+#if OPENSSL_VERSION_NUMBER < 0x10100000L
+  ERR_remove_thread_state(nullptr);
+#endif
+}
+
+// ReactorTask's constructor and destructor do no work. They are simply
+// defined out of line in this translation unit.
+ReactorTask::ReactorTask() = default;
+ReactorTask::~ReactorTask() = default;
+
+Status ReactorThread::GetMetrics(ReactorMetrics* metrics) {
+  DCHECK(IsCurrentThread());
+  // For each direction, report the currently registered connections and the
+  // running total ever registered by this thread.
+  metrics->num_server_connections_ = server_conns_.size();
+  metrics->total_server_connections_ = total_server_conns_cnt_;
+  metrics->num_client_connections_ = client_conns_.size();
+  metrics->total_client_connections_ = total_client_conns_cnt_;
+  return Status::OK();
+}
+
+Status ReactorThread::DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
+                                      DumpRunningRpcsResponsePB* resp) {
+  DCHECK(IsCurrentThread());
+  // Inbound (server-side) connections first, then outbound (client-side)
+  // ones. Stop at the first connection that fails to dump.
+  for (const auto& inbound : server_conns_) {
+    RETURN_NOT_OK(inbound->DumpPB(req, resp->add_inbound_connections()));
+  }
+  for (const auto& id_and_conn : client_conns_) {
+    RETURN_NOT_OK(id_and_conn.second->DumpPB(req, resp->add_outbound_connections()));
+  }
+  return Status::OK();
+}
+
+void ReactorThread::WakeThread() {
+  // Signal the async watcher so the loop runs AsyncHandler(). libev's
+  // lock-free signalling lacks TSAN annotations (see KUDU-366 and
+  // http://lists.schmorp.de/pipermail/libev/2013q2/002178.html), so TSAN is
+  // told to ignore the reads and writes made while sending.
+  debug::ScopedTSANIgnoreReadsAndWrites tsan_ignore;
+  async_.send();
+}
+
+// Async-watcher callback. It runs on the reactor thread after another thread
+// calls WakeThread(), either to announce shutdown or because new ReactorTasks
+// (for example, outbound calls to assign) were queued on the Reactor.
+void ReactorThread::AsyncHandler(ev::async& /*watcher*/, int /*revents*/) {
+  DCHECK(IsCurrentThread());
+
+  // Shutdown takes priority: tear everything down and break the event loop,
+  // which lets RunThread() return and the thread exit.
+  if (PREDICT_FALSE(reactor_->closing())) {
+    ShutdownInternal();
+    loop_.break_loop();
+    return;
+  }
+
+  // Take the whole queue at once (under the Reactor's lock), then run the
+  // tasks with the lock released. Each task is unlinked before it runs
+  // because Run() may delete it.
+  boost::intrusive::list<ReactorTask> batch;
+  reactor_->DrainTaskQueue(&batch);
+  while (!batch.empty()) {
+    ReactorTask* next = &batch.front();
+    batch.pop_front();
+    next->Run(this);
+  }
+}
+
+void ReactorThread::RegisterConnection(scoped_refptr<Connection> conn) {
+  DCHECK(IsCurrentThread());
+
+  // Inbound connections start negotiating right away and are tracked in
+  // server_conns_ only if the negotiation task could be submitted.
+  Status negotiation_status = StartConnectionNegotiation(conn);
+  if (negotiation_status.ok()) {
+    ++total_server_conns_cnt_;
+    server_conns_.emplace_back(std::move(conn));
+    return;
+  }
+  LOG(ERROR) << "Server connection negotiation failed: " << negotiation_status.ToString();
+  DestroyConnection(conn.get(), negotiation_status);
+}
+
+void ReactorThread::AssignOutboundCall(shared_ptr<OutboundCall> call) {
+  DCHECK(IsCurrentThread());
+
+  // A call cancelled before reaching the reactor needs no connection.
+  if (PREDICT_FALSE(call->IsCancelled())) {
+    return;
+  }
+
+  // Reuse a suitable connection or start a new one, then queue the call on it.
+  // If no connection can be obtained, fail the call in the negotiation phase.
+  scoped_refptr<Connection> target;
+  Status conn_status = FindOrStartConnection(call->conn_id(),
+                                             call->controller()->credentials_policy(),
+                                             &target);
+  if (conn_status.ok()) {
+    target->QueueOutboundCall(std::move(call));
+    return;
+  }
+  call->SetFailed(std::move(conn_status), OutboundCall::Phase::CONNECTION_NEGOTIATION);
+}
+
+void ReactorThread::CancelOutboundCall(const shared_ptr<OutboundCall>& call) {
+  DCHECK(IsCurrentThread());
+
+  // Once the callback has run, cancelling is a no-op. The controller may
+  // already be gone at that point, so it must not be touched.
+  if (call->IsFinished()) {
+    return;
+  }
+
+  // Only an existing connection is consulted; cancellation never opens one.
+  scoped_refptr<Connection> existing;
+  const bool have_conn = FindConnection(call->conn_id(),
+                                        call->controller()->credentials_policy(),
+                                        &existing);
+  if (have_conn) {
+    existing->CancelOutboundCall(call);
+  }
+  call->Cancel();
+}
+
+// Handles ticks of the periodic timer armed in Init(), which fires every
+// 'coarse_timer_granularity_'. Each tick does three things:
+//
+// 1. Refreshes the cached ReactorThread::cur_time_.
+// 2. If a load histogram is configured, records the percentage of cycles since
+//    the previous tick that were NOT spent blocked in libev's poll call. Poll
+//    time is accumulated in total_poll_cycles_ by PollCompleteCb().
+// 3. Calls ScanIdleConnections(). That closes server connections idle for
+//    longer than 'connection_keepalive_time_' (when it is non-negative), and
+//    idle client connections that were marked for shutdown.
+//
+// If libev reports an error for the timer, a warning is logged and the tick is
+// otherwise skipped.
+void ReactorThread::TimerHandler(ev::timer& /*watcher*/, int revents) {
+  DCHECK(IsCurrentThread());
+  if (EV_ERROR & revents) {
+    LOG(WARNING) << "Reactor " << name() << " got an error in "
+      "the timer handler.";
+    return;
+  }
+  cur_time_ = MonoTime::Now();
+
+  // Compute load percentage.
+  const int64_t now_cycles = CycleClock::Now();
+  if (last_load_measurement_.time_cycles != -1) {
+    const int64_t cycles_delta = now_cycles - last_load_measurement_.time_cycles;
+    // Skip the sample if the cycle counter did not advance between ticks (or,
+    // presumably, went backwards if it is not monotonic across CPUs). Dividing
+    // by a zero/negative delta yields inf/NaN or nonsense, and converting NaN
+    // or an out-of-range double to int is undefined behavior.
+    if (cycles_delta > 0) {
+      const int64_t poll_cycles_delta =
+          total_poll_cycles_ - last_load_measurement_.poll_cycles;
+      const double poll_fraction = static_cast<double>(poll_cycles_delta) / cycles_delta;
+      const double active_fraction = 1 - poll_fraction;
+      if (load_percent_histogram_) {
+        load_percent_histogram_->Increment(static_cast<int>(active_fraction * 100));
+      }
+    }
+  }
+  last_load_measurement_.time_cycles = now_cycles;
+  last_load_measurement_.poll_cycles = total_poll_cycles_;
+
+  ScanIdleConnections();
+}
+
+// Attaches 'watcher' to this thread's event loop. Only the loop is set here;
+// the timer is not configured or started by this function.
+void ReactorThread::RegisterTimeout(ev::timer* watcher) {
+  ev::timer& timer_watcher = *watcher;
+  timer_watcher.set(loop_);
+}
+
+void ReactorThread::ScanIdleConnections() {
+  DCHECK(IsCurrentThread());
+
+  // Server side: close connections that have been idle for longer than
+  // 'connection_keepalive_time_'. A negative keepalive disables this scan.
+  uint64_t timed_out = 0;
+  if (connection_keepalive_time_ >= MonoDelta::FromMilliseconds(0)) {
+    auto it = server_conns_.begin();
+    while (it != server_conns_.end()) {
+      Connection* const conn = it->get();
+      if (!conn->Idle()) {
+        VLOG(10) << "Connection " << conn->ToString() << " not idle";
+        ++it;
+        continue;
+      }
+      const MonoDelta idle_for(cur_time_ - conn->last_activity_time());
+      if (idle_for <= connection_keepalive_time_) {
+        ++it;
+        continue;
+      }
+      conn->Shutdown(Status::NetworkError(
+          Substitute("connection timed out after $0", connection_keepalive_time_.ToString())));
+      VLOG(1) << "Timing out connection " << conn->ToString() << " - it has been idle for "
+              << idle_for.ToString();
+      ++timed_out;
+      it = server_conns_.erase(it);
+    }
+  }
+
+  // Client side: close connections that were marked for shutdown once they
+  // have become idle.
+  uint64_t closed_marked = 0;
+  auto cit = client_conns_.begin();
+  while (cit != client_conns_.end()) {
+    Connection* const conn = cit->second.get();
+    if (!conn->scheduled_for_shutdown() || !conn->Idle()) {
+      ++cit;
+      continue;
+    }
+    conn->Shutdown(Status::NetworkError("connection has been marked for shutdown"));
+    cit = client_conns_.erase(cit);
+    ++closed_marked;
+  }
+  // TODO(aserbin): clients may want to set their keepalive timeout for idle
+  //                but not scheduled for shutdown connections.
+
+  VLOG_IF(1, timed_out > 0) << name() << ": timed out " << timed_out << " TCP connections.";
+  VLOG_IF(1, closed_marked > 0) << name() << ": shutdown " << closed_marked << " TCP connections.";
+}
+
+// The owning Reactor's name, used as a prefix in this thread's log messages.
+const std::string& ReactorThread::name() const {
+  return reactor_->name();
+}
+
+// Time cached at construction and refreshed on every timer tick.
+MonoTime ReactorThread::cur_time() const {
+  return cur_time_;
+}
+
+// The Reactor this thread belongs to.
+Reactor* ReactorThread::reactor() {
+  return reactor_;
+}
+
+// True iff the caller is running on this reactor's event-loop thread.
+bool ReactorThread::IsCurrentThread() const {
+  return kudu::Thread::current_thread() == thread_.get();
+}
+
+void ReactorThread::RunThread() {
+  // Forbid waiting and blocking I/O on the reactor thread.
+  ThreadRestrictions::SetWaitAllowed(false);
+  ThreadRestrictions::SetIOAllowed(false);
+  DVLOG(6) << "Calling ReactorThread::RunThread()...";
+
+  // Returns once the loop is broken; AsyncHandler() does so on shutdown.
+  loop_.run(0);
+  VLOG(1) << name() << " thread exiting.";
+
+  // Release this reactor's reference to the messenger. The messenger is
+  // destroyed once every reactor thread has exited and done the same.
+  reactor_->messenger_.reset();
+}
+
+// Looks for a reusable client connection to 'conn_id' that satisfies
+// 'cred_policy'. On success it stores the connection in '*conn' and returns
+// true. Otherwise it returns false and leaves '*conn' untouched.
+//
+// Side effect: every other connection under 'conn_id' that must not take new
+// calls is cleaned up. That covers connections already scheduled for shutdown,
+// connections not compliant with 'cred_policy', and every connection when the
+// test-only --rpc_reopen_outbound_connections flag is set. Idle ones are shut
+// down and erased immediately. Busy ones are marked for shutdown, and
+// ScanIdleConnections() closes them once they become idle.
+//
+// Replacing non-compliant connections converges: a connection satisfying
+// PRIMARY_CREDENTIALS also satisfies ANY_CREDENTIALS, so at most one usable
+// connection per 'conn_id' is kept.
+bool ReactorThread::FindConnection(const ConnectionId& conn_id,
+                                   CredentialsPolicy cred_policy,
+                                   scoped_refptr<Connection>* conn) {
+  DCHECK(IsCurrentThread());
+  scoped_refptr<Connection> usable;
+  const auto range = client_conns_.equal_range(conn_id);
+  auto it = range.first;
+  while (it != range.second) {
+    Connection* const candidate = it->second.get();
+    const bool reusable = !candidate->scheduled_for_shutdown() &&
+        candidate->SatisfiesCredentialsPolicy(cred_policy) &&
+        !PREDICT_FALSE(FLAGS_rpc_reopen_outbound_connections);
+    if (reusable) {
+      // Keep scanning after a match so the remaining non-reusable
+      // connections still get cleaned up.
+      DCHECK(!usable);
+      usable = candidate;
+      ++it;
+      continue;
+    }
+    if (!candidate->Idle()) {
+      // Busy: leave it to the idle-connection scanner.
+      candidate->set_scheduled_for_shutdown();
+      ++it;
+      continue;
+    }
+    // Idle and not reusable: close it right away.
+    DCHECK_EQ(Connection::CLIENT, candidate->direction());
+    candidate->Shutdown(Status::NetworkError("connection is closed due to non-reuse policy"));
+    it = client_conns_.erase(it);
+  }
+  if (!usable) {
+    return false;
+  }
+  conn->swap(usable);
+  return true;
+}
+
+// Returns (in '*conn') an existing usable client connection for 'conn_id' if
+// FindConnection() finds one. Otherwise it opens a non-blocking client socket,
+// wraps it in a new Connection, submits its negotiation and registers it in
+// client_conns_. Note that '*conn' is already set to the new Connection when
+// submitting its negotiation fails.
+Status ReactorThread::FindOrStartConnection(const ConnectionId& conn_id,
+                                            CredentialsPolicy cred_policy,
+                                            scoped_refptr<Connection>* conn) {
+  DCHECK(IsCurrentThread());
+  if (FindConnection(conn_id, cred_policy, conn)) {
+    return Status::OK();
+  }
+
+  VLOG(2) << name() << " FindOrStartConnection: creating "
+          << "new connection for " << conn_id.remote().ToString();
+
+  // Create the socket and begin connecting. The connect may still be in
+  // progress when StartConnect() returns OK.
+  Socket sock;
+  RETURN_NOT_OK(CreateClientSocket(&sock));
+  RETURN_NOT_OK(StartConnect(&sock, conn_id.remote()));
+
+  // Hand the file descriptor over to a new client-side Connection.
+  unique_ptr<Socket> owned_socket(new Socket(sock.Release()));
+  *conn = new Connection(this, conn_id.remote(), std::move(owned_socket),
+                         Connection::CLIENT, cred_policy);
+  (*conn)->set_outbound_connection_id(conn_id);
+
+  // Negotiation itself runs on the messenger's negotiation thread pool.
+  Status submit_status = StartConnectionNegotiation(*conn);
+  if (submit_status.IsIllegalState()) {
+    // The pool reports something generic such as "ThreadPool is closing";
+    // give the caller a clearer message.
+    return Status::ServiceUnavailable("Client RPC Messenger shutting down");
+  }
+  RETURN_NOT_OK_PREPEND(submit_status, "Unable to start connection negotiation thread");
+
+  // Register it so later calls to the same remote reuse it instead of opening
+  // duplicate connections.
+  client_conns_.emplace(conn_id, *conn);
+  ++total_client_conns_cnt_;
+  return Status::OK();
+}
+
+// Submits Negotiation::RunNegotiation for 'conn' to the messenger's
+// negotiation pool for the connection's direction. The negotiation deadline is
+// now + the messenger's rpc_negotiation_timeout_ms(). Returns the pool's
+// submission status.
+Status ReactorThread::StartConnectionNegotiation(const scoped_refptr<Connection>& conn) {
+  DCHECK(IsCurrentThread());
+
+  const auto& messenger = reactor()->messenger();
+  MonoTime deadline = MonoTime::Now() +
+      MonoDelta::FromMilliseconds(messenger->rpc_negotiation_timeout_ms());
+
+  scoped_refptr<Trace> trace(new Trace());
+  ADOPT_TRACE(trace.get());
+  TRACE("Submitting negotiation task for $0", conn->ToString());
+
+  auto authentication = messenger->authentication();
+  auto encryption = messenger->encryption();
+  ThreadPool* pool = messenger->negotiation_pool(conn->direction());
+  return pool->SubmitClosure(
+      Bind(&Negotiation::RunNegotiation, conn, authentication, encryption, deadline));
+}
+
+// Called on the reactor thread when negotiation for 'conn' finishes. On
+// failure the connection is destroyed, forwarding 'rpc_error'. On success the
+// socket is switched back to non-blocking mode and the connection is
+// registered with the event loop.
+void ReactorThread::CompleteConnectionNegotiation(
+    const scoped_refptr<Connection>& conn,
+    const Status& status,
+    unique_ptr<ErrorStatusPB> rpc_error) {
+  DCHECK(IsCurrentThread());
+  if (PREDICT_FALSE(!status.ok())) {
+    DestroyConnection(conn.get(), status, std::move(rpc_error));
+    return;
+  }
+
+  Status nonblocking_status = conn->SetNonBlocking(true);
+  if (PREDICT_FALSE(!nonblocking_status.ok())) {
+    LOG(DFATAL) << "Unable to set connection to non-blocking mode: "
+                << nonblocking_status.ToString();
+    DestroyConnection(conn.get(), nonblocking_status, std::move(rpc_error));
+    return;
+  }
+
+  // Ready for regular traffic: flag negotiation as done, then start watching
+  // the connection in this thread's loop.
+  conn->MarkNegotiationComplete();
+  conn->EpollRegister(loop_);
+}
+
+// Initializes 'sock' as a non-blocking socket and enables SetNoDelay(true) on
+// it. Any failure is logged as a warning and returned.
+Status ReactorThread::CreateClientSocket(Socket *sock) {
+  Status result = sock->Init(Socket::FLAG_NONBLOCKING);
+  if (result.ok()) {
+    result = sock->SetNoDelay(true);
+  }
+  if (!result.ok()) {
+    LOG(WARNING)
+        << "failed to create an outbound connection because a new socket could not be created: "
+        << result.ToString();
+  }
+  return result;
+}
+
+// Starts a (non-blocking) connect of 'sock' to 'remote'. Returns OK if the
+// connect completed immediately or is still in progress (EINPROGRESS or a
+// temporary socket error). Any other failure is logged and returned.
+Status ReactorThread::StartConnect(Socket *sock, const Sockaddr& remote) {
+  const Status connect_status = sock->Connect(remote);
+  if (!connect_status.ok()) {
+    const int err = connect_status.posix_code();
+    if (!Socket::IsTemporarySocketError(err) && err != EINPROGRESS) {
+      LOG(WARNING) << "Failed to create an outbound connection to " << remote.ToString()
+                   << " because connect() failed: " << connect_status.ToString();
+      return connect_status;
+    }
+    VLOG(3) << "StartConnect: connect in progress for " << remote.ToString();
+    return Status::OK();
+  }
+  VLOG(3) << "StartConnect: connect finished immediately for " << remote.ToString();
+  return Status::OK();
+}
+
+// Shuts 'conn' down with 'conn_status' (forwarding 'rpc_error'), then removes
+// it from whichever container tracks it: client_conns_ for outbound
+// connections, server_conns_ for inbound ones.
+void ReactorThread::DestroyConnection(Connection *conn,
+                                      const Status& conn_status,
+                                      unique_ptr<ErrorStatusPB> rpc_error) {
+  DCHECK(IsCurrentThread());
+
+  conn->Shutdown(conn_status, std::move(rpc_error));
+
+  const auto direction = conn->direction();
+  if (direction == Connection::CLIENT) {
+    // client_conns_ is a multimap keyed by the outbound connection id; erase
+    // only the entry holding this very connection.
+    const auto range = client_conns_.equal_range(conn->outbound_connection_id());
+    CHECK(range.first != range.second) << "Couldn't find connection " << conn->ToString();
+    for (auto it = range.first; it != range.second; ++it) {
+      if (it->second.get() == conn) {
+        client_conns_.erase(it);
+        break;
+      }
+    }
+  } else if (direction == Connection::SERVER) {
+    for (auto it = server_conns_.begin(); it != server_conns_.end(); ++it) {
+      if (it->get() == conn) {
+        server_conns_.erase(it);
+        break;
+      }
+    }
+  }
+}
+
+// A task that, once Run() schedules it on a reactor thread, invokes 'func'
+// after 'when' has elapsed. 'func' receives OK on expiry, or an Aborted status
+// if the task is aborted. The task is bound to no thread until Run().
+DelayedTask::DelayedTask(boost::function<void(const Status&)> func,
+                         MonoDelta when)
+    : func_(std::move(func)), when_(when), thread_(nullptr) {}
+
+void DelayedTask::Run(ReactorThread* thread) {
+  DCHECK(thread_ == nullptr) << "Task has already been scheduled";
+  DCHECK(thread->IsCurrentThread());
+  DCHECK(!is_linked()) << "Should not be linked on pending_tasks_ anymore";
+
+  // Bind to the reactor thread and arm a one-shot timer (repeat of 0) that
+  // fires after 'when_'.
+  thread_ = thread;
+  timer_.set(thread_->loop_);
+  timer_.set<DelayedTask, &DelayedTask::TimerHandler>(this); // NOLINT(*)
+  const auto delay_secs = when_.ToSeconds();
+  timer_.start(delay_secs, /* repeat= */ 0);
+
+  // Track the task so ReactorThread::ShutdownInternal() can abort it if the
+  // reactor stops before the timer fires.
+  thread_->scheduled_tasks_.push_back(*this);
+}
+
+// Reports 'abort_status' to the callback instead of waiting for the timer,
+// then frees the task. 'this' must not be used after the call.
+void DelayedTask::Abort(const Status& abort_status) {
+  auto& callback = func_;
+  callback(abort_status);
+  delete this;
+}
+
+// Timer callback. It unlinks the task from the thread's scheduled list, then
+// runs the callback with OK, or with Aborted if libev reported an error. The
+// task is freed on either path.
+void DelayedTask::TimerHandler(ev::timer& /*watcher*/, int revents) {
+  DCHECK(is_linked()) << "should be linked on scheduled_tasks_";
+  auto& scheduled = thread_->scheduled_tasks_;
+  scheduled.erase(scheduled.iterator_to(*this));
+
+  if (!(EV_ERROR & revents)) {
+    func_(Status::OK());
+    delete this;
+    return;
+  }
+  string msg = "Delayed task got an error in its timer handler";
+  LOG(WARNING) << msg;
+  Abort(Status::Aborted(msg)); // Will delete 'this'.
+}
+
+// Note: name_ is built from messenger_, so this relies on messenger_ being
+// declared (and therefore initialized) before name_ in the class.
+Reactor::Reactor(shared_ptr<Messenger> messenger,
+                 int index, const MessengerBuilder& bld)
+    : messenger_(std::move(messenger)),
+      name_(StringPrintf("%s_R%03d", messenger_->name().c_str(), index)),
+      closing_(false),
+      thread_(this, bld) {
+  // libev's process-wide setup runs exactly once, however many reactors exist.
+  static std::once_flag libev_init_once;
+  std::call_once(libev_init_once, DoInitLibEv);
+}
+
+// Initializes the reactor by initializing (and starting) its ReactorThread.
+Status Reactor::Init() {
+  DVLOG(6) << "Called Reactor::Init()";
+  Status thread_status = thread_.Init();
+  return thread_status;
+}
+
+void Reactor::Shutdown(Messenger::ShutdownMode mode) {
+  // Set closing_ under the lock; only the first caller proceeds.
+  bool was_closing = false;
+  {
+    std::lock_guard<LockType> guard(lock_);
+    was_closing = closing_;
+    closing_ = true;
+  }
+  if (was_closing) {
+    return;
+  }
+
+  thread_.Shutdown(mode);
+
+  // Fail whatever is still queued. ScheduleReactorTask() checks closing_, so
+  // nothing new can be enqueued from this point on.
+  Status aborted = ShutdownError(true);
+  while (!pending_tasks_.empty()) {
+    ReactorTask* task = &pending_tasks_.front();
+    pending_tasks_.pop_front();
+    task->Abort(aborted);
+  }
+}
+
+// Shuts down in ASYNC mode, i.e. without joining the reactor thread. This is a
+// no-op if Shutdown() already ran.
+Reactor::~Reactor() {
+  const auto mode = Messenger::ShutdownMode::ASYNC;
+  Shutdown(mode);
+}
+
+// Name of the form "<messenger name>_R<index>", assigned at construction.
+const std::string& Reactor::name() const {
+  return name_;
+}
+
+// Whether Shutdown() has begun. Read under lock_ because other threads set it.
+bool Reactor::closing() const {
+  std::lock_guard<LockType> guard(lock_);
+  const bool is_closing = closing_;
+  return is_closing;
+}
+
+// Reactor task that runs a Status-returning function on the reactor thread.
+// The submitting thread can block in Wait() until it has finished.
+class RunFunctionTask : public ReactorTask {
+ public:
+  explicit RunFunctionTask(boost::function<Status()> f)
+      : function_(std::move(f)), latch_(1) {}
+
+  // Runs the function and records its result.
+  void Run(ReactorThread* /*reactor*/) override {
+    Finish(function_());
+  }
+
+  // The reactor is shutting down: record 'status' instead of running.
+  void Abort(const Status& status) override {
+    Finish(status);
+  }
+
+  // Blocks until Run() or Abort() has happened, then returns the recorded
+  // status.
+  Status Wait() {
+    latch_.Wait();
+    return status_;
+  }
+
+ private:
+  // Stores the outcome, then releases the waiter. CountDown() must be the last
+  // access to 'this': the task may live on the waiter's stack and be destroyed
+  // as soon as Wait() returns.
+  void Finish(Status s) {
+    status_ = std::move(s);
+    latch_.CountDown();
+  }
+
+  boost::function<Status()> function_;
+  Status status_;
+  CountDownLatch latch_;
+};
+
+// Fills 'metrics' by running ReactorThread::GetMetrics() on the reactor
+// thread. Blocks until that completes or the task is aborted.
+Status Reactor::GetMetrics(ReactorMetrics *metrics) {
+  return RunOnReactorThread([this, metrics]() { return thread_.GetMetrics(metrics); });
+}
+
+// Runs 'f' on the reactor thread and returns its Status. If the reactor is
+// shutting down, the task is aborted and the abort status is returned instead.
+Status Reactor::RunOnReactorThread(const boost::function<Status()>& f) {
+  // A stack-allocated task is safe here: Wait() returns only after the task
+  // has been run or aborted.
+  RunFunctionTask task(f);
+  ScheduleReactorTask(&task);
+  return task.Wait();
+}
+
+// Dumps in-flight RPC state for this reactor's connections into 'resp',
+// running on the reactor thread.
+Status Reactor::DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
+                                DumpRunningRpcsResponsePB* resp) {
+  return RunOnReactorThread(
+      [this, &req, resp]() { return thread_.DumpRunningRpcs(req, resp); });
+}
+
+// Reactor task that hands a newly accepted inbound connection to the reactor
+// thread. Heap-allocated; it deletes itself in Run() and in Abort().
+class RegisterConnectionTask : public ReactorTask {
+ public:
+  explicit RegisterConnectionTask(scoped_refptr<Connection> conn)
+      : conn_(std::move(conn)) {}
+
+  void Run(ReactorThread* reactor) override {
+    reactor->RegisterConnection(std::move(conn_));
+    delete this;
+  }
+
+  // No Shutdown() is needed: the connection was never registered, and because
+  // it is inbound, no calls can have been added to it before registration.
+  void Abort(const Status& /*status*/) override {
+    delete this;
+  }
+
+ private:
+  scoped_refptr<Connection> conn_;
+};
+
+// Wraps the accepted 'socket' (its fd is released into a new Socket) in a
+// server-side Connection and asks the reactor thread to register it.
+void Reactor::RegisterInboundSocket(Socket *socket, const Sockaddr& remote) {
+  VLOG(3) << name_ << ": new inbound connection to " << remote.ToString();
+  unique_ptr<Socket> owned(new Socket(socket->Release()));
+  scoped_refptr<Connection> conn(
+      new Connection(&thread_, remote, std::move(owned), Connection::SERVER));
+  // The task deletes itself once it is run or aborted.
+  ScheduleReactorTask(new RegisterConnectionTask(std::move(conn)));
+}
+
+// Reactor task that attaches an outbound call to a connection on the reactor
+// thread. Heap-allocated; it deletes itself in Run() and in Abort().
+class AssignOutboundCallTask : public ReactorTask {
+ public:
+  explicit AssignOutboundCallTask(shared_ptr<OutboundCall> call)
+      : call_(std::move(call)) {}
+
+  void Run(ReactorThread* reactor) override {
+    reactor->AssignOutboundCall(std::move(call_));
+    delete this;
+  }
+
+  // The reactor is shutting down, so fail the call. Its real phase does not
+  // matter; REMOTE_CALL is used only to finalize the call's state.
+  void Abort(const Status& status) override {
+    call_->SetFailed(status, OutboundCall::Phase::REMOTE_CALL);
+    delete this;
+  }
+
+ private:
+  shared_ptr<OutboundCall> call_;
+};
+
+// Queues 'call' for assignment to a connection on the reactor thread.
+void Reactor::QueueOutboundCall(const shared_ptr<OutboundCall>& call) {
+  DVLOG(3) << name_ << ": queueing outbound call "
+           << call->ToString() << " to remote " << call->conn_id().remote().ToString();
+  // Test hook: when the call asks for injected cancellation, queue the
+  // cancellation ahead of the assignment task. This exercises cancelling
+  // 'call' while it is still in the READY state.
+  const bool inject_cancellation = call->ShouldInjectCancellation();
+  if (PREDICT_FALSE(inject_cancellation)) {
+    QueueCancellation(call);
+  }
+  ScheduleReactorTask(new AssignOutboundCallTask(call));
+}
+
+// Reactor task that cancels an outbound call on the reactor thread.
+// Heap-allocated; it deletes itself in Run() and in Abort().
+class CancellationTask : public ReactorTask {
+ public:
+  explicit CancellationTask(shared_ptr<OutboundCall> call)
+      : call_(std::move(call)) {}
+
+  void Run(ReactorThread* reactor) override {
+    reactor->CancelOutboundCall(call_);
+    delete this;
+  }
+
+  // On shutdown the cancellation is simply dropped.
+  void Abort(const Status& /*status*/) override {
+    delete this;
+  }
+
+ private:
+  shared_ptr<OutboundCall> call_;
+};
+
+// Asks the reactor thread to cancel 'call' via ReactorThread::CancelOutboundCall().
+void Reactor::QueueCancellation(const shared_ptr<OutboundCall>& call) {
+  auto* task = new CancellationTask(call);
+  ScheduleReactorTask(task);
+}
+
+// Enqueues 'task' for the reactor thread and wakes it. If the reactor is
+// already closing, the task is aborted (with the lock released) instead.
+void Reactor::ScheduleReactorTask(ReactorTask *task) {
+  std::unique_lock<LockType> guard(lock_);
+  if (closing_) {
+    // We guarantee the reactor lock is not held while calling Abort().
+    guard.unlock();
+    task->Abort(ShutdownError(false));
+    return;
+  }
+  pending_tasks_.push_back(*task);
+  guard.unlock();
+  // Wake the loop so AsyncHandler() drains the queue.
+  thread_.WakeThread();
+}
+
+// Under lock_, swaps the pending-task queue into '*tasks' (which callers pass
+// in empty). Returns false, leaving both lists untouched, once the reactor is
+// closing.
+bool Reactor::DrainTaskQueue(boost::intrusive::list<ReactorTask> *tasks) { // NOLINT(*)
+  std::lock_guard<LockType> guard(lock_);
+  const bool may_drain = !closing_;
+  if (may_drain) {
+    tasks->swap(pending_tasks_);
+  }
+  return may_drain;
+}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/reactor.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/reactor.h b/be/src/kudu/rpc/reactor.h
new file mode 100644
index 0000000..8884f54
--- /dev/null
+++ b/be/src/kudu/rpc/reactor.h
@@ -0,0 +1,427 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_RPC_REACTOR_H
+#define KUDU_RPC_REACTOR_H
+
+#include <cstdint>
+#include <list>
+#include <memory>
+#include <string>
+#include <unordered_map>
+
+#include <boost/function.hpp> // IWYU pragma: keep
+#include <boost/intrusive/list.hpp>
+#include <boost/intrusive/list_hook.hpp>
+#include <ev++.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/rpc/connection.h"
+#include "kudu/rpc/connection_id.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread.h"
+
+namespace kudu {
+
+class Sockaddr;
+class Socket;
+
+namespace rpc {
+
+// Ordered list of ref-counted connections (used for inbound server connections).
+using conn_list_t = std::list<scoped_refptr<Connection>>;
+
+class DumpRunningRpcsRequestPB;
+class DumpRunningRpcsResponsePB;
+class OutboundCall;
+class Reactor;
+class ReactorThread;
+enum class CredentialsPolicy;
+
+// Simple metrics information from within a reactor.
+// Populated by Reactor::GetMetrics() / ReactorThread::GetMetrics().
+//
+// NOTE: the members have no default initializers, so a default-initialized
+// instance holds indeterminate values until it has been filled in.
+// TODO(todd): switch these over to use util/metrics.h style metrics.
+struct ReactorMetrics {
+  // Number of client RPC connections currently connected.
+  int32_t num_client_connections_;
+  // Number of server RPC connections currently connected.
+  int32_t num_server_connections_;
+
+  // Total number of client RPC connections opened during Reactor's lifetime.
+  uint64_t total_client_connections_;
+  // Total number of server RPC connections opened during Reactor's lifetime.
+  uint64_t total_server_connections_;
+};
+
+// A task which can be enqueued to run on the reactor thread.
+//
+// Tasks are linked into the reactor's pending queue through the intrusive
+// list hook inherited below: the queue does not own or copy the task, so a
+// task can sit in at most one such list at a time and must stay alive while
+// it is queued.
+class ReactorTask : public boost::intrusive::list_base_hook<> {
+ public:
+  ReactorTask();
+
+  // Run the task. 'reactor' is guaranteed to be the current thread.
+  virtual void Run(ReactorThread *reactor) = 0;
+
+  // Abort the task, in the case that the reactor shut down before the
+  // task could be processed. This may or may not run on the reactor thread
+  // itself.
+  //
+  // The Reactor guarantees that the Reactor lock is free when this
+  // method is called.
+  //
+  // The default implementation does nothing. A heap-allocated task that
+  // deletes itself in Run() should override this to delete itself here as
+  // well, otherwise it leaks when the reactor shuts down.
+  virtual void Abort(const Status &abort_status) {}
+
+  virtual ~ReactorTask();
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(ReactorTask);
+};
+
+// A ReactorTask that is scheduled to run at some point in the future.
+//
+// Semantically it works like RunFunctionTask with a few key differences:
+// 1. The user function is called during Abort. Put another way, the
+//    user function is _always_ invoked, even during reactor shutdown.
+// 2. To differentiate between Abort and non-Abort, the user function
+//    receives a Status as its first argument.
+class DelayedTask : public ReactorTask {
+ public:
+  // 'func' is the user function described above. 'when' is the delay after
+  // which it fires (presumably measured from when Run() arms the timer --
+  // TODO confirm against reactor.cc).
+  DelayedTask(boost::function<void(const Status &)> func, MonoDelta when);
+
+  // Schedules the task for running later but doesn't actually run it yet.
+  void Run(ReactorThread* thread) override;
+
+  // Behaves like ReactorTask::Abort. Per point 1 above, this also invokes
+  // the user function (presumably with 'abort_status' -- TODO confirm).
+  void Abort(const Status& abort_status) override;
+
+ private:
+  // libev callback for when the registered timer fires.
+  void TimerHandler(ev::timer& watcher, int revents);
+
+  // User function to invoke when timer fires or when task is aborted.
+  const boost::function<void(const Status&)> func_;
+
+  // Delay to apply to this task.
+  const MonoDelta when_;
+
+  // Link back to registering reactor thread. Not owned; presumably assigned
+  // in Run().
+  ReactorThread* thread_;
+
+  // libev timer. Set when Run() is invoked.
+  ev::timer timer_;
+};
+
+// A ReactorThread is a libev event handler thread which manages I/O
+// on a list of sockets.
+//
+// All methods in this class are _only_ called from the reactor thread itself
+// except where otherwise specified. New methods should DCHECK(IsCurrentThread())
+// to ensure this.
+//
+// Each ReactorThread is embedded (by value) in exactly one Reactor, which
+// queues work for it in its pending task list and then wakes it.
+class ReactorThread {
+ public:
+  friend class Connection;
+
+  // Client-side connection map. Multiple connections could be open to a remote
+  // server if multiple credential policies are used for individual RPCs.
+  typedef std::unordered_multimap<ConnectionId, scoped_refptr<Connection>,
+                                  ConnectionIdHash, ConnectionIdEqual>
+      conn_multimap_t;
+
+  // 'reactor' is the owning Reactor (not owned by this object). 'bld'
+  // presumably supplies the keepalive and timer-granularity settings --
+  // TODO confirm against reactor.cc.
+  ReactorThread(Reactor *reactor, const MessengerBuilder &bld);
+
+  // This may be called from another thread.
+  Status Init();
+
+  // Add any connections on this reactor thread into the given status dump.
+  Status DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
+                         DumpRunningRpcsResponsePB* resp);
+
+  // Shuts down a reactor thread, optionally waiting for it to exit.
+  // Reactor::Shutdown() must have been called already.
+  //
+  // If mode == SYNC, may not be called from the reactor thread itself.
+  void Shutdown(Messenger::ShutdownMode mode);
+
+  // Wakes the event loop so that it notices newly queued work (presumably by
+  // signalling 'async_').
+  //
+  // This method is thread-safe.
+  void WakeThread();
+
+  // libev callback for handling async notifications in our epoll thread.
+  void AsyncHandler(ev::async &watcher, int revents);
+
+  // libev callback for handling timer events in our epoll thread.
+  void TimerHandler(ev::timer &watcher, int revents);
+
+  // Register an epoll timer watcher with our event loop.
+  // Does not set a timeout or start it.
+  void RegisterTimeout(ev::timer *watcher);
+
+  // This may be called from another thread.
+  const std::string &name() const;
+
+  // Returns the reactor's cached notion of "now" (presumably 'cur_time_',
+  // which is only refreshed periodically rather than on every call).
+  MonoTime cur_time() const;
+
+  // This may be called from another thread.
+  Reactor *reactor();
+
+  // Return true if this reactor thread is the thread currently
+  // running. Should be used in DCHECK assertions.
+  bool IsCurrentThread() const;
+
+  // Begin the process of connection negotiation.
+  // Must be called from the reactor thread.
+  Status StartConnectionNegotiation(const scoped_refptr<Connection>& conn);
+
+  // Transition back from negotiating to processing requests.
+  // Must be called from the reactor thread.
+  void CompleteConnectionNegotiation(const scoped_refptr<Connection>& conn,
+                                     const Status& status,
+                                     std::unique_ptr<ErrorStatusPB> rpc_error);
+
+  // Collect metrics.
+  // Must be called from the reactor thread.
+  Status GetMetrics(ReactorMetrics *metrics);
+
+ private:
+  // Reactor tasks that need access to the private helpers below.
+  friend class AssignOutboundCallTask;
+  friend class CancellationTask;
+  friend class RegisterConnectionTask;
+  friend class DelayedTask;
+
+  // Run the main event loop of the reactor.
+  void RunThread();
+
+  // When libev has noticed that it needs to wake up an application watcher,
+  // it calls this callback. The callback simply calls back into libev's
+  // ev_invoke_pending() to trigger all the watcher callbacks, but
+  // wraps it with latency measurements.
+  static void InvokePendingCb(struct ev_loop* loop);
+
+  // Similarly, libev calls these functions before/after invoking epoll_wait().
+  // We use these to measure the amount of time spent waiting.
+  //
+  // NOTE: 'noexcept' is required to avoid compilation errors due to libev's
+  // use of the same exception specification.
+  static void AboutToPollCb(struct ev_loop* loop) noexcept;
+  static void PollCompleteCb(struct ev_loop* loop) noexcept;
+
+  // Finds a connection to the given remote (presumably one compatible with
+  // 'cred_policy') and returns it in 'conn'.
+  // Returns true if a connection is found. Returns false otherwise.
+  bool FindConnection(const ConnectionId& conn_id,
+                      CredentialsPolicy cred_policy,
+                      scoped_refptr<Connection>* conn);
+
+  // Find or create a new connection to the given remote.
+  // If such a connection already exists, returns that, otherwise creates a new one.
+  // May return a bad Status if the connect() call fails.
+  // The resulting connection object is managed internally by the reactor thread.
+  Status FindOrStartConnection(const ConnectionId& conn_id,
+                               CredentialsPolicy cred_policy,
+                               scoped_refptr<Connection>* conn);
+
+  // Shut down the given connection, removing it from the connection tracking
+  // structures of this reactor.
+  //
+  // The connection is not explicitly deleted -- scoped_refptr reference
+  // counting may hold on to the object after this, but callers should assume
+  // that it _may_ be deleted by this call.
+  void DestroyConnection(Connection *conn, const Status &conn_status,
+                         std::unique_ptr<ErrorStatusPB> rpc_error = {});
+
+  // Scan any open connections for idle ones that have been idle longer than
+  // connection_keepalive_time_. If connection_keepalive_time_ < 0, the scan
+  // is skipped.
+  void ScanIdleConnections();
+
+  // Create a new client socket (non-blocking, NODELAY)
+  static Status CreateClientSocket(Socket *sock);
+
+  // Initiate a new connection on the given socket.
+  static Status StartConnect(Socket *sock, const Sockaddr &remote);
+
+  // Assign a new outbound call to the appropriate connection object.
+  // If this fails, the call is marked failed and completed.
+  void AssignOutboundCall(std::shared_ptr<OutboundCall> call);
+
+  // Cancel the outbound call. May update corresponding connection
+  // object to remove call from the CallAwaitingResponse object.
+  // Also mark the call as slated for cancellation so the callback
+  // may be invoked early if the RPC hasn't yet been sent or if it's
+  // waiting for a response from the remote.
+  void CancelOutboundCall(const std::shared_ptr<OutboundCall> &call);
+
+  // Register a new connection.
+  void RegisterConnection(scoped_refptr<Connection> conn);
+
+  // Actually perform shutdown of the thread, tearing down any connections,
+  // etc. This is called from within the thread.
+  void ShutdownInternal();
+
+  // The underlying thread (presumably the one executing RunThread()).
+  scoped_refptr<kudu::Thread> thread_;
+
+  // our epoll object (or kqueue, etc).
+  ev::dynamic_loop loop_;
+
+  // Used by other threads to notify the reactor thread
+  ev::async async_;
+
+  // Handles the periodic timer.
+  ev::timer timer_;
+
+  // Scheduled (but not yet run) delayed tasks.
+  //
+  // Each task owns its own memory and, provided it was allocated on the heap,
+  // must free itself once it has fired or been aborted (for DelayedTask,
+  // presumably in TimerHandler() or Abort() -- TODO confirm).
+  boost::intrusive::list<DelayedTask> scheduled_tasks_;
+
+  // The current monotonic time.  Updated every coarse_timer_granularity_.
+  MonoTime cur_time_;
+
+  // last time we did TCP timeouts.
+  MonoTime last_unused_tcp_scan_;
+
+  // Multimap from ConnectionId to Connection objects for outbound (client)
+  // connections.
+  conn_multimap_t client_conns_;
+
+  // List of current connections coming into the server.
+  conn_list_t server_conns_;
+
+  // The Reactor this thread belongs to. Not owned.
+  Reactor *reactor_;
+
+  // If a connection has been idle for this much time, it is torn down.
+  const MonoDelta connection_keepalive_time_;
+
+  // Scan for idle connections on this granularity.
+  const MonoDelta coarse_timer_granularity_;
+
+  // Metrics. Presumably: latency of InvokePendingCb() in microseconds, and
+  // the share of each timer cycle spent outside epoll_wait() -- TODO confirm.
+  scoped_refptr<Histogram> invoke_us_histogram_;
+  scoped_refptr<Histogram> load_percent_histogram_;
+
+  // Total number of client connections opened during Reactor's lifetime.
+  uint64_t total_client_conns_cnt_;
+
+  // Total number of server connections opened during Reactor's lifetime.
+  uint64_t total_server_conns_cnt_;
+
+  // Set prior to calling epoll and then reset back to -1 after each invocation
+  // completes. Used for accounting total_poll_cycles_.
+  int64_t cycle_clock_before_poll_ = -1;
+
+  // The total number of cycles spent in epoll_wait() since this thread
+  // started.
+  int64_t total_poll_cycles_ = 0;
+
+  // Accounting for determining load average in each cycle of TimerHandler.
+  // Both fields start at -1, presumably meaning "no measurement yet".
+  struct {
+    // The cycle-time at which the load average was last calculated.
+    int64_t time_cycles = -1;
+    // The value of total_poll_cycles_ at the last-recorded time.
+    int64_t poll_cycles = -1;
+  } last_load_measurement_;
+};
+
+// A Reactor manages a ReactorThread.
+//
+// The Reactor is the thread-safe entry point that other threads use to hand
+// work to its thread: tasks are appended to 'pending_tasks_' under 'lock_',
+// and the thread is then woken to drain and run them.
+class Reactor {
+ public:
+  // 'messenger' is the parent messenger, kept alive by this reactor.
+  // 'index' presumably distinguishes this reactor among the messenger's
+  // reactors (e.g. in name()) -- TODO confirm.
+  Reactor(std::shared_ptr<Messenger> messenger,
+          int index,
+          const MessengerBuilder &bld);
+  Status Init();
+
+  // Shuts down the reactor and its corresponding thread, optionally waiting
+  // until the thread has exited.
+  void Shutdown(Messenger::ShutdownMode mode);
+
+  ~Reactor();
+
+  const std::string &name() const;
+
+  // Collect metrics about the reactor.
+  Status GetMetrics(ReactorMetrics *metrics);
+
+  // Add any connections on this reactor thread into the given status dump.
+  Status DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
+                         DumpRunningRpcsResponsePB* resp);
+
+  // Queue a new incoming connection. Takes ownership of the underlying fd from
+  // 'socket', but not the Socket object itself.
+  // If the reactor is already shut down, takes care of closing the socket.
+  void RegisterInboundSocket(Socket *socket, const Sockaddr &remote);
+
+  // Queue a new call to be sent. If the reactor is already shut down, marks
+  // the call as failed.
+  void QueueOutboundCall(const std::shared_ptr<OutboundCall> &call);
+
+  // Queue a new reactor task to cancel an outbound call.
+  void QueueCancellation(const std::shared_ptr<OutboundCall> &call);
+
+  // Schedule the given task's Run() method to be called on the
+  // reactor thread.
+  // If the reactor shuts down before it is run, the Abort method will be
+  // called.
+  // Does _not_ take ownership of 'task' -- the task should take care of
+  // deleting itself after running if it is allocated on the heap.
+  //
+  // Abort() is invoked without 'lock_' held. This method is thread-safe.
+  void ScheduleReactorTask(ReactorTask *task);
+
+  // Presumably runs 'f' on the reactor thread and reports its Status back to
+  // the caller -- TODO confirm blocking semantics against reactor.cc.
+  Status RunOnReactorThread(const boost::function<Status()>& f);
+
+  // If the Reactor is closing, returns false.
+  // Otherwise, drains the pending_tasks_ queue into the provided list.
+  // The contents are swapped, not appended, so 'tasks' should be empty on
+  // entry.
+  bool DrainTaskQueue(boost::intrusive::list<ReactorTask> *tasks);
+
+  // Returns the parent messenger. Ownership is not transferred.
+  Messenger *messenger() const {
+    return messenger_.get();
+  }
+
+  // Indicates whether the reactor is shutting down.
+  //
+  // This method is thread-safe.
+  bool closing() const;
+
+  // Is this reactor's thread the current thread?
+  bool IsCurrentThread() const {
+    return thread_.IsCurrentThread();
+  }
+
+ private:
+  friend class ReactorThread;
+  typedef simple_spinlock LockType;
+  // Protects 'closing_' and 'pending_tasks_'.
+  mutable LockType lock_;
+
+  // parent messenger
+  std::shared_ptr<Messenger> messenger_;
+
+  const std::string name_;
+
+  // Whether the reactor is shutting down.
+  // Guarded by lock_.
+  bool closing_;
+
+  // Tasks to be run within the reactor thread.
+  // Guarded by lock_.
+  boost::intrusive::list<ReactorTask> pending_tasks_; // NOLINT(build/include_what_you_use)
+
+  // Declared last, so it is constructed after (and destroyed before) the
+  // members above that it may reference through its 'reactor_' back-pointer.
+  ReactorThread thread_;
+
+  DISALLOW_COPY_AND_ASSIGN(Reactor);
+};
+
+} // namespace rpc
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/remote_method.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/remote_method.cc b/be/src/kudu/rpc/remote_method.cc
new file mode 100644
index 0000000..70b0d02
--- /dev/null
+++ b/be/src/kudu/rpc/remote_method.cc
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <ostream>
+#include <utility>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/remote_method.h"
+#include "kudu/rpc/rpc_header.pb.h"
+
+namespace kudu {
+namespace rpc {
+
+using strings::Substitute;
+
+RemoteMethod::RemoteMethod(std::string service_name,
+                           std::string method_name)
+    : service_name_(std::move(service_name)),
+      method_name_(std::move(method_name)) {}
+
+// Overwrites both names with the values stored in 'pb'. Debug builds check
+// that 'pb' is fully initialized first.
+void RemoteMethod::FromPB(const RemoteMethodPB& pb) {
+  DCHECK(pb.IsInitialized()) << "PB is uninitialized: " << pb.InitializationErrorString();
+  service_name_.assign(pb.service_name());
+  method_name_.assign(pb.method_name());
+}
+
+// Copies both names into the corresponding fields of '*pb'.
+void RemoteMethod::ToPB(RemoteMethodPB* pb) const {
+  RemoteMethodPB& out = *pb;
+  out.set_service_name(service_name_);
+  out.set_method_name(method_name_);
+}
+
+// Renders the method as "<service_name>.<method_name>".
+std::string RemoteMethod::ToString() const {
+  std::string qualified = Substitute("$0.$1", service_name_, method_name_);
+  return qualified;
+}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/remote_method.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/remote_method.h b/be/src/kudu/rpc/remote_method.h
new file mode 100644
index 0000000..a0a35bb
--- /dev/null
+++ b/be/src/kudu/rpc/remote_method.h
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_RPC_REMOTE_METHOD_H_
+#define KUDU_RPC_REMOTE_METHOD_H_
+
+#include <string>
+
+namespace kudu {
+namespace rpc {
+
+class RemoteMethodPB;
+
+// Value type naming one remote RPC by its fully qualified
+// service/method pair; converts to and from RemoteMethodPB.
+// The compiler-generated copy and assignment operations are intentionally
+// left available for convenience.
+class RemoteMethod {
+ public:
+  // Creates a RemoteMethod whose service and method names are empty.
+  RemoteMethod() = default;
+
+  // Creates a RemoteMethod for 'method_name' on service 'service_name'.
+  RemoteMethod(std::string service_name, std::string method_name);
+
+  // Accessors; each returns a copy of the stored name.
+  std::string service_name() const {
+    return service_name_;
+  }
+  std::string method_name() const {
+    return method_name_;
+  }
+
+  // Replaces this object's names with those stored in 'pb'.
+  void FromPB(const RemoteMethodPB& pb);
+
+  // Writes this object's names into 'pb'.
+  void ToPB(RemoteMethodPB* pb) const;
+
+  // Human-readable "<service>.<method>" form.
+  std::string ToString() const;
+
+ private:
+  std::string service_name_;
+  std::string method_name_;
+};
+
+} // namespace rpc
+} // namespace kudu
+
+#endif // KUDU_RPC_REMOTE_METHOD_H_

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/remote_user.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/remote_user.cc b/be/src/kudu/rpc/remote_user.cc
new file mode 100644
index 0000000..047f10a
--- /dev/null
+++ b/be/src/kudu/rpc/remote_user.cc
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/remote_user.h"
+
+#include <string>
+
+#include "kudu/gutil/strings/substitute.h"
+
+using std::string;
+
+namespace kudu {
+namespace rpc {
+
+// Renders "{username='<u>'}", adding ", principal='<p>'" before the closing
+// brace when a principal is present.
+string RemoteUser::ToString() const {
+  string result;
+  strings::SubstituteAndAppend(&result, "{username='$0'", username_);
+  if (principal_) {
+    strings::SubstituteAndAppend(&result, ", principal='$0'", principal_.get());
+  }
+  result += "}";
+  return result;
+}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/remote_user.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/remote_user.h b/be/src/kudu/rpc/remote_user.h
new file mode 100644
index 0000000..77ef294
--- /dev/null
+++ b/be/src/kudu/rpc/remote_user.h
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <string>
+#include <utility>
+
+#include <boost/optional/optional.hpp>
+
+namespace kudu {
+namespace rpc {
+
+// Server-side view of the remote authenticated user.
+//
+// This class may be read by multiple threads concurrently after
+// its initialization during RPC negotiation.
+class RemoteUser {
+ public:
+  // The method by which the remote user authenticated.
+  enum Method {
+    // No authentication (authentication was not required by the server
+    // and the user provided a username but it was not validated in any way)
+    UNAUTHENTICATED,
+    // Kerberos-authenticated.
+    KERBEROS,
+    // Authenticated by a Kudu authentication token.
+    AUTHN_TOKEN,
+    // Authenticated by a client certificate.
+    CLIENT_CERT
+  };
+
+  // Returns how the remote user authenticated. This is UNAUTHENTICATED until
+  // one of the Set*() methods below records otherwise.
+  Method authenticated_by() const {
+    return authenticated_by_;
+  }
+
+  // Returns the (local) username of the remote user.
+  const std::string& username() const { return username_; }
+
+  // Returns the full principal of the remote user, if any. It is set by
+  // SetAuthenticatedByKerberos(), optionally by SetAuthenticatedByClientCert(),
+  // and cleared by the other setters.
+  //
+  // Returned by const reference, like username(), so callers do not pay for a
+  // string copy on every call. The reference is valid while this object is
+  // alive and unmodified; copy the result if it must outlive that.
+  const boost::optional<std::string>& principal() const {
+    return principal_;
+  }
+
+  // Records a Kerberos-authenticated user: 'username' is the local name and
+  // 'principal' the full Kerberos principal.
+  void SetAuthenticatedByKerberos(std::string username,
+                                  std::string principal) {
+    authenticated_by_ = KERBEROS;
+    username_ = std::move(username);
+    principal_ = std::move(principal);
+  }
+
+  // Records a user whose 'username' was supplied but not verified. Clears any
+  // previously recorded principal.
+  void SetUnauthenticated(std::string username) {
+    authenticated_by_ = UNAUTHENTICATED;
+    username_ = std::move(username);
+    principal_ = boost::none;
+  }
+
+  // Records a user authenticated by client certificate. 'principal' may be
+  // boost::none.
+  void SetAuthenticatedByClientCert(std::string username,
+                                    boost::optional<std::string> principal) {
+    authenticated_by_ = CLIENT_CERT;
+    username_ = std::move(username);
+    principal_ = std::move(principal);
+  }
+
+  // Records a user authenticated by a Kudu authentication token. Clears any
+  // previously recorded principal.
+  void SetAuthenticatedByToken(std::string username) {
+    authenticated_by_ = AUTHN_TOKEN;
+    username_ = std::move(username);
+    principal_ = boost::none;
+  }
+
+  // Returns a string representation of the object.
+  std::string ToString() const;
+
+ private:
+  // The real username of the remote user. In the case of a Kerberos
+  // principal, this has already been mapped to a local username.
+  // TODO(todd): actually do the above mapping.
+  std::string username_;
+
+  // The full principal of the remote user. This is only set in the
+  // case of a strong-authenticated user.
+  boost::optional<std::string> principal_;
+
+  Method authenticated_by_ = UNAUTHENTICATED;
+};
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/request_tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/request_tracker-test.cc b/be/src/kudu/rpc/request_tracker-test.cc
new file mode 100644
index 0000000..f1e4d55
--- /dev/null
+++ b/be/src/kudu/rpc/request_tracker-test.cc
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/rpc/request_tracker.h"
+#include "kudu/util/test_macros.h"
+
+using std::vector;
+
+namespace kudu {
+namespace rpc {
+
+// Exercises sequence-number generation and FirstIncomplete() tracking,
+// including out-of-order and repeated completion.
+TEST(RequestTrackerTest, TestSequenceNumberGeneration) {
+  const int MAX = 10;
+
+  scoped_refptr<RequestTracker> tracker(new RequestTracker("test_client"));
+
+  // A new tracker should have no incomplete RPCs.
+  RequestTracker::SequenceNumber seq_no = tracker->FirstIncomplete();
+  ASSERT_EQ(seq_no, RequestTracker::kNoSeqNo);
+
+  vector<RequestTracker::SequenceNumber> generated_seq_nos;
+
+  // Generate MAX in-flight RPCs, making sure each new sequence number is
+  // larger than the previous one.
+  for (int i = 0; i < MAX; i++) {
+    ASSERT_OK(tracker->NewSeqNo(&seq_no));
+    if (!generated_seq_nos.empty()) {
+      ASSERT_LT(generated_seq_nos.back(), seq_no);
+    }
+    generated_seq_nos.push_back(seq_no);
+  }
+
+  // Now we should get a first incomplete.
+  ASSERT_EQ(generated_seq_nos[0], tracker->FirstIncomplete());
+
+  // Marking 'first_incomplete' as done should advance the first incomplete.
+  tracker->RpcCompleted(tracker->FirstIncomplete());
+
+  ASSERT_EQ(generated_seq_nos[1], tracker->FirstIncomplete());
+
+  // Marking a 'middle' RPC as done should not advance 'first_incomplete'.
+  tracker->RpcCompleted(generated_seq_nos[5]);
+  ASSERT_EQ(generated_seq_nos[1], tracker->FirstIncomplete());
+
+  // Marking the first half of the RPCs as complete should advance
+  // FirstIncomplete past the already-completed 'middle' RPC.
+  // Note that this also tests that RequestTracker::RpcCompleted() is idempotent, i.e. that
+  // marking the same sequence number as complete twice is a no-op.
+  for (int i = 0; i < MAX / 2; i++) {
+    tracker->RpcCompleted(generated_seq_nos[i]);
+  }
+
+  ASSERT_EQ(generated_seq_nos[6], tracker->FirstIncomplete());
+
+  // Generate some more in-flight RPCs.
+  for (int i = MAX / 2; i <= MAX; i++) {
+    ASSERT_OK(tracker->NewSeqNo(&seq_no));
+    generated_seq_nos.push_back(seq_no);
+  }
+
+  // Marking them all as completed should cause RequestTracker::FirstIncomplete() to return
+  // RequestTracker::kNoSeqNo again.
+  for (auto completed_seq_no : generated_seq_nos) {
+    tracker->RpcCompleted(completed_seq_no);
+  }
+
+  ASSERT_EQ(tracker->FirstIncomplete(), RequestTracker::kNoSeqNo);
+}
+
+} // namespace rpc
+} // namespace kudu
+

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/request_tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/request_tracker.cc b/be/src/kudu/rpc/request_tracker.cc
new file mode 100644
index 0000000..07806f8
--- /dev/null
+++ b/be/src/kudu/rpc/request_tracker.cc
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/request_tracker.h"
+
+#include <mutex>
+#include <string>
+#include <utility>
+
+#include "kudu/gutil/map-util.h"
+
+namespace kudu {
+namespace rpc {
+
+// Sentinel returned by FirstIncomplete() when no RPC is in flight. Generated
+// sequence numbers start at 0, so -1 never collides with a real one.
+const RequestTracker::SequenceNumber RequestTracker::kNoSeqNo = -1;
+
+RequestTracker::RequestTracker(std::string client_id)
+    : client_id_(std::move(client_id)),
+      next_(0) {}
+
+Status RequestTracker::NewSeqNo(SequenceNumber* seq_no) {
+  std::lock_guard<simple_spinlock> guard(lock_);
+  // Take the next number and track it as in flight until RpcCompleted() is
+  // called for it.
+  const SequenceNumber assigned = next_++;
+  InsertOrDie(&incomplete_rpcs_, assigned);
+  *seq_no = assigned;
+  return Status::OK();
+}
+
+RequestTracker::SequenceNumber RequestTracker::FirstIncomplete() {
+  std::lock_guard<simple_spinlock> guard(lock_);
+  // Assuming incomplete_rpcs_ is an ordered std::set (the header includes
+  // <set>), its first element is the lowest sequence number still in flight.
+  return incomplete_rpcs_.empty() ? kNoSeqNo : *incomplete_rpcs_.begin();
+}
+
+void RequestTracker::RpcCompleted(const SequenceNumber& seq_no) {
+  std::lock_guard<simple_spinlock> guard(lock_);
+  // Erasing a number that is not (or no longer) tracked does nothing, which
+  // keeps repeated completion of the same RPC harmless.
+  incomplete_rpcs_.erase(seq_no);
+}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/request_tracker.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/request_tracker.h b/be/src/kudu/rpc/request_tracker.h
new file mode 100644
index 0000000..5cc3995
--- /dev/null
+++ b/be/src/kudu/rpc/request_tracker.h
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <cstdint>
+#include <set>
+#include <string>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace rpc {
+
+// RequestTracker implementation, inspired by:
+// "Implementing Linearizability at Large Scale and Low Latency" by Colin Lee et al.
+//
+// This generates sequence numbers for retriable RPCs and tracks the ongoing ones.
+// The main point of this is to enable exactly-once semantics, i.e. making sure that
+// an RPC is only executed once, by uniquely identifying each RPC that is sent to
+// the server.
+//
+// Note that the sequence numbers here are different from RPC 'call ids'. A call id
+// uniquely identifies a call _to a server_. All calls have a call id that is
+// assigned incrementally. Sequence numbers, on the other hand, uniquely identify
+// the RPC operation itself. That is, if an RPC is retried on another server it will
+// have a different call id, but the same sequence number.
+//
+// By keeping track of the RPCs that are in-flight and which ones are completed
+// we can determine the first incomplete RPC. When this information is sent
+// to the server it can use it to garbage collect RPC results that it might be
+// saving for future retries, since it now knows there won't be any.
+//
+// This class is thread safe: every mutable field is guarded by 'lock_'.
+class RequestTracker : public RefCountedThreadSafe<RequestTracker> {
+ public:
+  typedef int64_t SequenceNumber;
+
+  // Sentinel value returned by FirstIncomplete() when no RPC is in flight.
+  static const RequestTracker::SequenceNumber kNoSeqNo;
+
+  // Creates a tracker for the client identified by 'client_id'.
+  explicit RequestTracker(std::string client_id);
+
+  // Creates a new, unique sequence number, marks it as in flight and stores it
+  // in '*seq_no'. Sequence numbers are assigned in increasing integer order,
+  // starting at 0.
+  //
+  // The current implementation always returns Status::OK(); it enforces no
+  // limit on the number of in-flight RPCs. Callers should still check the
+  // result: a bounded implementation may return Status::ServiceUnavailable()
+  // when too many RPCs are in flight. In that case the caller should try again
+  // later.
+  Status NewSeqNo(SequenceNumber* seq_no);
+
+  // Returns the sequence number of the first (lowest) incomplete RPC.
+  // If there is no incomplete RPC returns kNoSeqNo.
+  SequenceNumber FirstIncomplete();
+
+  // Marks the RPC with 'seq_no' as completed. Completing a sequence number
+  // that is not currently in flight is a no-op.
+  void RpcCompleted(const SequenceNumber& seq_no);
+
+  // Returns the client id for this request tracker.
+  const std::string& client_id() const { return client_id_; }
+
+ private:
+  // The client id for this request tracker.
+  const std::string client_id_;
+
+  // Lock that protects all non-const fields.
+  simple_spinlock lock_;
+
+  // The next sequence number to hand out.
+  SequenceNumber next_;
+
+  // The (ordered) set of incomplete RPCs; its first element is the lowest
+  // in-flight sequence number.
+  std::set<SequenceNumber> incomplete_rpcs_;
+};
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/response_callback.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/response_callback.h b/be/src/kudu/rpc/response_callback.h
new file mode 100644
index 0000000..8c4fc03
--- /dev/null
+++ b/be/src/kudu/rpc/response_callback.h
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_RPC_RESPONSE_CALLBACK_H
+#define KUDU_RPC_RESPONSE_CALLBACK_H
+
+#include <boost/function.hpp>
+
+namespace kudu {
+namespace rpc {
+
+// Nullary callback type; presumably invoked once an RPC response is available
+// (NOTE(review): confirm against callers).
+using ResponseCallback = boost::function<void()>;
+
+} // namespace rpc
+} // namespace kudu
+
+#endif