You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2013/09/26 00:58:04 UTC

git commit: Added SASL authentication support.

Updated Branches:
  refs/heads/master 766bc68a6 -> 616973da1


Added SASL authentication support.

Review: https://reviews.apache.org/r/14310


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/616973da
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/616973da
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/616973da

Branch: refs/heads/master
Commit: 616973da105f70f93cd04fe97c0477fac0950fda
Parents: 766bc68
Author: Benjamin Hindman <be...@gmail.com>
Authored: Thu Sep 19 16:30:45 2013 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Sep 25 15:57:48 2013 -0700

----------------------------------------------------------------------
 configure.ac                |  11 ++
 include/mesos/mesos.proto   |   9 +
 src/Makefile.am             |   5 +
 src/messages/messages.proto |  31 +++
 src/sasl/authenticatee.hpp  | 381 ++++++++++++++++++++++++++++++++++++
 src/sasl/authenticator.hpp  | 407 +++++++++++++++++++++++++++++++++++++++
 src/sasl/auxprop.cpp        | 178 +++++++++++++++++
 src/sasl/auxprop.hpp        |  92 +++++++++
 src/tests/sasl_tests.cpp    | 147 ++++++++++++++
 9 files changed, 1261 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/616973da/configure.ac
----------------------------------------------------------------------
diff --git a/configure.ac b/configure.ac
index a900abb..831784b 100644
--- a/configure.ac
+++ b/configure.ac
@@ -508,4 +508,15 @@ AM_CONDITIONAL([WITH_INCLUDED_ZOOKEEPER],
                [test "x$with_included_zookeeper" = "xyes"])
 
 
+# TODO(benh): Also check for md5 support so we can use the CRAM-MD5
+# mechanism. We can likely do a AC_CHECK_LIB looking for a particular
+# function only provided if md5 support is present.
+AC_CHECK_LIB([sasl2], [sasl_done], [],
+             [AC_MSG_ERROR([cannot find libsasl2
+-------------------------------------------------------------------
+We need libsasl2 for authentication!
+-------------------------------------------------------------------
+])])
+
+
 AC_OUTPUT

http://git-wip-us.apache.org/repos/asf/mesos/blob/616973da/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 8f845cc..957576b 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -83,6 +83,15 @@ message ExecutorID {
 
 
 /**
+ * Describes authentication credentials.
+ */
+message Credential {
+  required string principal = 1;
+  optional bytes secret = 2;
+}
+
+
+/**
  * Describes a framework. If the user field is set to an empty string
  * Mesos will automagically set it to the current user. Note that the
  * ID is only available after a framework has registered, however, it

http://git-wip-us.apache.org/repos/asf/mesos/blob/616973da/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 3eae964..ee33613 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -154,6 +154,10 @@ nodist_libmesos_no_3rdparty_la_SOURCES =				\
   $(REGISTRY_PROTOS)
 
 libmesos_no_3rdparty_la_SOURCES =					\
+	sasl/authenticatee.hpp						\
+	sasl/authenticator.hpp						\
+	sasl/auxprop.hpp						\
+	sasl/auxprop.cpp						\
 	sched/sched.cpp							\
 	local/local.cpp							\
 	master/constants.cpp						\
@@ -755,6 +759,7 @@ mesos_tests_SOURCES = tests/main.cpp tests/utils.cpp			\
 	              tests/reaper_tests.cpp				\
 	              tests/slave_recovery_tests.cpp			\
 	              tests/status_update_manager_tests.cpp		\
+	              tests/sasl_tests.cpp				\
 	              tests/gc_tests.cpp				\
 	              tests/resource_offers_tests.cpp			\
 	              tests/fault_tolerance_tests.cpp			\

http://git-wip-us.apache.org/repos/asf/mesos/blob/616973da/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 4d400c2..c599eb2 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -335,3 +335,34 @@ message NoMasterDetectedMessage {}
 message NewMasterDetectedMessage {
   required string pid = 2;
 }
+
+
+message AuthenticateMessage {}
+
+
+message AuthenticationMechanismsMessage {
+  repeated string mechanisms = 1; // List of available SASL mechanisms.
+}
+
+
+message AuthenticationStartMessage {
+  required string mechanism = 1;
+  optional string data = 2;
+}
+
+
+message AuthenticationStepMessage {
+  required bytes data = 1;
+}
+
+
+message AuthenticationCompletedMessage {}
+
+
+message AuthenticationFailedMessage {}
+
+
+message AuthenticationErrorMessage {
+  optional string error = 1;
+}
+

http://git-wip-us.apache.org/repos/asf/mesos/blob/616973da/src/sasl/authenticatee.hpp
----------------------------------------------------------------------
diff --git a/src/sasl/authenticatee.hpp b/src/sasl/authenticatee.hpp
new file mode 100644
index 0000000..e1e18fe
--- /dev/null
+++ b/src/sasl/authenticatee.hpp
@@ -0,0 +1,381 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __SASL_AUTHENTICATEE_HPP__
+#define __SASL_AUTHENTICATEE_HPP__
+
+#include <sasl/sasl.h>
+
+#include <mesos/mesos.hpp>
+
+#include <process/future.hpp>
+#include <process/id.hpp>
+#include <process/once.hpp>
+#include <process/process.hpp>
+#include <process/protobuf.hpp>
+
+#include <stout/strings.hpp>
+
+#include "messages/messages.hpp"
+
+namespace mesos {
+namespace internal {
+namespace sasl {
+
+// Forward declaration.
+class AuthenticateeProcess;
+
+
+class Authenticatee
+{
+public:
+  Authenticatee(const Credential& credential);
+  ~Authenticatee();
+
+  // Returns true if successfully authenticated otherwise false or an
+  // error. Note that we distinguish authentication failure (false)
+  // from a failed future in the event the future failed due to a
+  // transient error and authentication can (should) be retried.
+  process::Future<bool> authenticate(const process::UPID& pid);
+
+private:
+  AuthenticateeProcess* process;
+};
+
+
+class AuthenticateeProcess : public ProtobufProcess<AuthenticateeProcess>
+{
+public:
+  AuthenticateeProcess(const Credential& _credential)
+    : ProcessBase(process::ID::generate("authenticatee")),
+      credential(_credential),
+      status(READY),
+      connection(NULL)
+  {
+    const char* data = credential.secret().data();
+    size_t length = credential.secret().length();
+
+    // Need to allocate the secret via 'malloc' because SASL is
+    // expecting the data appended to the end of the struct. *sigh*
+    secret = (sasl_secret_t*) malloc(sizeof(sasl_secret_t) + length);
+
+    CHECK(secret != NULL) << "Failed to allocate memory for secret";
+
+    memcpy(secret->data, data, length);
+    secret->len = length;
+  }
+
+  virtual ~AuthenticateeProcess()
+  {
+    if (connection != NULL) {
+      sasl_dispose(&connection);
+    }
+    free(secret);
+  }
+
+  process::Future<bool> authenticate(const process::UPID& pid)
+  {
+    static process::Once* initialize = new process::Once();
+    static bool initialized = false;
+
+    if (!initialize->once()) {
+      LOG(INFO) << "Initializing client SASL";
+      int result = sasl_client_init(NULL);
+      if (result != SASL_OK) {
+        status = ERROR;
+        std::string error(sasl_errstring(result, NULL, NULL));
+        promise.fail("Failed to initialize SASL: " + error);
+        initialize->done();
+        return promise.future();
+      }
+
+      initialized = true;
+
+      initialize->done();
+    }
+
+    if (!initialized) {
+      promise.fail("Failed to initialize SASL");
+      return promise.future();
+    }
+
+    if (status != READY) {
+      return promise.future();
+    }
+
+    LOG(INFO) << "Creating new client SASL connection";
+
+    callbacks[0].id = SASL_CB_GETREALM;
+    callbacks[0].proc = NULL;
+    callbacks[0].context = NULL;
+
+    callbacks[1].id = SASL_CB_USER;
+    callbacks[1].proc = (int(*)()) &user;
+    callbacks[2].context = (void*) credential.principal().c_str();
+
+    // NOTE: Some SASL mechanisms do not allow/enable "proxying",
+    // i.e., authorization. Therefore, some mechanisms send _only_ the
+    // authoriation name rather than both the user (authentication
+    // name) and authorization name. Thus, for now, we assume
+    // authorization is handled out of band. Consider the
+    // SASL_NEED_PROXY flag if we want to reconsider this in the
+    // future.
+    callbacks[2].id = SASL_CB_AUTHNAME;
+    callbacks[2].proc = (int(*)()) &user;
+    callbacks[2].context = (void*) credential.principal().c_str();
+
+    callbacks[3].id = SASL_CB_PASS;
+    callbacks[3].proc = (int(*)()) &pass;
+    callbacks[3].context = (void*) secret;
+
+    callbacks[4].id = SASL_CB_LIST_END;
+    callbacks[4].proc = NULL;
+    callbacks[4].context = NULL;
+
+    int result = sasl_client_new(
+        "mesos",    // Registered name of service.
+        NULL,       // Server's FQDN.
+        NULL, NULL, // IP Address information strings.
+        callbacks,  // Callbacks supported only for this connection.
+        0,          // Security flags (security layers are enabled
+                    // using security properties, separately).
+        &connection);
+
+    if (result != SASL_OK) {
+      status = ERROR;
+      std::string error(sasl_errstring(result, NULL, NULL));
+      promise.fail("Failed to create client SASL connection: " + error);
+      return promise.future();
+    }
+
+    AuthenticateMessage message;
+    send(pid, message);
+
+    status = STARTING;
+
+    return promise.future();
+  }
+
+protected:
+  virtual void initialize()
+  {
+    // Anticipate mechanisms and steps from the server.
+    install<AuthenticationMechanismsMessage>(
+        &AuthenticateeProcess::mechanisms,
+        &AuthenticationMechanismsMessage::mechanisms);
+
+    install<AuthenticationStepMessage>(
+        &AuthenticateeProcess::step,
+        &AuthenticationStepMessage::data);
+
+    install<AuthenticationCompletedMessage>(
+        &AuthenticateeProcess::completed);
+
+    install<AuthenticationFailedMessage>(
+        &AuthenticateeProcess::failed);
+
+    install<AuthenticationErrorMessage>(
+        &AuthenticateeProcess::error,
+        &AuthenticationErrorMessage::error);
+  }
+
+  void mechanisms(const std::vector<std::string>& mechanisms)
+  {
+    if (status != STARTING) {
+      status = ERROR;
+      promise.fail("Unexpected authentication 'mechanisms' received");
+      return;
+    }
+
+    // TODO(benh): Store 'from' in order to ensure we only communicate
+    // with the same Authenticator.
+
+    LOG(INFO) << "Received SASL authentication mechanisms: "
+              << strings::join(",", mechanisms);
+
+    sasl_interact_t* interact = NULL;
+    const char* output = NULL;
+    unsigned length = 0;
+    const char* mechanism = NULL;
+
+    int result = sasl_client_start(
+        connection,
+        strings::join(" ", mechanisms).c_str(),
+        &interact,     // Set if an interaction is needed.
+        &output,       // The output string (to send to server).
+        &length,       // The length of the output string.
+        &mechanism);   // The chosen mechanism.
+
+    CHECK_NE(SASL_INTERACT, result)
+      << "Not expecting an interaction (ID: " << interact->id << ")";
+
+    if (result != SASL_OK && result != SASL_CONTINUE) {
+      std::string error(sasl_errdetail(connection));
+      status = ERROR;
+      promise.fail("Failed to start the SASL client: " + error);
+      return;
+    }
+
+    LOG(INFO) << "Attempting to authenticate with mechanism '"
+              << mechanism << "'";
+
+    AuthenticationStartMessage message;
+    message.set_mechanism(mechanism);
+    message.set_data(output, length);
+
+    send(from, message);
+
+    status = STEPPING;
+  }
+
+  void step(const std::string& data)
+  {
+    if (status != STEPPING) {
+      status = ERROR;
+      promise.fail("Unexpected authentication 'step' received");
+      return;
+    }
+
+    LOG(INFO) << "Received SASL authentication step";
+
+    sasl_interact_t* interact = NULL;
+    const char* output = NULL;
+    unsigned length = 0;
+
+    int result = sasl_client_step(
+        connection,
+        data.length() == 0 ? NULL : data.data(),
+        data.length(),
+        &interact,
+        &output,
+        &length);
+
+    CHECK_NE(SASL_INTERACT, result)
+      << "Not expecting an interaction (ID: " << interact->id << ")";
+
+    if (result == SASL_OK || result == SASL_CONTINUE) {
+      // We don't start the client with SASL_SUCCESS_DATA so we may
+      // need to send one more "empty" message to the server.
+      AuthenticationStepMessage message;
+      if (output != NULL && length > 0) {
+        message.set_data(output, length);
+      }
+      send(from, message);
+    } else {
+      status = ERROR;
+      std::string error(sasl_errdetail(connection));
+      promise.fail("Failed to perform authentication step: " + error);
+    }
+  }
+
+  void completed()
+  {
+    if (status != STEPPING) {
+      status = ERROR;
+      promise.fail("Unexpected authentication 'completed' received");
+      return;
+    }
+
+    status = COMPLETED;
+    promise.set(true);
+  }
+
+  void failed()
+  {
+    status = FAILED;
+    promise.set(false);
+  }
+
+  void error(const std::string& error)
+  {
+    status = ERROR;
+    promise.fail("Authentication error: " + error);
+  }
+
+private:
+  static int user(
+      void* context,
+      int id,
+      const char** result,
+      unsigned* length)
+  {
+    CHECK(SASL_CB_USER == id || SASL_CB_AUTHNAME == id);
+    *result = static_cast<const char*>(context);
+    if (length != NULL) {
+      *length = strlen(*result);
+    }
+    return SASL_OK;
+  }
+
+  static int pass(
+      sasl_conn_t* connection,
+      void* context,
+      int id,
+      sasl_secret_t** secret)
+  {
+    CHECK_EQ(SASL_CB_PASS, id);
+    *secret = static_cast<sasl_secret_t*>(context);
+    return SASL_OK;
+  }
+
+  const Credential credential;
+
+  sasl_secret_t* secret;
+
+  sasl_callback_t callbacks[5];
+
+  enum {
+    READY,
+    STARTING,
+    STEPPING,
+    COMPLETED,
+    FAILED,
+    ERROR
+  } status;
+
+  sasl_conn_t* connection;
+
+  process::Promise<bool> promise;
+};
+
+
+Authenticatee::Authenticatee(const Credential& credential)
+{
+  process = new AuthenticateeProcess(credential);
+  process::spawn(process);
+}
+
+
+Authenticatee::~Authenticatee()
+{
+  process::terminate(process);
+  process::wait(process);
+  delete process;
+}
+
+
+process::Future<bool> Authenticatee::authenticate(const process::UPID& pid)
+{
+  return process::dispatch(process, &AuthenticateeProcess::authenticate, pid);
+}
+
+} // namespace sasl {
+} // namespace internal {
+} // namespace mesos {
+
+#endif //__SASL_AUTHENTICATEE_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/616973da/src/sasl/authenticator.hpp
----------------------------------------------------------------------
diff --git a/src/sasl/authenticator.hpp b/src/sasl/authenticator.hpp
new file mode 100644
index 0000000..2f78cf0
--- /dev/null
+++ b/src/sasl/authenticator.hpp
@@ -0,0 +1,407 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __SASL_AUTHENTICATOR_HPP__
+#define __SASL_AUTHENTICATOR_HPP__
+
+#include <sasl/sasl.h>
+#include <sasl/saslplug.h>
+
+#include <process/future.hpp>
+#include <process/id.hpp>
+#include <process/once.hpp>
+#include <process/process.hpp>
+#include <process/protobuf.hpp>
+
+#include "messages/messages.hpp"
+
+#include "sasl/auxprop.hpp"
+
+namespace mesos {
+namespace internal {
+namespace sasl {
+
+// Forward declaration.
+class AuthenticatorProcess;
+
+
+class Authenticator
+{
+public:
+  Authenticator(const process::UPID& pid);
+  ~Authenticator();
+
+  process::Future<bool> authenticate();
+
+private:
+  AuthenticatorProcess* process;
+};
+
+
+class AuthenticatorProcess : public ProtobufProcess<AuthenticatorProcess>
+{
+public:
+  AuthenticatorProcess(const process::UPID& _pid)
+    : ProcessBase(process::ID::generate("authenticator")),
+      status(READY),
+      pid(_pid),
+      connection(NULL) {}
+
+  virtual ~AuthenticatorProcess()
+  {
+    if (connection != NULL) {
+      sasl_dispose(&connection);
+    }
+  }
+
+  process::Future<bool> authenticate()
+  {
+    static process::Once* initialize = new process::Once();
+    static bool initialized = false;
+
+    if (!initialize->once()) {
+      LOG(INFO) << "Initializing server SASL";
+
+      int result = sasl_server_init(NULL, "mesos");
+
+      if (result != SASL_OK) {
+        std::string error = "Failed to initialize SASL: ";
+        error += sasl_errstring(result, NULL, NULL);
+        LOG(ERROR) << error;
+        AuthenticationErrorMessage message;
+        message.set_error(error);
+        send(pid, message);
+        status = ERROR;
+        promise.fail(error);
+        initialize->done();
+        return promise.future();
+      }
+
+      result = sasl_auxprop_add_plugin(
+          InMemoryAuxiliaryPropertyPlugin::name(),
+          &InMemoryAuxiliaryPropertyPlugin::initialize);
+
+      if (result != SASL_OK) {
+        std::string error =
+          "Failed to add \"in-memory\" auxiliary property plugin: ";
+        error += sasl_errstring(result, NULL, NULL);
+        LOG(ERROR) << error;
+        AuthenticationErrorMessage message;
+        message.set_error(error);
+        send(pid, message);
+        status = ERROR;
+        promise.fail(error);
+        initialize->done();
+        return promise.future();
+      }
+
+      initialized = true;
+
+      initialize->done();
+    }
+
+    if (!initialized) {
+      promise.fail("Failed to initialize SASL");
+      return promise.future();
+    }
+
+    if (status != READY) {
+      return promise.future();
+    }
+
+    callbacks[0].id = SASL_CB_GETOPT;
+    callbacks[0].proc = (int(*)()) &getopt;
+    callbacks[0].context = NULL;
+
+    callbacks[1].id = SASL_CB_LIST_END;
+    callbacks[1].proc = NULL;
+    callbacks[1].context = NULL;
+
+    LOG(INFO) << "Creating new server SASL connection";
+
+    int result = sasl_server_new(
+        "mesos",    // Registered name of service.
+        NULL,       // Server's FQDN; NULL uses gethostname().
+        NULL,       // The user realm used for password lookups;
+                    // NULL means default to FQDN.
+                    // NOTE: This does not affect Kerberos.
+        NULL, NULL, // IP address information strings.
+        callbacks,  // Callbacks supported only for this connection.
+        0,          // Security flags (security layers are enabled
+                    // using security properties, separately).
+        &connection);
+
+    if (result != SASL_OK) {
+      std::string error = "Failed to create server SASL connection: ";
+      error += sasl_errstring(result, NULL, NULL);
+      LOG(ERROR) << error;
+      AuthenticationErrorMessage message;
+      message.set_error(error);
+      send(pid, message);
+      status = ERROR;
+      promise.fail(error);
+      return promise.future();
+    }
+
+    // Get the list of mechanisms.
+    const char* output = NULL;
+    unsigned length = 0;
+    int count = 0;
+
+    result = sasl_listmech(
+        connection,  // The context for this connection.
+        NULL,        // Not supported.
+        "",          // What to prepend to the output string.
+        ",",         // What to separate mechanisms with.
+        "",          // What to append to the output string.
+        &output,     // The output string.
+        &length,     // The length of the output string.
+        &count);     // The count of the mechanisms in output.
+
+    if (result != SASL_OK) {
+      std::string error = "Failed to get list of mechanisms: ";
+      LOG(WARNING) << error << sasl_errstring(result, NULL, NULL);
+      AuthenticationErrorMessage message;
+      error += sasl_errdetail(connection);
+      message.set_error(error);
+      send(pid, message);
+      status = ERROR;
+      promise.fail(error);
+      return promise.future();
+    }
+
+    std::vector<std::string> mechanisms = strings::tokenize(output, ",");
+
+    // Send authentication mechanisms.
+    AuthenticationMechanismsMessage message;
+    foreach (const std::string& mechanism, mechanisms) {
+      message.add_mechanisms(mechanism);
+    }
+
+    send(pid, message);
+
+    status = STARTING;
+
+    return promise.future();
+  }
+
+protected:
+  virtual void initialize()
+  {
+    link(pid); // Don't bother waiting for a lost authenticatee.
+
+    // Anticipate start and steps messages from the client.
+    install<AuthenticationStartMessage>(
+        &AuthenticatorProcess::start,
+        &AuthenticationStartMessage::mechanism,
+        &AuthenticationStartMessage::data);
+
+    install<AuthenticationStepMessage>(
+        &AuthenticatorProcess::step,
+        &AuthenticationStepMessage::data);
+  }
+
+  virtual void exited(const process::UPID& _pid)
+  {
+    if (pid == _pid) {
+      status = ERROR;
+      promise.fail("Failed to communicate with authenticatee");
+    }
+  }
+
+  void start(const std::string& mechanism, const std::string& data)
+  {
+    if (status != STARTING) {
+      AuthenticationErrorMessage message;
+      message.set_error("Unexpected authentication 'start' received");
+      send(pid, message);
+      status = ERROR;
+      promise.fail(message.error());
+      return;
+    }
+
+    LOG(INFO) << "Received SASL authentication start";
+
+    // Start the server.
+    const char* output = NULL;
+    unsigned length = 0;
+
+    int result = sasl_server_start(
+        connection,
+        mechanism.c_str(),
+        data.length() == 0 ? NULL : data.data(),
+        data.length(),
+        &output,
+        &length);
+
+    handle(result, output, length);
+  }
+
+  void step(const std::string& data)
+  {
+    if (status != STEPPING) {
+      AuthenticationErrorMessage message;
+      message.set_error("Unexpected authentication 'step' received");
+      send(pid, message);
+      status = ERROR;
+      promise.fail(message.error());
+      return;
+    }
+
+    LOG(INFO) << "Received SASL authentication step";
+
+    const char* output = NULL;
+    unsigned length = 0;
+
+    int result = sasl_server_step(
+        connection,
+        data.length() == 0 ? NULL : data.data(),
+        data.length(),
+        &output,
+        &length);
+
+    handle(result, output, length);
+  }
+
+private:
+  static int getopt(
+      void* context,
+      const char* plugin,
+      const char* option,
+      const char** result,
+      unsigned* length)
+  {
+    bool found = false;
+    if (std::string(option) == "auxprop_plugin") {
+      *result = "in-memory-auxprop";
+      found = true;
+    } else if (std::string(option) == "mech_list") {
+      *result = "CRAM-MD5";
+      found = true;
+    } else if (std::string(option) == "pwcheck_method") {
+      *result = "auxprop";
+      found = true;
+    }
+
+    if (found && length != NULL) {
+      *length = strlen(*result);
+    }
+
+    return SASL_OK;
+  }
+
+  // Helper for handling result of server start and step.
+  void handle(int result, const char* output, unsigned length)
+  {
+    if (result == SASL_OK) {
+      LOG(INFO) << "Authentication success";
+      // Note that we're not using SASL_SUCCESS_DATA which means that
+      // we should not have any data to send when we get a SASL_OK.
+      CHECK(output == NULL);
+      send(pid, AuthenticationCompletedMessage());
+      status = COMPLETED;
+      promise.set(true);
+    } else if (result == SASL_CONTINUE) {
+      LOG(INFO) << "Authentication requires more steps";
+      AuthenticationStepMessage message;
+      message.set_data(CHECK_NOTNULL(output), length);
+      send(pid, message);
+      status = STEPPING;
+    } else if (result == SASL_NOUSER || result == SASL_BADAUTH) {
+      LOG(WARNING) << "Authentication failure: "
+                   << sasl_errstring(result, NULL, NULL);
+      send(pid, AuthenticationFailedMessage());
+      status = FAILED;
+      promise.set(false);
+    } else {
+      LOG(ERROR) << "Authentication error: "
+                 << sasl_errstring(result, NULL, NULL);
+      AuthenticationErrorMessage message;
+      std::string error(sasl_errdetail(connection));
+      message.set_error(error);
+      send(pid, message);
+      status = ERROR;
+      promise.fail(message.error());
+    }
+  }
+
+  enum {
+    READY,
+    STARTING,
+    STEPPING,
+    COMPLETED,
+    FAILED,
+    ERROR
+  } status;
+
+  sasl_callback_t callbacks[2];
+
+  const process::UPID pid;
+
+  sasl_conn_t* connection;
+
+  process::Promise<bool> promise;
+};
+
+
+Authenticator::Authenticator(const process::UPID& pid)
+{
+  process = new AuthenticatorProcess(pid);
+  process::spawn(process);
+}
+
+
+Authenticator::~Authenticator()
+{
+  process::terminate(process);
+  process::wait(process);
+  delete process;
+}
+
+
+process::Future<bool> Authenticator::authenticate()
+{
+  return process::dispatch(process, &AuthenticatorProcess::authenticate);
+}
+
+
+namespace secrets {
+
+// Loads secrets (user -> secret) into the in-memory auxiliary
+// property plugin that is used by the authenticators.
+void load(const std::map<std::string, std::string>& secrets)
+{
+  Multimap<std::string, Property> properties;
+
+  foreachpair (const std::string& user, const std::string& secret, secrets) {
+    Property property;
+    property.name = SASL_AUX_PASSWORD_PROP;
+    property.values.push_back(secret);
+    properties.put(user, property);
+  }
+
+  InMemoryAuxiliaryPropertyPlugin::load(properties);
+}
+
+} // namespace secrets {
+
+} // namespace sasl {
+} // namespace internal {
+} // namespace mesos {
+
+#endif //__SASL_AUTHENTICATOR_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/616973da/src/sasl/auxprop.cpp
----------------------------------------------------------------------
diff --git a/src/sasl/auxprop.cpp b/src/sasl/auxprop.cpp
new file mode 100644
index 0000000..9547cc8
--- /dev/null
+++ b/src/sasl/auxprop.cpp
@@ -0,0 +1,178 @@
+#include "logging/logging.hpp"
+
+#include "sasl/auxprop.hpp"
+
+using std::list;
+using std::string;
+
+namespace mesos {
+namespace internal {
+namespace sasl {
+
+// Storage for the static members.
+Multimap<string, Property> InMemoryAuxiliaryPropertyPlugin::properties;
+sasl_auxprop_plug_t InMemoryAuxiliaryPropertyPlugin::plugin;
+
+
+int InMemoryAuxiliaryPropertyPlugin::initialize(
+    const sasl_utils_t* utils,
+    int api,
+    int* version,
+    sasl_auxprop_plug_t** plug,
+    const char* name)
+{
+  if (version == NULL || plug == NULL) {
+    return SASL_BADPARAM;
+  }
+
+  // Check if SASL API is older than the one we were compiled against.
+  if (api < SASL_AUXPROP_PLUG_VERSION) {
+    return SASL_BADVERS;
+  }
+
+  *version = SASL_AUXPROP_PLUG_VERSION;
+
+  plugin.features = 0;
+  plugin.spare_int1 = 0;
+  plugin.glob_context = NULL;
+  plugin.auxprop_free = NULL;
+  plugin.auxprop_lookup = &InMemoryAuxiliaryPropertyPlugin::lookup;
+  plugin.name = const_cast<char*>(InMemoryAuxiliaryPropertyPlugin::name());
+  plugin.auxprop_store = NULL;
+
+  *plug = &plugin;
+
+  VLOG(1) << "Initialized in-memory auxiliary property plugin";
+
+  return SASL_OK;
+}
+
+
+void InMemoryAuxiliaryPropertyPlugin::lookup(
+    void* context,
+    sasl_server_params_t* sparams,
+    unsigned flags,
+    const char* user,
+    unsigned length)
+{
+  // Pull out the utils.
+  const sasl_utils_t* utils = sparams->utils;
+
+  // We determine the properties we should be looking up by doing a
+  // 'prop_get' on the property context. Note that some of the
+  // properties we get might might need to be skipped depending on the
+  // flags (see below).
+  const propval* properties = utils->prop_get(sparams->propctx);
+
+  CHECK(properties != NULL)
+    << "Invalid auxiliary properties requested for lookup";
+
+  // TODO(benh): Consider "parsing" 'user' if it has an '@' separating
+  // the actual user and a realm.
+
+  string realm = sparams->user_realm != NULL
+    ? sparams->user_realm
+    : sparams->serverFQDN;
+
+  VLOG(1)
+    << "Request to lookup properties for "
+    << "user: '" << user << "' "
+    << "realm: '" << realm << "' "
+    << "server FQDN: '" << sparams->serverFQDN << "' "
+#ifdef SASL_AUXPROP_VERIFY_AGAINST_HASH
+    << "SASL_AUXPROP_VERIFY_AGAINST_HASH: "
+    << (flags & SASL_AUXPROP_VERIFY_AGAINST_HASH ? "true ": "false ")
+#endif
+    << "SASL_AUXPROP_OVERRIDE: "
+    << (flags & SASL_AUXPROP_OVERRIDE ? "true ": "false ")
+    << "SASL_AUXPROP_AUTHZID: "
+    << (flags & SASL_AUXPROP_AUTHZID ? "true ": "false ");
+
+  // Now iterate through each property requested.
+  const propval* property = properties;
+  for (; property->name != NULL; property++) {
+    const char* name = property->name;
+
+    // Skip properties that don't apply to this lookup given the flags.
+    if (flags & SASL_AUXPROP_AUTHZID) {
+      if (name[0] == '*') {
+        VLOG(1) << "Skipping auxiliary property '" << name
+                << "' since SASL_AUXPROP_AUTHZID == true";
+        continue;
+      }
+    } else {
+      // Only consider properties that start with '*' if
+      // SASL_AUXPROP_AUTHZID is not set but don't include the '*'
+      // when looking up the property name.
+      if (name[0] != '*') {
+        VLOG(1) << "Skipping auxiliary property '" << name
+                << "' since SASL_AUXPROP_AUTHZID == false "
+                << "but property name starts with '*'";
+        continue;
+      } else {
+        name = name + 1;
+      }
+    }
+
+    // Don't override already set values unless instructed otherwise.
+    if (property->values != NULL && !(flags & SASL_AUXPROP_OVERRIDE)) {
+#ifdef SASL_AUXPROP_VERIFY_AGAINST_HASH
+      // Regardless of SASL_AUXPROP_OVERRIDE we're expected to
+      // override property 'userPassword' when the
+      // SASL_AUXPROP_VERIFY_AGAINST_HASH flag is set, so we erase it
+      // here.
+      if (flags & SASL_AUXPROP_VERIFY_AGAINST_HASH &&
+          string(name) == string(SASL_AUX_PASSWORD_PROP)) {
+        VLOG(1) << "Erasing auxiliary property '" << name
+                << "' even though SASL_AUXPROP_OVERRIDE == true "
+                << "since SASL_AUXPROP_VERIFY_AGAINST_HASH == true";
+        utils->prop_erase(sparams->propctx, property->name);
+      } else {
+        VLOG(1) << "Skipping auxiliary property '" << name
+                << "' since SASL_AUXPROP_OVERRIDE == false "
+                << "and value(s) already set";
+        continue;
+      }
+#else
+      VLOG(1) << "Skipping auxiliary property '" << name
+              << "' since SASL_AUXPROP_OVERRIDE == false "
+              << "and value(s) already set";
+      continue;
+#endif
+    } else if (property->values != NULL) {
+      CHECK(flags & SASL_AUXPROP_OVERRIDE);
+      VLOG(1) << "Erasing auxiliary property '" << name
+              << "' since SASL_AUXPROP_OVERRIDE == true";
+      utils->prop_erase(sparams->propctx, property->name);
+    }
+
+    VLOG(1) << "Looking up auxiliary property '" << property->name << "'";
+
+    Option<list<string> > values = lookup(user, name);
+
+    if (values.isSome()) {
+      if (values.get().empty()) {
+        // Add the 'NULL' value to indicate there were no values.
+        utils->prop_set(sparams->propctx, property->name, NULL, 0);
+      } else {
+        // Add all the values. Note that passing NULL as the property
+        // name for 'prop_set' will append values to the same name as
+        // the previous 'prop_set' calls which is the behavior we want
+        // after adding the first value.
+        bool append = false;
+        foreach (const string& value, values.get()) {
+          sparams->utils->prop_set(
+              sparams->propctx,
+              append ? NULL : property->name,
+              value.c_str(),
+              -1); // Let 'prop_set' use strlen.
+          append = true;
+        }
+      }
+    }
+  }
+}
+
+} // namespace sasl {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/616973da/src/sasl/auxprop.hpp
----------------------------------------------------------------------
diff --git a/src/sasl/auxprop.hpp b/src/sasl/auxprop.hpp
new file mode 100644
index 0000000..44e773c
--- /dev/null
+++ b/src/sasl/auxprop.hpp
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __SASL_AUXPROP_HPP__
+#define __SASL_AUXPROP_HPP__
+
+#include <sasl/sasl.h>
+#include <sasl/saslplug.h>
+
+#include <string>
+
+#include <stout/foreach.hpp>
+#include <stout/multimap.hpp>
+#include <stout/none.hpp>
+#include <stout/option.hpp>
+
+namespace mesos {
+namespace internal {
+namespace sasl {
+
+struct Property
+{
+  std::string name;
+  std::list<std::string> values;
+};
+
+
+class InMemoryAuxiliaryPropertyPlugin
+{
+public:
+  static const char* name() { return "in-memory-auxprop"; }
+
+  static void load(const Multimap<std::string, Property>& _properties)
+  {
+    properties = _properties;
+  }
+
+  static Option<std::list<std::string> > lookup(
+      const std::string& user,
+      const std::string& name)
+  {
+    if (properties.contains(user)) {
+      foreach (const Property& property, properties.get(user)) {
+        if (property.name == name) {
+          return property.values;
+        }
+      }
+    }
+    return None();
+  }
+
+  // SASL plugin initialize entry.
+  static int initialize(
+      const sasl_utils_t* utils,
+      int api,
+      int* version,
+      sasl_auxprop_plug_t** plug,
+      const char* name);
+
+private:
+  static void lookup(
+      void* context,
+      sasl_server_params_t* sparams,
+      unsigned flags,
+      const char* user,
+      unsigned length);
+
+  static Multimap<std::string, Property> properties;
+
+  static sasl_auxprop_plug_t plugin;
+};
+
+} // namespace sasl {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __SASL_AUXPROP_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/616973da/src/tests/sasl_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/sasl_tests.cpp b/src/tests/sasl_tests.cpp
new file mode 100644
index 0000000..02e761b
--- /dev/null
+++ b/src/tests/sasl_tests.cpp
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <map>
+#include <string>
+
+#include <process/gmock.hpp>
+#include <process/gtest.hpp>
+#include <process/pid.hpp>
+#include <process/process.hpp>
+
+#include "sasl/authenticatee.hpp"
+#include "sasl/authenticator.hpp"
+
+using namespace process;
+
+using std::map;
+using std::string;
+
+using testing::_;
+using testing::Eq;
+
+namespace mesos {
+namespace internal {
+namespace sasl {
+
+TEST(SASL, success)
+{
+  // Set up secrets.
+  map<string, string> secrets;
+  secrets["benh"] = "secret";
+  sasl::secrets::load(secrets);
+
+  // Launch a dummy process (somebody to send the AuthenticateMessage).
+  UPID pid = spawn(new ProcessBase(), true);
+
+  Credential credential;
+  credential.set_principal("benh");
+  credential.set_secret("secret");
+
+  Authenticatee authenticatee(credential);
+
+  Future<Message> message =
+    FUTURE_MESSAGE(Eq(AuthenticateMessage().GetTypeName()), _, _);
+
+  Future<bool> client = authenticatee.authenticate(pid);
+
+  AWAIT_READY(message);
+
+  Authenticator authenticator(message.get().from);
+
+  Future<bool> server = authenticator.authenticate();
+
+  AWAIT_EQ(true, client);
+  AWAIT_EQ(true, server);
+
+  terminate(pid);
+}
+
+
+// Bad password should return an authentication failure.
+TEST(SASL, failed1)
+{
+  // Set up secrets.
+  map<string, string> secrets;
+  secrets["benh"] = "secret1";
+  sasl::secrets::load(secrets);
+
+  // Launch a dummy process (somebody to send the AuthenticateMessage).
+  UPID pid = spawn(new ProcessBase(), true);
+
+  Credential credential;
+  credential.set_principal("benh");
+  credential.set_secret("secret");
+
+  Authenticatee authenticatee(credential);
+
+  Future<Message> message =
+    FUTURE_MESSAGE(Eq(AuthenticateMessage().GetTypeName()), _, _);
+
+  Future<bool> client = authenticatee.authenticate(pid);
+
+  AWAIT_READY(message);
+
+  Authenticator authenticator(message.get().from);
+
+  Future<bool> server = authenticator.authenticate();
+
+  AWAIT_EQ(false, client);
+  AWAIT_EQ(false, server);
+
+  terminate(pid);
+}
+
+
+// No user should return an authentication failure.
+TEST(SASL, failed2)
+{
+  // Set up secrets.
+  map<string, string> secrets;
+  secrets["vinod"] = "secret";
+  sasl::secrets::load(secrets);
+
+  // Launch a dummy process (somebody to send the AuthenticateMessage).
+  UPID pid = spawn(new ProcessBase(), true);
+
+  Credential credential;
+  credential.set_principal("benh");
+  credential.set_secret("secret");
+
+  Authenticatee authenticatee(credential);
+
+  Future<Message> message =
+    FUTURE_MESSAGE(Eq(AuthenticateMessage().GetTypeName()), _, _);
+
+  Future<bool> client = authenticatee.authenticate(pid);
+
+  AWAIT_READY(message);
+
+  Authenticator authenticator(message.get().from);
+
+  Future<bool> server = authenticator.authenticate();
+
+  AWAIT_EQ(false, client);
+  AWAIT_EQ(false, server);
+
+  terminate(pid);
+}
+
+} // namespace sasl {
+} // namespace internal {
+} // namespace mesos {