You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/17 03:14:23 UTC
[07/15] incubator-impala git commit: IMPALA-4669: [KRPC] Import RPC
library from kudu@314c9d8
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/sasl_common.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/sasl_common.cc b/be/src/kudu/rpc/sasl_common.cc
new file mode 100644
index 0000000..9f14413
--- /dev/null
+++ b/be/src/kudu/rpc/sasl_common.cc
@@ -0,0 +1,459 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/sasl_common.h"
+
+#include <string.h>
+
+#include <algorithm>
+#include <limits>
+#include <mutex>
+#include <string>
+
+#include <boost/algorithm/string/predicate.hpp>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <regex.h>
+#include <sasl/sasl.h>
+#include <sasl/saslplug.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/once.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/rpc/constants.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/rw_mutex.h"
+#include "kudu/security/init.h"
+
+using std::set;
+
+DECLARE_string(keytab_file);
+
+namespace kudu {
+namespace rpc {
+
+const char* const kSaslMechPlain = "PLAIN";
+const char* const kSaslMechGSSAPI = "GSSAPI";
+extern const size_t kSaslMaxOutBufLen = 1024;
+
+// See WrapSaslCall().
+static __thread string* g_auth_failure_capture = nullptr;
+
+// Determine whether initialization was ever called
+static Status sasl_init_status = Status::OK();
+static bool sasl_is_initialized = false;
+
+// If true, then we expect someone else has initialized SASL.
+static bool g_disable_sasl_init = false;
+
+// Output Sasl messages.
+// context: not used.
+// level: logging level.
+// message: message to output;
+static int SaslLogCallback(void* context, int level, const char* message) {
+
+ if (message == nullptr) return SASL_BADPARAM;
+
+ switch (level) {
+ case SASL_LOG_NONE:
+ break;
+
+ case SASL_LOG_ERR:
+ LOG(ERROR) << "SASL: " << message;
+ break;
+
+ case SASL_LOG_WARN:
+ LOG(WARNING) << "SASL: " << message;
+ break;
+
+ case SASL_LOG_NOTE:
+ LOG(INFO) << "SASL: " << message;
+ break;
+
+ case SASL_LOG_FAIL:
+ // Capture authentication failures in a thread-local to be picked up
+ // by WrapSaslCall() below.
+ VLOG(1) << "SASL: " << message;
+ if (g_auth_failure_capture) {
+ *g_auth_failure_capture = message;
+ }
+ break;
+
+ case SASL_LOG_DEBUG:
+ VLOG(1) << "SASL: " << message;
+ break;
+
+ case SASL_LOG_TRACE:
+ case SASL_LOG_PASS:
+ VLOG(3) << "SASL: " << message;
+ break;
+ }
+
+ return SASL_OK;
+}
+
+// Get Sasl option.
+// context: not used
+// plugin_name: name of plugin for which an option is being requested.
+// option: option requested
+// result: set to result which persists until next getopt in same thread,
+// unchanged if option not found
+// len: length of the result
+// Return SASL_FAIL if the option is not handled, this does not fail the handshake.
+static int SaslGetOption(void* context, const char* plugin_name, const char* option,
+ const char** result, unsigned* len) {
+ // Handle Sasl Library options
+ if (plugin_name == nullptr) {
+ // Return the logging level that we want the sasl library to use.
+ if (strcmp("log_level", option) == 0) {
+ int level = SASL_LOG_NOTE;
+ if (VLOG_IS_ON(1)) {
+ level = SASL_LOG_DEBUG;
+ } else if (VLOG_IS_ON(3)) {
+ level = SASL_LOG_TRACE;
+ }
+ // The library's contract for this method is that the caller gets to keep
+ // the returned buffer until the next call by the same thread, so we use a
+ // threadlocal for the buffer.
+ static __thread char buf[4];
+ snprintf(buf, arraysize(buf), "%d", level);
+ *result = buf;
+ if (len != nullptr) *len = strlen(buf);
+ return SASL_OK;
+ }
+ // Options can default so don't complain.
+ VLOG(4) << "SaslGetOption: Unknown library option: " << option;
+ return SASL_FAIL;
+ }
+ VLOG(4) << "SaslGetOption: Unknown plugin: " << plugin_name;
+ return SASL_FAIL;
+}
+
+// Array of callbacks for the sasl library.
+static sasl_callback_t callbacks[] = {
+ { SASL_CB_LOG, reinterpret_cast<int (*)()>(&SaslLogCallback), nullptr },
+ { SASL_CB_GETOPT, reinterpret_cast<int (*)()>(&SaslGetOption), nullptr },
+ { SASL_CB_LIST_END, nullptr, nullptr }
+ // TODO(todd): provide a CANON_USER callback? This is necessary if we want
+ // to support some kind of auth-to-local mapping of Kerberos principals
+ // to local usernames. See Impala's implementation for inspiration.
+};
+
+
+// SASL requires mutexes for thread safety, but doesn't implement
+// them itself. So, we have to hook them up to our mutex implementation.
+static void* SaslMutexAlloc() {
+ return static_cast<void*>(new Mutex());
+}
+static void SaslMutexFree(void* m) {
+ delete static_cast<Mutex*>(m);
+}
+static int SaslMutexLock(void* m) {
+ static_cast<Mutex*>(m)->lock();
+ return 0; // indicates success.
+}
+static int SaslMutexUnlock(void* m) {
+ static_cast<Mutex*>(m)->unlock();
+ return 0; // indicates success.
+}
+
+namespace internal {
+void SaslSetMutex() {
+ sasl_set_mutex(&SaslMutexAlloc, &SaslMutexLock, &SaslMutexUnlock, &SaslMutexFree);
+}
+} // namespace internal
+
+// Sasl initialization detection methods. The OS X SASL library doesn't define
+// the sasl_global_utils symbol, so we have to use less robust methods of
+// detection.
+#if defined(__APPLE__)
+static bool SaslIsInitialized() {
+ return sasl_global_listmech() != nullptr;
+}
+static bool SaslMutexImplementationProvided() {
+ return SaslIsInitialized();
+}
+#else
+
+// This symbol is exported by the SASL library but not defined
+// in the headers. It's marked as an API in the library source,
+// so seems safe to rely on.
+extern "C" sasl_utils_t* sasl_global_utils;
+static bool SaslIsInitialized() {
+ return sasl_global_utils != nullptr;
+}
+static bool SaslMutexImplementationProvided() {
+ if (!SaslIsInitialized()) return false;
+ void* m = sasl_global_utils->mutex_alloc();
+ sasl_global_utils->mutex_free(m);
+ // The default implementation of mutex_alloc just returns the constant pointer 0x1.
+ // This is a bit of an ugly heuristic, but seems unlikely that anyone would ever
+ // provide a valid implementation that returns an invalid pointer value.
+ return m != reinterpret_cast<void*>(1);
+}
+#endif
+
+// Actually perform the initialization for the SASL subsystem.
+// Meant to be called via GoogleOnceInit().
+static void DoSaslInit() {
+ VLOG(3) << "Initializing SASL library";
+
+ bool sasl_initialized = SaslIsInitialized();
+ if (sasl_initialized && !g_disable_sasl_init) {
+ LOG(WARNING) << "SASL was initialized prior to Kudu's initialization. Skipping "
+ << "initialization. Call kudu::client::DisableSaslInitialization() "
+ << "to suppress this message.";
+ g_disable_sasl_init = true;
+ }
+
+ if (g_disable_sasl_init) {
+ if (!sasl_initialized) {
+ sasl_init_status = Status::RuntimeError(
+ "SASL initialization was disabled, but SASL was not externally initialized.");
+ return;
+ }
+ if (!SaslMutexImplementationProvided()) {
+ LOG(WARNING)
+ << "SASL appears to be initialized by code outside of Kudu "
+ << "but was not provided with a mutex implementation! If "
+ << "manually initializing SASL, use sasl_set_mutex(3).";
+ }
+ return;
+ }
+ internal::SaslSetMutex();
+ int result = sasl_client_init(&callbacks[0]);
+ if (result != SASL_OK) {
+ sasl_init_status = Status::RuntimeError("Could not initialize SASL client",
+ sasl_errstring(result, nullptr, nullptr));
+ return;
+ }
+
+ result = sasl_server_init(&callbacks[0], kSaslAppName);
+ if (result != SASL_OK) {
+ sasl_init_status = Status::RuntimeError("Could not initialize SASL server",
+ sasl_errstring(result, nullptr, nullptr));
+ return;
+ }
+
+ sasl_is_initialized = true;
+}
+
+Status DisableSaslInitialization() {
+ if (g_disable_sasl_init) return Status::OK();
+ if (sasl_is_initialized) {
+ return Status::IllegalState("SASL already initialized. Initialization can only be disabled "
+ "before first usage.");
+ }
+ g_disable_sasl_init = true;
+ return Status::OK();
+}
+
+Status SaslInit() {
+ // Only execute SASL initialization once
+ static GoogleOnceType once = GOOGLE_ONCE_INIT;
+ GoogleOnceInit(&once, &DoSaslInit);
+ return sasl_init_status;
+}
+
+static string CleanSaslError(const string& err) {
+ // In the case of GSS failures, we often get the actual error message
+ // buried inside a bunch of generic cruft. Use a regexp to extract it
+ // out. Note that we avoid std::regex because it appears to be broken
+ // with older libstdcxx.
+ static regex_t re;
+ static std::once_flag once;
+
+#if defined(__APPLE__)
+ static const char* kGssapiPattern = "GSSAPI Error: Miscellaneous failure \\(see text \\((.+)\\)";
+#else
+ static const char* kGssapiPattern = "Unspecified GSS failure. +"
+ "Minor code may provide more information +"
+ "\\((.+)\\)";
+#endif
+
+ std::call_once(once, []{ CHECK_EQ(0, regcomp(&re, kGssapiPattern, REG_EXTENDED)); });
+ regmatch_t matches[2];
+ if (regexec(&re, err.c_str(), arraysize(matches), matches, 0) == 0) {
+ return err.substr(matches[1].rm_so, matches[1].rm_eo - matches[1].rm_so);
+ }
+ return err;
+}
+
+static string SaslErrDesc(int status, sasl_conn_t* conn) {
+ string err;
+ if (conn != nullptr) {
+ err = sasl_errdetail(conn);
+ } else {
+ err = sasl_errstring(status, nullptr, nullptr);
+ }
+
+ return CleanSaslError(err);
+}
+
+Status WrapSaslCall(sasl_conn_t* conn, const std::function<int()>& call) {
+ // In many cases, the GSSAPI SASL plugin will generate a nice error
+ // message as a message logged at SASL_LOG_FAIL logging level, but then
+ // return a useless one in sasl_errstring(). So, we set a global thread-local
+ // variable to capture any auth failure log message while we make the
+ // call into the library.
+ //
+ // The thread-local thing is a bit of a hack, but the logging callback
+ // is set globally rather than on a per-connection basis.
+ string err;
+ g_auth_failure_capture = &err;
+
+ // Take the 'kerberos_reinit_lock' here to avoid a possible race with ticket renewal.
+ bool kerberos_supported = !FLAGS_keytab_file.empty();
+ if (kerberos_supported) kudu::security::KerberosReinitLock()->ReadLock();
+ int rc = call();
+ if (kerberos_supported) kudu::security::KerberosReinitLock()->ReadUnlock();
+ g_auth_failure_capture = nullptr;
+
+ switch (rc) {
+ case SASL_OK:
+ return Status::OK();
+ case SASL_CONTINUE:
+ return Status::Incomplete("");
+ case SASL_FAIL: // Generic failure (encompasses missing krb5 credentials).
+ case SASL_BADAUTH: // Authentication failure.
+ case SASL_BADMAC: // Decode failure.
+ case SASL_NOAUTHZ: // Authorization failure.
+ case SASL_NOUSER: // User not found.
+ case SASL_WRONGMECH: // Server doesn't support requested mechanism.
+ case SASL_BADSERV: { // Server failed mutual authentication.
+ if (err.empty()) {
+ err = SaslErrDesc(rc, conn);
+ } else {
+ err = CleanSaslError(err);
+ }
+ return Status::NotAuthorized(err);
+ }
+ default:
+ return Status::RuntimeError(SaslErrDesc(rc, conn));
+ }
+}
+
+Status SaslEncode(sasl_conn_t* conn, const std::string& plaintext, std::string* encoded) {
+ size_t offset = 0;
+
+ // The SASL library can only encode up to a maximum amount at a time, so we
+ // have to call encode multiple times if our input is larger than this max.
+ while (offset < plaintext.size()) {
+ const char* out;
+ unsigned out_len;
+ size_t len = std::min(kSaslMaxOutBufLen, plaintext.size() - offset);
+
+ RETURN_NOT_OK(WrapSaslCall(conn, [&]() {
+ return sasl_encode(conn, plaintext.data() + offset, len, &out, &out_len);
+ }));
+
+ encoded->append(out, out_len);
+ offset += len;
+ }
+
+ return Status::OK();
+}
+
+Status SaslDecode(sasl_conn_t* conn, const string& encoded, string* plaintext) {
+ size_t offset = 0;
+
+ // The SASL library can only decode up to a maximum amount at a time, so we
+ // have to call decode multiple times if our input is larger than this max.
+ while (offset < encoded.size()) {
+ const char* out;
+ unsigned out_len;
+ size_t len = std::min(kSaslMaxOutBufLen, encoded.size() - offset);
+
+ RETURN_NOT_OK(WrapSaslCall(conn, [&]() {
+ return sasl_decode(conn, encoded.data() + offset, len, &out, &out_len);
+ }));
+
+ plaintext->append(out, out_len);
+ offset += len;
+ }
+
+ return Status::OK();
+}
+
+string SaslIpPortString(const Sockaddr& addr) {
+ string addr_str = addr.ToString();
+ size_t colon_pos = addr_str.find(':');
+ if (colon_pos != string::npos) {
+ addr_str[colon_pos] = ';';
+ }
+ return addr_str;
+}
+
+set<SaslMechanism::Type> SaslListAvailableMechs() {
+ set<SaslMechanism::Type> mechs;
+
+ // Array of NULL-terminated strings. Array terminated with NULL.
+ for (const char** mech_strings = sasl_global_listmech();
+ mech_strings != nullptr && *mech_strings != nullptr;
+ mech_strings++) {
+ auto mech = SaslMechanism::value_of(*mech_strings);
+ if (mech != SaslMechanism::INVALID) {
+ mechs.insert(mech);
+ }
+ }
+ return mechs;
+}
+
+sasl_callback_t SaslBuildCallback(int id, int (*proc)(void), void* context) {
+ sasl_callback_t callback;
+ callback.id = id;
+ callback.proc = proc;
+ callback.context = context;
+ return callback;
+}
+
+Status EnableIntegrityProtection(sasl_conn_t* sasl_conn) {
+ sasl_security_properties_t sec_props;
+ memset(&sec_props, 0, sizeof(sec_props));
+ sec_props.min_ssf = 1;
+ sec_props.max_ssf = std::numeric_limits<sasl_ssf_t>::max();
+ sec_props.maxbufsize = kSaslMaxOutBufLen;
+
+ RETURN_NOT_OK_PREPEND(WrapSaslCall(sasl_conn, [&] () {
+ return sasl_setprop(sasl_conn, SASL_SEC_PROPS, &sec_props);
+ }), "failed to set SASL security properties");
+ return Status::OK();
+}
+
+SaslMechanism::Type SaslMechanism::value_of(const string& mech) {
+ if (boost::iequals(mech, "PLAIN")) {
+ return PLAIN;
+ }
+ if (boost::iequals(mech, "GSSAPI")) {
+ return GSSAPI;
+ }
+ return INVALID;
+}
+
+const char* SaslMechanism::name_of(SaslMechanism::Type val) {
+ switch (val) {
+ case PLAIN: return "PLAIN";
+ case GSSAPI: return "GSSAPI";
+ default:
+ return "INVALID";
+ }
+}
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/sasl_common.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/sasl_common.h b/be/src/kudu/rpc/sasl_common.h
new file mode 100644
index 0000000..6022f9e
--- /dev/null
+++ b/be/src/kudu/rpc/sasl_common.h
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_RPC_SASL_COMMON_H
+#define KUDU_RPC_SASL_COMMON_H
+
+#include <stdint.h> // Required for sasl/sasl.h
+
+#include <string>
+#include <set>
+
+#include <sasl/sasl.h>
+
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class Sockaddr;
+
+namespace rpc {
+
+using std::string;
+
+// Constants
+extern const char* const kSaslMechPlain;
+extern const char* const kSaslMechGSSAPI;
+extern const size_t kSaslMaxOutBufLen;
+
+struct SaslMechanism {
+ enum Type {
+ INVALID,
+ PLAIN,
+ GSSAPI
+ };
+ static Type value_of(const std::string& mech);
+ static const char* name_of(Type val);
+};
+
+// Initialize the SASL library.
+// appname: Name of the application for logging messages & sasl plugin configuration.
+// Note that this string must remain allocated for the lifetime of the program.
+// This function must be called before using SASL.
+// If the library initializes without error, calling more than once has no effect.
+//
+// Some SASL plugins take time to initialize random number generators and other things,
+// so the first time this function is invoked it may execute for several seconds.
+// After that, it should be very fast. This function should be invoked as early as possible
+// in the application lifetime to avoid SASL initialization taking place in a
+// performance-critical section.
+//
+// This function is thread safe and uses a static lock.
+// This function should NOT be called during static initialization.
+Status SaslInit() WARN_UNUSED_RESULT;
+
+// Disable Kudu's initialization of SASL. See equivalent method in client.h.
+Status DisableSaslInitialization() WARN_UNUSED_RESULT;
+
+// Wrap a call into the SASL library. 'call' should be a lambda which
+// returns a SASL error code.
+//
+// The result is translated into a Status as follows:
+//
+// SASL_OK: Status::OK()
+// SASL_CONTINUE: Status::Incomplete()
+// otherwise: Status::NotAuthorized()
+//
+// The Status message is beautified to be more user-friendly compared
+// to the underlying sasl_errdetails() call.
+Status WrapSaslCall(sasl_conn_t* conn, const std::function<int()>& call) WARN_UNUSED_RESULT;
+
+// Return <ip>;<port> string formatted for SASL library use.
+string SaslIpPortString(const Sockaddr& addr);
+
+// Return available plugin mechanisms for the given connection.
+std::set<SaslMechanism::Type> SaslListAvailableMechs();
+
+// Initialize and return a libsasl2 callback data structure based on the passed args.
+// id: A SASL callback identifier (e.g., SASL_CB_GETOPT).
+// proc: A C-style callback with appropriate signature based on the callback id, or NULL.
+// context: An object to pass to the callback as the context pointer, or NULL.
+sasl_callback_t SaslBuildCallback(int id, int (*proc)(void), void* context);
+
+// Require integrity protection on the SASL connection. Should be called before
+// initiating the SASL negotiation.
+Status EnableIntegrityProtection(sasl_conn_t* sasl_conn) WARN_UNUSED_RESULT;
+
+// Encode the provided data and append it to 'encoded'.
+Status SaslEncode(sasl_conn_t* conn,
+ const std::string& plaintext,
+ std::string* encoded) WARN_UNUSED_RESULT;
+
+// Decode the provided SASL-encoded data and append it to 'plaintext'.
+Status SaslDecode(sasl_conn_t* conn,
+ const std::string& encoded,
+ std::string* plaintext) WARN_UNUSED_RESULT;
+
+// Deleter for sasl_conn_t instances, for use with gscoped_ptr after calling sasl_*_new()
+struct SaslDeleter {
+ inline void operator()(sasl_conn_t* conn) {
+ sasl_dispose(&conn);
+ }
+};
+
+// Internals exposed in the header for test purposes.
+namespace internal {
+void SaslSetMutex();
+} // namespace internal
+
+} // namespace rpc
+} // namespace kudu
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/sasl_helper.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/sasl_helper.cc b/be/src/kudu/rpc/sasl_helper.cc
new file mode 100644
index 0000000..53f9d08
--- /dev/null
+++ b/be/src/kudu/rpc/sasl_helper.cc
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/sasl_helper.h"
+
+#include <string>
+
+#include <glog/logging.h>
+#include <google/protobuf/message_lite.h>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/join.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/constants.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/sasl_common.h"
+#include "kudu/rpc/serialization.h"
+#include "kudu/util/status.h"
+
+using std::string;
+
+namespace kudu {
+namespace rpc {
+
+using google::protobuf::MessageLite;
+
+SaslHelper::SaslHelper(PeerType peer_type)
+ : peer_type_(peer_type),
+ global_mechs_(SaslListAvailableMechs()),
+ plain_enabled_(false),
+ gssapi_enabled_(false) {
+ tag_ = (peer_type_ == SERVER) ? "Server" : "Client";
+}
+
+void SaslHelper::set_server_fqdn(const string& domain_name) {
+ server_fqdn_ = domain_name;
+}
+const char* SaslHelper::server_fqdn() const {
+ return server_fqdn_.empty() ? nullptr : server_fqdn_.c_str();
+}
+
+const char* SaslHelper::EnabledMechsString() const {
+ enabled_mechs_string_ = JoinMapped(enabled_mechs_, SaslMechanism::name_of, " ");
+ return enabled_mechs_string_.c_str();
+}
+
+int SaslHelper::GetOptionCb(const char* plugin_name, const char* option,
+ const char** result, unsigned* len) {
+ DVLOG(4) << tag_ << ": GetOption Callback called. ";
+ DVLOG(4) << tag_ << ": GetOption Plugin name: "
+ << (plugin_name == nullptr ? "NULL" : plugin_name);
+ DVLOG(4) << tag_ << ": GetOption Option name: " << option;
+
+ if (PREDICT_FALSE(result == nullptr)) {
+ LOG(DFATAL) << tag_ << ": SASL Library passed NULL result out-param to GetOption callback!";
+ return SASL_BADPARAM;
+ }
+
+ if (plugin_name == nullptr) {
+ // SASL library option, not a plugin option
+ if (strcmp(option, "mech_list") == 0) {
+ *result = EnabledMechsString();
+ if (len != nullptr) *len = strlen(*result);
+ VLOG(4) << tag_ << ": Enabled mech list: " << *result;
+ return SASL_OK;
+ }
+ VLOG(4) << tag_ << ": GetOptionCb: Unknown library option: " << option;
+ } else {
+ VLOG(4) << tag_ << ": GetOptionCb: Unknown plugin: " << plugin_name;
+ }
+ return SASL_FAIL;
+}
+
+Status SaslHelper::EnablePlain() {
+ RETURN_NOT_OK(EnableMechanism(SaslMechanism::PLAIN));
+ plain_enabled_ = true;
+ return Status::OK();
+}
+
+Status SaslHelper::EnableGSSAPI() {
+ RETURN_NOT_OK(EnableMechanism(SaslMechanism::GSSAPI));
+ gssapi_enabled_ = true;
+ return Status::OK();
+}
+
+Status SaslHelper::EnableMechanism(SaslMechanism::Type mech) {
+ if (PREDICT_FALSE(!ContainsKey(global_mechs_, mech))) {
+ return Status::InvalidArgument("unable to find SASL plugin", SaslMechanism::name_of(mech));
+ }
+ enabled_mechs_.insert(mech);
+ return Status::OK();
+}
+
+bool SaslHelper::IsPlainEnabled() const {
+ return plain_enabled_;
+}
+
+Status SaslHelper::CheckNegotiateCallId(int32_t call_id) const {
+ if (call_id != kNegotiateCallId) {
+ Status s = Status::IllegalState(strings::Substitute(
+ "Received illegal call-id during negotiation; expected: $0, received: $1",
+ kNegotiateCallId, call_id));
+ LOG(DFATAL) << tag_ << ": " << s.ToString();
+ return s;
+ }
+ return Status::OK();
+}
+
+Status SaslHelper::ParseNegotiatePB(const Slice& param_buf, NegotiatePB* msg) {
+ if (!msg->ParseFromArray(param_buf.data(), param_buf.size())) {
+ return Status::IOError(tag_ + ": Invalid SASL message, missing fields",
+ msg->InitializationErrorString());
+ }
+ return Status::OK();
+}
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/sasl_helper.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/sasl_helper.h b/be/src/kudu/rpc/sasl_helper.h
new file mode 100644
index 0000000..0a3107c
--- /dev/null
+++ b/be/src/kudu/rpc/sasl_helper.h
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_RPC_SASL_HELPER_H
+#define KUDU_RPC_SASL_HELPER_H
+
+#include <set>
+#include <string>
+
+#include <sasl/sasl.h>
+
+#include "kudu/rpc/sasl_common.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class Sockaddr;
+
+namespace rpc {
+
+class NegotiatePB;
+
+// Helper class which contains functionality that is common to client and server
+// SASL negotiations. Most of these methods are convenience methods for
+// interacting with the libsasl2 library.
+class SaslHelper {
+ public:
+ enum PeerType {
+ CLIENT,
+ SERVER
+ };
+
+ explicit SaslHelper(PeerType peer_type);
+ ~SaslHelper() = default;
+
+ // Specify the fully-qualified domain name of the remote server.
+ void set_server_fqdn(const std::string& domain_name);
+ const char* server_fqdn() const;
+
+ // Globally-registered available SASL plugins.
+ const std::set<SaslMechanism::Type>& GlobalMechs() const {
+ return global_mechs_;
+ }
+
+ // Helper functions for managing the list of active SASL mechanisms.
+ const std::set<SaslMechanism::Type>& EnabledMechs() const {
+ return enabled_mechs_;
+ }
+
+ // Implements the client_mech_list / mech_list callbacks.
+ int GetOptionCb(const char* plugin_name, const char* option, const char** result, unsigned* len);
+
+ // Enable the PLAIN SASL mechanism.
+ Status EnablePlain();
+
+ // Enable the GSSAPI (Kerberos) mechanism.
+ Status EnableGSSAPI();
+
+ // Check for the PLAIN SASL mechanism.
+ bool IsPlainEnabled() const;
+
+ // Sanity check that the call ID is the negotiation call ID.
+ // Logs DFATAL if call_id does not match.
+ Status CheckNegotiateCallId(int32_t call_id) const;
+
+ // Parse msg from the given Slice.
+ Status ParseNegotiatePB(const Slice& param_buf, NegotiatePB* msg);
+
+ private:
+ Status EnableMechanism(SaslMechanism::Type mech);
+
+ // Returns space-delimited local mechanism list string suitable for passing
+ // to libsasl2, such as via "mech_list" callbacks.
+ // The returned pointer is valid only until the next call to EnabledMechsString().
+ const char* EnabledMechsString() const;
+
+ std::string server_fqdn_;
+
+ // Authentication types and data.
+ const PeerType peer_type_;
+ std::string tag_;
+ std::set<SaslMechanism::Type> global_mechs_; // Cache of global mechanisms.
+ std::set<SaslMechanism::Type> enabled_mechs_; // Active mechanisms.
+ mutable std::string enabled_mechs_string_; // Mechanism list string returned by callbacks.
+
+ bool plain_enabled_;
+ bool gssapi_enabled_;
+
+ DISALLOW_COPY_AND_ASSIGN(SaslHelper);
+};
+
+} // namespace rpc
+} // namespace kudu
+
+#endif // KUDU_RPC_SASL_HELPER_H
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/serialization.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/serialization.cc b/be/src/kudu/rpc/serialization.cc
new file mode 100644
index 0000000..dbb0fc5
--- /dev/null
+++ b/be/src/kudu/rpc/serialization.cc
@@ -0,0 +1,199 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/serialization.h"
+
+#include <glog/logging.h>
+#include <google/protobuf/message_lite.h>
+#include <google/protobuf/io/coded_stream.h>
+
+#include "kudu/gutil/endian.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/constants.h"
+#include "kudu/util/faststring.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/status.h"
+
+DECLARE_int32(rpc_max_message_size);
+
+using google::protobuf::MessageLite;
+using google::protobuf::io::CodedInputStream;
+using google::protobuf::io::CodedOutputStream;
+using strings::Substitute;
+
+namespace kudu {
+namespace rpc {
+namespace serialization {
+
+enum {
+ kHeaderPosVersion = 0,
+ kHeaderPosServiceClass = 1,
+ kHeaderPosAuthProto = 2
+};
+
+void SerializeMessage(const MessageLite& message, faststring* param_buf,
+ int additional_size, bool use_cached_size) {
+ int pb_size = use_cached_size ? message.GetCachedSize() : message.ByteSize();
+ DCHECK_EQ(message.ByteSize(), pb_size);
+ int recorded_size = pb_size + additional_size;
+ int size_with_delim = pb_size + CodedOutputStream::VarintSize32(recorded_size);
+ int total_size = size_with_delim + additional_size;
+
+ if (total_size > FLAGS_rpc_max_message_size) {
+ LOG(WARNING) << Substitute("Serialized $0 ($1 bytes) is larger than the maximum configured "
+ "RPC message size ($2 bytes). "
+ "Sending anyway, but peer may reject the data.",
+ message.GetTypeName(), total_size, FLAGS_rpc_max_message_size);
+ }
+
+ param_buf->resize(size_with_delim);
+ uint8_t* dst = param_buf->data();
+ dst = CodedOutputStream::WriteVarint32ToArray(recorded_size, dst);
+ dst = message.SerializeWithCachedSizesToArray(dst);
+ CHECK_EQ(dst, param_buf->data() + size_with_delim);
+}
+
+void SerializeHeader(const MessageLite& header,
+ size_t param_len,
+ faststring* header_buf) {
+
+ CHECK(header.IsInitialized())
+ << "RPC header missing fields: " << header.InitializationErrorString();
+
+ // Compute all the lengths for the packet.
+ size_t header_pb_len = header.ByteSize();
+ size_t header_tot_len = kMsgLengthPrefixLength // Int prefix for the total length.
+ + CodedOutputStream::VarintSize32(header_pb_len) // Varint delimiter for header PB.
+ + header_pb_len; // Length for the header PB itself.
+ size_t total_size = header_tot_len + param_len;
+
+ header_buf->resize(header_tot_len);
+ uint8_t* dst = header_buf->data();
+
+ // 1. The length for the whole request, not including the 4-byte
+ // length prefix.
+ NetworkByteOrder::Store32(dst, total_size - kMsgLengthPrefixLength);
+ dst += sizeof(uint32_t);
+
+ // 2. The varint-prefixed RequestHeader PB
+ dst = CodedOutputStream::WriteVarint32ToArray(header_pb_len, dst);
+ dst = header.SerializeWithCachedSizesToArray(dst);
+
+ // We should have used the whole buffer we allocated.
+ CHECK_EQ(dst, header_buf->data() + header_tot_len);
+}
+
+Status ParseMessage(const Slice& buf,
+ MessageLite* parsed_header,
+ Slice* parsed_main_message) {
+
+ // First grab the total length
+ if (PREDICT_FALSE(buf.size() < kMsgLengthPrefixLength)) {
+ return Status::Corruption("Invalid packet: not enough bytes for length header",
+ KUDU_REDACT(buf.ToDebugString()));
+ }
+
+ int total_len = NetworkByteOrder::Load32(buf.data());
+ DCHECK_EQ(total_len + kMsgLengthPrefixLength, buf.size())
+ << "Got mis-sized buffer: " << KUDU_REDACT(buf.ToDebugString());
+
+ CodedInputStream in(buf.data(), buf.size());
+ in.Skip(kMsgLengthPrefixLength);
+
+ uint32_t header_len;
+ if (PREDICT_FALSE(!in.ReadVarint32(&header_len))) {
+ return Status::Corruption("Invalid packet: missing header delimiter",
+ KUDU_REDACT(buf.ToDebugString()));
+ }
+
+ CodedInputStream::Limit l;
+ l = in.PushLimit(header_len);
+ if (PREDICT_FALSE(!parsed_header->ParseFromCodedStream(&in))) {
+ return Status::Corruption("Invalid packet: header too short",
+ KUDU_REDACT(buf.ToDebugString()));
+ }
+ in.PopLimit(l);
+
+ uint32_t main_msg_len;
+ if (PREDICT_FALSE(!in.ReadVarint32(&main_msg_len))) {
+ return Status::Corruption("Invalid packet: missing main msg length",
+ KUDU_REDACT(buf.ToDebugString()));
+ }
+
+ if (PREDICT_FALSE(!in.Skip(main_msg_len))) {
+ return Status::Corruption(
+ StringPrintf("Invalid packet: data too short, expected %d byte main_msg", main_msg_len),
+ KUDU_REDACT(buf.ToDebugString()));
+ }
+
+ if (PREDICT_FALSE(in.BytesUntilLimit() > 0)) {
+ return Status::Corruption(
+ StringPrintf("Invalid packet: %d extra bytes at end of packet", in.BytesUntilLimit()),
+ KUDU_REDACT(buf.ToDebugString()));
+ }
+
+ *parsed_main_message = Slice(buf.data() + buf.size() - main_msg_len,
+ main_msg_len);
+ return Status::OK();
+}
+
+void SerializeConnHeader(uint8_t* buf) {
+ memcpy(reinterpret_cast<char *>(buf), kMagicNumber, kMagicNumberLength);
+ buf += kMagicNumberLength;
+ buf[kHeaderPosVersion] = kCurrentRpcVersion;
+ buf[kHeaderPosServiceClass] = 0; // TODO: implement
+ buf[kHeaderPosAuthProto] = 0; // TODO: implement
+}
+
+// validate the entire rpc header (magic number + flags)
+Status ValidateConnHeader(const Slice& slice) {
+ DCHECK_EQ(kMagicNumberLength + kHeaderFlagsLength, slice.size())
+ << "Invalid RPC header length";
+
+ // validate actual magic
+ if (!slice.starts_with(kMagicNumber)) {
+ if (slice.starts_with("GET ") ||
+ slice.starts_with("POST") ||
+ slice.starts_with("HEAD")) {
+ return Status::InvalidArgument("invalid negotation, appears to be an HTTP client on "
+ "the RPC port");
+ }
+ return Status::InvalidArgument("connection must begin with magic number", kMagicNumber);
+ }
+
+ const uint8_t *data = slice.data();
+ data += kMagicNumberLength;
+
+ // validate version
+ if (data[kHeaderPosVersion] != kCurrentRpcVersion) {
+ return Status::InvalidArgument("Unsupported RPC version",
+ StringPrintf("Received: %d, Supported: %d",
+ data[kHeaderPosVersion], kCurrentRpcVersion));
+ }
+
+ // TODO: validate additional header flags:
+ // RPC_SERVICE_CLASS
+ // RPC_AUTH_PROTOCOL
+
+ return Status::OK();
+}
+
+} // namespace serialization
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/serialization.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/serialization.h b/be/src/kudu/rpc/serialization.h
new file mode 100644
index 0000000..26df3a7
--- /dev/null
+++ b/be/src/kudu/rpc/serialization.h
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_RPC_SERIALIZATION_H
+#define KUDU_RPC_SERIALIZATION_H
+
+#include <inttypes.h>
+#include <string.h>
+
+namespace google {
+namespace protobuf {
+class MessageLite;
+} // namespace protobuf
+} // namespace google
+
+namespace kudu {
+
+class Status;
+class faststring;
+class Slice;
+
+namespace rpc {
+namespace serialization {
+
+// Serialize the request param into a buffer which is allocated by this function.
+// Uses the message's cached size by calling MessageLite::GetCachedSize().
+// In : 'message' Protobuf Message to serialize
+// 'additional_size' Optional argument which increases the recorded size
+// within param_buf. This argument is necessary if there will be
+// additional sidecars appended onto the message (that aren't part of
+// the protobuf itself).
+// 'use_cached_size' Additional optional argument whether to use the cached
+// or explicit byte size by calling MessageLite::GetCachedSize() or
+// MessageLite::ByteSize(), respectively.
+// Out: The faststring 'param_buf' to be populated with the serialized bytes.
+// The faststring's length is only determined by the amount that
+// needs to be serialized for the protobuf (i.e., no additional space
+// is reserved for 'additional_size', which only affects the
+// size indicator prefix in 'param_buf').
+void SerializeMessage(const google::protobuf::MessageLite& message,
+ faststring* param_buf, int additional_size = 0,
+ bool use_cached_size = false);
+
+// Serialize the request or response header into a buffer which is allocated
+// by this function.
+// Includes leading 32-bit length of the buffer.
+// In: Protobuf Header to serialize,
+// Length of the message param following this header in the frame.
+// Out: faststring to be populated with the serialized bytes.
+void SerializeHeader(const google::protobuf::MessageLite& header,
+ size_t param_len,
+ faststring* header_buf);
+
+// Deserialize the request.
+// In: data buffer Slice.
+// Out: parsed_header PB initialized,
+// parsed_main_message pointing to offset in original buffer containing
+// the main payload.
+Status ParseMessage(const Slice& buf,
+ google::protobuf::MessageLite* parsed_header,
+ Slice* parsed_main_message);
+
+// Serialize the RPC connection header (magic number + flags).
+// buf must have 7 bytes available (kMagicNumberLength + kHeaderFlagsLength).
+void SerializeConnHeader(uint8_t* buf);
+
+// Validate the entire rpc header (magic number + flags).
+Status ValidateConnHeader(const Slice& slice);
+
+
+} // namespace serialization
+} // namespace rpc
+} // namespace kudu
+#endif // KUDU_RPC_SERIALIZATION_H
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/server_negotiation.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/server_negotiation.cc b/be/src/kudu/rpc/server_negotiation.cc
new file mode 100644
index 0000000..5e6d070
--- /dev/null
+++ b/be/src/kudu/rpc/server_negotiation.cc
@@ -0,0 +1,980 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/rpc/server_negotiation.h"
+
+#include <limits>
+#include <memory>
+#include <set>
+#include <string>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+#include <google/protobuf/message_lite.h>
+#include <sasl/sasl.h>
+
+#include "kudu/gutil/casts.h"
+#include "kudu/gutil/endian.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/strings/split.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/rpc/blocking_ops.h"
+#include "kudu/rpc/constants.h"
+#include "kudu/rpc/messenger.h"
+#include "kudu/rpc/serialization.h"
+#include "kudu/security/cert.h"
+#include "kudu/security/crypto.h"
+#include "kudu/security/init.h"
+#include "kudu/security/tls_context.h"
+#include "kudu/security/tls_handshake.h"
+#include "kudu/security/tls_socket.h"
+#include "kudu/security/token_verifier.h"
+#include "kudu/util/fault_injection.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/net/net_util.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/trace.h"
+
+using std::set;
+using std::string;
+using std::unique_ptr;
+
+// Fault injection flags.
+DEFINE_double(rpc_inject_invalid_authn_token_ratio, 0,
+ "If set higher than 0, AuthenticateByToken() randomly injects "
+ "errors replying with FATAL_INVALID_AUTHENTICATION_TOKEN code. "
+ "The flag's value corresponds to the probability of the fault "
+ "injection event. Used for only for tests.");
+TAG_FLAG(rpc_inject_invalid_authn_token_ratio, runtime);
+TAG_FLAG(rpc_inject_invalid_authn_token_ratio, unsafe);
+
+DECLARE_bool(rpc_encrypt_loopback_connections);
+
+DEFINE_string(trusted_subnets,
+ "127.0.0.0/8,10.0.0.0/8,172.16.0.0/12,192.168.0.0/16,169.254.0.0/16",
+ "A trusted subnet whitelist. If set explicitly, all unauthenticated "
+ "or unencrypted connections are prohibited except the ones from the "
+ "specified address blocks. Otherwise, private network (127.0.0.0/8, etc.) "
+ "and local subnets of all local network interfaces will be used. Set it "
+ "to '0.0.0.0/0' to allow unauthenticated/unencrypted connections from all "
+ "remote IP addresses. However, if network access is not otherwise restricted "
+ "by a firewall, malicious users may be able to gain unauthorized access.");
+TAG_FLAG(trusted_subnets, advanced);
+TAG_FLAG(trusted_subnets, evolving);
+
+static bool ValidateTrustedSubnets(const char* /*flagname*/, const string& value) {
+ if (value.empty()) {
+ return true;
+ }
+
+ for (const auto& t : strings::Split(value, ",", strings::SkipEmpty())) {
+ kudu::Network network;
+ kudu::Status s = network.ParseCIDRString(t.ToString());
+ if (!s.ok()) {
+ LOG(ERROR) << "Invalid subnet address: " << t
+ << ". Subnet must be specified in CIDR notation.";
+ return false;
+ }
+ }
+
+ return true;
+}
+
+DEFINE_validator(trusted_subnets, &ValidateTrustedSubnets);
+
+namespace kudu {
+namespace rpc {
+
+namespace {
+vector<Network>* g_trusted_subnets = nullptr;
+} // anonymous namespace
+
+static int ServerNegotiationGetoptCb(ServerNegotiation* server_negotiation,
+ const char* plugin_name,
+ const char* option,
+ const char** result,
+ unsigned* len) {
+ return server_negotiation->GetOptionCb(plugin_name, option, result, len);
+}
+
+static int ServerNegotiationPlainAuthCb(sasl_conn_t* conn,
+ ServerNegotiation* server_negotiation,
+ const char* user,
+ const char* pass,
+ unsigned passlen,
+ struct propctx* propctx) {
+ return server_negotiation->PlainAuthCb(conn, user, pass, passlen, propctx);
+}
+
+ServerNegotiation::ServerNegotiation(unique_ptr<Socket> socket,
+ const security::TlsContext* tls_context,
+ const security::TokenVerifier* token_verifier,
+ RpcEncryption encryption)
+ : socket_(std::move(socket)),
+ helper_(SaslHelper::SERVER),
+ tls_context_(tls_context),
+ encryption_(encryption),
+ tls_negotiated_(false),
+ token_verifier_(token_verifier),
+ negotiated_authn_(AuthenticationType::INVALID),
+ negotiated_mech_(SaslMechanism::INVALID),
+ deadline_(MonoTime::Max()) {
+ callbacks_.push_back(SaslBuildCallback(SASL_CB_GETOPT,
+ reinterpret_cast<int (*)()>(&ServerNegotiationGetoptCb), this));
+ callbacks_.push_back(SaslBuildCallback(SASL_CB_SERVER_USERDB_CHECKPASS,
+ reinterpret_cast<int (*)()>(&ServerNegotiationPlainAuthCb), this));
+ callbacks_.push_back(SaslBuildCallback(SASL_CB_LIST_END, nullptr, nullptr));
+}
+
+Status ServerNegotiation::EnablePlain() {
+ return helper_.EnablePlain();
+}
+
+Status ServerNegotiation::EnableGSSAPI() {
+ return helper_.EnableGSSAPI();
+}
+
+SaslMechanism::Type ServerNegotiation::negotiated_mechanism() const {
+ return negotiated_mech_;
+}
+
+void ServerNegotiation::set_server_fqdn(const string& domain_name) {
+ helper_.set_server_fqdn(domain_name);
+}
+
+void ServerNegotiation::set_deadline(const MonoTime& deadline) {
+ deadline_ = deadline;
+}
+
+Status ServerNegotiation::Negotiate() {
+ TRACE("Beginning negotiation");
+
+ // Wait until starting negotiation to check that the socket, tls_context, and
+ // token_verifier are not null, since they do not need to be set for
+ // PreflightCheckGSSAPI.
+ DCHECK(socket_);
+ DCHECK(tls_context_);
+ DCHECK(token_verifier_);
+
+ // Ensure we can use blocking calls on the socket during negotiation.
+ RETURN_NOT_OK(EnsureBlockingMode(socket_.get()));
+
+ faststring recv_buf;
+
+ // Step 1: Read the connection header.
+ RETURN_NOT_OK(ValidateConnectionHeader(&recv_buf));
+
+ { // Step 2: Receive and respond to the NEGOTIATE step message.
+ NegotiatePB request;
+ RETURN_NOT_OK(RecvNegotiatePB(&request, &recv_buf));
+ RETURN_NOT_OK(HandleNegotiate(request));
+ TRACE("Negotiated authn=$0", AuthenticationTypeToString(negotiated_authn_));
+ }
+
+ // Step 3: if both ends support TLS, do a TLS handshake.
+ if (encryption_ != RpcEncryption::DISABLED &&
+ tls_context_->has_cert() &&
+ ContainsKey(client_features_, TLS)) {
+ RETURN_NOT_OK(tls_context_->InitiateHandshake(security::TlsHandshakeType::SERVER,
+ &tls_handshake_));
+
+ if (negotiated_authn_ != AuthenticationType::CERTIFICATE) {
+ // The server does not need to verify the client's certificate unless it's
+ // being used for authentication.
+ tls_handshake_.set_verification_mode(security::TlsVerificationMode::VERIFY_NONE);
+ }
+
+ while (true) {
+ NegotiatePB request;
+ RETURN_NOT_OK(RecvNegotiatePB(&request, &recv_buf));
+ Status s = HandleTlsHandshake(request);
+ if (s.ok()) break;
+ if (!s.IsIncomplete()) return s;
+ }
+ tls_negotiated_ = true;
+ }
+
+ // Rejects any connection from public routable IPs if encryption
+ // is disabled. See KUDU-1875.
+ if (!tls_negotiated_) {
+ Sockaddr addr;
+ RETURN_NOT_OK(socket_->GetPeerAddress(&addr));
+
+ if (!IsTrustedConnection(addr)) {
+ // Receives client response before sending error
+ // message, even though the response is never used,
+ // to avoid risk condition that connection gets
+ // closed before client receives server's error
+ // message.
+ NegotiatePB request;
+ RETURN_NOT_OK(RecvNegotiatePB(&request, &recv_buf));
+
+ Status s = Status::NotAuthorized("unencrypted connections from publicly routable "
+ "IPs are prohibited. See --trusted_subnets flag "
+ "for more information.",
+ addr.ToString());
+ RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
+ return s;
+ }
+ }
+
+ // Step 4: Authentication
+ switch (negotiated_authn_) {
+ case AuthenticationType::SASL:
+ RETURN_NOT_OK(AuthenticateBySasl(&recv_buf));
+ break;
+ case AuthenticationType::TOKEN:
+ RETURN_NOT_OK(AuthenticateByToken(&recv_buf));
+ break;
+ case AuthenticationType::CERTIFICATE:
+ RETURN_NOT_OK(AuthenticateByCertificate());
+ break;
+ case AuthenticationType::INVALID: LOG(FATAL) << "unreachable";
+ }
+
+ // Step 5: Receive connection context.
+ RETURN_NOT_OK(RecvConnectionContext(&recv_buf));
+
+ TRACE("Negotiation successful");
+ return Status::OK();
+}
+
+Status ServerNegotiation::PreflightCheckGSSAPI() {
+ // TODO(todd): the error messages that come from this function on el6
+ // are relatively useless due to the following krb5 bug:
+ // http://krbdev.mit.edu/rt/Ticket/Display.html?id=6973
+ // This may not be useful anymore given the keytab login that happens
+ // in security/init.cc.
+
+ // Initialize a ServerNegotiation with a null socket, and enable
+ // only GSSAPI.
+ //
+ // We aren't going to actually send/receive any messages, but
+ // this makes it easier to reuse the initialization code.
+ ServerNegotiation server(nullptr, nullptr, nullptr, RpcEncryption::OPTIONAL);
+ Status s = server.EnableGSSAPI();
+ if (!s.ok()) {
+ return Status::RuntimeError(s.message());
+ }
+
+ RETURN_NOT_OK(server.InitSaslServer());
+
+ // Start the SASL server as if we were accepting a connection.
+ const char* server_out = nullptr; // ignored
+ uint32_t server_out_len = 0;
+ s = WrapSaslCall(server.sasl_conn_.get(), [&]() {
+ return sasl_server_start(
+ server.sasl_conn_.get(),
+ kSaslMechGSSAPI,
+ "", 0, // Pass a 0-length token.
+ &server_out, &server_out_len);
+ });
+
+ // We expect 'Incomplete' status to indicate that the first step of negotiation
+ // was correct.
+ if (s.IsIncomplete()) return Status::OK();
+
+ string err_msg = s.message().ToString();
+ if (err_msg == "Permission denied") {
+ // For bad keytab permissions, we get a rather vague message. So,
+ // we make it more specific for better usability.
+ err_msg = "error accessing keytab: " + err_msg;
+ }
+ return Status::RuntimeError(err_msg);
+}
+
+Status ServerNegotiation::RecvNegotiatePB(NegotiatePB* msg, faststring* recv_buf) {
+ RequestHeader header;
+ Slice param_buf;
+ RETURN_NOT_OK(ReceiveFramedMessageBlocking(socket(), recv_buf, &header, ¶m_buf, deadline_));
+ Status s = helper_.CheckNegotiateCallId(header.call_id());
+ if (!s.ok()) {
+ RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_INVALID_RPC_HEADER, s));
+ return s;
+ }
+
+ s = helper_.ParseNegotiatePB(param_buf, msg);
+ if (!s.ok()) {
+ RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_DESERIALIZING_REQUEST, s));
+ return s;
+ }
+
+ TRACE("Received $0 NegotiatePB request", NegotiatePB::NegotiateStep_Name(msg->step()));
+ return Status::OK();
+}
+
+Status ServerNegotiation::SendNegotiatePB(const NegotiatePB& msg) {
+ ResponseHeader header;
+ header.set_call_id(kNegotiateCallId);
+
+ DCHECK(socket_);
+ DCHECK(msg.IsInitialized()) << "message must be initialized";
+ DCHECK(msg.has_step()) << "message must have a step";
+
+ TRACE("Sending $0 NegotiatePB response", NegotiatePB::NegotiateStep_Name(msg.step()));
+ return SendFramedMessageBlocking(socket(), header, msg, deadline_);
+}
+
+Status ServerNegotiation::SendError(ErrorStatusPB::RpcErrorCodePB code, const Status& err) {
+ DCHECK(!err.ok());
+
+ // Create header with negotiation-specific callId
+ ResponseHeader header;
+ header.set_call_id(kNegotiateCallId);
+ header.set_is_error(true);
+
+ // Get RPC error code from Status object
+ ErrorStatusPB msg;
+ msg.set_code(code);
+ msg.set_message(err.ToString());
+
+ TRACE("Sending RPC error: $0: $1", ErrorStatusPB::RpcErrorCodePB_Name(code), err.ToString());
+ RETURN_NOT_OK(SendFramedMessageBlocking(socket(), header, msg, deadline_));
+
+ return Status::OK();
+}
+
+Status ServerNegotiation::ValidateConnectionHeader(faststring* recv_buf) {
+ TRACE("Waiting for connection header");
+ size_t num_read;
+ const size_t conn_header_len = kMagicNumberLength + kHeaderFlagsLength;
+ recv_buf->resize(conn_header_len);
+ RETURN_NOT_OK(socket_->BlockingRecv(recv_buf->data(), conn_header_len, &num_read, deadline_));
+ DCHECK_EQ(conn_header_len, num_read);
+
+ RETURN_NOT_OK(serialization::ValidateConnHeader(*recv_buf));
+ TRACE("Connection header received");
+ return Status::OK();
+}
+
+// calls sasl_server_init() and sasl_server_new()
+Status ServerNegotiation::InitSaslServer() {
+ RETURN_NOT_OK(SaslInit());
+
+ // TODO(unknown): Support security flags.
+ unsigned secflags = 0;
+
+ sasl_conn_t* sasl_conn = nullptr;
+ RETURN_NOT_OK_PREPEND(WrapSaslCall(nullptr /* no conn */, [&]() {
+ return sasl_server_new(
+ // Registered name of the service using SASL. Required.
+ kSaslProtoName,
+ // The fully qualified domain name of this server.
+ helper_.server_fqdn(),
+ // Permits multiple user realms on server. NULL == use default.
+ nullptr,
+ // Local and remote IP address strings. We don't use any mechanisms
+ // which need these.
+ nullptr,
+ nullptr,
+ // Connection-specific callbacks.
+ &callbacks_[0],
+ // Security flags.
+ secflags,
+ &sasl_conn);
+ }), "Unable to create new SASL server");
+ sasl_conn_.reset(sasl_conn);
+ return Status::OK();
+}
+
+Status ServerNegotiation::HandleNegotiate(const NegotiatePB& request) {
+ if (request.step() != NegotiatePB::NEGOTIATE) {
+ Status s = Status::NotAuthorized("expected NEGOTIATE step",
+ NegotiatePB::NegotiateStep_Name(request.step()));
+ RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
+ return s;
+ }
+ TRACE("Received NEGOTIATE request from client");
+
+ // Fill in the set of features supported by the client.
+ for (int flag : request.supported_features()) {
+ // We only add the features that our local build knows about.
+ RpcFeatureFlag feature_flag = RpcFeatureFlag_IsValid(flag) ?
+ static_cast<RpcFeatureFlag>(flag) : UNKNOWN;
+ if (feature_flag != UNKNOWN) {
+ client_features_.insert(feature_flag);
+ }
+ }
+
+ if (encryption_ == RpcEncryption::REQUIRED &&
+ !ContainsKey(client_features_, RpcFeatureFlag::TLS)) {
+ Status s = Status::NotAuthorized("client does not support required TLS encryption");
+ RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
+ return s;
+ }
+
+ // Find the set of mutually supported authentication types.
+ set<AuthenticationType> authn_types;
+ if (request.authn_types().empty()) {
+ // If the client doesn't send any support authentication types, we assume
+ // support for SASL. This preserves backwards compatibility with clients who
+ // don't support security features.
+ authn_types.insert(AuthenticationType::SASL);
+ } else {
+ for (const auto& type : request.authn_types()) {
+ switch (type.type_case()) {
+ case AuthenticationTypePB::kSasl:
+ authn_types.insert(AuthenticationType::SASL);
+ break;
+ case AuthenticationTypePB::kToken:
+ authn_types.insert(AuthenticationType::TOKEN);
+ break;
+ case AuthenticationTypePB::kCertificate:
+ // We only provide authenticated TLS if the certificates are generated
+ // by the internal CA.
+ if (!tls_context_->is_external_cert()) {
+ authn_types.insert(AuthenticationType::CERTIFICATE);
+ }
+ break;
+ case AuthenticationTypePB::TYPE_NOT_SET: {
+ Sockaddr addr;
+ RETURN_NOT_OK(socket_->GetPeerAddress(&addr));
+ KLOG_EVERY_N_SECS(WARNING, 60)
+ << "client supports unknown authentication type, consider updating server, address: "
+ << addr.ToString();
+ break;
+ }
+ }
+ }
+
+ if (authn_types.empty()) {
+ Status s = Status::NotSupported("no mutually supported authentication types");
+ RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
+ return s;
+ }
+ }
+
+ if (encryption_ != RpcEncryption::DISABLED &&
+ ContainsKey(authn_types, AuthenticationType::CERTIFICATE) &&
+ tls_context_->has_signed_cert()) {
+ // If the client supports it and we are locally configured with TLS and have
+ // a CA-signed cert, choose cert authn.
+ // TODO(KUDU-1924): consider adding the fingerprint of the CA cert which signed
+ // the client's cert to the authentication message.
+ negotiated_authn_ = AuthenticationType::CERTIFICATE;
+ } else if (ContainsKey(authn_types, AuthenticationType::TOKEN) &&
+ token_verifier_->GetMaxKnownKeySequenceNumber() >= 0 &&
+ encryption_ != RpcEncryption::DISABLED &&
+ tls_context_->has_signed_cert()) {
+ // If the client supports it, we have a TSK to verify the client's token,
+ // and we have a signed-cert so the client can verify us, choose token authn.
+ // TODO(KUDU-1924): consider adding the TSK sequence number to the authentication
+ // message.
+ negotiated_authn_ = AuthenticationType::TOKEN;
+ } else {
+ // Otherwise we always can fallback to SASL.
+ DCHECK(ContainsKey(authn_types, AuthenticationType::SASL));
+ negotiated_authn_ = AuthenticationType::SASL;
+ }
+
+ // Fill in the NEGOTIATE step response for the client.
+ NegotiatePB response;
+ response.set_step(NegotiatePB::NEGOTIATE);
+
+ // Tell the client which features we support.
+ server_features_ = kSupportedServerRpcFeatureFlags;
+ if (tls_context_->has_cert() && encryption_ != RpcEncryption::DISABLED) {
+ server_features_.insert(TLS);
+ // If the remote peer is local, then we allow using TLS for authentication
+ // without encryption or integrity.
+ if (socket_->IsLoopbackConnection() && !FLAGS_rpc_encrypt_loopback_connections) {
+ server_features_.insert(TLS_AUTHENTICATION_ONLY);
+ }
+ }
+
+ for (RpcFeatureFlag feature : server_features_) {
+ response.add_supported_features(feature);
+ }
+
+ switch (negotiated_authn_) {
+ case AuthenticationType::CERTIFICATE:
+ response.add_authn_types()->mutable_certificate();
+ break;
+ case AuthenticationType::TOKEN:
+ response.add_authn_types()->mutable_token();
+ break;
+ case AuthenticationType::SASL: {
+ response.add_authn_types()->mutable_sasl();
+ const set<SaslMechanism::Type>& server_mechs = helper_.EnabledMechs();
+ if (PREDICT_FALSE(server_mechs.empty())) {
+ // This will happen if no mechanisms are enabled before calling Init()
+ Status s = Status::NotAuthorized("SASL server mechanism list is empty!");
+ LOG(ERROR) << s.ToString();
+ TRACE("Sending FATAL_UNAUTHORIZED response to client");
+ RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
+ return s;
+ }
+
+ for (auto mechanism : server_mechs) {
+ response.add_sasl_mechanisms()->set_mechanism(SaslMechanism::name_of(mechanism));
+ }
+ break;
+ }
+ case AuthenticationType::INVALID: LOG(FATAL) << "unreachable";
+ }
+
+ return SendNegotiatePB(response);
+}
+
+Status ServerNegotiation::HandleTlsHandshake(const NegotiatePB& request) {
+ if (PREDICT_FALSE(request.step() != NegotiatePB::TLS_HANDSHAKE)) {
+ Status s = Status::NotAuthorized("expected TLS_HANDSHAKE step",
+ NegotiatePB::NegotiateStep_Name(request.step()));
+ RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
+ return s;
+ }
+
+ if (PREDICT_FALSE(!request.has_tls_handshake())) {
+ Status s = Status::NotAuthorized(
+ "No TLS handshake token in TLS_HANDSHAKE request from client");
+ RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
+ return s;
+ }
+
+ string token;
+ Status s = tls_handshake_.Continue(request.tls_handshake(), &token);
+
+ if (PREDICT_FALSE(!s.IsIncomplete() && !s.ok())) {
+ RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
+ return s;
+ }
+
+ // Regardless of whether this is the final handshake roundtrip (in which case
+ // Continue would have returned OK), we still need to return a response.
+ RETURN_NOT_OK(SendTlsHandshake(std::move(token)));
+ RETURN_NOT_OK(s);
+
+ // TLS handshake is finished.
+ if (ContainsKey(server_features_, TLS_AUTHENTICATION_ONLY) &&
+ ContainsKey(client_features_, TLS_AUTHENTICATION_ONLY)) {
+ TRACE("Negotiated auth-only $0 with cipher suite $1",
+ tls_handshake_.GetProtocol(), tls_handshake_.GetCipherSuite());
+ return tls_handshake_.FinishNoWrap(*socket_);
+ }
+
+ TRACE("Negotiated $0 with cipher suite $1",
+ tls_handshake_.GetProtocol(), tls_handshake_.GetCipherSuite());
+ return tls_handshake_.Finish(&socket_);
+}
+
+Status ServerNegotiation::SendTlsHandshake(string tls_token) {
+ NegotiatePB msg;
+ msg.set_step(NegotiatePB::TLS_HANDSHAKE);
+ msg.mutable_tls_handshake()->swap(tls_token);
+ return SendNegotiatePB(msg);
+}
+
+Status ServerNegotiation::AuthenticateBySasl(faststring* recv_buf) {
+ RETURN_NOT_OK(InitSaslServer());
+
+ NegotiatePB request;
+ RETURN_NOT_OK(RecvNegotiatePB(&request, recv_buf));
+ Status s = HandleSaslInitiate(request);
+
+ while (s.IsIncomplete()) {
+ RETURN_NOT_OK(RecvNegotiatePB(&request, recv_buf));
+ s = HandleSaslResponse(request);
+ }
+ RETURN_NOT_OK(s);
+
+ const char* c_username = nullptr;
+ int rc = sasl_getprop(sasl_conn_.get(), SASL_USERNAME,
+ reinterpret_cast<const void**>(&c_username));
+ // We expect that SASL_USERNAME will always get set.
+ CHECK(rc == SASL_OK && c_username != nullptr) << "No username on authenticated connection";
+ if (negotiated_mech_ == SaslMechanism::GSSAPI) {
+ // The SASL library doesn't include the user's realm in the username if it's the
+ // same realm as the default realm of the server. So, we pass it back through the
+ // Kerberos library to add back the realm if necessary.
+ string principal = c_username;
+ RETURN_NOT_OK_PREPEND(security::CanonicalizeKrb5Principal(&principal),
+ "could not canonicalize krb5 principal");
+
+ // Map the principal to the corresponding local username. For example, admins
+ // can set up mappings so that joe@REMOTEREALM becomes something like 'remote-joe'
+ // locally for the purposes of group mapping, ACLs, etc.
+ string local_name;
+ RETURN_NOT_OK_PREPEND(security::MapPrincipalToLocalName(principal, &local_name),
+ strings::Substitute("could not map krb5 principal '$0' to username",
+ principal));
+ authenticated_user_.SetAuthenticatedByKerberos(std::move(local_name), std::move(principal));
+ } else {
+ authenticated_user_.SetUnauthenticated(c_username);
+ }
+ return Status::OK();
+}
+
+Status ServerNegotiation::AuthenticateByToken(faststring* recv_buf) {
+ // Sanity check that TLS has been negotiated. Receiving the token on an
+ // unencrypted channel is a big no-no.
+ CHECK(tls_negotiated_);
+
+ // Receive the token from the client.
+ NegotiatePB pb;
+ RETURN_NOT_OK(RecvNegotiatePB(&pb, recv_buf));
+
+ if (pb.step() != NegotiatePB::TOKEN_EXCHANGE) {
+ Status s = Status::NotAuthorized("expected TOKEN_EXCHANGE step",
+ NegotiatePB::NegotiateStep_Name(pb.step()));
+ }
+ if (!pb.has_authn_token()) {
+ Status s = Status::NotAuthorized("TOKEN_EXCHANGE message must include an authentication token");
+ }
+
+ // TODO(KUDU-1924): propagate the specific token verification failure back to the client,
+ // so it knows how to intelligently retry.
+ security::TokenPB token;
+ auto verification_result = token_verifier_->VerifyTokenSignature(pb.authn_token(), &token);
+ switch (verification_result) {
+ case security::VerificationResult::VALID: break;
+
+ case security::VerificationResult::INVALID_TOKEN:
+ case security::VerificationResult::INVALID_SIGNATURE:
+ case security::VerificationResult::EXPIRED_TOKEN:
+ case security::VerificationResult::EXPIRED_SIGNING_KEY: {
+ // These errors indicate the client should get a new token and try again.
+ Status s = Status::NotAuthorized(VerificationResultToString(verification_result));
+ RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_INVALID_AUTHENTICATION_TOKEN, s));
+ return s;
+ }
+
+ case security::VerificationResult::UNKNOWN_SIGNING_KEY: {
+ // The server doesn't recognize the signing key. This indicates that the
+ // server has not been updated with the most recent TSKs, so tell the
+ // client to try again later.
+ Status s = Status::NotAuthorized(VerificationResultToString(verification_result));
+ RETURN_NOT_OK(SendError(ErrorStatusPB::ERROR_UNAVAILABLE, s));
+ return s;
+ }
+ case security::VerificationResult::INCOMPATIBLE_FEATURE: {
+ Status s = Status::NotAuthorized(VerificationResultToString(verification_result));
+ // These error types aren't recoverable by having the client get a new token.
+ RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
+ return s;
+ }
+ }
+
+ if (!token.has_authn()) {
+ Status s = Status::NotAuthorized("non-authentication token presented for authentication");
+ RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
+ return s;
+ }
+ if (!token.authn().has_username()) {
+ // This is a runtime error because there should be no way a client could
+ // get a signed authn token without a subject.
+ Status s = Status::RuntimeError("authentication token has no username");
+ RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_INVALID_AUTHENTICATION_TOKEN, s));
+ return s;
+ }
+
+ if (PREDICT_FALSE(FLAGS_rpc_inject_invalid_authn_token_ratio > 0)) {
+ security::VerificationResult res;
+ int sel = rand() % 4;
+ switch (sel) {
+ case 0:
+ res = security::VerificationResult::INVALID_TOKEN;
+ break;
+ case 1:
+ res = security::VerificationResult::INVALID_SIGNATURE;
+ break;
+ case 2:
+ res = security::VerificationResult::EXPIRED_TOKEN;
+ break;
+ case 3:
+ res = security::VerificationResult::EXPIRED_SIGNING_KEY;
+ break;
+ }
+ const Status s = kudu::fault_injection::MaybeReturnFailure(
+ FLAGS_rpc_inject_invalid_authn_token_ratio,
+ Status::NotAuthorized(VerificationResultToString(res)));
+ if (!s.ok()) {
+ RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_INVALID_AUTHENTICATION_TOKEN, s));
+ return s;
+ }
+ }
+
+ authenticated_user_.SetAuthenticatedByToken(token.authn().username());
+
+ // Respond with success message.
+ pb.Clear();
+ pb.set_step(NegotiatePB::TOKEN_EXCHANGE);
+ return SendNegotiatePB(pb);
+}
+
+Status ServerNegotiation::AuthenticateByCertificate() {
+ // Sanity check that TLS has been negotiated. Cert-based authentication is
+ // only possible with TLS.
+ CHECK(tls_negotiated_);
+
+ // Grab the subject from the client's cert.
+ security::Cert cert;
+ RETURN_NOT_OK(tls_handshake_.GetRemoteCert(&cert));
+
+ boost::optional<string> user_id = cert.UserId();
+ boost::optional<string> principal = cert.KuduKerberosPrincipal();
+
+ if (!user_id) {
+ Status s = Status::NotAuthorized("did not find expected X509 userId extension in cert");
+ RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_INVALID_AUTHENTICATION_TOKEN, s));
+ return s;
+ }
+
+ authenticated_user_.SetAuthenticatedByClientCert(*user_id, std::move(principal));
+
+ return Status::OK();
+}
+
+Status ServerNegotiation::HandleSaslInitiate(const NegotiatePB& request) {
+ if (PREDICT_FALSE(request.step() != NegotiatePB::SASL_INITIATE)) {
+ Status s = Status::NotAuthorized("expected SASL_INITIATE step",
+ NegotiatePB::NegotiateStep_Name(request.step()));
+ RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
+ return s;
+ }
+ TRACE("Received SASL_INITIATE request from client");
+
+ if (request.sasl_mechanisms_size() != 1) {
+ Status s = Status::NotAuthorized(
+ "SASL_INITIATE request must include exactly one SASL mechanism, found",
+ std::to_string(request.sasl_mechanisms_size()));
+ RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
+ return s;
+ }
+
+ const string& mechanism = request.sasl_mechanisms(0).mechanism();
+ TRACE("Client requested to use mechanism: $0", mechanism);
+
+ negotiated_mech_ = SaslMechanism::value_of(mechanism);
+
+ // Rejects any connection from public routable IPs if authentication mechanism
+ // is plain. See KUDU-1875.
+ if (negotiated_mech_ == SaslMechanism::PLAIN) {
+ Sockaddr addr;
+ RETURN_NOT_OK(socket_->GetPeerAddress(&addr));
+
+ if (!IsTrustedConnection(addr)) {
+ Status s = Status::NotAuthorized("unauthenticated connections from publicly "
+ "routable IPs are prohibited. See "
+ "--trusted_subnets flag for more information.",
+ addr.ToString());
+ RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
+ return s;
+ }
+ }
+
+ // If the negotiated mechanism is GSSAPI (Kerberos), configure SASL to use
+ // integrity protection so that the channel bindings and nonce can be
+ // verified.
+ if (negotiated_mech_ == SaslMechanism::GSSAPI) {
+ RETURN_NOT_OK(EnableIntegrityProtection(sasl_conn_.get()));
+ }
+
+ const char* server_out = nullptr;
+ uint32_t server_out_len = 0;
+ TRACE("Calling sasl_server_start()");
+
+ Status s = WrapSaslCall(sasl_conn_.get(), [&]() {
+ return sasl_server_start(
+ sasl_conn_.get(), // The SASL connection context created by init()
+ mechanism.c_str(), // The mechanism requested by the client.
+ request.token().c_str(), // Optional string the client gave us.
+ request.token().length(), // Client string len.
+ &server_out, // The output of the SASL library, might not be NULL terminated
+ &server_out_len); // Output len.
+ });
+
+ if (PREDICT_FALSE(!s.ok() && !s.IsIncomplete())) {
+ RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
+ return s;
+ }
+
+ // We have a valid mechanism match
+ if (s.ok()) {
+ DCHECK(server_out_len == 0);
+ RETURN_NOT_OK(SendSaslSuccess());
+ } else { // s.IsIncomplete() (equivalent to SASL_CONTINUE)
+ RETURN_NOT_OK(SendSaslChallenge(server_out, server_out_len));
+ }
+ return s;
+}
+
+Status ServerNegotiation::HandleSaslResponse(const NegotiatePB& request) {
+ if (PREDICT_FALSE(request.step() != NegotiatePB::SASL_RESPONSE)) {
+ Status s = Status::NotAuthorized("expected SASL_RESPONSE step",
+ NegotiatePB::NegotiateStep_Name(request.step()));
+ RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
+ return s;
+ }
+ TRACE("Received SASL_RESPONSE request from client");
+
+ if (!request.has_token()) {
+ Status s = Status::NotAuthorized("no token in SASL_RESPONSE from client");
+ RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
+ return s;
+ }
+
+ const char* server_out = nullptr;
+ uint32_t server_out_len = 0;
+ TRACE("Calling sasl_server_step()");
+ Status s = WrapSaslCall(sasl_conn_.get(), [&]() {
+ return sasl_server_step(
+ sasl_conn_.get(), // The SASL connection context created by init()
+ request.token().c_str(), // Optional string the client gave us
+ request.token().length(), // Client string len
+ &server_out, // The output of the SASL library, might not be NULL terminated
+ &server_out_len); // Output len
+ });
+
+ if (s.ok()) {
+ DCHECK(server_out_len == 0);
+ return SendSaslSuccess();
+ }
+ if (s.IsIncomplete()) {
+ return SendSaslChallenge(server_out, server_out_len);
+ }
+ RETURN_NOT_OK(SendError(ErrorStatusPB::FATAL_UNAUTHORIZED, s));
+ return s;
+}
+
+Status ServerNegotiation::SendSaslChallenge(const char* challenge, unsigned clen) {
+ NegotiatePB response;
+ response.set_step(NegotiatePB::SASL_CHALLENGE);
+ response.mutable_token()->assign(challenge, clen);
+ RETURN_NOT_OK(SendNegotiatePB(response));
+ return Status::Incomplete("");
+}
+
+Status ServerNegotiation::SendSaslSuccess() {
+ NegotiatePB response;
+ response.set_step(NegotiatePB::SASL_SUCCESS);
+
+ if (negotiated_mech_ == SaslMechanism::GSSAPI) {
+ // Send a nonce to the client.
+ nonce_ = string();
+ RETURN_NOT_OK(security::GenerateNonce(nonce_.get_ptr()));
+ response.set_nonce(*nonce_);
+
+ if (tls_negotiated_) {
+ // Send the channel bindings to the client.
+ security::Cert cert;
+ RETURN_NOT_OK(tls_handshake_.GetLocalCert(&cert));
+
+ string plaintext_channel_bindings;
+ RETURN_NOT_OK(cert.GetServerEndPointChannelBindings(&plaintext_channel_bindings));
+ RETURN_NOT_OK(SaslEncode(sasl_conn_.get(),
+ plaintext_channel_bindings,
+ response.mutable_channel_bindings()));
+ }
+ }
+
+ RETURN_NOT_OK(SendNegotiatePB(response));
+ return Status::OK();
+}
+
+Status ServerNegotiation::RecvConnectionContext(faststring* recv_buf) {
+ TRACE("Waiting for connection context");
+ RequestHeader header;
+ Slice param_buf;
+ RETURN_NOT_OK(ReceiveFramedMessageBlocking(socket(), recv_buf, &header, ¶m_buf, deadline_));
+ DCHECK(header.IsInitialized());
+
+ if (header.call_id() != kConnectionContextCallId) {
+ return Status::NotAuthorized("expected ConnectionContext callid, received",
+ std::to_string(header.call_id()));
+ }
+
+ ConnectionContextPB conn_context;
+ if (!conn_context.ParseFromArray(param_buf.data(), param_buf.size())) {
+ return Status::NotAuthorized("invalid ConnectionContextPB message, missing fields",
+ conn_context.InitializationErrorString());
+ }
+
+ if (nonce_) {
+ Status s;
+ // Validate that the client returned the correct SASL protected nonce.
+ if (!conn_context.has_encoded_nonce()) {
+ return Status::NotAuthorized("ConnectionContextPB wrapped nonce missing");
+ }
+
+ string decoded_nonce;
+ s = SaslDecode(sasl_conn_.get(), conn_context.encoded_nonce(), &decoded_nonce);
+ if (!s.ok()) {
+ return Status::NotAuthorized("failed to decode nonce", s.message());
+ }
+
+ if (*nonce_ != decoded_nonce) {
+ Sockaddr addr;
+ RETURN_NOT_OK(socket_->GetPeerAddress(&addr));
+ LOG(WARNING) << "Received an invalid connection nonce from client "
+ << addr.ToString()
+ << ", this could indicate a replay attack";
+ return Status::NotAuthorized("nonce mismatch");
+ }
+ }
+
+ return Status::OK();
+}
+
+int ServerNegotiation::GetOptionCb(const char* plugin_name,
+ const char* option,
+ const char** result,
+ unsigned* len) {
+ return helper_.GetOptionCb(plugin_name, option, result, len);
+}
+
+int ServerNegotiation::PlainAuthCb(sasl_conn_t* /*conn*/,
+ const char* /*user*/,
+ const char* /*pass*/,
+ unsigned /*passlen*/,
+ struct propctx* /*propctx*/) {
+ TRACE("Received PLAIN auth.");
+ if (PREDICT_FALSE(!helper_.IsPlainEnabled())) {
+ LOG(DFATAL) << "Password authentication callback called while PLAIN auth disabled";
+ return SASL_BADPARAM;
+ }
+ // We always allow PLAIN authentication to succeed.
+ return SASL_OK;
+}
+
+bool ServerNegotiation::IsTrustedConnection(const Sockaddr& addr) {
+ static std::once_flag once;
+ std::call_once(once, [] {
+ g_trusted_subnets = new vector<Network>();
+ CHECK_OK(Network::ParseCIDRStrings(FLAGS_trusted_subnets, g_trusted_subnets));
+
+ // If --trusted_subnets is not set explicitly, local subnets of all local network
+ // interfaces as well as the default private subnets will be used.
+ if (google::GetCommandLineFlagInfoOrDie("trusted_subnets").is_default) {
+ std::vector<Network> local_networks;
+ WARN_NOT_OK(GetLocalNetworks(&local_networks),
+ "Unable to get local networks.");
+
+ g_trusted_subnets->insert(g_trusted_subnets->end(),
+ local_networks.begin(),
+ local_networks.end());
+ }
+ });
+
+ return std::any_of(g_trusted_subnets->begin(), g_trusted_subnets->end(),
+ [&](const Network& t) { return t.WithinNetwork(addr); });
+}
+
+} // namespace rpc
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7db60aa/be/src/kudu/rpc/server_negotiation.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/server_negotiation.h b/be/src/kudu/rpc/server_negotiation.h
new file mode 100644
index 0000000..e9e945a
--- /dev/null
+++ b/be/src/kudu/rpc/server_negotiation.h
@@ -0,0 +1,248 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <set>
+#include <string>
+#include <vector>
+
+#include <sasl/sasl.h>
+
+#include "kudu/rpc/negotiation.h"
+#include "kudu/rpc/remote_user.h"
+#include "kudu/rpc/rpc_header.pb.h"
+#include "kudu/rpc/sasl_common.h"
+#include "kudu/rpc/sasl_helper.h"
+#include "kudu/security/tls_handshake.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class Slice;
+
+namespace security {
+class TlsContext;
+class TokenVerifier;
+}
+
+namespace rpc {
+
+// Class for doing KRPC negotiation with a remote client over a bidirectional socket.
+// Operations on this class are NOT thread-safe.
+class ServerNegotiation {
+ public:
+ // Creates a new server negotiation instance, taking ownership of the
+ // provided socket. After completing the negotiation process by setting the
+ // desired options and calling Negotiate((), the socket can be retrieved with
+ // release_socket().
+ //
+ // The provided TlsContext must outlive this negotiation instance.
+ ServerNegotiation(std::unique_ptr<Socket> socket,
+ const security::TlsContext* tls_context,
+ const security::TokenVerifier* token_verifier,
+ RpcEncryption encryption);
+
+ // Enable PLAIN authentication.
+ // Despite PLAIN authentication taking a username and password, we disregard
+ // the password and use this as a "unauthenticated" mode.
+ // Must be called before Negotiate().
+ Status EnablePlain();
+
+ // Enable GSSAPI (Kerberos) authentication.
+ // Must be called before Negotiate().
+ Status EnableGSSAPI();
+
+ // Returns mechanism negotiated by this connection.
+ // Must be called after Negotiate().
+ SaslMechanism::Type negotiated_mechanism() const;
+
+ // Returns the negotiated authentication type for the connection.
+ // Must be called after Negotiate().
+ AuthenticationType negotiated_authn() const {
+ DCHECK_NE(negotiated_authn_, AuthenticationType::INVALID);
+ return negotiated_authn_;
+ }
+
+ // Returns true if TLS was negotiated.
+ // Must be called after Negotiate().
+ bool tls_negotiated() const {
+ return tls_negotiated_;
+ }
+
+ // Returns the set of RPC system features supported by the remote client.
+ // Must be called after Negotiate().
+ std::set<RpcFeatureFlag> client_features() const {
+ return client_features_;
+ }
+
+ // Returns the set of RPC system features supported by the remote client.
+ // Must be called after Negotiate().
+ // Subsequent calls to this method or client_features() will return an empty set.
+ std::set<RpcFeatureFlag> take_client_features() {
+ return std::move(client_features_);
+ }
+
+ // Name of the user that was authenticated.
+ // Must be called after a successful Negotiate().
+ //
+ // Subsequent calls will return bogus data.
+ RemoteUser take_authenticated_user() {
+ return std::move(authenticated_user_);
+ }
+
+ // Specify the fully-qualified domain name of the remote server.
+ // Must be called before Negotiate(). Required for some mechanisms.
+ void set_server_fqdn(const std::string& domain_name);
+
+ // Set deadline for connection negotiation.
+ void set_deadline(const MonoTime& deadline);
+
+ Socket* socket() const { return socket_.get(); }
+
+ // Returns the socket owned by this server negotiation. The caller will own
+ // the socket after this call, and the negotiation instance should no longer
+ // be used. Must be called after Negotiate().
+ std::unique_ptr<Socket> release_socket() { return std::move(socket_); }
+
+ // Negotiate with the remote client. Should only be called once per
+ // ServerNegotiation and socket instance, after all options have been set.
+ //
+ // Returns OK on success, otherwise may return NotAuthorized, NotSupported, or
+ // another non-OK status.
+ Status Negotiate() WARN_UNUSED_RESULT;
+
+ // SASL callback for plugin options, supported mechanisms, etc.
+ // Returns SASL_FAIL if the option is not handled, which does not fail the handshake.
+ int GetOptionCb(const char* plugin_name, const char* option,
+ const char** result, unsigned* len);
+
+ // SASL callback for PLAIN authentication via SASL_CB_SERVER_USERDB_CHECKPASS.
+ int PlainAuthCb(sasl_conn_t* conn, const char* user, const char* pass,
+ unsigned passlen, struct propctx* propctx);
+
+ // Perform a "pre-flight check" that everything required to act as a Kerberos
+ // server is properly set up.
+ static Status PreflightCheckGSSAPI() WARN_UNUSED_RESULT;
+
+ private:
+
+ // Parse a negotiate request from the client, deserializing it into 'msg'.
+ // If the request is malformed, sends an error message to the client.
+ Status RecvNegotiatePB(NegotiatePB* msg, faststring* recv_buf) WARN_UNUSED_RESULT;
+
+ // Encode and send the specified negotiate response message to the server.
+ Status SendNegotiatePB(const NegotiatePB& msg) WARN_UNUSED_RESULT;
+
+ // Encode and send the specified RPC error message to the client.
+ // Calls Status.ToString() for the embedded error message.
+ Status SendError(ErrorStatusPB::RpcErrorCodePB code, const Status& err) WARN_UNUSED_RESULT;
+
+ // Parse and validate connection header.
+ Status ValidateConnectionHeader(faststring* recv_buf) WARN_UNUSED_RESULT;
+
+ // Initialize the SASL server negotiation instance.
+ Status InitSaslServer() WARN_UNUSED_RESULT;
+
+ // Handle case when client sends NEGOTIATE request. Builds the set of
+ // client-supported RPC features, determines a mutually supported
+ // authentication type to use for the connection, and sends a NEGOTIATE
+ // response.
+ Status HandleNegotiate(const NegotiatePB& request) WARN_UNUSED_RESULT;
+
+ // Handle a TLS_HANDSHAKE request message from the server.
+ Status HandleTlsHandshake(const NegotiatePB& request) WARN_UNUSED_RESULT;
+
+ // Send a TLS_HANDSHAKE response message to the server with the provided token.
+ Status SendTlsHandshake(std::string tls_token) WARN_UNUSED_RESULT;
+
+ // Authenticate the client using SASL. Populates the 'authenticated_user_'
+ // field with the SASL principal.
+ // 'recv_buf' allows a receive buffer to be reused.
+ Status AuthenticateBySasl(faststring* recv_buf) WARN_UNUSED_RESULT;
+
+ // Authenticate the client using a token. Populates the
+ // 'authenticated_user_' field with the token's principal.
+ // 'recv_buf' allows a receive buffer to be reused.
+ Status AuthenticateByToken(faststring* recv_buf) WARN_UNUSED_RESULT;
+
+ // Authenticate the client using the client's TLS certificate. Populates the
+ // 'authenticated_user_' field with the certificate's subject.
+ Status AuthenticateByCertificate() WARN_UNUSED_RESULT;
+
+ // Handle case when client sends SASL_INITIATE request.
+ // Returns Status::OK if the SASL negotiation is complete, or
+ // Status::Incomplete if a SASL_RESPONSE step is expected.
+ Status HandleSaslInitiate(const NegotiatePB& request) WARN_UNUSED_RESULT;
+
+ // Handle case when client sends SASL_RESPONSE request.
+ Status HandleSaslResponse(const NegotiatePB& request) WARN_UNUSED_RESULT;
+
+ // Send a SASL_CHALLENGE response to the client with a challenge token.
+ Status SendSaslChallenge(const char* challenge, unsigned clen) WARN_UNUSED_RESULT;
+
+ // Send a SASL_SUCCESS response to the client.
+ Status SendSaslSuccess() WARN_UNUSED_RESULT;
+
+ // Receive and validate the ConnectionContextPB.
+ Status RecvConnectionContext(faststring* recv_buf) WARN_UNUSED_RESULT;
+
+ // Returns true if connection is from trusted subnets or local networks.
+ bool IsTrustedConnection(const Sockaddr& addr);
+
+ // The socket to the remote client.
+ std::unique_ptr<Socket> socket_;
+
+ // SASL state.
+ std::vector<sasl_callback_t> callbacks_;
+ std::unique_ptr<sasl_conn_t, SaslDeleter> sasl_conn_;
+ SaslHelper helper_;
+ boost::optional<std::string> nonce_;
+
+ // TLS state.
+ const security::TlsContext* tls_context_;
+ security::TlsHandshake tls_handshake_;
+ const RpcEncryption encryption_;
+ bool tls_negotiated_;
+
+ // TSK state.
+ const security::TokenVerifier* token_verifier_;
+
+ // The set of features supported by the client and server. Filled in during negotiation.
+ std::set<RpcFeatureFlag> client_features_;
+ std::set<RpcFeatureFlag> server_features_;
+
+ // The successfully-authenticated user, if applicable. Filled in during
+ // negotiation.
+ RemoteUser authenticated_user_;
+
+ // The authentication type. Filled in during negotiation.
+ AuthenticationType negotiated_authn_;
+
+ // The SASL mechanism. Filled in during negotiation if the negotiated
+ // authentication type is SASL.
+ SaslMechanism::Type negotiated_mech_;
+
+ // Negotiation timeout deadline.
+ MonoTime deadline_;
+};
+
+} // namespace rpc
+} // namespace kudu