You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/17 03:14:28 UTC

[12/15] incubator-impala git commit: IMPALA-4669: [KRPC] Import RPC library from kudu@314c9d8

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/negotiation-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/negotiation-test.cc b/be/src/kudu/rpc/negotiation-test.cc
new file mode 100644
index 0000000..68185bb
--- /dev/null
+++ b/be/src/kudu/rpc/negotiation-test.cc
@@ -0,0 +1,1331 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/rpc-test-base.h"
+
+#include <stdlib.h>
+#include <sys/stat.h>
+
+#include <functional>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <thread>
+
+#include <gflags/gflags.h>
+#include <gtest/gtest.h>
+#include <sasl/sasl.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/rpc/client_negotiation.h"
+#include "kudu/rpc/constants.h"
+#include "kudu/rpc/negotiation.h"
+#include "kudu/rpc/server_negotiation.h"
+#include "kudu/security/crypto.h"
+#include "kudu/security/security-test-util.h"
+#include "kudu/security/test/mini_kdc.h"
+#include "kudu/security/tls_context.h"
+#include "kudu/security/tls_socket.h"
+#include "kudu/security/token_signer.h"
+#include "kudu/security/token_signing_key.h"
+#include "kudu/security/token_verifier.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/subprocess.h"
+#include "kudu/util/trace.h"
+#include "kudu/util/user.h"
+
+// HACK: MIT Kerberos doesn't have any way of determining its version number,
+// but the error messages in krb5-1.10 and earlier are broken due to
+// a bug: http://krbdev.mit.edu/rt/Ticket/Display.html?id=6973
+//
+// Since we don't have any way to explicitly figure out the version, we just
+// look for this random macro which was added in 1.11 (the same version in which
+// the above bug was fixed).
+//
+// If the probe macro is missing, KRB5_VERSION_LE_1_10 is defined as a marker
+// for "krb5 1.10 or older" (presumably consulted by tests that compare
+// Kerberos error messages -- confirm against the rest of the file).
+#ifndef KRB5_RESPONDER_QUESTION_PASSWORD
+#define KRB5_VERSION_LE_1_10
+#endif
+
+// Test-only flag defined by this file; see its help string for the purpose.
+DEFINE_bool(is_test_child, false,
+            "Used by tests which require clean processes. "
+            "See TestDisableInit.");
+// Flags defined outside this file. The parameterized negotiation test
+// overwrites rpc_encrypt_loopback_connections for every test case, and
+// rpc_trace_negotiation makes it log the negotiation trace even on success.
+DECLARE_bool(rpc_encrypt_loopback_connections);
+DECLARE_bool(rpc_trace_negotiation);
+
+using std::string;
+using std::thread;
+using std::unique_ptr;
+
+using kudu::security::Cert;
+using kudu::security::PkiConfig;
+using kudu::security::PrivateKey;
+using kudu::security::SignedTokenPB;
+using kudu::security::TlsContext;
+using kudu::security::TokenSigner;
+using kudu::security::TokenSigningPrivateKey;
+using kudu::security::TokenVerifier;
+
+namespace kudu {
+namespace rpc {
+
+// The negotiation configuration for a client or server endpoint.
+//
+// NOTE: instances are brace-initialized positionally by the test table in
+// this file, so the member order below must not be changed.
+struct EndpointConfig {
+  // The PKI configuration.
+  PkiConfig pki;
+  // The supported SASL mechanisms.
+  vector<SaslMechanism::Type> sasl_mechs;
+  // For the client, whether the client has the token.
+  // For the server, whether the server has the TSK.
+  bool token;
+  // The RPC encryption setting (DISABLED, OPTIONAL or REQUIRED) handed to the
+  // endpoint's negotiation object.
+  RpcEncryption encryption;
+};
+// Writes a human-readable rendering of 'config' to 'o' and returns 'o'.
+// The output has the form
+//   {pki: <pki>, sasl-mechs: [<m1>, <m2>], token: <true|false>, encryption: <NAME>}
+//
+// The config is taken by const reference so that printing it does not copy
+// the SASL mechanism vector.
+std::ostream& operator<<(std::ostream& o, const EndpointConfig& config) {
+  auto bool_string = [] (bool b) { return b ? "true" : "false"; };
+  o << "{pki: " << config.pki
+    << ", sasl-mechs: [" << JoinMapped(config.sasl_mechs, SaslMechanism::name_of, ", ")
+    << "], token: " << bool_string(config.token)
+    << ", encryption: ";
+
+  // No default case: every RpcEncryption enumerator is listed explicitly.
+  switch (config.encryption) {
+    case RpcEncryption::DISABLED: o << "DISABLED"; break;
+    case RpcEncryption::OPTIONAL: o << "OPTIONAL"; break;
+    case RpcEncryption::REQUIRED: o << "REQUIRED"; break;
+  }
+
+  o << "}";
+  return o;
+}
+
+// A description of a negotiation sequence, including client and server
+// configuration, as well as expected results.
+//
+// NOTE: instances are brace-initialized positionally by the test table in
+// this file, so the member order below must not be changed.
+struct NegotiationDescriptor {
+  // The client endpoint's configuration.
+  EndpointConfig client;
+  // The server endpoint's configuration.
+  EndpointConfig server;
+
+  // Whether the server side uses a NegotiationTestSocket (which reports a
+  // fixed 8.8.8.8 peer address) instead of a plain Socket.
+  bool use_test_socket;
+
+  // Value assigned to FLAGS_rpc_encrypt_loopback_connections for the test
+  // case. On success it is also the expectation for whether both endpoints
+  // end up holding a TlsSocket.
+  bool rpc_encrypt_loopback;
+
+  // The expected client status from negotiating.
+  Status client_status;
+  // The expected server status from negotiating.
+  Status server_status;
+
+  // The expected negotiated authentication type.
+  AuthenticationType negotiated_authn;
+
+  // The expected SASL mechanism, if SASL authentication is negotiated.
+  SaslMechanism::Type negotiated_mech;
+
+  // Whether the negotiation is expected to perform a TLS handshake.
+  bool tls_negotiated;
+};
+// Writes a human-readable rendering of the descriptor's inputs (both endpoint
+// configurations and the loopback-encryption setting) to 'o' and returns 'o'.
+// Expected results are not printed.
+//
+// The descriptor is taken by const reference to avoid copying both endpoint
+// configurations and statuses, and the closing brace is emitted last so that
+// every printed field sits inside the same {...} record.
+std::ostream& operator<<(std::ostream& o, const NegotiationDescriptor& c) {
+  auto bool_string = [] (bool b) { return b ? "true" : "false"; };
+  o << "{client: " << c.client
+    << ", server: " << c.server
+    << ", rpc-encrypt-loopback: " << bool_string(c.rpc_encrypt_loopback)
+    << "}";
+  return o;
+}
+
+// A Socket whose reported peer address is always the same fixed value,
+// regardless of what it is actually connected to.
+class NegotiationTestSocket : public Socket {
+ public:
+  // Fills 'cur_addr' with an arbitrary public IP and port, returning the
+  // status of parsing that address.
+  Status GetPeerAddress(Sockaddr *cur_addr) const override {
+    const string arbitrary_public_addr = "8.8.8.8:12345";
+    return cur_addr->ParseString(arbitrary_public_addr, 0);
+  }
+};
+
+// Fixture for the value-parameterized negotiation test: each instantiation
+// receives one NegotiationDescriptor through GetParam().
+class TestNegotiation : public RpcTestBase,
+                        public ::testing::WithParamInterface<NegotiationDescriptor> {
+ public:
+  // Runs the base-class setup first, then initializes SASL; a SASL
+  // initialization failure fails the test.
+  void SetUp() override {
+    RpcTestBase::SetUp();
+    ASSERT_OK(SaslInit());
+  }
+};
+
+// Runs one complete client/server negotiation as described by the test
+// parameter and compares the outcome with the descriptor's expectations.
+//
+// Steps:
+//  1. Build a CA and configure a TLS context for each endpoint.
+//  2. Optionally create a signed authn token for the client and import the
+//     verification keys on the server.
+//  3. Connect a client socket to a listening socket and accept the
+//     connection on the server side.
+//  4. Enable the configured SASL mechanisms (starting a MiniKdc on demand).
+//  5. Negotiate on two threads, then verify statuses, negotiated
+//     authentication/mechanism, TLS usage and the authenticated user.
+TEST_P(TestNegotiation, TestNegotiation) {
+  NegotiationDescriptor desc = GetParam();
+
+  // Generate a trusted root certificate.
+  PrivateKey ca_key;
+  Cert ca_cert;
+  ASSERT_OK(GenerateSelfSignedCAForTests(&ca_key, &ca_cert));
+
+  // Create and configure a TLS context for each endpoint.
+  TlsContext client_tls_context;
+  TlsContext server_tls_context;
+  ASSERT_OK(client_tls_context.Init());
+  ASSERT_OK(server_tls_context.Init());
+  ASSERT_OK(ConfigureTlsContext(desc.client.pki, ca_cert, ca_key, &client_tls_context));
+  ASSERT_OK(ConfigureTlsContext(desc.server.pki, ca_cert, ca_key, &server_tls_context));
+
+  FLAGS_rpc_encrypt_loopback_connections = desc.rpc_encrypt_loopback;
+
+  // Generate an optional client token and server token verifier.
+  TokenSigner token_signer(60, 20, std::make_shared<TokenVerifier>());
+  {
+    unique_ptr<TokenSigningPrivateKey> key;
+    ASSERT_OK(token_signer.CheckNeedKey(&key));
+    // No keys are available yet, so should be able to add.
+    ASSERT_NE(nullptr, key.get());
+    ASSERT_OK(token_signer.AddKey(std::move(key)));
+  }
+  TokenVerifier token_verifier;
+  boost::optional<SignedTokenPB> authn_token;
+  if (desc.client.token) {
+    authn_token = SignedTokenPB();
+    security::TokenPB token;
+    token.set_expire_unix_epoch_seconds(WallTime_Now() + 60);
+    token.mutable_authn()->set_username("client-token");
+    ASSERT_TRUE(token.SerializeToString(authn_token->mutable_token_data()));
+    ASSERT_OK(token_signer.SignToken(&*authn_token));
+  }
+  if (desc.server.token) {
+    ASSERT_OK(token_verifier.ImportKeys(token_signer.verifier().ExportKeys()));
+  }
+
+  // Create the listening socket, client socket, and server socket.
+  Socket listening_socket;
+  ASSERT_OK(listening_socket.Init(0));
+  ASSERT_OK(listening_socket.BindAndListen(Sockaddr(), 1));
+  Sockaddr server_addr;
+  ASSERT_OK(listening_socket.GetSocketAddress(&server_addr));
+
+  unique_ptr<Socket> client_socket(new Socket());
+  ASSERT_OK(client_socket->Init(0));
+  // The connect result must be checked: if it failed silently, the Accept()
+  // call below would have nothing to accept.
+  ASSERT_OK(client_socket->Connect(server_addr));
+
+  unique_ptr<Socket> server_socket(desc.use_test_socket ?
+                                   new NegotiationTestSocket() :
+                                   new Socket());
+
+  Sockaddr client_addr;
+  CHECK_OK(listening_socket.Accept(server_socket.get(), &client_addr, 0));
+
+  // Create and configure the client and server negotiation instances.
+  ClientNegotiation client_negotiation(std::move(client_socket),
+                                       &client_tls_context,
+                                       authn_token,
+                                       desc.client.encryption);
+  ServerNegotiation server_negotiation(std::move(server_socket),
+                                       &server_tls_context,
+                                       &token_verifier,
+                                       desc.server.encryption);
+
+  // Set client and server SASL mechanisms.
+  MiniKdc kdc;
+  bool kdc_started = false;
+  // Starts the KDC the first time it is needed by either endpoint.
+  auto start_kdc_once = [&] () {
+    if (!kdc_started) {
+      kdc_started = true;
+      RETURN_NOT_OK(kdc.Start());
+    }
+    return Status::OK();
+  };
+  for (auto mech : desc.client.sasl_mechs) {
+    switch (mech) {
+      case SaslMechanism::INVALID: break;
+      case SaslMechanism::PLAIN:
+        ASSERT_OK(client_negotiation.EnablePlain("client-plain", "client-password"));
+        break;
+      case SaslMechanism::GSSAPI:
+        ASSERT_OK(start_kdc_once());
+        ASSERT_OK(kdc.CreateUserPrincipal("client-gssapi"));
+        ASSERT_OK(kdc.Kinit("client-gssapi"));
+        ASSERT_OK(kdc.SetKrb5Environment());
+        client_negotiation.set_server_fqdn("127.0.0.1");
+        ASSERT_OK(client_negotiation.EnableGSSAPI());
+        break;
+    }
+  }
+  for (auto mech : desc.server.sasl_mechs) {
+    switch (mech) {
+      case SaslMechanism::INVALID: break;
+      case SaslMechanism::PLAIN:
+        ASSERT_OK(server_negotiation.EnablePlain());
+        break;
+      case SaslMechanism::GSSAPI:
+        ASSERT_OK(start_kdc_once());
+        // Create the server principal and keytab.
+        string kt_path;
+        ASSERT_OK(kdc.CreateServiceKeytab("kudu/127.0.0.1", &kt_path));
+        CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/));
+        server_negotiation.set_server_fqdn("127.0.0.1");
+        ASSERT_OK(server_negotiation.EnableGSSAPI());
+        break;
+    }
+  }
+
+  // Run the client/server negotiation. Because negotiation is blocking, it
+  // has to be done on separate threads.
+  Status client_status;
+  Status server_status;
+  thread client_thread([&] () {
+      scoped_refptr<Trace> t(new Trace());
+      ADOPT_TRACE(t.get());
+      client_status = client_negotiation.Negotiate();
+      // Close the socket so that the server will not block forever on error.
+      client_negotiation.socket()->Close();
+
+      if (FLAGS_rpc_trace_negotiation || !client_status.ok()) {
+        string msg = Trace::CurrentTrace()->DumpToString();
+        if (!client_status.ok()) {
+          LOG(WARNING) << "Failed client RPC negotiation. Client trace:\n" << msg;
+        } else {
+          LOG(INFO) << "RPC negotiation tracing enabled. Client trace:\n" << msg;
+        }
+      }
+  });
+  thread server_thread([&] () {
+      scoped_refptr<Trace> t(new Trace());
+      ADOPT_TRACE(t.get());
+      server_status = server_negotiation.Negotiate();
+      // Close the socket so that the client will not block forever on error.
+      server_negotiation.socket()->Close();
+
+      if (FLAGS_rpc_trace_negotiation || !server_status.ok()) {
+        string msg = Trace::CurrentTrace()->DumpToString();
+        if (!server_status.ok()) {
+          LOG(WARNING) << "Failed server RPC negotiation. Server trace:\n" << msg;
+        } else {
+          LOG(INFO) << "RPC negotiation tracing enabled. Server trace:\n" << msg;
+        }
+      }
+  });
+  client_thread.join();
+  server_thread.join();
+
+  // Check the negotiation outcome against the expected outcome.
+  EXPECT_EQ(desc.client_status.CodeAsString(), client_status.CodeAsString());
+  EXPECT_EQ(desc.server_status.CodeAsString(), server_status.CodeAsString());
+  ASSERT_STR_MATCHES(client_status.ToString(), desc.client_status.ToString());
+  ASSERT_STR_MATCHES(server_status.ToString(), desc.server_status.ToString());
+
+  if (client_status.ok()) {
+    EXPECT_TRUE(server_status.ok());
+
+    // Make sure the negotiations agree with the expected values. Both the
+    // client and the server side are checked for each property.
+    EXPECT_EQ(desc.negotiated_authn, client_negotiation.negotiated_authn());
+    EXPECT_EQ(desc.negotiated_mech, client_negotiation.negotiated_mechanism());
+    EXPECT_EQ(desc.negotiated_authn, server_negotiation.negotiated_authn());
+    EXPECT_EQ(desc.negotiated_mech, server_negotiation.negotiated_mechanism());
+    EXPECT_EQ(desc.tls_negotiated, client_negotiation.tls_negotiated());
+    EXPECT_EQ(desc.tls_negotiated, server_negotiation.tls_negotiated());
+
+    // Whether each endpoint ended up with a TLS-wrapped socket.
+    bool client_tls_socket =
+        dynamic_cast<security::TlsSocket*>(client_negotiation.socket()) != nullptr;
+    bool server_tls_socket =
+        dynamic_cast<security::TlsSocket*>(server_negotiation.socket()) != nullptr;
+    EXPECT_EQ(desc.rpc_encrypt_loopback, client_tls_socket);
+    EXPECT_EQ(desc.rpc_encrypt_loopback, server_tls_socket);
+
+    // Check that the expected user subject is authenticated.
+    RemoteUser remote_user = server_negotiation.take_authenticated_user();
+    switch (server_negotiation.negotiated_authn()) {
+      case AuthenticationType::SASL:
+        switch (server_negotiation.negotiated_mechanism()) {
+          case SaslMechanism::PLAIN:
+            EXPECT_EQ("client-plain", remote_user.username());
+            break;
+          case SaslMechanism::GSSAPI:
+            EXPECT_EQ("client-gssapi", remote_user.username());
+            EXPECT_EQ("client-gssapi@KRBTEST.COM", remote_user.principal().value_or(""));
+            break;
+          case SaslMechanism::INVALID: LOG(FATAL) << "invalid mechanism negotiated";
+        }
+        break;
+      case AuthenticationType::CERTIFICATE: {
+        // We expect the cert to be using the local username, because it hasn't
+        // logged in from any Keytab.
+        string expected;
+        CHECK_OK(GetLoggedInUser(&expected));
+        EXPECT_EQ(expected, remote_user.username());
+        EXPECT_FALSE(remote_user.principal());
+        break;
+      }
+      case AuthenticationType::TOKEN:
+        EXPECT_EQ("client-token", remote_user.username());
+        break;
+      case AuthenticationType::INVALID: LOG(FATAL) << "invalid authentication negotiated";
+    }
+  }
+}
+
+// The table of negotiation scenarios run by TestNegotiation.
+//
+// Every NegotiationDescriptor is brace-initialized positionally, in member
+// declaration order:
+//   client EndpointConfig { pki, sasl_mechs, token, encryption },
+//   server EndpointConfig { pki, sasl_mechs, token, encryption },
+//   use_test_socket,
+//   rpc_encrypt_loopback,
+//   client_status   (expected; its string form is the pattern given to
+//                    ASSERT_STR_MATCHES against the actual status),
+//   server_status   (likewise),
+//   negotiated_authn,
+//   negotiated_mech,
+//   tls_negotiated.
+// The last three expectations are only checked by the test when the client
+// negotiation succeeds.
+INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
+                        TestNegotiation,
+                        ::testing::Values(
+
+        // client: no authn/mechs
+        // server: no authn/mechs
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::NONE,
+            {},
+            false,
+            RpcEncryption::OPTIONAL,
+          },
+          EndpointConfig {
+            PkiConfig::NONE,
+            {},
+            false,
+            RpcEncryption::OPTIONAL,
+          },
+          false,
+          false,
+          Status::NotAuthorized(".*client is not configured with an authentication type"),
+          Status::NetworkError(""),
+          AuthenticationType::INVALID,
+          SaslMechanism::INVALID,
+          false,
+        },
+
+        // client: PLAIN
+        // server: no authn/mechs
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::NONE,
+            { SaslMechanism::PLAIN },
+            false,
+            RpcEncryption::OPTIONAL,
+          },
+          EndpointConfig {
+            PkiConfig::NONE,
+            {},
+            false,
+            RpcEncryption::OPTIONAL,
+          },
+          false,
+          false,
+          Status::NotAuthorized(".* server mechanism list is empty"),
+          Status::NotAuthorized(".* server mechanism list is empty"),
+          AuthenticationType::INVALID,
+          SaslMechanism::INVALID,
+          false,
+        },
+
+        // client: PLAIN
+        // server: PLAIN
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::NONE,
+            { SaslMechanism::PLAIN },
+            false,
+            RpcEncryption::OPTIONAL
+          },
+          EndpointConfig {
+            PkiConfig::NONE,
+            { SaslMechanism::PLAIN },
+            false,
+            RpcEncryption::DISABLED,
+          },
+          false,
+          false,
+          Status::OK(),
+          Status::OK(),
+          AuthenticationType::SASL,
+          SaslMechanism::PLAIN,
+          false,
+        },
+
+        // client: GSSAPI
+        // server: GSSAPI
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::NONE,
+            { SaslMechanism::GSSAPI },
+            false,
+            RpcEncryption::OPTIONAL,
+          },
+          EndpointConfig {
+            PkiConfig::NONE,
+            { SaslMechanism::GSSAPI },
+            false,
+            RpcEncryption::DISABLED,
+          },
+          false,
+          false,
+          Status::OK(),
+          Status::OK(),
+          AuthenticationType::SASL,
+          SaslMechanism::GSSAPI,
+          false,
+        },
+
+        // client: GSSAPI, PLAIN
+        // server: GSSAPI, PLAIN
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::NONE,
+            { SaslMechanism::GSSAPI, SaslMechanism::PLAIN },
+            false,
+            RpcEncryption::OPTIONAL,
+          },
+          EndpointConfig {
+            PkiConfig::NONE,
+            { SaslMechanism::GSSAPI, SaslMechanism::PLAIN },
+            false,
+            RpcEncryption::DISABLED,
+          },
+          false,
+          false,
+          Status::OK(),
+          Status::OK(),
+          AuthenticationType::SASL,
+          SaslMechanism::GSSAPI,
+          false,
+        },
+
+        // client: GSSAPI, PLAIN
+        // server: GSSAPI
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::NONE,
+            { SaslMechanism::GSSAPI, SaslMechanism::PLAIN },
+            false,
+            RpcEncryption::OPTIONAL,
+          },
+          EndpointConfig {
+            PkiConfig::NONE,
+            { SaslMechanism::GSSAPI },
+            false,
+            RpcEncryption::DISABLED,
+          },
+          false,
+          false,
+          Status::OK(),
+          Status::OK(),
+          AuthenticationType::SASL,
+          SaslMechanism::GSSAPI,
+          false,
+        },
+
+        // client: PLAIN
+        // server: GSSAPI
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::NONE,
+            { SaslMechanism::PLAIN },
+            false,
+            RpcEncryption::OPTIONAL,
+          },
+          EndpointConfig {
+            PkiConfig::NONE,
+            { SaslMechanism::GSSAPI },
+            false,
+            RpcEncryption::DISABLED,
+          },
+          false,
+          false,
+          Status::NotAuthorized(".*client does not have Kerberos enabled"),
+          Status::NetworkError(""),
+          AuthenticationType::INVALID,
+          SaslMechanism::INVALID,
+          false,
+        },
+
+        // client: GSSAPI,
+        // server: GSSAPI, self-signed cert
+        // loopback encryption
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::NONE,
+            { SaslMechanism::GSSAPI },
+            false,
+            RpcEncryption::OPTIONAL,
+          },
+          EndpointConfig {
+            PkiConfig::SELF_SIGNED,
+            { SaslMechanism::GSSAPI },
+            false,
+            RpcEncryption::OPTIONAL,
+          },
+          false,
+          true,
+          Status::OK(),
+          Status::OK(),
+          AuthenticationType::SASL,
+          SaslMechanism::GSSAPI,
+          true,
+        },
+
+        // client: GSSAPI, signed-cert
+        // server: GSSAPI, self-signed cert
+        // This tests that the server will not advertise CERTIFICATE authentication,
+        // since it doesn't have a trusted cert.
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::SIGNED,
+            { SaslMechanism::GSSAPI },
+            false,
+            RpcEncryption::OPTIONAL,
+          },
+          EndpointConfig {
+            PkiConfig::SELF_SIGNED,
+            { SaslMechanism::GSSAPI },
+            false,
+            RpcEncryption::OPTIONAL,
+          },
+          false,
+          false,
+          Status::OK(),
+          Status::OK(),
+          AuthenticationType::SASL,
+          SaslMechanism::GSSAPI,
+          true,
+        },
+
+        // client: PLAIN,
+        // server: PLAIN, self-signed cert
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::NONE,
+            { SaslMechanism::PLAIN },
+            false,
+            RpcEncryption::OPTIONAL,
+          },
+          EndpointConfig {
+            PkiConfig::SELF_SIGNED,
+            { SaslMechanism::PLAIN },
+            false,
+            RpcEncryption::OPTIONAL,
+          },
+          false,
+          false,
+          Status::OK(),
+          Status::OK(),
+          AuthenticationType::SASL,
+          SaslMechanism::PLAIN,
+          true,
+        },
+
+        // client: signed-cert
+        // server: signed-cert
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::SIGNED,
+            { SaslMechanism::GSSAPI },
+            false,
+            RpcEncryption::OPTIONAL,
+          },
+          EndpointConfig {
+            PkiConfig::SIGNED,
+            { SaslMechanism::GSSAPI },
+            false,
+            RpcEncryption::OPTIONAL,
+          },
+          false,
+          false,
+          Status::OK(),
+          Status::OK(),
+          AuthenticationType::CERTIFICATE,
+          SaslMechanism::INVALID,
+          true,
+        },
+
+        // client: token, trusted cert
+        // server: token, signed-cert, PLAIN
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::TRUSTED,
+            { },
+            true,
+            RpcEncryption::OPTIONAL,
+          },
+          EndpointConfig {
+            PkiConfig::SIGNED,
+            { SaslMechanism::PLAIN },
+            true,
+            RpcEncryption::OPTIONAL,
+          },
+          false,
+          false,
+          Status::OK(),
+          Status::OK(),
+          AuthenticationType::TOKEN,
+          SaslMechanism::INVALID,
+          true,
+        },
+
+        // client: PLAIN, token
+        // server: PLAIN, token, signed cert
+        // Test that the client won't negotiate token authn if it doesn't have a
+        // trusted cert. We aren't expecting this to happen in practice (the
+        // token and trusted CA cert should come as a package).
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::NONE,
+            { SaslMechanism::PLAIN },
+            true,
+            RpcEncryption::OPTIONAL,
+          },
+          EndpointConfig {
+            PkiConfig::SIGNED,
+            { SaslMechanism::PLAIN },
+            true,
+            RpcEncryption::OPTIONAL,
+          },
+          false,
+          false,
+          Status::OK(),
+          Status::OK(),
+          AuthenticationType::SASL,
+          SaslMechanism::PLAIN,
+          true,
+        },
+
+        // client: PLAIN, GSSAPI, signed-cert, token
+        // server: PLAIN, GSSAPI, signed-cert, token
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::SIGNED,
+            { SaslMechanism::PLAIN, SaslMechanism::GSSAPI },
+            true,
+            RpcEncryption::OPTIONAL,
+          },
+          EndpointConfig {
+            PkiConfig::SIGNED,
+            { SaslMechanism::PLAIN, SaslMechanism::GSSAPI },
+            true,
+            RpcEncryption::OPTIONAL,
+          },
+          false,
+          false,
+          Status::OK(),
+          Status::OK(),
+          AuthenticationType::CERTIFICATE,
+          SaslMechanism::INVALID,
+          true,
+        },
+
+        // client: PLAIN, TLS disabled
+        // server: PLAIN, TLS required
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::NONE,
+            { SaslMechanism::PLAIN },
+            false,
+            RpcEncryption::DISABLED,
+          },
+          EndpointConfig {
+            PkiConfig::SIGNED,
+            { SaslMechanism::PLAIN },
+            false,
+            RpcEncryption::REQUIRED,
+          },
+          false,
+          false,
+          Status::NotAuthorized(".*client does not support required TLS encryption"),
+          Status::NotAuthorized(".*client does not support required TLS encryption"),
+          AuthenticationType::SASL,
+          SaslMechanism::PLAIN,
+          true,
+        },
+
+        // client: PLAIN, TLS required
+        // server: PLAIN, TLS disabled
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::NONE,
+            { SaslMechanism::PLAIN },
+            false,
+            RpcEncryption::REQUIRED,
+          },
+          EndpointConfig {
+            PkiConfig::SIGNED,
+            { SaslMechanism::PLAIN },
+            false,
+            RpcEncryption::DISABLED,
+          },
+          false,
+          false,
+          Status::NotAuthorized(".*server does not support required TLS encryption"),
+          Status::NetworkError(""),
+          AuthenticationType::SASL,
+          SaslMechanism::PLAIN,
+          true,
+        },
+
+        // client: GSSAPI, TLS required, externally-signed cert
+        // server: GSSAPI, TLS required, externally-signed cert
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::EXTERNALLY_SIGNED,
+            { SaslMechanism::GSSAPI },
+            false,
+            RpcEncryption::REQUIRED,
+          },
+          EndpointConfig {
+            PkiConfig::EXTERNALLY_SIGNED,
+            { SaslMechanism::GSSAPI },
+            false,
+            RpcEncryption::REQUIRED,
+          },
+          false,
+          false,
+          Status::OK(),
+          Status::OK(),
+          AuthenticationType::SASL,
+          SaslMechanism::GSSAPI,
+          true,
+        },
+
+        // client: GSSAPI, TLS optional, externally-signed cert
+        // server: GSSAPI, TLS required, signed cert
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::EXTERNALLY_SIGNED,
+            { SaslMechanism::GSSAPI },
+            false,
+            RpcEncryption::OPTIONAL,
+          },
+          EndpointConfig {
+            PkiConfig::SIGNED,
+            { SaslMechanism::GSSAPI },
+            false,
+            RpcEncryption::REQUIRED,
+          },
+          false,
+          false,
+          Status::OK(),
+          Status::OK(),
+          AuthenticationType::SASL,
+          SaslMechanism::GSSAPI,
+          true,
+        },
+
+        // client: GSSAPI, TLS required
+        // server: GSSAPI, TLS required, externally-signed cert
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::NONE,
+            { SaslMechanism::GSSAPI },
+            false,
+            RpcEncryption::REQUIRED,
+          },
+          EndpointConfig {
+            PkiConfig::EXTERNALLY_SIGNED,
+            { SaslMechanism::GSSAPI },
+            false,
+            RpcEncryption::REQUIRED,
+          },
+          false,
+          false,
+          Status::OK(),
+          Status::OK(),
+          AuthenticationType::SASL,
+          SaslMechanism::GSSAPI,
+          true,
+        },
+
+        // client: GSSAPI, PLAIN, TLS required, externally-signed cert
+        // server: PLAIN, TLS required, externally-signed cert
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::EXTERNALLY_SIGNED,
+            { SaslMechanism::GSSAPI, SaslMechanism::PLAIN },
+            false,
+            RpcEncryption::REQUIRED,
+          },
+          EndpointConfig {
+            PkiConfig::EXTERNALLY_SIGNED,
+            { SaslMechanism::PLAIN },
+            false,
+            RpcEncryption::REQUIRED,
+          },
+          false,
+          false,
+          Status::OK(),
+          Status::OK(),
+          AuthenticationType::SASL,
+          SaslMechanism::PLAIN,
+          true,
+        },
+
+        // client: GSSAPI, TLS disabled, signed cert
+        // server: GSSAPI, TLS required, externally-signed cert
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::SIGNED,
+            { SaslMechanism::GSSAPI },
+            false,
+            RpcEncryption::DISABLED,
+          },
+          EndpointConfig {
+            PkiConfig::EXTERNALLY_SIGNED,
+            { SaslMechanism::GSSAPI },
+            false,
+            RpcEncryption::REQUIRED,
+          },
+          false,
+          false,
+          Status::NotAuthorized(".*client does not support required TLS encryption"),
+          Status::NotAuthorized(".*client does not support required TLS encryption"),
+          AuthenticationType::SASL,
+          SaslMechanism::GSSAPI,
+          true,
+        },
+
+        // client: GSSAPI, TLS required, signed cert
+        // server: GSSAPI, TLS required, externally-signed cert
+        NegotiationDescriptor {
+          EndpointConfig {
+            PkiConfig::SIGNED,
+            { SaslMechanism::GSSAPI },
+            false,
+            RpcEncryption::REQUIRED,
+          },
+          EndpointConfig {
+            PkiConfig::EXTERNALLY_SIGNED,
+            { SaslMechanism::GSSAPI },
+            false,
+            RpcEncryption::REQUIRED,
+          },
+          false,
+          false,
+          Status::OK(),
+          Status::OK(),
+          AuthenticationType::SASL,
+          SaslMechanism::GSSAPI,
+          true,
+        },
+
+        // client: PLAIN
+        // server: PLAIN
+        // connection from public routable IP
+        NegotiationDescriptor {
+            EndpointConfig {
+                PkiConfig::NONE,
+                { SaslMechanism::PLAIN },
+                false,
+                RpcEncryption::OPTIONAL
+            },
+            EndpointConfig {
+                PkiConfig::NONE,
+                { SaslMechanism::PLAIN },
+                false,
+                RpcEncryption::OPTIONAL
+            },
+            true,
+            false,
+            Status::NotAuthorized(".*unencrypted connections from publicly routable IPs"),
+            Status::NotAuthorized(".*unencrypted connections from publicly routable IPs"),
+            AuthenticationType::SASL,
+            SaslMechanism::PLAIN,
+            false,
+        },
+
+        // client: GSSAPI, TLS required, externally-signed cert
+        // server: GSSAPI, TLS required, externally-signed cert
+        // connection from public routable IP
+        NegotiationDescriptor {
+            EndpointConfig {
+                PkiConfig::EXTERNALLY_SIGNED,
+                { SaslMechanism::GSSAPI },
+                false,
+                RpcEncryption::REQUIRED,
+            },
+            EndpointConfig {
+                PkiConfig::EXTERNALLY_SIGNED,
+                { SaslMechanism::GSSAPI },
+                false,
+                RpcEncryption::REQUIRED,
+            },
+            true,
+            // true as no longer a loopback connection.
+            true,
+            Status::OK(),
+            Status::OK(),
+            AuthenticationType::SASL,
+            SaslMechanism::GSSAPI,
+            true,
+        }
+));
+
+// Signature of a thread entry point that takes ownership of a connected
+// socket. Both the server-side and the client-side negotiation runners in
+// this file are passed around as this type.
+using SocketCallable = std::function<void(unique_ptr<Socket>)>;
+
+// Thread body for the server side: accepts a single connection on 'acceptor'
+// and hands the accepted socket over to 'server_runner'. A failed Accept()
+// trips CHECK_OK.
+static void RunAcceptingDelegator(Socket* acceptor,
+                                  const SocketCallable& server_runner) {
+  unique_ptr<Socket> accepted(new Socket());
+  Sockaddr peer_addr;
+  CHECK_OK(acceptor->Accept(accepted.get(), &peer_addr, 0));
+  server_runner(std::move(accepted));
+}
+
+// Set up a listening socket plus a connected client socket and run a
+// negotiation sequence across them.
+//
+// 'server_runner' is invoked on its own thread with the accepted connection
+// and 'client_runner' on another thread with the connecting socket. The call
+// returns once both threads have been joined.
+static void RunNegotiationTest(const SocketCallable& server_runner,
+                               const SocketCallable& client_runner) {
+  Socket server_sock;
+  CHECK_OK(server_sock.Init(0));
+  ASSERT_OK(server_sock.BindAndListen(Sockaddr(), 1));
+  Sockaddr server_bind_addr;
+  ASSERT_OK(server_sock.GetSocketAddress(&server_bind_addr));
+  thread server(RunAcceptingDelegator, &server_sock, server_runner);
+
+  // From here on the acceptor thread is running. A failed ASSERT_OK would
+  // 'return' out of this function and destroy a still-joinable std::thread,
+  // which calls std::terminate(). Use CHECK_OK instead so that a failure aborts
+  // deliberately, with the offending Status in the log.
+  unique_ptr<Socket> client_sock(new Socket());
+  CHECK_OK(client_sock->Init(0));
+  CHECK_OK(client_sock->Connect(server_bind_addr));
+  thread client(client_runner, std::move(client_sock));
+
+  LOG(INFO) << "Waiting for test threads to terminate...";
+  client.join();
+  LOG(INFO) << "Client thread terminated.";
+
+  server.join();
+  LOG(INFO) << "Server thread terminated.";
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+#ifndef __APPLE__
+// Callback that receives the Status returned by Negotiate() together with the
+// negotiation object of type 'T', so a test can verify the outcome.
+template <typename T>
+using CheckerFunction = std::function<void(const Status&, T&)>;
+
+// Server half of a GSSAPI negotiation over 'socket'. Once Negotiate() has
+// returned, its Status and the negotiation object are handed to 'post_check'
+// so that the caller can verify the result.
+static void RunGSSAPINegotiationServer(unique_ptr<Socket> socket,
+                                       const CheckerFunction<ServerNegotiation>& post_check) {
+  TlsContext tls_context;
+  CHECK_OK(tls_context.Init());
+  TokenVerifier token_verifier;
+  ServerNegotiation negotiation(std::move(socket), &tls_context,
+                                &token_verifier, RpcEncryption::OPTIONAL);
+  negotiation.set_server_fqdn("127.0.0.1");
+  CHECK_OK(negotiation.EnableGSSAPI());
+  const Status outcome = negotiation.Negotiate();
+  post_check(outcome, negotiation);
+}
+
+// Client half of a GSSAPI negotiation over 'conn'. Once Negotiate() has
+// returned, its Status and the negotiation object are handed to 'post_check'
+// so that the caller can verify the result.
+static void RunGSSAPINegotiationClient(unique_ptr<Socket> conn,
+                                       const CheckerFunction<ClientNegotiation>& post_check) {
+  TlsContext tls_context;
+  CHECK_OK(tls_context.Init());
+  ClientNegotiation negotiation(std::move(conn), &tls_context,
+                                boost::none, RpcEncryption::OPTIONAL);
+  negotiation.set_server_fqdn("127.0.0.1");
+  CHECK_OK(negotiation.EnableGSSAPI());
+  const Status outcome = negotiation.Negotiate();
+  post_check(outcome, negotiation);
+}
+
+// Test invalid SASL negotiations using the GSSAPI (kerberos) mechanism over a socket.
+// This test is ignored on macOS because the system Kerberos implementation
+// (Heimdal) caches the non-existence of client credentials, which causes further
+// tests to fail.
+//
+// Three failure scenarios are exercised in order:
+//   1. neither side has krb5 credentials;
+//   2. the server has a keytab but the client has no credentials;
+//   3. the client has credentials but the server's keytab is for another service.
+TEST_F(TestNegotiation, TestGSSAPIInvalidNegotiation) {
+  MiniKdc kdc;
+  ASSERT_OK(kdc.Start());
+
+  // Try to negotiate with no krb5 credentials on either side. It should fail on both
+  // sides.
+  RunNegotiationTest(
+      std::bind(RunGSSAPINegotiationServer, std::placeholders::_1,
+                [](const Status& s, ServerNegotiation& server) {
+                  // The client notices there are no credentials and
+                  // doesn't send any failure message to the server.
+                  // Instead, it just disconnects.
+                  //
+                  // TODO(todd): it might be preferable to have the server
+                  // fail to start if it has no valid keytab.
+                  CHECK(s.IsNetworkError());
+                }),
+      std::bind(RunGSSAPINegotiationClient, std::placeholders::_1,
+                [](const Status& s, ClientNegotiation& client) {
+                  CHECK(s.IsNotAuthorized());
+#ifndef KRB5_VERSION_LE_1_10
+                  // Assert on the substring directly. Comparing the result of
+                  // string::find() against 0 would also pass when nothing is
+                  // found, because npos is the largest size_t value.
+                  ASSERT_STR_CONTAINS(s.ToString(), "No Kerberos credentials available");
+#endif
+                }));
+
+
+  // Create the server principal and keytab.
+  string kt_path;
+  ASSERT_OK(kdc.CreateServiceKeytab("kudu/127.0.0.1", &kt_path));
+  CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/));
+
+  // Try to negotiate with no krb5 credentials on the client. It should fail on both
+  // sides.
+  RunNegotiationTest(
+      std::bind(RunGSSAPINegotiationServer, std::placeholders::_1,
+                [](const Status& s, ServerNegotiation& server) {
+                  // The client notices there are no credentials and
+                  // doesn't send any failure message to the server.
+                  // Instead, it just disconnects.
+                  CHECK(s.IsNetworkError());
+                }),
+      std::bind(RunGSSAPINegotiationClient, std::placeholders::_1,
+                [](const Status& s, ClientNegotiation& client) {
+                  CHECK(s.IsNotAuthorized());
+#ifndef KRB5_VERSION_LE_1_10
+                  ASSERT_STR_MATCHES(s.ToString(),
+                                     "Not authorized: No Kerberos credentials available.*");
+#endif
+                }));
+
+  // Create and kinit as a client user.
+  ASSERT_OK(kdc.CreateUserPrincipal("testuser"));
+  ASSERT_OK(kdc.Kinit("testuser"));
+  ASSERT_OK(kdc.SetKrb5Environment());
+
+  // Change the server's keytab file so that it has inappropriate
+  // credentials.
+  // Authentication should now fail.
+  ASSERT_OK(kdc.CreateServiceKeytab("otherservice/127.0.0.1", &kt_path));
+  CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/));
+
+  RunNegotiationTest(
+      std::bind(RunGSSAPINegotiationServer, std::placeholders::_1,
+                [](const Status& s, ServerNegotiation& server) {
+                  CHECK(s.IsNotAuthorized());
+#ifndef KRB5_VERSION_LE_1_10
+                  ASSERT_STR_CONTAINS(s.ToString(),
+                                      "No key table entry found matching kudu/127.0.0.1");
+#endif
+                }),
+      std::bind(RunGSSAPINegotiationClient, std::placeholders::_1,
+                [](const Status& s, ClientNegotiation& client) {
+                  CHECK(s.IsNotAuthorized());
+#ifndef KRB5_VERSION_LE_1_10
+                  ASSERT_STR_CONTAINS(s.ToString(),
+                                      "No key table entry found matching kudu/127.0.0.1");
+#endif
+                }));
+}
+#endif
+
+#ifndef __APPLE__
+// Verifies that ServerNegotiation::PreflightCheckGSSAPI() fails with a
+// descriptive message when the keytab is missing, unreadable, or holds the
+// wrong principal, and succeeds with a valid keytab.
+//
+// This is ignored on macOS because the system Kerberos implementation does not
+// fail the preflight check when the keytab is inaccessible, probably because
+// the preflight check passes a 0-length token.
+TEST_F(TestNegotiation, TestPreflight) {
+  // Case 1: no keytab configured at all.
+  Status status = ServerNegotiation::PreflightCheckGSSAPI();
+  ASSERT_FALSE(status.ok());
+#ifndef KRB5_VERSION_LE_1_10
+  ASSERT_STR_MATCHES(status.ToString(), "Key table file.*not found");
+#endif
+  // Case 2: a valid krb5 environment and a keytab for the expected principal.
+  MiniKdc kdc;
+  ASSERT_OK(kdc.Start());
+  ASSERT_OK(kdc.SetKrb5Environment());
+  string keytab_path;
+  ASSERT_OK(kdc.CreateServiceKeytab("kudu/127.0.0.1", &keytab_path));
+  CHECK_ERR(setenv("KRB5_KTNAME", keytab_path.c_str(), 1 /*replace*/));
+
+  ASSERT_OK(ServerNegotiation::PreflightCheckGSSAPI());
+
+  // Case 3: the same keytab with all permission bits removed.
+  CHECK_ERR(chmod(keytab_path.c_str(), 0000));
+  status = ServerNegotiation::PreflightCheckGSSAPI();
+  ASSERT_FALSE(status.ok());
+#ifndef KRB5_VERSION_LE_1_10
+  ASSERT_STR_MATCHES(status.ToString(), "error accessing keytab: Permission denied");
+#endif
+  CHECK_ERR(unlink(keytab_path.c_str()));
+
+  // Case 4: a keytab that was created for a different service principal.
+  ASSERT_OK(kdc.CreateServiceKeytab("wrong-service/127.0.0.1", &keytab_path));
+  CHECK_ERR(setenv("KRB5_KTNAME", keytab_path.c_str(), 1 /*replace*/));
+  status = ServerNegotiation::PreflightCheckGSSAPI();
+  ASSERT_FALSE(status.ok());
+#ifndef KRB5_VERSION_LE_1_10
+  ASSERT_STR_MATCHES(status.ToString(), "No key table entry found matching kudu/.*");
+#endif
+}
+#endif
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Server side of the client-timeout test: negotiates with PLAIN enabled and
+// expects Negotiate() to fail with a NetworkError once the client gives up.
+static void RunTimeoutExpectingServer(unique_ptr<Socket> socket) {
+  TlsContext tls_context;
+  CHECK_OK(tls_context.Init());
+  TokenVerifier token_verifier;
+  ServerNegotiation negotiation(std::move(socket), &tls_context,
+                                &token_verifier, RpcEncryption::OPTIONAL);
+  CHECK_OK(negotiation.EnablePlain());
+  const Status outcome = negotiation.Negotiate();
+  ASSERT_TRUE(outcome.IsNetworkError())
+      << "Expected client to time out and close the connection. Got: "
+      << outcome.ToString();
+}
+
+// Client side of the client-timeout test: negotiates with a deadline that is
+// already 100ms in the past, expects TimedOut, then shuts the socket down.
+static void RunTimeoutNegotiationClient(unique_ptr<Socket> sock) {
+  TlsContext tls_context;
+  CHECK_OK(tls_context.Init());
+  ClientNegotiation negotiation(std::move(sock), &tls_context,
+                                boost::none, RpcEncryption::OPTIONAL);
+  CHECK_OK(negotiation.EnablePlain("test", "test"));
+  negotiation.set_deadline(MonoTime::Now() - MonoDelta::FromMilliseconds(100L));
+  const Status outcome = negotiation.Negotiate();
+  ASSERT_TRUE(outcome.IsTimedOut()) << "Expected timeout! Got: " << outcome.ToString();
+  CHECK_OK(negotiation.socket()->Shutdown(true, true));
+}
+
+// The client negotiates with an already-expired deadline and must time out;
+// the server must see the connection go away.
+TEST_F(TestNegotiation, TestClientTimeout) {
+  const SocketCallable server_side = RunTimeoutExpectingServer;
+  const SocketCallable client_side = RunTimeoutNegotiationClient;
+  RunNegotiationTest(server_side, client_side);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Server side of the server-timeout test: negotiates with a deadline that is
+// already 100ms in the past, expects TimedOut, then closes the socket.
+static void RunTimeoutNegotiationServer(unique_ptr<Socket> socket) {
+  TlsContext tls_context;
+  CHECK_OK(tls_context.Init());
+  TokenVerifier token_verifier;
+  ServerNegotiation negotiation(std::move(socket), &tls_context,
+                                &token_verifier, RpcEncryption::OPTIONAL);
+  CHECK_OK(negotiation.EnablePlain());
+  negotiation.set_deadline(MonoTime::Now() - MonoDelta::FromMilliseconds(100L));
+  const Status outcome = negotiation.Negotiate();
+  ASSERT_TRUE(outcome.IsTimedOut()) << "Expected timeout! Got: " << outcome.ToString();
+  CHECK_OK(negotiation.socket()->Close());
+}
+
+// Client side of the server-timeout test: negotiates with PLAIN enabled and
+// expects Negotiate() to fail with a NetworkError once the server gives up.
+static void RunTimeoutExpectingClient(unique_ptr<Socket> socket) {
+  TlsContext tls_context;
+  CHECK_OK(tls_context.Init());
+  ClientNegotiation negotiation(std::move(socket), &tls_context,
+                                boost::none, RpcEncryption::OPTIONAL);
+  CHECK_OK(negotiation.EnablePlain("test", "test"));
+  const Status outcome = negotiation.Negotiate();
+  ASSERT_TRUE(outcome.IsNetworkError())
+      << "Expected server to time out and close the connection. Got: "
+      << outcome.ToString();
+}
+
+// The server negotiates with an already-expired deadline and must time out;
+// the client must see the connection go away.
+TEST_F(TestNegotiation, TestServerTimeout) {
+  const SocketCallable server_side = RunTimeoutNegotiationServer;
+  const SocketCallable client_side = RunTimeoutExpectingClient;
+  RunNegotiationTest(server_side, client_side);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+// This suite of tests ensures that applications that embed the Kudu client are
+// able to externally handle the initialization of SASL. See KUDU-1749 and
+// IMPALA-4497 for context.
+//
+// The tests are a bit tricky because the initialization of SASL is static state
+// that we can't easily clear/reset between test cases. So, each test invokes
+// itself as a subprocess with the appropriate --gtest_filter line as well as a
+// special flag to indicate that it is the test child running.
+class TestDisableInit : public KuduTest {
+ protected:
+  // Run the lambda 'f' in a newly-started process, capturing its stderr
+  // into 'stderr' (which may be left as nullptr).
+  //
+  // In the child process (FLAGS_is_test_child set) 'f' is simply invoked
+  // in-process. In the parent, the current test binary is re-executed with a
+  // --gtest_filter selecting only the running test plus --is_test_child, and
+  // the test fails if that subprocess call does not return OK.
+  template<class TestFunc>
+  void DoTest(const TestFunc& f, string* stderr = nullptr) {
+    if (FLAGS_is_test_child) {
+      f();
+      return;
+    }
+
+    // Invoke the currently-running test case in a new subprocess.
+    string filter_flag = strings::Substitute("--gtest_filter=$0.$1",
+                                             CURRENT_TEST_CASE_NAME(), CURRENT_TEST_NAME());
+    string executable_path;
+    CHECK_OK(env_->GetExecutablePath(&executable_path));
+    string stdout;
+    Status s = Subprocess::Call({ executable_path, "test", filter_flag, "--is_test_child" },
+                                "" /* stdin */,
+                                &stdout,
+                                stderr);
+    // Report the Status as well as the child's stdout: if the child could not
+    // be launched, stdout is empty and the Status is the only diagnostic.
+    ASSERT_TRUE(s.ok()) << "Test failed: " << s.ToString() << "\n" << stdout;
+  }
+};
+
+// Kudu's own SASL initialization is disabled, but nothing else initializes
+// SASL: SaslInit() must report that situation in its Status.
+TEST_F(TestDisableInit, TestDisableSasl_NotInitialized) {
+  DoTest([] {
+      CHECK_OK(DisableSaslInitialization());
+      const Status init_status = SaslInit();
+      ASSERT_STR_CONTAINS(init_status.ToString(),
+                          "was disabled, but SASL was not externally initialized");
+    });
+}
+
+// The embedding application sets the SASL mutex hooks and initializes SASL
+// itself, then disables Kudu's initialization: SaslInit() must succeed.
+TEST_F(TestDisableInit, TestDisableSasl_Good) {
+  DoTest([] {
+      rpc::internal::SaslSetMutex();
+      sasl_client_init(nullptr);
+      CHECK_OK(DisableSaslInitialization());
+      ASSERT_OK(SaslInit());
+    });
+}
+
+// The embedding application initializes SASL itself but forgets to disable
+// Kudu's SASL initialization. SaslInit() must still succeed, and the child is
+// expected to log that it skipped initialization.
+TEST_F(TestDisableInit, TestMultipleSaslInit) {
+  string child_stderr;
+  DoTest([] {
+      rpc::internal::SaslSetMutex();
+      sasl_client_init(nullptr);
+      ASSERT_OK(SaslInit());
+    }, &child_stderr);
+  // Only the parent process has captured output to inspect.
+  if (FLAGS_is_test_child) {
+    return;
+  }
+  // The child should have warned that it automatically skipped initialization
+  // because it detected that SASL was already initialized.
+  ASSERT_STR_CONTAINS(child_stderr, "Skipping initialization");
+}
+
+// We are not able to detect mutexes not being set with the macOS version of libsasl.
+#ifndef __APPLE__
+// Kudu's SASL initialization is disabled and SASL is initialized externally,
+// but without installing the SASL mutex hooks. SaslInit() should succeed and
+// the child should emit a warning about the missing mutex implementation.
+TEST_F(TestDisableInit, TestDisableSasl_NoMutexImpl) {
+  string child_stderr;
+  DoTest([] {
+      sasl_client_init(nullptr);
+      CHECK_OK(DisableSaslInitialization());
+      ASSERT_OK(SaslInit());
+    }, &child_stderr);
+  // Only the parent process has captured output to inspect.
+  if (FLAGS_is_test_child) {
+    return;
+  }
+  ASSERT_STR_CONTAINS(child_stderr, "not provided with a mutex implementation");
+}
+
+// The embedding application initializes SASL itself, without installing the
+// SASL mutex hooks, and forgets to disable Kudu's SASL initialization.
+// SaslInit() should succeed and the child should log both warnings.
+TEST_F(TestDisableInit, TestMultipleSaslInit_NoMutexImpl) {
+  string child_stderr;
+  DoTest([] {
+      sasl_client_init(nullptr);
+      ASSERT_OK(SaslInit());
+    }, &child_stderr);
+  // Only the parent process has captured output to inspect.
+  if (FLAGS_is_test_child) {
+    return;
+  }
+  // The child should have warned that it automatically skipped initialization
+  // because it detected that SASL was already initialized, and that no mutex
+  // implementation was supplied.
+  ASSERT_STR_CONTAINS(child_stderr, "Skipping initialization");
+  ASSERT_STR_CONTAINS(child_stderr, "not provided with a mutex implementation");
+}
+#endif
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/negotiation.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/negotiation.cc b/be/src/kudu/rpc/negotiation.cc
new file mode 100644
index 0000000..66a0112
--- /dev/null
+++ b/be/src/kudu/rpc/negotiation.cc
@@ -0,0 +1,317 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/negotiation.h"
+
+#include <poll.h>
+#include <sys/time.h>
+
+#include <memory>
+#include <ostream>
+#include <string>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/blocking_ops.h"
+#include "kudu/rpc/client_negotiation.h"
+#include "kudu/rpc/connection.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/reactor.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/sasl_common.h"
+#include "kudu/rpc/server_negotiation.h"
+#include "kudu/security/tls_context.h"
+#include "kudu/util/errno.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/status.h"
+#include "kudu/util/trace.h"
+
+// When set, RunNegotiation() below logs the negotiation trace at INFO level
+// even when the negotiation did not fail.
+DEFINE_bool(rpc_trace_negotiation, false,
+            "If enabled, dump traces of all RPC negotiations to the log");
+TAG_FLAG(rpc_trace_negotiation, runtime);
+TAG_FLAG(rpc_trace_negotiation, advanced);
+TAG_FLAG(rpc_trace_negotiation, experimental);
+
+// Fault-injection knob: DoServerNegotiation() below sleeps for this many
+// milliseconds before negotiating when the value is greater than zero.
+DEFINE_int32(rpc_negotiation_inject_delay_ms, 0,
+             "If enabled, injects the given number of milliseconds delay into "
+             "the RPC negotiation process on the server side.");
+TAG_FLAG(rpc_negotiation_inject_delay_ms, unsafe);
+
+// Defined in another translation unit; read by DoServerNegotiation() below to
+// decide whether Kerberos or external PKI credentials have been configured.
+DECLARE_string(keytab_file);
+DECLARE_string(rpc_certificate_file);
+
+// NOTE(review): not referenced anywhere else in this file; presumably consumed
+// by other RPC code through a DECLARE_bool -- verify.
+DEFINE_bool(rpc_encrypt_loopback_connections, false,
+            "Whether to encrypt data transfer on RPC connections that stay within "
+            "a single host. Encryption here is likely to offer no additional "
+            "security benefit since only a local 'root' user could intercept the "
+            "traffic, and wire encryption does not suitably protect against such "
+            "an attacker.");
+TAG_FLAG(rpc_encrypt_loopback_connections, advanced);
+
+using std::unique_ptr;
+using strings::Substitute;
+
+namespace kudu {
+namespace rpc {
+
+// Returns the name of authentication type 't' as a string literal.
+const char* AuthenticationTypeToString(AuthenticationType t) {
+  switch (t) {
+    case AuthenticationType::INVALID:
+      return "INVALID";
+    case AuthenticationType::SASL:
+      return "SASL";
+    case AuthenticationType::TOKEN:
+      return "TOKEN";
+    case AuthenticationType::CERTIFICATE:
+      return "CERTIFICATE";
+  }
+  // Only reached if 't' holds a value that is not one of the enumerators.
+  return "<cannot reach here>";
+}
+
+// Streams the name produced by AuthenticationTypeToString().
+std::ostream& operator<<(std::ostream& o, AuthenticationType authentication_type) {
+  o << AuthenticationTypeToString(authentication_type);
+  return o;
+}
+
+// Wait for the client connection to be established and become ready for writing.
+//
+// Polls 'socket' for POLLOUT until it is ready or 'deadline' has passed, then
+// reads SO_ERROR to find out whether the connect actually succeeded.
+//
+// Returns TimedOut if 'deadline' passes first; NetworkError if polling fails,
+// if SO_ERROR cannot be read, or if SO_ERROR reports a connect failure; and OK
+// otherwise.
+static Status WaitForClientConnect(Socket* socket, const MonoTime& deadline) {
+  TRACE("Waiting for socket to connect");
+  int fd = socket->GetFd();
+  struct pollfd poll_fd;
+  poll_fd.fd = fd;
+  poll_fd.events = POLLOUT;
+  poll_fd.revents = 0;
+
+  while (true) {
+    // Recompute the remaining time on every pass so that EINTR retries and
+    // early wake-ups never extend the overall deadline.
+    MonoDelta remaining = deadline - MonoTime::Now();
+    DVLOG(4) << "Client waiting to connect for negotiation, time remaining until timeout deadline: "
+             << remaining.ToString();
+    if (PREDICT_FALSE(remaining.ToNanoseconds() <= 0)) {
+      return Status::TimedOut("Timeout exceeded waiting to connect");
+    }
+#if defined(__linux__)
+    struct timespec ts;
+    remaining.ToTimeSpec(&ts);
+    int ready = ppoll(&poll_fd, 1, &ts, NULL);
+#else
+    // poll() only takes whole milliseconds. Round a sub-millisecond remainder
+    // up to 1ms: a timeout of 0 makes poll() return immediately, which would
+    // turn this loop into a busy-wait until the deadline.
+    int64_t remaining_ms = remaining.ToMilliseconds();
+    int ready = poll(&poll_fd, 1, remaining_ms > 0 ? remaining_ms : 1);
+#endif
+    if (ready == -1) {
+      int err = errno;
+      if (err == EINTR) {
+        // We were interrupted by a signal, let's go again.
+        continue;
+      } else {
+        return Status::NetworkError("Error from ppoll() while waiting to connect",
+            ErrnoToString(err), err);
+      }
+    } else if (ready == 0) {
+      // Timeout exceeded. Loop back to the top to our impending doom.
+      continue;
+    } else {
+      // Success.
+      break;
+    }
+  }
+
+  // Connect finished, but this doesn't mean that we connected successfully.
+  // Check the socket for an error.
+  int so_error = 0;
+  socklen_t socklen = sizeof(so_error);
+  int rc = getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_error, &socklen);
+  if (rc != 0) {
+    // Capture errno once, as the poll branch above does. Reading it twice in
+    // one argument list alongside ErrnoToString() leaves the order of the
+    // reads unspecified, and the call could disturb errno in between.
+    int err = errno;
+    return Status::NetworkError("Unable to check connected socket for errors",
+                                ErrnoToString(err),
+                                err);
+  }
+  if (so_error != 0) {
+    return Status::NetworkError("connect", ErrnoToString(so_error), so_error);
+  }
+
+  return Status::OK();
+}
+
+// Resets both the send and the receive timeout of 'socket' to a zero-length
+// delta. Returns the first error encountered, if any.
+static Status DisableSocketTimeouts(Socket* socket) {
+  const MonoDelta no_timeout = MonoDelta::FromNanoseconds(0L);
+  RETURN_NOT_OK(socket->SetSendTimeout(no_timeout));
+  RETURN_NOT_OK(socket->SetRecvTimeout(no_timeout));
+  return Status::OK();
+}
+
+// Perform client negotiation. We don't LOG() anything, we leave that to our caller.
+//
+// Takes the socket out of 'conn', negotiates on it under 'deadline', and on
+// success hands the socket and the server's feature set back to 'conn'. On any
+// failure the function returns early and the socket is not handed back.
+// 'rpc_error' is passed straight through to ClientNegotiation::Negotiate().
+static Status DoClientNegotiation(Connection* conn,
+                                  RpcAuthentication authentication,
+                                  RpcEncryption encryption,
+                                  MonoTime deadline,
+                                  unique_ptr<ErrorStatusPB>* rpc_error) {
+  const auto* messenger = conn->reactor_thread()->reactor()->messenger();
+  // Prefer secondary credentials (such as authn token) if permitted by policy.
+  const auto authn_token = (conn->credentials_policy() == CredentialsPolicy::PRIMARY_CREDENTIALS)
+      ? boost::none : messenger->authn_token();
+  ClientNegotiation client_negotiation(conn->release_socket(),
+                                       &messenger->tls_context(),
+                                       authn_token,
+                                       encryption);
+
+  // Note that the fqdn is an IP address here: we've already lost whatever DNS
+  // name the client was attempting to use. Unless krb5 is configured with 'rdns
+  // = false', it will automatically take care of reversing this address to its
+  // canonical hostname to determine the expected server principal.
+  client_negotiation.set_server_fqdn(conn->remote().host());
+
+  if (authentication != RpcAuthentication::DISABLED) {
+    Status s = client_negotiation.EnableGSSAPI();
+    if (!s.ok()) {
+      // If we can't enable GSSAPI, it's likely the client is just missing the
+      // appropriate SASL plugin. We don't want to require it to be installed
+      // if the user doesn't care about connecting to servers using Kerberos
+      // authentication. So, we'll just VLOG this here. If we try to connect
+      // to a server which requires Kerberos, we'll get a negotiation error
+      // at that point.
+      if (VLOG_IS_ON(1)) {
+        KLOG_FIRST_N(INFO, 1) << "Couldn't enable GSSAPI (Kerberos) SASL plugin: "
+                              << s.message().ToString()
+                              << ". This process will be unable to connect to "
+                              << "servers requiring Kerberos authentication.";
+      }
+
+      // With GSSAPI unavailable, required authentication is only possible if
+      // an authn token or a signed certificate is on hand.
+      if (authentication == RpcAuthentication::REQUIRED &&
+          !authn_token &&
+          !messenger->tls_context().has_signed_cert()) {
+        return Status::InvalidArgument(
+            "Kerberos, token, or PKI certificate credentials must be provided in order to "
+            "require authentication for a client");
+      }
+    }
+  }
+
+  // PLAIN is only offered when authentication is not mandatory.
+  if (authentication != RpcAuthentication::REQUIRED) {
+    RETURN_NOT_OK(client_negotiation.EnablePlain(conn->local_user_credentials().real_user(), ""));
+  }
+
+  client_negotiation.set_deadline(deadline);
+
+  // Wait for the connect to finish, then negotiate on a blocking socket.
+  RETURN_NOT_OK(WaitForClientConnect(client_negotiation.socket(), deadline));
+  RETURN_NOT_OK(client_negotiation.socket()->SetNonBlocking(false));
+  RETURN_NOT_OK(client_negotiation.Negotiate(rpc_error));
+  RETURN_NOT_OK(DisableSocketTimeouts(client_negotiation.socket()));
+
+  // Transfer the negotiated socket and state back to the connection.
+  conn->adopt_socket(client_negotiation.release_socket());
+  conn->set_remote_features(client_negotiation.take_server_features());
+
+  // Sanity check: if no authn token was supplied as user credentials,
+  // the negotiated authentication type cannot be AuthenticationType::TOKEN.
+  DCHECK(!(authn_token == boost::none &&
+           client_negotiation.negotiated_authn() == AuthenticationType::TOKEN));
+
+  return Status::OK();
+}
+
+// Perform server negotiation. We don't LOG() anything, we leave that to our caller.
+//
+// Takes the socket out of 'conn', negotiates on it under 'deadline', and on
+// success hands the socket, the client's feature set and the authenticated
+// user back to 'conn'. On any failure after the socket has been released the
+// function returns early and the socket is not handed back.
+static Status DoServerNegotiation(Connection* conn,
+                                  RpcAuthentication authentication,
+                                  RpcEncryption encryption,
+                                  const MonoTime& deadline) {
+  // Reject a configuration that demands authentication without providing
+  // either a keytab or a certificate file to authenticate with.
+  if (authentication == RpcAuthentication::REQUIRED &&
+      FLAGS_keytab_file.empty() &&
+      FLAGS_rpc_certificate_file.empty()) {
+    return Status::InvalidArgument("RPC authentication (--rpc_authentication) may not be "
+                                   "required unless Kerberos (--keytab_file) or external PKI "
+                                   "(--rpc_certificate_file et al) are configured");
+  }
+
+  // Optional artificial delay, controlled by --rpc_negotiation_inject_delay_ms.
+  if (FLAGS_rpc_negotiation_inject_delay_ms > 0) {
+    LOG(WARNING) << "Injecting " << FLAGS_rpc_negotiation_inject_delay_ms
+                 << "ms delay in negotiation";
+    SleepFor(MonoDelta::FromMilliseconds(FLAGS_rpc_negotiation_inject_delay_ms));
+  }
+
+  // Create a new ServerNegotiation to handle the synchronous negotiation.
+  const auto* messenger = conn->reactor_thread()->reactor()->messenger();
+  ServerNegotiation server_negotiation(conn->release_socket(),
+                                       &messenger->tls_context(),
+                                       &messenger->token_verifier(),
+                                       encryption);
+
+  // GSSAPI is offered only when authentication is enabled and a keytab is
+  // configured; PLAIN only when authentication is not mandatory.
+  if (authentication != RpcAuthentication::DISABLED && !FLAGS_keytab_file.empty()) {
+    RETURN_NOT_OK(server_negotiation.EnableGSSAPI());
+  }
+  if (authentication != RpcAuthentication::REQUIRED) {
+    RETURN_NOT_OK(server_negotiation.EnablePlain());
+  }
+
+  server_negotiation.set_deadline(deadline);
+
+  // Negotiate on a blocking socket.
+  RETURN_NOT_OK(server_negotiation.socket()->SetNonBlocking(false));
+
+  RETURN_NOT_OK(server_negotiation.Negotiate());
+  RETURN_NOT_OK(DisableSocketTimeouts(server_negotiation.socket()));
+
+  // Transfer the negotiated socket and state back to the connection.
+  conn->adopt_socket(server_negotiation.release_socket());
+  conn->set_remote_features(server_negotiation.take_client_features());
+  conn->set_remote_user(server_negotiation.take_authenticated_user());
+
+  return Status::OK();
+}
+
+// Runs the server- or client-side negotiation for 'conn', depending on the
+// connection's direction, logs the outcome where warranted, and reports the
+// final Status (plus any RPC error from the client path) to
+// Connection::CompleteNegotiation().
+void Negotiation::RunNegotiation(const scoped_refptr<Connection>& conn,
+                                 RpcAuthentication authentication,
+                                 RpcEncryption encryption,
+                                 MonoTime deadline) {
+  const bool is_server = conn->direction() == Connection::SERVER;
+
+  unique_ptr<ErrorStatusPB> rpc_error;
+  Status status = is_server
+      ? DoServerNegotiation(conn.get(), authentication, encryption, deadline)
+      : DoClientNegotiation(conn.get(), authentication, encryption, deadline,
+                            &rpc_error);
+
+  // Prefix failures with the side and the connection they belong to.
+  if (PREDICT_FALSE(!status.ok())) {
+    status = status.CloneAndPrepend(
+        Substitute("$0 connection negotiation failed: $1",
+                   is_server ? "Server" : "Client",
+                   conn->ToString()));
+  }
+  TRACE("Negotiation complete: $0", status.ToString());
+
+  // Refused connections and authorization failures do not get the trace
+  // dumped as a warning; every other failure does.
+  const bool conn_refused =
+      status.IsNetworkError() && status.posix_code() == ECONNREFUSED;
+  const bool is_bad = !status.ok() && !conn_refused && !status.IsNotAuthorized();
+
+  if (is_bad || FLAGS_rpc_trace_negotiation) {
+    string trace_dump = Trace::CurrentTrace()->DumpToString();
+    if (is_bad) {
+      LOG(WARNING) << "Failed RPC negotiation. Trace:\n" << trace_dump;
+    } else {
+      LOG(INFO) << "RPC negotiation tracing enabled. Trace:\n" << trace_dump;
+    }
+  }
+
+  if (is_server && status.IsNotAuthorized()) {
+    LOG(WARNING) << "Unauthorized connection attempt: " << status.message().ToString();
+  }
+  conn->CompleteNegotiation(std::move(status), std::move(rpc_error));
+}
+
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/negotiation.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/negotiation.h b/be/src/kudu/rpc/negotiation.h
new file mode 100644
index 0000000..2ca459b
--- /dev/null
+++ b/be/src/kudu/rpc/negotiation.h
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_RPC_NEGOTIATION_H
+#define KUDU_RPC_NEGOTIATION_H
+
+#include <iosfwd>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/monotime.h"
+
+namespace kudu {
+namespace rpc {
+
+class Connection;
+enum class RpcAuthentication;
+enum class RpcEncryption;
+
+// The kinds of authentication that can be recorded for an RPC connection.
+// NOTE(review): INVALID presumably stands for "nothing negotiated yet" --
+// confirm against the client/server negotiation classes.
+enum class AuthenticationType {
+  INVALID,
+  SASL,
+  TOKEN,
+  CERTIFICATE,
+};
+// Returns the enumerator's name as a C string.
+const char* AuthenticationTypeToString(AuthenticationType t);
+
+// Streams the same name that AuthenticationTypeToString() returns.
+std::ostream& operator<<(std::ostream& o, AuthenticationType authentication_type);
+
+// Holder for the static negotiation entry point.
+class Negotiation {
+ public:
+  // Negotiates 'conn', acting as either the server or the client side
+  // depending on the connection's direction. The outcome is delivered to the
+  // connection rather than returned.
+  static void RunNegotiation(const scoped_refptr<Connection>& conn,
+                             RpcAuthentication authentication,
+                             RpcEncryption encryption,
+                             MonoTime deadline);
+
+ private:
+  DISALLOW_IMPLICIT_CONSTRUCTORS(Negotiation);
+};
+
+} // namespace rpc
+} // namespace kudu
+#endif // KUDU_RPC_NEGOTIATION_H

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/outbound_call.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/outbound_call.cc b/be/src/kudu/rpc/outbound_call.cc
new file mode 100644
index 0000000..af03f1c
--- /dev/null
+++ b/be/src/kudu/rpc/outbound_call.cc
@@ -0,0 +1,509 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <boost/functional/hash.hpp>
+#include <gflags/gflags.h>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <unordered_set>
+#include <vector>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/rpc/outbound_call.h"
+#include "kudu/rpc/constants.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_introspection.pb.h"
+#include "kudu/rpc/rpc_sidecar.h"
+#include "kudu/rpc/serialization.h"
+#include "kudu/rpc/transfer.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/kernel_stack_watchdog.h"
+
+// 100M cycles should be about 50ms on a 2 GHz box. This should be high
+// enough that involuntary context switches don't trigger it, but low enough
+// that any serious blocking behavior on the reactor would.
+DEFINE_int64(rpc_callback_max_cycles, 100 * 1000 * 1000,
+             "The maximum number of cycles for which an RPC callback "
+             "should be allowed to run without emitting a warning."
+             " (Advanced debugging option)");
+TAG_FLAG(rpc_callback_max_cycles, advanced);
+TAG_FLAG(rpc_callback_max_cycles, runtime);
+
+using std::unique_ptr;
+
+namespace kudu {
+namespace rpc {
+
+using google::protobuf::Message;
+using strings::Substitute;
+
+static const double kMicrosPerSecond = 1000000.0;  // Scales seconds to microseconds.
+
+///
+/// OutboundCall
+///
+
+OutboundCall::OutboundCall(const ConnectionId& conn_id,  // Builds a call in the READY state.
+                           const RemoteMethod& remote_method,
+                           google::protobuf::Message* response_storage,
+                           RpcController* controller,
+                           ResponseCallback callback)
+    : state_(READY),
+      remote_method_(remote_method),
+      conn_id_(conn_id),
+      callback_(std::move(callback)),
+      controller_(DCHECK_NOTNULL(controller)),  // Must not be null.
+      response_(DCHECK_NOTNULL(response_storage)) {  // Must not be null.
+  DVLOG(4) << "OutboundCall " << this << " constructed with state_: " << StateName(state_)
+           << " and RPC timeout: "
+           << (controller->timeout().Initialized() ? controller->timeout().ToString() : "none");
+  header_.set_call_id(kInvalidCallId);  // No real call id has been assigned yet.
+  remote_method.ToPB(header_.mutable_remote_method());
+  start_time_ = MonoTime::Now();  // Reference point for DumpPB()'s elapsed time.
+  // Application feature requirements also need the matching RPC-level feature flag.
+  if (!controller_->required_server_features().empty()) {
+    required_rpc_features_.insert(RpcFeatureFlag::APPLICATION_FEATURE_FLAGS);
+  }
+  // If the controller carries a request id, hand its ownership over to the header.
+  if (controller_->request_id_) {
+    header_.set_allocated_request_id(controller_->request_id_.release());
+  }
+}
+
+OutboundCall::~OutboundCall() {
+  DCHECK(IsFinished());  // A call must have reached a finished state before destruction.
+  DVLOG(4) << "OutboundCall " << this << " destroyed with state_: " << StateName(state_);
+}
+
+Status OutboundCall::SerializeTo(vector<Slice>* slices) {
+  // Appends the header, request and sidecar slices (in that order) to 'slices'.
+  DCHECK_LT(0, request_buf_.size())
+      << "Must call SetRequestPayload() before SerializeTo()";
+  // Copy the controller's timeout and required feature flags into the header.
+  const MonoDelta& rpc_timeout = controller_->timeout();
+  if (rpc_timeout.Initialized()) {
+    header_.set_timeout_millis(rpc_timeout.ToMilliseconds());
+  }
+  for (uint32_t flag : controller_->required_server_features()) {
+    header_.add_required_feature_flags(flag);
+  }
+
+  DCHECK_LE(0, sidecar_byte_size_);
+  const auto body_size = sidecar_byte_size_ + request_buf_.size();
+  serialization::SerializeHeader(header_, body_size, &header_buf_);
+
+  slices->push_back(Slice(header_buf_));
+  slices->push_back(Slice(request_buf_));
+  for (const auto& sidecar : sidecars_) slices->push_back(sidecar->AsSlice());
+  return Status::OK();
+}
+
+void OutboundCall::SetRequestPayload(const Message& req,
+    vector<unique_ptr<RpcSidecar>>&& sidecars) {
+  // Serializes 'req' into request_buf_ and takes ownership of 'sidecars'.
+  DCHECK_EQ(-1, sidecar_byte_size_);  // The size must still be -1, i.e. not computed yet.
+  // Qualified std::move: no reliance on ADL (clang: -Wunqualified-std-cast-call).
+  sidecars_ = std::move(sidecars);
+
+  // Compute total size of sidecar payload so that extra space can be reserved as part of
+  // the request body; each sidecar's offset is the message size plus preceding sidecars.
+  const uint32_t message_size = req.ByteSize();
+  sidecar_byte_size_ = 0;
+  for (const unique_ptr<RpcSidecar>& car : sidecars_) {
+    header_.add_sidecar_offsets(sidecar_byte_size_ + message_size);
+    sidecar_byte_size_ += car->AsSlice().size();
+  }
+  serialization::SerializeMessage(req, &request_buf_, sidecar_byte_size_, true);
+}
+
+Status OutboundCall::status() const {
+  std::lock_guard<simple_spinlock> guard(lock_);  // Copy status_ out under lock_.
+  return status_;
+}
+
+const ErrorStatusPB* OutboundCall::error_pb() const {
+  std::lock_guard<simple_spinlock> guard(lock_);  // Read the pointer under lock_.
+  return error_pb_.get();
+}
+
+string OutboundCall::StateName(State state) {
+  switch (state) {
+    case READY:
+      return "READY";
+    case ON_OUTBOUND_QUEUE:
+      return "ON_OUTBOUND_QUEUE";
+    case SENDING:
+      return "SENDING";
+    case SENT:
+      return "SENT";
+    case NEGOTIATION_TIMED_OUT:
+      return "NEGOTIATION_TIMED_OUT";
+    case TIMED_OUT:
+      return "TIMED_OUT";
+    case FINISHED_NEGOTIATION_ERROR:
+      return "FINISHED_NEGOTIATION_ERROR";
+    case FINISHED_ERROR:
+      return "FINISHED_ERROR";
+    case FINISHED_SUCCESS:
+      return "FINISHED_SUCCESS";
+    default:
+      LOG(DFATAL) << "Unknown state in OutboundCall: " << state;
+      return StringPrintf("UNKNOWN(%d)", state);
+  }
+}
+
+void OutboundCall::set_state(State new_state) {
+  std::lock_guard<simple_spinlock> guard(lock_);  // Held across the checked transition.
+  set_state_unlocked(new_state);
+}
+
+OutboundCall::State OutboundCall::state() const {
+  std::lock_guard<simple_spinlock> guard(lock_);  // Snapshot state_ under lock_.
+  return state_;
+}
+
+void OutboundCall::set_state_unlocked(State new_state) {  // Callers in this file hold lock_.
+  // Sanity check state transitions.
+  DVLOG(3) << "OutboundCall " << this << " (" << ToString() << ") switching from " <<
+    StateName(state_) << " to " << StateName(new_state);
+  switch (new_state) {
+    case ON_OUTBOUND_QUEUE:
+      DCHECK_EQ(state_, READY);
+      break;
+    case SENDING:
+      // Allow SENDING to be set idempotently so we don't have to specifically check
+      // whether the state is transitioning in the RPC code.
+      DCHECK(state_ == ON_OUTBOUND_QUEUE || state_ == SENDING);
+      break;
+    case SENT:
+      DCHECK_EQ(state_, SENDING);
+      break;
+    case NEGOTIATION_TIMED_OUT:
+      DCHECK(state_ == ON_OUTBOUND_QUEUE);  // Only a call still waiting on the queue.
+      break;
+    case TIMED_OUT:
+      DCHECK(state_ == SENT || state_ == ON_OUTBOUND_QUEUE || state_ == SENDING);
+      break;
+    case FINISHED_SUCCESS:
+      DCHECK_EQ(state_, SENT);
+      break;
+    default:
+      // No sanity checks for others.
+      break;
+  }
+
+  state_ = new_state;  // The checks above are DCHECKs; the assignment itself is unconditional.
+}
+
+void OutboundCall::CallCallback() {
+  // Runs the callback, then warns if it took more than --rpc_callback_max_cycles.
+  const int64_t cycles_before = CycleClock::Now();
+  {
+    SCOPED_WATCH_STACK(100);
+    callback_();
+    // Drop the callback inside the timed scope: it may be holding references
+    // through bound parameters, and if releasing them runs slow destructors on
+    // this thread, that time should be accounted for here too.
+    callback_ = NULL;
+  }
+  const int64_t cycles_spent = CycleClock::Now() - cycles_before;
+  if (!PREDICT_FALSE(cycles_spent > FLAGS_rpc_callback_max_cycles)) {
+    return;
+  }
+
+  const double seconds = static_cast<double>(cycles_spent) / base::CyclesPerSecond();
+  const double micros = seconds * kMicrosPerSecond;
+  LOG(WARNING) << "RPC callback for " << ToString() << " blocked reactor thread for "
+               << micros << "us";
+}
+
+void OutboundCall::SetResponse(gscoped_ptr<CallResponse> resp) {  // Takes ownership of 'resp'.
+  call_response_ = std::move(resp);
+  Slice r(call_response_->serialized_response());
+
+  if (call_response_->is_success()) {
+    // TODO: here we're deserializing the call response within the reactor thread,
+    // which isn't great, since it would block processing of other RPCs in parallel.
+    // Should look into a way to avoid this.
+    if (!response_->ParseFromArray(r.data(), r.size())) {
+      SetFailed(Status::IOError("invalid RPC response, missing fields",
+                                response_->InitializationErrorString()));
+      return;
+    }
+    set_state(FINISHED_SUCCESS);
+    CallCallback();
+  } else {
+    // Error: the payload is parsed as an ErrorStatusPB instead of the response message.
+    gscoped_ptr<ErrorStatusPB> err(new ErrorStatusPB());
+    if (!err->ParseFromArray(r.data(), r.size())) {
+      SetFailed(Status::IOError("Was an RPC error but could not parse error response",
+                                err->InitializationErrorString()));
+      return;
+    }
+    ErrorStatusPB* err_raw = err.release();  // SetFailed() stores it in error_pb_.
+    SetFailed(Status::RemoteError(err_raw->message()), Phase::REMOTE_CALL, err_raw);
+  }
+}
+
+void OutboundCall::SetQueued() {
+  set_state(ON_OUTBOUND_QUEUE);  // set_state_unlocked() DCHECKs that the call was READY.
+}
+
+void OutboundCall::SetSending() {
+  set_state(SENDING);  // May be repeated: SENDING -> SENDING passes the transition check.
+}
+
+void OutboundCall::SetSent() {
+  set_state(SENT);  // set_state_unlocked() DCHECKs that the call was SENDING.
+
+  // This method is called in the reactor thread, so free the header buf,
+  // which was also allocated from this thread. tcmalloc's thread caching
+  // behavior is a lot more efficient if memory is freed from the same thread
+  // which allocated it -- this lets it keep to thread-local operations instead
+  // of taking a mutex to put memory back on the global freelist.
+  delete [] header_buf_.release();
+
+  // request_buf_ is also done being used here, but since it was allocated by
+  // the caller thread, we would rather let that thread free it whenever it
+  // deletes the RpcController.
+}
+
+void OutboundCall::SetFailed(const Status &status,
+                             Phase phase,
+                             ErrorStatusPB* err_pb) {
+  // Records the failure (taking ownership of a non-null 'err_pb') and runs the callback.
+  DCHECK(!status.ok());
+  DCHECK(phase == Phase::CONNECTION_NEGOTIATION || phase == Phase::REMOTE_CALL);
+  const State failed_state = (phase == Phase::CONNECTION_NEGOTIATION)
+      ? FINISHED_NEGOTIATION_ERROR
+      : FINISHED_ERROR;
+  {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    status_ = status;
+    if (err_pb != nullptr) error_pb_.reset(err_pb);
+    set_state_unlocked(failed_state);
+  }
+  CallCallback();  // Invoked after lock_ has been released.
+}
+
+void OutboundCall::SetTimedOut(Phase phase) {
+  static const char* kErrMsgNegotiation =
+      "connection negotiation to $1 for RPC $0 timed out after $2 ($3)";
+  static const char* kErrMsgCall = "$0 RPC to $1 timed out after $2 ($3)";
+  DCHECK(phase == Phase::CONNECTION_NEGOTIATION || phase == Phase::REMOTE_CALL);
+  const bool in_call = (phase == Phase::REMOTE_CALL);
+  // We have to fetch timeout outside the lock to avoid a lock
+  // order inversion between this class and RpcController.
+  const MonoDelta timeout = controller_->timeout();
+  {
+    std::lock_guard<simple_spinlock> guard(lock_);
+    // $3 reports the state the call was in just before this transition.
+    status_ = Status::TimedOut(Substitute(in_call ? kErrMsgCall : kErrMsgNegotiation,
+                                          remote_method_.method_name(),
+                                          conn_id_.remote().ToString(),
+                                          timeout.ToString(),
+                                          StateName(state_)));
+    set_state_unlocked(in_call ? TIMED_OUT : NEGOTIATION_TIMED_OUT);
+  }
+  CallCallback();
+}
+
+// Reports whether the call is in one of its timeout states.
+//
+// Both flavors count: NEGOTIATION_TIMED_OUT (set by SetTimedOut() for the
+// connection negotiation phase) and TIMED_OUT (set for the remote call phase).
+bool OutboundCall::IsTimedOut() const {
+  std::lock_guard<simple_spinlock> guard(lock_);
+  const bool negotiation_timeout = (state_ == NEGOTIATION_TIMED_OUT);
+  const bool call_timeout = (state_ == TIMED_OUT);
+  return negotiation_timeout || call_timeout;
+}
+
+// Reports whether the call ended in a negotiation-phase failure state.
+//
+// That is either FINISHED_NEGOTIATION_ERROR (set by SetFailed()) or
+// NEGOTIATION_TIMED_OUT (set by SetTimedOut()).
+bool OutboundCall::IsNegotiationError() const {
+  std::lock_guard<simple_spinlock> guard(lock_);
+  const bool negotiation_failed = (state_ == FINISHED_NEGOTIATION_ERROR);
+  const bool negotiation_timeout = (state_ == NEGOTIATION_TIMED_OUT);
+  return negotiation_failed || negotiation_timeout;
+}
+
+bool OutboundCall::IsFinished() const {  // True once the call is in a terminal state.
+  std::lock_guard<simple_spinlock> l(lock_);
+  switch (state_) {
+    case READY:
+    case SENDING:
+    case ON_OUTBOUND_QUEUE:
+    case SENT:
+      return false;  // Not yet completed.
+    case NEGOTIATION_TIMED_OUT:
+    case TIMED_OUT:
+    case FINISHED_NEGOTIATION_ERROR:
+    case FINISHED_ERROR:
+    case FINISHED_SUCCESS:
+      return true;  // Timed out, failed, or succeeded.
+    default:
+      LOG(FATAL) << "Unknown call state: " << state_;
+      return false;  // Placed after LOG(FATAL); presumably only there for the compiler.
+  }
+}
+
+string OutboundCall::ToString() const {  // Describes the call as "<method> -> <connection>".
+  return Substitute("RPC call $0 -> $1", remote_method_.ToString(), conn_id_.ToString());
+}
+
+void OutboundCall::DumpPB(const DumpRunningRpcsRequestPB& req,  // 'req' is not consulted here.
+                          RpcCallInProgressPB* resp) {
+  std::lock_guard<simple_spinlock> l(lock_);
+  resp->mutable_header()->CopyFrom(header_);
+  resp->set_micros_elapsed((MonoTime::Now() - start_time_).ToMicroseconds());
+  // Mirror the internal state_ onto the corresponding RpcCallInProgressPB state.
+  switch (state_) {
+    case READY:
+      // Don't bother setting a state for "READY" since we don't expose a call
+      // until it's at least on the queue of a connection.
+      break;
+    case ON_OUTBOUND_QUEUE:
+      resp->set_state(RpcCallInProgressPB::ON_OUTBOUND_QUEUE);
+      break;
+    case SENDING:
+      resp->set_state(RpcCallInProgressPB::SENDING);
+      break;
+    case SENT:
+      resp->set_state(RpcCallInProgressPB::SENT);
+      break;
+    case NEGOTIATION_TIMED_OUT:
+      resp->set_state(RpcCallInProgressPB::NEGOTIATION_TIMED_OUT);
+      break;
+    case TIMED_OUT:
+      resp->set_state(RpcCallInProgressPB::TIMED_OUT);
+      break;
+    case FINISHED_NEGOTIATION_ERROR:
+      resp->set_state(RpcCallInProgressPB::FINISHED_NEGOTIATION_ERROR);
+      break;
+    case FINISHED_ERROR:
+      resp->set_state(RpcCallInProgressPB::FINISHED_ERROR);
+      break;
+    case FINISHED_SUCCESS:
+      resp->set_state(RpcCallInProgressPB::FINISHED_SUCCESS);
+      break;
+  }
+}
+
+///
+/// ConnectionId
+///
+
+ConnectionId::ConnectionId() {}  // Leaves remote_ and user_credentials_ default-constructed.
+
+ConnectionId::ConnectionId(const ConnectionId& other) {
+  DoCopyFrom(other);  // Shares the copy logic with CopyFrom().
+}
+
+ConnectionId::ConnectionId(const Sockaddr& remote, UserCredentials user_credentials) {
+  remote_ = remote;
+  user_credentials_ = std::move(user_credentials);  // By-value parameter is moved into place.
+}
+
+void ConnectionId::set_remote(const Sockaddr& remote) {
+  remote_ = remote;  // Replaces the stored remote address with a copy of 'remote'.
+}
+
+void ConnectionId::set_user_credentials(UserCredentials user_credentials) {
+  user_credentials_ = std::move(user_credentials);  // By-value parameter is moved into place.
+}
+
+void ConnectionId::CopyFrom(const ConnectionId& other) {
+  DoCopyFrom(other);  // Same copy logic as the copy constructor.
+}
+
+string ConnectionId::ToString() const {
+  // Does not print the password.
+  const string addr = remote_.ToString();
+  const string creds = user_credentials_.ToString();
+  return StringPrintf("{remote=%s, user_credentials=%s}", addr.c_str(), creds.c_str());
+}
+
+void ConnectionId::DoCopyFrom(const ConnectionId& other) {  // Copies both fields from 'other'.
+  remote_ = other.remote_;
+  user_credentials_ = other.user_credentials_;
+}
+
+size_t ConnectionId::HashCode() const {
+  size_t combined = 0;  // Fold in the remote address first, then the credentials.
+  boost::hash_combine(combined, remote_.HashCode());
+  boost::hash_combine(combined, user_credentials_.HashCode());
+  return combined;
+}
+
+bool ConnectionId::Equals(const ConnectionId& other) const {
+  if (!(remote() == other.remote())) return false;  // Remote addresses differ.
+  return user_credentials().Equals(other.user_credentials());
+}
+
+size_t ConnectionIdHash::operator() (const ConnectionId& conn_id) const {
+  return conn_id.HashCode();  // Hash functor that forwards to ConnectionId::HashCode().
+}
+
+bool ConnectionIdEqual::operator() (const ConnectionId& cid1, const ConnectionId& cid2) const {
+  return cid1.Equals(cid2);  // Equality functor that forwards to ConnectionId::Equals().
+}
+
+///
+/// CallResponse
+///
+
+CallResponse::CallResponse()
+ : parsed_(false) {  // Becomes true only at the end of a successful ParseFrom().
+}
+
+Status CallResponse::GetSidecar(int idx, Slice* sidecar) const {
+  DCHECK(parsed_);  // Only meaningful after a successful ParseFrom().
+  if (idx >= 0 && idx < header_.sidecar_offsets_size()) {
+    *sidecar = sidecar_slices_[idx];
+    return Status::OK();
+  }
+  return Status::InvalidArgument(strings::Substitute(
+      "Index $0 does not reference a valid sidecar", idx));
+}
+
+Status CallResponse::ParseFrom(gscoped_ptr<InboundTransfer> transfer) {
+  CHECK(!parsed_);  // A CallResponse may be parsed only once.
+  RETURN_NOT_OK(serialization::ParseMessage(transfer->data(), &header_,
+                                            &serialized_response_));
+
+  // Use information from header to extract the payload slices.
+  RETURN_NOT_OK(RpcSidecar::ParseSidecars(header_.sidecar_offsets(),
+          serialized_response_, sidecar_slices_));
+  // With sidecars present, the response message ends where the first sidecar begins.
+  if (header_.sidecar_offsets_size() > 0) {
+    serialized_response_ =
+        Slice(serialized_response_.data(), header_.sidecar_offsets(0));
+  }
+  // Keep the transfer; presumably the slices above point into its buffer -- confirm.
+  transfer_.swap(transfer);
+  parsed_ = true;
+  return Status::OK();
+}
+
+} // namespace rpc
+} // namespace kudu