You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/12/12 02:36:40 UTC

mesos git commit: Allow users of libprocess to perform authentication of HTTP requests.

Repository: mesos
Updated Branches:
  refs/heads/master d348d2b39 -> 6bd9b65b9


Allow users of libprocess to perform authentication of HTTP requests.

This change integrates the previously added AuthenticationRouter into
libprocess in order to allow users to authenticate HTTP requests. Now,
when making a 'route', a security 'realm' can be specified. The user
of libprocess determines which Authenticator will be used for each
'realm'.

Review: https://reviews.apache.org/r/38000


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6bd9b65b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6bd9b65b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6bd9b65b

Branch: refs/heads/master
Commit: 6bd9b65b9d93f154ff9187fb5e5136262ede043c
Parents: d348d2b
Author: Alexander Rojas <al...@mesosphere.io>
Authored: Fri Dec 11 12:30:37 2015 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Fri Dec 11 17:18:55 2015 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/event.hpp   |   9 +-
 3rdparty/libprocess/include/process/http.hpp    |  26 +++
 3rdparty/libprocess/include/process/process.hpp |  52 ++++-
 .../libprocess/src/authentication_router.hpp    |   3 +
 3rdparty/libprocess/src/process.cpp             | 180 +++++++++++++++-
 3rdparty/libprocess/src/tests/http_tests.cpp    | 208 +++++++++++++++++++
 6 files changed, 464 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6bd9b65b/3rdparty/libprocess/include/process/event.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/event.hpp b/3rdparty/libprocess/include/process/event.hpp
index a03824c..d7f3447 100644
--- a/3rdparty/libprocess/include/process/event.hpp
+++ b/3rdparty/libprocess/include/process/event.hpp
@@ -117,9 +117,11 @@ struct HttpEvent : Event
 {
   HttpEvent(
       http::Request* _request,
-      Promise<http::Response>* _response)
+      Promise<http::Response>* _response,
+      const Option<std::string>& _principal = None())
     : request(_request),
-      response(_response) {}
+      response(_response),
+      principal(_principal) {}
 
   virtual ~HttpEvent()
   {
@@ -138,6 +140,9 @@ struct HttpEvent : Event
   http::Request* const request;
   Promise<http::Response>* response;
 
+  // This will be set for authenticated requests.
+  Option<std::string> principal;
+
 private:
   // Not copyable, not assignable.
   HttpEvent(const HttpEvent&);

http://git-wip-us.apache.org/repos/asf/mesos/blob/6bd9b65b/3rdparty/libprocess/include/process/http.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/http.hpp b/3rdparty/libprocess/include/process/http.hpp
index c9e38e5..f0666f0 100644
--- a/3rdparty/libprocess/include/process/http.hpp
+++ b/3rdparty/libprocess/include/process/http.hpp
@@ -55,6 +55,32 @@ class Socket;
 
 namespace http {
 
+namespace authentication {
+
+class Authenticator;
+
+/**
+ * Sets (or overwrites) the authenticator for the realm.
+ *
+ * Every incoming HTTP request to an endpoint associated
+ * with the realm will be authenticated with the given
+ * authenticator.
+ */
+Future<Nothing> setAuthenticator(
+    const std::string& realm,
+    Owned<Authenticator> authenticator);
+
+
+/**
+ * Unsets the authenticator for the realm.
+ *
+ * Any endpoint mapped to the realm will no
+ * longer be authenticated.
+ */
+Future<Nothing> unsetAuthenticator(const std::string& realm);
+
+} // namespace authentication {
+
 // Status code reason strings, from the HTTP1.1 RFC:
 // http://www.w3.org/Protocols/rfc2616/rfc2616-sec6.html
 extern hashmap<uint16_t, std::string>* statuses;

http://git-wip-us.apache.org/repos/asf/mesos/blob/6bd9b65b/3rdparty/libprocess/include/process/process.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/process.hpp b/3rdparty/libprocess/include/process/process.hpp
index 81c0944..d3eb698 100644
--- a/3rdparty/libprocess/include/process/process.hpp
+++ b/3rdparty/libprocess/include/process/process.hpp
@@ -61,6 +61,7 @@ void install(std::vector<Owned<FirewallRule>>&& rules);
 
 } // namespace firewall {
 
+
 class ProcessBase : public EventVisitor
 {
 public:
@@ -235,6 +236,50 @@ protected:
   }
 
   /**
+   * Any function which takes a `process::http::Request` and an
+   * `Option<std::string>` and returns a `process::http::Response`.
+   *
+   * If the string is set, it represents the authenticated principal
+   * for the request.
+   *
+   * The default visit implementation for HTTP events invokes
+   * installed HTTP handlers.
+   *
+   * @see process::ProcessBase::route
+   */
+  // TODO(arojas): Consider introducing an `authentication::Principal` type.
+  typedef lambda::function<Future<http::Response>(
+      const http::Request&, const Option<std::string>&)>
+      AuthenticatedHttpRequestHandler;
+
+  // TODO(arojas): Consider introducing an `authentication::Realm` type.
+  void route(
+      const std::string& name,
+      const std::string& realm,
+      const Option<std::string>& help,
+      const AuthenticatedHttpRequestHandler& handler);
+
+  /**
+   * @copydoc process::ProcessBase::route
+   */
+  template <typename T>
+  void route(
+      const std::string& name,
+      const std::string& realm,
+      const Option<std::string>& help,
+      Future<http::Response> (T::*method)(
+          const http::Request&,
+          const Option<std::string>&))
+  {
+    // Note that we use dynamic_cast here so a process can use
+    // multiple inheritance if it sees so fit (e.g., to implement
+    // multiple callback interfaces).
+    AuthenticatedHttpRequestHandler handler =
+      lambda::bind(method, dynamic_cast<T*>(this), lambda::_1, lambda::_2);
+    route(name, realm, help, handler);
+  }
+
+  /**
    * Sets up the default HTTP request handler to provide the static
    * asset(s) at the specified _absolute_ path for the specified name.
    *
@@ -313,7 +358,12 @@ private:
   // Handlers for messages and HTTP requests.
   struct {
     std::map<std::string, MessageHandler> message;
-    std::map<std::string, HttpRequestHandler> http;
+
+    // `HttpRequestHandlers` are equivalent to their authenticated
+    // counterparts where the principal is always `None`. Therefore
+    // we convert the regular handler to an authenticated one in
+    // order to store only a single map here.
+    std::map<std::string, AuthenticatedHttpRequestHandler> http;
   } handlers;
 
   // Definition of a static asset.

http://git-wip-us.apache.org/repos/asf/mesos/blob/6bd9b65b/3rdparty/libprocess/src/authentication_router.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/authentication_router.hpp b/3rdparty/libprocess/src/authentication_router.hpp
index 5777dea..c251651 100644
--- a/3rdparty/libprocess/src/authentication_router.hpp
+++ b/3rdparty/libprocess/src/authentication_router.hpp
@@ -49,6 +49,9 @@ public:
   // Unsets the authenticator for the realm.
   Future<Nothing> unsetAuthenticator(const std::string& realm);
 
+  // TODO(arojas): Consider making the realm a property
+  // of the endpoint handler in `ProcessBase` rather than
+  // having the router maintain the mapping.
   Future<Nothing> addEndpoint(
       const std::string& endpoint,
       const std::string& realm);

http://git-wip-us.apache.org/repos/asf/mesos/blob/6bd9b65b/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index e93709d..43c83e3 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -72,6 +72,7 @@
 #include <process/owned.hpp>
 #include <process/process.hpp>
 #include <process/profiler.hpp>
+#include <process/sequence.hpp>
 #include <process/socket.hpp>
 #include <process/statistics.hpp>
 #include <process/system.hpp>
@@ -94,6 +95,7 @@
 #include <stout/thread_local.hpp>
 #include <stout/unreachable.hpp>
 
+#include "authentication_router.hpp"
 #include "config.hpp"
 #include "decoder.hpp"
 #include "encoder.hpp"
@@ -119,6 +121,10 @@ using process::http::Request;
 using process::http::Response;
 using process::http::ServiceUnavailable;
 
+using process::http::authentication::Authenticator;
+using process::http::authentication::AuthenticationResult;
+using process::http::authentication::AuthenticationRouter;
+
 using process::network::Address;
 using process::network::Socket;
 
@@ -187,6 +193,10 @@ public:
   // responses have been processed (e.g., waited for and sent).
   void handle(const Future<Response>& future, const Request& request);
 
+  // All requests must go through authentication here before being
+  // passed on to the route handlers.
+  Future<Option<AuthenticationResult>> authenticate(const Request& request);
+
 private:
   // Starts "waiting" on the next available future response.
   void next();
@@ -218,6 +228,15 @@ private:
   queue<Item*> items;
 
   Option<http::Pipe::Reader> pipe; // Current pipe, if streaming.
+
+  // We sequence the authentication results exposed to the caller
+  // in order to satisfy HTTP pipelining.
+  //
+  // Note that this needs to be done explicitly here because
+  // the authentication router does expose ordered completion
+  // of its Futures (it doesn't have the knowledge of sockets
+  // necessary to do it in a per-connection manner).
+  Sequence authentications;
 };
 
 
@@ -460,6 +479,9 @@ static ProcessManager* process_manager = NULL;
 // Scheduling gate that threads wait at when there is nothing to run.
 static Gate* gate = new Gate();
 
+// Used for authenticating HTTP requests.
+static AuthenticationRouter* authentication_router = NULL;
+
 // Filter. Synchronized support for using the filterer needs to be
 // recursive in case a filterer wants to do anything fancy (which is
 // possible and likely given that filters will get used for testing).
@@ -479,6 +501,30 @@ THREAD_LOCAL ProcessBase* __process__ = NULL;
 THREAD_LOCAL Executor* _executor_ = NULL;
 
 
+namespace http {
+namespace authentication {
+
+Future<Nothing> setAuthenticator(
+    const std::string& realm,
+    Owned<Authenticator> authenticator)
+{
+  process::initialize();
+
+  return authentication_router->setAuthenticator(realm, authenticator);
+}
+
+
+Future<Nothing> unsetAuthenticator(const string& realm)
+{
+  process::initialize();
+
+  return authentication_router->unsetAuthenticator(realm);
+}
+
+} // namespace authentication {
+} // namespace http {
+
+
 // NOTE: Clock::* implementations are in clock.cpp except for
 // Clock::settle which currently has a dependency on
 // 'process_manager'.
@@ -926,6 +972,9 @@ void initialize(const string& delegate)
   // Create the global system statistics process.
   spawn(new System(), true);
 
+  // Create the global HTTP authentication router.
+  authentication_router = new AuthenticationRouter();
+
   // Ensure metrics process is running.
   // TODO(bmahler): Consider initializing this consistently with
   // the other global Processes.
@@ -959,6 +1008,11 @@ void finalize()
   // This will terminate any existing processes created via `spawn()`,
   // like `gc`, `help`, `Logging()`, `Profiler()`, and `System()`.
   // NOTE: This will also stop the event loop.
+
+  // TODO(arojas): The HTTP authentication logic in ProcessManager
+  // does not handle the case where the process_manager is deleted
+  // while authentication was in progress!!
+
   delete process_manager;
   process_manager = NULL;
 
@@ -1039,6 +1093,22 @@ void HttpProxy::handle(const Future<Response>& future, const Request& request)
 }
 
 
+Future<Option<AuthenticationResult>> HttpProxy::authenticate(
+    const Request& request)
+{
+  // Start the authentication immediately so that
+  // authentications run in parallel, but expose
+  // a future that is only satisfied after all
+  // previous authentications are satisfied (in
+  // order to respect HTTP pipelining).
+  Future<Option<AuthenticationResult>> authentication =
+    authentication_router->authenticate(request);
+
+  return authentications.add<Option<AuthenticationResult>>(
+      [=]() { return authentication; });
+}
+
+
 void HttpProxy::next()
 {
   if (items.size() > 0) {
@@ -2276,25 +2346,25 @@ void ProcessManager::handle(
   vector<string> tokens = strings::tokenize(request->url.path, "/");
 
   // Try and determine a receiver, otherwise try and delegate.
-  ProcessReference receiver;
+  UPID receiver;
 
   if (tokens.size() == 0 && delegate != "") {
     request->url.path = "/" + delegate;
-    receiver = use(UPID(delegate, __address__));
+    receiver = UPID(delegate, __address__);
   } else if (tokens.size() > 0) {
     // Decode possible percent-encoded path.
     Try<string> decode = http::decode(tokens[0]);
     if (!decode.isError()) {
-      receiver = use(UPID(decode.get(), __address__));
+      receiver = UPID(decode.get(), __address__);
     } else {
       VLOG(1) << "Failed to decode URL path: " << decode.error();
     }
   }
 
-  if (!receiver && delegate != "") {
+  if (!use(receiver) && delegate != "") {
     // Try and delegate the request.
     request->url.path = "/" + delegate + request->url.path;
-    receiver = use(UPID(delegate, __address__));
+    receiver = UPID(delegate, __address__);
   }
 
   synchronized (firewall_mutex) {
@@ -2327,7 +2397,7 @@ void ProcessManager::handle(
     }
   }
 
-  if (receiver) {
+  if (use(receiver)) {
     // The promise is created here but its ownership is passed
     // into the HttpEvent created below.
     Promise<Response>* promise(new Promise<Response>());
@@ -2338,9 +2408,67 @@ void ProcessManager::handle(
     // order of requests to account for HTTP/1.1 pipelining.
     dispatch(proxy, &HttpProxy::handle, promise->future(), *request);
 
-    // TODO(benh): Use the sender PID in order to capture
-    // happens-before timing relationships for testing.
-    deliver(receiver, new HttpEvent(request, promise));
+    // NOTE: We capture `process_manager` in the lambda below, but
+    // it may have been deleted while authentication was in progress.
+    // This is problematic for libprocess finalization, a potential
+    // workaround is to ensure that the authentication future is
+    // satisfied within a Process' execution context.
+    dispatch(proxy, &HttpProxy::authenticate, *request)
+      .onAny([this, request, promise, receiver](
+          const Future<Option<AuthenticationResult>>& authentication) {
+        if (!authentication.isReady()) {
+          promise->set(InternalServerError());
+
+          VLOG(1) << "Returning '" << promise->future()->status << "'"
+                  << " for '" << request->url.path << "'"
+                  << " (authentication failed: "
+                  << (authentication.isFailed()
+                      ? authentication.failure()
+                      : "discarded")
+                  << ")";
+
+          delete request;
+          delete promise;
+          return;
+        }
+
+        // NOTE: The `receiver` process may have terminated while
+        // authentication was in progress. In this case `deliver`
+        // will fail and delete the `HttpEvent` which results in
+        // an `InternalServerError` response.
+
+        if (authentication->isNone()) {
+          // Request didn't need authentication or authentication
+          // is not applicable, just forward the request.
+          //
+          // TODO(benh): Use the sender PID in order to capture
+          // happens-before timing relationships for testing.
+          deliver(receiver, new HttpEvent(request, promise));
+          return;
+        }
+
+        if (authentication.get()->unauthorized.isSome()) {
+          // Request was not authenticated, challenged issued.
+          promise->set(authentication.get()->unauthorized.get());
+          delete request;
+          delete promise;
+        } else if (authentication.get()->forbidden.isSome()) {
+          // Request was not authenticated, no challenge issued.
+          promise->set(authentication.get()->forbidden.get());
+          delete request;
+          delete promise;
+        } else {
+          // Authentication succeeded.
+          const Option<string>& principal = authentication.get()->principal;
+
+          CHECK_SOME(principal); // The authentication router validates this.
+
+          // TODO(benh): Use the sender PID in order to capture
+          // happens-before timing relationships for testing.
+          deliver(receiver, new HttpEvent(request, promise, principal));
+        }
+      });
+
     return;
   }
 
@@ -2567,6 +2695,12 @@ void ProcessManager::cleanup(ProcessBase* process)
     delete event;
   }
 
+  // Remove all routes from the authentication router, we don't
+  // need to wait for these operations to complete.
+  foreachkey (const string& endpoint, process->handlers.http) {
+    authentication_router->removeEndpoint('/' + process->self().id + endpoint);
+  }
+
   // Possible gate non-libprocess threads are waiting at.
   Gate* gate = NULL;
 
@@ -3096,7 +3230,8 @@ void ProcessBase::visit(const HttpEvent& event)
   while (Path(name).dirname() != name) {
     if (handlers.http.count(name) > 0) {
       // Now call the handler and associate the response with the promise.
-      event.response->associate(handlers.http[name](*event.request));
+      event.response->associate(
+          handlers.http[name](*event.request, event.principal));
 
       return;
     }
@@ -3174,11 +3309,34 @@ void ProcessBase::route(
 {
   // Routes must start with '/'.
   CHECK(name.find('/') == 0);
-  handlers.http[name.substr(1)] = handler;
+
+  auto wrapper = [handler](const Request& request, const Option<string>&) {
+    return handler(request);
+  };
+
+  handlers.http[name.substr(1)] = wrapper;
+
   dispatch(help, &Help::add, pid.id, name, help_);
 }
 
 
+void ProcessBase::route(
+    const string& name,
+    const std::string& realm,
+    const Option<string>& help_,
+    const AuthenticatedHttpRequestHandler& handler)
+{
+  // Routes must start with '/'.
+  CHECK(name.find('/') == 0);
+  handlers.http[name.substr(1)] = handler;
+
+  // Add the endpoint to the authentication router, we don't need
+  // to wait for the operation to complete.
+  authentication_router->addEndpoint('/' + self().id + name, realm);
+
+  dispatch(help, &Help::add, pid.id, name, help_);
+}
+
 
 UPID spawn(ProcessBase* process, bool manage)
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/6bd9b65b/3rdparty/libprocess/src/tests/http_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/http_tests.cpp b/3rdparty/libprocess/src/tests/http_tests.cpp
index d284361..9fe2703 100644
--- a/3rdparty/libprocess/src/tests/http_tests.cpp
+++ b/3rdparty/libprocess/src/tests/http_tests.cpp
@@ -21,16 +21,19 @@
 #include <vector>
 
 #include <process/address.hpp>
+#include <process/authenticator.hpp>
 #include <process/future.hpp>
 #include <process/gmock.hpp>
 #include <process/gtest.hpp>
 #include <process/http.hpp>
+#include <process/id.hpp>
 #include <process/io.hpp>
 #include <process/owned.hpp>
 #include <process/socket.hpp>
 
 #include <stout/base64.hpp>
 #include <stout/gtest.hpp>
+#include <stout/hashset.hpp>
 #include <stout/none.hpp>
 #include <stout/nothing.hpp>
 #include <stout/os.hpp>
@@ -38,10 +41,16 @@
 
 #include "encoder.hpp"
 
+namespace authentication = process::http::authentication;
+namespace ID = process::ID;
 namespace http = process::http;
 
+using authentication::Authenticator;
+using authentication::AuthenticationResult;
+
 using process::Future;
 using process::Owned;
+using process::PID;
 using process::Process;
 using process::Promise;
 
@@ -72,6 +81,10 @@ public:
   MOCK_METHOD1(a, Future<http::Response>(const http::Request&));
   MOCK_METHOD1(abc, Future<http::Response>(const http::Request&));
 
+  MOCK_METHOD2(
+      authenticated,
+      Future<http::Response>(const http::Request&, const Option<string>&));
+
 protected:
   virtual void initialize()
   {
@@ -83,6 +96,7 @@ protected:
     route("/delete", None(), &HttpProcess::requestDelete);
     route("/a", None(), &HttpProcess::a);
     route("/a/b/c", None(), &HttpProcess::abc);
+    route("/authenticated", "realm", None(), &HttpProcess::authenticated);
   }
 
   Future<http::Response> auth(const http::Request& request)
@@ -1177,3 +1191,197 @@ TEST(URLTest, Stringification)
   EXPECT_TRUE(url4 == "http://172.158.1.23:80/path?baz=bam&foo=bar#fragment" ||
               url4 == "http://172.158.1.23:80/path?foo=bar&baz=bam#fragment");
 }
+
+
+class MockAuthenticator : public Authenticator
+{
+public:
+  MOCK_METHOD1(
+      authenticate,
+      Future<AuthenticationResult>(const http::Request&));
+
+  virtual string scheme() const { return "Basic"; }
+};
+
+
+class HttpAuthenticationTest : public ::testing::Test
+{
+protected:
+  Future<Nothing> setAuthenticator(
+      const string& realm,
+      Owned<Authenticator> authenticator)
+  {
+    realms.insert(realm);
+
+    return authentication::setAuthenticator(realm, authenticator);
+  }
+
+  virtual void TearDown()
+  {
+    foreach (const string& realm, realms) {
+      // We need to wait in order to ensure that the operation
+      // completes before we leave TearDown. Otherwise, we may
+      // leak a mock object.
+      AWAIT_READY(authentication::unsetAuthenticator(realm));
+    }
+    realms.clear();
+  }
+
+private:
+  hashset<string> realms;
+};
+
+
+// Ensures that when there is no authenticator for a realm,
+// requests are not authenticated (i.e. the principal is None).
+TEST_F(HttpAuthenticationTest, NoAuthenticator)
+{
+  Http http;
+
+  EXPECT_CALL(*http.process, authenticated(_, Option<string>::none()))
+    .WillOnce(Return(http::OK()));
+
+  Future<http::Response> response =
+    http::get(http.process->self(), "authenticated");
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
+}
+
+
+// Tests that an authentication Unauthorized result is exposed correctly.
+TEST_F(HttpAuthenticationTest, Unauthorized)
+{
+  MockAuthenticator* authenticator = new MockAuthenticator();
+  setAuthenticator("realm", Owned<Authenticator>(authenticator));
+
+  Http http;
+
+  AuthenticationResult authentication;
+  authentication.unauthorized =
+    http::Unauthorized(vector<string>({"Basic realm=\"realm\""}));
+
+  EXPECT_CALL(*authenticator, authenticate(_))
+    .WillOnce(Return(authentication));
+
+  Future<http::Response> response =
+    http::get(http.process->self(), "authenticated");
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(
+      http::Unauthorized(vector<string>()).status,
+      response);
+
+  EXPECT_EQ(
+      authentication.unauthorized->headers.get("WWW-Authenticate"),
+      response.get().headers.get("WWW-Authenticate"));
+}
+
+
+// Tests that an authentication Forbidden result is exposed correctly.
+TEST_F(HttpAuthenticationTest, Forbidden)
+{
+  MockAuthenticator* authenticator = new MockAuthenticator();
+  setAuthenticator("realm", Owned<Authenticator>(authenticator));
+
+  Http http;
+
+  AuthenticationResult authentication;
+  authentication.forbidden = http::Forbidden();
+
+  EXPECT_CALL(*authenticator, authenticate(_))
+    .WillOnce(Return(authentication));
+
+  Future<http::Response> response =
+    http::get(http.process->self(), "authenticated");
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::Forbidden().status, response);
+}
+
+
+// Tests that a successful authentication hits the endpoint.
+TEST_F(HttpAuthenticationTest, Authenticated)
+{
+  MockAuthenticator* authenticator = new MockAuthenticator();
+  setAuthenticator("realm", Owned<Authenticator>(authenticator));
+
+  Http http;
+
+  AuthenticationResult authentication;
+  authentication.principal = "principal";
+
+  EXPECT_CALL((*authenticator), authenticate(_))
+    .WillOnce(Return(authentication));
+
+  EXPECT_CALL(*http.process, authenticated(_, Option<string>("principal")))
+    .WillOnce(Return(http::OK()));
+
+  // Note that we don't bother pretending to specify a valid
+  // 'Authorization' header since we force authentication success.
+  Future<http::Response> response =
+    http::get(http.process->self(), "authenticated");
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
+}
+
+
+// Tests that HTTP pipelining is respected even when
+// authentications are satisfied out-of-order.
+TEST_F(HttpAuthenticationTest, Pipelining)
+{
+  MockAuthenticator* authenticator = new MockAuthenticator();
+  setAuthenticator("realm", Owned<Authenticator>(authenticator));
+
+  Http http;
+
+  // We satisfy the authentication futures in reverse
+  // order. Libprocess should not re-order requests
+  // when this occurs.
+  Promise<AuthenticationResult> promise1;
+  Promise<AuthenticationResult> promise2;
+  EXPECT_CALL((*authenticator), authenticate(_))
+    .WillOnce(Return(promise1.future()))
+    .WillOnce(Return(promise2.future()));
+
+  Future<Option<string>> principal1;
+  Future<Option<string>> principal2;
+  EXPECT_CALL(*http.process, authenticated(_, _))
+    .WillOnce(DoAll(FutureArg<1>(&principal1), Return(http::OK("1"))))
+    .WillOnce(DoAll(FutureArg<1>(&principal2), Return(http::OK("2"))));
+
+  http::URL url = http::URL(
+      "http",
+      http.process->self().address.ip,
+      http.process->self().address.port,
+      http.process->self().id + "/authenticated");
+
+  Future<http::Connection> connect = http::connect(url);
+  AWAIT_READY(connect);
+
+  http::Connection connection = connect.get();
+
+  // Note that we don't bother pretending to specify a valid
+  // 'Authorization' header since we force authentication success.
+  http::Request request;
+  request.method = "GET";
+  request.url = url;
+  request.keepAlive = true;
+
+  Future<http::Response> response1 = connection.send(request);
+  Future<http::Response> response2 = connection.send(request);
+
+  AuthenticationResult authentiation2;
+  authentiation2.principal = "principal2";
+  promise2.set(authentiation2);
+
+  AuthenticationResult authentiation1;
+  authentiation1.principal = "princpal1";
+  promise1.set(authentiation1);
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response1);
+  EXPECT_EQ("1", response1->body);
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response2);
+  EXPECT_EQ("2", response2->body);
+
+  AWAIT_EXPECT_EQ(authentiation1.principal, principal1);
+  AWAIT_EXPECT_EQ(authentiation2.principal, principal2);
+}