You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:57 UTC

[45/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from kudu@334ecafd

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/client_negotiation.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/client_negotiation.h b/be/src/kudu/rpc/client_negotiation.h
new file mode 100644
index 0000000..06fb2b8
--- /dev/null
+++ b/be/src/kudu/rpc/client_negotiation.h
@@ -0,0 +1,263 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdlib>
+#include <memory>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <boost/optional/optional.hpp>
+#include <glog/logging.h>
+#include <sasl/sasl.h>
+
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/negotiation.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/sasl_common.h"
+#include "kudu/rpc/sasl_helper.h"
+#include "kudu/security/security_flags.h"
+#include "kudu/security/tls_handshake.h"
+#include "kudu/security/token.pb.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/gutil/port.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class Slice;
+class faststring;
+
+namespace security {
+class TlsContext;
+}
+
+namespace rpc {
+
+// Class for doing KRPC negotiation with a remote server over a bidirectional socket.
+// Operations on this class are NOT thread-safe.
+class ClientNegotiation {
+ public:
+  // Creates a new client negotiation instance, taking ownership of the
+  // provided socket. After completing the negotiation process by setting the
+  // desired options and calling Negotiate(), the socket can be retrieved with
+  // 'release_socket'.
+  //
+  // The provided TlsContext must outlive this negotiation instance.
+  ClientNegotiation(std::unique_ptr<Socket> socket,
+                    const security::TlsContext* tls_context,
+                    boost::optional<security::SignedTokenPB> authn_token,
+                    RpcEncryption encryption,
+                    std::string sasl_proto_name);
+
+  // Enable PLAIN authentication.
+  // Must be called before Negotiate().
+  Status EnablePlain(const std::string& user,
+                     const std::string& pass);
+
+  // Enable GSSAPI authentication.
+  // Must be called before Negotiate().
+  Status EnableGSSAPI();
+
+  // Returns mechanism negotiated by this connection.
+  // Must be called after Negotiate().
+  SaslMechanism::Type negotiated_mechanism() const;
+
+  // Returns the negotiated authentication type for the connection.
+  // Must be called after Negotiate().
+  AuthenticationType negotiated_authn() const {
+    DCHECK_NE(negotiated_authn_, AuthenticationType::INVALID);
+    return negotiated_authn_;
+  }
+
+  // Returns true if TLS was negotiated.
+  // Must be called after Negotiate().
+  bool tls_negotiated() const {
+    return tls_negotiated_;
+  }
+
+  // Returns the set of RPC system features supported by the remote server.
+  // Must be called after Negotiate().
+  std::set<RpcFeatureFlag> server_features() const {
+    return server_features_;
+  }
+
+  // Returns the set of RPC system features supported by the remote server.
+  // Must be called after Negotiate().
+  // Subsequent calls to this method or server_features() will return an empty set.
+  std::set<RpcFeatureFlag> take_server_features() {
+    return std::move(server_features_);
+  }
+
+  // Specify the fully-qualified domain name of the remote server.
+  // Must be called before Negotiate(). Required for some mechanisms.
+  void set_server_fqdn(const std::string& domain_name);
+
+  // Set deadline for connection negotiation.
+  void set_deadline(const MonoTime& deadline);
+
+  Socket* socket() { return socket_.get(); }
+
+  // Takes and returns the socket owned by this client negotiation. The caller
+  // will own the socket after this call, and the negotiation instance should no
+  // longer be used. Must be called after Negotiate(). Subsequent calls to this
+  // method or socket() will return a null pointer.
+  std::unique_ptr<Socket> release_socket() { return std::move(socket_); }
+
+  // Negotiate with the remote server. Should only be called once per
+  // ClientNegotiation and socket instance, after all options have been set.
+  //
+  // Returns OK on success, otherwise may return NotAuthorized, NotSupported, or
+  // another non-OK status.
+  Status Negotiate(std::unique_ptr<ErrorStatusPB>* rpc_error = nullptr);
+
+  // SASL callback for plugin options, supported mechanisms, etc.
+  // Returns SASL_FAIL if the option is not handled, which does not fail the handshake.
+  int GetOptionCb(const char* plugin_name, const char* option,
+                  const char** result, unsigned* len);
+
+  // SASL callback for SASL_CB_USER, SASL_CB_AUTHNAME, SASL_CB_LANGUAGE
+  int SimpleCb(int id, const char** result, unsigned* len);
+
+  // SASL callback for SASL_CB_PASS
+  int SecretCb(sasl_conn_t* conn, int id, sasl_secret_t** psecret);
+
+  // Check that GSSAPI/Kerberos credentials are available.
+  static Status CheckGSSAPI() WARN_UNUSED_RESULT;
+
+ private:
+
+  // Encode and send the specified negotiate request message to the server.
+  Status SendNegotiatePB(const NegotiatePB& msg) WARN_UNUSED_RESULT;
+
+  // Receive a negotiate response message from the server, deserializing it into 'msg'.
+  // Validates that the response is not an error.
+  Status RecvNegotiatePB(NegotiatePB* msg,
+                         faststring* buffer,
+                         std::unique_ptr<ErrorStatusPB>* rpc_error) WARN_UNUSED_RESULT;
+
+  // Parse error status message from raw bytes of an ErrorStatusPB.
+  Status ParseError(const Slice& err_data,
+                    std::unique_ptr<ErrorStatusPB>* rpc_error) WARN_UNUSED_RESULT;
+
+  Status SendConnectionHeader() WARN_UNUSED_RESULT;
+
+  // Initialize the SASL client negotiation instance.
+  Status InitSaslClient() WARN_UNUSED_RESULT;
+
+  // Send a NEGOTIATE step message to the server.
+  Status SendNegotiate() WARN_UNUSED_RESULT;
+
+  // Handle NEGOTIATE step response from the server.
+  Status HandleNegotiate(const NegotiatePB& response) WARN_UNUSED_RESULT;
+
+  // Send a TLS_HANDSHAKE request message to the server with the provided token.
+  Status SendTlsHandshake(std::string tls_token) WARN_UNUSED_RESULT;
+
+  // Handle a TLS_HANDSHAKE response message from the server.
+  Status HandleTlsHandshake(const NegotiatePB& response) WARN_UNUSED_RESULT;
+
+  // Authenticate to the server using SASL.
+  // 'recv_buf' allows a receive buffer to be reused.
+  Status AuthenticateBySasl(faststring* recv_buf,
+                            std::unique_ptr<ErrorStatusPB>* rpc_error) WARN_UNUSED_RESULT;
+
+  // Authenticate to the server using a token.
+  // 'recv_buf' allows a receive buffer to be reused.
+  Status AuthenticateByToken(faststring* recv_buf,
+                             std::unique_ptr<ErrorStatusPB> *rpc_error) WARN_UNUSED_RESULT;
+
+  // Send an SASL_INITIATE message to the server.
+  // Returns:
+  //  Status::OK if the SASL_SUCCESS message is expected next.
+  //  Status::Incomplete if the SASL_CHALLENGE message is expected next.
+  //  Any other status indicates an error.
+  Status SendSaslInitiate() WARN_UNUSED_RESULT;
+
+  // Send a SASL_RESPONSE message to the server.
+  Status SendSaslResponse(const char* resp_msg, unsigned resp_msg_len) WARN_UNUSED_RESULT;
+
+  // Handle case when server sends SASL_CHALLENGE response.
+  // Returns:
+  //  Status::OK if a SASL_SUCCESS message is expected next.
+  //  Status::Incomplete if another SASL_CHALLENGE message is expected.
+  //  Any other status indicates an error.
+  Status HandleSaslChallenge(const NegotiatePB& response) WARN_UNUSED_RESULT;
+
+  // Handle case when server sends SASL_SUCCESS response.
+  Status HandleSaslSuccess(const NegotiatePB& response) WARN_UNUSED_RESULT;
+
+  // Perform a client-side step of the SASL negotiation.
+  // Input is what came from the server. Output is what we will send back to the server.
+  // Returns:
+  //   Status::OK if sasl_client_step returns SASL_OK.
+  //   Status::Incomplete if sasl_client_step returns SASL_CONTINUE
+  // otherwise returns an appropriate error status.
+  Status DoSaslStep(const std::string& in, const char** out, unsigned* out_len) WARN_UNUSED_RESULT;
+
+  Status SendConnectionContext() WARN_UNUSED_RESULT;
+
+  // The socket to the remote server.
+  std::unique_ptr<Socket> socket_;
+
+  // SASL state.
+  std::vector<sasl_callback_t> callbacks_;
+  std::unique_ptr<sasl_conn_t, SaslDeleter> sasl_conn_;
+  SaslHelper helper_;
+  boost::optional<std::string> nonce_;
+
+  // TLS state.
+  const security::TlsContext* tls_context_;
+  security::TlsHandshake tls_handshake_;
+  const RpcEncryption encryption_;
+  bool tls_negotiated_;
+
+  // TSK state.
+  boost::optional<security::SignedTokenPB> authn_token_;
+
+  // Authentication state.
+  std::string plain_auth_user_;
+  std::string plain_pass_;
+  std::unique_ptr<sasl_secret_t, decltype(std::free)*> psecret_;
+
+  // The set of features advertised by the client. Filled in when we send
+  // the first message. This is not necessarily constant since some features
+  // may be dynamically enabled.
+  std::set<RpcFeatureFlag> client_features_;
+
+  // The set of features supported by the server. Filled in during negotiation.
+  std::set<RpcFeatureFlag> server_features_;
+
+  // The authentication type. Filled in during negotiation.
+  AuthenticationType negotiated_authn_;
+
+  // The SASL mechanism used by the connection. Filled in during negotiation.
+  SaslMechanism::Type negotiated_mech_;
+
+  // The SASL protocol name that is used for the SASL negotiation.
+  const std::string sasl_proto_name_;
+
+  // Negotiation timeout deadline.
+  MonoTime deadline_;
+};
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/connection.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/connection.cc b/be/src/kudu/rpc/connection.cc
new file mode 100644
index 0000000..1632dd3
--- /dev/null
+++ b/be/src/kudu/rpc/connection.cc
@@ -0,0 +1,767 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/connection.h"
+
+#include <algorithm>
+#include <cerrno>
+#include <iostream>
+#include <memory>
+#include <set>
+#include <string>
+#include <type_traits>
+
+#include <boost/intrusive/detail/list_iterator.hpp>
+#include <boost/intrusive/list.hpp>
+#include <ev.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/map-util.h"
+#include "kudu/util/slice.h"
+#include "kudu/gutil/strings/human_readable.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/inbound_call.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/outbound_call.h"
+#include "kudu/rpc/reactor.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/rpc_introspection.pb.h"
+#include "kudu/rpc/transfer.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/status.h"
+
+using std::includes;
+using std::set;
+using std::shared_ptr;
+using std::unique_ptr;
+using strings::Substitute;
+
+namespace kudu {
+namespace rpc {
+
+typedef OutboundCall::Phase Phase;
+
+///
+/// Connection
+///
+Connection::Connection(ReactorThread *reactor_thread,
+                       Sockaddr remote,
+                       unique_ptr<Socket> socket,
+                       Direction direction,
+                       CredentialsPolicy policy)
+    : reactor_thread_(reactor_thread),
+      remote_(remote),
+      socket_(std::move(socket)),  // the connection takes ownership of the socket
+      direction_(direction),
+      last_activity_time_(MonoTime::Now()),  // activity clock starts at construction
+      is_epoll_registered_(false),  // flipped to true by EpollRegister()
+      next_call_id_(1),
+      credentials_policy_(policy),
+      negotiation_complete_(false),  // writes are not started until this is true
+      is_confidential_(false),
+      scheduled_for_shutdown_(false) {
+}
+
+// Toggles non-blocking mode on the owned socket and hands the socket's
+// status straight back to the caller.
+Status Connection::SetNonBlocking(bool enabled) {
+  Socket* const sock = socket_.get();
+  return sock->SetNonBlocking(enabled);
+}
+
+// Binds the read and write watchers of this connection to 'loop'.
+// Must run on the owning reactor thread.
+void Connection::EpollRegister(ev::loop_ref& loop) {
+  DCHECK(reactor_thread_->IsCurrentThread());
+  DVLOG(4) << "Registering connection for epoll: " << ToString();
+
+  // Write side: only started right away for an already-negotiated client
+  // connection; otherwise QueueOutbound() starts it on demand.
+  write_io_.set(loop);
+  write_io_.set(socket_->GetFd(), ev::WRITE);
+  write_io_.set<Connection, &Connection::WriteHandler>(this);
+  if (direction_ == CLIENT) {
+    if (negotiation_complete_) {
+      write_io_.start();
+    }
+  }
+
+  // Read side: always started.
+  read_io_.set(loop);
+  read_io_.set(socket_->GetFd(), ev::READ);
+  read_io_.set<Connection, &Connection::ReadHandler>(this);
+  read_io_.start();
+
+  is_epoll_registered_ = true;
+}
+
+Connection::~Connection() {
+  // The outbound_transfers_ list must already be empty here; Shutdown() drains and deletes it.
+  CHECK(outbound_transfers_.begin() == outbound_transfers_.end());
+
+  // It's crucial that the connection is Shutdown first -- otherwise
+  // our destructor will end up calling read_io_.stop() and write_io_.stop()
+  // from a possibly non-reactor thread context, which is not safe with libev.
+  // Shutdown() is the only place that resets is_epoll_registered_ to false.
+  CHECK(!is_epoll_registered_);
+}
+
+// Reports whether the connection has no work in flight at all. Must be
+// called on the reactor thread.
+bool Connection::Idle() const {
+  DCHECK(reactor_thread_->IsCurrentThread());
+
+  // A partially received inbound transfer counts as activity.
+  InboundTransfer *in_progress = inbound_.get();
+  const bool receiving = in_progress != nullptr && in_progress->TransferStarted();
+
+  // Idle means: nothing being received, nothing left to send, no client
+  // calls waiting for a response, no server calls still being handled, and
+  // connection negotiation already finished.
+  return !receiving &&
+         outbound_transfers_.empty() &&
+         awaiting_response_.empty() &&
+         calls_being_handled_.empty() &&
+         negotiation_complete_;
+}
+
+// Tears the connection down on the reactor thread: records the shutdown
+// status, fails every call still awaiting a response (handing each its own
+// copy of 'rpc_error', if any), drops queued outbound transfers, stops both
+// I/O watchers and closes the socket.
+void Connection::Shutdown(const Status &status,
+                          unique_ptr<ErrorStatusPB> rpc_error) {
+  DCHECK(reactor_thread_->IsCurrentThread());
+  shutdown_status_ = status.CloneAndPrepend("RPC connection failed");
+
+  // Warn when we are cutting off a transfer that was partially received.
+  if (inbound_ && inbound_->TransferStarted()) {
+    const MonoDelta inactive_for = reactor_thread_->cur_time() - last_activity_time_;
+    double secs_since_active = inactive_for.ToSeconds();
+    LOG(WARNING) << "Shutting down " << ToString()
+                 << " with pending inbound data ("
+                 << inbound_->StatusAsString() << ", last active "
+                 << HumanReadableElapsedTime::ToShortString(secs_since_active)
+                 << " ago, status=" << status.ToString() << ")";
+  }
+
+  // Fail every sent call that is still waiting for its response, then hand
+  // the bookkeeping entry back to the pool.
+  for (const car_map_t::value_type &entry : awaiting_response_) {
+    CallAwaitingResponse *pending = entry.second;
+    if (pending->call) {
+      // Each call gets a private copy of the error details, when present.
+      unique_ptr<ErrorStatusPB> error_copy(
+          rpc_error ? new ErrorStatusPB(*rpc_error) : nullptr);
+      const Phase failed_phase = negotiation_complete_
+          ? Phase::REMOTE_CALL
+          : Phase::CONNECTION_NEGOTIATION;
+      pending->call->SetFailed(status, failed_phase, std::move(error_copy));
+    }
+    car_pool_.Destroy(pending);
+  }
+  awaiting_response_.clear();
+
+  // Drop whatever is still sitting in the outbound queue.
+  while (!outbound_transfers_.empty()) {
+    OutboundTransfer *queued = &outbound_transfers_.front();
+    outbound_transfers_.pop_front();
+    delete queued;
+  }
+
+  read_io_.stop();
+  write_io_.stop();
+  is_epoll_registered_ = false;
+
+  if (socket_) {
+    WARN_NOT_OK(socket_->Close(), "Error closing socket");
+  }
+}
+
+// Appends 'transfer' to the outbound queue (reactor thread only). If the
+// connection was already shut down the transfer is aborted instead.
+void Connection::QueueOutbound(gscoped_ptr<OutboundTransfer> transfer) {
+  DCHECK(reactor_thread_->IsCurrentThread());
+
+  if (!shutdown_status_.ok()) {
+    // Nothing will ever drain the queue again, so abort right away instead
+    // of queueing.
+    transfer->Abort(shutdown_status_);
+    return;
+  }
+
+  DVLOG(3) << "Queueing transfer: " << transfer->HexDump();
+
+  // The intrusive list now references the object released from 'transfer'.
+  OutboundTransfer *queued = transfer.release();
+  outbound_transfers_.push_back(*queued);
+
+  // Write interest is stopped whenever there is nothing to send, so it may
+  // need restarting -- but never before negotiation has finished its work.
+  if (negotiation_complete_) {
+    if (!write_io_.is_active()) {
+      write_io_.start();
+    }
+  }
+}
+
+// Debug builds assert that the entry is destroyed on the reactor thread of
+// the connection it belongs to.
+Connection::CallAwaitingResponse::~CallAwaitingResponse() {
+  DCHECK(conn->reactor_thread_->IsCurrentThread());
+}
+
+// Timer callback for an outbound call. While 'remaining_timeout' is still
+// positive this is the first of two stages: the timer is re-armed for the
+// remainder. Otherwise the call is reported to the connection as timed out.
+void Connection::CallAwaitingResponse::HandleTimeout(ev::timer &watcher, int revents) {
+  if (!(remaining_timeout > 0)) {
+    // No second stage left to run.
+    conn->HandleOutboundCallTimeout(this);
+    return;
+  }
+
+  // A value below -1.0 means this handler ran more than a second late.
+  const double time_left = watcher.remaining();
+  if (time_left < -1.0) {
+    LOG(WARNING) << "RPC call timeout handler was delayed by "
+                 << -time_left << "s! This may be due to a process-wide "
+                 << "pause such as swapping, logging-related delays, or allocator lock "
+                 << "contention. Will allow an additional "
+                 << remaining_timeout << "s for a response.";
+  }
+
+  // Arm the second stage and remember that it has been used up.
+  watcher.set(remaining_timeout, 0);
+  watcher.start();
+  remaining_timeout = 0;
+}
+
+// Marks the call tracked by 'car' as timed out and releases our reference
+// to it. The map entry itself is deliberately kept (see below).
+void Connection::HandleOutboundCallTimeout(CallAwaitingResponse *car) {
+  DCHECK(reactor_thread_->IsCurrentThread());
+  DCHECK(car->call);
+  // The timeout timer is stopped by the car destructor exiting Connection::HandleCallResponse()
+  DCHECK(!car->call->IsFinished());
+
+  // The phase reported with the timeout depends on whether negotiation
+  // had already completed.
+  const Phase timed_out_in = negotiation_complete_
+      ? Phase::REMOTE_CALL
+      : Phase::CONNECTION_NEGOTIATION;
+  car->call->SetTimedOut(timed_out_in);
+
+  // Test cancellation when 'car->call' is in 'TIMED_OUT' state
+  MaybeInjectCancellation(car->call);
+
+  // Let go of the call: once the original caller has seen the timeout there
+  // is no reason to keep the memory allocated for the request alive.
+  car->call.reset();
+
+  // The CallAwaitingResponse stays in the map on purpose. A response may
+  // still arrive from the server, and a NULL CallAwaitingResponse::call
+  // tells the response processing code that the call already timed out,
+  // which avoids a spurious log message at that point.
+}
+
+// Drops the connection's reference to 'call' if the call is still tracked
+// as awaiting a response; does nothing otherwise.
+void Connection::CancelOutboundCall(const shared_ptr<OutboundCall> &call) {
+  CallAwaitingResponse* tracked = FindPtrOrNull(awaiting_response_, call->call_id());
+  if (tracked == nullptr) {
+    return;
+  }
+  // tracked->call may be NULL if the call has timed out already.
+  DCHECK(!tracked->call || tracked->call.get() == call.get());
+  tracked->call.reset();
+}
+
+// Test hook: queues a cancellation of 'call' through the messenger when the
+// call reports that one should be injected (i.e. it is in the state named by
+// 'FLAGS_rpc_inject_cancellation_state').
+void inline Connection::MaybeInjectCancellation(const shared_ptr<OutboundCall> &call) {
+  if (!PREDICT_FALSE(call->ShouldInjectCancellation())) {
+    return;
+  }
+  reactor_thread_->reactor()->messenger()->QueueCancellation(call);
+}
+
+// Transfer callbacks attached to the request of an outbound call.
+// Once the request has been fully transmitted the OutboundCall is moved to
+// the SENT state. The object deletes itself from either notification.
+struct CallTransferCallbacks : public TransferCallbacks {
+ public:
+  explicit CallTransferCallbacks(shared_ptr<OutboundCall> call,
+                                 Connection *conn)
+      : call_(std::move(call)),
+        conn_(conn) {
+  }
+
+  virtual void NotifyTransferFinished() OVERRIDE {
+    // TODO: would be better to cancel the transfer while it is still on the queue if we
+    // timed out before the transfer started, but there is still a race in the case of
+    // a partial send that we have to handle here
+    if (!call_->IsFinished()) {
+      call_->SetSent();
+      // Test cancellation when 'call_' is in 'SENT' state.
+      conn_->MaybeInjectCancellation(call_);
+    } else {
+      // A call that finished before its transfer did can only have timed
+      // out or been cancelled.
+      DCHECK(call_->IsTimedOut() || call_->IsCancelled());
+    }
+    delete this;
+  }
+
+  virtual void NotifyTransferAborted(const Status &status) OVERRIDE {
+    VLOG(1) << "Transfer of RPC call " << call_->ToString() << " aborted: "
+            << status.ToString();
+    delete this;
+  }
+
+ private:
+  // The call whose request is being transferred.
+  shared_ptr<OutboundCall> call_;
+  // Connection carrying the transfer; not owned.
+  Connection* conn_;
+};
+
+void Connection::QueueOutboundCall(shared_ptr<OutboundCall> call) {
+  DCHECK(call);
+  DCHECK_EQ(direction_, CLIENT);
+  DCHECK(reactor_thread_->IsCurrentThread());
+
+  if (PREDICT_FALSE(!shutdown_status_.ok())) {
+    // Already shut down: fail the call immediately with the recorded shutdown status.
+    call->SetFailed(shutdown_status_,
+                    negotiation_complete_ ? Phase::REMOTE_CALL
+                                          : Phase::CONNECTION_NEGOTIATION);
+    return;
+  }
+
+  // At this point the call has a serialized request, but no call header, since we haven't
+  // yet assigned a call ID.
+  DCHECK(!call->call_id_assigned());
+
+  // We shouldn't reach this point if 'call' was requested to be cancelled.
+  DCHECK(!call->cancellation_requested());
+
+  // Assign the call ID.
+  int32_t call_id = GetNextCallId();
+  call->set_call_id(call_id);
+
+  // Serialize the actual bytes to be put on the wire.
+  TransferPayload tmp_slices;
+  size_t n_slices = call->SerializeTo(&tmp_slices);
+
+  call->SetQueued();
+
+  // Test cancellation when 'call' is in 'ON_OUTBOUND_QUEUE' state.
+  MaybeInjectCancellation(call);
+
+  scoped_car car(car_pool_.make_scoped_ptr(car_pool_.Construct()));
+  car->conn = this;
+  car->call = call;
+
+  // Set up the timeout timer.
+  const MonoDelta &timeout = call->controller()->timeout();
+  if (timeout.Initialized()) {
+    reactor_thread_->RegisterTimeout(&car->timeout_timer);
+    car->timeout_timer.set<CallAwaitingResponse, // NOLINT(*)
+                           &CallAwaitingResponse::HandleTimeout>(car.get());
+
+    // For calls with a timeout of at least 500ms, we actually run the timeout
+    // handler in two stages. The first timeout fires with a timeout 10% less
+    // than the user-specified one. It then schedules a second timeout for the
+    // remaining amount of time.
+    //
+    // The purpose of this two-stage timeout is to be more robust when the client
+    // has some process-wide pause, such as lock contention in tcmalloc, or a
+    // reactor callback that blocks in glog. Consider the following case:
+    //
+    // T = 0s        user issues an RPC with 5 second timeout
+    // T = 0.5s - 6s   process is blocked
+    // T = 6s        process unblocks, and the timeout fires (1s late)
+    //
+    // Without the two-stage timeout, we would determine that the call had timed out,
+    // even though it's likely that the response is waiting on our TCP socket.
+    // With the two-stage timeout, we'll end up with:
+    //
+    // T = 0s           user issues an RPC with 5 second timeout
+    // T = 0.5s - 6s    process is blocked
+    // T = 6s           process unblocks, and the first-stage timeout fires (1.5s late)
+    // T = 6s - 6.5s    time for the client to read the response which is waiting
+    // T = 6.5s         if the response was not actually available, we'll time out here
+    //
+    // We don't bother with this logic for calls with very short timeouts - presumably
+    // a user setting such a short RPC timeout is well equipped to handle one.
+    double time = timeout.ToSeconds();
+    if (time >= 0.5) {
+      car->remaining_timeout = time * 0.1;
+      time -= car->remaining_timeout;
+    } else {
+      car->remaining_timeout = 0;
+    }
+
+    car->timeout_timer.set(time, 0);
+    car->timeout_timer.start();
+  }
+
+  TransferCallbacks *cb = new CallTransferCallbacks(std::move(call), this);
+  awaiting_response_[call_id] = car.release();
+  QueueOutbound(gscoped_ptr<OutboundTransfer>(
+      OutboundTransfer::CreateForCallRequest(call_id, tmp_slices, n_slices, cb)));
+}
+
+// Transfer callbacks attached to a server-side RPC response.
+// The object owns the InboundCall, so that all memory associated with the
+// call is freed once the response has been dealt with. It deletes itself
+// from either notification.
+struct ResponseTransferCallbacks : public TransferCallbacks {
+ public:
+  ResponseTransferCallbacks(gscoped_ptr<InboundCall> call,
+                            Connection *conn)
+      : call_(std::move(call)),
+        conn_(conn) {
+  }
+
+  ~ResponseTransferCallbacks() {
+    // Stop tracking the call on the connection; the entry removed from the
+    // map is expected to be the very call owned here.
+    InboundCall *tracked_call = EraseKeyReturnValuePtr(
+        &conn_->calls_being_handled_, call_->call_id());
+    DCHECK_EQ(tracked_call, call_.get());
+  }
+
+  virtual void NotifyTransferFinished() OVERRIDE {
+    delete this;
+  }
+
+  virtual void NotifyTransferAborted(const Status &status) OVERRIDE {
+    LOG(WARNING) << "Connection torn down before "
+                 << call_->ToString() << " could send its response";
+    delete this;
+  }
+
+ private:
+  // The call being responded to; owned.
+  gscoped_ptr<InboundCall> call_;
+  // Connection the response is sent on; not owned.
+  Connection *conn_;
+};
+
+// Reactor task that appends a transfer to a connection's outbound queue.
+// The task deletes itself after being run or aborted.
+class QueueTransferTask : public ReactorTask {
+ public:
+  QueueTransferTask(gscoped_ptr<OutboundTransfer> transfer,
+                    Connection *conn)
+      : transfer_(std::move(transfer)),
+        conn_(conn) {
+  }
+
+  // Hands the transfer over to the connection.
+  virtual void Run(ReactorThread *thr) OVERRIDE {
+    gscoped_ptr<OutboundTransfer> to_queue(std::move(transfer_));
+    conn_->QueueOutbound(std::move(to_queue));
+    delete this;
+  }
+
+  // The task never ran: abort the transfer with 'status'.
+  virtual void Abort(const Status &status) OVERRIDE {
+    transfer_->Abort(status);
+    delete this;
+  }
+
+ private:
+  // Transfer waiting to be queued; owned until Run() passes it on.
+  gscoped_ptr<OutboundTransfer> transfer_;
+  // Target connection; not owned.
+  Connection *conn_;
+};
+
+// Serializes the response of 'call' and schedules a reactor task that puts
+// it on the outbound queue of this (server-side) connection.
+void Connection::QueueResponseForCall(gscoped_ptr<InboundCall> call) {
+  // Usually invoked from the IPC worker thread once the response is set,
+  // though the reactor thread may also get here in some circumstances
+  // (e.g. if the service has shut down).
+
+  DCHECK_EQ(direction_, SERVER);
+
+  // If the connection is torn down, then the QueueOutbound() call that
+  // eventually runs in the reactor thread will take care of calling
+  // ResponseTransferCallbacks::NotifyTransferAborted.
+
+  // Serialization has to happen before ownership of 'call' moves into the
+  // callbacks object below.
+  TransferPayload payload;
+  const size_t payload_slices = call->SerializeResponseTo(&payload);
+
+  // The callbacks own the InboundCall, so it can be deleted after the
+  // response is sent.
+  TransferCallbacks *on_done = new ResponseTransferCallbacks(std::move(call), this);
+
+  // We set a dummy call ID and required feature set, since these are not needed
+  // when sending responses.
+  gscoped_ptr<OutboundTransfer> response(
+      OutboundTransfer::CreateForCallResponse(payload, payload_slices, on_done));
+
+  reactor_thread_->reactor()->ScheduleReactorTask(
+      new QueueTransferTask(std::move(response), this));
+}
+
+// Records whether this connection is considered confidential.
+void Connection::set_confidential(bool is_confidential) {
+  this->is_confidential_ = is_confidential;
+}
+
+// Returns true when this client connection is acceptable for 'policy':
+// ANY_CREDENTIALS accepts every connection, any other policy has to match
+// the one the connection was created with.
+bool Connection::SatisfiesCredentialsPolicy(CredentialsPolicy policy) const {
+  DCHECK_EQ(direction_, CLIENT);
+  if (policy == CredentialsPolicy::ANY_CREDENTIALS) {
+    return true;
+  }
+  return policy == credentials_policy_;
+}
+
+// Returns the RpczStore of the messenger that owns this connection's reactor.
+RpczStore* Connection::rpcz_store() {
+  auto* owning_messenger = reactor_thread_->reactor()->messenger();
+  return owning_messenger->rpcz_store();
+}
+
+// libev read callback, run on the reactor thread. Continues receiving the
+// current inbound transfer; once a transfer is complete it is dispatched as
+// a call response (client side) or as an incoming call (server side).
+// Any watcher or receive error destroys the connection.
+//
+// At most one transfer is processed per invocation.
+void Connection::ReadHandler(ev::io &watcher, int revents) {
+  DCHECK(reactor_thread_->IsCurrentThread());
+
+  DVLOG(3) << ToString() << " ReadHandler(revents=" << revents << ")";
+  if (revents & EV_ERROR) {
+    reactor_thread_->DestroyConnection(this, Status::NetworkError(ToString() +
+                                     ": ReadHandler encountered an error"));
+    return;
+  }
+  last_activity_time_ = reactor_thread_->cur_time();
+
+  // Start a fresh transfer unless one is already partially received.
+  if (!inbound_) {
+    inbound_.reset(new InboundTransfer());
+  }
+  Status status = inbound_->ReceiveBuffer(*socket_);
+  if (PREDICT_FALSE(!status.ok())) {
+    if (status.posix_code() == ESHUTDOWN) {
+      VLOG(1) << ToString() << " shut down by remote end.";
+    } else {
+      LOG(WARNING) << ToString() << " recv error: " << status.ToString();
+    }
+    reactor_thread_->DestroyConnection(this, status);
+    return;
+  }
+  if (!inbound_->TransferFinished()) {
+    // More bytes are needed; wait for the next readiness notification.
+    DVLOG(3) << ToString() << ": read is not finished yet.";
+    return;
+  }
+  DVLOG(3) << ToString() << ": finished reading " << inbound_->data().size() << " bytes";
+
+  if (direction_ == CLIENT) {
+    HandleCallResponse(std::move(inbound_));
+  } else if (direction_ == SERVER) {
+    HandleIncomingCall(std::move(inbound_));
+  } else {
+    LOG(FATAL) << "Invalid direction: " << direction_;
+  }
+
+  // TODO: it would seem that it would be good to loop around and see if
+  // there is more data on the socket by trying another recv(), but it turns
+  // out that it really hurts throughput to do so. A better approach
+  // might be for each InboundTransfer to actually try to read an extra byte,
+  // and if it succeeds, then we'd copy that byte into a new InboundTransfer
+  // and loop around, since it's likely the next call also arrived at the
+  // same time.
+}
+
+// Server side: turns a fully received transfer into an InboundCall,
+// registers it in calls_being_handled_ and hands it to the messenger.
+// A transfer that fails to parse is logged and dropped; a call ID that is
+// already being handled destroys the connection.
+void Connection::HandleIncomingCall(gscoped_ptr<InboundTransfer> transfer) {
+  DCHECK(reactor_thread_->IsCurrentThread());
+
+  gscoped_ptr<InboundCall> call(new InboundCall(this));
+  Status s = call->ParseFrom(std::move(transfer));
+  if (!s.ok()) {
+    LOG(WARNING) << ToString() << ": received bad data: " << s.ToString();
+    // TODO: shutdown? probably, since any future stuff on this socket will be
+    // "unsynchronized"
+    return;
+  }
+
+  if (!InsertIfNotPresent(&calls_being_handled_, call->call_id(), call.get())) {
+    // The log text states what actually happens next: the connection is
+    // torn down rather than the duplicate being ignored.
+    LOG(WARNING) << ToString() << ": received call ID " << call->call_id() <<
+      " but was already processing this ID! Closing connection";
+    reactor_thread_->DestroyConnection(
+      this, Status::RuntimeError("Received duplicate call id",
+                                 Substitute("$0", call->call_id())));
+    return;
+  }
+
+  reactor_thread_->reactor()->messenger()->QueueInboundCall(std::move(call));
+}
+
+// Client side: matches a fully received transfer to the outbound call that
+// is awaiting it and delivers the response to that call.
+//
+// The transfer holds bytes received from the remote end, so a parse failure
+// fails only this connection (mirroring how ReadHandler treats receive
+// errors) instead of aborting the whole process.
+void Connection::HandleCallResponse(gscoped_ptr<InboundTransfer> transfer) {
+  DCHECK(reactor_thread_->IsCurrentThread());
+  gscoped_ptr<CallResponse> resp(new CallResponse);
+  Status parse_status = resp->ParseFrom(std::move(transfer));
+  if (PREDICT_FALSE(!parse_status.ok())) {
+    LOG(WARNING) << ToString() << ": received bad response data: "
+                 << parse_status.ToString();
+    // Nothing may touch this connection after it has been destroyed.
+    reactor_thread_->DestroyConnection(this, parse_status);
+    return;
+  }
+
+  CallAwaitingResponse *car_ptr =
+    EraseKeyReturnValuePtr(&awaiting_response_, resp->call_id());
+  if (PREDICT_FALSE(car_ptr == nullptr)) {
+    LOG(WARNING) << ToString() << ": Got a response for call id " << resp->call_id() << " which "
+                 << "was not pending! Ignoring.";
+    return;
+  }
+
+  // The car->timeout_timer ev::timer will be stopped automatically by its destructor.
+  scoped_car car(car_pool_.make_scoped_ptr(car_ptr));
+
+  if (PREDICT_FALSE(!car->call)) {
+    // The call reference was already dropped: the call timed out or was
+    // cancelled before this response arrived.
+    VLOG(1) << "Got response to call id " << resp->call_id() << " after client "
+            << "already timed out or cancelled";
+    return;
+  }
+
+  car->call->SetResponse(std::move(resp));
+
+  // Test cancellation when 'car->call' is in 'FINISHED_SUCCESS' or 'FINISHED_ERROR' state.
+  MaybeInjectCancellation(car->call);
+}
+
+// libev callback invoked when the socket may be written to. Must run on the
+// reactor thread.
+//
+// Sends queued transfers from the front of 'outbound_transfers_' until either
+// the queue is empty (the write watcher is then stopped) or a transfer could
+// only be partially sent (the function returns without stopping the watcher).
+// An EV_ERROR event or a send failure destroys the connection.
+void Connection::WriteHandler(ev::io &watcher, int revents) {
+  DCHECK(reactor_thread_->IsCurrentThread());
+
+  if (revents & EV_ERROR) {
+    reactor_thread_->DestroyConnection(this, Status::NetworkError(ToString() +
+          ": writeHandler encountered an error"));
+    return;
+  }
+  DVLOG(3) << ToString() << ": writeHandler: revents = " << revents;
+
+  if (outbound_transfers_.empty()) {
+    LOG(WARNING) << ToString() << " got a ready-to-write callback, but there is "
+      "nothing to write.";
+    write_io_.stop();
+    return;
+  }
+
+  while (!outbound_transfers_.empty()) {
+    // Scoped to the iteration: each pass works on the current head of the queue.
+    OutboundTransfer* transfer = &(outbound_transfers_.front());
+
+    // Per-call checks are done once, right before the first byte of an
+    // outbound call is written.
+    if (!transfer->TransferStarted() && transfer->is_for_outbound_call()) {
+      CallAwaitingResponse* car = FindOrDie(awaiting_response_, transfer->call_id());
+      if (!car->call) {
+        // If the call has already timed out or has already been cancelled, the 'call'
+        // field would be set to NULL. In that case, don't bother sending it.
+        outbound_transfers_.pop_front();
+        transfer->Abort(Status::Aborted("already timed out or cancelled"));
+        delete transfer;
+        continue;
+      }
+
+      // If this is the start of the transfer, then check if the server has the
+      // required RPC flags. We have to wait until just before the transfer in
+      // order to ensure that the negotiation has taken place, so that the flags
+      // are available.
+      const set<RpcFeatureFlag>& required_features = car->call->required_rpc_features();
+      if (!includes(remote_features_.begin(), remote_features_.end(),
+                    required_features.begin(), required_features.end())) {
+        outbound_transfers_.pop_front();
+        Status s = Status::NotSupported("server does not support the required RPC features");
+        transfer->Abort(s);
+        Phase phase = negotiation_complete_ ? Phase::REMOTE_CALL : Phase::CONNECTION_NEGOTIATION;
+        car->call->SetFailed(std::move(s), phase);
+        // Test cancellation when 'call_' is in 'FINISHED_ERROR' state.
+        MaybeInjectCancellation(car->call);
+        car->call.reset();
+        delete transfer;
+        continue;
+      }
+
+      car->call->SetSending();
+
+      // Test cancellation when 'call_' is in 'SENDING' state.
+      MaybeInjectCancellation(car->call);
+    }
+
+    last_activity_time_ = reactor_thread_->cur_time();
+    Status status = transfer->SendBuffer(*socket_);
+    if (PREDICT_FALSE(!status.ok())) {
+      LOG(WARNING) << ToString() << " send error: " << status.ToString();
+      reactor_thread_->DestroyConnection(this, status);
+      return;
+    }
+
+    if (!transfer->TransferFinished()) {
+      // Partial send: keep the transfer at the head of the queue and return
+      // without stopping 'write_io_'.
+      DVLOG(3) << ToString() << ": writeHandler: xfer not finished.";
+      return;
+    }
+
+    outbound_transfers_.pop_front();
+    delete transfer;
+  }
+
+  // If we were able to write all of our outbound transfers,
+  // we don't have any more to write.
+  write_io_.stop();
+}
+
+std::string Connection::ToString() const {
+  // Other threads may call this, so the description is built only from the
+  // direction and the remote address; current connection state, which may be
+  // changing concurrently on another thread, is deliberately left out.
+  const char* const prefix =
+      direction_ == SERVER ? "server connection from" : "client connection to";
+  return strings::Substitute("$0 $1", prefix, remote_.ToString());
+}
+
+// Reactor task that transitions this Connection from connection negotiation to
+// regular RPC handling. Destroys Connection on negotiation error. Self-deleting.
+class NegotiationCompletedTask : public ReactorTask {
+ public:
+  NegotiationCompletedTask(Connection* conn,
+                           Status negotiation_status,
+                           std::unique_ptr<ErrorStatusPB> rpc_error)
+    : conn_(conn),
+      negotiation_status_(std::move(negotiation_status)),
+      rpc_error_(std::move(rpc_error)) {
+  }
+
+  virtual void Run(ReactorThread *rthread) OVERRIDE {  // Forwards the outcome to 'rthread'.
+    rthread->CompleteConnectionNegotiation(conn_,
+                                           negotiation_status_,
+                                           std::move(rpc_error_));
+    delete this;  // Must be the last statement: no member may be touched afterwards.
+  }
+
+  virtual void Abort(const Status &status) OVERRIDE {  // Logs 'status' and discards the task.
+    DCHECK(conn_->reactor_thread()->reactor()->closing());  // Reactor must be closing.
+    VLOG(1) << "Failed connection negotiation due to shut down reactor thread: "
+            << status.ToString();
+    delete this;  // Must be the last statement: no member may be touched afterwards.
+  }
+
+ private:
+  scoped_refptr<Connection> conn_;  // Holds a reference to the connection while the task lives.
+  const Status negotiation_status_;  // Outcome of the negotiation, passed on in Run().
+  std::unique_ptr<ErrorStatusPB> rpc_error_;  // Moved out in Run().
+};
+
+// Reports the negotiation outcome back to the reactor by scheduling a
+// heap-allocated NegotiationCompletedTask on the owning reactor.
+void Connection::CompleteNegotiation(Status negotiation_status,
+                                     unique_ptr<ErrorStatusPB> rpc_error) {
+  reactor_thread_->reactor()->ScheduleReactorTask(
+      new NegotiationCompletedTask(this,
+                                   std::move(negotiation_status),
+                                   std::move(rpc_error)));
+}
+
+void Connection::MarkNegotiationComplete() {
+  DCHECK(reactor_thread_->IsCurrentThread());  // Reactor thread only.
+  negotiation_complete_ = true;  // Consulted by WriteHandler() and DumpPB().
+}
+
+// Fills 'resp' with a description of this connection: the remote address,
+// whether negotiation has completed, and the calls currently in flight.
+// Must be called on the reactor thread. Always returns Status::OK(); an
+// unrecognized 'direction_' is a fatal error.
+Status Connection::DumpPB(const DumpRunningRpcsRequestPB& req,
+                          RpcConnectionPB* resp) {
+  DCHECK(reactor_thread_->IsCurrentThread());
+  resp->set_remote_ip(remote_.ToString());
+  if (negotiation_complete_) {
+    resp->set_state(RpcConnectionPB::OPEN);
+  } else {
+    resp->set_state(RpcConnectionPB::NEGOTIATING);
+  }
+
+  if (direction_ == CLIENT) {
+    // Client side: dump every tracked outbound call that still has a live
+    // 'call'; entries whose call pointer has been reset are skipped.
+    for (const car_map_t::value_type& entry : awaiting_response_) {
+      CallAwaitingResponse *c = entry.second;
+      if (c->call) {
+        c->call->DumpPB(req, resp->add_calls_in_flight());
+      }
+    }
+
+    resp->set_outbound_queue_size(num_queued_outbound_transfers());
+  } else if (direction_ == SERVER) {
+    if (negotiation_complete_) {
+      // It's racy to dump credentials while negotiating, since the Connection
+      // object is owned by the negotiation thread at that point.
+      resp->set_remote_user_credentials(remote_user_.ToString());
+    }
+    // Server side: dump every inbound call currently being handled.
+    for (const inbound_call_map_t::value_type& entry : calls_being_handled_) {
+      InboundCall* c = entry.second;
+      c->DumpPB(req, resp->add_calls_in_flight());
+    }
+  } else {
+    // Say what went wrong instead of aborting with an empty message.
+    LOG(FATAL) << "Invalid direction: " << direction_;
+  }
+  return Status::OK();
+}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/connection.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/connection.h b/be/src/kudu/rpc/connection.h
new file mode 100644
index 0000000..362a35b
--- /dev/null
+++ b/be/src/kudu/rpc/connection.h
@@ -0,0 +1,391 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_RPC_CONNECTION_H
+#define KUDU_RPC_CONNECTION_H
+
+#include <cstddef>
+#include <cstdint>
+#include <limits>
+#include <memory>
+#include <set>
+#include <string>
+#include <unordered_map>
+#include <utility>
+
+#include <boost/intrusive/list.hpp>
+#include <boost/optional/optional.hpp>
+#include <ev++.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/rpc/connection_id.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/remote_user.h"
+#include "kudu/rpc/transfer.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/object_pool.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+namespace rpc {
+
+class DumpRunningRpcsRequestPB;
+class InboundCall;
+class OutboundCall;
+class RpcConnectionPB;
+class ReactorThread;
+class RpczStore;
+enum class CredentialsPolicy;
+
+//
+// A connection between an endpoint and us.
+//
+// Inbound connections are created by AcceptorPools, which eventually schedule
+// RegisterConnection() to be called from the reactor thread.
+//
+// Outbound connections are created by the Reactor thread in order to service
+// outbound calls.
+//
+// Once a Connection is created, it can be used both for sending messages and
+// receiving them, but any given connection is explicitly a client or server.
+// If a pair of servers are making bidirectional RPCs, they will use two separate
+// TCP connections (and Connection objects).
+//
+// This class is not fully thread-safe.  It is accessed only from the context of a
+// single ReactorThread except where otherwise specified.
+//
+class Connection : public RefCountedThreadSafe<Connection> {
+ public:
+  enum Direction {
+    // This host is sending calls via this connection.
+    CLIENT,
+    // This host is receiving calls via this connection.
+    SERVER
+  };
+
+  // Create a new Connection.
+  // reactor_thread: the reactor that owns us.
+  // remote: the address of the remote end
+  // socket: the socket to take ownership of.
+  // direction: whether we are the client or server side
+  Connection(ReactorThread *reactor_thread,
+             Sockaddr remote,
+             std::unique_ptr<Socket> socket,
+             Direction direction,
+             CredentialsPolicy policy = CredentialsPolicy::ANY_CREDENTIALS);
+
+  // Set underlying socket to non-blocking (or blocking) mode.
+  Status SetNonBlocking(bool enabled);
+
+  // Register our socket with an epoll loop.  We will only ever be registered in
+  // one epoll loop at a time.
+  void EpollRegister(ev::loop_ref& loop);
+
+  ~Connection();
+
+  MonoTime last_activity_time() const {
+    return last_activity_time_;
+  }
+
+  // Returns true if we are not in the process of receiving or sending a
+  // message, and we have no outstanding calls.
+  bool Idle() const;
+
+  // Fail any calls which are currently queued or awaiting response.
+  // Prohibits any future calls (they will be failed immediately with this
+  // same Status).
+  void Shutdown(const Status& status,
+                std::unique_ptr<ErrorStatusPB> rpc_error = {});
+
+  // Queue a new call to be made. If the queueing fails, the call will be
+  // marked failed. The caller is expected to check if 'call' has been cancelled
+  // before making the call.
+  // Takes ownership of the 'call' object regardless of whether it succeeds or fails.
+  void QueueOutboundCall(std::shared_ptr<OutboundCall> call);
+
+  // Queue a call response back to the client on the server side.
+  //
+  // This may be called from a non-reactor thread.
+  void QueueResponseForCall(gscoped_ptr<InboundCall> call);
+
+  // Cancel an outbound call by removing any reference to it by CallAwaitingResponse
+  // in 'awaiting_response_'.
+  void CancelOutboundCall(const std::shared_ptr<OutboundCall> &call);
+
+  // The address of the remote end of the connection.
+  const Sockaddr &remote() const { return remote_; }
+
+  // Set the connection ID (which carries the user credentials) for an outbound connection.
+  void set_outbound_connection_id(ConnectionId conn_id) {
+    DCHECK_EQ(direction_, CLIENT);
+    DCHECK(!outbound_connection_id_);
+    outbound_connection_id_ = std::move(conn_id);
+  }
+
+  // Get the connection ID, including the user credentials which will be used to log in.
+  const ConnectionId& outbound_connection_id() const {
+    DCHECK_EQ(direction_, CLIENT);
+    DCHECK(outbound_connection_id_);
+    return *outbound_connection_id_;
+  }
+
+  bool is_confidential() const {
+    return is_confidential_;
+  }
+
+  // Set/unset the 'confidentiality' property for this connection.
+  void set_confidential(bool is_confidential);
+
+  // Credentials policy to start connection negotiation.
+  CredentialsPolicy credentials_policy() const { return credentials_policy_; }
+
+  // Whether the connection satisfies the specified credentials policy.
+  //
+  // NOTE: The policy is set prior to connection negotiation, and the actual
+  //       authentication credentials used for connection negotiation might
+  //       effectively make the connection satisfy a stronger policy.
+  //       An example: the credentials policy for the connection was set to
+  //       ANY_CREDENTIALS, but since the authn token was not available
+  //       at the time of negotiation, the primary credentials were used, making
+  //       the connection de facto satisfy the PRIMARY_CREDENTIALS policy.
+  bool SatisfiesCredentialsPolicy(CredentialsPolicy policy) const;
+
+  RpczStore* rpcz_store();
+
+  // libev callback when data is available to read.
+  void ReadHandler(ev::io &watcher, int revents);
+
+  // libev callback when we may write to the socket.
+  void WriteHandler(ev::io &watcher, int revents);
+
+  // Safe to be called from other threads.
+  std::string ToString() const;
+
+  Direction direction() const { return direction_; }
+
+  Socket* socket() { return socket_.get(); }
+
+  // Go through the process of transferring control of the underlying socket back to the Reactor.
+  void CompleteNegotiation(Status negotiation_status,
+                           std::unique_ptr<ErrorStatusPB> rpc_error);
+
+  // Indicate that negotiation is complete and that the Reactor is now in control of the socket.
+  void MarkNegotiationComplete();
+
+  Status DumpPB(const DumpRunningRpcsRequestPB& req,
+                RpcConnectionPB* resp);
+
+  ReactorThread* reactor_thread() const { return reactor_thread_; }
+
+  std::unique_ptr<Socket> release_socket() {
+    return std::move(socket_);
+  }
+
+  void adopt_socket(std::unique_ptr<Socket> socket) {
+    socket_ = std::move(socket);
+  }
+
+  void set_remote_features(std::set<RpcFeatureFlag> remote_features) {
+    remote_features_ = std::move(remote_features);
+  }
+
+  void set_remote_user(RemoteUser user) {
+    DCHECK_EQ(direction_, SERVER);
+    remote_user_ = std::move(user);
+  }
+
+  const RemoteUser& remote_user() const {
+    DCHECK_EQ(direction_, SERVER);
+    return remote_user_;
+  }
+
+  // Whether the connection is scheduled for shutdown.
+  bool scheduled_for_shutdown() const {
+    DCHECK_EQ(direction_, CLIENT);
+    return scheduled_for_shutdown_;
+  }
+
+  // Mark the connection as scheduled to be shut down. Reactor does not dispatch
+  // new calls on such a connection.
+  void set_scheduled_for_shutdown() {
+    DCHECK_EQ(direction_, CLIENT);
+    scheduled_for_shutdown_ = true;
+  }
+
+  size_t num_queued_outbound_transfers() const {
+    return outbound_transfers_.size();
+  }
+
+ private:
+  friend struct CallAwaitingResponse;
+  friend class QueueTransferTask;
+  friend struct CallTransferCallbacks;
+  friend struct ResponseTransferCallbacks;
+
+  // A call which has been queued for sending or already sent to the server, and for
+  // which we're awaiting a response. This is used on the client side only.
+  struct CallAwaitingResponse {
+    ~CallAwaitingResponse();
+
+    // Notification from libev that the call has timed out.
+    void HandleTimeout(ev::timer &watcher, int revents);
+
+    Connection *conn;
+    std::shared_ptr<OutboundCall> call;
+    ev::timer timeout_timer;
+
+    // We time out RPC calls in two stages. This is set to the amount of timeout
+    // remaining after the next timeout fires. See Connection::QueueOutboundCall().
+    double remaining_timeout;
+  };
+
+  typedef std::unordered_map<uint64_t, CallAwaitingResponse*> car_map_t;
+  typedef std::unordered_map<uint64_t, InboundCall*> inbound_call_map_t;
+
+  // Returns the next valid (positive) sequential call ID by incrementing a counter
+  // and ensuring we roll over from INT32_MAX to 0.
+  // Negative numbers are reserved for special purposes.
+  int32_t GetNextCallId() {
+    int32_t call_id = next_call_id_;
+    if (PREDICT_FALSE(next_call_id_ == std::numeric_limits<int32_t>::max())) {
+      next_call_id_ = 0;
+    } else {
+      next_call_id_++;
+    }
+    return call_id;
+  }
+
+  // An incoming packet has completed transferring on the server side.
+  // This parses the call and delivers it into the call queue.
+  void HandleIncomingCall(gscoped_ptr<InboundTransfer> transfer);
+
+  // An incoming packet has completed on the client side. This parses the
+  // call response, looks up the CallAwaitingResponse, and calls the
+  // client callback.
+  void HandleCallResponse(gscoped_ptr<InboundTransfer> transfer);
+
+  // The given CallAwaitingResponse has elapsed its user-defined timeout.
+  // Set it to Failed.
+  void HandleOutboundCallTimeout(CallAwaitingResponse *car);
+
+  // Queue a transfer for sending on this connection.
+  // We will take ownership of the transfer.
+  // This must be called from the reactor thread.
+  void QueueOutbound(gscoped_ptr<OutboundTransfer> transfer);
+
+  // Internal test function for injecting cancellation request when 'call'
+  // reaches state specified in 'FLAGS_rpc_inject_cancellation_state'.
+  void MaybeInjectCancellation(const std::shared_ptr<OutboundCall> &call);
+
+  // The reactor thread that created this connection.
+  ReactorThread* const reactor_thread_;
+
+  // The remote address we're talking to.
+  const Sockaddr remote_;
+
+  // The socket we're communicating on.
+  std::unique_ptr<Socket> socket_;
+
+  // The ConnectionId that serves as a key into the client connection map
+  // within this reactor. Only set in the case of outbound connections.
+  boost::optional<ConnectionId> outbound_connection_id_;
+
+  // The authenticated remote user (if this is an inbound connection on the server).
+  RemoteUser remote_user_;
+
+  // whether we are client or server
+  Direction direction_;
+
+  // The last time we read or wrote from the socket.
+  MonoTime last_activity_time_;
+
+  // the inbound transfer, if any
+  gscoped_ptr<InboundTransfer> inbound_;
+
+  // notifies us when our socket is writable.
+  ev::io write_io_;
+
+  // notifies us when our socket is readable.
+  ev::io read_io_;
+
+  // Set to true when the connection is registered on a loop.
+  // This is used for a sanity check in the destructor that we are properly
+  // un-registered before shutting down.
+  bool is_epoll_registered_;
+
+  // waiting to be sent
+  boost::intrusive::list<OutboundTransfer> outbound_transfers_; // NOLINT(*)
+
+  // Calls which have been sent and are now waiting for a response.
+  car_map_t awaiting_response_;
+
+  // Calls which have been received on the server and are currently
+  // being handled.
+  inbound_call_map_t calls_being_handled_;
+
+  // the next call ID to use
+  int32_t next_call_id_;
+
+  // Starts as Status::OK, gets set to a shutdown status upon Shutdown().
+  Status shutdown_status_;
+
+  // RPC features supported by the remote end of the connection.
+  std::set<RpcFeatureFlag> remote_features_;
+
+  // Pool from which CallAwaitingResponse objects are allocated.
+  // Also a funny name.
+  ObjectPool<CallAwaitingResponse> car_pool_;
+  typedef ObjectPool<CallAwaitingResponse>::scoped_ptr scoped_car;
+
+  // The credentials policy to use for connection negotiation. It defines which
+  // type of user credentials is used to negotiate a connection. The actual type
+  // of credentials used for authentication during the negotiation process
+  // depends on credentials availability, but the resulting credentials are
+  // guaranteed to always satisfy the specified credentials policy. In other
+  // words, the actual type of credentials used for connection negotiation might
+  // effectively make the connection satisfy a stronger/narrower policy.
+  //
+  // An example:
+  //   The credentials policy for the connection was set to ANY_CREDENTIALS,
+  //   but since no secondary credentials (such as an authn token) were
+  //   available at the time of negotiation, the primary credentials were used,
+  //   making the connection satisfy the PRIMARY_CREDENTIALS policy de facto.
+  const CredentialsPolicy credentials_policy_;
+
+  // Whether we completed connection negotiation.
+  bool negotiation_complete_;
+
+  // Whether it's OK to pass confidential information over the connection.
+  // For example, an encrypted (but not necessarily authenticated) connection
+  // is considered confidential.
+  bool is_confidential_;
+
+  // Whether the connection is scheduled for shutdown.
+  bool scheduled_for_shutdown_;
+};
+
+} // namespace rpc
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/connection_id.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/connection_id.cc b/be/src/kudu/rpc/connection_id.cc
new file mode 100644
index 0000000..6720807
--- /dev/null
+++ b/be/src/kudu/rpc/connection_id.cc
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/connection_id.h"
+
+#include <cstddef>
+#include <utility>
+
+#include <boost/functional/hash/hash.hpp>
+#include <glog/logging.h>
+
+#include "kudu/gutil/strings/substitute.h"
+
+using std::string;
+
+namespace kudu {
+namespace rpc {
+
+ConnectionId::ConnectionId() {}  // Leaves every field default-constructed.
+
+ConnectionId::ConnectionId(const Sockaddr& remote,
+                           std::string hostname,
+                           UserCredentials user_credentials)
+    : remote_(remote),
+      hostname_(std::move(hostname)),  // By-value parameters are moved into place.
+      user_credentials_(std::move(user_credentials)) {
+  CHECK(!hostname_.empty());  // An empty hostname is a fatal error.
+}
+
+void ConnectionId::set_user_credentials(UserCredentials user_credentials) {
+  DCHECK(user_credentials.has_real_user());  // Callers are expected to supply a real user.
+  user_credentials_ = std::move(user_credentials);  // Replaces any previous credentials.
+}
+
+string ConnectionId::ToString() const {
+  // The hostname is appended in parentheses only when it differs from the
+  // host reported by the remote address.
+  const bool show_hostname = hostname_ != remote_.host();
+  const string remote = show_hostname
+      ? strings::Substitute("$0 ($1)", remote_.ToString(), hostname_)
+      : remote_.ToString();
+
+  return strings::Substitute("{remote=$0, user_credentials=$1}",
+                             remote,
+                             user_credentials_.ToString());
+}
+
+size_t ConnectionId::HashCode() const {
+  // Combine the same three fields that Equals() compares: remote address,
+  // hostname and user credentials.
+  size_t hash = 0;
+  boost::hash_combine(hash, remote_.HashCode());
+  boost::hash_combine(hash, hostname_);
+  boost::hash_combine(hash, user_credentials_.HashCode());
+  return hash;
+}
+
+bool ConnectionId::Equals(const ConnectionId& other) const {
+  // Compare field by field, bailing out on the first mismatch. The order
+  // (remote address, hostname, user credentials) is preserved.
+  if (!(remote() == other.remote())) {
+    return false;
+  }
+  if (!(hostname_ == other.hostname_)) {
+    return false;
+  }
+  return user_credentials().Equals(other.user_credentials());
+}
+
+size_t ConnectionIdHash::operator() (const ConnectionId& conn_id) const {
+  return conn_id.HashCode();  // Hash functor: delegates to ConnectionId::HashCode().
+}
+
+bool ConnectionIdEqual::operator() (const ConnectionId& cid1, const ConnectionId& cid2) const {
+  return cid1.Equals(cid2);  // Equality functor: delegates to ConnectionId::Equals().
+}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/connection_id.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/connection_id.h b/be/src/kudu/rpc/connection_id.h
new file mode 100644
index 0000000..67a4786
--- /dev/null
+++ b/be/src/kudu/rpc/connection_id.h
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <cstddef>
+#include <string>
+
+#include "kudu/rpc/user_credentials.h"
+#include "kudu/util/net/sockaddr.h"
+
+namespace kudu {
+namespace rpc {
+
+// Used to key on Connection information. For use as a key in an unordered STL
+// collection, use ConnectionIdHash and ConnectionIdEqual. The class is copyable; the old
+// "not assignable" note looks stale (operator= is not deleted here) -- TODO confirm.
+class ConnectionId {
+ public:
+  ConnectionId();
+
+  // Copy constructor required for use with STL unordered_map.
+  ConnectionId(const ConnectionId& other) = default;
+
+  // Convenience constructor. 'hostname' must not be empty (CHECKed in connection_id.cc).
+  ConnectionId(const Sockaddr& remote,
+               std::string hostname,
+               UserCredentials user_credentials);
+
+  // The remote address.
+  const Sockaddr& remote() const { return remote_; }
+
+  const std::string& hostname() const { return hostname_; }
+
+  // The credentials of the user associated with this connection, if any.
+  void set_user_credentials(UserCredentials user_credentials);
+
+  const UserCredentials& user_credentials() const { return user_credentials_; }
+
+  // Copy state from another object. NOTE(review): not defined in connection_id.cc -- confirm.
+  void CopyFrom(const ConnectionId& other);
+
+  // Returns a string representation of the object, not including the password field.
+  std::string ToString() const;
+
+  size_t HashCode() const;  // Combines remote address, hostname and user credentials.
+  bool Equals(const ConnectionId& other) const;  // Compares the same three fields.
+
+ private:
+  // Remember to update HashCode() and Equals() when new fields are added.
+  Sockaddr remote_;
+
+  // The original host name before it was resolved to 'remote_'.
+  // This must be retained since it is used to compute Kerberos Service Principal Names (SPNs).
+  std::string hostname_;
+
+  UserCredentials user_credentials_;
+};
+
+class ConnectionIdHash {
+ public:
+  std::size_t operator() (const ConnectionId& conn_id) const;
+};
+
+class ConnectionIdEqual {
+ public:
+  bool operator() (const ConnectionId& cid1, const ConnectionId& cid2) const;
+};
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/constants.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/constants.cc b/be/src/kudu/rpc/constants.cc
new file mode 100644
index 0000000..a4e024c
--- /dev/null
+++ b/be/src/kudu/rpc/constants.cc
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/constants.h"
+
+using std::set;
+
+namespace kudu {
+namespace rpc {
+
+const char* const kMagicNumber = "hrpc";  // 4 characters; cf. kMagicNumberLength in constants.h.
+const char* const kSaslAppName = "kudu";  // App name for SASL library init.
+
+// NOTE: the TLS flag is dynamically added based on the local encryption
+// configuration.
+//
+// NOTE: the TLS_AUTHENTICATION_ONLY flag is dynamically added on both
+// sides based on the remote peer's address.
+set<RpcFeatureFlag> kSupportedServerRpcFeatureFlags = { APPLICATION_FEATURE_FLAGS };
+set<RpcFeatureFlag> kSupportedClientRpcFeatureFlags = { APPLICATION_FEATURE_FLAGS };
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/constants.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/constants.h b/be/src/kudu/rpc/constants.h
new file mode 100644
index 0000000..a3c7c67
--- /dev/null
+++ b/be/src/kudu/rpc/constants.h
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_RPC_RPC_CONSTANTS_H
+#define KUDU_RPC_RPC_CONSTANTS_H
+
+#include <cstdint>
+#include <set>
+
+#include "kudu/rpc/rpc_header.pb.h"
+
+namespace kudu {
+namespace rpc {
+
+// Magic number bytes sent at connection setup time.
+extern const char* const kMagicNumber;
+
+// App name for SASL library init
+extern const char* const kSaslAppName;
+
+// Current version of the RPC protocol.
+static const uint32_t kCurrentRpcVersion = 9;
+
+// Reserved (negative) call IDs. From Hadoop.
+static const int32_t kInvalidCallId = -2;
+static const int32_t kConnectionContextCallId = -3;
+static const int32_t kNegotiateCallId = -33;
+
+static const uint8_t kMagicNumberLength = 4;  // Presumably the byte length of kMagicNumber.
+static const uint8_t kHeaderFlagsLength = 3;
+
+// There is a 4-byte length prefix before any packet.
+static const uint8_t kMsgLengthPrefixLength = 4;
+
+// The set of RPC features that this server build supports.
+// Non-const for testing.
+extern std::set<RpcFeatureFlag> kSupportedServerRpcFeatureFlags;
+
+// The set of RPC features that this client build supports.
+// Non-const for testing.
+extern std::set<RpcFeatureFlag> kSupportedClientRpcFeatureFlags;
+
+} // namespace rpc
+} // namespace kudu
+
+#endif // KUDU_RPC_RPC_CONSTANTS_H

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/rpc/exactly_once_rpc-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/exactly_once_rpc-test.cc b/be/src/kudu/rpc/exactly_once_rpc-test.cc
new file mode 100644
index 0000000..c94e89c
--- /dev/null
+++ b/be/src/kudu/rpc/exactly_once_rpc-test.cc
@@ -0,0 +1,629 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <unistd.h>
+
+#include <atomic>
+#include <cstdint>
+#include <cstdlib>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/callback.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/rpc/request_tracker.h"
+#include "kudu/rpc/response_callback.h"
+#include "kudu/rpc/result_tracker.h"
+#include "kudu/rpc/retriable_rpc.h"
+#include "kudu/rpc/rpc-test-base.h"
+#include "kudu/rpc/rpc.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/rtest.pb.h"
+#include "kudu/rpc/rtest.proxy.h"
+#include "kudu/util/countdown_latch.h"
+#include "kudu/util/mem_tracker.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/pb_util.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/thread.h"
+
+DECLARE_int64(remember_clients_ttl_ms);
+DECLARE_int64(remember_responses_ttl_ms);
+DECLARE_int64(result_tracker_gc_interval_ms);
+
+using kudu::pb_util::SecureDebugString;
+using kudu::pb_util::SecureShortDebugString;
+using std::atomic_int;
+using std::shared_ptr;
+using std::unique_ptr;
+using std::vector;
+
+namespace kudu {
+namespace rpc {
+
+class Messenger;
+
+namespace {
+
+const char* const kClientId = "test-client";  // Default client id for the requests made here.
+
+// Stamps 'controller' with a request id built from the arguments; the first
+// incomplete sequence number is set to 'sequence_number' itself.
+void AddRequestId(RpcController* controller, const std::string& client_id,
+                  ResultTracker::SequenceNumber sequence_number, int64_t attempt_no) {
+  unique_ptr<RequestIdPB> id_pb(new RequestIdPB);
+  id_pb->set_attempt_no(attempt_no);
+  id_pb->set_first_incomplete_seq_no(sequence_number);
+  id_pb->set_seq_no(sequence_number);
+  id_pb->set_client_id(client_id);
+  controller->SetRequestIdPB(std::move(id_pb));
+}
+
+// ServerPicker stub: always "picks" the one proxy given at construction; Mark*() are no-ops.
+class TestServerPicker : public ServerPicker<CalculatorServiceProxy> {
+ public:
+  explicit TestServerPicker(CalculatorServiceProxy* proxy) : proxy_(proxy) {}
+
+  void PickLeader(const ServerPickedCallback& callback, const MonoTime& /*deadline*/) override {
+    callback.Run(Status::OK(), proxy_);
+  }
+  void MarkServerFailed(CalculatorServiceProxy* /*server*/, const Status& /*status*/) override {}
+  void MarkReplicaNotLeader(CalculatorServiceProxy* /*server*/) override {}
+  void MarkResourceNotFound(CalculatorServiceProxy* /*server*/) override {}
+
+ private:
+  CalculatorServiceProxy* proxy_;
+};
+
+} // anonymous namespace
+
+// RetriableRpc used by the tests below: adds 'value' through AddExactlyOnce, asking the
+// server to fail at random, and counts 'latch' down once the rpc finishes. Instances
+// must be heap-allocated because Finish() deletes the object.
+class CalculatorServiceRpc : public RetriableRpc<CalculatorServiceProxy,
+                                                 ExactlyOnceRequestPB,
+                                                 ExactlyOnceResponsePB> {
+ public:
+  CalculatorServiceRpc(const scoped_refptr<TestServerPicker>& server_picker,
+                       const scoped_refptr<RequestTracker>& request_tracker,
+                       const MonoTime& deadline,
+                       shared_ptr<Messenger> messenger,
+                       int value,
+                       CountDownLatch* latch,
+                       int server_sleep = 0)
+      : RetriableRpc(server_picker, request_tracker, deadline, std::move(messenger)),
+        latch_(latch) {
+    req_.set_sleep_for_ms(server_sleep);
+    req_.set_randomly_fail(true);
+    req_.set_value_to_add(value);
+  }
+
+  // Issues one asynchronous attempt against 'server'.
+  void Try(CalculatorServiceProxy* server, const ResponseCallback& callback) override {
+    auto* attempt_controller = mutable_retrier()->mutable_controller();
+    server->AddExactlyOnceAsync(req_, &resp_, attempt_controller, callback);
+  }
+
+  // Classifies the current state of the rpc: proceed/done (OK), retry, or give up.
+  RetriableRpcStatus AnalyzeResponse(const Status& rpc_cb_status) override {
+    // We set a high timeout, so the server/rpc system shouldn't hand us errors.
+    CHECK_OK(rpc_cb_status);
+
+    auto&& ctl = mutable_retrier()->controller();
+    if (!ctl.status().ok()) {
+      CHECK(mutable_retrier()->controller().status().IsRemoteError());
+      // A stale request is reported as non-retriable, anything else as unavailable.
+      const bool stale = ctl.error_response()->code() == ErrorStatusPB::ERROR_REQUEST_STALE;
+      return { stale ? RetriableRpcStatus::NON_RETRIABLE_ERROR
+                     : RetriableRpcStatus::SERVICE_UNAVAILABLE,
+               ctl.status() };
+    }
+
+    // An unfinished controller means we're in the ReplicaFoundCb() callback:
+    // return ok so that the call to the server goes ahead.
+    if (!mutable_retrier()->mutable_controller()->finished()) {
+      return { RetriableRpcStatus::OK, Status::OK() };
+    }
+
+    // The first response is remembered; all following responses must match it.
+    if (successful_response_.IsInitialized()) {
+      CHECK_EQ(SecureDebugString(successful_response_),
+               SecureDebugString(resp_));
+    } else {
+      successful_response_.CopyFrom(resp_);
+    }
+
+    if (!sometimes_retry_successful_) {
+      return { RetriableRpcStatus::OK, Status::OK() };
+    }
+    // Still report errors, with some probability. This will cause requests to
+    // be retried. Since the requests were originally successful we should get
+    // the same reply back.
+    switch (rand() % 4) {
+      case 0:
+        return { RetriableRpcStatus::SERVICE_UNAVAILABLE, Status::RemoteError("") };
+      case 1:
+        return { RetriableRpcStatus::RESOURCE_NOT_FOUND, Status::RemoteError("") };
+      case 2:
+        return { RetriableRpcStatus::SERVER_NOT_ACCESSIBLE, Status::RemoteError("") };
+      case 3:
+        return { RetriableRpcStatus::OK, Status::OK() };
+      default:
+        LOG(FATAL) << "Unexpected value";
+    }
+    return { RetriableRpcStatus::OK, Status::OK() };
+  }
+
+  // Counts the latch down and destroys this object; 'status' must be OK.
+  void Finish(const Status& status) override {
+    CHECK_OK(status);
+    latch_->CountDown();
+    delete this;
+  }
+
+  std::string ToString() const override { return "test-rpc"; }
+  CountDownLatch* latch_;
+  ExactlyOnceResponsePB successful_response_;
+  bool sometimes_retry_successful_ = true;
+};
+
+class ExactlyOnceRpcTest : public RpcTestBase {
+ public:
+  void SetUp() override {
+    RpcTestBase::SetUp();
+    SeedRandom();  // Presumably seeds rand(), which these tests use for sleeps and retries.
+  }
+
+  // Brings up the test server and the client-side objects (messenger, proxy, server
+  // picker, request tracker) that talk to it, and resets the attempt counter.
+  Status StartServer() {
+    RETURN_NOT_OK(StartTestServerWithGeneratedCode(&server_addr_));
+    RETURN_NOT_OK(CreateMessenger("Client", &client_messenger_));
+    proxy_.reset(
+        new CalculatorServiceProxy(client_messenger_, server_addr_, server_addr_.host()));
+    request_tracker_.reset(new RequestTracker(kClientId));
+    test_picker_.reset(new TestServerPicker(proxy_.get()));
+    attempt_nos_ = 0;
+    return Status::OK();
+  }
+
+  // An exactly once adder that performs its request through RetriableRpc
+  // (CalculatorServiceRpc), with a deadline ten seconds after construction.
+  struct RetriableRpcExactlyOnceAdder {
+    // 'value' and 'server_sleep' are forwarded to the rpc's constructor.
+    RetriableRpcExactlyOnceAdder(const scoped_refptr<TestServerPicker>& server_picker,
+                     const scoped_refptr<RequestTracker>& request_tracker,
+                     shared_ptr<Messenger> messenger,
+                     int value,
+                     int server_sleep = 0) : latch_(1) {
+      MonoTime deadline = MonoTime::Now();
+      deadline.AddDelta(MonoDelta::FromMilliseconds(10000));
+      rpc_ = new CalculatorServiceRpc(
+          server_picker, request_tracker, deadline, std::move(messenger),
+          value, &latch_, server_sleep);
+    }
+
+    // Runs SleepAndSend() on a new thread, which is stored in 'thread'.
+    void Start() {
+      CHECK_OK(kudu::Thread::Create(
+          "test", "test", &RetriableRpcExactlyOnceAdder::SleepAndSend, this, &thread));
+    }
+
+    // Sends the rpc and blocks until the latch is counted down. Despite the
+    // name there is no sleep on the client side.
+    void SleepAndSend() {
+      rpc_->SendRpc();
+      latch_.Wait();
+    }
+
+    CountDownLatch latch_;
+    scoped_refptr<kudu::Thread> thread;
+    // Not owned: the rpc deletes itself in Finish().
+    CalculatorServiceRpc* rpc_;
+  };
+
+  // An exactly once adder meant to be run several at a time with the same sequence
+  // number: each one makes a single synchronous call on its own thread and keeps the
+  // response in 'resp' for the test to inspect.
+  struct SimultaneousExactlyOnceAdder {
+    SimultaneousExactlyOnceAdder(CalculatorServiceProxy* p,
+                     ResultTracker::SequenceNumber sequence_number,
+                     int value,
+                     uint64_t client_sleep,
+                     uint64_t server_sleep,
+                     int64_t attempt_no)
+     : proxy(p),
+       client_sleep_for_ms(client_sleep) {
+      AddRequestId(&controller, kClientId, sequence_number, attempt_no);
+      req.set_sleep_for_ms(server_sleep);
+      req.set_value_to_add(value);
+    }
+
+    // Runs SleepAndSend() on a new thread, which is stored in 'thread'.
+    void Start() {
+      CHECK_OK(kudu::Thread::Create(
+          "test", "test", &SimultaneousExactlyOnceAdder::SleepAndSend, this, &thread));
+    }
+
+    // Sleeps the preset number of msecs, then makes the call (20s timeout, must succeed).
+    void SleepAndSend() {
+      usleep(client_sleep_for_ms * 1000);
+      controller.set_timeout(MonoDelta::FromSeconds(20));
+      CHECK_OK(proxy->AddExactlyOnce(req, &resp, &controller));
+    }
+
+    CalculatorServiceProxy* const proxy;
+    const uint64_t client_sleep_for_ms;
+    RpcController controller;
+    ExactlyOnceRequestPB req;
+    ExactlyOnceResponsePB resp;
+    scoped_refptr<kudu::Thread> thread;
+  };
+
+
+  // Adds 0 under a fresh sequence number and asserts the value returned is 'expected_value'.
+  void CheckValueMatches(int expected_value) {
+    RequestTracker::SequenceNumber seq_no;
+    CHECK_OK(request_tracker_->NewSeqNo(&seq_no));
+    RpcController controller;
+    AddRequestId(&controller, kClientId, seq_no, 0);
+    ExactlyOnceRequestPB req;
+    req.set_value_to_add(0);
+    ExactlyOnceResponsePB resp;
+    ASSERT_OK(proxy_->AddExactlyOnce(req, &resp, &controller));
+    ASSERT_EQ(resp.current_val(), expected_value);
+    request_tracker_->RpcCompleted(seq_no);
+  }
+
+  // Thread body: for 'run_for', keeps issuing RetriableRpc adds of 1 whose server-side
+  // sleep often exceeds 'remember_responses_ttl_ms', making sure no error comes back,
+  // and finally checks that the server-side value equals the number of adds made.
+  void DoLongWritesThread(MonoDelta run_for) {
+    MonoTime run_until = MonoTime::Now();
+    run_until.AddDelta(run_for);
+    int counter = 0;
+    for (; MonoTime::Now() < run_until; counter++) {
+      const int server_sleep_ms = rand() % (2 * FLAGS_remember_responses_ttl_ms);
+      RetriableRpcExactlyOnceAdder adder(
+          test_picker_, request_tracker_, client_messenger_, 1, server_sleep_ms);
+
+      // This thread is used in the stress test where we're constantly running GC.
+      // So, once we get a "success" response, it's likely that the result will be
+      // GCed on the server side, and thus it's not safe to spuriously retry.
+      adder.rpc_->sometimes_retry_successful_ = false;
+      adder.SleepAndSend();
+      SleepFor(MonoDelta::FromMilliseconds(rand() % 10));
+    }
+    ResultTracker::SequenceNumber sequence_number;
+    CHECK_OK(request_tracker_->NewSeqNo(&sequence_number));
+    ExactlyOnceResponsePB response;
+    CHECK_OK(MakeAddCall(sequence_number, 0, &response));
+    CHECK_EQ(response.current_val(), counter);
+    request_tracker_->RpcCompleted(sequence_number);
+  }
+
+  // Stubbornly sends the same request to the server; this should observe three states:
+  // the request succeeds at first, then its result is GCed (a remote error comes back),
+  // and finally the client is GCed too (the request succeeds again, with a new response).
+  void StubbornlyWriteTheSameRequestThread(ResultTracker::SequenceNumber sequence_number,
+                                           MonoDelta run_for) {
+    MonoTime run_until = MonoTime::Now();
+    run_until.AddDelta(run_for);
+    // Make an initial request, so that we get a response to compare to.
+    ExactlyOnceResponsePB original_response;
+    CHECK_OK(MakeAddCall(sequence_number, 0, &original_response));
+
+    // Now repeat the same request. At first we should get the same response, then the result
+    // should be GCed and we should get STALE back. Finally the request should succeed again
+    // but we should get a new response.
+    bool result_gced = false;
+    bool client_gced = false;
+    while (MonoTime::Now() < run_until) {
+      ExactlyOnceResponsePB response;
+      Status s = MakeAddCall(sequence_number, 0, &response);
+      if (s.ok()) {
+        if (!result_gced) {
+          CHECK_EQ(SecureDebugString(response), SecureDebugString(original_response));
+        } else {
+          client_gced = true;
+          CHECK_NE(SecureDebugString(response), SecureDebugString(original_response));
+        }
+        SleepFor(MonoDelta::FromMilliseconds(rand() % 10));
+      } else if (s.IsRemoteError()) {  // Any other kind of failure is silently ignored.
+        result_gced = true;
+        SleepFor(MonoDelta::FromMilliseconds(FLAGS_remember_clients_ttl_ms * 2));
+      }
+    }
+    CHECK(result_gced);  // Both transitions must have been observed before 'run_until'.
+    CHECK(client_gced);
+  }
+
+  // Makes one synchronous AddExactlyOnce call for 'sequence_number' and returns its
+  // status. When 'attempt_no' is left at -1, the next value of 'attempt_nos_' is used.
+  // No timeout is set on the controller.
+  Status MakeAddCall(ResultTracker::SequenceNumber sequence_number, int value_to_add,
+                     ExactlyOnceResponsePB* response, int attempt_no = -1) {
+    if (attempt_no == -1) attempt_no = attempt_nos_.fetch_add(1);
+    RpcController controller;
+    AddRequestId(&controller, kClientId, sequence_number, attempt_no);
+    ExactlyOnceRequestPB req;
+    req.set_value_to_add(value_to_add);
+    return proxy_->AddExactlyOnce(req, response, &controller);
+  }
+
+ protected:
+  Sockaddr server_addr_;
+  atomic_int attempt_nos_;
+  shared_ptr<Messenger> client_messenger_;
+  std::unique_ptr<CalculatorServiceProxy> proxy_;
+  scoped_refptr<TestServerPicker> test_picker_;
+  scoped_refptr<RequestTracker> request_tracker_;
+};
+
+// Tests that we get exactly once semantics on RPCs when we send a bunch of requests with the
+// same sequence number as previous requests.
+TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsAfterRpcCompleted) {
+  ASSERT_OK(StartServer());
+  ExactlyOnceResponsePB original_resp;
+  // consumption() is kept in an int64_t (as in the GC test below) to avoid narrowing.
+  int64_t mem_consumption = mem_tracker_->consumption();
+  {
+    RpcController controller;
+    ExactlyOnceRequestPB req;
+    req.set_value_to_add(1);
+
+    // Assign id 0.
+    AddRequestId(&controller, kClientId, 0, 0);
+
+    // Send the request the first time.
+    ASSERT_OK(proxy_->AddExactlyOnce(req, &original_resp, &controller));
+
+    // The incremental usage of a new client is the size of the response itself
+    // plus some fixed overhead for the client-tracking structure.
+    int64_t expected_incremental_usage = original_resp.SpaceUsed() + 200;
+    int64_t mem_consumption_after = mem_tracker_->consumption();
+    ASSERT_GT(mem_consumption_after - mem_consumption, expected_incremental_usage);
+    mem_consumption = mem_consumption_after;
+  }
+
+  // Now repeat the rpc 10 times, using the same sequence number, none of these should be executed
+  // and they should get the same response back.
+  for (int i = 0; i < 10; i++) {
+    RpcController controller;
+    controller.set_timeout(MonoDelta::FromSeconds(20));
+    ExactlyOnceRequestPB req;
+    req.set_value_to_add(1);
+    ExactlyOnceResponsePB resp;
+    AddRequestId(&controller, kClientId, 0, i + 1);
+    ASSERT_OK(proxy_->AddExactlyOnce(req, &resp, &controller));
+    ASSERT_EQ(resp.current_val(), 1);
+    ASSERT_EQ(resp.current_time_micros(), original_resp.current_time_micros());
+    // Sleep to give the MemTracker time to update -- we don't expect any update,
+    // but if we had a bug here, we'd only see it with this sleep.
+    SleepFor(MonoDelta::FromMilliseconds(100));
+    // We shouldn't have consumed any more memory since the responses were cached.
+    ASSERT_EQ(mem_consumption, mem_tracker_->consumption());
+  }
+
+  // Making a new request, from a new client, should double the memory consumption.
+  {
+    RpcController controller;
+    ExactlyOnceRequestPB req;
+    ExactlyOnceResponsePB resp;
+    req.set_value_to_add(1);
+
+    // Assign id 0.
+    AddRequestId(&controller, "test-client2", 0, 0);
+
+    // Send the first request for this new client.
+    ASSERT_OK(proxy_->AddExactlyOnce(req, &resp, &controller));
+    ASSERT_EQ(mem_tracker_->consumption(), mem_consumption * 2);
+  }
+}
+
+// Performs a series of requests in which each single request is attempted multiple times, as
+// the server side is instructed to spuriously fail attempts.
+// In CalculatorServiceRpc we make sure that the same response is returned by all retries and,
+// after all the rpcs are done, we make sure that the final result is the expected one.
+TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsWithReplicatedRpc) {
+  ASSERT_OK(StartServer());
+
+  // Ten rounds of ten rpcs by default; a hundred of each when slow tests are allowed.
+  const int scale = AllowSlowTests() ? 100 : 10;
+  const int kNumIterations = scale;
+  const int kNumRpcs = scale;
+
+  int expected_total = 0;
+  for (int round = 0; round < kNumIterations; ++round) {
+    vector<unique_ptr<RetriableRpcExactlyOnceAdder>> adders;
+    // Start one adder per value in [0, kNumRpcs), each on its own thread.
+    for (int value = 0; value < kNumRpcs; ++value) {
+      unique_ptr<RetriableRpcExactlyOnceAdder> adder(new RetriableRpcExactlyOnceAdder(
+          test_picker_, request_tracker_, client_messenger_, value));
+      adder->Start();
+      adders.push_back(std::move(adder));
+      expected_total += value;
+    }
+    // Wait for every rpc of this round before checking the accumulated value.
+    for (const auto& adder : adders) {
+      CHECK_OK(ThreadJoiner(adder->thread.get()).Join());
+    }
+    CheckValueMatches(expected_total);
+  }
+}
+
+// Performs a series of requests in which each single request is attempted by multiple threads.
+// On each iteration, after all the threads complete, we expect that the add operation was
+// executed exactly once.
+TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsWithConcurrentUpdaters) {
+  ASSERT_OK(StartServer());
+  int kNumIterations = 10;
+  int kNumThreads = 10;
+
+  if (AllowSlowTests()) {
+    kNumIterations = 100;
+    kNumThreads = 100;
+  }
+
+  ResultTracker::SequenceNumber sequence_number = 0;
+  int64_t memory_consumption_initial = mem_tracker_->consumption();
+  int single_response_size = 0;  // NOTE(review): never updated after this point.
+
+  // Make an initial call (sequence number 0) before the concurrent rounds start.
+  ExactlyOnceResponsePB resp;
+  ASSERT_OK(MakeAddCall(sequence_number, 1, &resp));
+
+  for (int i = 1; i <= kNumIterations; i ++) {
+    vector<unique_ptr<SimultaneousExactlyOnceAdder>> adders;
+    for (int j = 0; j < kNumThreads; j++) {
+      unique_ptr<SimultaneousExactlyOnceAdder> adder(
+          new SimultaneousExactlyOnceAdder(proxy_.get(),
+                                           i, // sequence number
+                                           1, // value
+                                           rand() % 20, // client_sleep
+                                           rand() % 10, // server_sleep
+                                           attempt_nos_.fetch_add(1))); // attempt number
+      adders.push_back(std::move(adder));
+      adders[j]->Start();
+    }
+    uint64_t time_micros = 0;
+    for (int j = 0; j < kNumThreads; j++) {
+      CHECK_OK(ThreadJoiner(adders[j]->thread.get()).Join());
+      ASSERT_EQ(adders[j]->resp.current_val(), i + 1);
+      if (time_micros == 0) {
+        time_micros = adders[j]->resp.current_time_micros();
+      } else {
+        ASSERT_EQ(adders[j]->resp.current_time_micros(), time_micros);
+      }
+    }
+
+    // After all adders finished we should have consumed at least the size of one more response.
+    // The actual size depends on multiple factors, for instance, how many calls were "attached"
+    // (which is timing dependent). With 'single_response_size' at 0 this only checks growth.
+    ASSERT_GT(mem_tracker_->consumption(),
+              memory_consumption_initial + single_response_size * i);
+  }
+}
+
+TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsGarbageCollection) {
+  FLAGS_remember_clients_ttl_ms = 500;
+  FLAGS_remember_responses_ttl_ms = 100;
+
+  ASSERT_OK(StartServer());
+
+  // Make a request: add 1 under sequence number 0.
+  ExactlyOnceResponsePB original;
+  ResultTracker::SequenceNumber sequence_number = 0;
+  ASSERT_OK(MakeAddCall(sequence_number, 1, &original));
+
+  // Making the same request again should return the same response.
+  ExactlyOnceResponsePB resp;
+  ASSERT_OK(MakeAddCall(sequence_number, 1, &resp));
+  ASSERT_EQ(SecureShortDebugString(original), SecureShortDebugString(resp));
+
+  // Now sleep for 'remember_responses_ttl_ms' and run GC, which must release some
+  // memory; repeating the request should then get a STALE back.
+  SleepFor(MonoDelta::FromMilliseconds(FLAGS_remember_responses_ttl_ms));
+  int64_t memory_consumption = mem_tracker_->consumption();
+  result_tracker_->GCResults();
+  ASSERT_LT(mem_tracker_->consumption(), memory_consumption);
+
+  resp.Clear();
+  Status s = MakeAddCall(sequence_number, 1, &resp);
+  ASSERT_TRUE(s.IsRemoteError());
+  ASSERT_STR_CONTAINS(s.ToString(), "is stale");
+
+  // Sleep again, this time for 'remember_clients_ttl_ms' and run GC again (memory must
+  // shrink once more). The request should be successful, but its response should be a new one.
+  SleepFor(MonoDelta::FromMilliseconds(FLAGS_remember_clients_ttl_ms));
+  memory_consumption = mem_tracker_->consumption();
+  result_tracker_->GCResults();
+  ASSERT_LT(mem_tracker_->consumption(), memory_consumption);
+
+  resp.Clear();
+  ASSERT_OK(MakeAddCall(sequence_number, 1, &resp));
+  ASSERT_NE(SecureShortDebugString(resp), SecureShortDebugString(original));
+}
+
+// This test creates a thread continuously making requests to the server, some lasting longer
+// than the GC period, at the same time it runs GC, making sure that the corresponding
+// CompletionRecords/ClientStates are not deleted from underneath the ongoing requests.
+// This also creates a thread that runs GC very frequently and another thread that sends the
+// same request over and over and observes the possible states: request is ok, request is stale,
+// request is ok again (because the client was forgotten).
+TEST_F(ExactlyOnceRpcTest, TestExactlyOnceSemanticsGarbageCollectionStressTest) {
+  FLAGS_remember_clients_ttl_ms = 100;
+  FLAGS_remember_responses_ttl_ms = 10;
+  FLAGS_result_tracker_gc_interval_ms = 10;
+
+  ASSERT_OK(StartServer());
+
+  // The write thread runs for a shorter period to make sure client GC has a
+  // chance to run.
+  MonoDelta writes_run_for = MonoDelta::FromSeconds(2);
+  MonoDelta stubborn_run_for = MonoDelta::FromSeconds(3);
+  if (AllowSlowTests()) {
+    writes_run_for = MonoDelta::FromSeconds(10);
+    stubborn_run_for = MonoDelta::FromSeconds(11);
+  }
+
+  result_tracker_->StartGCThread();
+
+  // Assign the first sequence number (0) to the 'stubborn writes' thread.
+  // This thread will keep making RPCs with this sequence number while
+  // the 'write_thread' will make normal requests with increasing sequence
+  // numbers.
+  ResultTracker::SequenceNumber stubborn_req_seq_num;
+  CHECK_OK(request_tracker_->NewSeqNo(&stubborn_req_seq_num));
+  ASSERT_EQ(stubborn_req_seq_num, 0);
+
+  scoped_refptr<kudu::Thread> stubborn_thread;
+  CHECK_OK(kudu::Thread::Create(
+      "stubborn", "stubborn", &ExactlyOnceRpcTest::StubbornlyWriteTheSameRequestThread,
+      this, stubborn_req_seq_num, stubborn_run_for, &stubborn_thread));
+
+  scoped_refptr<kudu::Thread> write_thread;
+  CHECK_OK(kudu::Thread::Create(
+      "write", "write", &ExactlyOnceRpcTest::DoLongWritesThread,
+      this, writes_run_for, &write_thread));
+
+  write_thread->Join();  // The writer is given the shorter run time, so it is joined first.
+  stubborn_thread->Join();
+
+  // Within a few seconds, the consumption should be back to zero.
+  // Really, this should be within 100ms, but we'll give it a bit of
+  // time to avoid test flakiness.
+  AssertEventually([&]() {
+      ASSERT_EQ(0, mem_tracker_->consumption());
+    }, MonoDelta::FromSeconds(5));
+  NO_PENDING_FATALS();  // Presumably surfaces fatal failures raised in the lambda -- verify.
+}
+
+
+} // namespace rpc
+} // namespace kudu