You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/17 03:14:28 UTC
[12/15] incubator-impala git commit: IMPALA-4669: [KRPC] Import RPC
library from kudu@314c9d8
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/negotiation-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/negotiation-test.cc b/be/src/kudu/rpc/negotiation-test.cc
new file mode 100644
index 0000000..68185bb
--- /dev/null
+++ b/be/src/kudu/rpc/negotiation-test.cc
@@ -0,0 +1,1331 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/rpc-test-base.h"
+
+#include <stdlib.h>
+#include <sys/stat.h>
+
+#include <functional>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <thread>
+
+#include <gflags/gflags.h>
+#include <gtest/gtest.h>
+#include <sasl/sasl.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/rpc/client_negotiation.h"
+#include "kudu/rpc/constants.h"
+#include "kudu/rpc/negotiation.h"
+#include "kudu/rpc/server_negotiation.h"
+#include "kudu/security/crypto.h"
+#include "kudu/security/security-test-util.h"
+#include "kudu/security/test/mini_kdc.h"
+#include "kudu/security/tls_context.h"
+#include "kudu/security/tls_socket.h"
+#include "kudu/security/token_signer.h"
+#include "kudu/security/token_signing_key.h"
+#include "kudu/security/token_verifier.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/subprocess.h"
+#include "kudu/util/trace.h"
+#include "kudu/util/user.h"
+
+// HACK: MIT Kerberos doesn't have any way of determining its version number,
+// but the error messages in krb5-1.10 and earlier are broken due to
+// a bug: http://krbdev.mit.edu/rt/Ticket/Display.html?id=6973
+//
+// Since we don't have any way to explicitly figure out the version, we just
+// look for this random macro which was added in 1.11 (the same version in which
+// the above bug was fixed).
+#ifndef KRB5_RESPONDER_QUESTION_PASSWORD
+#define KRB5_VERSION_LE_1_10
+#endif
+
+DEFINE_bool(is_test_child, false,
+ "Used by tests which require clean processes. "
+ "See TestDisableInit.");
+DECLARE_bool(rpc_encrypt_loopback_connections);
+DECLARE_bool(rpc_trace_negotiation);
+
+using std::string;
+using std::thread;
+using std::unique_ptr;
+
+using kudu::security::Cert;
+using kudu::security::PkiConfig;
+using kudu::security::PrivateKey;
+using kudu::security::SignedTokenPB;
+using kudu::security::TlsContext;
+using kudu::security::TokenSigner;
+using kudu::security::TokenSigningPrivateKey;
+using kudu::security::TokenVerifier;
+
+namespace kudu {
+namespace rpc {
+
+// The negotiation configuration for a client or server endpoint.
+struct EndpointConfig {
+ // The PKI configuration.
+ PkiConfig pki;
+ // The supported SASL mechanisms.
+ vector<SaslMechanism::Type> sasl_mechs;
+ // For the client, whether the client has the token.
+ // For the server, whether the server has the TSK.
+ bool token;
+ RpcEncryption encryption;
+};
+std::ostream& operator<<(std::ostream& o, EndpointConfig config) {
+ auto bool_string = [] (bool b) { return b ? "true" : "false"; };
+ o << "{pki: " << config.pki
+ << ", sasl-mechs: [" << JoinMapped(config.sasl_mechs, SaslMechanism::name_of, ", ")
+ << "], token: " << bool_string(config.token)
+ << ", encryption: ";
+
+ switch (config.encryption) {
+ case RpcEncryption::DISABLED: o << "DISABLED"; break;
+ case RpcEncryption::OPTIONAL: o << "OPTIONAL"; break;
+ case RpcEncryption::REQUIRED: o << "REQUIRED"; break;
+ }
+
+ o << "}";
+ return o;
+}
+
+// A description of a negotiation sequence, including client and server
+// configuration, as well as expected results.
+struct NegotiationDescriptor {
+ EndpointConfig client;
+ EndpointConfig server;
+
+ bool use_test_socket;
+
+ bool rpc_encrypt_loopback;
+
+ // The expected client status from negotiating.
+ Status client_status;
+ // The expected server status from negotiating.
+ Status server_status;
+
+ // The expected negotiated authentication type.
+ AuthenticationType negotiated_authn;
+
+ // The expected SASL mechanism, if SASL authentication is negotiated.
+ SaslMechanism::Type negotiated_mech;
+
+ // Whether the negotiation is expected to perform a TLS handshake.
+ bool tls_negotiated;
+};
+std::ostream& operator<<(std::ostream& o, NegotiationDescriptor c) {
+ auto bool_string = [] (bool b) { return b ? "true" : "false"; };
+ o << "{client: " << c.client
+ << ", server: " << c.server
+ << "}, rpc-encrypt-loopback: " << bool_string(c.rpc_encrypt_loopback);
+ return o;
+}
+
+class NegotiationTestSocket : public Socket {
+ public:
+ // Return an arbitrary public IP
+ Status GetPeerAddress(Sockaddr *cur_addr) const override {
+ return cur_addr->ParseString("8.8.8.8:12345", 0);
+ }
+};
+
+class TestNegotiation : public RpcTestBase,
+ public ::testing::WithParamInterface<NegotiationDescriptor> {
+ public:
+ void SetUp() override {
+ RpcTestBase::SetUp();
+ ASSERT_OK(SaslInit());
+ }
+};
+
+TEST_P(TestNegotiation, TestNegotiation) {
+ NegotiationDescriptor desc = GetParam();
+
+ // Generate a trusted root certificate.
+ PrivateKey ca_key;
+ Cert ca_cert;
+ ASSERT_OK(GenerateSelfSignedCAForTests(&ca_key, &ca_cert));
+
+ // Create and configure a TLS context for each endpoint.
+ TlsContext client_tls_context;
+ TlsContext server_tls_context;
+ ASSERT_OK(client_tls_context.Init());
+ ASSERT_OK(server_tls_context.Init());
+ ASSERT_OK(ConfigureTlsContext(desc.client.pki, ca_cert, ca_key, &client_tls_context));
+ ASSERT_OK(ConfigureTlsContext(desc.server.pki, ca_cert, ca_key, &server_tls_context));
+
+ FLAGS_rpc_encrypt_loopback_connections = desc.rpc_encrypt_loopback;
+
+ // Generate an optional client token and server token verifier.
+ TokenSigner token_signer(60, 20, std::make_shared<TokenVerifier>());
+ {
+ unique_ptr<TokenSigningPrivateKey> key;
+ ASSERT_OK(token_signer.CheckNeedKey(&key));
+ // No keys are available yet, so should be able to add.
+ ASSERT_NE(nullptr, key.get());
+ ASSERT_OK(token_signer.AddKey(std::move(key)));
+ }
+ TokenVerifier token_verifier;
+ boost::optional<SignedTokenPB> authn_token;
+ if (desc.client.token) {
+ authn_token = SignedTokenPB();
+ security::TokenPB token;
+ token.set_expire_unix_epoch_seconds(WallTime_Now() + 60);
+ token.mutable_authn()->set_username("client-token");
+ ASSERT_TRUE(token.SerializeToString(authn_token->mutable_token_data()));
+ ASSERT_OK(token_signer.SignToken(&*authn_token));
+ }
+ if (desc.server.token) {
+ ASSERT_OK(token_verifier.ImportKeys(token_signer.verifier().ExportKeys()));
+ }
+
+ // Create the listening socket, client socket, and server socket.
+ Socket listening_socket;
+ ASSERT_OK(listening_socket.Init(0));
+ ASSERT_OK(listening_socket.BindAndListen(Sockaddr(), 1));
+ Sockaddr server_addr;
+ ASSERT_OK(listening_socket.GetSocketAddress(&server_addr));
+
+ unique_ptr<Socket> client_socket(new Socket());
+ ASSERT_OK(client_socket->Init(0));
+ client_socket->Connect(server_addr);
+
+ unique_ptr<Socket> server_socket(desc.use_test_socket ?
+ new NegotiationTestSocket() :
+ new Socket());
+
+ Sockaddr client_addr;
+ CHECK_OK(listening_socket.Accept(server_socket.get(), &client_addr, 0));
+
+ // Create and configure the client and server negotiation instances.
+ ClientNegotiation client_negotiation(std::move(client_socket),
+ &client_tls_context,
+ authn_token,
+ desc.client.encryption);
+ ServerNegotiation server_negotiation(std::move(server_socket),
+ &server_tls_context,
+ &token_verifier,
+ desc.server.encryption);
+
+ // Set client and server SASL mechanisms.
+ MiniKdc kdc;
+ bool kdc_started = false;
+ auto start_kdc_once = [&] () {
+ if (!kdc_started) {
+ kdc_started = true;
+ RETURN_NOT_OK(kdc.Start());
+ }
+ return Status::OK();
+ };
+ for (auto mech : desc.client.sasl_mechs) {
+ switch (mech) {
+ case SaslMechanism::INVALID: break;
+ case SaslMechanism::PLAIN:
+ ASSERT_OK(client_negotiation.EnablePlain("client-plain", "client-password"));
+ break;
+ case SaslMechanism::GSSAPI:
+ ASSERT_OK(start_kdc_once());
+ ASSERT_OK(kdc.CreateUserPrincipal("client-gssapi"));
+ ASSERT_OK(kdc.Kinit("client-gssapi"));
+ ASSERT_OK(kdc.SetKrb5Environment());
+ client_negotiation.set_server_fqdn("127.0.0.1");
+ ASSERT_OK(client_negotiation.EnableGSSAPI());
+ break;
+ }
+ }
+ for (auto mech : desc.server.sasl_mechs) {
+ switch (mech) {
+ case SaslMechanism::INVALID: break;
+ case SaslMechanism::PLAIN:
+ ASSERT_OK(server_negotiation.EnablePlain());
+ break;
+ case SaslMechanism::GSSAPI:
+ ASSERT_OK(start_kdc_once());
+ // Create the server principal and keytab.
+ string kt_path;
+ ASSERT_OK(kdc.CreateServiceKeytab("kudu/127.0.0.1", &kt_path));
+ CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/));
+ server_negotiation.set_server_fqdn("127.0.0.1");
+ ASSERT_OK(server_negotiation.EnableGSSAPI());
+ break;
+ }
+ }
+
+ // Run the client/server negotiation. Because negotiation is blocking, it
+ // has to be done on separate threads.
+ Status client_status;
+ Status server_status;
+ thread client_thread([&] () {
+ scoped_refptr<Trace> t(new Trace());
+ ADOPT_TRACE(t.get());
+ client_status = client_negotiation.Negotiate();
+ // Close the socket so that the server will not block forever on error.
+ client_negotiation.socket()->Close();
+
+ if (FLAGS_rpc_trace_negotiation || !client_status.ok()) {
+ string msg = Trace::CurrentTrace()->DumpToString();
+ if (!client_status.ok()) {
+ LOG(WARNING) << "Failed client RPC negotiation. Client trace:\n" << msg;
+ } else {
+ LOG(INFO) << "RPC negotiation tracing enabled. Client trace:\n" << msg;
+ }
+ }
+ });
+ thread server_thread([&] () {
+ scoped_refptr<Trace> t(new Trace());
+ ADOPT_TRACE(t.get());
+ server_status = server_negotiation.Negotiate();
+ // Close the socket so that the client will not block forever on error.
+ server_negotiation.socket()->Close();
+
+ if (FLAGS_rpc_trace_negotiation || !server_status.ok()) {
+ string msg = Trace::CurrentTrace()->DumpToString();
+ if (!server_status.ok()) {
+ LOG(WARNING) << "Failed server RPC negotiation. Server trace:\n" << msg;
+ } else {
+ LOG(INFO) << "RPC negotiation tracing enabled. Server trace:\n" << msg;
+ }
+ }
+ });
+ client_thread.join();
+ server_thread.join();
+
+ // Check the negotiation outcome against the expected outcome.
+ EXPECT_EQ(desc.client_status.CodeAsString(), client_status.CodeAsString());
+ EXPECT_EQ(desc.server_status.CodeAsString(), server_status.CodeAsString());
+ ASSERT_STR_MATCHES(client_status.ToString(), desc.client_status.ToString());
+ ASSERT_STR_MATCHES(server_status.ToString(), desc.server_status.ToString());
+
+ if (client_status.ok()) {
+ EXPECT_TRUE(server_status.ok());
+
+ // Make sure the negotiations agree with the expected values.
+ EXPECT_EQ(desc.negotiated_authn, client_negotiation.negotiated_authn());
+ EXPECT_EQ(desc.negotiated_mech, client_negotiation.negotiated_mechanism());
+ EXPECT_EQ(desc.negotiated_authn, server_negotiation.negotiated_authn());
+ EXPECT_EQ(desc.negotiated_mech, server_negotiation.negotiated_mechanism());
+ EXPECT_EQ(desc.tls_negotiated, server_negotiation.tls_negotiated());
+ EXPECT_EQ(desc.tls_negotiated, server_negotiation.tls_negotiated());
+
+ bool client_tls_socket = dynamic_cast<security::TlsSocket*>(client_negotiation.socket());
+ bool server_tls_socket = dynamic_cast<security::TlsSocket*>(server_negotiation.socket());
+ EXPECT_EQ(desc.rpc_encrypt_loopback, client_tls_socket);
+ EXPECT_EQ(desc.rpc_encrypt_loopback, server_tls_socket);
+
+ // Check that the expected user subject is authenticated.
+ RemoteUser remote_user = server_negotiation.take_authenticated_user();
+ switch (server_negotiation.negotiated_authn()) {
+ case AuthenticationType::SASL:
+ switch (server_negotiation.negotiated_mechanism()) {
+ case SaslMechanism::PLAIN:
+ EXPECT_EQ("client-plain", remote_user.username());
+ break;
+ case SaslMechanism::GSSAPI:
+ EXPECT_EQ("client-gssapi", remote_user.username());
+ EXPECT_EQ("client-gssapi@KRBTEST.COM", remote_user.principal().value_or(""));
+ break;
+ case SaslMechanism::INVALID: LOG(FATAL) << "invalid mechanism negotiated";
+ }
+ break;
+ case AuthenticationType::CERTIFICATE: {
+ // We expect the cert to be using the local username, because it hasn't
+ // logged in from any Keytab.
+ string expected;
+ CHECK_OK(GetLoggedInUser(&expected));
+ EXPECT_EQ(expected, remote_user.username());
+ EXPECT_FALSE(remote_user.principal());
+ break;
+ }
+ case AuthenticationType::TOKEN:
+ EXPECT_EQ("client-token", remote_user.username());
+ break;
+ case AuthenticationType::INVALID: LOG(FATAL) << "invalid authentication negotiated";
+ }
+ }
+}
+
+INSTANTIATE_TEST_CASE_P(NegotiationCombinations,
+ TestNegotiation,
+ ::testing::Values(
+
+ // client: no authn/mechs
+ // server: no authn/mechs
+ NegotiationDescriptor {
+ EndpointConfig {
+ PkiConfig::NONE,
+ {},
+ false,
+ RpcEncryption::OPTIONAL,
+ },
+ EndpointConfig {
+ PkiConfig::NONE,
+ {},
+ false,
+ RpcEncryption::OPTIONAL,
+ },
+ false,
+ false,
+ Status::NotAuthorized(".*client is not configured with an authentication type"),
+ Status::NetworkError(""),
+ AuthenticationType::INVALID,
+ SaslMechanism::INVALID,
+ false,
+ },
+
+ // client: PLAIN
+ // server: no authn/mechs
+ NegotiationDescriptor {
+ EndpointConfig {
+ PkiConfig::NONE,
+ { SaslMechanism::PLAIN },
+ false,
+ RpcEncryption::OPTIONAL,
+ },
+ EndpointConfig {
+ PkiConfig::NONE,
+ {},
+ false,
+ RpcEncryption::OPTIONAL,
+ },
+ false,
+ false,
+ Status::NotAuthorized(".* server mechanism list is empty"),
+ Status::NotAuthorized(".* server mechanism list is empty"),
+ AuthenticationType::INVALID,
+ SaslMechanism::INVALID,
+ false,
+ },
+
+ // client: PLAIN
+ // server: PLAIN
+ NegotiationDescriptor {
+ EndpointConfig {
+ PkiConfig::NONE,
+ { SaslMechanism::PLAIN },
+ false,
+ RpcEncryption::OPTIONAL
+ },
+ EndpointConfig {
+ PkiConfig::NONE,
+ { SaslMechanism::PLAIN },
+ false,
+ RpcEncryption::DISABLED,
+ },
+ false,
+ false,
+ Status::OK(),
+ Status::OK(),
+ AuthenticationType::SASL,
+ SaslMechanism::PLAIN,
+ false,
+ },
+
+ // client: GSSAPI
+ // server: GSSAPI
+ NegotiationDescriptor {
+ EndpointConfig {
+ PkiConfig::NONE,
+ { SaslMechanism::GSSAPI },
+ false,
+ RpcEncryption::OPTIONAL,
+ },
+ EndpointConfig {
+ PkiConfig::NONE,
+ { SaslMechanism::GSSAPI },
+ false,
+ RpcEncryption::DISABLED,
+ },
+ false,
+ false,
+ Status::OK(),
+ Status::OK(),
+ AuthenticationType::SASL,
+ SaslMechanism::GSSAPI,
+ false,
+ },
+
+ // client: GSSAPI, PLAIN
+ // server: GSSAPI, PLAIN
+ NegotiationDescriptor {
+ EndpointConfig {
+ PkiConfig::NONE,
+ { SaslMechanism::GSSAPI, SaslMechanism::PLAIN },
+ false,
+ RpcEncryption::OPTIONAL,
+ },
+ EndpointConfig {
+ PkiConfig::NONE,
+ { SaslMechanism::GSSAPI, SaslMechanism::PLAIN },
+ false,
+ RpcEncryption::DISABLED,
+ },
+ false,
+ false,
+ Status::OK(),
+ Status::OK(),
+ AuthenticationType::SASL,
+ SaslMechanism::GSSAPI,
+ false,
+ },
+
+ // client: GSSAPI, PLAIN
+ // server: GSSAPI
+ NegotiationDescriptor {
+ EndpointConfig {
+ PkiConfig::NONE,
+ { SaslMechanism::GSSAPI, SaslMechanism::PLAIN },
+ false,
+ RpcEncryption::OPTIONAL,
+ },
+ EndpointConfig {
+ PkiConfig::NONE,
+ { SaslMechanism::GSSAPI },
+ false,
+ RpcEncryption::DISABLED,
+ },
+ false,
+ false,
+ Status::OK(),
+ Status::OK(),
+ AuthenticationType::SASL,
+ SaslMechanism::GSSAPI,
+ false,
+ },
+
+ // client: PLAIN
+ // server: GSSAPI
+ NegotiationDescriptor {
+ EndpointConfig {
+ PkiConfig::NONE,
+ { SaslMechanism::PLAIN },
+ false,
+ RpcEncryption::OPTIONAL,
+ },
+ EndpointConfig {
+ PkiConfig::NONE,
+ { SaslMechanism::GSSAPI },
+ false,
+ RpcEncryption::DISABLED,
+ },
+ false,
+ false,
+ Status::NotAuthorized(".*client does not have Kerberos enabled"),
+ Status::NetworkError(""),
+ AuthenticationType::INVALID,
+ SaslMechanism::INVALID,
+ false,
+ },
+
+ // client: GSSAPI,
+ // server: GSSAPI, self-signed cert
+ // loopback encryption
+ NegotiationDescriptor {
+ EndpointConfig {
+ PkiConfig::NONE,
+ { SaslMechanism::GSSAPI },
+ false,
+ RpcEncryption::OPTIONAL,
+ },
+ EndpointConfig {
+ PkiConfig::SELF_SIGNED,
+ { SaslMechanism::GSSAPI },
+ false,
+ RpcEncryption::OPTIONAL,
+ },
+ false,
+ true,
+ Status::OK(),
+ Status::OK(),
+ AuthenticationType::SASL,
+ SaslMechanism::GSSAPI,
+ true,
+ },
+
+ // client: GSSAPI, signed-cert
+ // server: GSSAPI, self-signed cert
+ // This tests that the server will not advertise CERTIFICATE authentication,
+ // since it doesn't have a trusted cert.
+ NegotiationDescriptor {
+ EndpointConfig {
+ PkiConfig::SIGNED,
+ { SaslMechanism::GSSAPI },
+ false,
+ RpcEncryption::OPTIONAL,
+ },
+ EndpointConfig {
+ PkiConfig::SELF_SIGNED,
+ { SaslMechanism::GSSAPI },
+ false,
+ RpcEncryption::OPTIONAL,
+ },
+ false,
+ false,
+ Status::OK(),
+ Status::OK(),
+ AuthenticationType::SASL,
+ SaslMechanism::GSSAPI,
+ true,
+ },
+
+ // client: PLAIN,
+ // server: PLAIN, self-signed cert
+ NegotiationDescriptor {
+ EndpointConfig {
+ PkiConfig::NONE,
+ { SaslMechanism::PLAIN },
+ false,
+ RpcEncryption::OPTIONAL,
+ },
+ EndpointConfig {
+ PkiConfig::SELF_SIGNED,
+ { SaslMechanism::PLAIN },
+ false,
+ RpcEncryption::OPTIONAL,
+ },
+ false,
+ false,
+ Status::OK(),
+ Status::OK(),
+ AuthenticationType::SASL,
+ SaslMechanism::PLAIN,
+ true,
+ },
+
+ // client: signed-cert
+ // server: signed-cert
+ NegotiationDescriptor {
+ EndpointConfig {
+ PkiConfig::SIGNED,
+ { SaslMechanism::GSSAPI },
+ false,
+ RpcEncryption::OPTIONAL,
+ },
+ EndpointConfig {
+ PkiConfig::SIGNED,
+ { SaslMechanism::GSSAPI },
+ false,
+ RpcEncryption::OPTIONAL,
+ },
+ false,
+ false,
+ Status::OK(),
+ Status::OK(),
+ AuthenticationType::CERTIFICATE,
+ SaslMechanism::INVALID,
+ true,
+ },
+
+ // client: token, trusted cert
+ // server: token, signed-cert, GSSAPI
+ NegotiationDescriptor {
+ EndpointConfig {
+ PkiConfig::TRUSTED,
+ { },
+ true,
+ RpcEncryption::OPTIONAL,
+ },
+ EndpointConfig {
+ PkiConfig::SIGNED,
+ { SaslMechanism::PLAIN },
+ true,
+ RpcEncryption::OPTIONAL,
+ },
+ false,
+ false,
+ Status::OK(),
+ Status::OK(),
+ AuthenticationType::TOKEN,
+ SaslMechanism::INVALID,
+ true,
+ },
+
+ // client: PLAIN, token
+ // server: PLAIN, token, signed cert
+ // Test that the client won't negotiate token authn if it doesn't have a
+ // trusted cert. We aren't expecting this to happen in practice (the
+ // token and trusted CA cert should come as a package).
+ NegotiationDescriptor {
+ EndpointConfig {
+ PkiConfig::NONE,
+ { SaslMechanism::PLAIN },
+ true,
+ RpcEncryption::OPTIONAL,
+ },
+ EndpointConfig {
+ PkiConfig::SIGNED,
+ { SaslMechanism::PLAIN },
+ true,
+ RpcEncryption::OPTIONAL,
+ },
+ false,
+ false,
+ Status::OK(),
+ Status::OK(),
+ AuthenticationType::SASL,
+ SaslMechanism::PLAIN,
+ true,
+ },
+
+ // client: PLAIN, GSSAPI, signed-cert, token
+ // server: PLAIN, GSSAPI, signed-cert, token
+ NegotiationDescriptor {
+ EndpointConfig {
+ PkiConfig::SIGNED,
+ { SaslMechanism::PLAIN, SaslMechanism::GSSAPI },
+ true,
+ RpcEncryption::OPTIONAL,
+ },
+ EndpointConfig {
+ PkiConfig::SIGNED,
+ { SaslMechanism::PLAIN, SaslMechanism::GSSAPI },
+ true,
+ RpcEncryption::OPTIONAL,
+ },
+ false,
+ false,
+ Status::OK(),
+ Status::OK(),
+ AuthenticationType::CERTIFICATE,
+ SaslMechanism::INVALID,
+ true,
+ },
+
+ // client: PLAIN, TLS disabled
+ // server: PLAIN, TLS required
+ NegotiationDescriptor {
+ EndpointConfig {
+ PkiConfig::NONE,
+ { SaslMechanism::PLAIN },
+ false,
+ RpcEncryption::DISABLED,
+ },
+ EndpointConfig {
+ PkiConfig::SIGNED,
+ { SaslMechanism::PLAIN },
+ false,
+ RpcEncryption::REQUIRED,
+ },
+ false,
+ false,
+ Status::NotAuthorized(".*client does not support required TLS encryption"),
+ Status::NotAuthorized(".*client does not support required TLS encryption"),
+ AuthenticationType::SASL,
+ SaslMechanism::PLAIN,
+ true,
+ },
+
+ // client: PLAIN, TLS required
+ // server: PLAIN, TLS disabled
+ NegotiationDescriptor {
+ EndpointConfig {
+ PkiConfig::NONE,
+ { SaslMechanism::PLAIN },
+ false,
+ RpcEncryption::REQUIRED,
+ },
+ EndpointConfig {
+ PkiConfig::SIGNED,
+ { SaslMechanism::PLAIN },
+ false,
+ RpcEncryption::DISABLED,
+ },
+ false,
+ false,
+ Status::NotAuthorized(".*server does not support required TLS encryption"),
+ Status::NetworkError(""),
+ AuthenticationType::SASL,
+ SaslMechanism::PLAIN,
+ true,
+ },
+
+ // client: GSSAPI, TLS required, externally-signed cert
+ // server: GSSAPI, TLS required, externally-signed cert
+ NegotiationDescriptor {
+ EndpointConfig {
+ PkiConfig::EXTERNALLY_SIGNED,
+ { SaslMechanism::GSSAPI },
+ false,
+ RpcEncryption::REQUIRED,
+ },
+ EndpointConfig {
+ PkiConfig::EXTERNALLY_SIGNED,
+ { SaslMechanism::GSSAPI },
+ false,
+ RpcEncryption::REQUIRED,
+ },
+ false,
+ false,
+ Status::OK(),
+ Status::OK(),
+ AuthenticationType::SASL,
+ SaslMechanism::GSSAPI,
+ true,
+ },
+
+ // client: GSSAPI, TLS optional, externally-signed cert
+ // server: GSSAPI, TLS required, signed cert
+ NegotiationDescriptor {
+ EndpointConfig {
+ PkiConfig::EXTERNALLY_SIGNED,
+ { SaslMechanism::GSSAPI },
+ false,
+ RpcEncryption::OPTIONAL,
+ },
+ EndpointConfig {
+ PkiConfig::SIGNED,
+ { SaslMechanism::GSSAPI },
+ false,
+ RpcEncryption::REQUIRED,
+ },
+ false,
+ false,
+ Status::OK(),
+ Status::OK(),
+ AuthenticationType::SASL,
+ SaslMechanism::GSSAPI,
+ true,
+ },
+
+ // client: GSSAPI, TLS required
+ // server: GSSAPI, TLS required, externally-signed cert
+ NegotiationDescriptor {
+ EndpointConfig {
+ PkiConfig::NONE,
+ { SaslMechanism::GSSAPI },
+ false,
+ RpcEncryption::REQUIRED,
+ },
+ EndpointConfig {
+ PkiConfig::EXTERNALLY_SIGNED,
+ { SaslMechanism::GSSAPI },
+ false,
+ RpcEncryption::REQUIRED,
+ },
+ false,
+ false,
+ Status::OK(),
+ Status::OK(),
+ AuthenticationType::SASL,
+ SaslMechanism::GSSAPI,
+ true,
+ },
+
+ // client: GSSAPI, PLAIN, TLS required, externally-signed cert
+ // server: PLAIN, TLS required, externally-signed cert
+ NegotiationDescriptor {
+ EndpointConfig {
+ PkiConfig::EXTERNALLY_SIGNED,
+ { SaslMechanism::GSSAPI, SaslMechanism::PLAIN },
+ false,
+ RpcEncryption::REQUIRED,
+ },
+ EndpointConfig {
+ PkiConfig::EXTERNALLY_SIGNED,
+ { SaslMechanism::PLAIN },
+ false,
+ RpcEncryption::REQUIRED,
+ },
+ false,
+ false,
+ Status::OK(),
+ Status::OK(),
+ AuthenticationType::SASL,
+ SaslMechanism::PLAIN,
+ true,
+ },
+
+ // client: GSSAPI, TLS disabled, signed cert
+ // server: GSSAPI, TLS required, externally-signed cert
+ NegotiationDescriptor {
+ EndpointConfig {
+ PkiConfig::SIGNED,
+ { SaslMechanism::GSSAPI },
+ false,
+ RpcEncryption::DISABLED,
+ },
+ EndpointConfig {
+ PkiConfig::EXTERNALLY_SIGNED,
+ { SaslMechanism::GSSAPI },
+ false,
+ RpcEncryption::REQUIRED,
+ },
+ false,
+ false,
+ Status::NotAuthorized(".*client does not support required TLS encryption"),
+ Status::NotAuthorized(".*client does not support required TLS encryption"),
+ AuthenticationType::SASL,
+ SaslMechanism::GSSAPI,
+ true,
+ },
+
+ // client: GSSAPI, TLS required, signed cert
+ // server: GSSAPI, TLS required, externally-signed cert
+ NegotiationDescriptor {
+ EndpointConfig {
+ PkiConfig::SIGNED,
+ { SaslMechanism::GSSAPI },
+ false,
+ RpcEncryption::REQUIRED,
+ },
+ EndpointConfig {
+ PkiConfig::EXTERNALLY_SIGNED,
+ { SaslMechanism::GSSAPI },
+ false,
+ RpcEncryption::REQUIRED,
+ },
+ false,
+ false,
+ Status::OK(),
+ Status::OK(),
+ AuthenticationType::SASL,
+ SaslMechanism::GSSAPI,
+ true,
+ },
+
+ // client: PLAIN
+ // server: PLAIN
+ // connection from public routable IP
+ NegotiationDescriptor {
+ EndpointConfig {
+ PkiConfig::NONE,
+ { SaslMechanism::PLAIN },
+ false,
+ RpcEncryption::OPTIONAL
+ },
+ EndpointConfig {
+ PkiConfig::NONE,
+ { SaslMechanism::PLAIN },
+ false,
+ RpcEncryption::OPTIONAL
+ },
+ true,
+ false,
+ Status::NotAuthorized(".*unencrypted connections from publicly routable IPs"),
+ Status::NotAuthorized(".*unencrypted connections from publicly routable IPs"),
+ AuthenticationType::SASL,
+ SaslMechanism::PLAIN,
+ false,
+ },
+
+ // client: GSSAPI, TLS required, externally-signed cert
+ // server: GSSAPI, TLS required, externally-signed cert
+ // connection from public routable IP
+ NegotiationDescriptor {
+ EndpointConfig {
+ PkiConfig::EXTERNALLY_SIGNED,
+ { SaslMechanism::GSSAPI },
+ false,
+ RpcEncryption::REQUIRED,
+ },
+ EndpointConfig {
+ PkiConfig::EXTERNALLY_SIGNED,
+ { SaslMechanism::GSSAPI },
+ false,
+ RpcEncryption::REQUIRED,
+ },
+ true,
+ // true as no longer a loopback connection.
+ true,
+ Status::OK(),
+ Status::OK(),
+ AuthenticationType::SASL,
+ SaslMechanism::GSSAPI,
+ true,
+ }
+));
+
+// A "Callable" that takes a socket for use with starting a thread.
+// Can be used for ServerNegotiation or ClientNegotiation threads.
+typedef std::function<void(unique_ptr<Socket>)> SocketCallable;
+
+// Call Accept() on the socket, then pass the connection to the server runner
+static void RunAcceptingDelegator(Socket* acceptor,
+ const SocketCallable& server_runner) {
+ unique_ptr<Socket> conn(new Socket());
+ Sockaddr remote;
+ CHECK_OK(acceptor->Accept(conn.get(), &remote, 0));
+ server_runner(std::move(conn));
+}
+
+// Set up a socket and run a negotiation sequence.
+static void RunNegotiationTest(const SocketCallable& server_runner,
+ const SocketCallable& client_runner) {
+ Socket server_sock;
+ CHECK_OK(server_sock.Init(0));
+ ASSERT_OK(server_sock.BindAndListen(Sockaddr(), 1));
+ Sockaddr server_bind_addr;
+ ASSERT_OK(server_sock.GetSocketAddress(&server_bind_addr));
+ thread server(RunAcceptingDelegator, &server_sock, server_runner);
+
+ unique_ptr<Socket> client_sock(new Socket());
+ CHECK_OK(client_sock->Init(0));
+ ASSERT_OK(client_sock->Connect(server_bind_addr));
+ thread client(client_runner, std::move(client_sock));
+
+ LOG(INFO) << "Waiting for test threads to terminate...";
+ client.join();
+ LOG(INFO) << "Client thread terminated.";
+
+ server.join();
+ LOG(INFO) << "Server thread terminated.";
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+#ifndef __APPLE__
+template<class T>
+using CheckerFunction = std::function<void(const Status&, T&)>;
+
+// Run GSSAPI negotiation from the server side. Runs
+// 'post_check' after negotiation to verify the result.
+static void RunGSSAPINegotiationServer(unique_ptr<Socket> socket,
+ const CheckerFunction<ServerNegotiation>& post_check) {
+ TlsContext tls_context;
+ CHECK_OK(tls_context.Init());
+ TokenVerifier token_verifier;
+ ServerNegotiation server_negotiation(std::move(socket), &tls_context,
+ &token_verifier, RpcEncryption::OPTIONAL);
+ server_negotiation.set_server_fqdn("127.0.0.1");
+ CHECK_OK(server_negotiation.EnableGSSAPI());
+ post_check(server_negotiation.Negotiate(), server_negotiation);
+}
+
+// Run GSSAPI negotiation from the client side. Runs
+// 'post_check' after negotiation to verify the result.
+static void RunGSSAPINegotiationClient(unique_ptr<Socket> conn,
+ const CheckerFunction<ClientNegotiation>& post_check) {
+ TlsContext tls_context;
+ CHECK_OK(tls_context.Init());
+ ClientNegotiation client_negotiation(std::move(conn), &tls_context,
+ boost::none, RpcEncryption::OPTIONAL);
+ client_negotiation.set_server_fqdn("127.0.0.1");
+ CHECK_OK(client_negotiation.EnableGSSAPI());
+ post_check(client_negotiation.Negotiate(), client_negotiation);
+}
+
+// Test invalid SASL negotiations using the GSSAPI (kerberos) mechanism over a socket.
+// This test is ignored on macOS because the system Kerberos implementation
+// (Heimdal) caches the non-existence of client credentials, which causes futher
+// tests to fail.
+TEST_F(TestNegotiation, TestGSSAPIInvalidNegotiation) {
+ MiniKdc kdc;
+ ASSERT_OK(kdc.Start());
+
+ // Try to negotiate with no krb5 credentials on either side. It should fail on both
+ // sides.
+ RunNegotiationTest(
+ std::bind(RunGSSAPINegotiationServer, std::placeholders::_1,
+ [](const Status& s, ServerNegotiation& server) {
+ // The client notices there are no credentials and
+ // doesn't send any failure message to the server.
+ // Instead, it just disconnects.
+ //
+ // TODO(todd): it might be preferable to have the server
+ // fail to start if it has no valid keytab.
+ CHECK(s.IsNetworkError());
+ }),
+ std::bind(RunGSSAPINegotiationClient, std::placeholders::_1,
+ [](const Status& s, ClientNegotiation& client) {
+ CHECK(s.IsNotAuthorized());
+#ifndef KRB5_VERSION_LE_1_10
+ CHECK_GT(s.ToString().find("No Kerberos credentials available"), 0);
+#endif
+ }));
+
+
+ // Create the server principal and keytab.
+ string kt_path;
+ ASSERT_OK(kdc.CreateServiceKeytab("kudu/127.0.0.1", &kt_path));
+ CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/));
+
+ // Try to negotiate with no krb5 credentials on the client. It should fail on both
+ // sides.
+ RunNegotiationTest(
+ std::bind(RunGSSAPINegotiationServer, std::placeholders::_1,
+ [](const Status& s, ServerNegotiation& server) {
+ // The client notices there are no credentials and
+ // doesn't send any failure message to the server.
+ // Instead, it just disconnects.
+ CHECK(s.IsNetworkError());
+ }),
+ std::bind(RunGSSAPINegotiationClient, std::placeholders::_1,
+ [](const Status& s, ClientNegotiation& client) {
+ CHECK(s.IsNotAuthorized());
+#ifndef KRB5_VERSION_LE_1_10
+ ASSERT_STR_MATCHES(s.ToString(),
+ "Not authorized: No Kerberos credentials available.*");
+#endif
+ }));
+
+ // Create and kinit as a client user.
+ ASSERT_OK(kdc.CreateUserPrincipal("testuser"));
+ ASSERT_OK(kdc.Kinit("testuser"));
+ ASSERT_OK(kdc.SetKrb5Environment());
+
+ // Change the server's keytab file so that it has inappropriate
+ // credentials.
+ // Authentication should now fail.
+ ASSERT_OK(kdc.CreateServiceKeytab("otherservice/127.0.0.1", &kt_path));
+ CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/));
+
+ RunNegotiationTest(
+ std::bind(RunGSSAPINegotiationServer, std::placeholders::_1,
+ [](const Status& s, ServerNegotiation& server) {
+ CHECK(s.IsNotAuthorized());
+#ifndef KRB5_VERSION_LE_1_10
+ ASSERT_STR_CONTAINS(s.ToString(),
+ "No key table entry found matching kudu/127.0.0.1");
+#endif
+ }),
+ std::bind(RunGSSAPINegotiationClient, std::placeholders::_1,
+ [](const Status& s, ClientNegotiation& client) {
+ CHECK(s.IsNotAuthorized());
+#ifndef KRB5_VERSION_LE_1_10
+ ASSERT_STR_CONTAINS(s.ToString(),
+ "No key table entry found matching kudu/127.0.0.1");
+#endif
+ }));
+}
+#endif
+
+#ifndef __APPLE__
+// Test that the pre-flight check for servers requiring Kerberos provides
+// nice error messages for missing or bad keytabs.
+//
+// This is ignored on macOS because the system Kerberos implementation does not
+// fail the preflight check when the keytab is inaccessible, probably because
+// the preflight check passes a 0-length token.
+TEST_F(TestNegotiation, TestPreflight) {
+ // Try pre-flight with no keytab.
+ Status s = ServerNegotiation::PreflightCheckGSSAPI();
+ ASSERT_FALSE(s.ok());
+#ifndef KRB5_VERSION_LE_1_10
+ ASSERT_STR_MATCHES(s.ToString(), "Key table file.*not found");
+#endif
+ // Try with a valid krb5 environment and keytab.
+ MiniKdc kdc;
+ ASSERT_OK(kdc.Start());
+ ASSERT_OK(kdc.SetKrb5Environment());
+ string kt_path;
+ ASSERT_OK(kdc.CreateServiceKeytab("kudu/127.0.0.1", &kt_path));
+ CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/));
+
+ ASSERT_OK(ServerNegotiation::PreflightCheckGSSAPI());
+
+ // Try with an inaccessible keytab.
+ CHECK_ERR(chmod(kt_path.c_str(), 0000));
+ s = ServerNegotiation::PreflightCheckGSSAPI();
+ ASSERT_FALSE(s.ok());
+#ifndef KRB5_VERSION_LE_1_10
+ ASSERT_STR_MATCHES(s.ToString(), "error accessing keytab: Permission denied");
+#endif
+ CHECK_ERR(unlink(kt_path.c_str()));
+
+ // Try with a keytab that has the wrong credentials.
+ ASSERT_OK(kdc.CreateServiceKeytab("wrong-service/127.0.0.1", &kt_path));
+ CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/));
+ s = ServerNegotiation::PreflightCheckGSSAPI();
+ ASSERT_FALSE(s.ok());
+#ifndef KRB5_VERSION_LE_1_10
+ ASSERT_STR_MATCHES(s.ToString(), "No key table entry found matching kudu/.*");
+#endif
+}
+#endif
+
+////////////////////////////////////////////////////////////////////////////////
+
+static void RunTimeoutExpectingServer(unique_ptr<Socket> socket) {
+ TlsContext tls_context;
+ CHECK_OK(tls_context.Init());
+ TokenVerifier token_verifier;
+ ServerNegotiation server_negotiation(std::move(socket), &tls_context,
+ &token_verifier, RpcEncryption::OPTIONAL);
+ CHECK_OK(server_negotiation.EnablePlain());
+ Status s = server_negotiation.Negotiate();
+ ASSERT_TRUE(s.IsNetworkError()) << "Expected client to time out and close the connection. Got: "
+ << s.ToString();
+}
+
+static void RunTimeoutNegotiationClient(unique_ptr<Socket> sock) {
+ TlsContext tls_context;
+ CHECK_OK(tls_context.Init());
+ ClientNegotiation client_negotiation(std::move(sock), &tls_context,
+ boost::none, RpcEncryption::OPTIONAL);
+ CHECK_OK(client_negotiation.EnablePlain("test", "test"));
+ MonoTime deadline = MonoTime::Now() - MonoDelta::FromMilliseconds(100L);
+ client_negotiation.set_deadline(deadline);
+ Status s = client_negotiation.Negotiate();
+ ASSERT_TRUE(s.IsTimedOut()) << "Expected timeout! Got: " << s.ToString();
+ CHECK_OK(client_negotiation.socket()->Shutdown(true, true));
+}
+
+// Ensure that the client times out.
+TEST_F(TestNegotiation, TestClientTimeout) {
+ RunNegotiationTest(RunTimeoutExpectingServer, RunTimeoutNegotiationClient);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+static void RunTimeoutNegotiationServer(unique_ptr<Socket> socket) {
+ TlsContext tls_context;
+ CHECK_OK(tls_context.Init());
+ TokenVerifier token_verifier;
+ ServerNegotiation server_negotiation(std::move(socket), &tls_context,
+ &token_verifier, RpcEncryption::OPTIONAL);
+ CHECK_OK(server_negotiation.EnablePlain());
+ MonoTime deadline = MonoTime::Now() - MonoDelta::FromMilliseconds(100L);
+ server_negotiation.set_deadline(deadline);
+ Status s = server_negotiation.Negotiate();
+ ASSERT_TRUE(s.IsTimedOut()) << "Expected timeout! Got: " << s.ToString();
+ CHECK_OK(server_negotiation.socket()->Close());
+}
+
+static void RunTimeoutExpectingClient(unique_ptr<Socket> socket) {
+ TlsContext tls_context;
+ CHECK_OK(tls_context.Init());
+ ClientNegotiation client_negotiation(std::move(socket), &tls_context,
+ boost::none, RpcEncryption::OPTIONAL);
+ CHECK_OK(client_negotiation.EnablePlain("test", "test"));
+ Status s = client_negotiation.Negotiate();
+ ASSERT_TRUE(s.IsNetworkError()) << "Expected server to time out and close the connection. Got: "
+ << s.ToString();
+}
+
+// Ensure that the server times out.
+TEST_F(TestNegotiation, TestServerTimeout) {
+ RunNegotiationTest(RunTimeoutNegotiationServer, RunTimeoutExpectingClient);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+// This suite of tests ensure that applications that embed the Kudu client are
+// able to externally handle the initialization of SASL. See KUDU-1749 and
+// IMPALA-4497 for context.
+//
+// The tests are a bit tricky because the initialization of SASL is static state
+// that we can't easily clear/reset between test cases. So, each test invokes
+// itself as a subprocess with the appropriate --gtest_filter line as well as a
+// special flag to indicate that it is the test child running.
+class TestDisableInit : public KuduTest {
+ protected:
+ // Run the lambda 'f' in a newly-started process, capturing its stderr
+ // into 'stderr'.
+ template<class TestFunc>
+ void DoTest(const TestFunc& f, string* stderr = nullptr) {
+ if (FLAGS_is_test_child) {
+ f();
+ return;
+ }
+
+ // Invoke the currently-running test case in a new subprocess.
+ string filter_flag = strings::Substitute("--gtest_filter=$0.$1",
+ CURRENT_TEST_CASE_NAME(), CURRENT_TEST_NAME());
+ string executable_path;
+ CHECK_OK(env_->GetExecutablePath(&executable_path));
+ string stdout;
+ Status s = Subprocess::Call({ executable_path, "test", filter_flag, "--is_test_child" },
+ "" /* stdin */,
+ &stdout,
+ stderr);
+ ASSERT_TRUE(s.ok()) << "Test failed: " << stdout;
+ }
+};
+
+// Test disabling SASL but not actually properly initializing it before usage.
+TEST_F(TestDisableInit, TestDisableSasl_NotInitialized) {
+ DoTest([]() {
+ CHECK_OK(DisableSaslInitialization());
+ Status s = SaslInit();
+ ASSERT_STR_CONTAINS(s.ToString(), "was disabled, but SASL was not externally initialized");
+ });
+}
+
+// Test disabling SASL with proper initialization by some other app.
+TEST_F(TestDisableInit, TestDisableSasl_Good) {
+ DoTest([]() {
+ rpc::internal::SaslSetMutex();
+ sasl_client_init(NULL);
+ CHECK_OK(DisableSaslInitialization());
+ ASSERT_OK(SaslInit());
+ });
+}
+
+// Test a client which inits SASL itself but doesn't remember to disable Kudu's
+// SASL initialization.
+TEST_F(TestDisableInit, TestMultipleSaslInit) {
+ string stderr;
+ DoTest([]() {
+ rpc::internal::SaslSetMutex();
+ sasl_client_init(NULL);
+ ASSERT_OK(SaslInit());
+ }, &stderr);
+ // If we are the parent, we should see the warning from the child that it automatically
+ // skipped initialization because it detected that it was already initialized.
+ if (!FLAGS_is_test_child) {
+ ASSERT_STR_CONTAINS(stderr, "Skipping initialization");
+ }
+}
+
+// We are not able to detect mutexes not being set with the macOS version of libsasl.
+#ifndef __APPLE__
+// Test disabling SASL but not remembering to initialize the SASL mutex support. This
+// should succeed but generate a warning.
+TEST_F(TestDisableInit, TestDisableSasl_NoMutexImpl) {
+ string stderr;
+ DoTest([]() {
+ sasl_client_init(NULL);
+ CHECK_OK(DisableSaslInitialization());
+ ASSERT_OK(SaslInit());
+ }, &stderr);
+ // If we are the parent, we should see the warning from the child.
+ if (!FLAGS_is_test_child) {
+ ASSERT_STR_CONTAINS(stderr, "not provided with a mutex implementation");
+ }
+}
+
+// Test a client which inits SASL itself but doesn't remember to disable Kudu's
+// SASL initialization.
+TEST_F(TestDisableInit, TestMultipleSaslInit_NoMutexImpl) {
+ string stderr;
+ DoTest([]() {
+ sasl_client_init(NULL);
+ ASSERT_OK(SaslInit());
+ }, &stderr);
+ // If we are the parent, we should see the warning from the child that it automatically
+ // skipped initialization because it detected that it was already initialized.
+ if (!FLAGS_is_test_child) {
+ ASSERT_STR_CONTAINS(stderr, "Skipping initialization");
+ ASSERT_STR_CONTAINS(stderr, "not provided with a mutex implementation");
+ }
+}
+#endif
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/negotiation.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/negotiation.cc b/be/src/kudu/rpc/negotiation.cc
new file mode 100644
index 0000000..66a0112
--- /dev/null
+++ b/be/src/kudu/rpc/negotiation.cc
@@ -0,0 +1,317 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/negotiation.h"
+
+#include <poll.h>
+#include <sys/time.h>
+
+#include <memory>
+#include <ostream>
+#include <string>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/blocking_ops.h"
+#include "kudu/rpc/client_negotiation.h"
+#include "kudu/rpc/connection.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/reactor.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/sasl_common.h"
+#include "kudu/rpc/server_negotiation.h"
+#include "kudu/security/tls_context.h"
+#include "kudu/util/errno.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/status.h"
+#include "kudu/util/trace.h"
+
+DEFINE_bool(rpc_trace_negotiation, false,
+ "If enabled, dump traces of all RPC negotiations to the log");
+TAG_FLAG(rpc_trace_negotiation, runtime);
+TAG_FLAG(rpc_trace_negotiation, advanced);
+TAG_FLAG(rpc_trace_negotiation, experimental);
+
+DEFINE_int32(rpc_negotiation_inject_delay_ms, 0,
+ "If enabled, injects the given number of milliseconds delay into "
+ "the RPC negotiation process on the server side.");
+TAG_FLAG(rpc_negotiation_inject_delay_ms, unsafe);
+
+DECLARE_string(keytab_file);
+DECLARE_string(rpc_certificate_file);
+
+DEFINE_bool(rpc_encrypt_loopback_connections, false,
+ "Whether to encrypt data transfer on RPC connections that stay within "
+ "a single host. Encryption here is likely to offer no additional "
+ "security benefit since only a local 'root' user could intercept the "
+ "traffic, and wire encryption does not suitably protect against such "
+ "an attacker.");
+TAG_FLAG(rpc_encrypt_loopback_connections, advanced);
+
+using std::unique_ptr;
+using strings::Substitute;
+
+namespace kudu {
+namespace rpc {
+
+const char* AuthenticationTypeToString(AuthenticationType t) {
+ switch (t) {
+ case AuthenticationType::INVALID: return "INVALID"; break;
+ case AuthenticationType::SASL: return "SASL"; break;
+ case AuthenticationType::TOKEN: return "TOKEN"; break;
+ case AuthenticationType::CERTIFICATE: return "CERTIFICATE"; break;
+ }
+ return "<cannot reach here>";
+}
+
+std::ostream& operator<<(std::ostream& o, AuthenticationType authentication_type) {
+ return o << AuthenticationTypeToString(authentication_type);
+}
+
+// Wait for the client connection to be established and become ready for writing.
+static Status WaitForClientConnect(Socket* socket, const MonoTime& deadline) {
+ TRACE("Waiting for socket to connect");
+ int fd = socket->GetFd();
+ struct pollfd poll_fd;
+ poll_fd.fd = fd;
+ poll_fd.events = POLLOUT;
+ poll_fd.revents = 0;
+
+ MonoTime now;
+ MonoDelta remaining;
+ while (true) {
+ now = MonoTime::Now();
+ remaining = deadline - now;
+ DVLOG(4) << "Client waiting to connect for negotiation, time remaining until timeout deadline: "
+ << remaining.ToString();
+ if (PREDICT_FALSE(remaining.ToNanoseconds() <= 0)) {
+ return Status::TimedOut("Timeout exceeded waiting to connect");
+ }
+#if defined(__linux__)
+ struct timespec ts;
+ remaining.ToTimeSpec(&ts);
+ int ready = ppoll(&poll_fd, 1, &ts, NULL);
+#else
+ int ready = poll(&poll_fd, 1, remaining.ToMilliseconds());
+#endif
+ if (ready == -1) {
+ int err = errno;
+ if (err == EINTR) {
+ // We were interrupted by a signal, let's go again.
+ continue;
+ } else {
+ return Status::NetworkError("Error from ppoll() while waiting to connect",
+ ErrnoToString(err), err);
+ }
+ } else if (ready == 0) {
+ // Timeout exceeded. Loop back to the top to our impending doom.
+ continue;
+ } else {
+ // Success.
+ break;
+ }
+ }
+
+ // Connect finished, but this doesn't mean that we connected successfully.
+ // Check the socket for an error.
+ int so_error = 0;
+ socklen_t socklen = sizeof(so_error);
+ int rc = getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_error, &socklen);
+ if (rc != 0) {
+ return Status::NetworkError("Unable to check connected socket for errors",
+ ErrnoToString(errno),
+ errno);
+ }
+ if (so_error != 0) {
+ return Status::NetworkError("connect", ErrnoToString(so_error), so_error);
+ }
+
+ return Status::OK();
+}
+
+// Disable / reset socket timeouts.
+static Status DisableSocketTimeouts(Socket* socket) {
+ RETURN_NOT_OK(socket->SetSendTimeout(MonoDelta::FromNanoseconds(0L)));
+ RETURN_NOT_OK(socket->SetRecvTimeout(MonoDelta::FromNanoseconds(0L)));
+ return Status::OK();
+}
+
+// Perform client negotiation. We don't LOG() anything, we leave that to our caller.
+static Status DoClientNegotiation(Connection* conn,
+ RpcAuthentication authentication,
+ RpcEncryption encryption,
+ MonoTime deadline,
+ unique_ptr<ErrorStatusPB>* rpc_error) {
+ const auto* messenger = conn->reactor_thread()->reactor()->messenger();
+ // Prefer secondary credentials (such as authn token) if permitted by policy.
+ const auto authn_token = (conn->credentials_policy() == CredentialsPolicy::PRIMARY_CREDENTIALS)
+ ? boost::none : messenger->authn_token();
+ ClientNegotiation client_negotiation(conn->release_socket(),
+ &messenger->tls_context(),
+ authn_token,
+ encryption);
+
+ // Note that the fqdn is an IP address here: we've already lost whatever DNS
+ // name the client was attempting to use. Unless krb5 is configured with 'rdns
+ // = false', it will automatically take care of reversing this address to its
+ // canonical hostname to determine the expected server principal.
+ client_negotiation.set_server_fqdn(conn->remote().host());
+
+ if (authentication != RpcAuthentication::DISABLED) {
+ Status s = client_negotiation.EnableGSSAPI();
+ if (!s.ok()) {
+ // If we can't enable GSSAPI, it's likely the client is just missing the
+ // appropriate SASL plugin. We don't want to require it to be installed
+ // if the user doesn't care about connecting to servers using Kerberos
+ // authentication. So, we'll just VLOG this here. If we try to connect
+ // to a server which requires Kerberos, we'll get a negotiation error
+ // at that point.
+ if (VLOG_IS_ON(1)) {
+ KLOG_FIRST_N(INFO, 1) << "Couldn't enable GSSAPI (Kerberos) SASL plugin: "
+ << s.message().ToString()
+ << ". This process will be unable to connect to "
+ << "servers requiring Kerberos authentication.";
+ }
+
+ if (authentication == RpcAuthentication::REQUIRED &&
+ !authn_token &&
+ !messenger->tls_context().has_signed_cert()) {
+ return Status::InvalidArgument(
+ "Kerberos, token, or PKI certificate credentials must be provided in order to "
+ "require authentication for a client");
+ }
+ }
+ }
+
+ if (authentication != RpcAuthentication::REQUIRED) {
+ RETURN_NOT_OK(client_negotiation.EnablePlain(conn->local_user_credentials().real_user(), ""));
+ }
+
+ client_negotiation.set_deadline(deadline);
+
+ RETURN_NOT_OK(WaitForClientConnect(client_negotiation.socket(), deadline));
+ RETURN_NOT_OK(client_negotiation.socket()->SetNonBlocking(false));
+ RETURN_NOT_OK(client_negotiation.Negotiate(rpc_error));
+ RETURN_NOT_OK(DisableSocketTimeouts(client_negotiation.socket()));
+
+ // Transfer the negotiated socket and state back to the connection.
+ conn->adopt_socket(client_negotiation.release_socket());
+ conn->set_remote_features(client_negotiation.take_server_features());
+
+ // Sanity check: if no authn token was supplied as user credentials,
+ // the negotiated authentication type cannot be AuthenticationType::TOKEN.
+ DCHECK(!(authn_token == boost::none &&
+ client_negotiation.negotiated_authn() == AuthenticationType::TOKEN));
+
+ return Status::OK();
+}
+
+// Perform server negotiation. We don't LOG() anything, we leave that to our caller.
+static Status DoServerNegotiation(Connection* conn,
+ RpcAuthentication authentication,
+ RpcEncryption encryption,
+ const MonoTime& deadline) {
+ if (authentication == RpcAuthentication::REQUIRED &&
+ FLAGS_keytab_file.empty() &&
+ FLAGS_rpc_certificate_file.empty()) {
+ return Status::InvalidArgument("RPC authentication (--rpc_authentication) may not be "
+ "required unless Kerberos (--keytab_file) or external PKI "
+ "(--rpc_certificate_file et al) are configured");
+ }
+
+ if (FLAGS_rpc_negotiation_inject_delay_ms > 0) {
+ LOG(WARNING) << "Injecting " << FLAGS_rpc_negotiation_inject_delay_ms
+ << "ms delay in negotiation";
+ SleepFor(MonoDelta::FromMilliseconds(FLAGS_rpc_negotiation_inject_delay_ms));
+ }
+
+ // Create a new ServerNegotiation to handle the synchronous negotiation.
+ const auto* messenger = conn->reactor_thread()->reactor()->messenger();
+ ServerNegotiation server_negotiation(conn->release_socket(),
+ &messenger->tls_context(),
+ &messenger->token_verifier(),
+ encryption);
+
+ if (authentication != RpcAuthentication::DISABLED && !FLAGS_keytab_file.empty()) {
+ RETURN_NOT_OK(server_negotiation.EnableGSSAPI());
+ }
+ if (authentication != RpcAuthentication::REQUIRED) {
+ RETURN_NOT_OK(server_negotiation.EnablePlain());
+ }
+
+ server_negotiation.set_deadline(deadline);
+
+ RETURN_NOT_OK(server_negotiation.socket()->SetNonBlocking(false));
+
+ RETURN_NOT_OK(server_negotiation.Negotiate());
+ RETURN_NOT_OK(DisableSocketTimeouts(server_negotiation.socket()));
+
+ // Transfer the negotiated socket and state back to the connection.
+ conn->adopt_socket(server_negotiation.release_socket());
+ conn->set_remote_features(server_negotiation.take_client_features());
+ conn->set_remote_user(server_negotiation.take_authenticated_user());
+
+ return Status::OK();
+}
+
+void Negotiation::RunNegotiation(const scoped_refptr<Connection>& conn,
+ RpcAuthentication authentication,
+ RpcEncryption encryption,
+ MonoTime deadline) {
+ Status s;
+ unique_ptr<ErrorStatusPB> rpc_error;
+ if (conn->direction() == Connection::SERVER) {
+ s = DoServerNegotiation(conn.get(), authentication, encryption, deadline);
+ } else {
+ s = DoClientNegotiation(conn.get(), authentication, encryption, deadline,
+ &rpc_error);
+ }
+
+ if (PREDICT_FALSE(!s.ok())) {
+ string msg = Substitute("$0 connection negotiation failed: $1",
+ conn->direction() == Connection::SERVER ? "Server" : "Client",
+ conn->ToString());
+ s = s.CloneAndPrepend(msg);
+ }
+ TRACE("Negotiation complete: $0", s.ToString());
+
+ bool is_bad = !s.ok() && !(
+ (s.IsNetworkError() && s.posix_code() == ECONNREFUSED) ||
+ s.IsNotAuthorized());
+
+ if (is_bad || FLAGS_rpc_trace_negotiation) {
+ string msg = Trace::CurrentTrace()->DumpToString();
+ if (is_bad) {
+ LOG(WARNING) << "Failed RPC negotiation. Trace:\n" << msg;
+ } else {
+ LOG(INFO) << "RPC negotiation tracing enabled. Trace:\n" << msg;
+ }
+ }
+
+ if (conn->direction() == Connection::SERVER && s.IsNotAuthorized()) {
+ LOG(WARNING) << "Unauthorized connection attempt: " << s.message().ToString();
+ }
+ conn->CompleteNegotiation(std::move(s), std::move(rpc_error));
+}
+
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/negotiation.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/negotiation.h b/be/src/kudu/rpc/negotiation.h
new file mode 100644
index 0000000..2ca459b
--- /dev/null
+++ b/be/src/kudu/rpc/negotiation.h
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_RPC_NEGOTIATION_H
+#define KUDU_RPC_NEGOTIATION_H
+
+#include <iosfwd>
+
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/monotime.h"
+
+namespace kudu {
+namespace rpc {
+
+class Connection;
+enum class RpcAuthentication;
+enum class RpcEncryption;
+
+enum class AuthenticationType {
+ INVALID,
+ SASL,
+ TOKEN,
+ CERTIFICATE,
+};
+const char* AuthenticationTypeToString(AuthenticationType t);
+
+std::ostream& operator<<(std::ostream& o, AuthenticationType authentication_type);
+
+class Negotiation {
+ public:
+
+ // Perform negotiation for a connection (either server or client)
+ static void RunNegotiation(const scoped_refptr<Connection>& conn,
+ RpcAuthentication authentication,
+ RpcEncryption encryption,
+ MonoTime deadline);
+ private:
+ DISALLOW_IMPLICIT_CONSTRUCTORS(Negotiation);
+};
+
+} // namespace rpc
+} // namespace kudu
+#endif // KUDU_RPC_NEGOTIATION_H
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/outbound_call.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/outbound_call.cc b/be/src/kudu/rpc/outbound_call.cc
new file mode 100644
index 0000000..af03f1c
--- /dev/null
+++ b/be/src/kudu/rpc/outbound_call.cc
@@ -0,0 +1,509 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <boost/functional/hash.hpp>
+#include <gflags/gflags.h>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <unordered_set>
+#include <vector>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/rpc/outbound_call.h"
+#include "kudu/rpc/constants.h"
+#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_introspection.pb.h"
+#include "kudu/rpc/rpc_sidecar.h"
+#include "kudu/rpc/serialization.h"
+#include "kudu/rpc/transfer.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/kernel_stack_watchdog.h"
+
+// 100M cycles should be about 50ms on a 2Ghz box. This should be high
+// enough that involuntary context switches don't trigger it, but low enough
+// that any serious blocking behavior on the reactor would.
+DEFINE_int64(rpc_callback_max_cycles, 100 * 1000 * 1000,
+ "The maximum number of cycles for which an RPC callback "
+ "should be allowed to run without emitting a warning."
+ " (Advanced debugging option)");
+TAG_FLAG(rpc_callback_max_cycles, advanced);
+TAG_FLAG(rpc_callback_max_cycles, runtime);
+
+using std::unique_ptr;
+
+namespace kudu {
+namespace rpc {
+
+using google::protobuf::Message;
+using strings::Substitute;
+
+static const double kMicrosPerSecond = 1000000.0;
+
+///
+/// OutboundCall
+///
+
+OutboundCall::OutboundCall(const ConnectionId& conn_id,
+ const RemoteMethod& remote_method,
+ google::protobuf::Message* response_storage,
+ RpcController* controller,
+ ResponseCallback callback)
+ : state_(READY),
+ remote_method_(remote_method),
+ conn_id_(conn_id),
+ callback_(std::move(callback)),
+ controller_(DCHECK_NOTNULL(controller)),
+ response_(DCHECK_NOTNULL(response_storage)) {
+ DVLOG(4) << "OutboundCall " << this << " constructed with state_: " << StateName(state_)
+ << " and RPC timeout: "
+ << (controller->timeout().Initialized() ? controller->timeout().ToString() : "none");
+ header_.set_call_id(kInvalidCallId);
+ remote_method.ToPB(header_.mutable_remote_method());
+ start_time_ = MonoTime::Now();
+
+ if (!controller_->required_server_features().empty()) {
+ required_rpc_features_.insert(RpcFeatureFlag::APPLICATION_FEATURE_FLAGS);
+ }
+
+ if (controller_->request_id_) {
+ header_.set_allocated_request_id(controller_->request_id_.release());
+ }
+}
+
+OutboundCall::~OutboundCall() {
+ DCHECK(IsFinished());
+ DVLOG(4) << "OutboundCall " << this << " destroyed with state_: " << StateName(state_);
+}
+
+Status OutboundCall::SerializeTo(vector<Slice>* slices) {
+ DCHECK_LT(0, request_buf_.size())
+ << "Must call SetRequestPayload() before SerializeTo()";
+
+ const MonoDelta &timeout = controller_->timeout();
+ if (timeout.Initialized()) {
+ header_.set_timeout_millis(timeout.ToMilliseconds());
+ }
+
+ for (uint32_t feature : controller_->required_server_features()) {
+ header_.add_required_feature_flags(feature);
+ }
+
+ DCHECK_LE(0, sidecar_byte_size_);
+ serialization::SerializeHeader(
+ header_, sidecar_byte_size_ + request_buf_.size(), &header_buf_);
+
+ slices->push_back(Slice(header_buf_));
+ slices->push_back(Slice(request_buf_));
+ for (const unique_ptr<RpcSidecar>& car : sidecars_) slices->push_back(car->AsSlice());
+ return Status::OK();
+}
+
+void OutboundCall::SetRequestPayload(const Message& req,
+ vector<unique_ptr<RpcSidecar>>&& sidecars) {
+ DCHECK_EQ(-1, sidecar_byte_size_);
+
+ sidecars_ = move(sidecars);
+
+ // Compute total size of sidecar payload so that extra space can be reserved as part of
+ // the request body.
+ uint32_t message_size = req.ByteSize();
+ sidecar_byte_size_ = 0;
+ for (const unique_ptr<RpcSidecar>& car: sidecars_) {
+ header_.add_sidecar_offsets(sidecar_byte_size_ + message_size);
+ sidecar_byte_size_ += car->AsSlice().size();
+ }
+
+ serialization::SerializeMessage(req, &request_buf_, sidecar_byte_size_, true);
+}
+
+Status OutboundCall::status() const {
+ std::lock_guard<simple_spinlock> l(lock_);
+ return status_;
+}
+
+const ErrorStatusPB* OutboundCall::error_pb() const {
+ std::lock_guard<simple_spinlock> l(lock_);
+ return error_pb_.get();
+}
+
+string OutboundCall::StateName(State state) {
+ switch (state) {
+ case READY:
+ return "READY";
+ case ON_OUTBOUND_QUEUE:
+ return "ON_OUTBOUND_QUEUE";
+ case SENDING:
+ return "SENDING";
+ case SENT:
+ return "SENT";
+ case NEGOTIATION_TIMED_OUT:
+ return "NEGOTIATION_TIMED_OUT";
+ case TIMED_OUT:
+ return "TIMED_OUT";
+ case FINISHED_NEGOTIATION_ERROR:
+ return "FINISHED_NEGOTIATION_ERROR";
+ case FINISHED_ERROR:
+ return "FINISHED_ERROR";
+ case FINISHED_SUCCESS:
+ return "FINISHED_SUCCESS";
+ default:
+ LOG(DFATAL) << "Unknown state in OutboundCall: " << state;
+ return StringPrintf("UNKNOWN(%d)", state);
+ }
+}
+
+void OutboundCall::set_state(State new_state) {
+ std::lock_guard<simple_spinlock> l(lock_);
+ set_state_unlocked(new_state);
+}
+
+OutboundCall::State OutboundCall::state() const {
+ std::lock_guard<simple_spinlock> l(lock_);
+ return state_;
+}
+
+void OutboundCall::set_state_unlocked(State new_state) {
+ // Sanity check state transitions.
+ DVLOG(3) << "OutboundCall " << this << " (" << ToString() << ") switching from " <<
+ StateName(state_) << " to " << StateName(new_state);
+ switch (new_state) {
+ case ON_OUTBOUND_QUEUE:
+ DCHECK_EQ(state_, READY);
+ break;
+ case SENDING:
+ // Allow SENDING to be set idempotently so we don't have to specifically check
+ // whether the state is transitioning in the RPC code.
+ DCHECK(state_ == ON_OUTBOUND_QUEUE || state_ == SENDING);
+ break;
+ case SENT:
+ DCHECK_EQ(state_, SENDING);
+ break;
+ case NEGOTIATION_TIMED_OUT:
+ DCHECK(state_ == ON_OUTBOUND_QUEUE);
+ break;
+ case TIMED_OUT:
+ DCHECK(state_ == SENT || state_ == ON_OUTBOUND_QUEUE || state_ == SENDING);
+ break;
+ case FINISHED_SUCCESS:
+ DCHECK_EQ(state_, SENT);
+ break;
+ default:
+ // No sanity checks for others.
+ break;
+ }
+
+ state_ = new_state;
+}
+
+void OutboundCall::CallCallback() {
+ int64_t start_cycles = CycleClock::Now();
+ {
+ SCOPED_WATCH_STACK(100);
+ callback_();
+ // Clear the callback, since it may be holding onto reference counts
+ // via bound parameters. We do this inside the timer because it's possible
+ // the user has naughty destructors that block, and we want to account for that
+ // time here if they happen to run on this thread.
+ callback_ = NULL;
+ }
+ int64_t end_cycles = CycleClock::Now();
+ int64_t wait_cycles = end_cycles - start_cycles;
+ if (PREDICT_FALSE(wait_cycles > FLAGS_rpc_callback_max_cycles)) {
+ double micros = static_cast<double>(wait_cycles) / base::CyclesPerSecond()
+ * kMicrosPerSecond;
+
+ LOG(WARNING) << "RPC callback for " << ToString() << " blocked reactor thread for "
+ << micros << "us";
+ }
+}
+
+void OutboundCall::SetResponse(gscoped_ptr<CallResponse> resp) {
+ call_response_ = std::move(resp);
+ Slice r(call_response_->serialized_response());
+
+ if (call_response_->is_success()) {
+ // TODO: here we're deserializing the call response within the reactor thread,
+ // which isn't great, since it would block processing of other RPCs in parallel.
+ // Should look into a way to avoid this.
+ if (!response_->ParseFromArray(r.data(), r.size())) {
+ SetFailed(Status::IOError("invalid RPC response, missing fields",
+ response_->InitializationErrorString()));
+ return;
+ }
+ set_state(FINISHED_SUCCESS);
+ CallCallback();
+ } else {
+ // Error
+ gscoped_ptr<ErrorStatusPB> err(new ErrorStatusPB());
+ if (!err->ParseFromArray(r.data(), r.size())) {
+ SetFailed(Status::IOError("Was an RPC error but could not parse error response",
+ err->InitializationErrorString()));
+ return;
+ }
+ ErrorStatusPB* err_raw = err.release();
+ SetFailed(Status::RemoteError(err_raw->message()), Phase::REMOTE_CALL, err_raw);
+ }
+}
+
+void OutboundCall::SetQueued() {
+ set_state(ON_OUTBOUND_QUEUE);
+}
+
+void OutboundCall::SetSending() {
+ set_state(SENDING);
+}
+
+void OutboundCall::SetSent() {
+ set_state(SENT);
+
+ // This method is called in the reactor thread, so free the header buf,
+ // which was also allocated from this thread. tcmalloc's thread caching
+ // behavior is a lot more efficient if memory is freed from the same thread
+ // which allocated it -- this lets it keep to thread-local operations instead
+ // of taking a mutex to put memory back on the global freelist.
+ delete [] header_buf_.release();
+
+ // request_buf_ is also done being used here, but since it was allocated by
+ // the caller thread, we would rather let that thread free it whenever it
+ // deletes the RpcController.
+}
+
+void OutboundCall::SetFailed(const Status &status,
+ Phase phase,
+ ErrorStatusPB* err_pb) {
+ DCHECK(!status.ok());
+ DCHECK(phase == Phase::CONNECTION_NEGOTIATION || phase == Phase::REMOTE_CALL);
+ {
+ std::lock_guard<simple_spinlock> l(lock_);
+ status_ = status;
+ if (err_pb) {
+ error_pb_.reset(err_pb);
+ }
+ set_state_unlocked(phase == Phase::CONNECTION_NEGOTIATION
+ ? FINISHED_NEGOTIATION_ERROR
+ : FINISHED_ERROR);
+ }
+ CallCallback();
+}
+
+void OutboundCall::SetTimedOut(Phase phase) {
+ static const char* kErrMsgNegotiation =
+ "connection negotiation to $1 for RPC $0 timed out after $2 ($3)";
+ static const char* kErrMsgCall = "$0 RPC to $1 timed out after $2 ($3)";
+ DCHECK(phase == Phase::CONNECTION_NEGOTIATION || phase == Phase::REMOTE_CALL);
+
+ // We have to fetch timeout outside the lock to avoid a lock
+ // order inversion between this class and RpcController.
+ const MonoDelta timeout = controller_->timeout();
+ {
+ std::lock_guard<simple_spinlock> l(lock_);
+ status_ = Status::TimedOut(
+ Substitute((phase == Phase::REMOTE_CALL) ? kErrMsgCall : kErrMsgNegotiation,
+ remote_method_.method_name(),
+ conn_id_.remote().ToString(),
+ timeout.ToString(),
+ StateName(state_)));
+ set_state_unlocked((phase == Phase::REMOTE_CALL) ? TIMED_OUT : NEGOTIATION_TIMED_OUT);
+ }
+ CallCallback();
+}
+
+bool OutboundCall::IsTimedOut() const {
+ std::lock_guard<simple_spinlock> l(lock_);
+ switch (state_) {
+ case NEGOTIATION_TIMED_OUT: // fall-through
+ case TIMED_OUT:
+ return true;
+ default:
+ return false;
+ }
+}
+
+bool OutboundCall::IsNegotiationError() const {
+ std::lock_guard<simple_spinlock> l(lock_);
+ switch (state_) {
+ case FINISHED_NEGOTIATION_ERROR: // fall-through
+ case NEGOTIATION_TIMED_OUT:
+ return true;
+ default:
+ return false;
+ }
+}
+
+bool OutboundCall::IsFinished() const {
+ std::lock_guard<simple_spinlock> l(lock_);
+ switch (state_) {
+ case READY:
+ case SENDING:
+ case ON_OUTBOUND_QUEUE:
+ case SENT:
+ return false;
+ case NEGOTIATION_TIMED_OUT:
+ case TIMED_OUT:
+ case FINISHED_NEGOTIATION_ERROR:
+ case FINISHED_ERROR:
+ case FINISHED_SUCCESS:
+ return true;
+ default:
+ LOG(FATAL) << "Unknown call state: " << state_;
+ return false;
+ }
+}
+
+string OutboundCall::ToString() const {
+ return Substitute("RPC call $0 -> $1", remote_method_.ToString(), conn_id_.ToString());
+}
+
+void OutboundCall::DumpPB(const DumpRunningRpcsRequestPB& req,
+ RpcCallInProgressPB* resp) {
+ std::lock_guard<simple_spinlock> l(lock_);
+ resp->mutable_header()->CopyFrom(header_);
+ resp->set_micros_elapsed((MonoTime::Now() - start_time_).ToMicroseconds());
+
+ switch (state_) {
+ case READY:
+ // Don't bother setting a state for "READY" since we don't expose a call
+ // until it's at least on the queue of a connection.
+ break;
+ case ON_OUTBOUND_QUEUE:
+ resp->set_state(RpcCallInProgressPB::ON_OUTBOUND_QUEUE);
+ break;
+ case SENDING:
+ resp->set_state(RpcCallInProgressPB::SENDING);
+ break;
+ case SENT:
+ resp->set_state(RpcCallInProgressPB::SENT);
+ break;
+ case NEGOTIATION_TIMED_OUT:
+ resp->set_state(RpcCallInProgressPB::NEGOTIATION_TIMED_OUT);
+ break;
+ case TIMED_OUT:
+ resp->set_state(RpcCallInProgressPB::TIMED_OUT);
+ break;
+ case FINISHED_NEGOTIATION_ERROR:
+ resp->set_state(RpcCallInProgressPB::FINISHED_NEGOTIATION_ERROR);
+ break;
+ case FINISHED_ERROR:
+ resp->set_state(RpcCallInProgressPB::FINISHED_ERROR);
+ break;
+ case FINISHED_SUCCESS:
+ resp->set_state(RpcCallInProgressPB::FINISHED_SUCCESS);
+ break;
+ }
+}
+
+///
+/// ConnectionId
+///
+
+ConnectionId::ConnectionId() {}
+
+ConnectionId::ConnectionId(const ConnectionId& other) {
+ DoCopyFrom(other);
+}
+
+ConnectionId::ConnectionId(const Sockaddr& remote, UserCredentials user_credentials) {
+ remote_ = remote;
+ user_credentials_ = std::move(user_credentials);
+}
+
+void ConnectionId::set_remote(const Sockaddr& remote) {
+ remote_ = remote;
+}
+
+void ConnectionId::set_user_credentials(UserCredentials user_credentials) {
+ user_credentials_ = std::move(user_credentials);
+}
+
+void ConnectionId::CopyFrom(const ConnectionId& other) {
+ DoCopyFrom(other);
+}
+
+string ConnectionId::ToString() const {
+ // Does not print the password.
+ return StringPrintf("{remote=%s, user_credentials=%s}",
+ remote_.ToString().c_str(),
+ user_credentials_.ToString().c_str());
+}
+
+void ConnectionId::DoCopyFrom(const ConnectionId& other) {
+ remote_ = other.remote_;
+ user_credentials_ = other.user_credentials_;
+}
+
+size_t ConnectionId::HashCode() const {
+ size_t seed = 0;
+ boost::hash_combine(seed, remote_.HashCode());
+ boost::hash_combine(seed, user_credentials_.HashCode());
+ return seed;
+}
+
+bool ConnectionId::Equals(const ConnectionId& other) const {
+ return (remote() == other.remote()
+ && user_credentials().Equals(other.user_credentials()));
+}
+
+size_t ConnectionIdHash::operator() (const ConnectionId& conn_id) const {
+ return conn_id.HashCode();
+}
+
+bool ConnectionIdEqual::operator() (const ConnectionId& cid1, const ConnectionId& cid2) const {
+ return cid1.Equals(cid2);
+}
+
+///
+/// CallResponse
+///
+
+CallResponse::CallResponse()
+ : parsed_(false) {
+}
+
+Status CallResponse::GetSidecar(int idx, Slice* sidecar) const {
+ DCHECK(parsed_);
+ if (idx < 0 || idx >= header_.sidecar_offsets_size()) {
+ return Status::InvalidArgument(strings::Substitute(
+ "Index $0 does not reference a valid sidecar", idx));
+ }
+ *sidecar = sidecar_slices_[idx];
+ return Status::OK();
+}
+
+Status CallResponse::ParseFrom(gscoped_ptr<InboundTransfer> transfer) {
+ CHECK(!parsed_);
+ RETURN_NOT_OK(serialization::ParseMessage(transfer->data(), &header_,
+ &serialized_response_));
+
+ // Use information from header to extract the payload slices.
+ RETURN_NOT_OK(RpcSidecar::ParseSidecars(header_.sidecar_offsets(),
+ serialized_response_, sidecar_slices_));
+
+ if (header_.sidecar_offsets_size() > 0) {
+ serialized_response_ =
+ Slice(serialized_response_.data(), header_.sidecar_offsets(0));
+ }
+
+ transfer_.swap(transfer);
+ parsed_ = true;
+ return Status::OK();
+}
+
+} // namespace rpc
+} // namespace kudu