You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2017/01/24 00:00:18 UTC

[1/3] kudu git commit: TLS-negotiation [5/n]: Rename sasl_[client|server] to [client|server]_negotiation

Repository: kudu
Updated Branches:
  refs/heads/master 9d0421e80 -> bbf753061


http://git-wip-us.apache.org/repos/asf/kudu/blob/41c45523/src/kudu/rpc/sasl_rpc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/sasl_rpc-test.cc b/src/kudu/rpc/sasl_rpc-test.cc
deleted file mode 100644
index 6220e72..0000000
--- a/src/kudu/rpc/sasl_rpc-test.cc
+++ /dev/null
@@ -1,567 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "kudu/rpc/rpc-test-base.h"
-
-#include <stdlib.h>
-#include <sys/stat.h>
-
-#include <functional>
-#include <memory>
-#include <string>
-#include <thread>
-
-#include <gtest/gtest.h>
-#include <sasl/sasl.h>
-
-#include "kudu/gutil/gscoped_ptr.h"
-#include "kudu/gutil/map-util.h"
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/rpc/constants.h"
-#include "kudu/rpc/sasl_client.h"
-#include "kudu/rpc/sasl_common.h"
-#include "kudu/rpc/sasl_server.h"
-#include "kudu/security/test/mini_kdc.h"
-#include "kudu/util/monotime.h"
-#include "kudu/util/net/sockaddr.h"
-#include "kudu/util/net/socket.h"
-#include "kudu/util/subprocess.h"
-
-using std::string;
-using std::thread;
-
-// HACK: MIT Kerberos doesn't have any way of determining its version number,
-// but the error messages in krb5-1.10 and earlier are broken due to
-// a bug: http://krbdev.mit.edu/rt/Ticket/Display.html?id=6973
-//
-// Since we don't have any way to explicitly figure out the version, we just
-// look for this random macro which was added in 1.11 (the same version in which
-// the above bug was fixed).
-#ifndef KRB5_RESPONDER_QUESTION_PASSWORD
-#define KRB5_VERSION_LE_1_10
-#endif
-
-DEFINE_bool(is_test_child, false,
-            "Used by tests which require clean processes. "
-            "See TestDisableInit.");
-
-namespace kudu {
-namespace rpc {
-
-// Fixture for the SASL negotiation tests. In addition to the RpcTestBase
-// setup, it calls SaslInit(kSaslAppName) before every test body.
-class TestSaslRpc : public RpcTestBase {
- public:
-  void SetUp() OVERRIDE {
-    RpcTestBase::SetUp();
-    ASSERT_OK(SaslInit(kSaslAppName));
-  }
-};
-
-// Test basic initialization of the objects: enabling PLAIN and calling Init()
-// must succeed for a server and a client constructed without a socket.
-TEST_F(TestSaslRpc, TestBasicInit) {
-  SaslServer server(kSaslAppName, nullptr);
-  // EnablePlain() returns a Status; check it rather than silently dropping it.
-  ASSERT_OK(server.EnablePlain());
-  ASSERT_OK(server.Init(kSaslAppName));
-  SaslClient client(kSaslAppName, nullptr);
-  ASSERT_OK(client.EnablePlain("test", "test"));
-  ASSERT_OK(client.Init(kSaslAppName));
-}
-
-// Callable taking a Socket*, used as the body of a test thread.
-// Both the SaslServer side and the SaslClient side of a test are expressed this way.
-using SocketCallable = std::function<void(Socket*)>;
-
-// Accepts one connection on 'acceptor' and hands the accepted socket to
-// 'server_runner'. Crashes via CHECK_OK if Accept() fails.
-static void RunAcceptingDelegator(Socket* acceptor,
-                                  const SocketCallable& server_runner) {
-  Socket accepted;
-  Sockaddr peer_addr;
-  const Status accept_status = acceptor->Accept(&accepted, &peer_addr, 0);
-  CHECK_OK(accept_status);
-  server_runner(&accepted);
-}
-
-// Set up a socket and run a SASL negotiation.
-//
-// 'server_runner' runs in its own thread on the connection accepted from a
-// freshly bound listening socket; 'client_runner' runs in a second thread on
-// the socket connected to it. Returns once both threads have been joined.
-static void RunNegotiationTest(const SocketCallable& server_runner,
-                               const SocketCallable& client_runner) {
-  Socket server_sock;
-  CHECK_OK(server_sock.Init(0));
-  ASSERT_OK(server_sock.BindAndListen(Sockaddr(), 1));
-  Sockaddr server_bind_addr;
-  ASSERT_OK(server_sock.GetSocketAddress(&server_bind_addr));
-  thread server(RunAcceptingDelegator, &server_sock, server_runner);
-
-  Socket client_sock;
-  CHECK_OK(client_sock.Init(0));
-  // The server thread is joinable from here on. An ASSERT_* failure would
-  // return early and destroy a joinable std::thread, which calls
-  // std::terminate(); CHECK_OK fails with a readable status instead.
-  CHECK_OK(client_sock.Connect(server_bind_addr));
-  thread client(client_runner, &client_sock);
-
-  LOG(INFO) << "Waiting for test threads to terminate...";
-  client.join();
-  LOG(INFO) << "Client thread terminated.";
-
-  // TODO(todd): if the client fails to negotiate, it doesn't
-  // always result in sending a nice error message to the
-  // other side.
-  client_sock.Close();
-
-  server.join();
-  LOG(INFO) << "Server thread terminated.";
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-// Server side of the PLAIN negotiation test: negotiates on 'conn', then checks
-// that the client advertised APPLICATION_FEATURE_FLAGS and that the
-// authenticated user is "my-username". Any failure crashes via CHECK.
-static void RunPlainNegotiationServer(Socket* conn) {
-  SaslServer sasl_server(kSaslAppName, conn);
-  CHECK_OK(sasl_server.EnablePlain());
-  CHECK_OK(sasl_server.Init(kSaslAppName));
-  CHECK_OK(sasl_server.Negotiate());
-  CHECK(ContainsKey(sasl_server.client_features(), APPLICATION_FEATURE_FLAGS));
-  CHECK_EQ("my-username", sasl_server.authenticated_user());
-}
-
-// Client side of the PLAIN negotiation test: negotiates on 'conn' with the
-// user name "my-username", then checks that the server advertised
-// APPLICATION_FEATURE_FLAGS. Any failure crashes via CHECK.
-static void RunPlainNegotiationClient(Socket* conn) {
-  SaslClient sasl_client(kSaslAppName, conn);
-  CHECK_OK(sasl_client.EnablePlain("my-username", "ignored password"));
-  CHECK_OK(sasl_client.Init(kSaslAppName));
-  CHECK_OK(sasl_client.Negotiate());
-  CHECK(ContainsKey(sasl_client.server_features(), APPLICATION_FEATURE_FLAGS));
-}
-
-// Runs a complete PLAIN-mechanism SASL negotiation between a client and a
-// server connected over a socket.
-TEST_F(TestSaslRpc, TestPlainNegotiation) {
-  const SocketCallable server_side = RunPlainNegotiationServer;
-  const SocketCallable client_side = RunPlainNegotiationClient;
-  RunNegotiationTest(server_side, client_side);
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-
-// Callback run after a negotiation attempt. It receives the Status returned by
-// Negotiate() and the SaslServer/SaslClient object (type T) that performed it.
-template<class T>
-using CheckerFunction = std::function<void(const Status&, T&)>;
-
-// Server side of a GSSAPI negotiation on 'conn'. The server is configured
-// with FQDN "127.0.0.1" and GSSAPI only; the outcome of Negotiate() is handed
-// to 'post_check' together with the server object for verification.
-static void RunGSSAPINegotiationServer(
-    Socket* conn,
-    const CheckerFunction<SaslServer>& post_check) {
-  SaslServer gssapi_server(kSaslAppName, conn);
-  gssapi_server.set_server_fqdn("127.0.0.1");
-  CHECK_OK(gssapi_server.EnableGSSAPI());
-  CHECK_OK(gssapi_server.Init(kSaslAppName));
-
-  const Status negotiate_status = gssapi_server.Negotiate();
-  post_check(negotiate_status, gssapi_server);
-}
-
-// Client side of a GSSAPI negotiation on 'conn'. The client targets server
-// FQDN "127.0.0.1" and enables GSSAPI only; the outcome of Negotiate() is
-// handed to 'post_check' together with the client object for verification.
-static void RunGSSAPINegotiationClient(
-    Socket* conn,
-    const CheckerFunction<SaslClient>& post_check) {
-  SaslClient gssapi_client(kSaslAppName, conn);
-  gssapi_client.set_server_fqdn("127.0.0.1");
-  CHECK_OK(gssapi_client.EnableGSSAPI());
-  CHECK_OK(gssapi_client.Init(kSaslAppName));
-
-  const Status negotiate_status = gssapi_client.Negotiate();
-  post_check(negotiate_status, gssapi_client);
-}
-
-// A client that allows, but does not require, Kerberos/GSSAPI connects to a
-// server that only enables Kerberos/GSSAPI.
-//
-// Both sides are expected to settle on GSSAPI.
-TEST_F(TestSaslRpc, TestRestrictiveServer_NonRestrictiveClient) {
-  MiniKdc kdc;
-  ASSERT_OK(kdc.Start());
-
-  // Server identity: service principal plus the keytab it is read from.
-  string kt_path;
-  ASSERT_OK(kdc.CreateServiceKeytab("kudu/127.0.0.1", &kt_path));
-  CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/));
-
-  // Client identity: a user principal that has been kinit'ed.
-  ASSERT_OK(kdc.CreateUserPrincipal("testuser"));
-  ASSERT_OK(kdc.Kinit("testuser"));
-  ASSERT_OK(kdc.SetKrb5Environment());
-
-  // Server-side verification: success, GSSAPI, authenticated as "testuser".
-  const CheckerFunction<SaslServer> verify_server =
-      [](const Status& s, SaslServer& server) {
-        CHECK_OK(s);
-        CHECK_EQ(SaslMechanism::GSSAPI, server.negotiated_mechanism());
-        CHECK_EQ("testuser", server.authenticated_user());
-      };
-  const SocketCallable gssapi_only_server =
-      std::bind(RunGSSAPINegotiationServer, std::placeholders::_1, verify_server);
-
-  const SocketCallable permissive_client = [](Socket* conn) {
-    SaslClient sasl_client(kSaslAppName, conn);
-    sasl_client.set_server_fqdn("127.0.0.1");
-    // The client enables both PLAIN and GSSAPI.
-    CHECK_OK(sasl_client.EnablePlain("foo", "bar"));
-    CHECK_OK(sasl_client.EnableGSSAPI());
-    CHECK_OK(sasl_client.Init(kSaslAppName));
-    CHECK_OK(sasl_client.Negotiate());
-    CHECK_EQ(SaslMechanism::GSSAPI, sasl_client.negotiated_mechanism());
-  };
-
-  // Authentication should now succeed on both sides.
-  RunNegotiationTest(gssapi_only_server, permissive_client);
-}
-
-// Test configuring a client to only support PLAIN, and a server which
-// only supports GSSAPI. This would happen, for example, if an old Kudu
-// client tries to talk to a secure-only cluster.
-//
-// Expected outcome: the client fails with a "missing the required SASL
-// module" error and the server sees EOF from the remote.
-TEST_F(TestSaslRpc, TestNoMatchingMechanisms) {
-  MiniKdc kdc;
-  ASSERT_OK(kdc.Start());
-
-  // Create the server principal and keytab.
-  string kt_path;
-  ASSERT_OK(kdc.CreateServiceKeytab("kudu/localhost", &kt_path));
-  CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/));
-
-  RunNegotiationTest(
-      std::bind(RunGSSAPINegotiationServer, std::placeholders::_1,
-                [](const Status& s, SaslServer& server) {
-                  // The client fails to find a matching mechanism and
-                  // doesn't send any failure message to the server.
-                  // Instead, it just disconnects.
-                  //
-                  // TODO(todd): this could produce a better message!
-                  ASSERT_STR_CONTAINS(s.ToString(), "got EOF from remote");
-                }),
-      [](Socket* conn) {
-        SaslClient sasl_client(kSaslAppName, conn);
-        sasl_client.set_server_fqdn("127.0.0.1");
-        // The client enables only PLAIN (GSSAPI is deliberately left off).
-        CHECK_OK(sasl_client.EnablePlain("foo", "bar"));
-        CHECK_OK(sasl_client.Init(kSaslAppName));
-        Status s = sasl_client.Negotiate();
-        ASSERT_STR_CONTAINS(s.ToString(), "client was missing the required SASL module");
-      });
-}
-
-// Runs a GSSAPI (Kerberos) negotiation over a socket where both sides have
-// valid credentials, and expects both to report GSSAPI as the mechanism.
-TEST_F(TestSaslRpc, TestGSSAPINegotiation) {
-  MiniKdc kdc;
-  ASSERT_OK(kdc.Start());
-
-  // Server identity: service principal plus the keytab it is read from.
-  string kt_path;
-  ASSERT_OK(kdc.CreateServiceKeytab("kudu/127.0.0.1", &kt_path));
-  CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/));
-
-  // Client identity: a user principal that has been kinit'ed.
-  ASSERT_OK(kdc.CreateUserPrincipal("testuser"));
-  ASSERT_OK(kdc.Kinit("testuser"));
-  ASSERT_OK(kdc.SetKrb5Environment());
-
-  const CheckerFunction<SaslServer> verify_server =
-      [](const Status& s, SaslServer& server) {
-        CHECK_OK(s);
-        CHECK_EQ(SaslMechanism::GSSAPI, server.negotiated_mechanism());
-        CHECK_EQ("testuser", server.authenticated_user());
-      };
-  const CheckerFunction<SaslClient> verify_client =
-      [](const Status& s, SaslClient& client) {
-        CHECK_OK(s);
-        CHECK_EQ(SaslMechanism::GSSAPI, client.negotiated_mechanism());
-      };
-
-  // Authentication should succeed on both sides.
-  RunNegotiationTest(
-      std::bind(RunGSSAPINegotiationServer, std::placeholders::_1, verify_server),
-      std::bind(RunGSSAPINegotiationClient, std::placeholders::_1, verify_client));
-}
-
-#ifndef __APPLE__
-// Test invalid SASL negotiations using the GSSAPI (kerberos) mechanism over a socket.
-// This test is ignored on macOS because the system Kerberos implementation
-// (Heimdal) caches the non-existence of client credentials, which causes further
-// tests to fail.
-//
-// Three scenarios are exercised in sequence:
-//   1. no credentials on either side,
-//   2. a server keytab but no client credentials,
-//   3. client credentials but a server keytab for the wrong service.
-TEST_F(TestSaslRpc, TestGSSAPIInvalidNegotiation) {
-  MiniKdc kdc;
-  ASSERT_OK(kdc.Start());
-
-  // Try to negotiate with no krb5 credentials on either side. It should fail on both
-  // sides.
-  RunNegotiationTest(
-      std::bind(RunGSSAPINegotiationServer, std::placeholders::_1,
-                [](const Status& s, SaslServer& server) {
-                  // The client notices there are no credentials and
-                  // doesn't send any failure message to the server.
-                  // Instead, it just disconnects.
-                  //
-                  // TODO(todd): it might be preferable to have the server
-                  // fail to start if it has no valid keytab.
-                  CHECK(s.IsNetworkError());
-                }),
-      std::bind(RunGSSAPINegotiationClient, std::placeholders::_1,
-                [](const Status& s, SaslClient& client) {
-                  CHECK(s.IsNotAuthorized());
-#ifndef KRB5_VERSION_LE_1_10
-                  // string::find() returns npos (a huge value) when the text
-                  // is absent, so comparing its result with '> 0' can never
-                  // detect a missing message. Assert on containment instead.
-                  ASSERT_STR_CONTAINS(s.ToString(), "No Kerberos credentials available");
-#endif
-                }));
-
-
-  // Create the server principal and keytab.
-  string kt_path;
-  ASSERT_OK(kdc.CreateServiceKeytab("kudu/127.0.0.1", &kt_path));
-  CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/));
-
-  // Try to negotiate with no krb5 credentials on the client. It should fail on both
-  // sides.
-  RunNegotiationTest(
-      std::bind(RunGSSAPINegotiationServer, std::placeholders::_1,
-                [](const Status& s, SaslServer& server) {
-                  // The client notices there are no credentials and
-                  // doesn't send any failure message to the server.
-                  // Instead, it just disconnects.
-                  CHECK(s.IsNetworkError());
-                }),
-      std::bind(RunGSSAPINegotiationClient, std::placeholders::_1,
-                [](const Status& s, SaslClient& client) {
-                  CHECK(s.IsNotAuthorized());
-#ifndef KRB5_VERSION_LE_1_10
-                  CHECK_EQ(s.message().ToString(), "No Kerberos credentials available");
-#endif
-                }));
-
-  // Create and kinit as a client user.
-  ASSERT_OK(kdc.CreateUserPrincipal("testuser"));
-  ASSERT_OK(kdc.Kinit("testuser"));
-  ASSERT_OK(kdc.SetKrb5Environment());
-
-  // Change the server's keytab file so that it has inappropriate
-  // credentials.
-  // Authentication should now fail.
-  ASSERT_OK(kdc.CreateServiceKeytab("otherservice/127.0.0.1", &kt_path));
-  CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/));
-
-  RunNegotiationTest(
-      std::bind(RunGSSAPINegotiationServer, std::placeholders::_1,
-                [](const Status& s, SaslServer& server) {
-                  CHECK(s.IsNotAuthorized());
-#ifndef KRB5_VERSION_LE_1_10
-                  ASSERT_STR_CONTAINS(s.ToString(),
-                                      "No key table entry found matching kudu/127.0.0.1");
-#endif
-                }),
-      std::bind(RunGSSAPINegotiationClient, std::placeholders::_1,
-                [](const Status& s, SaslClient& client) {
-                  CHECK(s.IsNotAuthorized());
-#ifndef KRB5_VERSION_LE_1_10
-                  ASSERT_STR_CONTAINS(s.ToString(),
-                                      "No key table entry found matching kudu/127.0.0.1");
-#endif
-                }));
-}
-#endif
-
-#ifndef __APPLE__
-// Test that the pre-flight check for servers requiring Kerberos provides
-// nice error messages for missing or bad keytabs.
-//
-// This is ignored on macOS because the system Kerberos implementation does not
-// fail the preflight check when the keytab is inaccessible, probably because
-// the preflight check passes a 0-length token.
-//
-// The steps below are order-dependent: each one changes the process
-// environment (KRB5_KTNAME) or the keytab file that the next check observes.
-// Error-message checks are compiled out for krb5 <= 1.10 (see the
-// KRB5_VERSION_LE_1_10 note at the top of the file).
-TEST_F(TestSaslRpc, TestPreflight) {
-  // Try pre-flight with no keytab.
-  Status s = SaslServer::PreflightCheckGSSAPI(kSaslAppName);
-  ASSERT_FALSE(s.ok());
-#ifndef KRB5_VERSION_LE_1_10
-  ASSERT_STR_MATCHES(s.ToString(), "Key table file.*not found");
-#endif
-  // Try with a valid krb5 environment and keytab.
-  MiniKdc kdc;
-  ASSERT_OK(kdc.Start());
-  ASSERT_OK(kdc.SetKrb5Environment());
-  string kt_path;
-  ASSERT_OK(kdc.CreateServiceKeytab("kudu/127.0.0.1", &kt_path));
-  CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/));
-
-  ASSERT_OK(SaslServer::PreflightCheckGSSAPI(kSaslAppName));
-
-  // Try with an inaccessible keytab.
-  // Mode 0000 strips every permission bit from the keytab file.
-  CHECK_ERR(chmod(kt_path.c_str(), 0000));
-  s = SaslServer::PreflightCheckGSSAPI(kSaslAppName);
-  ASSERT_FALSE(s.ok());
-#ifndef KRB5_VERSION_LE_1_10
-  ASSERT_STR_MATCHES(s.ToString(), "error accessing keytab: Permission denied");
-#endif
-  // Remove the unreadable keytab before a new one is created below.
-  CHECK_ERR(unlink(kt_path.c_str()));
-
-  // Try with a keytab that has the wrong credentials.
-  ASSERT_OK(kdc.CreateServiceKeytab("wrong-service/127.0.0.1", &kt_path));
-  CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/));
-  s = SaslServer::PreflightCheckGSSAPI(kSaslAppName);
-  ASSERT_FALSE(s.ok());
-#ifndef KRB5_VERSION_LE_1_10
-  ASSERT_STR_MATCHES(s.ToString(), "No key table entry found matching kudu/.*");
-#endif
-}
-#endif
-
-////////////////////////////////////////////////////////////////////////////////
-
-// Server side of TestClientTimeout: runs a PLAIN negotiation on 'conn' and
-// expects it to fail with a network error, because the client is expected to
-// time out and close the connection.
-static void RunTimeoutExpectingServer(Socket* conn) {
-  SaslServer sasl_server(kSaslAppName, conn);
-  CHECK_OK(sasl_server.EnablePlain());
-  CHECK_OK(sasl_server.Init(kSaslAppName));
-  Status s = sasl_server.Negotiate();
-  ASSERT_TRUE(s.IsNetworkError()) << "Expected client to time out and close the connection. Got: "
-      << s.ToString();
-}
-
-// Client side of TestClientTimeout: negotiates on 'sock' with a deadline that
-// is already 100ms in the past, expects Negotiate() to report a timeout, and
-// then shuts the socket down in both directions.
-static void RunTimeoutNegotiationClient(Socket* sock) {
-  SaslClient sasl_client(kSaslAppName, sock);
-  CHECK_OK(sasl_client.EnablePlain("test", "test"));
-  CHECK_OK(sasl_client.Init(kSaslAppName));
-  // A deadline in the past makes the negotiation time out right away.
-  MonoTime deadline = MonoTime::Now() - MonoDelta::FromMilliseconds(100L);
-  sasl_client.set_deadline(deadline);
-  Status s = sasl_client.Negotiate();
-  ASSERT_TRUE(s.IsTimedOut()) << "Expected timeout! Got: " << s.ToString();
-  CHECK_OK(sock->Shutdown(true, true));
-}
-
-// The client negotiates with an already-expired deadline and must time out;
-// the server must observe the resulting disconnect.
-TEST_F(TestSaslRpc, TestClientTimeout) {
-  const SocketCallable server_side = RunTimeoutExpectingServer;
-  const SocketCallable client_side = RunTimeoutNegotiationClient;
-  RunNegotiationTest(server_side, client_side);
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-// Server side of TestServerTimeout: negotiates on 'sock' with a deadline that
-// is already 100ms in the past, expects Negotiate() to report a timeout, and
-// then closes the socket.
-static void RunTimeoutNegotiationServer(Socket* sock) {
-  SaslServer sasl_server(kSaslAppName, sock);
-  CHECK_OK(sasl_server.EnablePlain());
-  CHECK_OK(sasl_server.Init(kSaslAppName));
-  // A deadline in the past makes the negotiation time out right away.
-  MonoTime deadline = MonoTime::Now() - MonoDelta::FromMilliseconds(100L);
-  sasl_server.set_deadline(deadline);
-  Status s = sasl_server.Negotiate();
-  ASSERT_TRUE(s.IsTimedOut()) << "Expected timeout! Got: " << s.ToString();
-  CHECK_OK(sock->Close());
-}
-
-// Client side of TestServerTimeout: runs a PLAIN negotiation on 'conn' and
-// expects it to fail with a network error, because the server is expected to
-// time out and close the connection.
-static void RunTimeoutExpectingClient(Socket* conn) {
-  SaslClient sasl_client(kSaslAppName, conn);
-  CHECK_OK(sasl_client.EnablePlain("test", "test"));
-  CHECK_OK(sasl_client.Init(kSaslAppName));
-  Status s = sasl_client.Negotiate();
-  ASSERT_TRUE(s.IsNetworkError()) << "Expected server to time out and close the connection. Got: "
-      << s.ToString();
-}
-
-// The server negotiates with an already-expired deadline and must time out;
-// the client must observe the resulting disconnect.
-TEST_F(TestSaslRpc, TestServerTimeout) {
-  const SocketCallable server_side = RunTimeoutNegotiationServer;
-  const SocketCallable client_side = RunTimeoutExpectingClient;
-  RunNegotiationTest(server_side, client_side);
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-// This suite of tests ensure that applications that embed the Kudu client are
-// able to externally handle the initialization of SASL. See KUDU-1749 and
-// IMPALA-4497 for context.
-//
-// The tests are a bit tricky because the initialization of SASL is static state
-// that we can't easily clear/reset between test cases. So, each test invokes
-// itself as a subprocess with the appropriate --gtest_filter line as well as a
-// special flag to indicate that it is the test child running.
-class TestDisableInit : public KuduTest {
- protected:
-  // Run the lambda 'f' in a newly-started process, capturing its stderr
-  // into 'child_stderr' (when non-null).
-  //
-  // In the parent (FLAGS_is_test_child unset) this re-executes the current
-  // test binary, filtered down to the currently-running test case, with
-  // --is_test_child set, and fails the test if the subprocess call does not
-  // return OK. In the child it simply runs 'f' in-process.
-  //
-  // The stream variables are deliberately not named 'stdout'/'stderr': those
-  // are macros from <stdio.h> and are not portable as identifiers.
-  template<class TestFunc>
-  void DoTest(const TestFunc& f, string* child_stderr = nullptr) {
-    if (FLAGS_is_test_child) {
-      f();
-      return;
-    }
-
-    // Invoke the currently-running test case in a new subprocess.
-    string filter_flag = strings::Substitute("--gtest_filter=$0.$1",
-                                             CURRENT_TEST_CASE_NAME(), CURRENT_TEST_NAME());
-    string executable_path;
-    CHECK_OK(env_->GetExecutablePath(&executable_path));
-    string child_stdout;
-    Status s = Subprocess::Call({ executable_path, "test", filter_flag, "--is_test_child" },
-                                "" /* stdin */,
-                                &child_stdout,
-                                child_stderr);
-    // Report the failing status as well as the child's output.
-    ASSERT_TRUE(s.ok()) << "Test failed: " << s.ToString() << ": " << child_stdout;
-  }
-};
-
-// Kudu's SASL initialization is disabled, but nothing else initializes SASL:
-// SaslInit() must report that SASL was not externally initialized.
-TEST_F(TestDisableInit, TestDisableSasl_NotInitialized) {
-  const auto child_body = []() {
-    CHECK_OK(DisableSaslInitialization());
-    Status s = SaslInit("kudu");
-    ASSERT_STR_CONTAINS(s.ToString(), "was disabled, but SASL was not externally initialized");
-  };
-  DoTest(child_body);
-}
-
-// Kudu's SASL initialization is disabled and the embedding application has
-// set up the SASL mutex hooks and initialized the client library itself:
-// SaslInit() must succeed.
-TEST_F(TestDisableInit, TestDisableSasl_Good) {
-  const auto child_body = []() {
-    rpc::internal::SaslSetMutex();
-    sasl_client_init(nullptr);
-    CHECK_OK(DisableSaslInitialization());
-    ASSERT_OK(SaslInit("kudu"));
-  };
-  DoTest(child_body);
-}
-
-// The embedding application initializes SASL itself but forgets to disable
-// Kudu's own SASL initialization.
-TEST_F(TestDisableInit, TestMultipleSaslInit) {
-  string child_stderr;
-  const auto child_body = []() {
-    rpc::internal::SaslSetMutex();
-    sasl_client_init(nullptr);
-    ASSERT_OK(SaslInit("kudu"));
-  };
-  DoTest(child_body, &child_stderr);
-
-  // Only the parent sees the child's stderr. It should contain the warning
-  // that initialization was skipped because SASL was already initialized.
-  if (FLAGS_is_test_child) {
-    return;
-  }
-  ASSERT_STR_CONTAINS(child_stderr, "Skipping initialization");
-}
-
-// We are not able to detect mutexes not being set with the macOS version of libsasl.
-#ifndef __APPLE__
-// Kudu's SASL initialization is disabled, but the embedding application did
-// not install the SASL mutex hooks. SaslInit() should still succeed, with a
-// warning.
-TEST_F(TestDisableInit, TestDisableSasl_NoMutexImpl) {
-  string child_stderr;
-  const auto child_body = []() {
-    sasl_client_init(nullptr);
-    CHECK_OK(DisableSaslInitialization());
-    ASSERT_OK(SaslInit("kudu"));
-  };
-  DoTest(child_body, &child_stderr);
-
-  // Only the parent sees the child's stderr, which should carry the warning.
-  if (FLAGS_is_test_child) {
-    return;
-  }
-  ASSERT_STR_CONTAINS(child_stderr, "not provided with a mutex implementation");
-}
-
-// The embedding application initializes SASL itself, without installing the
-// mutex hooks, and forgets to disable Kudu's own SASL initialization.
-TEST_F(TestDisableInit, TestMultipleSaslInit_NoMutexImpl) {
-  string child_stderr;
-  const auto child_body = []() {
-    sasl_client_init(nullptr);
-    ASSERT_OK(SaslInit("kudu"));
-  };
-  DoTest(child_body, &child_stderr);
-
-  // Only the parent sees the child's stderr. It should contain both the
-  // "already initialized" warning and the missing-mutex warning.
-  if (FLAGS_is_test_child) {
-    return;
-  }
-  ASSERT_STR_CONTAINS(child_stderr, "Skipping initialization");
-  ASSERT_STR_CONTAINS(child_stderr, "not provided with a mutex implementation");
-}
-#endif
-
-} // namespace rpc
-} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/41c45523/src/kudu/rpc/sasl_server.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/sasl_server.cc b/src/kudu/rpc/sasl_server.cc
deleted file mode 100644
index d8c627d..0000000
--- a/src/kudu/rpc/sasl_server.cc
+++ /dev/null
@@ -1,498 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "kudu/rpc/sasl_server.h"
-
-#include <glog/logging.h>
-#include <google/protobuf/message_lite.h>
-#include <limits>
-#include <sasl/sasl.h>
-#include <set>
-#include <string>
-
-#include "kudu/gutil/endian.h"
-#include "kudu/gutil/map-util.h"
-#include "kudu/gutil/stringprintf.h"
-#include "kudu/gutil/strings/split.h"
-#include "kudu/rpc/blocking_ops.h"
-#include "kudu/rpc/constants.h"
-#include "kudu/rpc/serialization.h"
-#include "kudu/util/net/sockaddr.h"
-#include "kudu/util/net/socket.h"
-#include "kudu/util/scoped_cleanup.h"
-#include "kudu/util/trace.h"
-
-namespace kudu {
-namespace rpc {
-
-// Free-function trampoline for the SASL_CB_GETOPT callback: casts the opaque
-// 'sasl_server' context back to a SaslServer and forwards to GetOptionCb().
-static int SaslServerGetoptCb(void* sasl_server, const char* plugin_name, const char* option,
-                       const char** result, unsigned* len) {
-  SaslServer* self = static_cast<SaslServer*>(sasl_server);
-  return self->GetOptionCb(plugin_name, option, result, len);
-}
-
-// Free-function trampoline for the SASL_CB_SERVER_USERDB_CHECKPASS callback:
-// casts the opaque 'sasl_server' context back to a SaslServer and forwards to
-// PlainAuthCb().
-static int SaslServerPlainAuthCb(sasl_conn_t *conn, void *sasl_server, const char *user,
-    const char *pass, unsigned passlen, struct propctx *propctx) {
-  SaslServer* self = static_cast<SaslServer*>(sasl_server);
-  return self->PlainAuthCb(conn, user, pass, passlen, propctx);
-}
-
-// Constructs a server in the NEW state with no negotiated mechanism and no
-// deadline (MonoTime::Max()), and registers the GETOPT and PLAIN-auth
-// callbacks, each with 'this' as its context pointer.
-SaslServer::SaslServer(string app_name, Socket* socket)
-    : app_name_(std::move(app_name)),
-      sock_(socket),
-      helper_(SaslHelper::SERVER),
-      server_state_(SaslNegotiationState::NEW),
-      negotiated_mech_(SaslMechanism::INVALID),
-      deadline_(MonoTime::Max()) {
-  // SaslBuildCallback is handed the callbacks as generic 'int (*)()' pointers.
-  using GenericSaslCb = int (*)();
-  GenericSaslCb getopt_cb = reinterpret_cast<GenericSaslCb>(&SaslServerGetoptCb);
-  GenericSaslCb plain_auth_cb = reinterpret_cast<GenericSaslCb>(&SaslServerPlainAuthCb);
-
-  callbacks_.push_back(SaslBuildCallback(SASL_CB_GETOPT, getopt_cb, this));
-  callbacks_.push_back(SaslBuildCallback(SASL_CB_SERVER_USERDB_CHECKPASS, plain_auth_cb, this));
-  // The callback list is terminated by a SASL_CB_LIST_END entry.
-  callbacks_.push_back(SaslBuildCallback(SASL_CB_LIST_END, nullptr, nullptr));
-}
-
-// Enables the PLAIN mechanism via the helper. Must be called while the server
-// is still in the NEW state (debug-checked).
-Status SaslServer::EnablePlain() {
-  DCHECK_EQ(server_state_, SaslNegotiationState::NEW);
-  return helper_.EnablePlain();
-}
-
-// Enables the GSSAPI mechanism via the helper. Must be called while the
-// server is still in the NEW state (debug-checked).
-Status SaslServer::EnableGSSAPI() {
-  DCHECK_EQ(server_state_, SaslNegotiationState::NEW);
-  return helper_.EnableGSSAPI();
-}
-
-// Returns the stored negotiated mechanism. Only meaningful once the server
-// has reached the NEGOTIATED state (debug-checked).
-SaslMechanism::Type SaslServer::negotiated_mechanism() const {
-  DCHECK_EQ(server_state_, SaslNegotiationState::NEGOTIATED);
-  return negotiated_mech_;
-}
-
-// Returns the user name recorded by a successful Negotiate(). Only meaningful
-// once the server has reached the NEGOTIATED state (debug-checked).
-const std::string& SaslServer::authenticated_user() const {
-  DCHECK_EQ(server_state_, SaslNegotiationState::NEGOTIATED);
-  return authenticated_user_;
-}
-
-// Passes the local address on to the helper. Must be called while the server
-// is still in the NEW state (debug-checked).
-void SaslServer::set_local_addr(const Sockaddr& addr) {
-  DCHECK_EQ(server_state_, SaslNegotiationState::NEW);
-  helper_.set_local_addr(addr);
-}
-
-// Passes the remote address on to the helper. Must be called while the server
-// is still in the NEW state (debug-checked).
-void SaslServer::set_remote_addr(const Sockaddr& addr) {
-  DCHECK_EQ(server_state_, SaslNegotiationState::NEW);
-  helper_.set_remote_addr(addr);
-}
-
-// Passes the server's fully qualified domain name on to the helper. Must be
-// called while the server is still in the NEW state (debug-checked).
-void SaslServer::set_server_fqdn(const string& domain_name) {
-  DCHECK_EQ(server_state_, SaslNegotiationState::NEW);
-  helper_.set_server_fqdn(domain_name);
-}
-
-// Sets the deadline passed to the blocking socket calls made during
-// negotiation. May be called in any state except NEGOTIATED (debug-checked).
-void SaslServer::set_deadline(const MonoTime& deadline) {
-  DCHECK_NE(server_state_, SaslNegotiationState::NEGOTIATED);
-  deadline_ = deadline;
-}
-
-// calls sasl_server_init() and sasl_server_new()
-//
-// Runs SaslInit() with the app name, then creates the libsasl server
-// connection with sasl_server_new() using 'service_type', the FQDN and address
-// strings held by the helper, and the callbacks registered in the constructor.
-//
-// Returns IllegalState if the object is not in the NEW state, and RuntimeError
-// if sasl_server_new() fails. On success the connection is stored and the
-// object moves to the INITIALIZED state.
-Status SaslServer::Init(const string& service_type) {
-  RETURN_NOT_OK(SaslInit(app_name_.c_str()));
-
-  // Ensure we are not called more than once.
-  if (server_state_ != SaslNegotiationState::NEW) {
-    return Status::IllegalState("Init() may only be called once per SaslServer object.");
-  }
-
-  // TODO: Support security flags.
-  unsigned secflags = 0;
-
-  sasl_conn_t* sasl_conn = nullptr;
-  Status s = WrapSaslCall(nullptr /* no conn */, [&]() {
-      return sasl_server_new(
-          // Registered name of the service using SASL. Required.
-          service_type.c_str(),
-          // The fully qualified domain name of this server.
-          helper_.server_fqdn(),
-          // Permits multiple user realms on server. NULL == use default.
-          nullptr,
-          // Local and remote IP address strings. (NULL disables
-          // mechanisms which require this info.)
-          helper_.local_addr_string(),
-          helper_.remote_addr_string(),
-          // Connection-specific callbacks.
-          &callbacks_[0],
-          // Security flags.
-          secflags,
-          &sasl_conn);
-    });
-
-  if (PREDICT_FALSE(!s.ok())) {
-    return Status::RuntimeError("Unable to create new SASL server",
-                                s.message());
-  }
-  // Hand the raw connection over to the 'sasl_conn_' member.
-  sasl_conn_.reset(sasl_conn);
-
-  server_state_ = SaslNegotiationState::INITIALIZED;
-  return Status::OK();
-}
-
-// Runs the server side of the negotiation on 'sock_' using blocking calls.
-//
-// After validating the connection header, it reads framed negotiation
-// messages in a loop and dispatches on the step (NEGOTIATE, SASL_INITIATE,
-// SASL_RESPONSE) until 'nego_ok_' becomes true (presumably set by one of the
-// handlers; not visible in this function). Any other step is answered with a
-// FATAL_UNAUTHORIZED RPC error and returned as InvalidArgument.
-//
-// Returns IllegalState if called before Init() or after a successful
-// negotiation. On success the authenticated user name is recorded and the
-// object moves to the NEGOTIATED state. The SASL connection object is released
-// when this function returns, on every path.
-Status SaslServer::Negotiate() {
-  // After negotiation, we no longer need the SASL library object, so
-  // may as well free its memory since the connection may be long-lived.
-  auto cleanup = MakeScopedCleanup([&]() {
-      sasl_conn_.reset();
-    });
-  DVLOG(4) << "Called SaslServer::Negotiate()";
-
-  // Ensure we are called exactly once, and in the right order.
-  if (server_state_ == SaslNegotiationState::NEW) {
-    return Status::IllegalState("SaslServer: Init() must be called before calling Negotiate()");
-  } else if (server_state_ == SaslNegotiationState::NEGOTIATED) {
-    return Status::IllegalState("SaslServer: Negotiate() may only be called once per object.");
-  }
-
-  // Ensure we can use blocking calls on the socket during negotiation.
-  RETURN_NOT_OK(EnsureBlockingMode(sock_));
-
-  // Receive buffer shared by the header read and every message read below.
-  faststring recv_buf;
-
-  // Read connection header
-  RETURN_NOT_OK(ValidateConnectionHeader(&recv_buf));
-
-  nego_ok_ = false;
-  while (!nego_ok_) {
-    TRACE("Waiting for next Negotiation message...");
-    RequestHeader header;
-    Slice param_buf;
-    RETURN_NOT_OK(ReceiveFramedMessageBlocking(sock_, &recv_buf, &header, &param_buf, deadline_));
-
-    NegotiatePB request;
-    RETURN_NOT_OK(ParseNegotiatePB(header, param_buf, &request));
-
-    switch (request.step()) {
-      // NEGOTIATE: They want a list of available mechanisms.
-      case NegotiatePB::NEGOTIATE:
-        RETURN_NOT_OK(HandleNegotiateRequest(request));
-        break;
-
-      // INITIATE: They want to initiate negotiation based on their specified mechanism.
-      case NegotiatePB::SASL_INITIATE:
-        RETURN_NOT_OK(HandleInitiateRequest(request));
-        break;
-
-      // RESPONSE: Client sent a new request as a follow-up to a SASL_CHALLENGE response.
-      case NegotiatePB::SASL_RESPONSE:
-        RETURN_NOT_OK(HandleResponseRequest(request));
-        break;
-
-      // Client sent us an unsupported Negotiation request.
-      default: {
-        TRACE("SASL Server: Received unsupported request from client");
-        Status s = Status::InvalidArgument("RPC server doesn't support negotiation step in request",
-                                           NegotiatePB::NegotiateStep_Name(request.step()));
-        RETURN_NOT_OK(SendRpcError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
-        return s;
-      }
-    }
-  }
-
-  // Read the user name from the SASL connection before 'cleanup' releases it.
-  const char* username = nullptr;
-  int rc = sasl_getprop(sasl_conn_.get(), SASL_USERNAME,
-                        reinterpret_cast<const void**>(&username));
-  // We expect that SASL_USERNAME will always get set.
-  CHECK(rc == SASL_OK && username != nullptr)
-      << "No username on authenticated connection";
-  authenticated_user_ = username;
-
-  TRACE("SASL Server: Successful negotiation");
-  server_state_ = SaslNegotiationState::NEGOTIATED;
-  return Status::OK();
-}
-
-// Reads the fixed-size connection header (magic number plus header flags)
-// from 'sock_' into 'recv_buf' with a blocking receive bounded by
-// 'deadline_', then validates it with serialization::ValidateConnHeader().
-// Returns the first non-OK status from the receive or the validation.
-Status SaslServer::ValidateConnectionHeader(faststring* recv_buf) {
-  TRACE("Waiting for connection header");
-  size_t num_read;
-  const size_t conn_header_len = kMagicNumberLength + kHeaderFlagsLength;
-  recv_buf->resize(conn_header_len);
-  RETURN_NOT_OK(sock_->BlockingRecv(recv_buf->data(), conn_header_len, &num_read, deadline_));
-  // NOTE(review): the byte count is only verified in debug builds; this
-  // presumably relies on BlockingRecv() failing on a short read. Confirm.
-  DCHECK_EQ(conn_header_len, num_read);
-
-  RETURN_NOT_OK(serialization::ValidateConnHeader(*recv_buf));
-  TRACE("Connection header received");
-  return Status::OK();
-}
-
-// Validates the call id in 'header' and parses 'param_buf' into 'request'.
-//
-// If either step fails, an RPC error is sent to the client and the failing
-// status is returned. If sending that error fails, the send status is
-// returned instead. Returns OK only when both steps succeed.
-Status SaslServer::ParseNegotiatePB(const RequestHeader& header,
-                                    const Slice& param_buf,
-                                    NegotiatePB* request) {
-  Status s = helper_.SanityCheckNegotiateCallId(header.call_id());
-  if (!s.ok()) {
-    RETURN_NOT_OK(SendRpcError(ErrorStatusPB::FATAL_INVALID_RPC_HEADER, s));
-    // A fatal error has been reported to the client, so stop here instead of
-    // parsing the message as if its header were valid.
-    return s;
-  }
-
-  s = helper_.ParseNegotiatePB(param_buf, request);
-  if (!s.ok()) {
-    RETURN_NOT_OK(SendRpcError(ErrorStatusPB::FATAL_DESERIALIZING_REQUEST, s));
-    return s;
-  }
-
-  return Status::OK();
-}
-
-Status SaslServer::SendNegotiatePB(const NegotiatePB& msg) {
-  DCHECK_NE(server_state_, SaslNegotiationState::NEW)
-      << "Must not send Negotiate messages before calling Init()";
-  DCHECK_NE(server_state_, SaslNegotiationState::NEGOTIATED)
-      << "Must not send Negotiate messages after Negotiate() succeeds";
-
-  // Create header with negotiation-specific callId
-  ResponseHeader header;
-  header.set_call_id(kNegotiateCallId);
-  return helper_.SendNegotiatePB(sock_, header, msg, deadline_);
-}
-
-Status SaslServer::SendRpcError(ErrorStatusPB::RpcErrorCodePB code, const Status& err) {
-  DCHECK_NE(server_state_, SaslNegotiationState::NEW)
-      << "Must not send SASL messages before calling Init()";
-  DCHECK_NE(server_state_, SaslNegotiationState::NEGOTIATED)
-      << "Must not send SASL messages after Negotiate() succeeds";
-  if (err.ok()) {
-    return Status::InvalidArgument("Cannot send error message using OK status");
-  }
-
-  // Create header with negotiation-specific callId
-  ResponseHeader header;
-  header.set_call_id(kNegotiateCallId);
-  header.set_is_error(true);
-
-  // Get RPC error code from Status object
-  ErrorStatusPB msg;
-  msg.set_code(code);
-  msg.set_message(err.ToString());
-
-  RETURN_NOT_OK(helper_.SendNegotiatePB(sock_, header, msg, deadline_));
-  TRACE("Sent SASL error: $0", ErrorStatusPB::RpcErrorCodePB_Name(code));
-  return Status::OK();
-}
-
-Status SaslServer::HandleNegotiateRequest(const NegotiatePB& request) {
-  TRACE("SASL Server: Received NEGOTIATE request from client");
-
-  // Fill in the set of features supported by the client.
-  for (int flag : request.supported_features()) {
-    // We only add the features that our local build knows about.
-    RpcFeatureFlag feature_flag = RpcFeatureFlag_IsValid(flag) ?
-                                  static_cast<RpcFeatureFlag>(flag) : UNKNOWN;
-    if (ContainsKey(kSupportedServerRpcFeatureFlags, feature_flag)) {
-      client_features_.insert(feature_flag);
-    }
-  }
-
-  set<string> server_mechs = helper_.LocalMechs();
-  if (PREDICT_FALSE(server_mechs.empty())) {
-    // This will happen if no mechanisms are enabled before calling Init()
-    Status s = Status::IllegalState("SASL server mechanism list is empty!");
-    LOG(ERROR) << s.ToString();
-    TRACE("SASL Server: Sending FATAL_UNAUTHORIZED response to client");
-    RETURN_NOT_OK(SendRpcError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
-    return s;
-  }
-
-  RETURN_NOT_OK(SendNegotiateResponse(server_mechs));
-  return Status::OK();
-}
-
-Status SaslServer::SendNegotiateResponse(const set<string>& server_mechs) {
-  NegotiatePB response;
-  response.set_step(NegotiatePB::NEGOTIATE);
-
-  for (const string& mech : server_mechs) {
-    response.add_auths()->set_mechanism(mech);
-  }
-
-  // Tell the client which features we support.
-  for (RpcFeatureFlag feature : kSupportedServerRpcFeatureFlags) {
-    response.add_supported_features(feature);
-  }
-
-  RETURN_NOT_OK(SendNegotiatePB(response));
-  TRACE("Sent NEGOTIATE response");
-  return Status::OK();
-}
-
-
-Status SaslServer::HandleInitiateRequest(const NegotiatePB& request) {
-  TRACE("SASL Server: Received INITIATE request from client");
-
-  if (request.auths_size() != 1) {
-    Status s = Status::NotAuthorized(StringPrintf(
-          "SASL INITIATE request must include exactly one SaslAuth section, found: %d",
-          request.auths_size()));
-    RETURN_NOT_OK(SendRpcError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
-    return s;
-  }
-
-  const NegotiatePB::SaslAuth& auth = request.auths(0);
-  TRACE("SASL Server: Client requested to use mechanism: $0", auth.mechanism());
-
-  // Security issue to display this. Commented out but left for debugging purposes.
-  //DVLOG(3) << "SASL server: Client token: " << request.token();
-
-  const char* server_out = nullptr;
-  uint32_t server_out_len = 0;
-  TRACE("SASL Server: Calling sasl_server_start()");
-
-  Status s = WrapSaslCall(sasl_conn_.get(), [&]() {
-      return sasl_server_start(
-          sasl_conn_.get(),         // The SASL connection context created by init()
-          auth.mechanism().c_str(), // The mechanism requested by the client.
-          request.token().c_str(),  // Optional string the client gave us.
-          request.token().length(), // Client string len.
-          &server_out,              // The output of the SASL library, might not be NULL terminated
-          &server_out_len);         // Output len.
-    });
-  if (PREDICT_FALSE(!s.ok() && !s.IsIncomplete())) {
-    RETURN_NOT_OK(SendRpcError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
-    return s;
-  }
-  negotiated_mech_ = SaslMechanism::value_of(auth.mechanism());
-
-  // We have a valid mechanism match
-  if (s.ok()) {
-    nego_ok_ = true;
-    RETURN_NOT_OK(SendSuccessResponse(server_out, server_out_len));
-  } else { // s.IsIncomplete() (equivalent to SASL_CONTINUE)
-    RETURN_NOT_OK(SendChallengeResponse(server_out, server_out_len));
-  }
-  return Status::OK();
-}
-
-Status SaslServer::SendChallengeResponse(const char* challenge, unsigned clen) {
-  NegotiatePB response;
-  response.set_step(NegotiatePB::SASL_CHALLENGE);
-  response.mutable_token()->assign(challenge, clen);
-  TRACE("SASL Server: Sending SASL_CHALLENGE response to client");
-  RETURN_NOT_OK(SendNegotiatePB(response));
-  return Status::OK();
-}
-
-Status SaslServer::SendSuccessResponse(const char* token, unsigned tlen) {
-  NegotiatePB response;
-  response.set_step(NegotiatePB::SASL_SUCCESS);
-  if (PREDICT_FALSE(tlen > 0)) {
-    response.mutable_token()->assign(token, tlen);
-  }
-  TRACE("SASL Server: Sending SASL_SUCCESS response to client");
-  RETURN_NOT_OK(SendNegotiatePB(response));
-  return Status::OK();
-}
-
-
-Status SaslServer::HandleResponseRequest(const NegotiatePB& request) {
-  TRACE("SASL Server: Received RESPONSE request from client");
-
-  if (!request.has_token()) {
-    Status s = Status::InvalidArgument("No token in SASL_RESPONSE from client");
-    RETURN_NOT_OK(SendRpcError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
-    return s;
-  }
-
-  const char* server_out = nullptr;
-  uint32_t server_out_len = 0;
-  TRACE("SASL Server: Calling sasl_server_step()");
-  Status s = WrapSaslCall(sasl_conn_.get(), [&]() {
-      return sasl_server_step(
-          sasl_conn_.get(),         // The SASL connection context created by init()
-          request.token().c_str(),  // Optional string the client gave us
-          request.token().length(), // Client string len
-          &server_out,              // The output of the SASL library, might not be NULL terminated
-          &server_out_len);         // Output len
-    });
-  if (!s.ok() && !s.IsIncomplete()) {
-    RETURN_NOT_OK(SendRpcError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
-    return s;
-  }
-
-  if (s.ok()) {
-    nego_ok_ = true;
-    RETURN_NOT_OK(SendSuccessResponse(server_out, server_out_len));
-  } else { // s.IsIncomplete() (equivalent to SASL_CONTINUE)
-    RETURN_NOT_OK(SendChallengeResponse(server_out, server_out_len));
-  }
-  return Status::OK();
-}
-
-int SaslServer::GetOptionCb(const char* plugin_name, const char* option,
-                            const char** result, unsigned* len) {
-  return helper_.GetOptionCb(plugin_name, option, result, len);
-}
-
-int SaslServer::PlainAuthCb(sasl_conn_t * /*conn*/, const char * /*user*/, const char * /*pass*/,
-                            unsigned /*passlen*/, struct propctx * /*propctx*/) {
-  TRACE("SASL Server: Received PLAIN auth.");
-  if (PREDICT_FALSE(!helper_.IsPlainEnabled())) {
-    LOG(DFATAL) << "Password authentication callback called while PLAIN auth disabled";
-    return SASL_BADPARAM;
-  }
-  // We always allow PLAIN authentication to succeed.
-  return SASL_OK;
-}
-
-Status SaslServer::PreflightCheckGSSAPI(const string& app_name) {
-  // TODO(todd): the error messages that come from this function on el6
-  // are relatively useless due to the following krb5 bug:
-  // http://krbdev.mit.edu/rt/Ticket/Display.html?id=6973
-  // This may not be useful anymore given the keytab login that happens
-  // in security/init.cc.
-
-  // Initialize a SaslServer with a null socket, and enable
-  // only GSSAPI.
-  //
-  // We aren't going to actually send/receive any messages, but
-  // this makes it easier to reuse the initialization code.
-  SaslServer server(app_name, nullptr);
-  Status s = server.EnableGSSAPI();
-  if (!s.ok()) {
-    return Status::RuntimeError(s.message());
-  }
-
-  RETURN_NOT_OK(server.Init(app_name));
-
-  // Start the SASL server as if we were accepting a connection.
-  const char* server_out = nullptr; // ignored
-  uint32_t server_out_len = 0;
-  s = WrapSaslCall(server.sasl_conn_.get(), [&]() {
-      return sasl_server_start(
-          server.sasl_conn_.get(),
-          kSaslMechGSSAPI,
-          "", 0,  // Pass a 0-length token.
-          &server_out, &server_out_len);
-    });
-
-  // We expect 'Incomplete' status to indicate that the first step of negotiation
-  // was correct.
-  if (s.IsIncomplete()) return Status::OK();
-
-  string err_msg = s.message().ToString();
-  if (err_msg == "Permission denied") {
-    // For bad keytab permissions, we get a rather vague message. So,
-    // we make it more specific for better usability.
-    err_msg = "error accessing keytab: " + err_msg;
-  }
-  return Status::RuntimeError(err_msg);
-}
-
-} // namespace rpc
-} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/41c45523/src/kudu/rpc/sasl_server.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/sasl_server.h b/src/kudu/rpc/sasl_server.h
deleted file mode 100644
index 679a7c4..0000000
--- a/src/kudu/rpc/sasl_server.h
+++ /dev/null
@@ -1,180 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef KUDU_RPC_SASL_SERVER_H
-#define KUDU_RPC_SASL_SERVER_H
-
-#include <set>
-#include <string>
-#include <vector>
-
-#include <sasl/sasl.h>
-
-#include "kudu/rpc/rpc_header.pb.h"
-#include "kudu/rpc/sasl_common.h"
-#include "kudu/rpc/sasl_helper.h"
-#include "kudu/util/net/socket.h"
-#include "kudu/util/monotime.h"
-#include "kudu/util/status.h"
-
-namespace kudu {
-
-class Slice;
-
-namespace rpc {
-
-using std::string;
-
-// Class for doing SASL negotiation with a SaslClient over a bidirectional socket.
-// Operations on this class are NOT thread-safe.
-class SaslServer {
- public:
-  // Does not take ownership of 'socket'.
-  SaslServer(string app_name, Socket* socket);
-
-  // Enable PLAIN authentication.
-  // Despite PLAIN authentication taking a username and password, we disregard
-  // the password and use this as a "unauthenticated" mode.
-  // Must be called after Init().
-  Status EnablePlain();
-
-  // Enable GSSAPI (Kerberos) authentication.
-  // Call after Init().
-  Status EnableGSSAPI();
-
-  // Returns mechanism negotiated by this connection.
-  // Must be called after Negotiate().
-  SaslMechanism::Type negotiated_mechanism() const;
-
-  // Returns the set of RPC system features supported by the remote client.
-  // Must be called after Negotiate().
-  const std::set<RpcFeatureFlag>& client_features() const {
-    return client_features_;
-  }
-
-  // Name of the user that was authenticated.
-  // Must be called after a successful Negotiate().
-  const std::string& authenticated_user() const;
-
-  // Specify IP:port of local side of connection.
-  // Must be called before Init(). Required for some mechanisms.
-  void set_local_addr(const Sockaddr& addr);
-
-  // Specify IP:port of remote side of connection.
-  // Must be called before Init(). Required for some mechanisms.
-  void set_remote_addr(const Sockaddr& addr);
-
-  // Specify the fully-qualified domain name of the remote server.
-  // Must be called before Init(). Required for some mechanisms.
-  void set_server_fqdn(const string& domain_name);
-
-  // Set deadline for connection negotiation.
-  void set_deadline(const MonoTime& deadline);
-
-  // Get deadline for connection negotiation.
-  const MonoTime& deadline() const { return deadline_; }
-
-  // Initialize a new SASL server. Must be called before Negotiate().
-  // Returns OK on success, otherwise RuntimeError.
-  Status Init(const string& service_type);
-
-  // Begin negotiation with the SASL client on the other side of the fd socket
-  // that this server was constructed with.
-  // Returns OK on success.
-  // Otherwise, it may return NotAuthorized, NotSupported, or another non-OK status.
-  Status Negotiate();
-
-  // SASL callback for plugin options, supported mechanisms, etc.
-  // Returns SASL_FAIL if the option is not handled, which does not fail the handshake.
-  int GetOptionCb(const char* plugin_name, const char* option,
-                  const char** result, unsigned* len);
-
-  // SASL callback for PLAIN authentication via SASL_CB_SERVER_USERDB_CHECKPASS.
-  int PlainAuthCb(sasl_conn_t* conn, const char* user, const char* pass,
-                  unsigned passlen, struct propctx* propctx);
-
-  // Perform a "pre-flight check" that everything required to act as a Kerberos
-  // server is properly set up.
-  static Status PreflightCheckGSSAPI(const std::string& app_name);
-
- private:
-  // Parse and validate connection header.
-  Status ValidateConnectionHeader(faststring* recv_buf);
-
-  // Parse request body. If malformed, sends an error message to the client.
-  Status ParseNegotiatePB(const RequestHeader& header,
-                          const Slice& param_buf,
-                          NegotiatePB* request);
-
-  // Encode and send the specified SASL message to the client.
-  Status SendNegotiatePB(const NegotiatePB& msg);
-
-  // Encode and send the specified RPC error message to the client.
-  // Calls Status.ToString() for the embedded error message.
-  Status SendRpcError(ErrorStatusPB::RpcErrorCodePB code, const Status& err);
-
-  // Handle case when client sends NEGOTIATE request.
-  Status HandleNegotiateRequest(const NegotiatePB& request);
-
-  // Send a NEGOTIATE response to the client with the list of available mechanisms.
-  Status SendNegotiateResponse(const std::set<string>& server_mechs);
-
-  // Handle case when client sends INITIATE request.
-  Status HandleInitiateRequest(const NegotiatePB& request);
-
-  // Send a CHALLENGE response to the client with a challenge token.
-  Status SendChallengeResponse(const char* challenge, unsigned clen);
-
-  // Send a SUCCESS response to the client with an token (typically empty).
-  Status SendSuccessResponse(const char* token, unsigned tlen);
-
-  // Handle case when client sends RESPONSE request.
-  Status HandleResponseRequest(const NegotiatePB& request);
-
-  string app_name_;
-  Socket* sock_;
-  std::vector<sasl_callback_t> callbacks_;
-  // The SASL connection object. This is initialized in Init() and
-  // freed after Negotiate() completes (regardless whether it was successful).
-  gscoped_ptr<sasl_conn_t, SaslDeleter> sasl_conn_;
-  SaslHelper helper_;
-
-  // The set of features that the client supports. Filled in
-  // after we receive the NEGOTIATE request from the client.
-  std::set<RpcFeatureFlag> client_features_;
-
-  // The successfully-authenticated user, if applicable.
-  string authenticated_user_;
-
-  SaslNegotiationState::Type server_state_;
-
-  // The mechanism we negotiated with the client.
-  SaslMechanism::Type negotiated_mech_;
-
-  // Intra-negotiation state.
-  bool nego_ok_;  // During negotiation: did we get a SASL_OK response from the SASL library?
-
-  // Negotiation timeout deadline.
-  MonoTime deadline_;
-
-  DISALLOW_COPY_AND_ASSIGN(SaslServer);
-};
-
-} // namespace rpc
-} // namespace kudu
-
-#endif  // KUDU_RPC_SASL_SERVER_H

http://git-wip-us.apache.org/repos/asf/kudu/blob/41c45523/src/kudu/rpc/server_negotiation.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/server_negotiation.cc b/src/kudu/rpc/server_negotiation.cc
new file mode 100644
index 0000000..9e91481
--- /dev/null
+++ b/src/kudu/rpc/server_negotiation.cc
@@ -0,0 +1,499 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/server_negotiation.h"
+
+#include <glog/logging.h>
+#include <google/protobuf/message_lite.h>
+#include <limits>
+#include <sasl/sasl.h>
+#include <set>
+#include <string>
+
+#include "kudu/gutil/endian.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/split.h"
+#include "kudu/rpc/blocking_ops.h"
+#include "kudu/rpc/constants.h"
+#include "kudu/rpc/serialization.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/trace.h"
+
+namespace kudu {
+namespace rpc {
+
+// SASL_CB_GETOPT trampoline; 'sasl_server' is the SaslServer registered as callback context.
+static int SaslServerGetoptCb(void* sasl_server, const char* plugin_name, const char* option,
+                              const char** result, unsigned* len) {
+  return static_cast<SaslServer*>(sasl_server)->GetOptionCb(plugin_name, option, result, len);
+}
+
+// SASL_CB_SERVER_USERDB_CHECKPASS trampoline; forwards to the SaslServer given as context.
+static int SaslServerPlainAuthCb(sasl_conn_t* conn, void* sasl_server, const char* user,
+                                 const char* pass, unsigned passlen, struct propctx* propctx) {
+  return static_cast<SaslServer*>(sasl_server)->PlainAuthCb(conn, user, pass, passlen, propctx);
+}
+
+// Registers the getopt and PLAIN check-password callbacks with 'this' as their context.
+SaslServer::SaslServer(string app_name, Socket* socket)
+    : app_name_(std::move(app_name)), sock_(socket), helper_(SaslHelper::SERVER),
+      server_state_(SaslNegotiationState::NEW), negotiated_mech_(SaslMechanism::INVALID),
+      deadline_(MonoTime::Max()) {
+  using GenericCb = int (*)();  // Generic function-pointer type the callbacks are cast to.
+  callbacks_.push_back(SaslBuildCallback(SASL_CB_GETOPT,
+      reinterpret_cast<GenericCb>(&SaslServerGetoptCb), this));
+  callbacks_.push_back(SaslBuildCallback(SASL_CB_SERVER_USERDB_CHECKPASS,
+      reinterpret_cast<GenericCb>(&SaslServerPlainAuthCb), this));
+  // The callback list ends with a SASL_CB_LIST_END entry.
+  callbacks_.push_back(SaslBuildCallback(SASL_CB_LIST_END, nullptr, nullptr));
+}
+
+// Enables the PLAIN mechanism in the helper; debug builds assert the server is still NEW.
+Status SaslServer::EnablePlain() {
+  DCHECK_EQ(server_state_, SaslNegotiationState::NEW);
+  return helper_.EnablePlain();
+}
+
+Status SaslServer::EnableGSSAPI() {  // Enables the GSSAPI mechanism in the helper.
+  DCHECK_EQ(server_state_, SaslNegotiationState::NEW);  // Debug-only: must precede Init().
+  return helper_.EnableGSSAPI();
+}
+
+SaslMechanism::Type SaslServer::negotiated_mechanism() const {
+  DCHECK_EQ(server_state_, SaslNegotiationState::NEGOTIATED);  // Needs a successful Negotiate().
+  return negotiated_mech_;  // Set by HandleInitiateRequest() from the client's chosen mechanism.
+}
+
+const std::string& SaslServer::authenticated_user() const {
+  DCHECK_EQ(server_state_, SaslNegotiationState::NEGOTIATED);  // Needs a successful Negotiate().
+  return authenticated_user_;  // Filled from SASL_USERNAME at the end of Negotiate().
+}
+
+void SaslServer::set_local_addr(const Sockaddr& addr) {  // Forwards 'addr' to the helper.
+  DCHECK_EQ(server_state_, SaslNegotiationState::NEW);  // Debug-only: must precede Init().
+  helper_.set_local_addr(addr);
+}
+
+void SaslServer::set_remote_addr(const Sockaddr& addr) {  // Forwards 'addr' to the helper.
+  DCHECK_EQ(server_state_, SaslNegotiationState::NEW);  // Debug-only: must precede Init().
+  helper_.set_remote_addr(addr);
+}
+
+void SaslServer::set_server_fqdn(const string& domain_name) {  // Forwards to the helper.
+  DCHECK_EQ(server_state_, SaslNegotiationState::NEW);  // Debug-only: must precede Init().
+  helper_.set_server_fqdn(domain_name);
+}
+
+void SaslServer::set_deadline(const MonoTime& deadline) {
+  DCHECK_NE(server_state_, SaslNegotiationState::NEGOTIATED);  // Not after Negotiate() succeeds.
+  deadline_ = deadline;  // Passed to the blocking socket reads and writes of this class.
+}
+
+// Runs SaslInit() for the application and then creates this server's SASL connection
+// with sasl_server_new(). On success the server moves from NEW to INITIALIZED.
+//
+// 'service_type' is passed to SASL as the registered name of the service.
+// Returns IllegalState if the server is no longer NEW and RuntimeError if
+// sasl_server_new() fails; errors from SaslInit() are returned unchanged.
+Status SaslServer::Init(const string& service_type) {
+  RETURN_NOT_OK(SaslInit(app_name_.c_str()));
+
+  // Ensure we are not called more than once.
+  if (server_state_ != SaslNegotiationState::NEW) {
+    return Status::IllegalState("Init() may only be called once per SaslServer object.");
+  }
+
+  // TODO(unknown): Support security flags.
+  const unsigned sec_flags = 0;
+
+  sasl_conn_t* new_conn = nullptr;
+  const auto create_conn = [&]() {
+    return sasl_server_new(
+        service_type.c_str(),          // Registered name of the service using SASL. Required.
+        helper_.server_fqdn(),         // The fully qualified domain name of this server.
+        nullptr,                       // User realm. NULL == use default.
+        helper_.local_addr_string(),   // Local IP address string. NULL disables mechanisms
+        helper_.remote_addr_string(),  // ...that need it; likewise for the remote address.
+        &callbacks_[0],                // Connection-specific callbacks.
+        sec_flags,                     // Security flags.
+        &new_conn);                    // Out: the new connection object.
+  };
+  const Status create_status = WrapSaslCall(nullptr /* no conn */, create_conn);
+  if (PREDICT_FALSE(!create_status.ok())) {
+    return Status::RuntimeError("Unable to create new SASL server",
+                                create_status.message());
+  }
+
+  // Store the new connection in sasl_conn_.
+  sasl_conn_.reset(new_conn);
+  // From here on Negotiate() will accept this server.
+  server_state_ = SaslNegotiationState::INITIALIZED;
+  return Status::OK();
+}
+
+Status SaslServer::Negotiate() {  // Runs the blocking, server-side negotiation loop.
+  // The SASL library object is not needed once this function returns, so free it on
+  // every exit path (success or failure); the connection itself may be long-lived.
+  auto cleanup = MakeScopedCleanup([&]() {
+      sasl_conn_.reset();
+    });
+  DVLOG(4) << "Called SaslServer::Negotiate()";
+
+  // Ensure we are called exactly once, and in the right order.
+  if (server_state_ == SaslNegotiationState::NEW) {
+    return Status::IllegalState("SaslServer: Init() must be called before calling Negotiate()");
+  }
+  if (server_state_ == SaslNegotiationState::NEGOTIATED) {
+    return Status::IllegalState("SaslServer: Negotiate() may only be called once per object.");
+  }
+
+  // Ensure we can use blocking calls on the socket during negotiation.
+  RETURN_NOT_OK(EnsureBlockingMode(sock_));
+
+  faststring recv_buf;
+
+  // Read and validate the connection header before any negotiation message.
+  RETURN_NOT_OK(ValidateConnectionHeader(&recv_buf));
+
+  nego_ok_ = false;  // Set to true by the INITIATE/RESPONSE handlers when SASL reports success.
+  while (!nego_ok_) {
+    TRACE("Waiting for next Negotiation message...");
+    RequestHeader header;
+    Slice param_buf;
+    RETURN_NOT_OK(ReceiveFramedMessageBlocking(sock_, &recv_buf, &header, &param_buf, deadline_));
+
+    NegotiatePB request;
+    RETURN_NOT_OK(ParseNegotiatePB(header, param_buf, &request));
+
+    switch (request.step()) {
+      // NEGOTIATE: They want a list of available mechanisms.
+      case NegotiatePB::NEGOTIATE:
+        RETURN_NOT_OK(HandleNegotiateRequest(request));
+        break;
+
+      // INITIATE: They want to initiate negotiation based on their specified mechanism.
+      case NegotiatePB::SASL_INITIATE:
+        RETURN_NOT_OK(HandleInitiateRequest(request));
+        break;
+
+      // RESPONSE: Client sent a new request as a follow-up to a SASL_CHALLENGE response.
+      case NegotiatePB::SASL_RESPONSE:
+        RETURN_NOT_OK(HandleResponseRequest(request));
+        break;
+
+      // Client sent us an unsupported Negotiation request: tell the client, then fail.
+      default: {
+        TRACE("SASL Server: Received unsupported request from client");
+        Status s = Status::InvalidArgument("RPC server doesn't support negotiation step in request",
+                                           NegotiatePB::NegotiateStep_Name(request.step()));
+        RETURN_NOT_OK(SendRpcError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
+        return s;
+      }
+    }
+  }
+
+  const char* username = nullptr;
+  int rc = sasl_getprop(sasl_conn_.get(), SASL_USERNAME,
+                        reinterpret_cast<const void**>(&username));
+  // We expect that SASL_USERNAME will always get set; the CHECK is fatal otherwise.
+  CHECK(rc == SASL_OK && username != nullptr)
+      << "No username on authenticated connection";
+  authenticated_user_ = username;  // Copied into a string before 'cleanup' frees sasl_conn_.
+
+  TRACE("SASL Server: Successful negotiation");
+  server_state_ = SaslNegotiationState::NEGOTIATED;
+  return Status::OK();
+}
+
+// Reads the fixed-size connection header into 'recv_buf' and validates it.
+Status SaslServer::ValidateConnectionHeader(faststring* recv_buf) {
+  TRACE("Waiting for connection header");
+  const size_t expected_len = kMagicNumberLength + kHeaderFlagsLength;
+  recv_buf->resize(expected_len);
+  size_t bytes_read;
+  RETURN_NOT_OK(sock_->BlockingRecv(recv_buf->data(), expected_len, &bytes_read, deadline_));
+  DCHECK_EQ(expected_len, bytes_read);
+  RETURN_NOT_OK(serialization::ValidateConnHeader(*recv_buf));
+  TRACE("Connection header received");
+  return Status::OK();
+}
+
+// Checks the call id of 'header' and parses 'param_buf' into 'request'. On either failure
+// an RPC error is sent to the client and the failure status is returned to the caller.
+Status SaslServer::ParseNegotiatePB(const RequestHeader& header, const Slice& param_buf,
+                                    NegotiatePB* request) {
+  Status s = helper_.SanityCheckNegotiateCallId(header.call_id());
+  if (!s.ok()) {
+    RETURN_NOT_OK(SendRpcError(ErrorStatusPB::FATAL_INVALID_RPC_HEADER, s));
+    return s;  // The header was rejected as fatal; do not go on to parse the body.
+  }
+  s = helper_.ParseNegotiatePB(param_buf, request);
+  if (!s.ok()) {
+    RETURN_NOT_OK(SendRpcError(ErrorStatusPB::FATAL_DESERIALIZING_REQUEST, s));
+    return s;
+  }
+  return Status::OK();
+}
+
+// Sends 'msg' to the client under the reserved negotiation call id. Debug builds assert
+// that the server is between Init() and a successful Negotiate().
+Status SaslServer::SendNegotiatePB(const NegotiatePB& msg) {
+  DCHECK_NE(server_state_, SaslNegotiationState::NEW)
+      << "Must not send Negotiate messages before calling Init()";
+  DCHECK_NE(server_state_, SaslNegotiationState::NEGOTIATED)
+      << "Must not send Negotiate messages after Negotiate() succeeds";
+  ResponseHeader resp_header;
+  resp_header.set_call_id(kNegotiateCallId);
+  return helper_.SendNegotiatePB(sock_, resp_header, msg, deadline_);
+}
+
+Status SaslServer::SendRpcError(ErrorStatusPB::RpcErrorCodePB code, const Status& err) {
+  DCHECK_NE(server_state_, SaslNegotiationState::NEW)
+      << "Must not send SASL messages before calling Init()";
+  DCHECK_NE(server_state_, SaslNegotiationState::NEGOTIATED)
+      << "Must not send SASL messages after Negotiate() succeeds";
+  if (err.ok()) {
+    return Status::InvalidArgument("Cannot send error message using OK status");
+  }
+
+  // Create header with negotiation-specific callId
+  ResponseHeader header;
+  header.set_call_id(kNegotiateCallId);
+  header.set_is_error(true);
+
+  // Get RPC error code from Status object
+  ErrorStatusPB msg;
+  msg.set_code(code);
+  msg.set_message(err.ToString());
+
+  RETURN_NOT_OK(helper_.SendNegotiatePB(sock_, header, msg, deadline_));
+  TRACE("Sent SASL error: $0", ErrorStatusPB::RpcErrorCodePB_Name(code));
+  return Status::OK();
+}
+
+// Handles the client's NEGOTIATE step: records the client's feature flags that this
+// server also supports, then replies with the locally enabled SASL mechanisms.
+Status SaslServer::HandleNegotiateRequest(const NegotiatePB& request) {
+  TRACE("SASL Server: Received NEGOTIATE request from client");
+  // Flags our local build does not know about are mapped to UNKNOWN before the lookup.
+  for (int raw_flag : request.supported_features()) {
+    RpcFeatureFlag known_flag = UNKNOWN;
+    if (RpcFeatureFlag_IsValid(raw_flag)) {
+      known_flag = static_cast<RpcFeatureFlag>(raw_flag);
+    }
+    if (ContainsKey(kSupportedServerRpcFeatureFlags, known_flag)) {
+      client_features_.insert(known_flag);
+    }
+  }
+
+  const set<string> server_mechs = helper_.LocalMechs();
+  if (PREDICT_FALSE(server_mechs.empty())) {
+    // This will happen if no mechanisms are enabled before calling Init().
+    const Status s = Status::IllegalState("SASL server mechanism list is empty!");
+    LOG(ERROR) << s.ToString();
+    TRACE("SASL Server: Sending FATAL_UNAUTHORIZED response to client");
+    RETURN_NOT_OK(SendRpcError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
+    return s;
+  }
+  return SendNegotiateResponse(server_mechs);
+}
+
+// Replies to a NEGOTIATE request with one auth entry per mechanism in 'server_mechs'
+// plus every RPC feature flag this server supports.
+Status SaslServer::SendNegotiateResponse(const set<string>& server_mechs) {
+  NegotiatePB reply;
+  reply.set_step(NegotiatePB::NEGOTIATE);
+  for (const string& mech_name : server_mechs) {
+    auto* auth = reply.add_auths();
+    auth->set_mechanism(mech_name);
+  }
+  for (RpcFeatureFlag flag : kSupportedServerRpcFeatureFlags) {
+    reply.add_supported_features(flag);
+  }
+
+  RETURN_NOT_OK(SendNegotiatePB(reply));
+  TRACE("Sent NEGOTIATE response");
+  return Status::OK();
+}
+
+
+// Handles the client's SASL_INITIATE step: feeds the requested mechanism and optional
+// token to sasl_server_start(), then answers with SASL_SUCCESS or SASL_CHALLENGE.
+Status SaslServer::HandleInitiateRequest(const NegotiatePB& request) {
+  TRACE("SASL Server: Received INITIATE request from client");
+
+  const int num_auths = request.auths_size();
+  if (num_auths != 1) {
+    Status s = Status::NotAuthorized(StringPrintf(
+          "SASL INITIATE request must include exactly one SaslAuth section, found: %d",
+          num_auths));
+    RETURN_NOT_OK(SendRpcError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
+    return s;
+  }
+
+  const string& mech_name = request.auths(0).mechanism();
+  TRACE("SASL Server: Client requested to use mechanism: $0", mech_name);
+
+  // The client token is deliberately never logged: displaying it is a security issue.
+
+  // Output of the SASL library; might not be NULL terminated.
+  const char* sasl_out = nullptr;
+  uint32_t sasl_out_len = 0;
+  TRACE("SASL Server: Calling sasl_server_start()");
+  Status s = WrapSaslCall(sasl_conn_.get(), [&]() {
+      return sasl_server_start(sasl_conn_.get(),          // Context created by Init().
+                               mech_name.c_str(),         // Mechanism the client asked for.
+                               request.token().c_str(),   // Optional client token...
+                               request.token().length(),  // ...and its length.
+                               &sasl_out, &sasl_out_len);
+    });
+  const bool finished = s.ok();
+  if (PREDICT_FALSE(!finished && !s.IsIncomplete())) {
+    RETURN_NOT_OK(SendRpcError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
+    return s;
+  }
+  negotiated_mech_ = SaslMechanism::value_of(mech_name);
+
+  if (!finished) {
+    // Incomplete is the equivalent of SASL_CONTINUE: another round trip is required.
+    return SendChallengeResponse(sasl_out, sasl_out_len);
+  }
+  // The SASL library reported success; this ends the loop in Negotiate().
+  nego_ok_ = true;
+  return SendSuccessResponse(sasl_out, sasl_out_len);
+}
+
+// Sends a SASL_CHALLENGE step whose token is the 'clen' bytes starting at 'challenge'.
+Status SaslServer::SendChallengeResponse(const char* challenge, unsigned clen) {
+  NegotiatePB challenge_pb;
+  challenge_pb.set_step(NegotiatePB::SASL_CHALLENGE);
+  challenge_pb.mutable_token()->assign(challenge, clen);
+  TRACE("SASL Server: Sending SASL_CHALLENGE response to client");
+  return SendNegotiatePB(challenge_pb);
+}
+
+// Sends a SASL_SUCCESS step to the client; the token is attached only when non-empty.
+Status SaslServer::SendSuccessResponse(const char* token, unsigned tlen) {
+  NegotiatePB success_pb;
+  success_pb.set_step(NegotiatePB::SASL_SUCCESS);
+  if (PREDICT_FALSE(tlen > 0)) {
+    success_pb.mutable_token()->assign(token, tlen);
+  }
+  TRACE("SASL Server: Sending SASL_SUCCESS response to client");
+  return SendNegotiatePB(success_pb);
+}
+
+
+Status SaslServer::HandleResponseRequest(const NegotiatePB& request) {
+  TRACE("SASL Server: Received RESPONSE request from client");
+
+  if (!request.has_token()) {
+    Status s = Status::InvalidArgument("No token in SASL_RESPONSE from client");
+    RETURN_NOT_OK(SendRpcError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
+    return s;
+  }
+
+  const char* server_out = nullptr;
+  uint32_t server_out_len = 0;
+  TRACE("SASL Server: Calling sasl_server_step()");
+  Status s = WrapSaslCall(sasl_conn_.get(), [&]() {
+      return sasl_server_step(
+          sasl_conn_.get(),         // The SASL connection context created by init()
+          request.token().c_str(),  // Optional string the client gave us
+          request.token().length(), // Client string len
+          &server_out,              // The output of the SASL library, might not be NULL terminated
+          &server_out_len);         // Output len
+    });
+  if (!s.ok() && !s.IsIncomplete()) {
+    RETURN_NOT_OK(SendRpcError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
+    return s;
+  }
+
+  if (s.ok()) {
+    nego_ok_ = true;
+    RETURN_NOT_OK(SendSuccessResponse(server_out, server_out_len));
+  } else { // s.IsIncomplete() (equivalent to SASL_CONTINUE)
+    RETURN_NOT_OK(SendChallengeResponse(server_out, server_out_len));
+  }
+  return Status::OK();
+}
+
+int SaslServer::GetOptionCb(const char* plugin_name, const char* option,
+                            const char** result, unsigned* len) {
+  return helper_.GetOptionCb(plugin_name, option, result, len);
+}
+
+int SaslServer::PlainAuthCb(sasl_conn_t * /*conn*/, const char * /*user*/, const char * /*pass*/,
+                            unsigned /*passlen*/, struct propctx * /*propctx*/) {
+  TRACE("SASL Server: Received PLAIN auth.");
+  if (PREDICT_FALSE(!helper_.IsPlainEnabled())) {
+    LOG(DFATAL) << "Password authentication callback called while PLAIN auth disabled";
+    return SASL_BADPARAM;
+  }
+  // We always allow PLAIN authentication to succeed.
+  return SASL_OK;
+}
+
+Status SaslServer::PreflightCheckGSSAPI(const string& app_name) {
+  // TODO(todd): the error messages that come from this function on el6
+  // are relatively useless due to the following krb5 bug:
+  // http://krbdev.mit.edu/rt/Ticket/Display.html?id=6973
+  // This may not be useful anymore given the keytab login that happens
+  // in security/init.cc.
+
+  // Initialize a SaslServer with a null socket, and enable
+  // only GSSAPI.
+  //
+  // We aren't going to actually send/receive any messages, but
+  // this makes it easier to reuse the initialization code.
+  SaslServer server(app_name, nullptr);
+  Status s = server.EnableGSSAPI();
+  if (!s.ok()) {
+    return Status::RuntimeError(s.message());
+  }
+
+  RETURN_NOT_OK(server.Init(app_name));
+
+  // Start the SASL server as if we were accepting a connection.
+  const char* server_out = nullptr; // ignored
+  uint32_t server_out_len = 0;
+  s = WrapSaslCall(server.sasl_conn_.get(), [&]() {
+      return sasl_server_start(
+          server.sasl_conn_.get(),
+          kSaslMechGSSAPI,
+          "", 0,  // Pass a 0-length token.
+          &server_out, &server_out_len);
+    });
+
+  // We expect 'Incomplete' status to indicate that the first step of negotiation
+  // was correct.
+  if (s.IsIncomplete()) return Status::OK();
+
+  string err_msg = s.message().ToString();
+  if (err_msg == "Permission denied") {
+    // For bad keytab permissions, we get a rather vague message. So,
+    // we make it more specific for better usability.
+    err_msg = "error accessing keytab: " + err_msg;
+  }
+  return Status::RuntimeError(err_msg);
+}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/41c45523/src/kudu/rpc/server_negotiation.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/server_negotiation.h b/src/kudu/rpc/server_negotiation.h
new file mode 100644
index 0000000..53f674d
--- /dev/null
+++ b/src/kudu/rpc/server_negotiation.h
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_RPC_SASL_SERVER_H
+#define KUDU_RPC_SASL_SERVER_H
+
+#include <set>
+#include <string>
+#include <vector>
+
+#include <sasl/sasl.h>
+
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/sasl_common.h"
+#include "kudu/rpc/sasl_helper.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class Slice;
+
+namespace rpc {
+
+using std::string;
+
+// Class for doing SASL negotiation with a SaslClient over a bidirectional socket.
+// Operations on this class are NOT thread-safe.
+class SaslServer {
+ public:
+  // Does not take ownership of 'socket'.
+  SaslServer(string app_name, Socket* socket);
+
+  // Enable PLAIN authentication.
+  // Despite PLAIN authentication taking a username and password, we disregard
+  // the password and use this as an "unauthenticated" mode.
+  // Must be called after Init().
+  Status EnablePlain();
+
+  // Enable GSSAPI (Kerberos) authentication.
+  // Call after Init().
+  Status EnableGSSAPI();
+
+  // Returns mechanism negotiated by this connection.
+  // Must be called after Negotiate().
+  SaslMechanism::Type negotiated_mechanism() const;
+
+  // Returns the set of RPC system features supported by the remote client.
+  // Must be called after Negotiate().
+  const std::set<RpcFeatureFlag>& client_features() const {
+    return client_features_;
+  }
+
+  // Name of the user that was authenticated.
+  // Must be called after a successful Negotiate().
+  const std::string& authenticated_user() const;
+
+  // Specify IP:port of local side of connection.
+  // Must be called before Init(). Required for some mechanisms.
+  void set_local_addr(const Sockaddr& addr);
+
+  // Specify IP:port of remote side of connection.
+  // Must be called before Init(). Required for some mechanisms.
+  void set_remote_addr(const Sockaddr& addr);
+
+  // Specify the fully-qualified domain name of the remote server.
+  // Must be called before Init(). Required for some mechanisms.
+  void set_server_fqdn(const string& domain_name);
+
+  // Set deadline for connection negotiation.
+  void set_deadline(const MonoTime& deadline);
+
+  // Get deadline for connection negotiation.
+  const MonoTime& deadline() const { return deadline_; }
+
+  // Initialize a new SASL server. Must be called before Negotiate().
+  // Returns OK on success, otherwise RuntimeError.
+  Status Init(const string& service_type);
+
+  // Begin negotiation with the SASL client on the other side of the fd socket
+  // that this server was constructed with.
+  // Returns OK on success.
+  // Otherwise, it may return NotAuthorized, NotSupported, or another non-OK status.
+  Status Negotiate();
+
+  // SASL callback for plugin options, supported mechanisms, etc.
+  // Returns SASL_FAIL if the option is not handled, which does not fail the handshake.
+  int GetOptionCb(const char* plugin_name, const char* option,
+                  const char** result, unsigned* len);
+
+  // SASL callback for PLAIN authentication via SASL_CB_SERVER_USERDB_CHECKPASS.
+  int PlainAuthCb(sasl_conn_t* conn, const char* user, const char* pass,
+                  unsigned passlen, struct propctx* propctx);
+
+  // Perform a "pre-flight check" that everything required to act as a Kerberos
+  // server is properly set up.
+  static Status PreflightCheckGSSAPI(const std::string& app_name);
+
+ private:
+  // Parse and validate connection header.
+  Status ValidateConnectionHeader(faststring* recv_buf);
+
+  // Parse request body. If malformed, sends an error message to the client.
+  Status ParseNegotiatePB(const RequestHeader& header,
+                          const Slice& param_buf,
+                          NegotiatePB* request);
+
+  // Encode and send the specified SASL message to the client.
+  Status SendNegotiatePB(const NegotiatePB& msg);
+
+  // Encode and send the specified RPC error message to the client.
+  // Calls Status.ToString() for the embedded error message.
+  Status SendRpcError(ErrorStatusPB::RpcErrorCodePB code, const Status& err);
+
+  // Handle case when client sends NEGOTIATE request.
+  Status HandleNegotiateRequest(const NegotiatePB& request);
+
+  // Send a NEGOTIATE response to the client with the list of available mechanisms.
+  Status SendNegotiateResponse(const std::set<string>& server_mechs);
+
+  // Handle case when client sends INITIATE request.
+  Status HandleInitiateRequest(const NegotiatePB& request);
+
+  // Send a CHALLENGE response to the client with a challenge token.
+  Status SendChallengeResponse(const char* challenge, unsigned clen);
+
+  // Send a SUCCESS response to the client with a token (typically empty).
+  Status SendSuccessResponse(const char* token, unsigned tlen);
+
+  // Handle case when client sends RESPONSE request.
+  Status HandleResponseRequest(const NegotiatePB& request);
+
+  string app_name_;
+  Socket* sock_;
+  std::vector<sasl_callback_t> callbacks_;
+  // The SASL connection object. This is initialized in Init() and
+  // freed after Negotiate() completes (regardless of whether it was successful).
+  gscoped_ptr<sasl_conn_t, SaslDeleter> sasl_conn_;
+  SaslHelper helper_;
+
+  // The set of features that the client supports. Filled in
+  // after we receive the NEGOTIATE request from the client.
+  std::set<RpcFeatureFlag> client_features_;
+
+  // The successfully-authenticated user, if applicable.
+  string authenticated_user_;
+
+  SaslNegotiationState::Type server_state_;
+
+  // The mechanism we negotiated with the client.
+  SaslMechanism::Type negotiated_mech_;
+
+  // Intra-negotiation state.
+  bool nego_ok_;  // During negotiation: did we get a SASL_OK response from the SASL library?
+
+  // Negotiation timeout deadline.
+  MonoTime deadline_;
+
+  DISALLOW_COPY_AND_ASSIGN(SaslServer);
+};
+
+} // namespace rpc
+} // namespace kudu
+
+#endif  // KUDU_RPC_SASL_SERVER_H


[3/3] kudu git commit: tool: remove dead code

Posted by ad...@apache.org.
tool: remove dead code

Change-Id: I93b173c4f2f91d1d7466f99ddc3264a8796e5b51
Reviewed-on: http://gerrit.cloudera.org:8080/5770
Tested-by: Kudu Jenkins
Reviewed-by: Dinesh Bhat <di...@cloudera.com>
Reviewed-by: Alexey Serbin <as...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/bbf75306
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/bbf75306
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/bbf75306

Branch: refs/heads/master
Commit: bbf75306127e99b070b214dcc1c9896b8de3989f
Parents: 41c4552
Author: Adar Dembo <ad...@cloudera.com>
Authored: Mon Jan 23 12:57:52 2017 -0800
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Mon Jan 23 23:59:55 2017 +0000

----------------------------------------------------------------------
 src/kudu/tools/fs_dump-tool.cc | 213 ------------------------------------
 src/kudu/tools/fs_list-tool.cc | 153 --------------------------
 2 files changed, 366 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/bbf75306/src/kudu/tools/fs_dump-tool.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/fs_dump-tool.cc b/src/kudu/tools/fs_dump-tool.cc
deleted file mode 100644
index 92bdb8a..0000000
--- a/src/kudu/tools/fs_dump-tool.cc
+++ /dev/null
@@ -1,213 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-//
-// Tool to dump tablets, rowsets, and blocks
-
-#include "kudu/tools/fs_tool.h"
-
-#include <iostream>
-#include <memory>
-#include <sstream>
-#include <vector>
-
-#include <gflags/gflags.h>
-#include <glog/logging.h>
-
-#include "kudu/gutil/strings/numbers.h"
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/util/flags.h"
-#include "kudu/util/logging.h"
-
-DEFINE_int32(nrows, 0, "Number of rows to dump");
-DEFINE_bool(metadata_only, false, "Whether just to dump the block metadata, "
-                                  "when printing blocks.");
-
-/*
-  TODO: support specifying start and end keys
-
-  DEFINE_string(start_key, "", "Start key for rows to dump");
-  DEFINE_string(end_key, "", "Start key for rows to dump");
-*/
-
-DEFINE_bool(headers_only, false, "Don't dump contents, dump headers only");
-
-namespace kudu {
-namespace tools {
-
-using std::string;
-using std::vector;
-using strings::Substitute;
-
-namespace {
-
-enum CommandType {
-  DUMP_TABLET_BLOCKS,
-  DUMP_TABLET_DATA,
-  DUMP_ROWSET,
-  DUMP_CFILE_BLOCK,
-  PRINT_TABLET_META,
-  PRINT_UUID,
-};
-
-struct CommandHandler {
-  CommandType type_;
-  string name_;
-  string desc_;
-
-  CommandHandler(CommandType type, string name, string desc)
-      : type_(type), name_(std::move(name)), desc_(std::move(desc)) {}
-};
-
-const vector<CommandHandler> kCommandHandlers = {
-    CommandHandler(DUMP_TABLET_DATA, "dump_tablet_data",
-                   "Dump a tablet's data (requires a tablet id)"),
-    CommandHandler(DUMP_TABLET_BLOCKS, "dump_tablet_blocks",
-                   "Dump a tablet's constituent blocks (requires a tablet id)"),
-    CommandHandler(DUMP_ROWSET, "dump_rowset",
-                   "Dump a rowset (requires a tablet id and an index)"),
-    CommandHandler(DUMP_CFILE_BLOCK, "dump_block",
-                   "Dump a cfile block (requires a block id)"),
-    CommandHandler(PRINT_TABLET_META, "print_meta",
-                   "Print a tablet metadata (requires a tablet id)"),
-    CommandHandler(PRINT_UUID, "print_uuid",
-                   "Print the UUID (master or TS) to whom the data belongs") };
-
-void PrintUsageToStream(const std::string& prog_name, std::ostream* out) {
-  *out << "Usage: " << prog_name
-       << " [-headers_only] [-nrows <num rows>] "
-       << "-fs_wal_dir <dir> -fs_data_dirs <dirs> <command> <options> "
-       << std::endl << std::endl;
-  *out << "Commands: " << std::endl;
-  for (const CommandHandler& handler : kCommandHandlers) {
-    *out << handler.name_ << ": " << handler.desc_ << std::endl;
-  }
-}
-void Usage(const string& prog_name, const string& msg) {
-  std::cerr << "Error " << prog_name << ": " << msg << std::endl;
-  PrintUsageToStream(prog_name, &std::cerr);
-}
-
-bool ValidateCommand(int argc, char** argv, CommandType* out) {
-  if (argc < 2) {
-    Usage(argv[0], "At least one command must be specified!");
-    return false;
-  }
-  for (const CommandHandler& handler : kCommandHandlers) {
-    if (argv[1] == handler.name_) {
-      *out = handler.type_;
-      return true;
-    }
-  }
-  Usage("Invalid command specified: ", argv[1]);
-  return false;
-}
-
-} // anonymous namespace
-
-static int FsDumpToolMain(int argc, char** argv) {
-  FLAGS_logtostderr = 1;
-  std::ostringstream usage_str;
-  PrintUsageToStream(argv[0], &usage_str);
-  google::SetUsageMessage(usage_str.str());
-  ParseCommandLineFlags(&argc, &argv, true);
-  InitGoogleLoggingSafe(argv[0]);
-
-  CommandType cmd;
-  if (!ValidateCommand(argc, argv, &cmd)) {
-    return 2;
-  }
-
-  FsTool fs_tool(FLAGS_headers_only ? FsTool::HEADERS_ONLY : FsTool::MAXIMUM);
-  CHECK_OK(fs_tool.Init());
-
-  DumpOptions opts;
-  // opts.start_key = FLAGS_start_key;
-  // opts.end_key = FLAGS_end_key;
-  opts.nrows = FLAGS_nrows;
-  opts.metadata_only = FLAGS_metadata_only;
-
-  switch (cmd) {
-    case DUMP_TABLET_DATA:
-    case DUMP_TABLET_BLOCKS:
-    {
-      if (argc < 3) {
-        Usage(argv[0],
-              Substitute("dump_tablet requires tablet id: $0 "
-                         "dump_tablet <tablet_id>",
-                         argv[0]));
-        return 2;
-      }
-      if (cmd == DUMP_TABLET_DATA) {
-        CHECK_OK(fs_tool.DumpTabletData(argv[2]));
-      } else if (cmd == DUMP_TABLET_BLOCKS) {
-        CHECK_OK(fs_tool.DumpTabletBlocks(argv[2], opts, 0));
-      }
-      break;
-    }
-
-    case DUMP_ROWSET: {
-      if (argc < 4) {
-        Usage(argv[0],
-              Substitute("dump_rowset requires tablet id and rowset index: $0"
-                         "dump_rowset <tablet_id> <rowset_index>",
-                         argv[0]));
-        return 2;
-      }
-      uint32_t rowset_idx;
-      CHECK(safe_strtou32(argv[3], &rowset_idx))
-          << "Invalid index specified: " << argv[2];
-      CHECK_OK(fs_tool.DumpRowSet(argv[2], rowset_idx, opts, 0));
-      break;
-    }
-    case DUMP_CFILE_BLOCK: {
-      if (argc < 3) {
-        Usage(argv[0],
-              Substitute("dump_block requires a block id: $0"
-                         "dump_block <block_id>", argv[0]));
-        return 2;
-      }
-      CHECK_OK(fs_tool.DumpCFileBlock(argv[2], opts, 0));
-      break;
-    }
-    case PRINT_TABLET_META: {
-      if (argc < 3) {
-        Usage(argv[0], Substitute("print_meta requires a tablet id: $0"
-                                  "print_meta <tablet_id>", argv[0]));
-        return 2;
-      }
-      CHECK_OK(fs_tool.PrintTabletMeta(argv[2], 0));
-      break;
-    }
-    case PRINT_UUID: {
-      if (argc < 2) {
-        Usage(argv[0], Substitute("$0 print_uuid", argv[0]));
-        return 2;
-      }
-      CHECK_OK(fs_tool.PrintUUID(0));
-      break;
-    }
-  }
-
-  return 0;
-}
-
-} // namespace tools
-} // namespace kudu
-
-int main(int argc, char** argv) {
-  return kudu::tools::FsDumpToolMain(argc, argv);
-}

http://git-wip-us.apache.org/repos/asf/kudu/blob/bbf75306/src/kudu/tools/fs_list-tool.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/fs_list-tool.cc b/src/kudu/tools/fs_list-tool.cc
deleted file mode 100644
index 5f3836f..0000000
--- a/src/kudu/tools/fs_list-tool.cc
+++ /dev/null
@@ -1,153 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-//
-// Tool to list local files and directories
-
-#include "kudu/tools/fs_tool.h"
-
-#include <iostream>
-#include <sstream>
-#include <vector>
-
-#include <gflags/gflags.h>
-#include <glog/logging.h>
-
-#include "kudu/util/flags.h"
-#include "kudu/util/logging.h"
-
-DEFINE_bool(verbose, false,
-            "Print additional information (e.g., log segment headers)");
-
-namespace kudu {
-namespace tools {
-
-using std::string;
-using std::vector;
-
-namespace {
-
-enum CommandType {
-  FS_TREE = 1,
-  LIST_LOGS = 2,
-  LIST_TABLETS = 3,
-  LIST_BLOCKS = 4
-};
-
-// TODO: extract and generalized the "verb" handling code with other
-// tools such that it can be shared with other tools.
-
-struct CommandHandler {
-  CommandType type_;
-  string name_;
-  string desc_;
-
-  CommandHandler(CommandType type, string name, string desc)
-      : type_(type), name_(std::move(name)), desc_(std::move(desc)) {}
-};
-
-const vector<CommandHandler> kCommandHandlers = {
-    CommandHandler(FS_TREE, "tree", "Print out a file system tree." ),
-    CommandHandler(LIST_LOGS, "list_logs",
-                   "List file system logs (optionally accepts a tablet id)."),
-    CommandHandler(LIST_TABLETS, "list_tablets", "List tablets." ),
-    CommandHandler(LIST_BLOCKS, "list_blocks",
-                   "List block for tablet (optionally accepts a tablet id).") };
-
-void PrintUsageToStream(const string& prog_name, std::ostream* out) {
-  *out << "Usage: " << prog_name << " [-verbose] "
-       << "-fs_wal_dir <dir> -fs_data_dirs <dirs> <command> [option] "
-       << std::endl << std::endl
-       << "Commands: " << std::endl;
-  for (const CommandHandler& handler : kCommandHandlers) {
-    *out << handler.name_ << ": " << handler.desc_ << std::endl;
-  }
-}
-
-void Usage(const string& prog_name, const string& msg) {
-  std::cerr << "Error " << prog_name << ": " << msg << std::endl
-            << std::endl;
-  PrintUsageToStream(prog_name, &std::cerr);
-}
-
-bool ValidateCommand(int argc, char** argv, CommandType* out) {
-  if (argc < 2) {
-    Usage(argv[0], "At least one command must be specified!");
-    return false;
-  }
-  for (const CommandHandler& handler : kCommandHandlers) {
-    if (argv[1] == handler.name_) {
-      *out = handler.type_;
-      return true;
-    }
-  }
-  Usage("Invalid command specified ", argv[1]);
-  return false;
-}
-
-} // anonymous namespace
-
-static int FsListToolMain(int argc, char** argv) {
-  FLAGS_logtostderr = 1;
-  std::ostringstream usage_str;
-  PrintUsageToStream(argv[0], &usage_str);
-  google::SetUsageMessage(usage_str.str());
-  ParseCommandLineFlags(&argc, &argv, true);
-  InitGoogleLoggingSafe(argv[0]);
-
-  CommandType cmd;
-  if (!ValidateCommand(argc, argv, &cmd)) {
-    return 2;
-  }
-
-  FsTool fs_tool(FLAGS_verbose ? FsTool::HEADERS_ONLY : FsTool::MINIMUM);
-  CHECK_OK_PREPEND(fs_tool.Init(), "Error initializing file system tool");
-
-  switch (cmd) {
-    case FS_TREE: {
-      CHECK_OK(fs_tool.FsTree());
-      break;
-    }
-    case LIST_LOGS: {
-      if (argc > 2) {
-        CHECK_OK(fs_tool.ListLogSegmentsForTablet(argv[2]));
-      } else {
-        CHECK_OK(fs_tool.ListAllLogSegments());
-      }
-      break;
-    }
-    case LIST_TABLETS: {
-      CHECK_OK(fs_tool.ListAllTablets());
-      break;
-    }
-    case LIST_BLOCKS: {
-      if (argc > 2) {
-        CHECK_OK(fs_tool.ListBlocksForTablet(argv[2]));
-      } else {
-         CHECK_OK(fs_tool.ListBlocksForAllTablets());
-      }
-    }
-  }
-
-  return 0;
-}
-
-} // namespace tools
-} // namespace kudu
-
-int main(int argc, char** argv) {
-  return kudu::tools::FsListToolMain(argc, argv);
-}


[2/3] kudu git commit: TLS-negotiation [5/n]: Rename sasl_[client|server] to [client|server]_negotiation

Posted by ad...@apache.org.
TLS-negotiation [5/n]: Rename sasl_[client|server] to [client|server]_negotiation

This rename is in anticipation of a major refactor to both of these
classes which will make them less specific to SASL. The file rename is
kept in a separate commit in order to preserve git history.

Change-Id: Ib4fb4bcdb7c7a43eea88b2911d2b2e75eafa62b0
Reviewed-on: http://gerrit.cloudera.org:8080/5759
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/41c45523
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/41c45523
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/41c45523

Branch: refs/heads/master
Commit: 41c45523f7d34d7800fd29d4cbe40330cac13e6a
Parents: 9d0421e
Author: Dan Burkert <da...@apache.org>
Authored: Fri Jan 20 15:34:43 2017 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Mon Jan 23 23:37:18 2017 +0000

----------------------------------------------------------------------
 src/kudu/rpc/CMakeLists.txt        |   8 +-
 src/kudu/rpc/client_negotiation.cc | 522 +++++++++++++++++++++++++++++
 src/kudu/rpc/client_negotiation.h  | 180 ++++++++++
 src/kudu/rpc/connection.cc         |   3 +-
 src/kudu/rpc/connection.h          |   6 +-
 src/kudu/rpc/negotiation-test.cc   | 567 ++++++++++++++++++++++++++++++++
 src/kudu/rpc/negotiation.cc        |   4 +-
 src/kudu/rpc/reactor.cc            |   4 +-
 src/kudu/rpc/sasl_client.cc        | 521 -----------------------------
 src/kudu/rpc/sasl_client.h         | 180 ----------
 src/kudu/rpc/sasl_rpc-test.cc      | 567 --------------------------------
 src/kudu/rpc/sasl_server.cc        | 498 ----------------------------
 src/kudu/rpc/sasl_server.h         | 180 ----------
 src/kudu/rpc/server_negotiation.cc | 499 ++++++++++++++++++++++++++++
 src/kudu/rpc/server_negotiation.h  | 180 ++++++++++
 15 files changed, 1960 insertions(+), 1959 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/41c45523/src/kudu/rpc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/CMakeLists.txt b/src/kudu/rpc/CMakeLists.txt
index 6ba028b..fa21551 100644
--- a/src/kudu/rpc/CMakeLists.txt
+++ b/src/kudu/rpc/CMakeLists.txt
@@ -43,12 +43,13 @@ ADD_EXPORTABLE_LIBRARY(rpc_introspection_proto
 set(KRPC_SRCS
     acceptor_pool.cc
     blocking_ops.cc
-    outbound_call.cc
+    client_negotiation.cc
     connection.cc
     constants.cc
     inbound_call.cc
     messenger.cc
     negotiation.cc
+    outbound_call.cc
     proxy.cc
     reactor.cc
     remote_method.cc
@@ -59,10 +60,9 @@ set(KRPC_SRCS
     rpc_controller.cc
     rpcz_store.cc
     sasl_common.cc
-    sasl_client.cc
     sasl_helper.cc
-    sasl_server.cc
     serialization.cc
+    server_negotiation.cc
     service_if.cc
     service_pool.cc
     service_queue.cc
@@ -116,11 +116,11 @@ target_link_libraries(rtest_krpc
 set(KUDU_TEST_LINK_LIBS rtest_krpc krpc rpc_header_proto security-test ${KUDU_MIN_TEST_LIBS})
 ADD_KUDU_TEST(exactly_once_rpc-test)
 ADD_KUDU_TEST(mt-rpc-test RUN_SERIAL true)
+ADD_KUDU_TEST(negotiation-test)
 ADD_KUDU_TEST(reactor-test)
 ADD_KUDU_TEST(request_tracker-test)
 ADD_KUDU_TEST(rpc-bench RUN_SERIAL true)
 ADD_KUDU_TEST(rpc-test)
 ADD_KUDU_TEST(rpc_stub-test)
-ADD_KUDU_TEST(sasl_rpc-test)
 ADD_KUDU_TEST(service_queue-test)
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/41c45523/src/kudu/rpc/client_negotiation.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/client_negotiation.cc b/src/kudu/rpc/client_negotiation.cc
new file mode 100644
index 0000000..0eb96b8
--- /dev/null
+++ b/src/kudu/rpc/client_negotiation.cc
@@ -0,0 +1,522 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/client_negotiation.h"
+
+#include <string.h>
+
+#include <map>
+#include <set>
+#include <string>
+
+#include <glog/logging.h>
+#include <sasl/sasl.h>
+
+#include "kudu/gutil/endian.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/strings/util.h"
+#include "kudu/rpc/blocking_ops.h"
+#include "kudu/rpc/constants.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/sasl_common.h"
+#include "kudu/rpc/sasl_helper.h"
+#include "kudu/rpc/serialization.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/trace.h"
+
+namespace kudu {
+namespace rpc {
+
+using std::map;
+using std::set;
+using std::string;
+
+static int SaslClientGetoptCb(void* sasl_client, const char* plugin_name, const char* option,
+                       const char** result, unsigned* len) {
+  return static_cast<SaslClient*>(sasl_client)
+    ->GetOptionCb(plugin_name, option, result, len);
+}
+
+static int SaslClientSimpleCb(void *sasl_client, int id,
+                       const char **result, unsigned *len) {
+  return static_cast<SaslClient*>(sasl_client)->SimpleCb(id, result, len);
+}
+
+static int SaslClientSecretCb(sasl_conn_t* conn, void *sasl_client, int id,
+                       sasl_secret_t** psecret) {
+  return static_cast<SaslClient*>(sasl_client)->SecretCb(conn, id, psecret);
+}
+
+// Return an appropriately-typed Status object based on an ErrorStatusPB returned
+// from an Error RPC.
+// In case there is no relevant Status type, return a RuntimeError.
+static Status StatusFromRpcError(const ErrorStatusPB& error) {
+  DCHECK(error.IsInitialized()) << "Error status PB must be initialized";
+  if (PREDICT_FALSE(!error.has_code())) {
+    return Status::RuntimeError(error.message());
+  }
+  string code_name = ErrorStatusPB::RpcErrorCodePB_Name(error.code());
+  switch (error.code()) {
+    case ErrorStatusPB_RpcErrorCodePB_FATAL_UNAUTHORIZED:
+      return Status::NotAuthorized(code_name, error.message());
+    default:
+      return Status::RuntimeError(code_name, error.message());
+  }
+}
+
+SaslClient::SaslClient(string app_name, Socket* socket)
+    : app_name_(std::move(app_name)),
+      sock_(socket),
+      helper_(SaslHelper::CLIENT),
+      client_state_(SaslNegotiationState::NEW),
+      negotiated_mech_(SaslMechanism::INVALID),
+      deadline_(MonoTime::Max()) {
+  callbacks_.push_back(SaslBuildCallback(SASL_CB_GETOPT,
+      reinterpret_cast<int (*)()>(&SaslClientGetoptCb), this));
+  callbacks_.push_back(SaslBuildCallback(SASL_CB_AUTHNAME,
+      reinterpret_cast<int (*)()>(&SaslClientSimpleCb), this));
+  callbacks_.push_back(SaslBuildCallback(SASL_CB_PASS,
+      reinterpret_cast<int (*)()>(&SaslClientSecretCb), this));
+  callbacks_.push_back(SaslBuildCallback(SASL_CB_LIST_END, nullptr, nullptr));
+}
+
+Status SaslClient::EnablePlain(const string& user, const string& pass) {
+  DCHECK_EQ(client_state_, SaslNegotiationState::NEW);
+  RETURN_NOT_OK(helper_.EnablePlain());
+  plain_auth_user_ = user;
+  plain_pass_ = pass;
+  return Status::OK();
+}
+
+Status SaslClient::EnableGSSAPI() {
+  DCHECK_EQ(client_state_, SaslNegotiationState::NEW);
+  return helper_.EnableGSSAPI();
+}
+
+SaslMechanism::Type SaslClient::negotiated_mechanism() const {
+  DCHECK_EQ(client_state_, SaslNegotiationState::NEGOTIATED);
+  return negotiated_mech_;
+}
+
+void SaslClient::set_local_addr(const Sockaddr& addr) {
+  DCHECK_EQ(client_state_, SaslNegotiationState::NEW);
+  helper_.set_local_addr(addr);
+}
+
+void SaslClient::set_remote_addr(const Sockaddr& addr) {
+  DCHECK_EQ(client_state_, SaslNegotiationState::NEW);
+  helper_.set_remote_addr(addr);
+}
+
+void SaslClient::set_server_fqdn(const string& domain_name) {
+  DCHECK_EQ(client_state_, SaslNegotiationState::NEW);
+  helper_.set_server_fqdn(domain_name);
+}
+
+void SaslClient::set_deadline(const MonoTime& deadline) {
+  DCHECK_NE(client_state_, SaslNegotiationState::NEGOTIATED);
+  deadline_ = deadline;
+}
+
+// calls sasl_client_init() and sasl_client_new()
+Status SaslClient::Init(const string& service_type) {
+  RETURN_NOT_OK(SaslInit(app_name_.c_str()));
+
+  // Ensure we are not called more than once.
+  if (client_state_ != SaslNegotiationState::NEW) {
+    return Status::IllegalState("Init() may only be called once per SaslClient object.");
+  }
+
+  // TODO(unknown): Support security flags.
+  unsigned secflags = 0;
+
+  sasl_conn_t* sasl_conn = nullptr;
+  Status s = WrapSaslCall(nullptr /* no conn */, [&]() {
+      return sasl_client_new(
+          service_type.c_str(),         // Registered name of the service using SASL. Required.
+          helper_.server_fqdn(),        // The fully qualified domain name of the remote server.
+          helper_.local_addr_string(),  // Local and remote IP address strings. (NULL disables
+          helper_.remote_addr_string(), //   mechanisms which require this info.)
+          &callbacks_[0],               // Connection-specific callbacks.
+          secflags,                     // Security flags.
+          &sasl_conn);
+    });
+  if (!s.ok()) {
+    return Status::RuntimeError("Unable to create new SASL client",
+                                s.message());
+  }
+  sasl_conn_.reset(sasl_conn);
+
+  client_state_ = SaslNegotiationState::INITIALIZED;
+  return Status::OK();
+}
+
+Status SaslClient::Negotiate() {
+  // After negotiation, we no longer need the SASL library object, so
+  // may as well free its memory since the connection may be long-lived.
+  // Additionally, this works around a SEGV seen at process shutdown time:
+  // if we still have SASL objects retained by Reactor when the process
+  // is exiting, the SASL libraries may start destructing global state
+  // and cause a crash when we sasl_dispose the connection.
+  auto cleanup = MakeScopedCleanup([&]() {
+      sasl_conn_.reset();
+    });
+  TRACE("Called SaslClient::Negotiate()");
+
+  // Ensure we are called exactly once, and in the right order.
+  if (client_state_ == SaslNegotiationState::NEW) {
+    return Status::IllegalState("SaslClient: Init() must be called before calling Negotiate()");
+  }
+  if (client_state_ == SaslNegotiationState::NEGOTIATED) {
+    return Status::IllegalState("SaslClient: Negotiate() may only be called once per object.");
+  }
+
+  // Ensure we can use blocking calls on the socket during negotiation.
+  RETURN_NOT_OK(EnsureBlockingMode(sock_));
+
+  // Start by asking the server for a list of available auth mechanisms.
+  RETURN_NOT_OK(SendNegotiateMessage());
+
+  faststring recv_buf;
+  nego_ok_ = false;
+
+  // We set nego_ok_ = true when the SASL library returns SASL_OK to us.
+  // We set nego_response_expected_ = true each time we send a request to the server.
+  while (!nego_ok_ || nego_response_expected_) {
+    ResponseHeader header;
+    Slice param_buf;
+    RETURN_NOT_OK(ReceiveFramedMessageBlocking(sock_, &recv_buf, &header, &param_buf, deadline_));
+    nego_response_expected_ = false;
+
+    NegotiatePB response;
+    RETURN_NOT_OK(ParseNegotiatePB(header, param_buf, &response));
+
+    switch (response.step()) {
+      // NEGOTIATE: Server has sent us its list of supported SASL mechanisms.
+      case NegotiatePB::NEGOTIATE:
+        RETURN_NOT_OK(HandleNegotiateResponse(response));
+        break;
+
+      // SASL_CHALLENGE: Server sent us a follow-up to an SASL_INITIATE or SASL_RESPONSE request.
+      case NegotiatePB::SASL_CHALLENGE:
+        RETURN_NOT_OK(HandleChallengeResponse(response));
+        break;
+
+      // SASL_SUCCESS: Server has accepted our authentication request. Negotiation successful.
+      case NegotiatePB::SASL_SUCCESS:
+        RETURN_NOT_OK(HandleSuccessResponse(response));
+        break;
+
+      // Server sent us some unsupported SASL response.
+      default:
+        LOG(ERROR) << "SASL Client: Received unsupported response from server";
+        return Status::InvalidArgument("RPC client doesn't support Negotiate step",
+                                       NegotiatePB::NegotiateStep_Name(response.step()));
+    }
+  }
+
+  TRACE("SASL Client: Successful negotiation");
+  client_state_ = SaslNegotiationState::NEGOTIATED;
+  return Status::OK();
+}
+
+Status SaslClient::SendNegotiatePB(const NegotiatePB& msg) {
+  DCHECK_NE(client_state_, SaslNegotiationState::NEW)
+      << "Must not send Negotiate messages before calling Init()";
+  DCHECK_NE(client_state_, SaslNegotiationState::NEGOTIATED)
+      << "Must not send Negotiate messages after negotiation succeeds";
+
+  // Create header with SASL-specific callId
+  RequestHeader header;
+  header.set_call_id(kNegotiateCallId);
+  return helper_.SendNegotiatePB(sock_, header, msg, deadline_);
+}
+
+Status SaslClient::ParseNegotiatePB(const ResponseHeader& header,
+                                    const Slice& param_buf,
+                                    NegotiatePB* response) {
+  RETURN_NOT_OK(helper_.SanityCheckNegotiateCallId(header.call_id()));
+
+  if (header.is_error()) {
+    return ParseError(param_buf);
+  }
+
+  return helper_.ParseNegotiatePB(param_buf, response);
+}
+
+Status SaslClient::SendNegotiateMessage() {
+  NegotiatePB msg;
+  msg.set_step(NegotiatePB::NEGOTIATE);
+
+  // Advertise our supported features.
+  for (RpcFeatureFlag feature : kSupportedClientRpcFeatureFlags) {
+    msg.add_supported_features(feature);
+  }
+
+  TRACE("SASL Client: Sending NEGOTIATE request to server.");
+  RETURN_NOT_OK(SendNegotiatePB(msg));
+  nego_response_expected_ = true;
+  return Status::OK();
+}
+
+Status SaslClient::SendInitiateMessage(const NegotiatePB_SaslAuth& auth,
+    const char* init_msg, unsigned init_msg_len) {
+  NegotiatePB msg;
+  msg.set_step(NegotiatePB::SASL_INITIATE);
+  msg.mutable_token()->assign(init_msg, init_msg_len);
+  msg.add_auths()->CopyFrom(auth);
+  TRACE("SASL Client: Sending SASL_INITIATE request to server.");
+  RETURN_NOT_OK(SendNegotiatePB(msg));
+  nego_response_expected_ = true;
+  return Status::OK();
+}
+
+Status SaslClient::SendResponseMessage(const char* resp_msg, unsigned resp_msg_len) {
+  NegotiatePB reply;
+  reply.set_step(NegotiatePB::SASL_RESPONSE);
+  reply.mutable_token()->assign(resp_msg, resp_msg_len);
+  TRACE("SASL Client: Sending SASL_RESPONSE request to server.");
+  RETURN_NOT_OK(SendNegotiatePB(reply));
+  nego_response_expected_ = true;
+  return Status::OK();
+}
+
+Status SaslClient::DoSaslStep(const string& in, const char** out, unsigned* out_len) {
+  TRACE("SASL Client: Calling sasl_client_step()");
+  Status s = WrapSaslCall(sasl_conn_.get(), [&]() {
+      return sasl_client_step(sasl_conn_.get(), in.c_str(), in.length(), nullptr, out, out_len);
+    });
+  if (s.ok()) {
+    nego_ok_ = true;
+  }
+  return s;
+}
+
+Status SaslClient::HandleNegotiateResponse(const NegotiatePB& response) {
+  TRACE("SASL Client: Received NEGOTIATE response from server");
+  // Fill in the set of features supported by the server.
+  for (int flag : response.supported_features()) {
+    // We only add the features that our local build knows about.
+    RpcFeatureFlag feature_flag = RpcFeatureFlag_IsValid(flag) ?
+                                  static_cast<RpcFeatureFlag>(flag) : UNKNOWN;
+    if (ContainsKey(kSupportedClientRpcFeatureFlags, feature_flag)) {
+      server_features_.insert(feature_flag);
+    }
+  }
+
+  // Build a map of the mechanisms offered by the server.
+  const set<string>& local_mechs = helper_.LocalMechs();
+  set<string> server_mechs;
+  map<string, NegotiatePB::SaslAuth> server_mech_map;
+  for (const NegotiatePB::SaslAuth& auth : response.auths()) {
+    const auto& mech = auth.mechanism();
+    server_mech_map[mech] = auth;
+    server_mechs.insert(mech);
+  }
+  // Determine which server mechs are also enabled by the client.
+  // Cyrus SASL 2.1.25 and later supports doing this set intersection via
+  // the 'client_mech_list' option, but that version is not available on
+  // RHEL 6, so we have to do it manually.
+  set<string> matching_mechs = STLSetIntersection(local_mechs, server_mechs);
+
+  if (matching_mechs.empty() &&
+      ContainsKey(server_mechs, kSaslMechGSSAPI) &&
+      !ContainsKey(local_mechs, kSaslMechGSSAPI)) {
+    return Status::NotAuthorized("server requires GSSAPI (Kerberos) authentication and "
+                                 "client was missing the required SASL module");
+  }
+
+  string matching_mechs_str = JoinElements(matching_mechs, " ");
+  TRACE("SASL Client: Matching mech list: $0", matching_mechs_str);
+
+  const char* init_msg = nullptr;
+  unsigned init_msg_len = 0;
+  const char* negotiated_mech = nullptr;
+
+  /* select a mechanism for a connection
+   *  mechlist      -- mechanisms server has available (punctuation ignored)
+   * output:
+   *  prompt_need   -- on SASL_INTERACT, list of prompts needed to continue
+   *  clientout     -- the initial client response to send to the server
+   *  mech          -- set to mechanism name
+   *
+   * Returns:
+   *  SASL_OK       -- success
+   *  SASL_CONTINUE -- negotiation required
+   *  SASL_NOMEM    -- not enough memory
+   *  SASL_NOMECH   -- no mechanism meets requested properties
+   *  SASL_INTERACT -- user interaction needed to fill in prompt_need list
+   */
+  TRACE("SASL Client: Calling sasl_client_start()");
+  Status s = WrapSaslCall(sasl_conn_.get(), [&]() {
+      return sasl_client_start(
+          sasl_conn_.get(),           // The SASL connection context created by init()
+          matching_mechs_str.c_str(), // The list of mechanisms to negotiate.
+          nullptr,                    // Disables INTERACT return if NULL.
+          &init_msg,                  // Filled in on success.
+          &init_msg_len,              // Filled in on success.
+          &negotiated_mech);          // Filled in on success.
+    });
+
+  if (s.ok()) {
+    nego_ok_ = true;
+  } else if (!s.IsIncomplete()) {
+    return s;
+  }
+
+  // The server matched one of our mechanisms.
+  NegotiatePB::SaslAuth* auth = FindOrNull(server_mech_map, negotiated_mech);
+  if (PREDICT_FALSE(auth == nullptr)) {
+    return Status::IllegalState("Unable to find auth in map, unexpected error", negotiated_mech);
+  }
+  negotiated_mech_ = SaslMechanism::value_of(negotiated_mech);
+
+  RETURN_NOT_OK(SendInitiateMessage(*auth, init_msg, init_msg_len));
+  return Status::OK();
+}
+
+Status SaslClient::HandleChallengeResponse(const NegotiatePB& response) {
+  TRACE("SASL Client: Received SASL_CHALLENGE response from server");
+  if (PREDICT_FALSE(nego_ok_)) {
+    LOG(DFATAL) << "Server sent SASL_CHALLENGE response after client library returned SASL_OK";
+  }
+
+  if (PREDICT_FALSE(!response.has_token())) {
+    return Status::InvalidArgument("No token in SASL_CHALLENGE response from server");
+  }
+
+  const char* out = nullptr;
+  unsigned out_len = 0;
+  Status s = DoSaslStep(response.token(), &out, &out_len);
+  if (!s.ok() && !s.IsIncomplete()) {
+    return s;
+  }
+  RETURN_NOT_OK(SendResponseMessage(out, out_len));
+  return Status::OK();
+}
+
+Status SaslClient::HandleSuccessResponse(const NegotiatePB& response) {
+  TRACE("SASL Client: Received SASL_SUCCESS response from server");
+  if (!nego_ok_) {
+    const char* out = nullptr;
+    unsigned out_len = 0;
+    Status s = DoSaslStep(response.token(), &out, &out_len);
+    if (s.IsIncomplete()) {
+      return Status::IllegalState("Server indicated successful authentication, but client "
+                                  "was not complete");
+    }
+    RETURN_NOT_OK(s);
+    if (out_len > 0) {
+      return Status::IllegalState("SASL client library generated spurious token after SASL_SUCCESS",
+          string(out, out_len));
+    }
+    CHECK(nego_ok_);
+  }
+  return Status::OK();
+}
+
+// Parse error status message from raw bytes of an ErrorStatusPB.
+Status SaslClient::ParseError(const Slice& err_data) {
+  ErrorStatusPB error;
+  if (!error.ParseFromArray(err_data.data(), err_data.size())) {
+    return Status::IOError("Invalid error response, missing fields",
+        error.InitializationErrorString());
+  }
+  Status s = StatusFromRpcError(error);
+  TRACE("SASL Client: Received error response from server: $0", s.ToString());
+  return s;
+}
+
+int SaslClient::GetOptionCb(const char* plugin_name, const char* option,
+                            const char** result, unsigned* len) {
+  return helper_.GetOptionCb(plugin_name, option, result, len);
+}
+
+// Used for PLAIN.
+// SASL callback for SASL_CB_USER, SASL_CB_AUTHNAME, SASL_CB_LANGUAGE
+int SaslClient::SimpleCb(int id, const char** result, unsigned* len) {
+  if (PREDICT_FALSE(!helper_.IsPlainEnabled())) {
+    LOG(DFATAL) << "SASL Client: Simple callback called, but PLAIN auth is not enabled";
+    return SASL_FAIL;
+  }
+  if (PREDICT_FALSE(result == nullptr)) {
+    LOG(DFATAL) << "SASL Client: result outparam is NULL";
+    return SASL_BADPARAM;
+  }
+  switch (id) {
+    // TODO(unknown): Support impersonation?
+    // For impersonation, USER is the impersonated user, AUTHNAME is the "sudoer".
+    case SASL_CB_USER:
+      TRACE("SASL Client: callback for SASL_CB_USER");
+      *result = plain_auth_user_.c_str();
+      if (len != nullptr) *len = plain_auth_user_.length();
+      break;
+    case SASL_CB_AUTHNAME:
+      TRACE("SASL Client: callback for SASL_CB_AUTHNAME");
+      *result = plain_auth_user_.c_str();
+      if (len != nullptr) *len = plain_auth_user_.length();
+      break;
+    case SASL_CB_LANGUAGE:
+      LOG(DFATAL) << "SASL Client: Unable to handle SASL callback type SASL_CB_LANGUAGE"
+        << "(" << id << ")";
+      return SASL_BADPARAM;
+    default:
+      LOG(DFATAL) << "SASL Client: Unexpected SASL callback type: " << id;
+      return SASL_BADPARAM;
+  }
+
+  return SASL_OK;
+}
+
+// Used for PLAIN.
+// SASL callback for SASL_CB_PASS: User password.
+int SaslClient::SecretCb(sasl_conn_t* conn, int id, sasl_secret_t** psecret) {
+  if (PREDICT_FALSE(!helper_.IsPlainEnabled())) {
+    LOG(DFATAL) << "SASL Client: Plain secret callback called, but PLAIN auth is not enabled";
+    return SASL_FAIL;
+  }
+  switch (id) {
+    case SASL_CB_PASS: {
+      if (!conn || !psecret) return SASL_BADPARAM;
+
+      size_t len = plain_pass_.length();
+      *psecret = reinterpret_cast<sasl_secret_t*>(malloc(sizeof(sasl_secret_t) + len));
+      if (!*psecret) {
+        return SASL_NOMEM;
+      }
+      psecret_.reset(*psecret);  // Ensure that we free() this structure later.
+      (*psecret)->len = len;
+      memcpy((*psecret)->data, plain_pass_.c_str(), len + 1);
+      break;
+    }
+    default:
+      LOG(DFATAL) << "SASL Client: Unexpected SASL callback type: " << id;
+      return SASL_BADPARAM;
+  }
+
+  return SASL_OK;
+}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/41c45523/src/kudu/rpc/client_negotiation.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/client_negotiation.h b/src/kudu/rpc/client_negotiation.h
new file mode 100644
index 0000000..2f01e56
--- /dev/null
+++ b/src/kudu/rpc/client_negotiation.h
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_RPC_SASL_CLIENT_H
+#define KUDU_RPC_SASL_CLIENT_H
+
+#include <set>
+#include <string>
+#include <vector>
+
+#include <sasl/sasl.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/sasl_common.h"
+#include "kudu/rpc/sasl_helper.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace rpc {
+
+using std::string;
+
+class NegotiatePB;
+class NegotiatePB_SaslAuth;
+class ResponseHeader;
+
+// Class for doing SASL negotiation with a SaslServer over a bidirectional socket.
+// Operations on this class are NOT thread-safe.
+class SaslClient {
+ public:
+  // Does not take ownership of 'socket'.
+  SaslClient(string app_name, Socket* socket);
+
+  // Enable PLAIN authentication.
+  // Must be called before Init().
+  Status EnablePlain(const string& user, const string& pass);
+
+  // Enable GSSAPI authentication.
+  // Must be called before Init().
+  Status EnableGSSAPI();
+
+  // Returns mechanism negotiated by this connection.
+  // Must be called after Negotiate().
+  SaslMechanism::Type negotiated_mechanism() const;
+
+  // Returns the set of RPC system features supported by the remote server.
+  // Must be called after Negotiate().
+  const std::set<RpcFeatureFlag>& server_features() const {
+    return server_features_;
+  }
+
+  // Specify IP:port of local side of connection.
+  // Must be called before Init(). Required for some mechanisms.
+  void set_local_addr(const Sockaddr& addr);
+
+  // Specify IP:port of remote side of connection.
+  // Must be called before Init(). Required for some mechanisms.
+  void set_remote_addr(const Sockaddr& addr);
+
+  // Specify the fully-qualified domain name of the remote server.
+  // Must be called before Init(). Required for some mechanisms.
+  void set_server_fqdn(const string& domain_name);
+
+  // Set deadline for connection negotiation.
+  void set_deadline(const MonoTime& deadline);
+
+  // Get deadline for connection negotiation.
+  const MonoTime& deadline() const { return deadline_; }
+
+  // Initialize a new SASL client. Must be called before Negotiate().
+  // Returns OK on success, otherwise RuntimeError.
+  Status Init(const string& service_type);
+
+  // Begin negotiation with the SASL server on the other side of the socket
+  // that this client was constructed with.
+  // Returns OK on success.
+  // Otherwise, it may return NotAuthorized, NotSupported, or another non-OK status.
+  Status Negotiate();
+
+  // SASL callback for plugin options, supported mechanisms, etc.
+  // Returns SASL_FAIL if the option is not handled, which does not fail the handshake.
+  int GetOptionCb(const char* plugin_name, const char* option,
+                  const char** result, unsigned* len);
+
+  // SASL callback for SASL_CB_USER, SASL_CB_AUTHNAME, SASL_CB_LANGUAGE
+  int SimpleCb(int id, const char** result, unsigned* len);
+
+  // SASL callback for SASL_CB_PASS
+  int SecretCb(sasl_conn_t* conn, int id, sasl_secret_t** psecret);
+
+ private:
+  // Encode and send the specified negotiate message to the server.
+  Status SendNegotiatePB(const NegotiatePB& msg);
+
+  // Validate that header does not indicate an error, parse param_buf into response.
+  Status ParseNegotiatePB(const ResponseHeader& header,
+                          const Slice& param_buf,
+                          NegotiatePB* response);
+
+  // Send a NEGOTIATE message to the server.
+  Status SendNegotiateMessage();
+
+  // Send an SASL_INITIATE message to the server.
+  Status SendInitiateMessage(const NegotiatePB_SaslAuth& auth,
+                             const char* init_msg, unsigned init_msg_len);
+
+  // Send a SASL_RESPONSE message to the server.
+  Status SendResponseMessage(const char* resp_msg, unsigned resp_msg_len);
+
+  // Perform a client-side step of the SASL negotiation.
+  // Input is what came from the server. Output is what we will send back to the server.
+  // Returns:
+  //   Status::OK if sasl_client_step returns SASL_OK.
+  //   Status::Incomplete if sasl_client_step returns SASL_CONTINUE
+  // otherwise returns an appropriate error status.
+  Status DoSaslStep(const string& in, const char** out, unsigned* out_len);
+
+  // Handle case when server sends NEGOTIATE response.
+  Status HandleNegotiateResponse(const NegotiatePB& response);
+
+  // Handle case when server sends SASL_CHALLENGE response.
+  Status HandleChallengeResponse(const NegotiatePB& response);
+
+  // Handle case when server sends SASL_SUCCESS response.
+  Status HandleSuccessResponse(const NegotiatePB& response);
+
+  // Parse error status message from raw bytes of an ErrorStatusPB.
+  Status ParseError(const Slice& err_data);
+
+  string app_name_;
+  Socket* sock_;
+  std::vector<sasl_callback_t> callbacks_;
+  // The SASL connection object. This is initialized in Init() and
+  // freed after Negotiate() completes (regardless of whether it was successful).
+  gscoped_ptr<sasl_conn_t, SaslDeleter> sasl_conn_;
+  SaslHelper helper_;
+
+  string plain_auth_user_;
+  string plain_pass_;
+  gscoped_ptr<sasl_secret_t, FreeDeleter> psecret_;
+
+  // The set of features supported by the server.
+  std::set<RpcFeatureFlag> server_features_;
+
+  SaslNegotiationState::Type client_state_;
+
+  // The mechanism we negotiated with the server.
+  SaslMechanism::Type negotiated_mech_;
+
+  // Intra-negotiation state.
+  bool nego_ok_;  // During negotiation: did we get a SASL_OK response from the SASL library?
+  bool nego_response_expected_;  // During negotiation: Are we waiting for a server response?
+
+  // Negotiation timeout deadline.
+  MonoTime deadline_;
+
+  DISALLOW_COPY_AND_ASSIGN(SaslClient);
+};
+
+} // namespace rpc
+} // namespace kudu
+
+#endif  // KUDU_RPC_SASL_CLIENT_H

http://git-wip-us.apache.org/repos/asf/kudu/blob/41c45523/src/kudu/rpc/connection.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/connection.cc b/src/kudu/rpc/connection.cc
index c7a87e9..2abf6ca 100644
--- a/src/kudu/rpc/connection.cc
+++ b/src/kudu/rpc/connection.cc
@@ -30,14 +30,13 @@
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/human_readable.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/client_negotiation.h"
 #include "kudu/rpc/constants.h"
 #include "kudu/rpc/messenger.h"
 #include "kudu/rpc/reactor.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/rpc/rpc_introspection.pb.h"
-#include "kudu/rpc/sasl_client.h"
-#include "kudu/rpc/sasl_server.h"
 #include "kudu/rpc/transfer.h"
 #include "kudu/security/ssl_factory.h"
 #include "kudu/security/ssl_socket.h"

http://git-wip-us.apache.org/repos/asf/kudu/blob/41c45523/src/kudu/rpc/connection.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/connection.h b/src/kudu/rpc/connection.h
index bb2aa94..ef70483 100644
--- a/src/kudu/rpc/connection.h
+++ b/src/kudu/rpc/connection.h
@@ -30,10 +30,10 @@
 
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/ref_counted.h"
-#include "kudu/rpc/outbound_call.h"
-#include "kudu/rpc/sasl_client.h"
-#include "kudu/rpc/sasl_server.h"
+#include "kudu/rpc/client_negotiation.h"
 #include "kudu/rpc/inbound_call.h"
+#include "kudu/rpc/outbound_call.h"
+#include "kudu/rpc/server_negotiation.h"
 #include "kudu/rpc/transfer.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/sockaddr.h"

http://git-wip-us.apache.org/repos/asf/kudu/blob/41c45523/src/kudu/rpc/negotiation-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/negotiation-test.cc b/src/kudu/rpc/negotiation-test.cc
new file mode 100644
index 0000000..2bb73e4
--- /dev/null
+++ b/src/kudu/rpc/negotiation-test.cc
@@ -0,0 +1,567 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/rpc-test-base.h"
+
+#include <stdlib.h>
+#include <sys/stat.h>
+
+#include <functional>
+#include <memory>
+#include <string>
+#include <thread>
+
+#include <gtest/gtest.h>
+#include <sasl/sasl.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/client_negotiation.h"
+#include "kudu/rpc/constants.h"
+#include "kudu/rpc/sasl_common.h"
+#include "kudu/rpc/server_negotiation.h"
+#include "kudu/security/test/mini_kdc.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/subprocess.h"
+
+using std::string;
+using std::thread;
+
+// HACK: MIT Kerberos doesn't have any way of determining its version number,
+// but the error messages in krb5-1.10 and earlier are broken due to
+// a bug: http://krbdev.mit.edu/rt/Ticket/Display.html?id=6973
+//
+// Since we don't have any way to explicitly figure out the version, we just
+// look for this random macro which was added in 1.11 (the same version in which
+// the above bug was fixed).
+#ifndef KRB5_RESPONDER_QUESTION_PASSWORD
+#define KRB5_VERSION_LE_1_10
+#endif
+
+DEFINE_bool(is_test_child, false,
+            "Used by tests which require clean processes. "
+            "See TestDisableInit.");
+
+namespace kudu {
+namespace rpc {
+
+class TestSaslRpc : public RpcTestBase {
+ public:
+  virtual void SetUp() OVERRIDE {
+    RpcTestBase::SetUp();
+    ASSERT_OK(SaslInit(kSaslAppName));
+  }
+};
+
+// Test basic initialization: enabling PLAIN and Init() must succeed on both objects.
+TEST_F(TestSaslRpc, TestBasicInit) {
+  SaslServer server(kSaslAppName, nullptr);
+  ASSERT_OK(server.EnablePlain());
+  ASSERT_OK(server.Init(kSaslAppName));
+  SaslClient client(kSaslAppName, nullptr);
+  ASSERT_OK(client.EnablePlain("test", "test"));
+  ASSERT_OK(client.Init(kSaslAppName));
+}
+
+// A "Callable" that takes a Socket* param, for use with starting a thread.
+// Can be used for SaslServer or SaslClient threads.
+typedef std::function<void(Socket*)> SocketCallable;
+
+// Call Accept() on the socket, then pass the connection to the server runner
+static void RunAcceptingDelegator(Socket* acceptor,
+                                  const SocketCallable& server_runner) {
+  Socket conn;
+  Sockaddr remote;
+  CHECK_OK(acceptor->Accept(&conn, &remote, 0));
+  server_runner(&conn);
+}
+
+// Set up a socket and run a SASL negotiation.
+static void RunNegotiationTest(const SocketCallable& server_runner,
+                               const SocketCallable& client_runner) {
+  Socket server_sock;
+  CHECK_OK(server_sock.Init(0));
+  ASSERT_OK(server_sock.BindAndListen(Sockaddr(), 1));
+  Sockaddr server_bind_addr;
+  ASSERT_OK(server_sock.GetSocketAddress(&server_bind_addr));
+  thread server(RunAcceptingDelegator, &server_sock, server_runner);
+
+  Socket client_sock;
+  CHECK_OK(client_sock.Init(0));
+  ASSERT_OK(client_sock.Connect(server_bind_addr));
+  thread client(client_runner, &client_sock);
+
+  LOG(INFO) << "Waiting for test threads to terminate...";
+  client.join();
+  LOG(INFO) << "Client thread terminated.";
+
+  // TODO(todd): if the client fails to negotiate, it doesn't
+  // always result in sending a nice error message to the
+  // other side.
+  client_sock.Close();
+
+  server.join();
+  LOG(INFO) << "Server thread terminated.";
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+static void RunPlainNegotiationServer(Socket* conn) {
+  SaslServer sasl_server(kSaslAppName, conn);
+  CHECK_OK(sasl_server.EnablePlain());
+  CHECK_OK(sasl_server.Init(kSaslAppName));
+  CHECK_OK(sasl_server.Negotiate());
+  CHECK(ContainsKey(sasl_server.client_features(), APPLICATION_FEATURE_FLAGS));
+  CHECK_EQ("my-username", sasl_server.authenticated_user());
+}
+
+static void RunPlainNegotiationClient(Socket* conn) {
+  SaslClient sasl_client(kSaslAppName, conn);
+  CHECK_OK(sasl_client.EnablePlain("my-username", "ignored password"));
+  CHECK_OK(sasl_client.Init(kSaslAppName));
+  CHECK_OK(sasl_client.Negotiate());
+  CHECK(ContainsKey(sasl_client.server_features(), APPLICATION_FEATURE_FLAGS));
+}
+
+// Test SASL negotiation using the PLAIN mechanism over a socket.
+TEST_F(TestSaslRpc, TestPlainNegotiation) {
+  RunNegotiationTest(RunPlainNegotiationServer, RunPlainNegotiationClient);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+
+template<class T>
+using CheckerFunction = std::function<void(const Status&, T&)>;
+
+// Run GSSAPI negotiation from the server side. Runs
+// 'post_check' after negotiation to verify the result.
+static void RunGSSAPINegotiationServer(
+    Socket* conn,
+    const CheckerFunction<SaslServer>& post_check) {
+  SaslServer sasl_server(kSaslAppName, conn);
+  sasl_server.set_server_fqdn("127.0.0.1");
+  CHECK_OK(sasl_server.EnableGSSAPI());
+  CHECK_OK(sasl_server.Init(kSaslAppName));
+  post_check(sasl_server.Negotiate(), sasl_server);
+}
+
+// Run GSSAPI negotiation from the client side. Runs
+// 'post_check' after negotiation to verify the result.
+static void RunGSSAPINegotiationClient(
+    Socket* conn,
+    const CheckerFunction<SaslClient>& post_check) {
+  SaslClient sasl_client(kSaslAppName, conn);
+  sasl_client.set_server_fqdn("127.0.0.1");
+  CHECK_OK(sasl_client.EnableGSSAPI());
+  CHECK_OK(sasl_client.Init(kSaslAppName));
+  post_check(sasl_client.Negotiate(), sasl_client);
+}
+
+// Test configuring a client to allow but not require Kerberos/GSSAPI,
+// and connect to a server which requires Kerberos/GSSAPI.
+//
+// They should negotiate to use Kerberos/GSSAPI.
+TEST_F(TestSaslRpc, TestRestrictiveServer_NonRestrictiveClient) {
+  MiniKdc kdc;
+  ASSERT_OK(kdc.Start());
+
+  // Create the server principal and keytab.
+  string kt_path;
+  ASSERT_OK(kdc.CreateServiceKeytab("kudu/127.0.0.1", &kt_path));
+  CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/));
+
+  // Create and kinit as a client user.
+  ASSERT_OK(kdc.CreateUserPrincipal("testuser"));
+  ASSERT_OK(kdc.Kinit("testuser"));
+  ASSERT_OK(kdc.SetKrb5Environment());
+
+  // Authentication should now succeed on both sides.
+  RunNegotiationTest(
+      std::bind(RunGSSAPINegotiationServer, std::placeholders::_1,
+                [](const Status& s, SaslServer& server) {
+                  CHECK_OK(s);
+                  CHECK_EQ(SaslMechanism::GSSAPI, server.negotiated_mechanism());
+                  CHECK_EQ("testuser", server.authenticated_user());
+                }),
+      [](Socket* conn) {
+        SaslClient sasl_client(kSaslAppName, conn);
+        sasl_client.set_server_fqdn("127.0.0.1");
+        // The client enables both PLAIN and GSSAPI.
+        CHECK_OK(sasl_client.EnablePlain("foo", "bar"));
+        CHECK_OK(sasl_client.EnableGSSAPI());
+        CHECK_OK(sasl_client.Init(kSaslAppName));
+        CHECK_OK(sasl_client.Negotiate());
+        CHECK_EQ(SaslMechanism::GSSAPI, sasl_client.negotiated_mechanism());
+      });
+}
+
+// Test configuring a client to only support PLAIN, and a server which
+// only supports GSSAPI. This would happen, for example, if an old Kudu
+// client tries to talk to a secure-only cluster.
+TEST_F(TestSaslRpc, TestNoMatchingMechanisms) {
+  MiniKdc kdc;
+  ASSERT_OK(kdc.Start());
+
+  // Create the server principal and keytab.
+  string kt_path;
+  ASSERT_OK(kdc.CreateServiceKeytab("kudu/localhost", &kt_path));
+  CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/));
+
+  RunNegotiationTest(
+      std::bind(RunGSSAPINegotiationServer, std::placeholders::_1,
+                [](const Status& s, SaslServer& server) {
+                  // The client fails to find a matching mechanism and
+                  // doesn't send any failure message to the server.
+                  // Instead, it just disconnects.
+                  //
+                  // TODO(todd): this could produce a better message!
+                  ASSERT_STR_CONTAINS(s.ToString(), "got EOF from remote");
+                }),
+      [](Socket* conn) {
+        SaslClient sasl_client(kSaslAppName, conn);
+        sasl_client.set_server_fqdn("127.0.0.1");
+        // The client enables only PLAIN, which the GSSAPI-only server does not offer.
+        CHECK_OK(sasl_client.EnablePlain("foo", "bar"));
+        CHECK_OK(sasl_client.Init(kSaslAppName));
+        Status s = sasl_client.Negotiate();
+        ASSERT_STR_CONTAINS(s.ToString(), "client was missing the required SASL module");
+      });
+}
+
+// Test SASL negotiation using the GSSAPI (kerberos) mechanism over a socket.
+TEST_F(TestSaslRpc, TestGSSAPINegotiation) {
+  MiniKdc kdc;
+  ASSERT_OK(kdc.Start());
+
+  // Create the server principal and keytab.
+  string kt_path;
+  ASSERT_OK(kdc.CreateServiceKeytab("kudu/127.0.0.1", &kt_path));
+  CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/));
+
+  // Create and kinit as a client user.
+  ASSERT_OK(kdc.CreateUserPrincipal("testuser"));
+  ASSERT_OK(kdc.Kinit("testuser"));
+  ASSERT_OK(kdc.SetKrb5Environment());
+
+  // Authentication should succeed on both sides.
+  RunNegotiationTest(
+      std::bind(RunGSSAPINegotiationServer, std::placeholders::_1,
+                [](const Status& s, SaslServer& server) {
+                  CHECK_OK(s);
+                  CHECK_EQ(SaslMechanism::GSSAPI, server.negotiated_mechanism());
+                  CHECK_EQ("testuser", server.authenticated_user());
+                }),
+      std::bind(RunGSSAPINegotiationClient, std::placeholders::_1,
+                [](const Status& s, SaslClient& client) {
+                  CHECK_OK(s);
+                  CHECK_EQ(SaslMechanism::GSSAPI, client.negotiated_mechanism());
+                }));
+}
+
+#ifndef __APPLE__
+// Test invalid SASL negotiations using the GSSAPI (kerberos) mechanism over a socket.
+// This test is ignored on macOS because the system Kerberos implementation
+// (Heimdal) caches the non-existence of client credentials, which causes further
+// tests to fail.
+TEST_F(TestSaslRpc, TestGSSAPIInvalidNegotiation) {
+  MiniKdc kdc;
+  ASSERT_OK(kdc.Start());
+
+  // Try to negotiate with no krb5 credentials on either side. It should fail on both
+  // sides.
+  RunNegotiationTest(
+      std::bind(RunGSSAPINegotiationServer, std::placeholders::_1,
+                [](const Status& s, SaslServer& server) {
+                  // The client notices there are no credentials and
+                  // doesn't send any failure message to the server.
+                  // Instead, it just disconnects.
+                  //
+                  // TODO(todd): it might be preferable to have the server
+                  // fail to start if it has no valid keytab.
+                  CHECK(s.IsNetworkError());
+                }),
+      std::bind(RunGSSAPINegotiationClient, std::placeholders::_1,
+                [](const Status& s, SaslClient& client) {
+                  CHECK(s.IsNotAuthorized());
+#ifndef KRB5_VERSION_LE_1_10
+                  ASSERT_STR_CONTAINS(s.ToString(), "No Kerberos credentials available");
+#endif
+                }));
+
+
+  // Create the server principal and keytab.
+  string kt_path;
+  ASSERT_OK(kdc.CreateServiceKeytab("kudu/127.0.0.1", &kt_path));
+  CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/));
+
+  // Try to negotiate with no krb5 credentials on the client. It should fail on both
+  // sides.
+  RunNegotiationTest(
+      std::bind(RunGSSAPINegotiationServer, std::placeholders::_1,
+                [](const Status& s, SaslServer& server) {
+                  // The client notices there are no credentials and
+                  // doesn't send any failure message to the server.
+                  // Instead, it just disconnects.
+                  CHECK(s.IsNetworkError());
+                }),
+      std::bind(RunGSSAPINegotiationClient, std::placeholders::_1,
+                [](const Status& s, SaslClient& client) {
+                  CHECK(s.IsNotAuthorized());
+#ifndef KRB5_VERSION_LE_1_10
+                  CHECK_EQ(s.message().ToString(), "No Kerberos credentials available");
+#endif
+                }));
+
+  // Create and kinit as a client user.
+  ASSERT_OK(kdc.CreateUserPrincipal("testuser"));
+  ASSERT_OK(kdc.Kinit("testuser"));
+  ASSERT_OK(kdc.SetKrb5Environment());
+
+  // Change the server's keytab file so that it has inappropriate
+  // credentials.
+  // Authentication should now fail.
+  ASSERT_OK(kdc.CreateServiceKeytab("otherservice/127.0.0.1", &kt_path));
+  CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/));
+
+  RunNegotiationTest(
+      std::bind(RunGSSAPINegotiationServer, std::placeholders::_1,
+                [](const Status& s, SaslServer& server) {
+                  CHECK(s.IsNotAuthorized());
+#ifndef KRB5_VERSION_LE_1_10
+                  ASSERT_STR_CONTAINS(s.ToString(),
+                                      "No key table entry found matching kudu/127.0.0.1");
+#endif
+                }),
+      std::bind(RunGSSAPINegotiationClient, std::placeholders::_1,
+                [](const Status& s, SaslClient& client) {
+                  CHECK(s.IsNotAuthorized());
+#ifndef KRB5_VERSION_LE_1_10
+                  ASSERT_STR_CONTAINS(s.ToString(),
+                                      "No key table entry found matching kudu/127.0.0.1");
+#endif
+                }));
+}
+#endif
+
+#ifndef __APPLE__
+// Test that the pre-flight check for servers requiring Kerberos provides
+// nice error messages for missing or bad keytabs.
+//
+// This is ignored on macOS because the system Kerberos implementation does not
+// fail the preflight check when the keytab is inaccessible, probably because
+// the preflight check passes a 0-length token.
+TEST_F(TestSaslRpc, TestPreflight) {
+  // Try pre-flight with no keytab.
+  Status s = SaslServer::PreflightCheckGSSAPI(kSaslAppName);
+  ASSERT_FALSE(s.ok());
+#ifndef KRB5_VERSION_LE_1_10
+  ASSERT_STR_MATCHES(s.ToString(), "Key table file.*not found");
+#endif
+  // Try with a valid krb5 environment and keytab.
+  MiniKdc kdc;
+  ASSERT_OK(kdc.Start());
+  ASSERT_OK(kdc.SetKrb5Environment());
+  string kt_path;
+  ASSERT_OK(kdc.CreateServiceKeytab("kudu/127.0.0.1", &kt_path));
+  CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/));
+
+  ASSERT_OK(SaslServer::PreflightCheckGSSAPI(kSaslAppName));
+
+  // Try with an inaccessible keytab.
+  CHECK_ERR(chmod(kt_path.c_str(), 0000));
+  s = SaslServer::PreflightCheckGSSAPI(kSaslAppName);
+  ASSERT_FALSE(s.ok());
+#ifndef KRB5_VERSION_LE_1_10
+  ASSERT_STR_MATCHES(s.ToString(), "error accessing keytab: Permission denied");
+#endif
+  CHECK_ERR(unlink(kt_path.c_str()));
+
+  // Try with a keytab that has the wrong credentials.
+  ASSERT_OK(kdc.CreateServiceKeytab("wrong-service/127.0.0.1", &kt_path));
+  CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/));
+  s = SaslServer::PreflightCheckGSSAPI(kSaslAppName);
+  ASSERT_FALSE(s.ok());
+#ifndef KRB5_VERSION_LE_1_10
+  ASSERT_STR_MATCHES(s.ToString(), "No key table entry found matching kudu/.*");
+#endif
+}
+#endif
+
+////////////////////////////////////////////////////////////////////////////////
+
+static void RunTimeoutExpectingServer(Socket* conn) {
+  SaslServer sasl_server(kSaslAppName, conn);
+  CHECK_OK(sasl_server.EnablePlain());
+  CHECK_OK(sasl_server.Init(kSaslAppName));
+  Status s = sasl_server.Negotiate();
+  ASSERT_TRUE(s.IsNetworkError()) << "Expected client to time out and close the connection. Got: "
+      << s.ToString();
+}
+
+static void RunTimeoutNegotiationClient(Socket* sock) {
+  SaslClient sasl_client(kSaslAppName, sock);
+  CHECK_OK(sasl_client.EnablePlain("test", "test"));
+  CHECK_OK(sasl_client.Init(kSaslAppName));
+  MonoTime deadline = MonoTime::Now() - MonoDelta::FromMilliseconds(100L);
+  sasl_client.set_deadline(deadline);
+  Status s = sasl_client.Negotiate();
+  ASSERT_TRUE(s.IsTimedOut()) << "Expected timeout! Got: " << s.ToString();
+  CHECK_OK(sock->Shutdown(true, true));
+}
+
+// Ensure that the client times out.
+TEST_F(TestSaslRpc, TestClientTimeout) {
+  RunNegotiationTest(RunTimeoutExpectingServer, RunTimeoutNegotiationClient);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+static void RunTimeoutNegotiationServer(Socket* sock) {
+  SaslServer sasl_server(kSaslAppName, sock);
+  CHECK_OK(sasl_server.EnablePlain());
+  CHECK_OK(sasl_server.Init(kSaslAppName));
+  MonoTime deadline = MonoTime::Now() - MonoDelta::FromMilliseconds(100L);
+  sasl_server.set_deadline(deadline);
+  Status s = sasl_server.Negotiate();
+  ASSERT_TRUE(s.IsTimedOut()) << "Expected timeout! Got: " << s.ToString();
+  CHECK_OK(sock->Close());
+}
+
+static void RunTimeoutExpectingClient(Socket* conn) {
+  SaslClient sasl_client(kSaslAppName, conn);
+  CHECK_OK(sasl_client.EnablePlain("test", "test"));
+  CHECK_OK(sasl_client.Init(kSaslAppName));
+  Status s = sasl_client.Negotiate();
+  ASSERT_TRUE(s.IsNetworkError()) << "Expected server to time out and close the connection. Got: "
+      << s.ToString();
+}
+
+// Ensure that the server times out.
+TEST_F(TestSaslRpc, TestServerTimeout) {
+  RunNegotiationTest(RunTimeoutNegotiationServer, RunTimeoutExpectingClient);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+// This suite of tests ensures that applications that embed the Kudu client are
+// able to externally handle the initialization of SASL. See KUDU-1749 and
+// IMPALA-4497 for context.
+//
+// The tests are a bit tricky because the initialization of SASL is static state
+// that we can't easily clear/reset between test cases. So, each test invokes
+// itself as a subprocess with the appropriate --gtest_filter line as well as a
+// special flag to indicate that it is the test child running.
+class TestDisableInit : public KuduTest {
+ protected:
+  // Run the lambda 'f' in a newly-started process, capturing its stderr
+  // into 'stderr'.
+  template<class TestFunc>
+  void DoTest(const TestFunc& f, string* stderr = nullptr) {
+    if (FLAGS_is_test_child) {
+      f();
+      return;
+    }
+
+    // Invoke the currently-running test case in a new subprocess.
+    string filter_flag = strings::Substitute("--gtest_filter=$0.$1",
+                                             CURRENT_TEST_CASE_NAME(), CURRENT_TEST_NAME());
+    string executable_path;
+    CHECK_OK(env_->GetExecutablePath(&executable_path));
+    string stdout;
+    Status s = Subprocess::Call({ executable_path, "test", filter_flag, "--is_test_child" },
+                                "" /* stdin */,
+                                &stdout,
+                                stderr);
+    ASSERT_TRUE(s.ok()) << "Test failed: " << stdout;
+  }
+};
+
+// Test disabling SASL but not actually properly initializing it before usage.
+TEST_F(TestDisableInit, TestDisableSasl_NotInitialized) {
+  DoTest([]() {
+      CHECK_OK(DisableSaslInitialization());
+      Status s = SaslInit("kudu");
+      ASSERT_STR_CONTAINS(s.ToString(), "was disabled, but SASL was not externally initialized");
+    });
+}
+
+// Test disabling SASL with proper initialization by some other app.
+TEST_F(TestDisableInit, TestDisableSasl_Good) {
+  DoTest([]() {
+      rpc::internal::SaslSetMutex();
+      sasl_client_init(NULL);
+      CHECK_OK(DisableSaslInitialization());
+      ASSERT_OK(SaslInit("kudu"));
+    });
+}
+
+// Test a client which inits SASL itself but doesn't remember to disable Kudu's
+// SASL initialization.
+TEST_F(TestDisableInit, TestMultipleSaslInit) {
+  string stderr;
+  DoTest([]() {
+      rpc::internal::SaslSetMutex();
+      sasl_client_init(NULL);
+      ASSERT_OK(SaslInit("kudu"));
+    }, &stderr);
+  // If we are the parent, we should see the warning from the child that it automatically
+  // skipped initialization because it detected that it was already initialized.
+  if (!FLAGS_is_test_child) {
+    ASSERT_STR_CONTAINS(stderr, "Skipping initialization");
+  }
+}
+
+// We are not able to detect mutexes not being set with the macOS version of libsasl.
+#ifndef __APPLE__
+// Test disabling SASL but not remembering to initialize the SASL mutex support. This
+// should succeed but generate a warning.
+TEST_F(TestDisableInit, TestDisableSasl_NoMutexImpl) {
+  string stderr;
+  DoTest([]() {
+      sasl_client_init(NULL);
+      CHECK_OK(DisableSaslInitialization());
+      ASSERT_OK(SaslInit("kudu"));
+    }, &stderr);
+  // If we are the parent, we should see the warning from the child.
+  if (!FLAGS_is_test_child) {
+    ASSERT_STR_CONTAINS(stderr, "not provided with a mutex implementation");
+  }
+}
+
+// Test a client which inits SASL itself but doesn't remember to disable Kudu's
+// SASL initialization.
+TEST_F(TestDisableInit, TestMultipleSaslInit_NoMutexImpl) {
+  string stderr;
+  DoTest([]() {
+      sasl_client_init(NULL);
+      ASSERT_OK(SaslInit("kudu"));
+    }, &stderr);
+  // If we are the parent, we should see the warning from the child that it automatically
+  // skipped initialization because it detected that it was already initialized.
+  if (!FLAGS_is_test_child) {
+    ASSERT_STR_CONTAINS(stderr, "Skipping initialization");
+    ASSERT_STR_CONTAINS(stderr, "not provided with a mutex implementation");
+  }
+}
+#endif
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/41c45523/src/kudu/rpc/negotiation.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/negotiation.cc b/src/kudu/rpc/negotiation.cc
index e42a884..859016c 100644
--- a/src/kudu/rpc/negotiation.cc
+++ b/src/kudu/rpc/negotiation.cc
@@ -28,12 +28,12 @@
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/blocking_ops.h"
+#include "kudu/rpc/client_negotiation.h"
 #include "kudu/rpc/connection.h"
 #include "kudu/rpc/reactor.h"
 #include "kudu/rpc/rpc_header.pb.h"
-#include "kudu/rpc/sasl_client.h"
 #include "kudu/rpc/sasl_common.h"
-#include "kudu/rpc/sasl_server.h"
+#include "kudu/rpc/server_negotiation.h"
 #include "kudu/util/errno.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/status.h"

http://git-wip-us.apache.org/repos/asf/kudu/blob/41c45523/src/kudu/rpc/reactor.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/reactor.cc b/src/kudu/rpc/reactor.cc
index 0b2b19b..f178a2d 100644
--- a/src/kudu/rpc/reactor.cc
+++ b/src/kudu/rpc/reactor.cc
@@ -31,13 +31,13 @@
 
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/stringprintf.h"
+#include "kudu/rpc/client_negotiation.h"
 #include "kudu/rpc/connection.h"
 #include "kudu/rpc/messenger.h"
 #include "kudu/rpc/negotiation.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rpc_introspection.pb.h"
-#include "kudu/rpc/sasl_client.h"
-#include "kudu/rpc/sasl_server.h"
+#include "kudu/rpc/server_negotiation.h"
 #include "kudu/rpc/transfer.h"
 #include "kudu/security/ssl_factory.h"
 #include "kudu/security/ssl_socket.h"

http://git-wip-us.apache.org/repos/asf/kudu/blob/41c45523/src/kudu/rpc/sasl_client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/sasl_client.cc b/src/kudu/rpc/sasl_client.cc
deleted file mode 100644
index c88e3cc..0000000
--- a/src/kudu/rpc/sasl_client.cc
+++ /dev/null
@@ -1,521 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "kudu/rpc/sasl_client.h"
-
-#include <string.h>
-
-#include <map>
-#include <set>
-#include <string>
-
-#include <glog/logging.h>
-#include <sasl/sasl.h>
-
-#include "kudu/gutil/endian.h"
-#include "kudu/gutil/map-util.h"
-#include "kudu/gutil/stl_util.h"
-#include "kudu/gutil/stringprintf.h"
-#include "kudu/gutil/strings/join.h"
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/gutil/strings/util.h"
-#include "kudu/rpc/blocking_ops.h"
-#include "kudu/rpc/constants.h"
-#include "kudu/rpc/rpc_header.pb.h"
-#include "kudu/rpc/sasl_common.h"
-#include "kudu/rpc/sasl_helper.h"
-#include "kudu/rpc/serialization.h"
-#include "kudu/util/faststring.h"
-#include "kudu/util/net/sockaddr.h"
-#include "kudu/util/net/socket.h"
-#include "kudu/util/scoped_cleanup.h"
-#include "kudu/util/trace.h"
-
-namespace kudu {
-namespace rpc {
-
-using std::map;
-using std::set;
-using std::string;
-
-static int SaslClientGetoptCb(void* sasl_client, const char* plugin_name, const char* option,
-                       const char** result, unsigned* len) {
-  return static_cast<SaslClient*>(sasl_client)
-    ->GetOptionCb(plugin_name, option, result, len);
-}
-
-static int SaslClientSimpleCb(void *sasl_client, int id,
-                       const char **result, unsigned *len) {
-  return static_cast<SaslClient*>(sasl_client)->SimpleCb(id, result, len);
-}
-
-static int SaslClientSecretCb(sasl_conn_t* conn, void *sasl_client, int id,
-                       sasl_secret_t** psecret) {
-  return static_cast<SaslClient*>(sasl_client)->SecretCb(conn, id, psecret);
-}
-
-// Return an appropriately-typed Status object based on an ErrorStatusPB returned
-// from an Error RPC.
-// In case there is no relevant Status type, return a RuntimeError.
-static Status StatusFromRpcError(const ErrorStatusPB& error) {
-  DCHECK(error.IsInitialized()) << "Error status PB must be initialized";
-  if (PREDICT_FALSE(!error.has_code())) {
-    return Status::RuntimeError(error.message());
-  }
-  string code_name = ErrorStatusPB::RpcErrorCodePB_Name(error.code());
-  switch (error.code()) {
-    case ErrorStatusPB_RpcErrorCodePB_FATAL_UNAUTHORIZED:
-      return Status::NotAuthorized(code_name, error.message());
-    default:
-      return Status::RuntimeError(code_name, error.message());
-  }
-}
-
-SaslClient::SaslClient(string app_name, Socket* socket)
-    : app_name_(std::move(app_name)),
-      sock_(socket),
-      helper_(SaslHelper::CLIENT),
-      client_state_(SaslNegotiationState::NEW),
-      negotiated_mech_(SaslMechanism::INVALID),
-      deadline_(MonoTime::Max()) {
-  callbacks_.push_back(SaslBuildCallback(SASL_CB_GETOPT,
-      reinterpret_cast<int (*)()>(&SaslClientGetoptCb), this));
-  callbacks_.push_back(SaslBuildCallback(SASL_CB_AUTHNAME,
-      reinterpret_cast<int (*)()>(&SaslClientSimpleCb), this));
-  callbacks_.push_back(SaslBuildCallback(SASL_CB_PASS,
-      reinterpret_cast<int (*)()>(&SaslClientSecretCb), this));
-  callbacks_.push_back(SaslBuildCallback(SASL_CB_LIST_END, nullptr, nullptr));
-}
-
-Status SaslClient::EnablePlain(const string& user, const string& pass) {
-  DCHECK_EQ(client_state_, SaslNegotiationState::NEW);
-  RETURN_NOT_OK(helper_.EnablePlain());
-  plain_auth_user_ = user;
-  plain_pass_ = pass;
-  return Status::OK();
-}
-
-Status SaslClient::EnableGSSAPI() {
-  DCHECK_EQ(client_state_, SaslNegotiationState::NEW);
-  return helper_.EnableGSSAPI();
-}
-
-SaslMechanism::Type SaslClient::negotiated_mechanism() const {
-  DCHECK_EQ(client_state_, SaslNegotiationState::NEGOTIATED);
-  return negotiated_mech_;
-}
-
-void SaslClient::set_local_addr(const Sockaddr& addr) {
-  DCHECK_EQ(client_state_, SaslNegotiationState::NEW);
-  helper_.set_local_addr(addr);
-}
-
-void SaslClient::set_remote_addr(const Sockaddr& addr) {
-  DCHECK_EQ(client_state_, SaslNegotiationState::NEW);
-  helper_.set_remote_addr(addr);
-}
-
-void SaslClient::set_server_fqdn(const string& domain_name) {
-  DCHECK_EQ(client_state_, SaslNegotiationState::NEW);
-  helper_.set_server_fqdn(domain_name);
-}
-
-void SaslClient::set_deadline(const MonoTime& deadline) {
-  DCHECK_NE(client_state_, SaslNegotiationState::NEGOTIATED);
-  deadline_ = deadline;
-}
-
-// calls sasl_client_init() and sasl_client_new()
-Status SaslClient::Init(const string& service_type) {
-  RETURN_NOT_OK(SaslInit(app_name_.c_str()));
-
-  // Ensure we are not called more than once.
-  if (client_state_ != SaslNegotiationState::NEW) {
-    return Status::IllegalState("Init() may only be called once per SaslClient object.");
-  }
-
-  // TODO: Support security flags.
-  unsigned secflags = 0;
-
-  sasl_conn_t* sasl_conn = nullptr;
-  Status s = WrapSaslCall(nullptr /* no conn */, [&]() {
-      return sasl_client_new(
-          service_type.c_str(),         // Registered name of the service using SASL. Required.
-          helper_.server_fqdn(),        // The fully qualified domain name of the remote server.
-          helper_.local_addr_string(),  // Local and remote IP address strings. (NULL disables
-          helper_.remote_addr_string(), //   mechanisms which require this info.)
-          &callbacks_[0],               // Connection-specific callbacks.
-          secflags,                     // Security flags.
-          &sasl_conn);
-    });
-  if (!s.ok()) {
-    return Status::RuntimeError("Unable to create new SASL client",
-                                s.message());
-  }
-  sasl_conn_.reset(sasl_conn);
-
-  client_state_ = SaslNegotiationState::INITIALIZED;
-  return Status::OK();
-}
-
-Status SaslClient::Negotiate() {
-  // After negotiation, we no longer need the SASL library object, so
-  // may as well free its memory since the connection may be long-lived.
-  // Additionally, this works around a SEGV seen at process shutdown time:
-  // if we still have SASL objects retained by Reactor when the process
-  // is exiting, the SASL libraries may start destructing global state
-  // and cause a crash when we sasl_dispose the connection.
-  auto cleanup = MakeScopedCleanup([&]() {
-      sasl_conn_.reset();
-    });
-  TRACE("Called SaslClient::Negotiate()");
-
-  // Ensure we called exactly once, and in the right order.
-  if (client_state_ == SaslNegotiationState::NEW) {
-    return Status::IllegalState("SaslClient: Init() must be called before calling Negotiate()");
-  } else if (client_state_ == SaslNegotiationState::NEGOTIATED) {
-    return Status::IllegalState("SaslClient: Negotiate() may only be called once per object.");
-  }
-
-  // Ensure we can use blocking calls on the socket during negotiation.
-  RETURN_NOT_OK(EnsureBlockingMode(sock_));
-
-  // Start by asking the server for a list of available auth mechanisms.
-  RETURN_NOT_OK(SendNegotiateMessage());
-
-  faststring recv_buf;
-  nego_ok_ = false;
-
-  // We set nego_ok_ = true when the SASL library returns SASL_OK to us.
-  // We set nego_response_expected_ = true each time we send a request to the server.
-  while (!nego_ok_ || nego_response_expected_) {
-    ResponseHeader header;
-    Slice param_buf;
-    RETURN_NOT_OK(ReceiveFramedMessageBlocking(sock_, &recv_buf, &header, &param_buf, deadline_));
-    nego_response_expected_ = false;
-
-    NegotiatePB response;
-    RETURN_NOT_OK(ParseNegotiatePB(header, param_buf, &response));
-
-    switch (response.step()) {
-      // NEGOTIATE: Server has sent us its list of supported SASL mechanisms.
-      case NegotiatePB::NEGOTIATE:
-        RETURN_NOT_OK(HandleNegotiateResponse(response));
-        break;
-
-      // SASL_CHALLENGE: Server sent us a follow-up to an SASL_INITIATE or SASL_RESPONSE request.
-      case NegotiatePB::SASL_CHALLENGE:
-        RETURN_NOT_OK(HandleChallengeResponse(response));
-        break;
-
-      // SASL_SUCCESS: Server has accepted our authentication request. Negotiation successful.
-      case NegotiatePB::SASL_SUCCESS:
-        RETURN_NOT_OK(HandleSuccessResponse(response));
-        break;
-
-      // Client sent us some unsupported SASL response.
-      default:
-        LOG(ERROR) << "SASL Client: Received unsupported response from server";
-        return Status::InvalidArgument("RPC client doesn't support Negotiate step",
-                                       NegotiatePB::NegotiateStep_Name(response.step()));
-    }
-  }
-
-  TRACE("SASL Client: Successful negotiation");
-  client_state_ = SaslNegotiationState::NEGOTIATED;
-  return Status::OK();
-}
-
-Status SaslClient::SendNegotiatePB(const NegotiatePB& msg) {
-  DCHECK_NE(client_state_, SaslNegotiationState::NEW)
-      << "Must not send Negotiate messages before calling Init()";
-  DCHECK_NE(client_state_, SaslNegotiationState::NEGOTIATED)
-      << "Must not send Negotiate messages after negotiation succeeds";
-
-  // Create header with SASL-specific callId
-  RequestHeader header;
-  header.set_call_id(kNegotiateCallId);
-  return helper_.SendNegotiatePB(sock_, header, msg, deadline_);
-}
-
-Status SaslClient::ParseNegotiatePB(const ResponseHeader& header,
-                                    const Slice& param_buf,
-                                    NegotiatePB* response) {
-  RETURN_NOT_OK(helper_.SanityCheckNegotiateCallId(header.call_id()));
-
-  if (header.is_error()) {
-    return ParseError(param_buf);
-  }
-
-  return helper_.ParseNegotiatePB(param_buf, response);
-}
-
-Status SaslClient::SendNegotiateMessage() {
-  NegotiatePB msg;
-  msg.set_step(NegotiatePB::NEGOTIATE);
-
-  // Advertise our supported features.
-  for (RpcFeatureFlag feature : kSupportedClientRpcFeatureFlags) {
-    msg.add_supported_features(feature);
-  }
-
-  TRACE("SASL Client: Sending NEGOTIATE request to server.");
-  RETURN_NOT_OK(SendNegotiatePB(msg));
-  nego_response_expected_ = true;
-  return Status::OK();
-}
-
-Status SaslClient::SendInitiateMessage(const NegotiatePB_SaslAuth& auth,
-    const char* init_msg, unsigned init_msg_len) {
-  NegotiatePB msg;
-  msg.set_step(NegotiatePB::SASL_INITIATE);
-  msg.mutable_token()->assign(init_msg, init_msg_len);
-  msg.add_auths()->CopyFrom(auth);
-  TRACE("SASL Client: Sending SASL_INITIATE request to server.");
-  RETURN_NOT_OK(SendNegotiatePB(msg));
-  nego_response_expected_ = true;
-  return Status::OK();
-}
-
-Status SaslClient::SendResponseMessage(const char* resp_msg, unsigned resp_msg_len) {
-  NegotiatePB reply;
-  reply.set_step(NegotiatePB::SASL_RESPONSE);
-  reply.mutable_token()->assign(resp_msg, resp_msg_len);
-  TRACE("SASL Client: Sending SASL_RESPONSE request to server.");
-  RETURN_NOT_OK(SendNegotiatePB(reply));
-  nego_response_expected_ = true;
-  return Status::OK();
-}
-
-Status SaslClient::DoSaslStep(const string& in, const char** out, unsigned* out_len) {
-  TRACE("SASL Client: Calling sasl_client_step()");
-  Status s = WrapSaslCall(sasl_conn_.get(), [&]() {
-      return sasl_client_step(sasl_conn_.get(), in.c_str(), in.length(), nullptr, out, out_len);
-    });
-  if (s.ok()) {
-    nego_ok_ = true;
-  }
-  return s;
-}
-
-Status SaslClient::HandleNegotiateResponse(const NegotiatePB& response) {
-  TRACE("SASL Client: Received NEGOTIATE response from server");
-  // Fill in the set of features supported by the server.
-  for (int flag : response.supported_features()) {
-    // We only add the features that our local build knows about.
-    RpcFeatureFlag feature_flag = RpcFeatureFlag_IsValid(flag) ?
-                                  static_cast<RpcFeatureFlag>(flag) : UNKNOWN;
-    if (ContainsKey(kSupportedClientRpcFeatureFlags, feature_flag)) {
-      server_features_.insert(feature_flag);
-    }
-  }
-
-  // Build a map of the mechanisms offered by the server.
-  const set<string>& local_mechs = helper_.LocalMechs();
-  set<string> server_mechs;
-  map<string, NegotiatePB::SaslAuth> server_mech_map;
-  for (const NegotiatePB::SaslAuth& auth : response.auths()) {
-    const auto& mech = auth.mechanism();
-    server_mech_map[mech] = auth;
-    server_mechs.insert(mech);
-  }
-  // Determine which server mechs are also enabled by the client.
-  // Cyrus SASL 2.1.25 and later supports doing this set intersection via
-  // the 'client_mech_list' option, but that version is not available on
-  // RHEL 6, so we have to do it manually.
-  set<string> matching_mechs = STLSetIntersection(local_mechs, server_mechs);
-
-  if (matching_mechs.empty() &&
-      ContainsKey(server_mechs, kSaslMechGSSAPI) &&
-      !ContainsKey(local_mechs, kSaslMechGSSAPI)) {
-    return Status::NotAuthorized("server requires GSSAPI (Kerberos) authentication and "
-                                 "client was missing the required SASL module");
-  }
-
-  string matching_mechs_str = JoinElements(matching_mechs, " ");
-  TRACE("SASL Client: Matching mech list: $0", matching_mechs_str);
-
-  const char* init_msg = nullptr;
-  unsigned init_msg_len = 0;
-  const char* negotiated_mech = nullptr;
-
-  /* select a mechanism for a connection
-   *  mechlist      -- mechanisms server has available (punctuation ignored)
-   * output:
-   *  prompt_need   -- on SASL_INTERACT, list of prompts needed to continue
-   *  clientout     -- the initial client response to send to the server
-   *  mech          -- set to mechanism name
-   *
-   * Returns:
-   *  SASL_OK       -- success
-   *  SASL_CONTINUE -- negotiation required
-   *  SASL_NOMEM    -- not enough memory
-   *  SASL_NOMECH   -- no mechanism meets requested properties
-   *  SASL_INTERACT -- user interaction needed to fill in prompt_need list
-   */
-  TRACE("SASL Client: Calling sasl_client_start()");
-  Status s = WrapSaslCall(sasl_conn_.get(), [&]() {
-      return sasl_client_start(
-          sasl_conn_.get(),           // The SASL connection context created by init()
-          matching_mechs_str.c_str(), // The list of mechanisms to negotiate.
-          nullptr,                    // Disables INTERACT return if NULL.
-          &init_msg,                  // Filled in on success.
-          &init_msg_len,              // Filled in on success.
-          &negotiated_mech);          // Filled in on success.
-    });
-
-  if (s.ok()) {
-    nego_ok_ = true;
-  } else if (!s.IsIncomplete()) {
-    return s;
-  }
-
-  // The server matched one of our mechanisms.
-  NegotiatePB::SaslAuth* auth = FindOrNull(server_mech_map, negotiated_mech);
-  if (PREDICT_FALSE(auth == nullptr)) {
-    return Status::IllegalState("Unable to find auth in map, unexpected error", negotiated_mech);
-  }
-  negotiated_mech_ = SaslMechanism::value_of(negotiated_mech);
-
-  RETURN_NOT_OK(SendInitiateMessage(*auth, init_msg, init_msg_len));
-  return Status::OK();
-}
-
-Status SaslClient::HandleChallengeResponse(const NegotiatePB& response) {
-  TRACE("SASL Client: Received SASL_CHALLENGE response from server");
-  if (PREDICT_FALSE(nego_ok_)) {
-    LOG(DFATAL) << "Server sent SASL_CHALLENGE response after client library returned SASL_OK";
-  }
-
-  if (PREDICT_FALSE(!response.has_token())) {
-    return Status::InvalidArgument("No token in SASL_CHALLENGE response from server");
-  }
-
-  const char* out = nullptr;
-  unsigned out_len = 0;
-  Status s = DoSaslStep(response.token(), &out, &out_len);
-  if (!s.ok() && !s.IsIncomplete()) {
-    return s;
-  }
-  RETURN_NOT_OK(SendResponseMessage(out, out_len));
-  return Status::OK();
-}
-
-Status SaslClient::HandleSuccessResponse(const NegotiatePB& response) {
-  TRACE("SASL Client: Received SASL_SUCCESS response from server");
-  if (!nego_ok_) {
-    const char* out = nullptr;
-    unsigned out_len = 0;
-    Status s = DoSaslStep(response.token(), &out, &out_len);
-    if (s.IsIncomplete()) {
-      return Status::IllegalState("Server indicated successful authentication, but client "
-                                  "was not complete");
-    }
-    RETURN_NOT_OK(s);
-    if (out_len > 0) {
-      return Status::IllegalState("SASL client library generated spurious token after SASL_SUCCESS",
-          string(out, out_len));
-    }
-    CHECK(nego_ok_);
-  }
-  return Status::OK();
-}
-
-// Parse error status message from raw bytes of an ErrorStatusPB.
-Status SaslClient::ParseError(const Slice& err_data) {
-  ErrorStatusPB error;
-  if (!error.ParseFromArray(err_data.data(), err_data.size())) {
-    return Status::IOError("Invalid error response, missing fields",
-        error.InitializationErrorString());
-  }
-  Status s = StatusFromRpcError(error);
-  TRACE("SASL Client: Received error response from server: $0", s.ToString());
-  return s;
-}
-
-int SaslClient::GetOptionCb(const char* plugin_name, const char* option,
-                            const char** result, unsigned* len) {
-  return helper_.GetOptionCb(plugin_name, option, result, len);
-}
-
-// Used for PLAIN.
-// SASL callback for SASL_CB_USER, SASL_CB_AUTHNAME, SASL_CB_LANGUAGE
-int SaslClient::SimpleCb(int id, const char** result, unsigned* len) {
-  if (PREDICT_FALSE(!helper_.IsPlainEnabled())) {
-    LOG(DFATAL) << "SASL Client: Simple callback called, but PLAIN auth is not enabled";
-    return SASL_FAIL;
-  }
-  if (PREDICT_FALSE(result == nullptr)) {
-    LOG(DFATAL) << "SASL Client: result outparam is NULL";
-    return SASL_BADPARAM;
-  }
-  switch (id) {
-    // TODO: Support impersonation?
-    // For impersonation, USER is the impersonated user, AUTHNAME is the "sudoer".
-    case SASL_CB_USER:
-      TRACE("SASL Client: callback for SASL_CB_USER");
-      *result = plain_auth_user_.c_str();
-      if (len != nullptr) *len = plain_auth_user_.length();
-      break;
-    case SASL_CB_AUTHNAME:
-      TRACE("SASL Client: callback for SASL_CB_AUTHNAME");
-      *result = plain_auth_user_.c_str();
-      if (len != nullptr) *len = plain_auth_user_.length();
-      break;
-    case SASL_CB_LANGUAGE:
-      LOG(DFATAL) << "SASL Client: Unable to handle SASL callback type SASL_CB_LANGUAGE"
-        << "(" << id << ")";
-      return SASL_BADPARAM;
-    default:
-      LOG(DFATAL) << "SASL Client: Unexpected SASL callback type: " << id;
-      return SASL_BADPARAM;
-  }
-
-  return SASL_OK;
-}
-
-// Used for PLAIN.
-// SASL callback for SASL_CB_PASS: User password.
-int SaslClient::SecretCb(sasl_conn_t* conn, int id, sasl_secret_t** psecret) {
-  if (PREDICT_FALSE(!helper_.IsPlainEnabled())) {
-    LOG(DFATAL) << "SASL Client: Plain secret callback called, but PLAIN auth is not enabled";
-    return SASL_FAIL;
-  }
-  switch (id) {
-    case SASL_CB_PASS: {
-      if (!conn || !psecret) return SASL_BADPARAM;
-
-      int len = plain_pass_.length();
-      *psecret = reinterpret_cast<sasl_secret_t*>(malloc(sizeof(sasl_secret_t) + len));
-      if (!*psecret) {
-        return SASL_NOMEM;
-      }
-      psecret_.reset(*psecret);  // Ensure that we free() this structure later.
-      (*psecret)->len = len;
-      memcpy(reinterpret_cast<char *>((*psecret)->data), plain_pass_.c_str(), len + 1);
-      break;
-    }
-    default:
-      LOG(DFATAL) << "SASL Client: Unexpected SASL callback type: " << id;
-      return SASL_BADPARAM;
-  }
-
-  return SASL_OK;
-}
-
-} // namespace rpc
-} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/41c45523/src/kudu/rpc/sasl_client.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/sasl_client.h b/src/kudu/rpc/sasl_client.h
deleted file mode 100644
index 746a6f8..0000000
--- a/src/kudu/rpc/sasl_client.h
+++ /dev/null
@@ -1,180 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef KUDU_RPC_SASL_CLIENT_H
-#define KUDU_RPC_SASL_CLIENT_H
-
-#include <set>
-#include <string>
-#include <vector>
-
-#include <sasl/sasl.h>
-
-#include "kudu/gutil/gscoped_ptr.h"
-#include "kudu/rpc/rpc_header.pb.h"
-#include "kudu/rpc/sasl_common.h"
-#include "kudu/rpc/sasl_helper.h"
-#include "kudu/util/monotime.h"
-#include "kudu/util/status.h"
-#include "kudu/util/net/socket.h"
-
-namespace kudu {
-namespace rpc {
-
-using std::string;
-
-class NegotiatePB;
-class NegotiatePB_SaslAuth;
-class ResponseHeader;
-
-// Class for doing SASL negotiation with a SaslServer over a bidirectional socket.
-// Operations on this class are NOT thread-safe.
-class SaslClient {
- public:
-  // Does not take ownership of the socket indicated by the fd.
-  SaslClient(string app_name, Socket* socket);
-
-  // Enable PLAIN authentication.
-  // Must be called after Init().
-  Status EnablePlain(const string& user, const string& pass);
-
-  // Enable GSSAPI authentication.
-  // Call after Init().
-  Status EnableGSSAPI();
-
-  // Returns mechanism negotiated by this connection.
-  // Must be called after Negotiate().
-  SaslMechanism::Type negotiated_mechanism() const;
-
-  // Returns the set of RPC system features supported by the remote server.
-  // Must be called after Negotiate().
-  const std::set<RpcFeatureFlag>& server_features() const {
-    return server_features_;
-  }
-
-  // Specify IP:port of local side of connection.
-  // Must be called before Init(). Required for some mechanisms.
-  void set_local_addr(const Sockaddr& addr);
-
-  // Specify IP:port of remote side of connection.
-  // Must be called before Init(). Required for some mechanisms.
-  void set_remote_addr(const Sockaddr& addr);
-
-  // Specify the fully-qualified domain name of the remote server.
-  // Must be called before Init(). Required for some mechanisms.
-  void set_server_fqdn(const string& domain_name);
-
-  // Set deadline for connection negotiation.
-  void set_deadline(const MonoTime& deadline);
-
-  // Get deadline for connection negotiation.
-  const MonoTime& deadline() const { return deadline_; }
-
-  // Initialize a new SASL client. Must be called before Negotiate().
-  // Returns OK on success, otherwise RuntimeError.
-  Status Init(const string& service_type);
-
-  // Begin negotiation with the SASL server on the other side of the fd socket
-  // that this client was constructed with.
-  // Returns OK on success.
-  // Otherwise, it may return NotAuthorized, NotSupported, or another non-OK status.
-  Status Negotiate();
-
-  // SASL callback for plugin options, supported mechanisms, etc.
-  // Returns SASL_FAIL if the option is not handled, which does not fail the handshake.
-  int GetOptionCb(const char* plugin_name, const char* option,
-                  const char** result, unsigned* len);
-
-  // SASL callback for SASL_CB_USER, SASL_CB_AUTHNAME, SASL_CB_LANGUAGE
-  int SimpleCb(int id, const char** result, unsigned* len);
-
-  // SASL callback for SASL_CB_PASS
-  int SecretCb(sasl_conn_t* conn, int id, sasl_secret_t** psecret);
-
- private:
-  // Encode and send the specified negotiate message to the server.
-  Status SendNegotiatePB(const NegotiatePB& msg);
-
-  // Validate that header does not indicate an error, parse param_buf into response.
-  Status ParseNegotiatePB(const ResponseHeader& header,
-                          const Slice& param_buf,
-                          NegotiatePB* response);
-
-  // Send an NEGOTIATE message to the server.
-  Status SendNegotiateMessage();
-
-  // Send an SASL_INITIATE message to the server.
-  Status SendInitiateMessage(const NegotiatePB_SaslAuth& auth,
-                             const char* init_msg, unsigned init_msg_len);
-
-  // Send a RESPONSE message to the server.
-  Status SendResponseMessage(const char* resp_msg, unsigned resp_msg_len);
-
-  // Perform a client-side step of the SASL negotiation.
-  // Input is what came from the server. Output is what we will send back to the server.
-  // Returns:
-  //   Status::OK if sasl_client_step returns SASL_OK.
-  //   Status::Incomplete if sasl_client_step returns SASL_CONTINUE
-  // otherwise returns an appropriate error status.
-  Status DoSaslStep(const string& in, const char** out, unsigned* out_len);
-
-  // Handle case when server sends NEGOTIATE response.
-  Status HandleNegotiateResponse(const NegotiatePB& response);
-
-  // Handle case when server sends CHALLENGE response.
-  Status HandleChallengeResponse(const NegotiatePB& response);
-
-  // Handle case when server sends SUCCESS response.
-  Status HandleSuccessResponse(const NegotiatePB& response);
-
-  // Parse error status message from raw bytes of an ErrorStatusPB.
-  Status ParseError(const Slice& err_data);
-
-  string app_name_;
-  Socket* sock_;
-  std::vector<sasl_callback_t> callbacks_;
-  // The SASL connection object. This is initialized in Init() and
-  // freed after Negotiate() completes (regardless whether it was successful).
-  gscoped_ptr<sasl_conn_t, SaslDeleter> sasl_conn_;
-  SaslHelper helper_;
-
-  string plain_auth_user_;
-  string plain_pass_;
-  gscoped_ptr<sasl_secret_t, FreeDeleter> psecret_;
-
-  // The set of features supported by the server.
-  std::set<RpcFeatureFlag> server_features_;
-
-  SaslNegotiationState::Type client_state_;
-
-  // The mechanism we negotiated with the server.
-  SaslMechanism::Type negotiated_mech_;
-
-  // Intra-negotiation state.
-  bool nego_ok_;  // During negotiation: did we get a SASL_OK response from the SASL library?
-  bool nego_response_expected_;  // During negotiation: Are we waiting for a server response?
-
-  // Negotiation timeout deadline.
-  MonoTime deadline_;
-
-  DISALLOW_COPY_AND_ASSIGN(SaslClient);
-};
-
-} // namespace rpc
-} // namespace kudu
-
-#endif  // KUDU_RPC_SASL_CLIENT_H