You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:53 UTC
[41/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from
kudu@334ecafd
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/reactor.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/reactor.cc b/be/src/kudu/rpc/reactor.cc
new file mode 100644
index 0000000..c1832ef
--- /dev/null
+++ b/be/src/kudu/rpc/reactor.cc
@@ -0,0 +1,918 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/reactor.h"
+
+#include <cerrno>
+#include <memory>
+#include <mutex>
+#include <ostream>
+#include <string>
+#include <utility>
+
+#include <boost/bind.hpp> // IWYU pragma: keep
+#include <boost/intrusive/list.hpp>
+#include <boost/optional.hpp>
+#include <ev++.h>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/bind.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/client_negotiation.h"
+#include "kudu/rpc/connection.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/negotiation.h"
+#include "kudu/rpc/outbound_call.h"
+#include "kudu/rpc/rpc_introspection.pb.h"
+#include "kudu/rpc/server_negotiation.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/debug/sanitizer_scopes.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread.h"
+#include "kudu/util/thread_restrictions.h"
+#include "kudu/util/threadpool.h"
+#include "kudu/util/trace.h"
+
+// When compiling on Mac OS X, use 'kqueue' instead of the default, 'select', for the event loop.
+// Otherwise we run into problems because 'select' can't handle connections when more than 1024
+// file descriptors are open by the process. These flags are passed to the 'loop_' constructor.
+#if defined(__APPLE__)
+static const int kDefaultLibEvFlags = ev::KQUEUE;
+#else
+static const int kDefaultLibEvFlags = ev::AUTO;  // no backend forced on other platforms
+#endif  // defined(__APPLE__)
+
+using std::string;
+using std::shared_ptr;
+using std::unique_ptr;
+using strings::Substitute;
+
+DEFINE_bool(rpc_reopen_outbound_connections, false,  // read in ReactorThread::FindConnection()
+ "Open a new connection to the server for every RPC call. "
+ "If not enabled, an already existing connection to a "
+ "server is reused upon making another call to the same server. "
+ "When this flag is enabled, an already existing _idle_ connection "
+ "to the server is closed upon making another RPC call which would "
+ "reuse the connection otherwise. "
+ "Used by tests only.");
+TAG_FLAG(rpc_reopen_outbound_connections, unsafe);
+TAG_FLAG(rpc_reopen_outbound_connections, runtime);
+
+METRIC_DEFINE_histogram(server, reactor_load_percent,  // recorded by ReactorThread::TimerHandler()
+ "Reactor Thread Load Percentage",
+ kudu::MetricUnit::kUnits,
+ "The percentage of time that the reactor is busy "
+ "(not blocked awaiting network activity). If this metric "
+ "shows significant samples nears 100%, increasing the "
+ "number of reactors may be beneficial.", 100, 2);  // NOTE(review): presumably max value / sig. digits - confirm
+
+METRIC_DEFINE_histogram(server, reactor_active_latency_us,  // recorded by ReactorThread::InvokePendingCb()
+ "Reactor Thread Active Latency",
+ kudu::MetricUnit::kMicroseconds,
+ "Histogram of the wall clock time for reactor thread wake-ups. "
+ "The reactor thread is responsible for all network I/O and "
+ "therefore outliers in this latency histogram directly contribute "
+ "to the latency of both inbound and outbound RPCs.",
+ 1000000, 2);  // NOTE(review): presumably max value / sig. digits - confirm
+
+namespace kudu {
+namespace rpc {
+
+namespace {
+Status ShutdownError(bool aborted) {
+  // Same message and ESHUTDOWN argument either way; only the Status category differs.
+  const char* const reason = "reactor is shutting down";
+  if (aborted) return Status::Aborted(reason, "", ESHUTDOWN);
+  return Status::ServiceUnavailable(reason, "", ESHUTDOWN);
+}
+
+// Callback for libev fatal errors (eg running out of file descriptors).
+// Unfortunately libev doesn't plumb these back through to the caller, but
+// instead just expects the callback to abort.
+//
+// This implementation is slightly preferable to the built-in one since
+// it uses a FATAL log message instead of printing to stderr, which might
+// not end up anywhere useful in a daemonized context. 'msg' is the text supplied by libev.
+void LibevSysErr(const char* msg) noexcept {  // noexcept replaces the deprecated 'throw()'
+  PLOG(FATAL) << "LibEV fatal error: " << msg;
+}
+
+void DoInitLibEv() {  // installs the libev error callback; run via std::call_once in Reactor's constructor
+ ev::set_syserr_cb(LibevSysErr);
+}
+
+} // anonymous namespace
+
+ReactorThread::ReactorThread(Reactor *reactor, const MessengerBuilder& bld)
+ : loop_(kDefaultLibEvFlags),
+ cur_time_(MonoTime::Now()),
+ last_unused_tcp_scan_(cur_time_),
+ reactor_(reactor),  // back-pointer to the Reactor that holds this thread as a member
+ connection_keepalive_time_(bld.connection_keepalive_time_),  // idle limit used by ScanIdleConnections()
+ coarse_timer_granularity_(bld.coarse_timer_granularity_),  // period of the timer started in Init()
+ total_client_conns_cnt_(0),
+ total_server_conns_cnt_(0) {
+ // Without a metric entity both histograms stay unset; their users null-check before recording.
+ if (bld.metric_entity_) {
+ invoke_us_histogram_ =
+ METRIC_reactor_active_latency_us.Instantiate(bld.metric_entity_);
+ load_percent_histogram_ =
+ METRIC_reactor_load_percent.Instantiate(bld.metric_entity_);
+ }
+}
+
+Status ReactorThread::Init() {  // sets up the watchers and loop hooks, then starts the reactor thread
+ DCHECK(thread_.get() == nullptr) << "Already started";
+ DVLOG(6) << "Called ReactorThread::Init()";
+ // Register to get async notifications in our event loop (sent by WakeThread()).
+ async_.set(loop_);
+ async_.set<ReactorThread, &ReactorThread::AsyncHandler>(this); // NOLINT(*)
+ async_.start();
+
+ // Register the timer watcher.
+ // The timer refreshes cur_time_, samples the reactor load and scans for
+ // idle connections (see TimerHandler()); first delay and repeat interval are equal.
+ timer_.set(loop_);
+ timer_.set<ReactorThread, &ReactorThread::TimerHandler>(this); // NOLINT(*)
+ timer_.start(coarse_timer_granularity_.ToSeconds(),
+ coarse_timer_granularity_.ToSeconds());
+
+ // Register our callbacks. ev++ doesn't provide handy wrappers for these.
+ ev_set_userdata(loop_, this);  // lets the static callbacks below recover 'this'
+ ev_set_loop_release_cb(loop_, &ReactorThread::AboutToPollCb, &ReactorThread::PollCompleteCb);
+ ev_set_invoke_pending_cb(loop_, &ReactorThread::InvokePendingCb);
+
+ // Create Reactor thread; the returned Status is whatever Thread::Create() reports.
+ return kudu::Thread::Create("reactor", "rpc reactor", &ReactorThread::RunThread, this, &thread_);
+}
+
+void ReactorThread::InvokePendingCb(struct ev_loop* loop) {  // installed by Init() as the invoke-pending hook
+ // Calculate the number of cycles spent calling our callbacks.
+ // This is called quite frequently so we use CycleClock rather than MonoTime
+ // since it's a bit faster.
+ int64_t start = CycleClock::Now();
+ ev_invoke_pending(loop);
+ int64_t dur_cycles = CycleClock::Now() - start;
+
+ // Contribute this to our histogram (only present when a metric entity was configured).
+ ReactorThread* thr = static_cast<ReactorThread*>(ev_userdata(loop));  // stored by Init()
+ if (thr->invoke_us_histogram_) {
+ thr->invoke_us_histogram_->Increment(dur_cycles * 1000000 / base::CyclesPerSecond());  // cycles -> us
+ }
+}
+
+void ReactorThread::AboutToPollCb(struct ev_loop* loop) noexcept {  // first hook given to ev_set_loop_release_cb()
+ // Store the current time in a member variable to be picked up below
+ // in PollCompleteCb.
+ ReactorThread* thr = static_cast<ReactorThread*>(ev_userdata(loop));  // stored by Init()
+ thr->cycle_clock_before_poll_ = CycleClock::Now();
+}
+
+void ReactorThread::PollCompleteCb(struct ev_loop* loop) noexcept {  // second hook given to ev_set_loop_release_cb()
+ // First things first, capture the time, so that this is as accurate as possible
+ int64_t cycle_clock_after_poll = CycleClock::Now();
+
+ // Record it in our accounting.
+ ReactorThread* thr = static_cast<ReactorThread*>(ev_userdata(loop));
+ DCHECK_NE(thr->cycle_clock_before_poll_, -1)
+ << "PollCompleteCb called without corresponding AboutToPollCb";
+
+ int64_t poll_cycles = cycle_clock_after_poll - thr->cycle_clock_before_poll_;
+ thr->cycle_clock_before_poll_ = -1;  // -1 marks "no poll in progress" for the DCHECK above
+ thr->total_poll_cycles_ += poll_cycles;  // running total, consumed by TimerHandler()
+}
+
+void ReactorThread::Shutdown(Messenger::ShutdownMode mode) {  // wakes the loop; SYNC mode also joins the thread
+ CHECK(reactor_->closing()) << "Should be called after setting closing_ flag";
+
+ VLOG(1) << name() << ": shutting down Reactor thread.";
+ WakeThread();  // AsyncHandler() sees closing(), runs ShutdownInternal() and breaks the loop
+
+ if (mode == Messenger::ShutdownMode::SYNC) {
+ // Join() will return a bad status if asked to join on the currently
+ // running thread.
+ CHECK_OK(ThreadJoiner(thread_.get()).Join());
+ }
+}
+
+void ReactorThread::ShutdownInternal() {  // reactor-thread half of shutdown, called from AsyncHandler()
+ DCHECK(IsCurrentThread());
+
+ // Tear down any outbound TCP connections.
+ Status service_unavailable = ShutdownError(false);
+ VLOG(1) << name() << ": tearing down outbound TCP connections...";
+ for (const auto& elem : client_conns_) {
+ const auto& conn = elem.second;
+ VLOG(1) << name() << ": shutting down " << conn->ToString();
+ conn->Shutdown(service_unavailable);
+ }
+ client_conns_.clear();
+
+ // Tear down any inbound TCP connections.
+ VLOG(1) << name() << ": tearing down inbound TCP connections...";
+ for (const auto& conn : server_conns_) {
+ VLOG(1) << name() << ": shutting down " << conn->ToString();
+ conn->Shutdown(service_unavailable);
+ }
+ server_conns_.clear();
+
+ // Abort any scheduled tasks.
+ //
+ // These won't be found in the Reactor's list of pending tasks
+ // because they've been "run" (that is, they've been scheduled).
+ Status aborted = ShutdownError(true); // aborted
+ while (!scheduled_tasks_.empty()) {
+ DelayedTask* t = &scheduled_tasks_.front();
+ scheduled_tasks_.pop_front();  // unlink first: Abort() deletes the task
+ t->Abort(aborted); // should also free the task.
+ }
+
+ // Remove the OpenSSL thread state.
+ //
+ // As of OpenSSL 1.1, this [1] is a no-op and can be ignored.
+ //
+ // 1. https://www.openssl.org/docs/man1.1.0/crypto/ERR_remove_thread_state.html
+#if OPENSSL_VERSION_NUMBER < 0x10100000L
+ ERR_remove_thread_state(nullptr);
+#endif
+}
+
+// The base task has nothing of its own to set up.
+ReactorTask::ReactorTask() = default;
+// Defined here, out of line, like the constructor.
+ReactorTask::~ReactorTask() = default;
+
+Status ReactorThread::GetMetrics(ReactorMetrics* metrics) {  // fills '*metrics'; always returns OK
+ DCHECK(IsCurrentThread());
+ metrics->num_client_connections_ = client_conns_.size();  // currently tracked
+ metrics->num_server_connections_ = server_conns_.size();
+ metrics->total_client_connections_ = total_client_conns_cnt_;  // counted since construction
+ metrics->total_server_connections_ = total_server_conns_cnt_;
+ return Status::OK();
+}
+
+Status ReactorThread::DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
+                                      DumpRunningRpcsResponsePB* resp) {
+  DCHECK(IsCurrentThread());
+  // Inbound connections first, then outbound; the first DumpPB() failure is returned as-is.
+  for (const auto& inbound : server_conns_) {
+    RETURN_NOT_OK(inbound->DumpPB(req, resp->add_inbound_connections()));
+  }
+  for (const auto& kv : client_conns_) {
+    RETURN_NOT_OK(kv.second->DumpPB(req, resp->add_outbound_connections()));
+  }
+  return Status::OK();
+}
+
+void ReactorThread::WakeThread() {  // signals the async watcher so AsyncHandler() runs on the reactor thread
+ // libev uses some lock-free synchronization, but doesn't have TSAN annotations.
+ // See http://lists.schmorp.de/pipermail/libev/2013q2/002178.html or KUDU-366
+ // for examples.
+ debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
+ async_.send();
+}
+
+// Handle async events. These events are sent to the reactor by other
+// threads (via WakeThread()) that want to bring something to our attention,
+// like the fact that we're shutting down, or the fact that new tasks have
+// been queued with Reactor::ScheduleReactorTask().
+void ReactorThread::AsyncHandler(ev::async& /*watcher*/, int /*revents*/) {
+ DCHECK(IsCurrentThread());
+
+ if (PREDICT_FALSE(reactor_->closing())) {
+ ShutdownInternal();
+ loop_.break_loop(); // break the event loop and terminate the thread
+ return;
+ }
+
+ boost::intrusive::list<ReactorTask> tasks;
+ reactor_->DrainTaskQueue(&tasks);  // result unused: 'tasks' just stays empty if closing began meanwhile
+
+ while (!tasks.empty()) {
+ ReactorTask& task = tasks.front();
+ tasks.pop_front();  // unlink before Run(): several tasks in this file delete themselves in Run()
+ task.Run(this);
+ }
+}
+
+void ReactorThread::RegisterConnection(scoped_refptr<Connection> conn) {  // invoked by RegisterConnectionTask::Run()
+ DCHECK(IsCurrentThread());
+
+ Status s = StartConnectionNegotiation(conn);
+ if (PREDICT_FALSE(!s.ok())) {
+ LOG(ERROR) << "Server connection negotiation failed: " << s.ToString();
+ DestroyConnection(conn.get(), s);  // 'conn' is not in server_conns_ yet at this point
+ return;
+ }
+ ++total_server_conns_cnt_;
+ server_conns_.emplace_back(std::move(conn));
+}
+
+void ReactorThread::AssignOutboundCall(shared_ptr<OutboundCall> call) {  // invoked by AssignOutboundCallTask::Run()
+ DCHECK(IsCurrentThread());
+
+ // Skip if the outbound has been cancelled already.
+ if (PREDICT_FALSE(call->IsCancelled())) {
+ return;
+ }
+
+ scoped_refptr<Connection> conn;
+ Status s = FindOrStartConnection(call->conn_id(),
+ call->controller()->credentials_policy(),
+ &conn);
+ if (PREDICT_FALSE(!s.ok())) {
+ call->SetFailed(std::move(s), OutboundCall::Phase::CONNECTION_NEGOTIATION);
+ return;
+ }
+
+ conn->QueueOutboundCall(std::move(call));  // hand the call over to the chosen connection
+}
+
+void ReactorThread::CancelOutboundCall(const shared_ptr<OutboundCall>& call) {  // invoked by CancellationTask::Run()
+ DCHECK(IsCurrentThread());
+
+ // If the callback has been invoked already, the cancellation is a no-op.
+ // The controller may be gone already if the callback has been invoked.
+ if (call->IsFinished()) {
+ return;
+ }
+
+ scoped_refptr<Connection> conn;
+ if (FindConnection(call->conn_id(),  // note: FindConnection() may also close idle non-reusable connections
+ call->controller()->credentials_policy(),
+ &conn)) {
+ conn->CancelOutboundCall(call);
+ }
+ call->Cancel();  // executed whether or not a connection was found
+}
+
+//
+// Handles timer events. The periodic timer (period: coarse_timer_granularity_):
+//
+// 1. updates ReactorThread::cur_time_
+// 2. records the share of time spent outside of polling in the load histogram
+// 3. scans for idle connections to close (see ScanIdleConnections())
+//
+void ReactorThread::TimerHandler(ev::timer& /*watcher*/, int revents) {
+ DCHECK(IsCurrentThread());
+ if (EV_ERROR & revents) {
+ LOG(WARNING) << "Reactor " << name() << " got an error in "
+ "the timer handler.";
+ return;
+ }
+ cur_time_ = MonoTime::Now();
+
+ // Compute load percentage.
+ int64_t now_cycles = CycleClock::Now();
+ if (last_load_measurement_.time_cycles != -1) {  // -1 appears to mean "no previous sample" - confirm in reactor.h
+ int64_t cycles_delta = (now_cycles - last_load_measurement_.time_cycles);
+ int64_t poll_cycles_delta = total_poll_cycles_ - last_load_measurement_.poll_cycles;
+ double poll_fraction = static_cast<double>(poll_cycles_delta) / cycles_delta;  // NOTE(review): assumes cycles_delta != 0
+ double active_fraction = 1 - poll_fraction;
+ if (load_percent_histogram_) {
+ load_percent_histogram_->Increment(static_cast<int>(active_fraction * 100));
+ }
+ }
+ last_load_measurement_.time_cycles = now_cycles;
+ last_load_measurement_.poll_cycles = total_poll_cycles_;
+
+ ScanIdleConnections();
+}
+
+void ReactorThread::RegisterTimeout(ev::timer *watcher) {  // binds 'watcher' to this thread's loop; does not start it
+ watcher->set(loop_);
+}
+
+void ReactorThread::ScanIdleConnections() {  // called from TimerHandler() on every tick
+ DCHECK(IsCurrentThread());
+ // Enforce TCP connection timeouts: server-side connections.
+ const auto server_conns_end = server_conns_.end();
+ uint64_t timed_out = 0;
+ // Scan for idle server connections if it's enabled (a negative keepalive time skips the scan).
+ if (connection_keepalive_time_ >= MonoDelta::FromMilliseconds(0)) {
+ for (auto it = server_conns_.begin(); it != server_conns_end; ) {
+ Connection* conn = it->get();
+ if (!conn->Idle()) {
+ VLOG(10) << "Connection " << conn->ToString() << " not idle";
+ ++it;
+ continue;
+ }
+
+ const MonoDelta connection_delta(cur_time_ - conn->last_activity_time());  // uses the cached cur_time_
+ if (connection_delta <= connection_keepalive_time_) {
+ ++it;
+ continue;
+ }
+
+ conn->Shutdown(Status::NetworkError(
+ Substitute("connection timed out after $0", connection_keepalive_time_.ToString())));
+ VLOG(1) << "Timing out connection " << conn->ToString() << " - it has been idle for "
+ << connection_delta.ToString();
+ ++timed_out;
+ it = server_conns_.erase(it);
+ }
+ }
+ // Take care of idle client-side connections marked for shutdown.
+ uint64_t shutdown = 0;
+ for (auto it = client_conns_.begin(); it != client_conns_.end();) {
+ Connection* conn = it->second.get();
+ if (conn->scheduled_for_shutdown() && conn->Idle()) {
+ conn->Shutdown(Status::NetworkError(
+ "connection has been marked for shutdown"));
+ it = client_conns_.erase(it);
+ ++shutdown;
+ } else {
+ ++it;
+ }
+ }
+ // TODO(aserbin): clients may want to set their keepalive timeout for idle
+ // but not scheduled for shutdown connections.
+
+ VLOG_IF(1, timed_out > 0) << name() << ": timed out " << timed_out << " TCP connections.";
+ VLOG_IF(1, shutdown > 0) << name() << ": shutdown " << shutdown << " TCP connections.";
+}
+
+const std::string& ReactorThread::name() const {  // same name as the owning Reactor
+ return reactor_->name();
+}
+
+MonoTime ReactorThread::cur_time() const {  // cached time, refreshed only in TimerHandler()
+ return cur_time_;
+}
+
+Reactor *ReactorThread::reactor() {  // the Reactor passed to the constructor
+ return reactor_;
+}
+
+bool ReactorThread::IsCurrentThread() const {  // true when the caller runs on the thread created by Init()
+ return thread_.get() == kudu::Thread::current_thread();
+}
+
+void ReactorThread::RunThread() {  // body of the thread created by Init()
+ ThreadRestrictions::SetWaitAllowed(false);
+ ThreadRestrictions::SetIOAllowed(false);
+ DVLOG(6) << "Calling ReactorThread::RunThread()...";
+ loop_.run(0);  // returns after AsyncHandler() calls break_loop() during shutdown
+ VLOG(1) << name() << " thread exiting.";
+
+ // No longer need the messenger. This causes the messenger to
+ // get deleted when all the reactors exit.
+ reactor_->messenger_.reset();
+}
+
+bool ReactorThread::FindConnection(const ConnectionId& conn_id,  // true + '*conn' set when a reusable one exists
+ CredentialsPolicy cred_policy,
+ scoped_refptr<Connection>* conn) {
+ DCHECK(IsCurrentThread());
+ const auto range = client_conns_.equal_range(conn_id);
+ scoped_refptr<Connection> found_conn;
+ for (auto it = range.first; it != range.second;) {
+ const auto& c = it->second.get();
+ // * Do not use connections scheduled for shutdown to place new calls.
+ //
+ // * Do not use a connection with a non-compliant credentials policy.
+ // Instead, open a new one, while marking the former as scheduled for
+ // shutdown. This process converges: any connection that satisfies the
+ // PRIMARY_CREDENTIALS policy automatically satisfies the ANY_CREDENTIALS
+ // policy as well. The idea is to keep only one usable connection
+ // identified by the specified 'conn_id'.
+ //
+ // * If the test-only 'one-connection-per-RPC' mode is enabled, connections
+ // are re-established at every RPC call.
+ if (c->scheduled_for_shutdown() ||
+ !c->SatisfiesCredentialsPolicy(cred_policy) ||
+ PREDICT_FALSE(FLAGS_rpc_reopen_outbound_connections)) {
+ if (c->Idle()) {
+ // Shutdown idle connections to the target destination. Non-idle ones
+ // will be taken care of later by the idle connection scanner.
+ DCHECK_EQ(Connection::CLIENT, c->direction());
+ c->Shutdown(Status::NetworkError("connection is closed due to non-reuse policy"));
+ it = client_conns_.erase(it);  // erase() already yields the next element, hence the 'continue'
+ continue;
+ }
+ c->set_scheduled_for_shutdown();
+ } else {
+ DCHECK(!found_conn);  // at most one usable connection per 'conn_id' is expected
+ found_conn = c;
+ // Appropriate connection is found; continue further to take care of the
+ // rest of connections to mark them for shutdown if they are not
+ // satisfying the policy.
+ }
+ ++it;
+ }
+ if (found_conn) {
+ // Found matching not-to-be-shutdown connection: return it as the result.
+ conn->swap(found_conn);
+ return true;
+ }
+ return false;  // '*conn' is left untouched in this case
+}
+
+Status ReactorThread::FindOrStartConnection(const ConnectionId& conn_id,  // reuse a connection or open a new one
+ CredentialsPolicy cred_policy,
+ scoped_refptr<Connection>* conn) {
+ DCHECK(IsCurrentThread());
+ if (FindConnection(conn_id, cred_policy, conn)) {
+ return Status::OK();
+ }
+
+ // No connection to this remote. Need to create one.
+ VLOG(2) << name() << " FindOrStartConnection: creating "
+ << "new connection for " << conn_id.remote().ToString();
+
+ // Create a new socket and start connecting to the remote.
+ Socket sock;
+ RETURN_NOT_OK(CreateClientSocket(&sock));
+ RETURN_NOT_OK(StartConnect(&sock, conn_id.remote()));
+
+ unique_ptr<Socket> new_socket(new Socket(sock.Release()));  // move the descriptor into a heap-allocated Socket
+
+ // Create the connection object; it is added to the map only after negotiation has started.
+ *conn = new Connection(
+ this, conn_id.remote(), std::move(new_socket), Connection::CLIENT, cred_policy);
+ (*conn)->set_outbound_connection_id(conn_id);
+
+ // Kick off blocking client connection negotiation.
+ Status s = StartConnectionNegotiation(*conn);
+ if (s.IsIllegalState()) {
+ // Return a nicer error message to the user indicating -- if we just
+ // forward the status we'd get something generic like "ThreadPool is closing".
+ return Status::ServiceUnavailable("Client RPC Messenger shutting down");
+ }
+ // Propagate any other errors as-is.
+ RETURN_NOT_OK_PREPEND(s, "Unable to start connection negotiation thread");
+
+ // Insert into the client connection map to avoid duplicate connection requests.
+ client_conns_.emplace(conn_id, *conn);
+ ++total_client_conns_cnt_;
+
+ return Status::OK();
+}
+
+Status ReactorThread::StartConnectionNegotiation(const scoped_refptr<Connection>& conn) {
+  DCHECK(IsCurrentThread());
+
+  // Negotiation must finish within the messenger's configured timeout, counted from now.
+  const auto timeout =
+      MonoDelta::FromMilliseconds(reactor()->messenger()->rpc_negotiation_timeout_ms());
+  MonoTime deadline = MonoTime::Now() + timeout;
+
+  scoped_refptr<Trace> trace(new Trace());
+  ADOPT_TRACE(trace.get());
+  TRACE("Submitting negotiation task for $0", conn->ToString());
+  auto authentication = reactor()->messenger()->authentication();
+  auto encryption = reactor()->messenger()->encryption();
+  // The pool is selected by connection direction; a failed submission is returned to the caller.
+  ThreadPool* pool = reactor()->messenger()->negotiation_pool(conn->direction());
+  return pool->SubmitClosure(
+      Bind(&Negotiation::RunNegotiation, conn, authentication, encryption, deadline));
+}
+
+void ReactorThread::CompleteConnectionNegotiation(  // finishes negotiation: register with the loop or destroy
+ const scoped_refptr<Connection>& conn,
+ const Status& status,
+ unique_ptr<ErrorStatusPB> rpc_error) {
+ DCHECK(IsCurrentThread());
+ if (PREDICT_FALSE(!status.ok())) {
+ DestroyConnection(conn.get(), status, std::move(rpc_error));
+ return;
+ }
+
+ // Switch the socket back to non-blocking mode after negotiation.
+ Status s = conn->SetNonBlocking(true);
+ if (PREDICT_FALSE(!s.ok())) {
+ LOG(DFATAL) << "Unable to set connection to non-blocking mode: " << s.ToString();
+ DestroyConnection(conn.get(), s, std::move(rpc_error));
+ return;
+ }
+
+ conn->MarkNegotiationComplete();
+ conn->EpollRegister(loop_);  // from here on the connection is serviced by this thread's loop
+}
+
+Status ReactorThread::CreateClientSocket(Socket *sock) {
+  // Non-blocking socket with the no-delay option set; the first failing step is reported.
+  Status s = sock->Init(Socket::FLAG_NONBLOCKING);
+  if (s.ok()) s = sock->SetNoDelay(true);
+  if (s.ok()) return s;
+  LOG(WARNING)
+      << "failed to create an outbound connection because a new socket could not be created: "
+      << s.ToString();
+  return s;
+}
+
+Status ReactorThread::StartConnect(Socket *sock, const Sockaddr& remote) {  // OK if connected or still connecting
+ const Status ret = sock->Connect(remote);
+ if (ret.ok()) {
+ VLOG(3) << "StartConnect: connect finished immediately for " << remote.ToString();
+ return Status::OK();
+ }
+
+ int posix_code = ret.posix_code();
+ if (Socket::IsTemporarySocketError(posix_code) || posix_code == EINPROGRESS) {  // connect still pending
+ VLOG(3) << "StartConnect: connect in progress for " << remote.ToString();
+ return Status::OK();
+ }
+
+ LOG(WARNING) << "Failed to create an outbound connection to " << remote.ToString()
+ << " because connect() failed: " << ret.ToString();
+ return ret;
+}
+
+void ReactorThread::DestroyConnection(Connection *conn,  // shuts 'conn' down and drops it from this thread's containers
+ const Status& conn_status,
+ unique_ptr<ErrorStatusPB> rpc_error) {
+ DCHECK(IsCurrentThread());
+
+ conn->Shutdown(conn_status, std::move(rpc_error));
+
+ // Unlink connection from lists.
+ if (conn->direction() == Connection::CLIENT) {
+ const auto range = client_conns_.equal_range(conn->outbound_connection_id());
+ CHECK(range.first != range.second) << "Couldn't find connection " << conn->ToString();
+ // The client_conns_ container is a multi-map.
+ for (auto it = range.first; it != range.second;) {
+ if (it->second.get() == conn) {
+ it = client_conns_.erase(it);
+ break;
+ }
+ ++it;
+ }
+ } else if (conn->direction() == Connection::SERVER) {
+ auto it = server_conns_.begin();
+ while (it != server_conns_.end()) {  // not finding it is tolerated (see RegisterConnection())
+ if ((*it).get() == conn) {
+ server_conns_.erase(it);
+ break;
+ }
+ ++it;
+ }
+ }
+}
+
+DelayedTask::DelayedTask(boost::function<void(const Status&)> func,  // callback for both completion and abort
+ MonoDelta when)
+ : func_(std::move(func)),
+ when_(when),  // delay before firing, measured from Run()
+ thread_(nullptr) {  // set once Run() schedules the task
+}
+
+void DelayedTask::Run(ReactorThread* thread) {  // does not run 'func_'; only arms a one-shot timer for it
+ DCHECK(thread_ == nullptr) << "Task has already been scheduled";
+ DCHECK(thread->IsCurrentThread());
+ DCHECK(!is_linked()) << "Should not be linked on pending_tasks_ anymore";
+
+ // Schedule the task to run later.
+ thread_ = thread;
+ timer_.set(thread->loop_);
+ timer_.set<DelayedTask, &DelayedTask::TimerHandler>(this); // NOLINT(*)
+ timer_.start(when_.ToSeconds(), // after
+ 0); // repeat
+ thread_->scheduled_tasks_.push_back(*this);  // tracked so ShutdownInternal() can abort it
+}
+
+void DelayedTask::Abort(const Status& abort_status) {  // reports 'abort_status' to the callback
+ func_(abort_status);
+ delete this;  // the task owns itself; it must not be touched after this call
+}
+
+void DelayedTask::TimerHandler(ev::timer& /*watcher*/, int revents) {  // fires once, after the delay set in Run()
+ DCHECK(is_linked()) << "should be linked on scheduled_tasks_";
+ // We will free this task's memory, so unlink it from the thread's list first.
+ thread_->scheduled_tasks_.erase(thread_->scheduled_tasks_.iterator_to(*this));
+
+ if (EV_ERROR & revents) {
+ string msg = "Delayed task got an error in its timer handler";
+ LOG(WARNING) << msg;
+ Abort(Status::Aborted(msg)); // Will delete 'this'.
+ } else {
+ func_(Status::OK());
+ delete this;
+ }
+}
+
+Reactor::Reactor(shared_ptr<Messenger> messenger,
+ int index, const MessengerBuilder& bld)
+ : messenger_(std::move(messenger)),
+ name_(StringPrintf("%s_R%03d", messenger_->name().c_str(), index)),  // NOTE(review): needs messenger_ declared before name_
+ closing_(false),
+ thread_(this, bld) {
+ static std::once_flag libev_once;
+ std::call_once(libev_once, DoInitLibEv);  // the libev error callback is installed only once
+}
+
+Status Reactor::Init() {  // forwards to ReactorThread::Init() and returns its Status
+ DVLOG(6) << "Called Reactor::Init()";
+ return thread_.Init();
+}
+
+void Reactor::Shutdown(Messenger::ShutdownMode mode) {  // only the first call does any work
+ {
+ std::lock_guard<LockType> l(lock_);
+ if (closing_) {
+ return;
+ }
+ closing_ = true;
+ }
+
+ thread_.Shutdown(mode);  // only SYNC mode waits for the reactor thread to exit
+
+ // Abort all pending tasks. No new tasks can get scheduled after this
+ // because ScheduleReactorTask() tests the closing_ flag set above.
+ Status aborted = ShutdownError(true);
+ while (!pending_tasks_.empty()) {
+ ReactorTask& task = pending_tasks_.front();
+ pending_tasks_.pop_front();  // unlink before Abort(): several tasks delete themselves in Abort()
+ task.Abort(aborted);
+ }
+}
+
+Reactor::~Reactor() {  // returns at once inside Shutdown() if shutdown was already requested
+ Shutdown(Messenger::ShutdownMode::ASYNC);
+}
+
+const std::string& Reactor::name() const {  // "<messenger name>_R<index>", built in the constructor
+ return name_;
+}
+
+bool Reactor::closing() const {  // true once Shutdown() has been called
+ std::lock_guard<LockType> l(lock_);  // closing_ is only accessed under lock_
+ return closing_;
+}
+
+// Task to call an arbitrary function within the reactor thread; the submitter blocks in Wait().
+class RunFunctionTask : public ReactorTask {
+ public:
+ explicit RunFunctionTask(boost::function<Status()> f)
+ : function_(std::move(f)), latch_(1) {}
+
+ void Run(ReactorThread* /*reactor*/) override {  // stores the function's result and releases the waiter
+ status_ = function_();
+ latch_.CountDown();
+ }
+ void Abort(const Status& status) override {  // function never runs; the waiter receives 'status'
+ status_ = status;
+ latch_.CountDown();
+ }
+
+ // Wait until the function has completed, and return the Status
+ // returned by the function (or the abort status if the task was aborted).
+ Status Wait() {
+ latch_.Wait();
+ return status_;
+ }
+
+ private:
+ boost::function<Status()> function_;
+ Status status_;
+ CountDownLatch latch_;  // counted down exactly once, by Run() or Abort()
+};
+
+Status Reactor::GetMetrics(ReactorMetrics *metrics) {  // blocks until the reactor thread has filled '*metrics'
+ return RunOnReactorThread(boost::bind(&ReactorThread::GetMetrics, &thread_, metrics));
+}
+
+Status Reactor::RunOnReactorThread(const boost::function<Status()>& f) {  // runs 'f' on the reactor thread, blocking
+ RunFunctionTask task(f);  // stack-allocated: safe because Wait() below outlasts Run()/Abort()
+ ScheduleReactorTask(&task);
+ return task.Wait();  // NOTE(review): presumably must not be called from the reactor thread itself
+}
+
+Status Reactor::DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,  // blocking; the work runs on the reactor thread
+ DumpRunningRpcsResponsePB* resp) {
+ return RunOnReactorThread(boost::bind(&ReactorThread::DumpRunningRpcs, &thread_,
+ boost::ref(req), resp));  // boost::ref: bind a reference to 'req' rather than a copy
+}
+
+class RegisterConnectionTask : public ReactorTask {  // hands a new inbound connection to the reactor thread
+ public:
+ explicit RegisterConnectionTask(scoped_refptr<Connection> conn)
+ : conn_(std::move(conn)) {
+ }
+
+ void Run(ReactorThread* reactor) override {
+ reactor->RegisterConnection(std::move(conn_));
+ delete this;  // heap-allocated in Reactor::RegisterInboundSocket(); frees itself
+ }
+
+ void Abort(const Status& /*status*/) override {
+ // We don't need to Shutdown the connection since it was never registered.
+ // This is only used for inbound connections, and inbound connections will
+ // never have any calls added to them until they've been registered.
+ delete this;
+ }
+
+ private:
+ scoped_refptr<Connection> conn_;
+};
+
+void Reactor::RegisterInboundSocket(Socket *socket, const Sockaddr& remote) {  // takes over the descriptor of '*socket'
+ VLOG(3) << name_ << ": new inbound connection to " << remote.ToString();
+ unique_ptr<Socket> new_socket(new Socket(socket->Release()));
+ auto task = new RegisterConnectionTask(  // the task deletes itself in Run() or Abort()
+ new Connection(&thread_, remote, std::move(new_socket), Connection::SERVER));
+ ScheduleReactorTask(task);
+}
+
+// Task which runs in the reactor thread to assign an outbound call
+// to a connection. The task deletes itself in Run() and in Abort().
+class AssignOutboundCallTask : public ReactorTask {
+ public:
+ explicit AssignOutboundCallTask(shared_ptr<OutboundCall> call)
+ : call_(std::move(call)) {}
+
+ void Run(ReactorThread* reactor) override {
+ reactor->AssignOutboundCall(std::move(call_));
+ delete this;
+ }
+
+ void Abort(const Status& status) override {
+ // It doesn't matter what is the actual phase of the OutboundCall: just set
+ // it to Phase::REMOTE_CALL to finalize the state of the call.
+ call_->SetFailed(status, OutboundCall::Phase::REMOTE_CALL);
+ delete this;
+ }
+
+ private:
+ shared_ptr<OutboundCall> call_;
+};
+
+void Reactor::QueueOutboundCall(const shared_ptr<OutboundCall>& call) {  // callable from any thread
+ DVLOG(3) << name_ << ": queueing outbound call "
+ << call->ToString() << " to remote " << call->conn_id().remote().ToString();
+ // Test cancellation when 'call_' is in 'READY' state: the cancellation is queued ahead of the call.
+ if (PREDICT_FALSE(call->ShouldInjectCancellation())) {
+ QueueCancellation(call);
+ }
+ ScheduleReactorTask(new AssignOutboundCallTask(call));  // the task deletes itself
+}
+
+class CancellationTask : public ReactorTask {  // cancels an outbound call on the reactor thread
+ public:
+ explicit CancellationTask(shared_ptr<OutboundCall> call)
+ : call_(std::move(call)) {}
+
+ void Run(ReactorThread* reactor) override {
+ reactor->CancelOutboundCall(call_);
+ delete this;  // heap-allocated in Reactor::QueueCancellation(); frees itself
+ }
+
+ void Abort(const Status& /*status*/) override {  // nothing to cancel; only frees the task
+ delete this;
+ }
+
+ private:
+ shared_ptr<OutboundCall> call_;
+};
+
+void Reactor::QueueCancellation(const shared_ptr<OutboundCall>& call) {  // asynchronous: only queues the request
+ ScheduleReactorTask(new CancellationTask(call));
+}
+
+void Reactor::ScheduleReactorTask(ReactorTask *task) {  // queues 'task', or aborts it if the reactor is closing
+ {
+ std::unique_lock<LockType> l(lock_);
+ if (closing_) {
+ // We guarantee the reactor lock is not taken when calling Abort().
+ l.unlock();
+ task->Abort(ShutdownError(false));  // aborted on the caller's thread with ServiceUnavailable
+ return;
+ }
+ pending_tasks_.push_back(*task);
+ }
+ thread_.WakeThread();  // outside the lock: makes AsyncHandler() drain the queue
+}
+
+bool Reactor::DrainTaskQueue(boost::intrusive::list<ReactorTask> *tasks) { // NOLINT(*)
+  // Hands the entire pending queue to the caller with a single swap under the lock.
+  // Once shutdown has begun nothing is transferred and false is returned.
+  std::lock_guard<LockType> guard(lock_);
+  const bool accepting = !closing_;
+  if (accepting) tasks->swap(pending_tasks_);
+  return accepting;
+}
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/reactor.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/reactor.h b/be/src/kudu/rpc/reactor.h
new file mode 100644
index 0000000..8884f54
--- /dev/null
+++ b/be/src/kudu/rpc/reactor.h
@@ -0,0 +1,427 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_RPC_REACTOR_H
+#define KUDU_RPC_REACTOR_H
+
+#include <cstdint>
+#include <list>
+#include <memory>
+#include <string>
+#include <unordered_map>
+
+#include <boost/function.hpp> // IWYU pragma: keep
+#include <boost/intrusive/list.hpp>
+#include <boost/intrusive/list_hook.hpp>
+#include <ev++.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/rpc/connection.h"
+#include "kudu/rpc/connection_id.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/status.h"
+#include "kudu/util/thread.h"
+
+namespace kudu {
+
+class Sockaddr;
+class Socket;
+
+namespace rpc {
+
+// List of reference-counted connections.
+using conn_list_t = std::list<scoped_refptr<Connection>>;
+
+class DumpRunningRpcsRequestPB;
+class DumpRunningRpcsResponsePB;
+class OutboundCall;
+class Reactor;
+class ReactorThread;
+enum class CredentialsPolicy;
+
+// Simple metrics information from within a reactor.
+// TODO(todd): switch these over to use util/metrics.h style metrics.
+struct ReactorMetrics {
+ // NOTE(review): none of these fields has a default initializer, so a
+ // default-initialized instance (e.g. 'ReactorMetrics m;') holds
+ // indeterminate values until every field has been assigned.
+
+ // Number of client RPC connections currently connected.
+ int32_t num_client_connections_;
+ // Number of server RPC connections currently connected.
+ int32_t num_server_connections_;
+
+ // Total number of client RPC connections opened during Reactor's lifetime.
+ uint64_t total_client_connections_;
+ // Total number of server RPC connections opened during Reactor's lifetime.
+ uint64_t total_server_connections_;
+};
+
+// A task which can be enqueued to run on the reactor thread.
+class ReactorTask : public boost::intrusive::list_base_hook<> {
+ // Deriving from the intrusive list hook lets tasks be linked directly into
+ // a boost::intrusive::list<ReactorTask>.
+ public:
+ ReactorTask();
+
+ // Run the task. 'reactor' is guaranteed to be the current thread.
+ virtual void Run(ReactorThread *reactor) = 0;
+
+ // Abort the task, in the case that the reactor shut down before the
+ // task could be processed. This may or may not run on the reactor thread
+ // itself.
+ //
+ // The Reactor guarantees that the Reactor lock is free when this
+ // method is called.
+ //
+ // The default implementation does nothing.
+ virtual void Abort(const Status &abort_status) {}
+
+ // Virtual so that subclasses can be destroyed through a ReactorTask pointer.
+ virtual ~ReactorTask();
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(ReactorTask);
+};
+
+// A ReactorTask that is scheduled to run at some point in the future.
+//
+// Semantically it works like RunFunctionTask with a few key differences:
+// 1. The user function is called during Abort. Put another way, the
+// user function is _always_ invoked, even during reactor shutdown.
+// 2. To differentiate between Abort and non-Abort, the user function
+// receives a Status as its first argument.
+class DelayedTask : public ReactorTask {
+ public:
+ // 'func' is the user function to invoke when the timer fires or when the
+ // task is aborted; 'when' is the delay to apply to this task.
+ DelayedTask(boost::function<void(const Status &)> func, MonoDelta when);
+
+ // Schedules the task for running later but doesn't actually run it yet.
+ void Run(ReactorThread* thread) override;
+
+ // Called in place of Run() when the reactor shut down before the task
+ // could be processed (see ReactorTask::Abort). As described for this
+ // class, the user function is still invoked in that case.
+ void Abort(const Status& abort_status) override;
+
+ private:
+ // libev callback for when the registered timer fires.
+ void TimerHandler(ev::timer& watcher, int revents);
+
+ // User function to invoke when timer fires or when task is aborted.
+ const boost::function<void(const Status&)> func_;
+
+ // Delay to apply to this task.
+ const MonoDelta when_;
+
+ // Link back to registering reactor thread.
+ ReactorThread* thread_;
+
+ // libev timer. Set when Run() is invoked.
+ ev::timer timer_;
+};
+
+// A ReactorThread is a libev event handler thread which manages I/O
+// on a list of sockets.
+//
+// All methods in this class are _only_ called from the reactor thread itself
+// except where otherwise specified. New methods should DCHECK(IsCurrentThread())
+// to ensure this.
+class ReactorThread {
+ public:
+ friend class Connection;
+
+ // Client-side connection map. Multiple connections could be open to a remote
+ // server if multiple credential policies are used for individual RPCs.
+ typedef std::unordered_multimap<ConnectionId, scoped_refptr<Connection>,
+ ConnectionIdHash, ConnectionIdEqual>
+ conn_multimap_t;
+
+ // 'reactor' is the Reactor this thread belongs to; 'bld' supplies the
+ // messenger configuration.
+ ReactorThread(Reactor *reactor, const MessengerBuilder &bld);
+
+ // This may be called from another thread.
+ Status Init();
+
+ // Add any connections on this reactor thread into the given status dump.
+ Status DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
+ DumpRunningRpcsResponsePB* resp);
+
+ // Shuts down a reactor thread, optionally waiting for it to exit.
+ // Reactor::Shutdown() must have been called already.
+ //
+ // If mode == SYNC, may not be called from the reactor thread itself.
+ void Shutdown(Messenger::ShutdownMode mode);
+
+ // This method is thread-safe.
+ void WakeThread();
+
+ // libev callback for handling async notifications in our epoll thread.
+ void AsyncHandler(ev::async &watcher, int revents);
+
+ // libev callback for handling timer events in our epoll thread.
+ void TimerHandler(ev::timer &watcher, int revents);
+
+ // Register an epoll timer watcher with our event loop.
+ // Does not set a timeout or start it.
+ void RegisterTimeout(ev::timer *watcher);
+
+ // This may be called from another thread.
+ const std::string &name() const;
+
+ // NOTE(review): presumably returns the cached 'cur_time_' rather than
+ // reading the clock -- confirm against the implementation.
+ MonoTime cur_time() const;
+
+ // This may be called from another thread.
+ Reactor *reactor();
+
+ // Return true if this reactor thread is the thread currently
+ // running. Should be used in DCHECK assertions.
+ bool IsCurrentThread() const;
+
+ // Begin the process of connection negotiation.
+ // Must be called from the reactor thread.
+ Status StartConnectionNegotiation(const scoped_refptr<Connection>& conn);
+
+ // Transition back from negotiating to processing requests.
+ // Must be called from the reactor thread.
+ void CompleteConnectionNegotiation(const scoped_refptr<Connection>& conn,
+ const Status& status,
+ std::unique_ptr<ErrorStatusPB> rpc_error);
+
+ // Collect metrics.
+ // Must be called from the reactor thread.
+ Status GetMetrics(ReactorMetrics *metrics);
+
+ private:
+ // The task classes below call the private methods of this class from
+ // their Run() implementations.
+ friend class AssignOutboundCallTask;
+ friend class CancellationTask;
+ friend class RegisterConnectionTask;
+ friend class DelayedTask;
+
+ // Run the main event loop of the reactor.
+ void RunThread();
+
+ // When libev has noticed that it needs to wake up an application watcher,
+ // it calls this callback. The callback simply calls back into libev's
+ // ev_invoke_pending() to trigger all the watcher callbacks, but
+ // wraps it with latency measurements.
+ static void InvokePendingCb(struct ev_loop* loop);
+
+ // Similarly, libev calls these functions before/after invoking epoll_wait().
+ // We use these to measure the amount of time spent waiting.
+ //
+ // NOTE: 'noexcept' is required to avoid compilation errors due to libev's
+ // use of the same exception specification.
+ static void AboutToPollCb(struct ev_loop* loop) noexcept;
+ static void PollCompleteCb(struct ev_loop* loop) noexcept;
+
+ // Find a connection to the given remote and returns it in 'conn'.
+ // Returns true if a connection is found. Returns false otherwise.
+ bool FindConnection(const ConnectionId& conn_id,
+ CredentialsPolicy cred_policy,
+ scoped_refptr<Connection>* conn);
+
+ // Find or create a new connection to the given remote.
+ // If such a connection already exists, returns that, otherwise creates a new one.
+ // May return a bad Status if the connect() call fails.
+ // The resulting connection object is managed internally by the reactor thread.
+ Status FindOrStartConnection(const ConnectionId& conn_id,
+ CredentialsPolicy cred_policy,
+ scoped_refptr<Connection>* conn);
+
+ // Shut down the given connection, removing it from the connection tracking
+ // structures of this reactor.
+ //
+ // The connection is not explicitly deleted -- reference counting (the
+ // connection is held through scoped_refptr) may hold on to the object after
+ // this, but callers should assume that it _may_ be deleted by this call.
+ void DestroyConnection(Connection *conn, const Status &conn_status,
+ std::unique_ptr<ErrorStatusPB> rpc_error = {});
+
+ // Scan any open connections for idle ones that have been idle longer than
+ // connection_keepalive_time_. If connection_keepalive_time_ < 0, the scan
+ // is skipped.
+ void ScanIdleConnections();
+
+ // Create a new client socket (non-blocking, NODELAY)
+ static Status CreateClientSocket(Socket *sock);
+
+ // Initiate a new connection on the given socket.
+ static Status StartConnect(Socket *sock, const Sockaddr &remote);
+
+ // Assign a new outbound call to the appropriate connection object.
+ // If this fails, the call is marked failed and completed.
+ void AssignOutboundCall(std::shared_ptr<OutboundCall> call);
+
+ // Cancel the outbound call. May update corresponding connection
+ // object to remove call from the CallAwaitingResponse object.
+ // Also mark the call as slated for cancellation so the callback
+ // may be invoked early if the RPC hasn't yet been sent or if it's
+ // waiting for a response from the remote.
+ void CancelOutboundCall(const std::shared_ptr<OutboundCall> &call);
+
+ // Register a new connection.
+ void RegisterConnection(scoped_refptr<Connection> conn);
+
+ // Actually perform shutdown of the thread, tearing down any connections,
+ // etc. This is called from within the thread.
+ void ShutdownInternal();
+
+ // The underlying thread object.
+ scoped_refptr<kudu::Thread> thread_;
+
+ // our epoll object (or kqueue, etc).
+ ev::dynamic_loop loop_;
+
+ // Used by other threads to notify the reactor thread
+ ev::async async_;
+
+ // Handles the periodic timer.
+ ev::timer timer_;
+
+ // Scheduled (but not yet run) delayed tasks.
+ //
+ // Each task owns its own memory and must be freed by its Run and
+ // Abort members, provided it was allocated on the heap.
+ boost::intrusive::list<DelayedTask> scheduled_tasks_;
+
+ // The current monotonic time. Updated every coarse_timer_granularity_secs_.
+ MonoTime cur_time_;
+
+ // last time we did TCP timeouts.
+ MonoTime last_unused_tcp_scan_;
+
+ // Map of sockaddrs to Connection objects for outbound (client) connections.
+ conn_multimap_t client_conns_;
+
+ // List of current connections coming into the server.
+ conn_list_t server_conns_;
+
+ // The Reactor this thread belongs to (Reactor embeds its ReactorThread by
+ // value).
+ Reactor *reactor_;
+
+ // If a connection has been idle for this much time, it is torn down.
+ const MonoDelta connection_keepalive_time_;
+
+ // Scan for idle connections on this granularity.
+ const MonoDelta coarse_timer_granularity_;
+
+ // Metrics.
+ scoped_refptr<Histogram> invoke_us_histogram_;
+ scoped_refptr<Histogram> load_percent_histogram_;
+
+ // Total number of client connections opened during Reactor's lifetime.
+ uint64_t total_client_conns_cnt_;
+
+ // Total number of server connections opened during Reactor's lifetime.
+ uint64_t total_server_conns_cnt_;
+
+ // Set prior to calling epoll and then reset back to -1 after each invocation
+ // completes. Used for accounting total_poll_cycles_.
+ int64_t cycle_clock_before_poll_ = -1;
+
+ // The total number of cycles spent in epoll_wait() since this thread
+ // started.
+ int64_t total_poll_cycles_ = 0;
+
+ // Accounting for determining load average in each cycle of TimerHandler.
+ struct {
+ // The cycle-time at which the load average was last calculated.
+ int64_t time_cycles = -1;
+ // The value of total_poll_cycles_ at the last-recorded time.
+ int64_t poll_cycles = -1;
+ } last_load_measurement_;
+};
+
+// A Reactor manages a ReactorThread
+class Reactor {
+ public:
+ // 'messenger' is the parent messenger, which this reactor keeps a shared
+ // reference to.
+ // NOTE(review): 'index' presumably identifies this reactor among the
+ // messenger's reactors -- confirm against the constructor.
+ Reactor(std::shared_ptr<Messenger> messenger,
+ int index,
+ const MessengerBuilder &bld);
+
+ // NOTE(review): presumably initializes and starts the reactor thread --
+ // confirm against the implementation.
+ Status Init();
+
+ // Shuts down the reactor and its corresponding thread, optionally waiting
+ // until the thread has exited.
+ void Shutdown(Messenger::ShutdownMode mode);
+
+ ~Reactor();
+
+ const std::string &name() const;
+
+ // Collect metrics about the reactor.
+ Status GetMetrics(ReactorMetrics *metrics);
+
+ // Add any connections on this reactor thread into the given status dump.
+ Status DumpRunningRpcs(const DumpRunningRpcsRequestPB& req,
+ DumpRunningRpcsResponsePB* resp);
+
+ // Queue a new incoming connection. Takes ownership of the underlying fd from
+ // 'socket', but not the Socket object itself.
+ // If the reactor is already shut down, takes care of closing the socket.
+ void RegisterInboundSocket(Socket *socket, const Sockaddr &remote);
+
+ // Queue a new call to be sent. If the reactor is already shut down, marks
+ // the call as failed.
+ void QueueOutboundCall(const std::shared_ptr<OutboundCall> &call);
+
+ // Queue a new reactor task to cancel an outbound call.
+ void QueueCancellation(const std::shared_ptr<OutboundCall> &call);
+
+ // Schedule the given task's Run() method to be called on the
+ // reactor thread.
+ // If the reactor shuts down before it is run, the Abort method will be
+ // called.
+ // Does _not_ take ownership of 'task' -- the task should take care of
+ // deleting itself after running if it is allocated on the heap.
+ void ScheduleReactorTask(ReactorTask *task);
+
+ // NOTE(review): implementation not visible here; presumably runs 'f' on the
+ // reactor thread and returns the Status it produces -- confirm.
+ Status RunOnReactorThread(const boost::function<Status()>& f);
+
+ // If the Reactor is closing, returns false.
+ // Otherwise, drains the pending_tasks_ queue into the provided list.
+ bool DrainTaskQueue(boost::intrusive::list<ReactorTask> *tasks);
+
+ // Returns the parent messenger as a raw, non-owning pointer.
+ Messenger *messenger() const {
+ return messenger_.get();
+ }
+
+ // Indicates whether the reactor is shutting down.
+ //
+ // This method is thread-safe.
+ bool closing() const;
+
+ // Is this reactor's thread the current thread?
+ bool IsCurrentThread() const {
+ return thread_.IsCurrentThread();
+ }
+
+ private:
+ friend class ReactorThread;
+ typedef simple_spinlock LockType;
+ // Protects 'closing_' and 'pending_tasks_'.
+ mutable LockType lock_;
+
+ // parent messenger
+ std::shared_ptr<Messenger> messenger_;
+
+ const std::string name_;
+
+ // Whether the reactor is shutting down.
+ // Guarded by lock_.
+ bool closing_;
+
+ // Tasks to be run within the reactor thread.
+ // Guarded by lock_.
+ boost::intrusive::list<ReactorTask> pending_tasks_; // NOLINT(build/include_what_you_use)
+
+ // The reactor thread managed by this Reactor, embedded by value.
+ ReactorThread thread_;
+
+ DISALLOW_COPY_AND_ASSIGN(Reactor);
+};
+
+} // namespace rpc
+} // namespace kudu
+
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/remote_method.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/remote_method.cc b/be/src/kudu/rpc/remote_method.cc
new file mode 100644
index 0000000..70b0d02
--- /dev/null
+++ b/be/src/kudu/rpc/remote_method.cc
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <ostream>
+#include <utility>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/remote_method.h"
+#include "kudu/rpc/rpc_header.pb.h"
+
+namespace kudu {
+namespace rpc {
+
+using strings::Substitute;
+
+// Builds a RemoteMethod from its service and method names. Both strings are
+// taken by value and moved into the members.
+RemoteMethod::RemoteMethod(std::string service_name, std::string method_name)
+    : service_name_(std::move(service_name)),
+      method_name_(std::move(method_name)) {
+}
+
+// Overwrites this object's service and method names with the ones in 'pb'.
+// In debug builds, checks that 'pb' is fully initialized.
+void RemoteMethod::FromPB(const RemoteMethodPB& pb) {
+  DCHECK(pb.IsInitialized()) << "PB is uninitialized: " << pb.InitializationErrorString();
+  method_name_ = pb.method_name();
+  service_name_ = pb.service_name();
+}
+
+// Copies this object's service and method names into 'pb'.
+void RemoteMethod::ToPB(RemoteMethodPB* pb) const {
+  pb->set_method_name(method_name_);
+  pb->set_service_name(service_name_);
+}
+
+// Returns the name formatted as "<service_name>.<method_name>".
+std::string RemoteMethod::ToString() const {
+  std::string qualified_name = Substitute("$0.$1", service_name_, method_name_);
+  return qualified_name;
+}
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/remote_method.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/remote_method.h b/be/src/kudu/rpc/remote_method.h
new file mode 100644
index 0000000..a0a35bb
--- /dev/null
+++ b/be/src/kudu/rpc/remote_method.h
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_RPC_REMOTE_METHOD_H_
+#define KUDU_RPC_REMOTE_METHOD_H_
+
+#include <string>
+
+namespace kudu {
+namespace rpc {
+
+class RemoteMethodPB;
+
+// Simple class that acts as a container for a fully qualified remote RPC name
+// and converts to/from RemoteMethodPB.
+// This class is also copyable and assignable for convenience reasons.
+class RemoteMethod {
+ public:
+  // Creates a RemoteMethod with empty service and method names.
+  RemoteMethod() = default;
+
+  // Creates a RemoteMethod for the given service and method names.
+  RemoteMethod(std::string service_name, std::string method_name);
+
+  // Accessors. These return references instead of copies; a reference stays
+  // valid until this object is destroyed or its names are replaced (e.g. by
+  // FromPB() or assignment).
+  const std::string& service_name() const { return service_name_; }
+  const std::string& method_name() const { return method_name_; }
+
+  // Encode/decode to/from 'pb'.
+  void FromPB(const RemoteMethodPB& pb);
+  void ToPB(RemoteMethodPB* pb) const;
+
+  // Returns the name formatted as "<service_name>.<method_name>".
+  std::string ToString() const;
+
+ private:
+  std::string service_name_;
+  std::string method_name_;
+};
+
+} // namespace rpc
+} // namespace kudu
+
+#endif // KUDU_RPC_REMOTE_METHOD_H_
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/remote_user.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/remote_user.cc b/be/src/kudu/rpc/remote_user.cc
new file mode 100644
index 0000000..047f10a
--- /dev/null
+++ b/be/src/kudu/rpc/remote_user.cc
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/remote_user.h"
+
+#include <string>
+
+#include "kudu/gutil/strings/substitute.h"
+
+using std::string;
+
+namespace kudu {
+namespace rpc {
+
+// Renders the user as "{username='...'}"; a ", principal='...'" part is
+// included only when a principal is set.
+string RemoteUser::ToString() const {
+  string out;
+  strings::SubstituteAndAppend(&out, "{username='$0'", username_);
+  const bool has_principal = static_cast<bool>(principal_);
+  if (has_principal) {
+    strings::SubstituteAndAppend(&out, ", principal='$0'", *principal_);
+  }
+  out += "}";
+  return out;
+}
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/remote_user.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/remote_user.h b/be/src/kudu/rpc/remote_user.h
new file mode 100644
index 0000000..77ef294
--- /dev/null
+++ b/be/src/kudu/rpc/remote_user.h
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <string>
+#include <utility>
+
+#include <boost/optional/optional.hpp>
+
+namespace kudu {
+namespace rpc {
+
+// Server-side view of the remote authenticated user.
+//
+// This class may be read by multiple threads concurrently after
+// its initialization during RPC negotiation.
+class RemoteUser {
+ public:
+  // How the remote user established its identity.
+  enum Method {
+    // No authentication (authentication was not required by the server
+    // and the user provided a username but it was not validated in any way)
+    UNAUTHENTICATED,
+    // Kerberos-authenticated.
+    KERBEROS,
+    // Authenticated by a Kudu authentication token.
+    AUTHN_TOKEN,
+    // Authenticated by a client certificate.
+    CLIENT_CERT
+  };
+
+  // Read accessors.
+  const std::string& username() const { return username_; }
+
+  // Returns a copy of the (possibly unset) principal.
+  boost::optional<std::string> principal() const {
+    return principal_;
+  }
+
+  Method authenticated_by() const {
+    return authenticated_by_;
+  }
+
+  // Marks the user as unauthenticated; any principal is cleared.
+  void SetUnauthenticated(std::string username) {
+    principal_ = boost::none;
+    username_ = std::move(username);
+    authenticated_by_ = UNAUTHENTICATED;
+  }
+
+  // Marks the user as Kerberos-authenticated with the given principal.
+  void SetAuthenticatedByKerberos(std::string username,
+                                  std::string principal) {
+    principal_ = std::move(principal);
+    username_ = std::move(username);
+    authenticated_by_ = KERBEROS;
+  }
+
+  // Marks the user as authenticated by an authentication token; any
+  // principal is cleared.
+  void SetAuthenticatedByToken(std::string username) {
+    principal_ = boost::none;
+    username_ = std::move(username);
+    authenticated_by_ = AUTHN_TOKEN;
+  }
+
+  // Marks the user as authenticated by a client certificate; 'principal'
+  // may be unset.
+  void SetAuthenticatedByClientCert(std::string username,
+                                    boost::optional<std::string> principal) {
+    principal_ = std::move(principal);
+    username_ = std::move(username);
+    authenticated_by_ = CLIENT_CERT;
+  }
+
+  // Returns a string representation of the object.
+  std::string ToString() const;
+
+ private:
+  // The real username of the remote user. In the case of a Kerberos
+  // principal, this has already been mapped to a local username.
+  // TODO(todd): actually do the above mapping.
+  std::string username_;
+
+  // The full principal of the remote user. This is only set in the
+  // case of a strong-authenticated user.
+  boost::optional<std::string> principal_;
+
+  // Defaults to UNAUTHENTICATED until one of the setters is called.
+  Method authenticated_by_ = UNAUTHENTICATED;
+};
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/request_tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/request_tracker-test.cc b/be/src/kudu/rpc/request_tracker-test.cc
new file mode 100644
index 0000000..f1e4d55
--- /dev/null
+++ b/be/src/kudu/rpc/request_tracker-test.cc
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/rpc/request_tracker.h"
+#include "kudu/util/test_macros.h"
+
+using std::vector;
+
+namespace kudu {
+namespace rpc {
+
+// Exercises sequence number generation and the tracking of the first
+// incomplete RPC as sequence numbers are completed in and out of order.
+TEST(RequestTrackerTest, TestSequenceNumberGeneration) {
+ const int MAX = 10;
+
+ scoped_refptr<RequestTracker> tracker_(new RequestTracker("test_client"));
+
+ // A new tracker should have no incomplete RPCs
+ RequestTracker::SequenceNumber seq_no = tracker_->FirstIncomplete();
+ ASSERT_EQ(seq_no, RequestTracker::kNoSeqNo);
+
+ vector<RequestTracker::SequenceNumber> generated_seq_nos;
+
+ // Generate MAX in flight RPCs, making sure they are correctly returned.
+ for (int i = 0; i < MAX; i++) {
+ ASSERT_OK(tracker_->NewSeqNo(&seq_no));
+ generated_seq_nos.push_back(seq_no);
+ }
+
+ // Now we should get a first incomplete.
+ ASSERT_EQ(generated_seq_nos[0], tracker_->FirstIncomplete());
+
+ // Marking 'first_incomplete' as done, should advance the first incomplete.
+ tracker_->RpcCompleted(tracker_->FirstIncomplete());
+
+ ASSERT_EQ(generated_seq_nos[1], tracker_->FirstIncomplete());
+
+ // Marking a 'middle' rpc, should not advance 'first_incomplete'.
+ tracker_->RpcCompleted(generated_seq_nos[5]);
+ ASSERT_EQ(generated_seq_nos[1], tracker_->FirstIncomplete());
+
+ // Marking half the rpc as complete should advance FirstIncomplete.
+ // Note that this also tests that RequestTracker::RpcCompleted() is idempotent, i.e. that
+ // marking the same sequence number as complete twice is a no-op.
+ for (int i = 0; i < MAX / 2; i++) {
+ tracker_->RpcCompleted(generated_seq_nos[i]);
+ }
+
+ // Sequence numbers 0-5 are now complete, so the first incomplete one is 6.
+ ASSERT_EQ(generated_seq_nos[6], tracker_->FirstIncomplete());
+
+ // Generate another MAX / 2 + 1 sequence numbers (the upper bound of the
+ // loop is inclusive).
+ for (int i = MAX / 2; i <= MAX; i++) {
+ ASSERT_OK(tracker_->NewSeqNo(&seq_no));
+ generated_seq_nos.push_back(seq_no);
+ }
+
+ // Marking them all as completed should cause RequestTracker::FirstIncomplete() to return
+ // kNoSeqNo again.
+ for (auto seq_no : generated_seq_nos) {
+ tracker_->RpcCompleted(seq_no);
+ }
+
+ ASSERT_EQ(tracker_->FirstIncomplete(), RequestTracker::kNoSeqNo);
+}
+
+} // namespace rpc
+} // namespace kudu
+
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/request_tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/request_tracker.cc b/be/src/kudu/rpc/request_tracker.cc
new file mode 100644
index 0000000..07806f8
--- /dev/null
+++ b/be/src/kudu/rpc/request_tracker.cc
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/request_tracker.h"
+
+#include <mutex>
+#include <string>
+#include <utility>
+
+#include "kudu/gutil/map-util.h"
+
+namespace kudu {
+namespace rpc {
+
+// Sentinel returned by FirstIncomplete() when there is no incomplete RPC.
+// Sequence numbers handed out by NewSeqNo() start at 0, so -1 is never a
+// valid sequence number.
+const RequestTracker::SequenceNumber RequestTracker::kNoSeqNo = -1;
+
+// Creates a tracker for the client identified by 'client_id'. Sequence
+// numbers start at 0.
+RequestTracker::RequestTracker(std::string client_id)
+    : client_id_(std::move(client_id)), next_(0) {
+}
+
+// Hands out the next sequence number through 'seq_no' and records it as
+// incomplete. Always returns Status::OK().
+Status RequestTracker::NewSeqNo(SequenceNumber* seq_no) {
+  std::lock_guard<simple_spinlock> guard(lock_);
+  const SequenceNumber assigned = next_;
+  *seq_no = assigned;
+  // Dies if the number is somehow already being tracked.
+  InsertOrDie(&incomplete_rpcs_, assigned);
+  ++next_;
+  return Status::OK();
+}
+
+// Returns the smallest sequence number still marked incomplete, or kNoSeqNo
+// when no RPC is outstanding.
+RequestTracker::SequenceNumber RequestTracker::FirstIncomplete() {
+  std::lock_guard<simple_spinlock> guard(lock_);
+  const auto oldest = incomplete_rpcs_.begin();
+  if (oldest != incomplete_rpcs_.end()) {
+    return *oldest;
+  }
+  return kNoSeqNo;
+}
+
+// Stops tracking 'seq_no'. Completing a sequence number that is not (or no
+// longer) tracked is a no-op.
+void RequestTracker::RpcCompleted(const SequenceNumber& seq_no) {
+  std::lock_guard<simple_spinlock> guard(lock_);
+  const auto pos = incomplete_rpcs_.find(seq_no);
+  if (pos != incomplete_rpcs_.end()) {
+    incomplete_rpcs_.erase(pos);
+  }
+}
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/request_tracker.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/request_tracker.h b/be/src/kudu/rpc/request_tracker.h
new file mode 100644
index 0000000..5cc3995
--- /dev/null
+++ b/be/src/kudu/rpc/request_tracker.h
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <cstdint>
+#include <set>
+#include <string>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/locks.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace rpc {
+
+// RequestTracker implementation, inspired by:
+// "Implementing Linearizability at Large Scale and Low Latency" by Colin Lee et al.
+//
+// This generates sequence numbers for retriable RPCs and tracks the ongoing ones.
+// The main point of this is to enable exactly-once semantics, i.e. making sure that
+// an RPC is only executed once, by uniquely identifying each RPC that is sent to
+// the server.
+//
+// Note that the sequence numbers here are different from RPC 'call ids'. A call id
+// uniquely identifies a call _to a server_. All calls have a call id that is
+// assigned incrementally. Sequence numbers, on the other hand, uniquely identify
+// the RPC operation itself. That is, if an RPC is retried on another server it will
+// have a different call id, but the same sequence number.
+//
+// By keeping track of the RPCs that are in-flight and which ones are completed
+// we can determine the first incomplete RPC. When this information is sent
+// to the server it can use it to garbage collect RPC results that it might be
+// saving for future retries, since it now knows there won't be any.
+//
+// This class is thread safe.
+class RequestTracker : public RefCountedThreadSafe<RequestTracker> {
+ public:
+  typedef int64_t SequenceNumber;
+
+  // Sentinel returned by FirstIncomplete() when no RPC is incomplete.
+  static const RequestTracker::SequenceNumber kNoSeqNo;
+
+  // Creates a tracker for the client identified by 'client_id'.
+  explicit RequestTracker(std::string client_id);
+
+  // Creates a new, unique, sequence number.
+  // Sequence numbers are assigned in increasing integer order.
+  // Returns Status::OK() and sets 'seq_no' if it was able to generate a sequence number.
+  // The contract allows Status::ServiceUnavailable() when too many RPCs are in-flight, in
+  // which case the caller should try again later; note, however, that the current
+  // implementation places no limit on in-flight RPCs and always returns Status::OK().
+  Status NewSeqNo(SequenceNumber* seq_no);
+
+  // Returns the sequence number of the first incomplete RPC.
+  // If there is no incomplete RPC returns kNoSeqNo.
+  SequenceNumber FirstIncomplete();
+
+  // Marks the rpc with 'seq_no' as completed.
+  void RpcCompleted(const SequenceNumber& seq_no);
+
+  // Returns the client id for this request tracker. The id is immutable, so
+  // this accessor is const-qualified and needs no locking.
+  const std::string& client_id() const { return client_id_; }
+
+ private:
+  // The client id for this request tracker.
+  const std::string client_id_;
+
+  // Lock that protects all non-const fields.
+  simple_spinlock lock_;
+
+  // The next sequence number.
+  SequenceNumber next_;
+
+  // The (ordered) set of incomplete RPCs.
+  std::set<SequenceNumber> incomplete_rpcs_;
+};
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/response_callback.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/response_callback.h b/be/src/kudu/rpc/response_callback.h
new file mode 100644
index 0000000..8c4fc03
--- /dev/null
+++ b/be/src/kudu/rpc/response_callback.h
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_RPC_RESPONSE_CALLBACK_H
+#define KUDU_RPC_RESPONSE_CALLBACK_H
+
+#include <boost/function.hpp>
+
+namespace kudu {
+namespace rpc {
+
+// Callback type taking no arguments and returning nothing.
+using ResponseCallback = boost::function<void()>;
+
+}
+}
+
+#endif