You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2017/01/24 00:00:19 UTC

[2/3] kudu git commit: TLS-negotiation [5/n]: Rename sasl_[client|server] to [client|server]_negotiation

TLS-negotiation [5/n]: Rename sasl_[client|server] to [client|server]_negotiation

This rename is in anticipation of a major refactor to both of these
classes which will make them less specific to SASL. The file rename is
kept in a separate commit in order to preserve git history.

Change-Id: Ib4fb4bcdb7c7a43eea88b2911d2b2e75eafa62b0
Reviewed-on: http://gerrit.cloudera.org:8080/5759
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/41c45523
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/41c45523
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/41c45523

Branch: refs/heads/master
Commit: 41c45523f7d34d7800fd29d4cbe40330cac13e6a
Parents: 9d0421e
Author: Dan Burkert <da...@apache.org>
Authored: Fri Jan 20 15:34:43 2017 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Mon Jan 23 23:37:18 2017 +0000

----------------------------------------------------------------------
 src/kudu/rpc/CMakeLists.txt        |   8 +-
 src/kudu/rpc/client_negotiation.cc | 522 +++++++++++++++++++++++++++++
 src/kudu/rpc/client_negotiation.h  | 180 ++++++++++
 src/kudu/rpc/connection.cc         |   3 +-
 src/kudu/rpc/connection.h          |   6 +-
 src/kudu/rpc/negotiation-test.cc   | 567 ++++++++++++++++++++++++++++++++
 src/kudu/rpc/negotiation.cc        |   4 +-
 src/kudu/rpc/reactor.cc            |   4 +-
 src/kudu/rpc/sasl_client.cc        | 521 -----------------------------
 src/kudu/rpc/sasl_client.h         | 180 ----------
 src/kudu/rpc/sasl_rpc-test.cc      | 567 --------------------------------
 src/kudu/rpc/sasl_server.cc        | 498 ----------------------------
 src/kudu/rpc/sasl_server.h         | 180 ----------
 src/kudu/rpc/server_negotiation.cc | 499 ++++++++++++++++++++++++++++
 src/kudu/rpc/server_negotiation.h  | 180 ++++++++++
 15 files changed, 1960 insertions(+), 1959 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/41c45523/src/kudu/rpc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/CMakeLists.txt b/src/kudu/rpc/CMakeLists.txt
index 6ba028b..fa21551 100644
--- a/src/kudu/rpc/CMakeLists.txt
+++ b/src/kudu/rpc/CMakeLists.txt
@@ -43,12 +43,13 @@ ADD_EXPORTABLE_LIBRARY(rpc_introspection_proto
 set(KRPC_SRCS
     acceptor_pool.cc
     blocking_ops.cc
-    outbound_call.cc
+    client_negotiation.cc
     connection.cc
     constants.cc
     inbound_call.cc
     messenger.cc
     negotiation.cc
+    outbound_call.cc
     proxy.cc
     reactor.cc
     remote_method.cc
@@ -59,10 +60,9 @@ set(KRPC_SRCS
     rpc_controller.cc
     rpcz_store.cc
     sasl_common.cc
-    sasl_client.cc
     sasl_helper.cc
-    sasl_server.cc
     serialization.cc
+    server_negotiation.cc
     service_if.cc
     service_pool.cc
     service_queue.cc
@@ -116,11 +116,11 @@ target_link_libraries(rtest_krpc
 set(KUDU_TEST_LINK_LIBS rtest_krpc krpc rpc_header_proto security-test ${KUDU_MIN_TEST_LIBS})
 ADD_KUDU_TEST(exactly_once_rpc-test)
 ADD_KUDU_TEST(mt-rpc-test RUN_SERIAL true)
+ADD_KUDU_TEST(negotiation-test)
 ADD_KUDU_TEST(reactor-test)
 ADD_KUDU_TEST(request_tracker-test)
 ADD_KUDU_TEST(rpc-bench RUN_SERIAL true)
 ADD_KUDU_TEST(rpc-test)
 ADD_KUDU_TEST(rpc_stub-test)
-ADD_KUDU_TEST(sasl_rpc-test)
 ADD_KUDU_TEST(service_queue-test)
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/41c45523/src/kudu/rpc/client_negotiation.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/client_negotiation.cc b/src/kudu/rpc/client_negotiation.cc
new file mode 100644
index 0000000..0eb96b8
--- /dev/null
+++ b/src/kudu/rpc/client_negotiation.cc
@@ -0,0 +1,522 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/client_negotiation.h"
+
+#include <string.h>
+
+#include <map>
+#include <set>
+#include <string>
+
+#include <glog/logging.h>
+#include <sasl/sasl.h>
+
+#include "kudu/gutil/endian.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/strings/util.h"
+#include "kudu/rpc/blocking_ops.h"
+#include "kudu/rpc/constants.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/sasl_common.h"
+#include "kudu/rpc/sasl_helper.h"
+#include "kudu/rpc/serialization.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/trace.h"
+
+namespace kudu {
+namespace rpc {
+
+using std::map;
+using std::set;
+using std::string;
+
+static int SaslClientGetoptCb(void* sasl_client, const char* plugin_name, const char* option,
+                       const char** result, unsigned* len) {
+  return static_cast<SaslClient*>(sasl_client)
+    ->GetOptionCb(plugin_name, option, result, len);
+}
+
+static int SaslClientSimpleCb(void *sasl_client, int id,
+                       const char **result, unsigned *len) {
+  return static_cast<SaslClient*>(sasl_client)->SimpleCb(id, result, len);
+}
+
+static int SaslClientSecretCb(sasl_conn_t* conn, void *sasl_client, int id,
+                       sasl_secret_t** psecret) {
+  return static_cast<SaslClient*>(sasl_client)->SecretCb(conn, id, psecret);
+}
+
+// Return an appropriately-typed Status object based on an ErrorStatusPB returned
+// from an Error RPC.
+// In case there is no relevant Status type, return a RuntimeError.
+static Status StatusFromRpcError(const ErrorStatusPB& error) {
+  DCHECK(error.IsInitialized()) << "Error status PB must be initialized";
+  if (PREDICT_FALSE(!error.has_code())) {
+    return Status::RuntimeError(error.message());
+  }
+  string code_name = ErrorStatusPB::RpcErrorCodePB_Name(error.code());
+  switch (error.code()) {
+    case ErrorStatusPB_RpcErrorCodePB_FATAL_UNAUTHORIZED:
+      return Status::NotAuthorized(code_name, error.message());
+    default:
+      return Status::RuntimeError(code_name, error.message());
+  }
+}
+
+SaslClient::SaslClient(string app_name, Socket* socket)
+    : app_name_(std::move(app_name)),
+      sock_(socket),
+      helper_(SaslHelper::CLIENT),
+      client_state_(SaslNegotiationState::NEW),
+      negotiated_mech_(SaslMechanism::INVALID),
+      deadline_(MonoTime::Max()) {
+  callbacks_.push_back(SaslBuildCallback(SASL_CB_GETOPT,
+      reinterpret_cast<int (*)()>(&SaslClientGetoptCb), this));
+  callbacks_.push_back(SaslBuildCallback(SASL_CB_AUTHNAME,
+      reinterpret_cast<int (*)()>(&SaslClientSimpleCb), this));
+  callbacks_.push_back(SaslBuildCallback(SASL_CB_PASS,
+      reinterpret_cast<int (*)()>(&SaslClientSecretCb), this));
+  callbacks_.push_back(SaslBuildCallback(SASL_CB_LIST_END, nullptr, nullptr));
+}
+
+Status SaslClient::EnablePlain(const string& user, const string& pass) {
+  DCHECK_EQ(client_state_, SaslNegotiationState::NEW);
+  RETURN_NOT_OK(helper_.EnablePlain());
+  plain_auth_user_ = user;
+  plain_pass_ = pass;
+  return Status::OK();
+}
+
+Status SaslClient::EnableGSSAPI() {
+  DCHECK_EQ(client_state_, SaslNegotiationState::NEW);
+  return helper_.EnableGSSAPI();
+}
+
+SaslMechanism::Type SaslClient::negotiated_mechanism() const {
+  DCHECK_EQ(client_state_, SaslNegotiationState::NEGOTIATED);
+  return negotiated_mech_;
+}
+
+void SaslClient::set_local_addr(const Sockaddr& addr) {
+  DCHECK_EQ(client_state_, SaslNegotiationState::NEW);
+  helper_.set_local_addr(addr);
+}
+
+void SaslClient::set_remote_addr(const Sockaddr& addr) {
+  DCHECK_EQ(client_state_, SaslNegotiationState::NEW);
+  helper_.set_remote_addr(addr);
+}
+
+void SaslClient::set_server_fqdn(const string& domain_name) {
+  DCHECK_EQ(client_state_, SaslNegotiationState::NEW);
+  helper_.set_server_fqdn(domain_name);
+}
+
+void SaslClient::set_deadline(const MonoTime& deadline) {
+  DCHECK_NE(client_state_, SaslNegotiationState::NEGOTIATED);
+  deadline_ = deadline;
+}
+
+// calls sasl_client_init() and sasl_client_new()
+Status SaslClient::Init(const string& service_type) {
+  RETURN_NOT_OK(SaslInit(app_name_.c_str()));
+
+  // Ensure we are not called more than once.
+  if (client_state_ != SaslNegotiationState::NEW) {
+    return Status::IllegalState("Init() may only be called once per SaslClient object.");
+  }
+
+  // TODO(unknown): Support security flags.
+  unsigned secflags = 0;
+
+  sasl_conn_t* sasl_conn = nullptr;
+  Status s = WrapSaslCall(nullptr /* no conn */, [&]() {
+      return sasl_client_new(
+          service_type.c_str(),         // Registered name of the service using SASL. Required.
+          helper_.server_fqdn(),        // The fully qualified domain name of the remote server.
+          helper_.local_addr_string(),  // Local and remote IP address strings. (NULL disables
+          helper_.remote_addr_string(), //   mechanisms which require this info.)
+          &callbacks_[0],               // Connection-specific callbacks.
+          secflags,                     // Security flags.
+          &sasl_conn);
+    });
+  if (!s.ok()) {
+    return Status::RuntimeError("Unable to create new SASL client",
+                                s.message());
+  }
+  sasl_conn_.reset(sasl_conn);
+
+  client_state_ = SaslNegotiationState::INITIALIZED;
+  return Status::OK();
+}
+
+Status SaslClient::Negotiate() {
+  // After negotiation, we no longer need the SASL library object, so
+  // may as well free its memory since the connection may be long-lived.
+  // Additionally, this works around a SEGV seen at process shutdown time:
+  // if we still have SASL objects retained by Reactor when the process
+  // is exiting, the SASL libraries may start destructing global state
+  // and cause a crash when we sasl_dispose the connection.
+  auto cleanup = MakeScopedCleanup([&]() {
+      sasl_conn_.reset();
+    });
+  TRACE("Called SaslClient::Negotiate()");
+
+  // Ensure we called exactly once, and in the right order.
+  if (client_state_ == SaslNegotiationState::NEW) {
+    return Status::IllegalState("SaslClient: Init() must be called before calling Negotiate()");
+  }
+  if (client_state_ == SaslNegotiationState::NEGOTIATED) {
+    return Status::IllegalState("SaslClient: Negotiate() may only be called once per object.");
+  }
+
+  // Ensure we can use blocking calls on the socket during negotiation.
+  RETURN_NOT_OK(EnsureBlockingMode(sock_));
+
+  // Start by asking the server for a list of available auth mechanisms.
+  RETURN_NOT_OK(SendNegotiateMessage());
+
+  faststring recv_buf;
+  nego_ok_ = false;
+
+  // We set nego_ok_ = true when the SASL library returns SASL_OK to us.
+  // We set nego_response_expected_ = true each time we send a request to the server.
+  while (!nego_ok_ || nego_response_expected_) {
+    ResponseHeader header;
+    Slice param_buf;
+    RETURN_NOT_OK(ReceiveFramedMessageBlocking(sock_, &recv_buf, &header, &param_buf, deadline_));
+    nego_response_expected_ = false;
+
+    NegotiatePB response;
+    RETURN_NOT_OK(ParseNegotiatePB(header, param_buf, &response));
+
+    switch (response.step()) {
+      // NEGOTIATE: Server has sent us its list of supported SASL mechanisms.
+      case NegotiatePB::NEGOTIATE:
+        RETURN_NOT_OK(HandleNegotiateResponse(response));
+        break;
+
+      // SASL_CHALLENGE: Server sent us a follow-up to an SASL_INITIATE or SASL_RESPONSE request.
+      case NegotiatePB::SASL_CHALLENGE:
+        RETURN_NOT_OK(HandleChallengeResponse(response));
+        break;
+
+      // SASL_SUCCESS: Server has accepted our authentication request. Negotiation successful.
+      case NegotiatePB::SASL_SUCCESS:
+        RETURN_NOT_OK(HandleSuccessResponse(response));
+        break;
+
+      // Client sent us some unsupported SASL response.
+      default:
+        LOG(ERROR) << "SASL Client: Received unsupported response from server";
+        return Status::InvalidArgument("RPC client doesn't support Negotiate step",
+                                       NegotiatePB::NegotiateStep_Name(response.step()));
+    }
+  }
+
+  TRACE("SASL Client: Successful negotiation");
+  client_state_ = SaslNegotiationState::NEGOTIATED;
+  return Status::OK();
+}
+
+Status SaslClient::SendNegotiatePB(const NegotiatePB& msg) {
+  DCHECK_NE(client_state_, SaslNegotiationState::NEW)
+      << "Must not send Negotiate messages before calling Init()";
+  DCHECK_NE(client_state_, SaslNegotiationState::NEGOTIATED)
+      << "Must not send Negotiate messages after negotiation succeeds";
+
+  // Create header with SASL-specific callId
+  RequestHeader header;
+  header.set_call_id(kNegotiateCallId);
+  return helper_.SendNegotiatePB(sock_, header, msg, deadline_);
+}
+
+Status SaslClient::ParseNegotiatePB(const ResponseHeader& header,
+                                    const Slice& param_buf,
+                                    NegotiatePB* response) {
+  RETURN_NOT_OK(helper_.SanityCheckNegotiateCallId(header.call_id()));
+
+  if (header.is_error()) {
+    return ParseError(param_buf);
+  }
+
+  return helper_.ParseNegotiatePB(param_buf, response);
+}
+
+Status SaslClient::SendNegotiateMessage() {
+  NegotiatePB msg;
+  msg.set_step(NegotiatePB::NEGOTIATE);
+
+  // Advertise our supported features.
+  for (RpcFeatureFlag feature : kSupportedClientRpcFeatureFlags) {
+    msg.add_supported_features(feature);
+  }
+
+  TRACE("SASL Client: Sending NEGOTIATE request to server.");
+  RETURN_NOT_OK(SendNegotiatePB(msg));
+  nego_response_expected_ = true;
+  return Status::OK();
+}
+
+Status SaslClient::SendInitiateMessage(const NegotiatePB_SaslAuth& auth,
+    const char* init_msg, unsigned init_msg_len) {
+  NegotiatePB msg;
+  msg.set_step(NegotiatePB::SASL_INITIATE);
+  msg.mutable_token()->assign(init_msg, init_msg_len);
+  msg.add_auths()->CopyFrom(auth);
+  TRACE("SASL Client: Sending SASL_INITIATE request to server.");
+  RETURN_NOT_OK(SendNegotiatePB(msg));
+  nego_response_expected_ = true;
+  return Status::OK();
+}
+
+Status SaslClient::SendResponseMessage(const char* resp_msg, unsigned resp_msg_len) {
+  NegotiatePB reply;
+  reply.set_step(NegotiatePB::SASL_RESPONSE);
+  reply.mutable_token()->assign(resp_msg, resp_msg_len);
+  TRACE("SASL Client: Sending SASL_RESPONSE request to server.");
+  RETURN_NOT_OK(SendNegotiatePB(reply));
+  nego_response_expected_ = true;
+  return Status::OK();
+}
+
+Status SaslClient::DoSaslStep(const string& in, const char** out, unsigned* out_len) {
+  TRACE("SASL Client: Calling sasl_client_step()");
+  Status s = WrapSaslCall(sasl_conn_.get(), [&]() {
+      return sasl_client_step(sasl_conn_.get(), in.c_str(), in.length(), nullptr, out, out_len);
+    });
+  if (s.ok()) {
+    nego_ok_ = true;
+  }
+  return s;
+}
+
+Status SaslClient::HandleNegotiateResponse(const NegotiatePB& response) {
+  TRACE("SASL Client: Received NEGOTIATE response from server");
+  // Fill in the set of features supported by the server.
+  for (int flag : response.supported_features()) {
+    // We only add the features that our local build knows about.
+    RpcFeatureFlag feature_flag = RpcFeatureFlag_IsValid(flag) ?
+                                  static_cast<RpcFeatureFlag>(flag) : UNKNOWN;
+    if (ContainsKey(kSupportedClientRpcFeatureFlags, feature_flag)) {
+      server_features_.insert(feature_flag);
+    }
+  }
+
+  // Build a map of the mechanisms offered by the server.
+  const set<string>& local_mechs = helper_.LocalMechs();
+  set<string> server_mechs;
+  map<string, NegotiatePB::SaslAuth> server_mech_map;
+  for (const NegotiatePB::SaslAuth& auth : response.auths()) {
+    const auto& mech = auth.mechanism();
+    server_mech_map[mech] = auth;
+    server_mechs.insert(mech);
+  }
+  // Determine which server mechs are also enabled by the client.
+  // Cyrus SASL 2.1.25 and later supports doing this set intersection via
+  // the 'client_mech_list' option, but that version is not available on
+  // RHEL 6, so we have to do it manually.
+  set<string> matching_mechs = STLSetIntersection(local_mechs, server_mechs);
+
+  if (matching_mechs.empty() &&
+      ContainsKey(server_mechs, kSaslMechGSSAPI) &&
+      !ContainsKey(local_mechs, kSaslMechGSSAPI)) {
+    return Status::NotAuthorized("server requires GSSAPI (Kerberos) authentication and "
+                                 "client was missing the required SASL module");
+  }
+
+  string matching_mechs_str = JoinElements(matching_mechs, " ");
+  TRACE("SASL Client: Matching mech list: $0", matching_mechs_str);
+
+  const char* init_msg = nullptr;
+  unsigned init_msg_len = 0;
+  const char* negotiated_mech = nullptr;
+
+  /* select a mechanism for a connection
+   *  mechlist      -- mechanisms server has available (punctuation ignored)
+   * output:
+   *  prompt_need   -- on SASL_INTERACT, list of prompts needed to continue
+   *  clientout     -- the initial client response to send to the server
+   *  mech          -- set to mechanism name
+   *
+   * Returns:
+   *  SASL_OK       -- success
+   *  SASL_CONTINUE -- negotiation required
+   *  SASL_NOMEM    -- not enough memory
+   *  SASL_NOMECH   -- no mechanism meets requested properties
+   *  SASL_INTERACT -- user interaction needed to fill in prompt_need list
+   */
+  TRACE("SASL Client: Calling sasl_client_start()");
+  Status s = WrapSaslCall(sasl_conn_.get(), [&]() {
+      return sasl_client_start(
+          sasl_conn_.get(),           // The SASL connection context created by init()
+          matching_mechs_str.c_str(), // The list of mechanisms to negotiate.
+          nullptr,                    // Disables INTERACT return if NULL.
+          &init_msg,                  // Filled in on success.
+          &init_msg_len,              // Filled in on success.
+          &negotiated_mech);          // Filled in on success.
+    });
+
+  if (s.ok()) {
+    nego_ok_ = true;
+  } else if (!s.IsIncomplete()) {
+    return s;
+  }
+
+  // The server matched one of our mechanisms.
+  NegotiatePB::SaslAuth* auth = FindOrNull(server_mech_map, negotiated_mech);
+  if (PREDICT_FALSE(auth == nullptr)) {
+    return Status::IllegalState("Unable to find auth in map, unexpected error", negotiated_mech);
+  }
+  negotiated_mech_ = SaslMechanism::value_of(negotiated_mech);
+
+  RETURN_NOT_OK(SendInitiateMessage(*auth, init_msg, init_msg_len));
+  return Status::OK();
+}
+
+Status SaslClient::HandleChallengeResponse(const NegotiatePB& response) {
+  TRACE("SASL Client: Received SASL_CHALLENGE response from server");
+  if (PREDICT_FALSE(nego_ok_)) {
+    LOG(DFATAL) << "Server sent SASL_CHALLENGE response after client library returned SASL_OK";
+  }
+
+  if (PREDICT_FALSE(!response.has_token())) {
+    return Status::InvalidArgument("No token in SASL_CHALLENGE response from server");
+  }
+
+  const char* out = nullptr;
+  unsigned out_len = 0;
+  Status s = DoSaslStep(response.token(), &out, &out_len);
+  if (!s.ok() && !s.IsIncomplete()) {
+    return s;
+  }
+  RETURN_NOT_OK(SendResponseMessage(out, out_len));
+  return Status::OK();
+}
+
+Status SaslClient::HandleSuccessResponse(const NegotiatePB& response) {
+  TRACE("SASL Client: Received SASL_SUCCESS response from server");
+  if (!nego_ok_) {
+    const char* out = nullptr;
+    unsigned out_len = 0;
+    Status s = DoSaslStep(response.token(), &out, &out_len);
+    if (s.IsIncomplete()) {
+      return Status::IllegalState("Server indicated successful authentication, but client "
+                                  "was not complete");
+    }
+    RETURN_NOT_OK(s);
+    if (out_len > 0) {
+      return Status::IllegalState("SASL client library generated spurious token after SASL_SUCCESS",
+          string(out, out_len));
+    }
+    CHECK(nego_ok_);
+  }
+  return Status::OK();
+}
+
+// Parse error status message from raw bytes of an ErrorStatusPB.
+Status SaslClient::ParseError(const Slice& err_data) {
+  ErrorStatusPB error;
+  if (!error.ParseFromArray(err_data.data(), err_data.size())) {
+    return Status::IOError("Invalid error response, missing fields",
+        error.InitializationErrorString());
+  }
+  Status s = StatusFromRpcError(error);
+  TRACE("SASL Client: Received error response from server: $0", s.ToString());
+  return s;
+}
+
+int SaslClient::GetOptionCb(const char* plugin_name, const char* option,
+                            const char** result, unsigned* len) {
+  return helper_.GetOptionCb(plugin_name, option, result, len);
+}
+
+// Used for PLAIN.
+// SASL callback for SASL_CB_USER, SASL_CB_AUTHNAME, SASL_CB_LANGUAGE
+int SaslClient::SimpleCb(int id, const char** result, unsigned* len) {
+  if (PREDICT_FALSE(!helper_.IsPlainEnabled())) {
+    LOG(DFATAL) << "SASL Client: Simple callback called, but PLAIN auth is not enabled";
+    return SASL_FAIL;
+  }
+  if (PREDICT_FALSE(result == nullptr)) {
+    LOG(DFATAL) << "SASL Client: result outparam is NULL";
+    return SASL_BADPARAM;
+  }
+  switch (id) {
+    // TODO(unknown): Support impersonation?
+    // For impersonation, USER is the impersonated user, AUTHNAME is the "sudoer".
+    case SASL_CB_USER:
+      TRACE("SASL Client: callback for SASL_CB_USER");
+      *result = plain_auth_user_.c_str();
+      if (len != nullptr) *len = plain_auth_user_.length();
+      break;
+    case SASL_CB_AUTHNAME:
+      TRACE("SASL Client: callback for SASL_CB_AUTHNAME");
+      *result = plain_auth_user_.c_str();
+      if (len != nullptr) *len = plain_auth_user_.length();
+      break;
+    case SASL_CB_LANGUAGE:
+      LOG(DFATAL) << "SASL Client: Unable to handle SASL callback type SASL_CB_LANGUAGE"
+        << "(" << id << ")";
+      return SASL_BADPARAM;
+    default:
+      LOG(DFATAL) << "SASL Client: Unexpected SASL callback type: " << id;
+      return SASL_BADPARAM;
+  }
+
+  return SASL_OK;
+}
+
+// Used for PLAIN.
+// SASL callback for SASL_CB_PASS: User password.
+int SaslClient::SecretCb(sasl_conn_t* conn, int id, sasl_secret_t** psecret) {
+  if (PREDICT_FALSE(!helper_.IsPlainEnabled())) {
+    LOG(DFATAL) << "SASL Client: Plain secret callback called, but PLAIN auth is not enabled";
+    return SASL_FAIL;
+  }
+  switch (id) {
+    case SASL_CB_PASS: {
+      if (!conn || !psecret) return SASL_BADPARAM;
+
+      size_t len = plain_pass_.length();
+      *psecret = reinterpret_cast<sasl_secret_t*>(malloc(sizeof(sasl_secret_t) + len));
+      if (!*psecret) {
+        return SASL_NOMEM;
+      }
+      psecret_.reset(*psecret);  // Ensure that we free() this structure later.
+      (*psecret)->len = len;
+      memcpy((*psecret)->data, plain_pass_.c_str(), len + 1);
+      break;
+    }
+    default:
+      LOG(DFATAL) << "SASL Client: Unexpected SASL callback type: " << id;
+      return SASL_BADPARAM;
+  }
+
+  return SASL_OK;
+}
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/41c45523/src/kudu/rpc/client_negotiation.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/client_negotiation.h b/src/kudu/rpc/client_negotiation.h
new file mode 100644
index 0000000..2f01e56
--- /dev/null
+++ b/src/kudu/rpc/client_negotiation.h
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_RPC_SASL_CLIENT_H
+#define KUDU_RPC_SASL_CLIENT_H
+
+#include <set>
+#include <string>
+#include <vector>
+
+#include <sasl/sasl.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/sasl_common.h"
+#include "kudu/rpc/sasl_helper.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+namespace rpc {
+
+using std::string;
+
+class NegotiatePB;
+class NegotiatePB_SaslAuth;
+class ResponseHeader;
+
+// Class for doing SASL negotiation with a SaslServer over a bidirectional socket.
+// Operations on this class are NOT thread-safe.
+class SaslClient {
+ public:
+  // Does not take ownership of the socket indicated by the fd.
+  SaslClient(string app_name, Socket* socket);
+
+  // Enable PLAIN authentication.
+  // Must be called after Init().
+  Status EnablePlain(const string& user, const string& pass);
+
+  // Enable GSSAPI authentication.
+  // Call after Init().
+  Status EnableGSSAPI();
+
+  // Returns mechanism negotiated by this connection.
+  // Must be called after Negotiate().
+  SaslMechanism::Type negotiated_mechanism() const;
+
+  // Returns the set of RPC system features supported by the remote server.
+  // Must be called after Negotiate().
+  const std::set<RpcFeatureFlag>& server_features() const {
+    return server_features_;
+  }
+
+  // Specify IP:port of local side of connection.
+  // Must be called before Init(). Required for some mechanisms.
+  void set_local_addr(const Sockaddr& addr);
+
+  // Specify IP:port of remote side of connection.
+  // Must be called before Init(). Required for some mechanisms.
+  void set_remote_addr(const Sockaddr& addr);
+
+  // Specify the fully-qualified domain name of the remote server.
+  // Must be called before Init(). Required for some mechanisms.
+  void set_server_fqdn(const string& domain_name);
+
+  // Set deadline for connection negotiation.
+  void set_deadline(const MonoTime& deadline);
+
+  // Get deadline for connection negotiation.
+  const MonoTime& deadline() const { return deadline_; }
+
+  // Initialize a new SASL client. Must be called before Negotiate().
+  // Returns OK on success, otherwise RuntimeError.
+  Status Init(const string& service_type);
+
+  // Begin negotiation with the SASL server on the other side of the fd socket
+  // that this client was constructed with.
+  // Returns OK on success.
+  // Otherwise, it may return NotAuthorized, NotSupported, or another non-OK status.
+  Status Negotiate();
+
+  // SASL callback for plugin options, supported mechanisms, etc.
+  // Returns SASL_FAIL if the option is not handled, which does not fail the handshake.
+  int GetOptionCb(const char* plugin_name, const char* option,
+                  const char** result, unsigned* len);
+
+  // SASL callback for SASL_CB_USER, SASL_CB_AUTHNAME, SASL_CB_LANGUAGE
+  int SimpleCb(int id, const char** result, unsigned* len);
+
+  // SASL callback for SASL_CB_PASS
+  int SecretCb(sasl_conn_t* conn, int id, sasl_secret_t** psecret);
+
+ private:
+  // Encode and send the specified negotiate message to the server.
+  Status SendNegotiatePB(const NegotiatePB& msg);
+
+  // Validate that header does not indicate an error, parse param_buf into response.
+  Status ParseNegotiatePB(const ResponseHeader& header,
+                          const Slice& param_buf,
+                          NegotiatePB* response);
+
+  // Send an NEGOTIATE message to the server.
+  Status SendNegotiateMessage();
+
+  // Send an SASL_INITIATE message to the server.
+  Status SendInitiateMessage(const NegotiatePB_SaslAuth& auth,
+                             const char* init_msg, unsigned init_msg_len);
+
+  // Send a RESPONSE message to the server.
+  Status SendResponseMessage(const char* resp_msg, unsigned resp_msg_len);
+
+  // Perform a client-side step of the SASL negotiation.
+  // Input is what came from the server. Output is what we will send back to the server.
+  // Returns:
+  //   Status::OK if sasl_client_step returns SASL_OK.
+  //   Status::Incomplete if sasl_client_step returns SASL_CONTINUE
+  // otherwise returns an appropriate error status.
+  Status DoSaslStep(const string& in, const char** out, unsigned* out_len);
+
+  // Handle case when server sends NEGOTIATE response.
+  Status HandleNegotiateResponse(const NegotiatePB& response);
+
+  // Handle case when server sends CHALLENGE response.
+  Status HandleChallengeResponse(const NegotiatePB& response);
+
+  // Handle case when server sends SUCCESS response.
+  Status HandleSuccessResponse(const NegotiatePB& response);
+
+  // Parse error status message from raw bytes of an ErrorStatusPB.
+  Status ParseError(const Slice& err_data);
+
+  string app_name_;
+  Socket* sock_;
+  std::vector<sasl_callback_t> callbacks_;
+  // The SASL connection object. This is initialized in Init() and
+  // freed after Negotiate() completes (regardless whether it was successful).
+  gscoped_ptr<sasl_conn_t, SaslDeleter> sasl_conn_;
+  SaslHelper helper_;
+
+  string plain_auth_user_;
+  string plain_pass_;
+  gscoped_ptr<sasl_secret_t, FreeDeleter> psecret_;
+
+  // The set of features supported by the server.
+  std::set<RpcFeatureFlag> server_features_;
+
+  SaslNegotiationState::Type client_state_;
+
+  // The mechanism we negotiated with the server.
+  SaslMechanism::Type negotiated_mech_;
+
+  // Intra-negotiation state.
+  bool nego_ok_;  // During negotiation: did we get a SASL_OK response from the SASL library?
+  bool nego_response_expected_;  // During negotiation: Are we waiting for a server response?
+
+  // Negotiation timeout deadline.
+  MonoTime deadline_;
+
+  DISALLOW_COPY_AND_ASSIGN(SaslClient);
+};
+
+} // namespace rpc
+} // namespace kudu
+
+#endif  // KUDU_RPC_SASL_CLIENT_H

http://git-wip-us.apache.org/repos/asf/kudu/blob/41c45523/src/kudu/rpc/connection.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/connection.cc b/src/kudu/rpc/connection.cc
index c7a87e9..2abf6ca 100644
--- a/src/kudu/rpc/connection.cc
+++ b/src/kudu/rpc/connection.cc
@@ -30,14 +30,13 @@
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/human_readable.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/client_negotiation.h"
 #include "kudu/rpc/constants.h"
 #include "kudu/rpc/messenger.h"
 #include "kudu/rpc/reactor.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/rpc/rpc_introspection.pb.h"
-#include "kudu/rpc/sasl_client.h"
-#include "kudu/rpc/sasl_server.h"
 #include "kudu/rpc/transfer.h"
 #include "kudu/security/ssl_factory.h"
 #include "kudu/security/ssl_socket.h"

http://git-wip-us.apache.org/repos/asf/kudu/blob/41c45523/src/kudu/rpc/connection.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/connection.h b/src/kudu/rpc/connection.h
index bb2aa94..ef70483 100644
--- a/src/kudu/rpc/connection.h
+++ b/src/kudu/rpc/connection.h
@@ -30,10 +30,10 @@
 
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/ref_counted.h"
-#include "kudu/rpc/outbound_call.h"
-#include "kudu/rpc/sasl_client.h"
-#include "kudu/rpc/sasl_server.h"
+#include "kudu/rpc/client_negotiation.h"
 #include "kudu/rpc/inbound_call.h"
+#include "kudu/rpc/outbound_call.h"
+#include "kudu/rpc/server_negotiation.h"
 #include "kudu/rpc/transfer.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/sockaddr.h"

http://git-wip-us.apache.org/repos/asf/kudu/blob/41c45523/src/kudu/rpc/negotiation-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/negotiation-test.cc b/src/kudu/rpc/negotiation-test.cc
new file mode 100644
index 0000000..2bb73e4
--- /dev/null
+++ b/src/kudu/rpc/negotiation-test.cc
@@ -0,0 +1,567 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/rpc-test-base.h"
+
+#include <stdlib.h>
+#include <sys/stat.h>
+
+#include <functional>
+#include <memory>
+#include <string>
+#include <thread>
+
+#include <gtest/gtest.h>
+#include <sasl/sasl.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/client_negotiation.h"
+#include "kudu/rpc/constants.h"
+#include "kudu/rpc/sasl_common.h"
+#include "kudu/rpc/server_negotiation.h"
+#include "kudu/security/test/mini_kdc.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/subprocess.h"
+
+using std::string;
+using std::thread;
+
+// HACK: MIT Kerberos doesn't have any way of determining its version number,
+// but the error messages in krb5-1.10 and earlier are broken due to
+// a bug: http://krbdev.mit.edu/rt/Ticket/Display.html?id=6973
+//
+// Since we don't have any way to explicitly figure out the version, we just
+// look for this random macro which was added in 1.11 (the same version in which
+// the above bug was fixed).
+#ifndef KRB5_RESPONDER_QUESTION_PASSWORD
+#define KRB5_VERSION_LE_1_10
+#endif
+
+DEFINE_bool(is_test_child, false,
+            "Used by tests which require clean processes. "
+            "See TestDisableInit.");
+
+namespace kudu {
+namespace rpc {
+
+class TestSaslRpc : public RpcTestBase {
+ public:
+  virtual void SetUp() OVERRIDE {
+    RpcTestBase::SetUp();
+    ASSERT_OK(SaslInit(kSaslAppName));
+  }
+};
+
+// Test basic initialization of the objects.
+TEST_F(TestSaslRpc, TestBasicInit) {
+  SaslServer server(kSaslAppName, nullptr);
+  server.EnablePlain();
+  ASSERT_OK(server.Init(kSaslAppName));
+  SaslClient client(kSaslAppName, nullptr);
+  client.EnablePlain("test", "test");
+  ASSERT_OK(client.Init(kSaslAppName));
+}
+
+// A "Callable" that takes a Socket* param, for use with starting a thread.
+// Can be used for SaslServer or SaslClient threads.
+typedef std::function<void(Socket*)> SocketCallable;
+
+// Call Accept() on the socket, then pass the connection to the server runner
+static void RunAcceptingDelegator(Socket* acceptor,
+                                  const SocketCallable& server_runner) {
+  Socket conn;
+  Sockaddr remote;
+  CHECK_OK(acceptor->Accept(&conn, &remote, 0));
+  server_runner(&conn);
+}
+
+// Set up a socket and run a SASL negotiation.
+static void RunNegotiationTest(const SocketCallable& server_runner,
+                               const SocketCallable& client_runner) {
+  Socket server_sock;
+  CHECK_OK(server_sock.Init(0));
+  ASSERT_OK(server_sock.BindAndListen(Sockaddr(), 1));
+  Sockaddr server_bind_addr;
+  ASSERT_OK(server_sock.GetSocketAddress(&server_bind_addr));
+  thread server(RunAcceptingDelegator, &server_sock, server_runner);
+
+  Socket client_sock;
+  CHECK_OK(client_sock.Init(0));
+  ASSERT_OK(client_sock.Connect(server_bind_addr));
+  thread client(client_runner, &client_sock);
+
+  LOG(INFO) << "Waiting for test threads to terminate...";
+  client.join();
+  LOG(INFO) << "Client thread terminated.";
+
+  // TODO(todd): if the client fails to negotiate, it doesn't
+  // always result in sending a nice error message to the
+  // other side.
+  client_sock.Close();
+
+  server.join();
+  LOG(INFO) << "Server thread terminated.";
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+static void RunPlainNegotiationServer(Socket* conn) {
+  SaslServer sasl_server(kSaslAppName, conn);
+  CHECK_OK(sasl_server.EnablePlain());
+  CHECK_OK(sasl_server.Init(kSaslAppName));
+  CHECK_OK(sasl_server.Negotiate());
+  CHECK(ContainsKey(sasl_server.client_features(), APPLICATION_FEATURE_FLAGS));
+  CHECK_EQ("my-username", sasl_server.authenticated_user());
+}
+
+static void RunPlainNegotiationClient(Socket* conn) {
+  SaslClient sasl_client(kSaslAppName, conn);
+  CHECK_OK(sasl_client.EnablePlain("my-username", "ignored password"));
+  CHECK_OK(sasl_client.Init(kSaslAppName));
+  CHECK_OK(sasl_client.Negotiate());
+  CHECK(ContainsKey(sasl_client.server_features(), APPLICATION_FEATURE_FLAGS));
+}
+
+// Test SASL negotiation using the PLAIN mechanism over a socket.
+TEST_F(TestSaslRpc, TestPlainNegotiation) {
+  RunNegotiationTest(RunPlainNegotiationServer, RunPlainNegotiationClient);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+
+template<class T>
+using CheckerFunction = std::function<void(const Status&, T&)>;
+
+// Run GSSAPI negotiation from the server side. Runs
+// 'post_check' after negotiation to verify the result.
+static void RunGSSAPINegotiationServer(
+    Socket* conn,
+    const CheckerFunction<SaslServer>& post_check) {
+  SaslServer sasl_server(kSaslAppName, conn);
+  sasl_server.set_server_fqdn("127.0.0.1");
+  CHECK_OK(sasl_server.EnableGSSAPI());
+  CHECK_OK(sasl_server.Init(kSaslAppName));
+  post_check(sasl_server.Negotiate(), sasl_server);
+}
+
+// Run GSSAPI negotiation from the client side. Runs
+// 'post_check' after negotiation to verify the result.
+static void RunGSSAPINegotiationClient(
+    Socket* conn,
+    const CheckerFunction<SaslClient>& post_check) {
+  SaslClient sasl_client(kSaslAppName, conn);
+  sasl_client.set_server_fqdn("127.0.0.1");
+  CHECK_OK(sasl_client.EnableGSSAPI());
+  CHECK_OK(sasl_client.Init(kSaslAppName));
+  post_check(sasl_client.Negotiate(), sasl_client);
+}
+
+// Test configuring a client to allow but not require Kerberos/GSSAPI,
+// and connect to a server which requires Kerberos/GSSAPI.
+//
+// They should negotiate to use Kerberos/GSSAPI.
+TEST_F(TestSaslRpc, TestRestrictiveServer_NonRestrictiveClient) {
+  MiniKdc kdc;
+  ASSERT_OK(kdc.Start());
+
+  // Create the server principal and keytab.
+  string kt_path;
+  ASSERT_OK(kdc.CreateServiceKeytab("kudu/127.0.0.1", &kt_path));
+  CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/));
+
+  // Create and kinit as a client user.
+  ASSERT_OK(kdc.CreateUserPrincipal("testuser"));
+  ASSERT_OK(kdc.Kinit("testuser"));
+  ASSERT_OK(kdc.SetKrb5Environment());
+
+  // Authentication should now succeed on both sides.
+  RunNegotiationTest(
+      std::bind(RunGSSAPINegotiationServer, std::placeholders::_1,
+                [](const Status& s, SaslServer& server) {
+                  CHECK_OK(s);
+                  CHECK_EQ(SaslMechanism::GSSAPI, server.negotiated_mechanism());
+                  CHECK_EQ("testuser", server.authenticated_user());
+                }),
+      [](Socket* conn) {
+        SaslClient sasl_client(kSaslAppName, conn);
+        sasl_client.set_server_fqdn("127.0.0.1");
+        // The client enables both PLAIN and GSSAPI.
+        CHECK_OK(sasl_client.EnablePlain("foo", "bar"));
+        CHECK_OK(sasl_client.EnableGSSAPI());
+        CHECK_OK(sasl_client.Init(kSaslAppName));
+        CHECK_OK(sasl_client.Negotiate());
+        CHECK_EQ(SaslMechanism::GSSAPI, sasl_client.negotiated_mechanism());
+      });
+}
+
+// Test configuring a client to only support PLAIN, and a server which
+// only supports GSSAPI. This would happen, for example, if an old Kudu
+// client tries to talk to a secure-only cluster.
+TEST_F(TestSaslRpc, TestNoMatchingMechanisms) {
+  MiniKdc kdc;
+  ASSERT_OK(kdc.Start());
+
+  // Create the server principal and keytab.
+  string kt_path;
+  ASSERT_OK(kdc.CreateServiceKeytab("kudu/localhost", &kt_path));
+  CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/));
+
+  RunNegotiationTest(
+      std::bind(RunGSSAPINegotiationServer, std::placeholders::_1,
+                [](const Status& s, SaslServer& server) {
+                  // The client fails to find a matching mechanism and
+                  // doesn't send any failure message to the server.
+                  // Instead, it just disconnects.
+                  //
+                  // TODO(todd): this could produce a better message!
+                  ASSERT_STR_CONTAINS(s.ToString(), "got EOF from remote");
+                }),
+      [](Socket* conn) {
+        SaslClient sasl_client(kSaslAppName, conn);
+        sasl_client.set_server_fqdn("127.0.0.1");
+        // The client enables both PLAIN and GSSAPI.
+        CHECK_OK(sasl_client.EnablePlain("foo", "bar"));
+        CHECK_OK(sasl_client.Init(kSaslAppName));
+        Status s = sasl_client.Negotiate();
+        ASSERT_STR_CONTAINS(s.ToString(), "client was missing the required SASL module");
+      });
+}
+
+// Test SASL negotiation using the GSSAPI (kerberos) mechanism over a socket.
+TEST_F(TestSaslRpc, TestGSSAPINegotiation) {
+  MiniKdc kdc;
+  ASSERT_OK(kdc.Start());
+
+  // Create the server principal and keytab.
+  string kt_path;
+  ASSERT_OK(kdc.CreateServiceKeytab("kudu/127.0.0.1", &kt_path));
+  CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/));
+
+  // Create and kinit as a client user.
+  ASSERT_OK(kdc.CreateUserPrincipal("testuser"));
+  ASSERT_OK(kdc.Kinit("testuser"));
+  ASSERT_OK(kdc.SetKrb5Environment());
+
+  // Authentication should succeed on both sides.
+  RunNegotiationTest(
+      std::bind(RunGSSAPINegotiationServer, std::placeholders::_1,
+                [](const Status& s, SaslServer& server) {
+                  CHECK_OK(s);
+                  CHECK_EQ(SaslMechanism::GSSAPI, server.negotiated_mechanism());
+                  CHECK_EQ("testuser", server.authenticated_user());
+                }),
+      std::bind(RunGSSAPINegotiationClient, std::placeholders::_1,
+                [](const Status& s, SaslClient& client) {
+                  CHECK_OK(s);
+                  CHECK_EQ(SaslMechanism::GSSAPI, client.negotiated_mechanism());
+                }));
+}
+
+#ifndef __APPLE__
+// Test invalid SASL negotiations using the GSSAPI (kerberos) mechanism over a socket.
+// This test is ignored on macOS because the system Kerberos implementation
+// (Heimdal) caches the non-existence of client credentials, which causes futher
+// tests to fail.
+TEST_F(TestSaslRpc, TestGSSAPIInvalidNegotiation) {
+  MiniKdc kdc;
+  ASSERT_OK(kdc.Start());
+
+  // Try to negotiate with no krb5 credentials on either side. It should fail on both
+  // sides.
+  RunNegotiationTest(
+      std::bind(RunGSSAPINegotiationServer, std::placeholders::_1,
+                [](const Status& s, SaslServer& server) {
+                  // The client notices there are no credentials and
+                  // doesn't send any failure message to the server.
+                  // Instead, it just disconnects.
+                  //
+                  // TODO(todd): it might be preferable to have the server
+                  // fail to start if it has no valid keytab.
+                  CHECK(s.IsNetworkError());
+                }),
+      std::bind(RunGSSAPINegotiationClient, std::placeholders::_1,
+                [](const Status& s, SaslClient& client) {
+                  CHECK(s.IsNotAuthorized());
+#ifndef KRB5_VERSION_LE_1_10
+                  CHECK_GT(s.ToString().find("No Kerberos credentials available"), 0);
+#endif
+                }));
+
+
+  // Create the server principal and keytab.
+  string kt_path;
+  ASSERT_OK(kdc.CreateServiceKeytab("kudu/127.0.0.1", &kt_path));
+  CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/));
+
+  // Try to negotiate with no krb5 credentials on the client. It should fail on both
+  // sides.
+  RunNegotiationTest(
+      std::bind(RunGSSAPINegotiationServer, std::placeholders::_1,
+                [](const Status& s, SaslServer& server) {
+                  // The client notices there are no credentials and
+                  // doesn't send any failure message to the server.
+                  // Instead, it just disconnects.
+                  CHECK(s.IsNetworkError());
+                }),
+      std::bind(RunGSSAPINegotiationClient, std::placeholders::_1,
+                [](const Status& s, SaslClient& client) {
+                  CHECK(s.IsNotAuthorized());
+#ifndef KRB5_VERSION_LE_1_10
+                  CHECK_EQ(s.message().ToString(), "No Kerberos credentials available");
+#endif
+                }));
+
+  // Create and kinit as a client user.
+  ASSERT_OK(kdc.CreateUserPrincipal("testuser"));
+  ASSERT_OK(kdc.Kinit("testuser"));
+  ASSERT_OK(kdc.SetKrb5Environment());
+
+  // Change the server's keytab file so that it has inappropriate
+  // credentials.
+  // Authentication should now fail.
+  ASSERT_OK(kdc.CreateServiceKeytab("otherservice/127.0.0.1", &kt_path));
+  CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/));
+
+  RunNegotiationTest(
+      std::bind(RunGSSAPINegotiationServer, std::placeholders::_1,
+                [](const Status& s, SaslServer& server) {
+                  CHECK(s.IsNotAuthorized());
+#ifndef KRB5_VERSION_LE_1_10
+                  ASSERT_STR_CONTAINS(s.ToString(),
+                                      "No key table entry found matching kudu/127.0.0.1");
+#endif
+                }),
+      std::bind(RunGSSAPINegotiationClient, std::placeholders::_1,
+                [](const Status& s, SaslClient& client) {
+                  CHECK(s.IsNotAuthorized());
+#ifndef KRB5_VERSION_LE_1_10
+                  ASSERT_STR_CONTAINS(s.ToString(),
+                                      "No key table entry found matching kudu/127.0.0.1");
+#endif
+                }));
+}
+#endif
+
+#ifndef __APPLE__
+// Test that the pre-flight check for servers requiring Kerberos provides
+// nice error messages for missing or bad keytabs.
+//
+// This is ignored on macOS because the system Kerberos implementation does not
+// fail the preflight check when the keytab is inaccessible, probably because
+// the preflight check passes a 0-length token.
+TEST_F(TestSaslRpc, TestPreflight) {
+  // Try pre-flight with no keytab.
+  Status s = SaslServer::PreflightCheckGSSAPI(kSaslAppName);
+  ASSERT_FALSE(s.ok());
+#ifndef KRB5_VERSION_LE_1_10
+  ASSERT_STR_MATCHES(s.ToString(), "Key table file.*not found");
+#endif
+  // Try with a valid krb5 environment and keytab.
+  MiniKdc kdc;
+  ASSERT_OK(kdc.Start());
+  ASSERT_OK(kdc.SetKrb5Environment());
+  string kt_path;
+  ASSERT_OK(kdc.CreateServiceKeytab("kudu/127.0.0.1", &kt_path));
+  CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/));
+
+  ASSERT_OK(SaslServer::PreflightCheckGSSAPI(kSaslAppName));
+
+  // Try with an inaccessible keytab.
+  CHECK_ERR(chmod(kt_path.c_str(), 0000));
+  s = SaslServer::PreflightCheckGSSAPI(kSaslAppName);
+  ASSERT_FALSE(s.ok());
+#ifndef KRB5_VERSION_LE_1_10
+  ASSERT_STR_MATCHES(s.ToString(), "error accessing keytab: Permission denied");
+#endif
+  CHECK_ERR(unlink(kt_path.c_str()));
+
+  // Try with a keytab that has the wrong credentials.
+  ASSERT_OK(kdc.CreateServiceKeytab("wrong-service/127.0.0.1", &kt_path));
+  CHECK_ERR(setenv("KRB5_KTNAME", kt_path.c_str(), 1 /*replace*/));
+  s = SaslServer::PreflightCheckGSSAPI(kSaslAppName);
+  ASSERT_FALSE(s.ok());
+#ifndef KRB5_VERSION_LE_1_10
+  ASSERT_STR_MATCHES(s.ToString(), "No key table entry found matching kudu/.*");
+#endif
+}
+#endif
+
+////////////////////////////////////////////////////////////////////////////////
+
+static void RunTimeoutExpectingServer(Socket* conn) {
+  SaslServer sasl_server(kSaslAppName, conn);
+  CHECK_OK(sasl_server.EnablePlain());
+  CHECK_OK(sasl_server.Init(kSaslAppName));
+  Status s = sasl_server.Negotiate();
+  ASSERT_TRUE(s.IsNetworkError()) << "Expected client to time out and close the connection. Got: "
+      << s.ToString();
+}
+
+static void RunTimeoutNegotiationClient(Socket* sock) {
+  SaslClient sasl_client(kSaslAppName, sock);
+  CHECK_OK(sasl_client.EnablePlain("test", "test"));
+  CHECK_OK(sasl_client.Init(kSaslAppName));
+  MonoTime deadline = MonoTime::Now() - MonoDelta::FromMilliseconds(100L);
+  sasl_client.set_deadline(deadline);
+  Status s = sasl_client.Negotiate();
+  ASSERT_TRUE(s.IsTimedOut()) << "Expected timeout! Got: " << s.ToString();
+  CHECK_OK(sock->Shutdown(true, true));
+}
+
+// Ensure that the client times out.
+TEST_F(TestSaslRpc, TestClientTimeout) {
+  RunNegotiationTest(RunTimeoutExpectingServer, RunTimeoutNegotiationClient);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+static void RunTimeoutNegotiationServer(Socket* sock) {
+  SaslServer sasl_server(kSaslAppName, sock);
+  CHECK_OK(sasl_server.EnablePlain());
+  CHECK_OK(sasl_server.Init(kSaslAppName));
+  MonoTime deadline = MonoTime::Now() - MonoDelta::FromMilliseconds(100L);
+  sasl_server.set_deadline(deadline);
+  Status s = sasl_server.Negotiate();
+  ASSERT_TRUE(s.IsTimedOut()) << "Expected timeout! Got: " << s.ToString();
+  CHECK_OK(sock->Close());
+}
+
+static void RunTimeoutExpectingClient(Socket* conn) {
+  SaslClient sasl_client(kSaslAppName, conn);
+  CHECK_OK(sasl_client.EnablePlain("test", "test"));
+  CHECK_OK(sasl_client.Init(kSaslAppName));
+  Status s = sasl_client.Negotiate();
+  ASSERT_TRUE(s.IsNetworkError()) << "Expected server to time out and close the connection. Got: "
+      << s.ToString();
+}
+
+// Ensure that the server times out.
+TEST_F(TestSaslRpc, TestServerTimeout) {
+  RunNegotiationTest(RunTimeoutNegotiationServer, RunTimeoutExpectingClient);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+// This suite of tests ensure that applications that embed the Kudu client are
+// able to externally handle the initialization of SASL. See KUDU-1749 and
+// IMPALA-4497 for context.
+//
+// The tests are a bit tricky because the initialization of SASL is static state
+// that we can't easily clear/reset between test cases. So, each test invokes
+// itself as a subprocess with the appropriate --gtest_filter line as well as a
+// special flag to indicate that it is the test child running.
+class TestDisableInit : public KuduTest {
+ protected:
+  // Run the lambda 'f' in a newly-started process, capturing its stderr
+  // into 'stderr'.
+  template<class TestFunc>
+  void DoTest(const TestFunc& f, string* stderr = nullptr) {
+    if (FLAGS_is_test_child) {
+      f();
+      return;
+    }
+
+    // Invoke the currently-running test case in a new subprocess.
+    string filter_flag = strings::Substitute("--gtest_filter=$0.$1",
+                                             CURRENT_TEST_CASE_NAME(), CURRENT_TEST_NAME());
+    string executable_path;
+    CHECK_OK(env_->GetExecutablePath(&executable_path));
+    string stdout;
+    Status s = Subprocess::Call({ executable_path, "test", filter_flag, "--is_test_child" },
+                                "" /* stdin */,
+                                &stdout,
+                                stderr);
+    ASSERT_TRUE(s.ok()) << "Test failed: " << stdout;
+  }
+};
+
+// Test disabling SASL but not actually properly initializing it before usage.
+TEST_F(TestDisableInit, TestDisableSasl_NotInitialized) {
+  DoTest([]() {
+      CHECK_OK(DisableSaslInitialization());
+      Status s = SaslInit("kudu");
+      ASSERT_STR_CONTAINS(s.ToString(), "was disabled, but SASL was not externally initialized");
+    });
+}
+
+// Test disabling SASL with proper initialization by some other app.
+TEST_F(TestDisableInit, TestDisableSasl_Good) {
+  DoTest([]() {
+      rpc::internal::SaslSetMutex();
+      sasl_client_init(NULL);
+      CHECK_OK(DisableSaslInitialization());
+      ASSERT_OK(SaslInit("kudu"));
+    });
+}
+
+// Test a client which inits SASL itself but doesn't remember to disable Kudu's
+// SASL initialization.
+TEST_F(TestDisableInit, TestMultipleSaslInit) {
+  string stderr;
+  DoTest([]() {
+      rpc::internal::SaslSetMutex();
+      sasl_client_init(NULL);
+      ASSERT_OK(SaslInit("kudu"));
+    }, &stderr);
+  // If we are the parent, we should see the warning from the child that it automatically
+  // skipped initialization because it detected that it was already initialized.
+  if (!FLAGS_is_test_child) {
+    ASSERT_STR_CONTAINS(stderr, "Skipping initialization");
+  }
+}
+
+// We are not able to detect mutexes not being set with the macOS version of libsasl.
+#ifndef __APPLE__
+// Test disabling SASL but not remembering to initialize the SASL mutex support. This
+// should succeed but generate a warning.
+TEST_F(TestDisableInit, TestDisableSasl_NoMutexImpl) {
+  string stderr;
+  DoTest([]() {
+      sasl_client_init(NULL);
+      CHECK_OK(DisableSaslInitialization());
+      ASSERT_OK(SaslInit("kudu"));
+    }, &stderr);
+  // If we are the parent, we should see the warning from the child.
+  if (!FLAGS_is_test_child) {
+    ASSERT_STR_CONTAINS(stderr, "not provided with a mutex implementation");
+  }
+}
+
+// Test a client which inits SASL itself but doesn't remember to disable Kudu's
+// SASL initialization.
+TEST_F(TestDisableInit, TestMultipleSaslInit_NoMutexImpl) {
+  string stderr;
+  DoTest([]() {
+      sasl_client_init(NULL);
+      ASSERT_OK(SaslInit("kudu"));
+    }, &stderr);
+  // If we are the parent, we should see the warning from the child that it automatically
+  // skipped initialization because it detected that it was already initialized.
+  if (!FLAGS_is_test_child) {
+    ASSERT_STR_CONTAINS(stderr, "Skipping initialization");
+    ASSERT_STR_CONTAINS(stderr, "not provided with a mutex implementation");
+  }
+}
+#endif
+
+} // namespace rpc
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/41c45523/src/kudu/rpc/negotiation.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/negotiation.cc b/src/kudu/rpc/negotiation.cc
index e42a884..859016c 100644
--- a/src/kudu/rpc/negotiation.cc
+++ b/src/kudu/rpc/negotiation.cc
@@ -28,12 +28,12 @@
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/rpc/blocking_ops.h"
+#include "kudu/rpc/client_negotiation.h"
 #include "kudu/rpc/connection.h"
 #include "kudu/rpc/reactor.h"
 #include "kudu/rpc/rpc_header.pb.h"
-#include "kudu/rpc/sasl_client.h"
 #include "kudu/rpc/sasl_common.h"
-#include "kudu/rpc/sasl_server.h"
+#include "kudu/rpc/server_negotiation.h"
 #include "kudu/util/errno.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/status.h"

http://git-wip-us.apache.org/repos/asf/kudu/blob/41c45523/src/kudu/rpc/reactor.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/reactor.cc b/src/kudu/rpc/reactor.cc
index 0b2b19b..f178a2d 100644
--- a/src/kudu/rpc/reactor.cc
+++ b/src/kudu/rpc/reactor.cc
@@ -31,13 +31,13 @@
 
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/stringprintf.h"
+#include "kudu/rpc/client_negotiation.h"
 #include "kudu/rpc/connection.h"
 #include "kudu/rpc/messenger.h"
 #include "kudu/rpc/negotiation.h"
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rpc_introspection.pb.h"
-#include "kudu/rpc/sasl_client.h"
-#include "kudu/rpc/sasl_server.h"
+#include "kudu/rpc/server_negotiation.h"
 #include "kudu/rpc/transfer.h"
 #include "kudu/security/ssl_factory.h"
 #include "kudu/security/ssl_socket.h"

http://git-wip-us.apache.org/repos/asf/kudu/blob/41c45523/src/kudu/rpc/sasl_client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/sasl_client.cc b/src/kudu/rpc/sasl_client.cc
deleted file mode 100644
index c88e3cc..0000000
--- a/src/kudu/rpc/sasl_client.cc
+++ /dev/null
@@ -1,521 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "kudu/rpc/sasl_client.h"
-
-#include <string.h>
-
-#include <map>
-#include <set>
-#include <string>
-
-#include <glog/logging.h>
-#include <sasl/sasl.h>
-
-#include "kudu/gutil/endian.h"
-#include "kudu/gutil/map-util.h"
-#include "kudu/gutil/stl_util.h"
-#include "kudu/gutil/stringprintf.h"
-#include "kudu/gutil/strings/join.h"
-#include "kudu/gutil/strings/substitute.h"
-#include "kudu/gutil/strings/util.h"
-#include "kudu/rpc/blocking_ops.h"
-#include "kudu/rpc/constants.h"
-#include "kudu/rpc/rpc_header.pb.h"
-#include "kudu/rpc/sasl_common.h"
-#include "kudu/rpc/sasl_helper.h"
-#include "kudu/rpc/serialization.h"
-#include "kudu/util/faststring.h"
-#include "kudu/util/net/sockaddr.h"
-#include "kudu/util/net/socket.h"
-#include "kudu/util/scoped_cleanup.h"
-#include "kudu/util/trace.h"
-
-namespace kudu {
-namespace rpc {
-
-using std::map;
-using std::set;
-using std::string;
-
-static int SaslClientGetoptCb(void* sasl_client, const char* plugin_name, const char* option,
-                       const char** result, unsigned* len) {
-  return static_cast<SaslClient*>(sasl_client)
-    ->GetOptionCb(plugin_name, option, result, len);
-}
-
-static int SaslClientSimpleCb(void *sasl_client, int id,
-                       const char **result, unsigned *len) {
-  return static_cast<SaslClient*>(sasl_client)->SimpleCb(id, result, len);
-}
-
-static int SaslClientSecretCb(sasl_conn_t* conn, void *sasl_client, int id,
-                       sasl_secret_t** psecret) {
-  return static_cast<SaslClient*>(sasl_client)->SecretCb(conn, id, psecret);
-}
-
-// Return an appropriately-typed Status object based on an ErrorStatusPB returned
-// from an Error RPC.
-// In case there is no relevant Status type, return a RuntimeError.
-static Status StatusFromRpcError(const ErrorStatusPB& error) {
-  DCHECK(error.IsInitialized()) << "Error status PB must be initialized";
-  if (PREDICT_FALSE(!error.has_code())) {
-    return Status::RuntimeError(error.message());
-  }
-  string code_name = ErrorStatusPB::RpcErrorCodePB_Name(error.code());
-  switch (error.code()) {
-    case ErrorStatusPB_RpcErrorCodePB_FATAL_UNAUTHORIZED:
-      return Status::NotAuthorized(code_name, error.message());
-    default:
-      return Status::RuntimeError(code_name, error.message());
-  }
-}
-
-SaslClient::SaslClient(string app_name, Socket* socket)
-    : app_name_(std::move(app_name)),
-      sock_(socket),
-      helper_(SaslHelper::CLIENT),
-      client_state_(SaslNegotiationState::NEW),
-      negotiated_mech_(SaslMechanism::INVALID),
-      deadline_(MonoTime::Max()) {
-  callbacks_.push_back(SaslBuildCallback(SASL_CB_GETOPT,
-      reinterpret_cast<int (*)()>(&SaslClientGetoptCb), this));
-  callbacks_.push_back(SaslBuildCallback(SASL_CB_AUTHNAME,
-      reinterpret_cast<int (*)()>(&SaslClientSimpleCb), this));
-  callbacks_.push_back(SaslBuildCallback(SASL_CB_PASS,
-      reinterpret_cast<int (*)()>(&SaslClientSecretCb), this));
-  callbacks_.push_back(SaslBuildCallback(SASL_CB_LIST_END, nullptr, nullptr));
-}
-
-Status SaslClient::EnablePlain(const string& user, const string& pass) {
-  DCHECK_EQ(client_state_, SaslNegotiationState::NEW);
-  RETURN_NOT_OK(helper_.EnablePlain());
-  plain_auth_user_ = user;
-  plain_pass_ = pass;
-  return Status::OK();
-}
-
-Status SaslClient::EnableGSSAPI() {
-  DCHECK_EQ(client_state_, SaslNegotiationState::NEW);
-  return helper_.EnableGSSAPI();
-}
-
-SaslMechanism::Type SaslClient::negotiated_mechanism() const {
-  DCHECK_EQ(client_state_, SaslNegotiationState::NEGOTIATED);
-  return negotiated_mech_;
-}
-
-void SaslClient::set_local_addr(const Sockaddr& addr) {
-  DCHECK_EQ(client_state_, SaslNegotiationState::NEW);
-  helper_.set_local_addr(addr);
-}
-
-void SaslClient::set_remote_addr(const Sockaddr& addr) {
-  DCHECK_EQ(client_state_, SaslNegotiationState::NEW);
-  helper_.set_remote_addr(addr);
-}
-
-void SaslClient::set_server_fqdn(const string& domain_name) {
-  DCHECK_EQ(client_state_, SaslNegotiationState::NEW);
-  helper_.set_server_fqdn(domain_name);
-}
-
-void SaslClient::set_deadline(const MonoTime& deadline) {
-  DCHECK_NE(client_state_, SaslNegotiationState::NEGOTIATED);
-  deadline_ = deadline;
-}
-
-// calls sasl_client_init() and sasl_client_new()
-Status SaslClient::Init(const string& service_type) {
-  RETURN_NOT_OK(SaslInit(app_name_.c_str()));
-
-  // Ensure we are not called more than once.
-  if (client_state_ != SaslNegotiationState::NEW) {
-    return Status::IllegalState("Init() may only be called once per SaslClient object.");
-  }
-
-  // TODO: Support security flags.
-  unsigned secflags = 0;
-
-  sasl_conn_t* sasl_conn = nullptr;
-  Status s = WrapSaslCall(nullptr /* no conn */, [&]() {
-      return sasl_client_new(
-          service_type.c_str(),         // Registered name of the service using SASL. Required.
-          helper_.server_fqdn(),        // The fully qualified domain name of the remote server.
-          helper_.local_addr_string(),  // Local and remote IP address strings. (NULL disables
-          helper_.remote_addr_string(), //   mechanisms which require this info.)
-          &callbacks_[0],               // Connection-specific callbacks.
-          secflags,                     // Security flags.
-          &sasl_conn);
-    });
-  if (!s.ok()) {
-    return Status::RuntimeError("Unable to create new SASL client",
-                                s.message());
-  }
-  sasl_conn_.reset(sasl_conn);
-
-  client_state_ = SaslNegotiationState::INITIALIZED;
-  return Status::OK();
-}
-
-Status SaslClient::Negotiate() {
-  // After negotiation, we no longer need the SASL library object, so
-  // may as well free its memory since the connection may be long-lived.
-  // Additionally, this works around a SEGV seen at process shutdown time:
-  // if we still have SASL objects retained by Reactor when the process
-  // is exiting, the SASL libraries may start destructing global state
-  // and cause a crash when we sasl_dispose the connection.
-  auto cleanup = MakeScopedCleanup([&]() {
-      sasl_conn_.reset();
-    });
-  TRACE("Called SaslClient::Negotiate()");
-
-  // Ensure we called exactly once, and in the right order.
-  if (client_state_ == SaslNegotiationState::NEW) {
-    return Status::IllegalState("SaslClient: Init() must be called before calling Negotiate()");
-  } else if (client_state_ == SaslNegotiationState::NEGOTIATED) {
-    return Status::IllegalState("SaslClient: Negotiate() may only be called once per object.");
-  }
-
-  // Ensure we can use blocking calls on the socket during negotiation.
-  RETURN_NOT_OK(EnsureBlockingMode(sock_));
-
-  // Start by asking the server for a list of available auth mechanisms.
-  RETURN_NOT_OK(SendNegotiateMessage());
-
-  faststring recv_buf;
-  nego_ok_ = false;
-
-  // We set nego_ok_ = true when the SASL library returns SASL_OK to us.
-  // We set nego_response_expected_ = true each time we send a request to the server.
-  while (!nego_ok_ || nego_response_expected_) {
-    ResponseHeader header;
-    Slice param_buf;
-    RETURN_NOT_OK(ReceiveFramedMessageBlocking(sock_, &recv_buf, &header, &param_buf, deadline_));
-    nego_response_expected_ = false;
-
-    NegotiatePB response;
-    RETURN_NOT_OK(ParseNegotiatePB(header, param_buf, &response));
-
-    switch (response.step()) {
-      // NEGOTIATE: Server has sent us its list of supported SASL mechanisms.
-      case NegotiatePB::NEGOTIATE:
-        RETURN_NOT_OK(HandleNegotiateResponse(response));
-        break;
-
-      // SASL_CHALLENGE: Server sent us a follow-up to an SASL_INITIATE or SASL_RESPONSE request.
-      case NegotiatePB::SASL_CHALLENGE:
-        RETURN_NOT_OK(HandleChallengeResponse(response));
-        break;
-
-      // SASL_SUCCESS: Server has accepted our authentication request. Negotiation successful.
-      case NegotiatePB::SASL_SUCCESS:
-        RETURN_NOT_OK(HandleSuccessResponse(response));
-        break;
-
-      // Client sent us some unsupported SASL response.
-      default:
-        LOG(ERROR) << "SASL Client: Received unsupported response from server";
-        return Status::InvalidArgument("RPC client doesn't support Negotiate step",
-                                       NegotiatePB::NegotiateStep_Name(response.step()));
-    }
-  }
-
-  TRACE("SASL Client: Successful negotiation");
-  client_state_ = SaslNegotiationState::NEGOTIATED;
-  return Status::OK();
-}
-
-Status SaslClient::SendNegotiatePB(const NegotiatePB& msg) {
-  DCHECK_NE(client_state_, SaslNegotiationState::NEW)
-      << "Must not send Negotiate messages before calling Init()";
-  DCHECK_NE(client_state_, SaslNegotiationState::NEGOTIATED)
-      << "Must not send Negotiate messages after negotiation succeeds";
-
-  // Create header with SASL-specific callId
-  RequestHeader header;
-  header.set_call_id(kNegotiateCallId);
-  return helper_.SendNegotiatePB(sock_, header, msg, deadline_);
-}
-
-Status SaslClient::ParseNegotiatePB(const ResponseHeader& header,
-                                    const Slice& param_buf,
-                                    NegotiatePB* response) {
-  RETURN_NOT_OK(helper_.SanityCheckNegotiateCallId(header.call_id()));
-
-  if (header.is_error()) {
-    return ParseError(param_buf);
-  }
-
-  return helper_.ParseNegotiatePB(param_buf, response);
-}
-
-Status SaslClient::SendNegotiateMessage() {
-  NegotiatePB msg;
-  msg.set_step(NegotiatePB::NEGOTIATE);
-
-  // Advertise our supported features.
-  for (RpcFeatureFlag feature : kSupportedClientRpcFeatureFlags) {
-    msg.add_supported_features(feature);
-  }
-
-  TRACE("SASL Client: Sending NEGOTIATE request to server.");
-  RETURN_NOT_OK(SendNegotiatePB(msg));
-  nego_response_expected_ = true;
-  return Status::OK();
-}
-
-Status SaslClient::SendInitiateMessage(const NegotiatePB_SaslAuth& auth,
-    const char* init_msg, unsigned init_msg_len) {
-  NegotiatePB msg;
-  msg.set_step(NegotiatePB::SASL_INITIATE);
-  msg.mutable_token()->assign(init_msg, init_msg_len);
-  msg.add_auths()->CopyFrom(auth);
-  TRACE("SASL Client: Sending SASL_INITIATE request to server.");
-  RETURN_NOT_OK(SendNegotiatePB(msg));
-  nego_response_expected_ = true;
-  return Status::OK();
-}
-
-Status SaslClient::SendResponseMessage(const char* resp_msg, unsigned resp_msg_len) {
-  NegotiatePB reply;
-  reply.set_step(NegotiatePB::SASL_RESPONSE);
-  reply.mutable_token()->assign(resp_msg, resp_msg_len);
-  TRACE("SASL Client: Sending SASL_RESPONSE request to server.");
-  RETURN_NOT_OK(SendNegotiatePB(reply));
-  nego_response_expected_ = true;
-  return Status::OK();
-}
-
-Status SaslClient::DoSaslStep(const string& in, const char** out, unsigned* out_len) {
-  TRACE("SASL Client: Calling sasl_client_step()");
-  Status s = WrapSaslCall(sasl_conn_.get(), [&]() {
-      return sasl_client_step(sasl_conn_.get(), in.c_str(), in.length(), nullptr, out, out_len);
-    });
-  if (s.ok()) {
-    nego_ok_ = true;
-  }
-  return s;
-}
-
-Status SaslClient::HandleNegotiateResponse(const NegotiatePB& response) {
-  TRACE("SASL Client: Received NEGOTIATE response from server");
-  // Fill in the set of features supported by the server.
-  for (int flag : response.supported_features()) {
-    // We only add the features that our local build knows about.
-    RpcFeatureFlag feature_flag = RpcFeatureFlag_IsValid(flag) ?
-                                  static_cast<RpcFeatureFlag>(flag) : UNKNOWN;
-    if (ContainsKey(kSupportedClientRpcFeatureFlags, feature_flag)) {
-      server_features_.insert(feature_flag);
-    }
-  }
-
-  // Build a map of the mechanisms offered by the server.
-  const set<string>& local_mechs = helper_.LocalMechs();
-  set<string> server_mechs;
-  map<string, NegotiatePB::SaslAuth> server_mech_map;
-  for (const NegotiatePB::SaslAuth& auth : response.auths()) {
-    const auto& mech = auth.mechanism();
-    server_mech_map[mech] = auth;
-    server_mechs.insert(mech);
-  }
-  // Determine which server mechs are also enabled by the client.
-  // Cyrus SASL 2.1.25 and later supports doing this set intersection via
-  // the 'client_mech_list' option, but that version is not available on
-  // RHEL 6, so we have to do it manually.
-  set<string> matching_mechs = STLSetIntersection(local_mechs, server_mechs);
-
-  if (matching_mechs.empty() &&
-      ContainsKey(server_mechs, kSaslMechGSSAPI) &&
-      !ContainsKey(local_mechs, kSaslMechGSSAPI)) {
-    return Status::NotAuthorized("server requires GSSAPI (Kerberos) authentication and "
-                                 "client was missing the required SASL module");
-  }
-
-  string matching_mechs_str = JoinElements(matching_mechs, " ");
-  TRACE("SASL Client: Matching mech list: $0", matching_mechs_str);
-
-  const char* init_msg = nullptr;
-  unsigned init_msg_len = 0;
-  const char* negotiated_mech = nullptr;
-
-  /* select a mechanism for a connection
-   *  mechlist      -- mechanisms server has available (punctuation ignored)
-   * output:
-   *  prompt_need   -- on SASL_INTERACT, list of prompts needed to continue
-   *  clientout     -- the initial client response to send to the server
-   *  mech          -- set to mechanism name
-   *
-   * Returns:
-   *  SASL_OK       -- success
-   *  SASL_CONTINUE -- negotiation required
-   *  SASL_NOMEM    -- not enough memory
-   *  SASL_NOMECH   -- no mechanism meets requested properties
-   *  SASL_INTERACT -- user interaction needed to fill in prompt_need list
-   */
-  TRACE("SASL Client: Calling sasl_client_start()");
-  Status s = WrapSaslCall(sasl_conn_.get(), [&]() {
-      return sasl_client_start(
-          sasl_conn_.get(),           // The SASL connection context created by init()
-          matching_mechs_str.c_str(), // The list of mechanisms to negotiate.
-          nullptr,                    // Disables INTERACT return if NULL.
-          &init_msg,                  // Filled in on success.
-          &init_msg_len,              // Filled in on success.
-          &negotiated_mech);          // Filled in on success.
-    });
-
-  if (s.ok()) {
-    nego_ok_ = true;
-  } else if (!s.IsIncomplete()) {
-    return s;
-  }
-
-  // The server matched one of our mechanisms.
-  NegotiatePB::SaslAuth* auth = FindOrNull(server_mech_map, negotiated_mech);
-  if (PREDICT_FALSE(auth == nullptr)) {
-    return Status::IllegalState("Unable to find auth in map, unexpected error", negotiated_mech);
-  }
-  negotiated_mech_ = SaslMechanism::value_of(negotiated_mech);
-
-  RETURN_NOT_OK(SendInitiateMessage(*auth, init_msg, init_msg_len));
-  return Status::OK();
-}
-
-Status SaslClient::HandleChallengeResponse(const NegotiatePB& response) {
-  TRACE("SASL Client: Received SASL_CHALLENGE response from server");
-  if (PREDICT_FALSE(nego_ok_)) {
-    LOG(DFATAL) << "Server sent SASL_CHALLENGE response after client library returned SASL_OK";
-  }
-
-  if (PREDICT_FALSE(!response.has_token())) {
-    return Status::InvalidArgument("No token in SASL_CHALLENGE response from server");
-  }
-
-  const char* out = nullptr;
-  unsigned out_len = 0;
-  Status s = DoSaslStep(response.token(), &out, &out_len);
-  if (!s.ok() && !s.IsIncomplete()) {
-    return s;
-  }
-  RETURN_NOT_OK(SendResponseMessage(out, out_len));
-  return Status::OK();
-}
-
-Status SaslClient::HandleSuccessResponse(const NegotiatePB& response) {
-  TRACE("SASL Client: Received SASL_SUCCESS response from server");
-  if (!nego_ok_) {
-    const char* out = nullptr;
-    unsigned out_len = 0;
-    Status s = DoSaslStep(response.token(), &out, &out_len);
-    if (s.IsIncomplete()) {
-      return Status::IllegalState("Server indicated successful authentication, but client "
-                                  "was not complete");
-    }
-    RETURN_NOT_OK(s);
-    if (out_len > 0) {
-      return Status::IllegalState("SASL client library generated spurious token after SASL_SUCCESS",
-          string(out, out_len));
-    }
-    CHECK(nego_ok_);
-  }
-  return Status::OK();
-}
-
-// Parse error status message from raw bytes of an ErrorStatusPB.
-Status SaslClient::ParseError(const Slice& err_data) {
-  ErrorStatusPB error;
-  if (!error.ParseFromArray(err_data.data(), err_data.size())) {
-    return Status::IOError("Invalid error response, missing fields",
-        error.InitializationErrorString());
-  }
-  Status s = StatusFromRpcError(error);
-  TRACE("SASL Client: Received error response from server: $0", s.ToString());
-  return s;
-}
-
-int SaslClient::GetOptionCb(const char* plugin_name, const char* option,
-                            const char** result, unsigned* len) {
-  return helper_.GetOptionCb(plugin_name, option, result, len);
-}
-
-// Used for PLAIN.
-// SASL callback for SASL_CB_USER, SASL_CB_AUTHNAME, SASL_CB_LANGUAGE
-int SaslClient::SimpleCb(int id, const char** result, unsigned* len) {
-  if (PREDICT_FALSE(!helper_.IsPlainEnabled())) {
-    LOG(DFATAL) << "SASL Client: Simple callback called, but PLAIN auth is not enabled";
-    return SASL_FAIL;
-  }
-  if (PREDICT_FALSE(result == nullptr)) {
-    LOG(DFATAL) << "SASL Client: result outparam is NULL";
-    return SASL_BADPARAM;
-  }
-  switch (id) {
-    // TODO: Support impersonation?
-    // For impersonation, USER is the impersonated user, AUTHNAME is the "sudoer".
-    case SASL_CB_USER:
-      TRACE("SASL Client: callback for SASL_CB_USER");
-      *result = plain_auth_user_.c_str();
-      if (len != nullptr) *len = plain_auth_user_.length();
-      break;
-    case SASL_CB_AUTHNAME:
-      TRACE("SASL Client: callback for SASL_CB_AUTHNAME");
-      *result = plain_auth_user_.c_str();
-      if (len != nullptr) *len = plain_auth_user_.length();
-      break;
-    case SASL_CB_LANGUAGE:
-      LOG(DFATAL) << "SASL Client: Unable to handle SASL callback type SASL_CB_LANGUAGE"
-        << "(" << id << ")";
-      return SASL_BADPARAM;
-    default:
-      LOG(DFATAL) << "SASL Client: Unexpected SASL callback type: " << id;
-      return SASL_BADPARAM;
-  }
-
-  return SASL_OK;
-}
-
-// Used for PLAIN.
-// SASL callback for SASL_CB_PASS: User password.
-int SaslClient::SecretCb(sasl_conn_t* conn, int id, sasl_secret_t** psecret) {
-  if (PREDICT_FALSE(!helper_.IsPlainEnabled())) {
-    LOG(DFATAL) << "SASL Client: Plain secret callback called, but PLAIN auth is not enabled";
-    return SASL_FAIL;
-  }
-  switch (id) {
-    case SASL_CB_PASS: {
-      if (!conn || !psecret) return SASL_BADPARAM;
-
-      int len = plain_pass_.length();
-      *psecret = reinterpret_cast<sasl_secret_t*>(malloc(sizeof(sasl_secret_t) + len));
-      if (!*psecret) {
-        return SASL_NOMEM;
-      }
-      psecret_.reset(*psecret);  // Ensure that we free() this structure later.
-      (*psecret)->len = len;
-      memcpy(reinterpret_cast<char *>((*psecret)->data), plain_pass_.c_str(), len + 1);
-      break;
-    }
-    default:
-      LOG(DFATAL) << "SASL Client: Unexpected SASL callback type: " << id;
-      return SASL_BADPARAM;
-  }
-
-  return SASL_OK;
-}
-
-} // namespace rpc
-} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/41c45523/src/kudu/rpc/sasl_client.h
----------------------------------------------------------------------
diff --git a/src/kudu/rpc/sasl_client.h b/src/kudu/rpc/sasl_client.h
deleted file mode 100644
index 746a6f8..0000000
--- a/src/kudu/rpc/sasl_client.h
+++ /dev/null
@@ -1,180 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef KUDU_RPC_SASL_CLIENT_H
-#define KUDU_RPC_SASL_CLIENT_H
-
-#include <set>
-#include <string>
-#include <vector>
-
-#include <sasl/sasl.h>
-
-#include "kudu/gutil/gscoped_ptr.h"
-#include "kudu/rpc/rpc_header.pb.h"
-#include "kudu/rpc/sasl_common.h"
-#include "kudu/rpc/sasl_helper.h"
-#include "kudu/util/monotime.h"
-#include "kudu/util/status.h"
-#include "kudu/util/net/socket.h"
-
-namespace kudu {
-namespace rpc {
-
-using std::string;
-
-class NegotiatePB;
-class NegotiatePB_SaslAuth;
-class ResponseHeader;
-
-// Class for doing SASL negotiation with a SaslServer over a bidirectional socket.
-// Operations on this class are NOT thread-safe.
-class SaslClient {
- public:
-  // Does not take ownership of the socket indicated by the fd.
-  SaslClient(string app_name, Socket* socket);
-
-  // Enable PLAIN authentication.
-  // Must be called after Init().
-  Status EnablePlain(const string& user, const string& pass);
-
-  // Enable GSSAPI authentication.
-  // Call after Init().
-  Status EnableGSSAPI();
-
-  // Returns mechanism negotiated by this connection.
-  // Must be called after Negotiate().
-  SaslMechanism::Type negotiated_mechanism() const;
-
-  // Returns the set of RPC system features supported by the remote server.
-  // Must be called after Negotiate().
-  const std::set<RpcFeatureFlag>& server_features() const {
-    return server_features_;
-  }
-
-  // Specify IP:port of local side of connection.
-  // Must be called before Init(). Required for some mechanisms.
-  void set_local_addr(const Sockaddr& addr);
-
-  // Specify IP:port of remote side of connection.
-  // Must be called before Init(). Required for some mechanisms.
-  void set_remote_addr(const Sockaddr& addr);
-
-  // Specify the fully-qualified domain name of the remote server.
-  // Must be called before Init(). Required for some mechanisms.
-  void set_server_fqdn(const string& domain_name);
-
-  // Set deadline for connection negotiation.
-  void set_deadline(const MonoTime& deadline);
-
-  // Get deadline for connection negotiation.
-  const MonoTime& deadline() const { return deadline_; }
-
-  // Initialize a new SASL client. Must be called before Negotiate().
-  // Returns OK on success, otherwise RuntimeError.
-  Status Init(const string& service_type);
-
-  // Begin negotiation with the SASL server on the other side of the fd socket
-  // that this client was constructed with.
-  // Returns OK on success.
-  // Otherwise, it may return NotAuthorized, NotSupported, or another non-OK status.
-  Status Negotiate();
-
-  // SASL callback for plugin options, supported mechanisms, etc.
-  // Returns SASL_FAIL if the option is not handled, which does not fail the handshake.
-  int GetOptionCb(const char* plugin_name, const char* option,
-                  const char** result, unsigned* len);
-
-  // SASL callback for SASL_CB_USER, SASL_CB_AUTHNAME, SASL_CB_LANGUAGE
-  int SimpleCb(int id, const char** result, unsigned* len);
-
-  // SASL callback for SASL_CB_PASS
-  int SecretCb(sasl_conn_t* conn, int id, sasl_secret_t** psecret);
-
- private:
-  // Encode and send the specified negotiate message to the server.
-  Status SendNegotiatePB(const NegotiatePB& msg);
-
-  // Validate that header does not indicate an error, parse param_buf into response.
-  Status ParseNegotiatePB(const ResponseHeader& header,
-                          const Slice& param_buf,
-                          NegotiatePB* response);
-
-  // Send an NEGOTIATE message to the server.
-  Status SendNegotiateMessage();
-
-  // Send an SASL_INITIATE message to the server.
-  Status SendInitiateMessage(const NegotiatePB_SaslAuth& auth,
-                             const char* init_msg, unsigned init_msg_len);
-
-  // Send a RESPONSE message to the server.
-  Status SendResponseMessage(const char* resp_msg, unsigned resp_msg_len);
-
-  // Perform a client-side step of the SASL negotiation.
-  // Input is what came from the server. Output is what we will send back to the server.
-  // Returns:
-  //   Status::OK if sasl_client_step returns SASL_OK.
-  //   Status::Incomplete if sasl_client_step returns SASL_CONTINUE
-  // otherwise returns an appropriate error status.
-  Status DoSaslStep(const string& in, const char** out, unsigned* out_len);
-
-  // Handle case when server sends NEGOTIATE response.
-  Status HandleNegotiateResponse(const NegotiatePB& response);
-
-  // Handle case when server sends CHALLENGE response.
-  Status HandleChallengeResponse(const NegotiatePB& response);
-
-  // Handle case when server sends SUCCESS response.
-  Status HandleSuccessResponse(const NegotiatePB& response);
-
-  // Parse error status message from raw bytes of an ErrorStatusPB.
-  Status ParseError(const Slice& err_data);
-
-  string app_name_;
-  Socket* sock_;
-  std::vector<sasl_callback_t> callbacks_;
-  // The SASL connection object. This is initialized in Init() and
-  // freed after Negotiate() completes (regardless whether it was successful).
-  gscoped_ptr<sasl_conn_t, SaslDeleter> sasl_conn_;
-  SaslHelper helper_;
-
-  string plain_auth_user_;
-  string plain_pass_;
-  gscoped_ptr<sasl_secret_t, FreeDeleter> psecret_;
-
-  // The set of features supported by the server.
-  std::set<RpcFeatureFlag> server_features_;
-
-  SaslNegotiationState::Type client_state_;
-
-  // The mechanism we negotiated with the server.
-  SaslMechanism::Type negotiated_mech_;
-
-  // Intra-negotiation state.
-  bool nego_ok_;  // During negotiation: did we get a SASL_OK response from the SASL library?
-  bool nego_response_expected_;  // During negotiation: Are we waiting for a server response?
-
-  // Negotiation timeout deadline.
-  MonoTime deadline_;
-
-  DISALLOW_COPY_AND_ASSIGN(SaslClient);
-};
-
-} // namespace rpc
-} // namespace kudu
-
-#endif  // KUDU_RPC_SASL_CLIENT_H