You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2015/06/14 13:17:56 UTC

[3/6] mesos git commit: Introduce libevent SSL socket.

Introduce libevent SSL socket.

Requires:
configure --enable-libevent --enable-libevent-socket --enable-ssl

New environment variables:
```
SSL_ENABLED=(false|0,true|1)
SSL_CERT_FILE=(path to certificate)
SSL_KEY_FILE=(path to key)
SSL_VERIFY_CERT=(false|0,true|1)
SSL_REQUIRE_CERT=(false|0,true|1)
SSL_VERIFY_DEPTH=(4)
SSL_CA_DIR=(path to CA directory)
SSL_CA_FILE=(path to CA file)
SSL_CIPHERS=(accepted ciphers separated by ':')
SSL_ENABLE_SSL_V2=(false|0,true|1)
SSL_ENABLE_SSL_V3=(false|0,true|1)
SSL_ENABLE_TLS_V1_0=(false|0,true|1)
SSL_ENABLE_TLS_V1_1=(false|0,true|1)
SSL_ENABLE_TLS_V1_2=(false|0,true|1)
```

Only TLSV1.2 is enabled by default.

Use the `SSL_ENABLE_SSL_V*` and `SSL_ENABLE_TLS_V*` environment variables to
open up more protocols.

Use the `SSL_CIPHERS` environment variable to restrict or open up the
supported ciphers.

Review: https://reviews.apache.org/r/29406


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/654cabf9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/654cabf9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/654cabf9

Branch: refs/heads/master
Commit: 654cabf9092f03a1857bd99ee510a204bfaaba51
Parents: 1f6c99b
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sun Jun 14 03:11:00 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 04:12:01 2015 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am                 |  26 +-
 3rdparty/libprocess/include/process/address.hpp |  21 +
 3rdparty/libprocess/include/process/socket.hpp  |  49 +-
 3rdparty/libprocess/src/libevent.cpp            |  94 +-
 3rdparty/libprocess/src/libevent.hpp            |  27 +-
 3rdparty/libprocess/src/libevent_ssl_socket.cpp | 926 +++++++++++++++++++
 3rdparty/libprocess/src/libevent_ssl_socket.hpp | 165 ++++
 3rdparty/libprocess/src/openssl.cpp             | 563 +++++++++++
 3rdparty/libprocess/src/openssl.hpp             |  79 ++
 3rdparty/libprocess/src/process.cpp             |  28 +-
 3rdparty/libprocess/src/socket.cpp              |  24 +-
 11 files changed, 1971 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/654cabf9/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 489ce35..c781f6c 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -53,6 +53,22 @@ libprocess_la_SOURCES =		\
   src/subprocess.cpp		\
   src/timeseries.cpp
 
+if ENABLE_LIBEVENT
+else
+if WITH_BUNDLED_LIBEV
+  EVENT_LIB = $(LIBEV)/libev.la
+else
+  EVENT_LIB = -lev
+endif
+endif
+
+if ENABLE_SSL
+libprocess_la_SOURCES +=	\
+    src/libevent_ssl_socket.cpp	\
+    src/openssl.cpp		\
+    src/openssl.hpp
+endif
+
 libprocess_la_CPPFLAGS =		\
   -I$(srcdir)/include			\
   -I$(srcdir)/$(STOUT)/include		\
@@ -89,16 +105,6 @@ else
   HTTP_PARSER_LIB = -lhttp_parser
 endif
 
-if ENABLE_LIBEVENT
-  EVENT_LIB = -levent -levent_pthreads
-else
-if WITH_BUNDLED_LIBEV
-  EVENT_LIB = $(LIBEV)/libev.la
-else
-  EVENT_LIB = -lev
-endif
-endif
-
 libprocess_la_LIBADD =			\
   $(LIBGLOG)				\
   $(HTTP_PARSER_LIB)			\

http://git-wip-us.apache.org/repos/asf/mesos/blob/654cabf9/3rdparty/libprocess/include/process/address.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/address.hpp b/3rdparty/libprocess/include/process/address.hpp
index 729f5cd..88946d5 100644
--- a/3rdparty/libprocess/include/process/address.hpp
+++ b/3rdparty/libprocess/include/process/address.hpp
@@ -14,6 +14,7 @@
 
 #include <stout/abort.hpp>
 #include <stout/ip.hpp>
+#include <stout/net.hpp>
 #include <stout/stringify.hpp>
 
 namespace process {
@@ -56,6 +57,26 @@ public:
     return ip.family();
   }
 
+  /**
+   * Returns the hostname of this address's IP.
+   *
+   * @returns the hostname of this address's IP.
+   */
+  // TODO(jmlvanre): Consider making this return a Future in order to
+  // deal with slow name resolution.
+  Try<std::string> hostname() const
+  {
+    const Try<std::string> hostname = ip == net::IP(INADDR_ANY)
+      ? net::hostname()
+      : net::getHostname(ip);
+
+    if (hostname.isError()) {
+      return Error(hostname.error());
+    }
+
+    return hostname.get();
+  }
+
   // Returns the storage size (i.e., either sizeof(sockaddr_in) or
   // sizeof(sockaddr_in6) depending on the family) of this address.
   size_t size() const

http://git-wip-us.apache.org/repos/asf/mesos/blob/654cabf9/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index b8c2274..2cf3c10 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -24,7 +24,9 @@ public:
   // Available kinds of implementations.
   enum Kind {
     POLL,
-    // TODO(jmlvanre): Add libevent SSL socket.
+#ifdef USE_SSL_SOCKET
+    SSL
+#endif
   };
 
   // Returns an instance of a Socket using the specified kind of
@@ -58,9 +60,11 @@ public:
       return s;
     }
 
+    // Interface functions implemented by this base class.
+    Try<Address> address() const;
+    Try<Address> bind(const Address& address);
+
     // Socket::Impl interface.
-    virtual Try<Address> address() const;
-    virtual Try<Address> bind(const Address& address);
     virtual Try<Nothing> listen(int backlog) = 0;
     virtual Future<Socket> accept() = 0;
     virtual Future<Nothing> connect(const Address& address) = 0;
@@ -93,12 +97,46 @@ public:
     // enabling reuse of a pool of preallocated strings/buffers.
     virtual Future<Nothing> send(const std::string& data);
 
+    virtual void shutdown()
+    {
+      if (::shutdown(s, SHUT_RD) < 0) {
+        PLOG(ERROR) << "Shutdown failed on fd=" << s;
+      }
+    }
+
+    // Construct a new Socket from the given impl. This is a proxy
+    // function, as Impls derived from this won't have access to the
+    // Socket::Socket(...) constructors.
+    //
+    // TODO(jmlvanre): These should be protected; however, gcc
+    // complains when using them from within a lambda of a derived
+    // class.
+    static Socket socket(std::shared_ptr<Impl>&& that)
+    {
+      return Socket(std::move(that));
+    }
+
+    static Socket socket(const std::shared_ptr<Impl>& that)
+    {
+      return Socket(that);
+    }
+
   protected:
     explicit Impl(int _s) : s(_s) { CHECK(s >= 0); }
 
     // Construct a Socket wrapper from this implementation.
     Socket socket() { return Socket(shared_from_this()); }
 
+    // Returns a std::shared_ptr<T> from this implementation.
+    template <typename T>
+    static std::shared_ptr<T> shared(T* t)
+    {
+      std::shared_ptr<T> pointer =
+        std::dynamic_pointer_cast<T>(CHECK_NOTNULL(t)->shared_from_this());
+      CHECK(pointer);
+      return pointer;
+    }
+
     int s;
   };
 
@@ -167,6 +205,11 @@ public:
     return impl->send(data);
   }
 
+  void shutdown()
+  {
+    impl->shutdown();
+  }
+
 private:
   explicit Socket(std::shared_ptr<Impl>&& that) : impl(std::move(that)) {}
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/654cabf9/3rdparty/libprocess/src/libevent.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent.cpp b/3rdparty/libprocess/src/libevent.cpp
index fb03859..2b82930 100644
--- a/3rdparty/libprocess/src/libevent.cpp
+++ b/3rdparty/libprocess/src/libevent.cpp
@@ -1,10 +1,14 @@
+#include <signal.h>
 #include <unistd.h>
 
+#include <mutex>
+
 #include <event2/event.h>
 #include <event2/thread.h>
 
 #include <process/logging.hpp>
 
+#include <stout/os/signals.hpp>
 #include <stout/synchronized.hpp>
 
 #include "event_loop.hpp"
@@ -12,24 +16,100 @@
 
 namespace process {
 
-struct event_base* base = NULL;
+event_base* base = NULL;
+
+
+static std::mutex* functions_mutex = new std::mutex();
+std::queue<lambda::function<void(void)>>* functions =
+  new std::queue<lambda::function<void(void)>>();
+
+
+ThreadLocal<bool>* _in_event_loop_ = new ThreadLocal<bool>();
+
+
+void async_function(int socket, short which, void* arg)
+{
+  event* ev = reinterpret_cast<event*>(arg);
+  event_free(ev);
+
+  std::queue<lambda::function<void(void)>> q;
+
+  synchronized (functions_mutex) {
+    std::swap(q, *functions);
+  }
+
+  while (!q.empty()) {
+    q.front()();
+    q.pop();
+  }
+}
+
+
+void run_in_event_loop(
+    const lambda::function<void(void)>& f,
+    EventLoopLogicFlow event_loop_logic_flow)
+{
+  if (__in_event_loop__ && event_loop_logic_flow == ALLOW_SHORT_CIRCUIT) {
+    f();
+    return;
+  }
+
+  synchronized (functions_mutex) {
+    functions->push(f);
+
+    // Add an event and activate it to interrupt the event loop.
+    // TODO(jmlvanre): after libevent v 2.1 we can use
+    // event_self_cbarg instead of re-assigning the event. For now we
+    // manually re-assign the event to pass in the pointer to the
+    // event itself as the callback argument.
+    event* ev = evtimer_new(base, async_function, NULL);
+
+    // 'event_assign' is only valid on non-pending AND non-active
+    // events. This means we have to assign the callback before
+    // calling 'event_active'.
+    if (evtimer_assign(ev, base, async_function, ev) < 0) {
+      LOG(FATAL) << "Failed to assign callback on event";
+    }
+
+    event_active(ev, EV_TIMEOUT, 0);
+  }
+}
 
 
 void* EventLoop::run(void*)
 {
+  __in_event_loop__ = true;
+
+  // Block SIGPIPE in the event loop because we can not force
+  // underlying implementations such as SSL bufferevents to use
+  // MSG_NOSIGNAL.
+  bool unblock = os::signals::block(SIGPIPE);
+
   do {
     int result = event_base_loop(base, EVLOOP_ONCE);
     if (result < 0) {
       LOG(FATAL) << "Failed to run event loop";
-    } else if (result == 1) {
-      VLOG(1) << "All events handled, continuing event loop";
+    } else if (result > 0) {
+      // All events are handled, continue event loop.
       continue;
-    } else if (event_base_got_break(base)) {
-      break;
-    } else if (event_base_got_exit(base)) {
-      break;
+    } else {
+      CHECK_EQ(0, result);
+      if (event_base_got_break(base)) {
+        break;
+      } else if (event_base_got_exit(base)) {
+        break;
+      }
     }
   } while (true);
+
+  __in_event_loop__ = false;
+
+  if (unblock) {
+    if (!os::signals::unblock(SIGPIPE)) {
+      LOG(FATAL) << "Failure to unblock SIGPIPE";
+    }
+  }
+
   return NULL;
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/654cabf9/3rdparty/libprocess/src/libevent.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent.hpp b/3rdparty/libprocess/src/libevent.hpp
index f6cc721..a20f2c3 100644
--- a/3rdparty/libprocess/src/libevent.hpp
+++ b/3rdparty/libprocess/src/libevent.hpp
@@ -1,10 +1,35 @@
 #ifndef __LIBEVENT_HPP__
 #define __LIBEVENT_HPP__
 
+#include <stout/lambda.hpp>
+#include <stout/thread.hpp>
+
 namespace process {
 
 // Event loop.
-extern struct event_base* base;
+extern event_base* base;
+
+
+// Per thread bool pointer. The extra level of indirection from
+// _in_event_loop_ to __in_event_loop__ is used in order to take
+// advantage of the ThreadLocal operators without needing the extra
+// dereference as well as lazily construct the actual bool.
+extern ThreadLocal<bool>* _in_event_loop_;
+
+
+#define __in_event_loop__ *(*_in_event_loop_ == NULL ?               \
+  *_in_event_loop_ = new bool(false) : *_in_event_loop_)
+
+
+enum EventLoopLogicFlow {
+  ALLOW_SHORT_CIRCUIT,
+  DISALLOW_SHORT_CIRCUIT
+};
+
+
+void run_in_event_loop(
+    const lambda::function<void(void)>& f,
+    EventLoopLogicFlow event_loop_logic_flow = ALLOW_SHORT_CIRCUIT);
 
 } // namespace process {
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/654cabf9/3rdparty/libprocess/src/libevent_ssl_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent_ssl_socket.cpp b/3rdparty/libprocess/src/libevent_ssl_socket.cpp
new file mode 100644
index 0000000..5955796
--- /dev/null
+++ b/3rdparty/libprocess/src/libevent_ssl_socket.cpp
@@ -0,0 +1,926 @@
+#include <event2/buffer.h>
+#include <event2/bufferevent_ssl.h>
+#include <event2/event.h>
+#include <event2/listener.h>
+#include <event2/thread.h>
+#include <event2/util.h>
+
+#include <openssl/ssl.h>
+#include <openssl/err.h>
+
+#include <process/queue.hpp>
+#include <process/socket.hpp>
+
+#include <stout/net.hpp>
+#include <stout/synchronized.hpp>
+
+#include "libevent.hpp"
+#include "libevent_ssl_socket.hpp"
+#include "openssl.hpp"
+
+// Locking:
+//
+// We use the BEV_OPT_THREADSAFE flag when constructing bufferevents
+// so that all **functions that are called from the event loop that
+// take a bufferevent as a parameter will automatically have the
+// lock acquired**.
+//
+// This means that everywhere that the libevent library does not
+// already lock the bev, we need to manually 'synchronize (bev) {'.
+// To further complicate matters, due to a deadlock scenario in
+// libevent-openssl (v 2.0.21) we currently modify bufferevents using
+// continuations in the event loop, but these functions, while run
+// from within the event loop, are not passed the 'bev' as a parameter
+// and thus MUST use 'synchronized (bev)'. See 'Continuation' comment
+// below for more details on why we need to invoke these continuations
+// from within the event loop.
+
+// Continuations via 'run_in_event_loop(...)':
+//
+// There is a deadlock scenario in libevent-openssl (v 2.0.21) when
+// modifying the bufferevent (bev) from another thread (not the event
+// loop). To avoid this we run all bufferevent manipulation logic in
+// continuations that are executed within the event loop.
+
+// Connection Extra FD:
+//
+// In libevent-openssl (v 2.0.21) we've had issues using the
+// 'bufferevent_openssl_socket_new' call with the CONNECTING state and
+// an existing socket. Therefore we allow it to construct its own
+// fd and clean it up along with the Impl object when the bev is
+// freed using the BEV_OPT_CLOSE_ON_FREE option.
+
+// DISALLOW_SHORT_CIRCUIT:
+//
+// We disallow short-circuiting in 'run_in_event_loop' due to a bug in
+// libevent_openssl with deferred callbacks still being called (still
+// in the run queue) even though a bev has been disabled.
+
+using std::queue;
+using std::string;
+
+// Specialization of 'synchronize' to use bufferevent with the
+// 'synchronized' macro.
+static Synchronized<bufferevent> synchronize(bufferevent* bev)
+{
+  return Synchronized<bufferevent>(
+      bev,
+      [](bufferevent* bev) { bufferevent_lock(bev); },
+      [](bufferevent* bev) { bufferevent_unlock(bev); });
+}
+
+namespace process {
+namespace network {
+
+Try<std::shared_ptr<Socket::Impl>> LibeventSSLSocketImpl::create(int s)
+{
+  openssl::initialize();
+  auto socket = std::make_shared<LibeventSSLSocketImpl>(s);
+  // See comment at 'initialize' declaration for why we call this.
+  socket->initialize();
+  return socket;
+}
+
+
+LibeventSSLSocketImpl::~LibeventSSLSocketImpl()
+{
+  // We defer termination and destruction of all event loop specific
+  // calls and structures. This is a safety against the socket being
+  // destroyed before existing event loop calls have completed since
+  // they require valid data structures (the weak pointer).
+
+  // Copy the members that we are interested in. This is necessary
+  // because 'this' points to memory that may be re-allocated and
+  // invalidate any reference to 'this->XXX'. We want to manipulate
+  // or use these data structures within the finalization lambda
+  // below.
+  evconnlistener* _listener = listener;
+  bufferevent* _bev = bev;
+  bool _accepted = accepted;
+  std::weak_ptr<LibeventSSLSocketImpl>* _event_loop_handle = event_loop_handle;
+
+  run_in_event_loop(
+      [_listener, _bev, _accepted, _event_loop_handle]() {
+        // Once this lambda is called, it should not be possible for
+        // more event loop callbacks to be triggered with 'this->bev'.
+        // This is important because we delete event_loop_handle which
+        // is the callback argument for any event loop callbacks.
+        // This lambda is responsible for ensuring 'this->bev' is
+        // disabled, and cleaning up any remaining state associated
+        // with the event loop.
+
+        CHECK(__in_event_loop__);
+
+        if (_listener != NULL) {
+          evconnlistener_free(_listener);
+        }
+
+        if (_bev != NULL) {
+          SSL* ssl = bufferevent_openssl_get_ssl(_bev);
+          // Workaround for SSL shutdown, see http://www.wangafu.net/~nickm/libevent-book/Ref6a_advanced_bufferevents.html // NOLINT
+          SSL_set_shutdown(ssl, SSL_RECEIVED_SHUTDOWN);
+          SSL_shutdown(ssl);
+
+          // NOTE: Removes all future callbacks using 'this->bev'.
+          bufferevent_disable(_bev, EV_READ | EV_WRITE);
+
+          // Since we are using a separate fd for the connecting socket we
+          // end up using BEV_OPT_CLOSE_ON_FREE for the connecting, but
+          // not for the accepting side. Since the BEV_OPT_CLOSE_ON_FREE
+          // also frees the SSL object, we need to manually free it for
+          // the accepting case. See the 'Connection Extra FD' note at top
+          // of file.
+          if (_accepted) {
+            SSL_free(ssl);
+          }
+
+          // For the connecting socket BEV_OPT_CLOSE_ON_FREE will close
+          // the fd. See the 'Connection Extra FD' note at top of file.
+          bufferevent_free(_bev);
+        }
+
+        delete _event_loop_handle;
+      },
+      DISALLOW_SHORT_CIRCUIT);
+}
+
+
+void LibeventSSLSocketImpl::initialize()
+{
+  event_loop_handle = new std::weak_ptr<LibeventSSLSocketImpl>(shared(this));
+}
+
+
+void LibeventSSLSocketImpl::shutdown()
+{
+  // Nothing to do if this socket was never initialized.
+  synchronized (lock) {
+    if (bev == NULL) {
+      // If it was not initialized, then there should also be no
+      // requests.
+      CHECK(connect_request.get() == NULL);
+      CHECK(recv_request.get() == NULL);
+      CHECK(send_request.get() == NULL);
+
+      return;
+    }
+  }
+
+  // Extend the life-time of 'this' through the execution of the
+  // lambda in the event loop. Note: The 'self' needs to be explicitly
+  // captured because we're not using it in the body of the lambda. We
+  // can use a 'shared_ptr' because run_in_event_loop is guaranteed to
+  // execute.
+  auto self = shared(this);
+
+  run_in_event_loop(
+      [self]() {
+        CHECK(__in_event_loop__);
+        CHECK(self);
+
+        CHECK_NOTNULL(self->bev);
+
+        synchronized (self->bev) {
+          Owned<RecvRequest> request;
+
+          // Swap the 'recv_request' under the object lock.
+          synchronized (self->lock) {
+            std::swap(request, self->recv_request);
+          }
+
+          // If there is still a pending receive request then close it.
+          if (request.get() != NULL) {
+            request->promise
+              .set(bufferevent_read(self->bev, request->data, request->size));
+          }
+        }
+      },
+      DISALLOW_SHORT_CIRCUIT);
+}
+
+
+// Only runs in event loop. No locks required. See 'Locking' note at
+// top of file.
+void LibeventSSLSocketImpl::recv_callback(bufferevent* /*bev*/, void* arg)
+{
+  CHECK(__in_event_loop__);
+
+  std::weak_ptr<LibeventSSLSocketImpl>* handle =
+    reinterpret_cast<std::weak_ptr<LibeventSSLSocketImpl>*>(CHECK_NOTNULL(arg));
+
+  std::shared_ptr<LibeventSSLSocketImpl> impl(handle->lock());
+
+  // Don't call the 'recv_callback' unless the socket is still valid.
+  if (impl != NULL) {
+    impl->recv_callback();
+  }
+}
+
+
+// Only runs in event loop. Member function continuation of static
+// 'recv_callback'.
+void LibeventSSLSocketImpl::recv_callback()
+{
+  CHECK(__in_event_loop__);
+
+  Owned<RecvRequest> request;
+
+  synchronized (lock) {
+    std::swap(request, recv_request);
+  }
+
+  if (request.get() != NULL) {
+    // There is an invariant that if we are executing a
+    // 'recv_callback' and we have a request there must be data here
+    // because we should not be getting a spurious receive callback
+    // invocation. Even if we discarded a request, the manual
+    // invocation of 'recv_callback' guarantees that there is a
+    // non-zero amount of data available in the bufferevent.
+    size_t length = bufferevent_read(bev, request->data, request->size);
+    CHECK(length > 0);
+
+    request->promise.set(length);
+  }
+}
+
+
+// Only runs in event loop. No locks required. See 'Locking' note at
+// top of file.
+void LibeventSSLSocketImpl::send_callback(bufferevent* /*bev*/, void* arg)
+{
+  CHECK(__in_event_loop__);
+
+  std::weak_ptr<LibeventSSLSocketImpl>* handle =
+    reinterpret_cast<std::weak_ptr<LibeventSSLSocketImpl>*>(CHECK_NOTNULL(arg));
+
+  std::shared_ptr<LibeventSSLSocketImpl> impl(handle->lock());
+
+  // Don't call the 'send_callback' unless the socket is still valid.
+  if (impl != NULL) {
+    impl->send_callback();
+  }
+}
+
+
+// Only runs in event loop. Member function continuation of static
+// 'send_callback'.
+void LibeventSSLSocketImpl::send_callback()
+{
+  CHECK(__in_event_loop__);
+
+  Owned<SendRequest> request;
+
+  synchronized (lock) {
+    std::swap(request, send_request);
+  }
+
+  if (request.get() != NULL) {
+    request->promise.set(request->size);
+  }
+}
+
+
+// Only runs in event loop. No locks required. See 'Locking' note at
+// top of file.
+void LibeventSSLSocketImpl::event_callback(
+    bufferevent* /*bev*/,
+    short events,
+    void* arg)
+{
+  CHECK(__in_event_loop__);
+
+  std::weak_ptr<LibeventSSLSocketImpl>* handle =
+    reinterpret_cast<std::weak_ptr<LibeventSSLSocketImpl>*>(CHECK_NOTNULL(arg));
+
+  std::shared_ptr<LibeventSSLSocketImpl> impl(handle->lock());
+
+  // Don't call the 'event_callback' unless the socket is still valid.
+  if (impl != NULL) {
+    impl->event_callback(events);
+  }
+}
+
+
+// Only runs in event loop. Member function continuation of static
+// 'event_callback'.
+void LibeventSSLSocketImpl::event_callback(short events)
+{
+  CHECK(__in_event_loop__);
+
+  Owned<RecvRequest> current_recv_request;
+  Owned<SendRequest> current_send_request;
+  Owned<ConnectRequest> current_connect_request;
+
+  // In all of the following conditions, we're interested in swapping
+  // the value of the requests with null (if they are already null,
+  // then there's no harm).
+  if (events & BEV_EVENT_EOF ||
+      events & BEV_EVENT_CONNECTED ||
+      events & BEV_EVENT_ERROR) {
+    synchronized (lock) {
+      std::swap(current_recv_request, recv_request);
+      std::swap(current_send_request, send_request);
+      std::swap(current_connect_request, connect_request);
+    }
+  }
+
+  // If a request below is null, then no such request is in progress,
+  // either because it was never created, it has already been
+  // completed, or it has been discarded.
+
+  if (events & BEV_EVENT_EOF ||
+     (events & BEV_EVENT_ERROR && EVUTIL_SOCKET_ERROR() == 0)) {
+    // At end of file, close the connection.
+    if (current_recv_request.get() != NULL) {
+      current_recv_request->promise.set(0);
+    }
+
+    if (current_send_request.get() != NULL) {
+      current_send_request->promise.set(0);
+    }
+
+    if (current_connect_request.get() != NULL) {
+      bufferevent_free(CHECK_NOTNULL(bev));
+      bev = NULL;
+      current_connect_request->promise.fail(
+          "Failed connect: connection closed");
+    }
+  } else if (events & BEV_EVENT_CONNECTED) {
+    // We should not have receiving or sending request while still
+    // connecting.
+    CHECK(current_recv_request.get() == NULL);
+    CHECK(current_send_request.get() == NULL);
+    CHECK_NOTNULL(current_connect_request.get());
+
+    // If we're connecting, then we've succeeded. Time to do
+    // post-verification.
+    CHECK_NOTNULL(bev);
+
+    // Do post-validation of connection.
+    SSL* ssl = bufferevent_openssl_get_ssl(bev);
+
+    Try<Nothing> verify = openssl::verify(ssl, peer_hostname);
+    if (verify.isError()) {
+      VLOG(1) << "Failed connect, verification error: " << verify.error();
+      bufferevent_free(bev);
+      bev = NULL;
+      current_connect_request->promise.fail(verify.error());
+      return;
+    }
+
+    current_connect_request->promise.set(Nothing());
+  } else if (events & BEV_EVENT_ERROR) {
+    CHECK(EVUTIL_SOCKET_ERROR() != 0);
+    std::ostringstream error_stream;
+    error_stream << evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR());
+
+    // If there is a valid error, fail any requests and log the error.
+    VLOG(1) << "Socket error: " << error_stream.str();
+
+    if (current_recv_request.get() != NULL) {
+      current_recv_request->promise.fail(
+          "Failed recv, connection error: " +
+          error_stream.str());
+    }
+
+    if (current_send_request.get() != NULL) {
+      current_send_request->promise.fail(
+          "Failed send, connection error: " +
+          error_stream.str());
+    }
+
+    if (current_connect_request.get() != NULL) {
+      bufferevent_free(CHECK_NOTNULL(bev));
+      bev = NULL;
+      current_connect_request->promise.fail(
+          "Failed connect, connection error: " +
+          error_stream.str());
+    }
+  }
+}
+
+
+// For the connecting socket we currently don't use the fd associated
+// with 'Socket'. See the 'Connection Extra FD' note at top of file.
+LibeventSSLSocketImpl::LibeventSSLSocketImpl(int _s)
+  : Socket::Impl(_s),
+    bev(NULL),
+    listener(NULL),
+    lock(ATOMIC_FLAG_INIT),
+    recv_request(NULL),
+    send_request(NULL),
+    connect_request(NULL),
+    event_loop_handle(NULL),
+    accepted(false) {}
+
+
+// For the connecting socket we currently don't use the fd associated
+// with 'Socket'. See the 'Connection Extra FD' note at top of file.
+LibeventSSLSocketImpl::LibeventSSLSocketImpl(
+    int _s,
+    bufferevent* _bev,
+    Option<std::string>&& _peer_hostname)
+  : Socket::Impl(_s),
+    bev(_bev),
+    listener(NULL),
+    lock(ATOMIC_FLAG_INIT),
+    recv_request(NULL),
+    send_request(NULL),
+    connect_request(NULL),
+    event_loop_handle(NULL),
+    accepted(true),
+    peer_hostname(std::move(_peer_hostname)) {}
+
+
+Future<Nothing> LibeventSSLSocketImpl::connect(const Address& address)
+{
+  if (bev != NULL) {
+    return Failure("Socket is already connected");
+  }
+
+  if (connect_request.get() != NULL) {
+    return Failure("Socket is already connecting");
+  }
+
+  SSL* ssl = SSL_new(openssl::context());
+  if (ssl == NULL) {
+    return Failure("Failed to connect: SSL_new");
+  }
+
+  // Construct the bufferevent in the connecting state. We don't use
+  // the existing FD due to an issue in libevent-openssl. See the
+  // 'Connection Extra FD' note at top of file.
+  CHECK(bev == NULL);
+  bev = bufferevent_openssl_socket_new(
+      base,
+      -1,
+      ssl,
+      BUFFEREVENT_SSL_CONNECTING,
+      BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE);
+
+  if (bev == NULL) {
+    // We need to free 'ssl' here because the bev won't clean it up
+    // for us.
+    SSL_free(ssl);
+    return Failure("Failed to connect: bufferevent_openssl_socket_new");
+  }
+
+  // From this point on, as long as 'bev' is freed properly, it will
+  // free 'ssl' along with it due to the BEV_OPT_CLOSE_ON_FREE' flag.
+
+  // Assign the callbacks for the bufferevent.
+  bufferevent_setcb(
+      bev,
+      &LibeventSSLSocketImpl::recv_callback,
+      &LibeventSSLSocketImpl::send_callback,
+      &LibeventSSLSocketImpl::event_callback,
+      CHECK_NOTNULL(event_loop_handle));
+
+  // Try and determine the 'peer_hostname' from the address we're
+  // connecting to in order to properly verify the SSL connection later.
+  const Try<string> hostname = address.hostname();
+
+  if (hostname.isError()) {
+    VLOG(2) << "Could not determine hostname of peer: " << hostname.error();
+  } else {
+    VLOG(2) << "Connecting to " << hostname.get();
+    peer_hostname = hostname.get();
+  }
+
+  // Optimistically construct a 'ConnectRequest' and future.
+  Owned<ConnectRequest> request(new ConnectRequest());
+  Future<Nothing> future = request->promise.future();
+
+  // Assign 'connect_request' under lock, fail on error.
+  synchronized (lock) {
+    if (connect_request.get() != NULL) {
+      bufferevent_free(bev);
+      bev = NULL;
+      return Failure("Socket is already connecting");
+    }
+    std::swap(request, connect_request);
+  }
+
+  sockaddr_storage addr = net::createSockaddrStorage(address.ip, address.port);
+
+  if (bufferevent_socket_connect(
+          bev,
+          reinterpret_cast<sockaddr*>(&addr),
+          sizeof(addr)) < 0) {
+    bufferevent_free(bev);
+    bev = NULL;
+    return Failure("Failed to connect: bufferevent_socket_connect");
+  }
+
+  return future;
+}
+
+
+Future<size_t> LibeventSSLSocketImpl::recv(char* data, size_t size)
+{
+  // Optimistically construct a 'RecvRequest' and future.
+  Owned<RecvRequest> request(new RecvRequest(data, size));
+  std::weak_ptr<LibeventSSLSocketImpl> weak_self(shared(this));
+
+  // If the user of the future decides to 'discard', then we want to
+  // test whether the request was already satisfied.
+  // We capture a 'weak_ptr' to 'this' (as opposed to a 'shared_ptr')
+  // because the socket could be destroyed before this lambda is
+  // executed. If we used a 'shared_ptr' then this lambda could extend
+  // the life-time of 'this' unnecessarily.
+  Future<size_t> future = request->promise.future()
+    .onDiscard([weak_self]() {
+      // Extend the life-time of 'this' through the execution of the
+      // lambda in the event loop. Note: The 'self' needs to be
+      // explicitly captured because we're not using it in the body of
+      // the lambda. We can use a 'shared_ptr' because
+      // run_in_event_loop is guaranteed to execute.
+      std::shared_ptr<LibeventSSLSocketImpl> self(weak_self.lock());
+
+      if (self != NULL) {
+        run_in_event_loop(
+            [self]() {
+              CHECK(__in_event_loop__);
+              CHECK(self);
+
+              Owned<RecvRequest> request;
+
+              synchronized (self->lock) {
+                std::swap(request, self->recv_request);
+              }
+
+              // Only discard if the request hasn't already been
+              // satisfied.
+              if (request.get() != NULL) {
+                // Discard the promise outside of the object lock as
+                // the callbacks can be expensive.
+                request->promise.discard();
+              }
+            },
+            DISALLOW_SHORT_CIRCUIT);
+      }
+    });
+
+  // Assign 'recv_request' under lock, fail on error.
+  synchronized (lock) {
+    if (recv_request.get() != NULL) {
+      return Failure("Socket is already receiving");
+    }
+    std::swap(request, recv_request);
+  }
+
+  // Extend the life-time of 'this' through the execution of the
+  // lambda in the event loop. Note: The 'self' needs to be explicitly
+  // captured because we're not using it in the body of the lambda. We
+  // can use a 'shared_ptr' because run_in_event_loop is guaranteed to
+  // execute.
+  auto self = shared(this);
+
+  run_in_event_loop(
+      [self]() {
+        CHECK(__in_event_loop__);
+        CHECK(self);
+
+        bool recv = false;
+
+        // We check to see if 'recv_request' is null. It would be null
+        // if a 'discard' happened before this lambda was executed.
+        synchronized (self->lock) {
+          recv = self->recv_request.get() != NULL;
+        }
+
+        // Only try to read existing data from the bufferevent if the
+        // request has not already been discarded.
+        if (recv) {
+          synchronized (self->bev) {
+            evbuffer* input = bufferevent_get_input(self->bev);
+            size_t length = evbuffer_get_length(input);
+
+            // If there is already data in the buffer, fulfill the
+            // 'recv_request' by calling 'recv_callback()'. Otherwise
+            // do nothing and wait for the 'recv_callback' to run when
+            // we receive data over the network.
+            if (length > 0) {
+              self->recv_callback();
+            }
+          }
+        }
+      },
+      DISALLOW_SHORT_CIRCUIT);
+
+  return future;
+}
+
+
+// Queues 'size' bytes starting at 'data' to be written to the peer.
+//
+// Only one send ('send' or 'sendfile') may be outstanding at a time:
+// if 'send_request' is already set, a failed future ("Socket is
+// already sending") is returned. Otherwise the returned future
+// belongs to the 'SendRequest' stored in 'send_request'; apart from
+// the write-error path below it is completed outside this function.
+//
+// The lambda captures the raw 'data' pointer, so the buffer must stay
+// valid at least until the lambda has run in the event loop.
+Future<size_t> LibeventSSLSocketImpl::send(const char* data, size_t size)
+{
+  // Optimistically construct a 'SendRequest' and future.
+  Owned<SendRequest> request(new SendRequest(size));
+  Future<size_t> future = request->promise.future();
+
+  // We don't add an 'onDiscard' continuation to send because we can
+  // not accurately detect how many bytes have been sent. Once we pass
+  // the data to the bufferevent, there is the possibility that parts
+  // of it have been sent. Another reason is that if we send partial
+  // messages (discard only a part of the data), then it is likely
+  // that the receiving end will fail parsing the message.
+
+  // Assign 'send_request' under lock, fail on error.
+  synchronized (lock) {
+    if (send_request.get() != NULL) {
+      return Failure("Socket is already sending");
+    }
+    std::swap(request, send_request);
+  }
+
+  // Extend the life-time of 'this' through the execution of the
+  // lambda in the event loop. Note: The 'self' needs to be explicitly
+  // captured because we're not using it in the body of the lambda. We
+  // can use a 'shared_ptr' because run_in_event_loop is guaranteed to
+  // execute.
+  auto self = shared(this);
+
+  run_in_event_loop(
+      [self, data, size]() {
+        CHECK(__in_event_loop__);
+        CHECK(self);
+
+        // We check that send_request is valid, because we do not
+        // allow discards. This means there is no race between the
+        // entry of 'send' and the execution of this lambda.
+        synchronized (self->lock) {
+          CHECK_NOTNULL(self->send_request.get());
+        }
+
+        if (bufferevent_write(self->bev, data, size) != 0) {
+          // Nothing was queued, so we do not expect a write callback
+          // to complete this request. Take it out and fail it;
+          // otherwise the future would stay pending and every later
+          // send would be rejected with "Socket is already sending".
+          Owned<SendRequest> failed;
+
+          synchronized (self->lock) {
+            std::swap(failed, self->send_request);
+          }
+
+          // Fail the promise outside of the object lock as the
+          // callbacks can be expensive.
+          if (failed.get() != NULL) {
+            failed->promise.fail("Failed send: bufferevent_write");
+          }
+        }
+      },
+      DISALLOW_SHORT_CIRCUIT);
+
+  return future;
+}
+
+
+// Queues 'size' bytes of the file 'fd', starting at 'offset', to be
+// written to the peer.
+//
+// Shares 'send_request' with 'send', so only one of the two may be
+// outstanding at a time; otherwise a failed future ("Socket is
+// already sending") is returned. Apart from the error path below the
+// returned future is completed outside this function.
+Future<size_t> LibeventSSLSocketImpl::sendfile(
+    int fd,
+    off_t offset,
+    size_t size)
+{
+  // Optimistically construct a 'SendRequest' and future.
+  Owned<SendRequest> request(new SendRequest(size));
+  Future<size_t> future = request->promise.future();
+
+  // Assign 'send_request' under lock, fail on error.
+  synchronized (lock) {
+    if (send_request.get() != NULL) {
+      return Failure("Socket is already sending");
+    }
+    std::swap(request, send_request);
+  }
+
+  // Extend the life-time of 'this' through the execution of the
+  // lambda in the event loop. Note: The 'self' needs to be explicitly
+  // captured because we're not using it in the body of the lambda. We
+  // can use a 'shared_ptr' because run_in_event_loop is guaranteed to
+  // execute.
+  auto self = shared(this);
+
+  run_in_event_loop(
+      [self, fd, offset, size]() {
+        CHECK(__in_event_loop__);
+        CHECK(self);
+
+        // We check that send_request is valid, because we do not
+        // allow discards. This means there is no race between the
+        // entry of 'sendfile' and the execution of this lambda.
+        synchronized (self->lock) {
+          CHECK_NOTNULL(self->send_request.get());
+        }
+
+        const int result = evbuffer_add_file(
+            bufferevent_get_output(self->bev),
+            fd,
+            offset,
+            size);
+
+        if (result != 0) {
+          // The file was not added to the output buffer, so we do not
+          // expect a write callback to complete this request. Take it
+          // out and fail it instead of leaving the future pending and
+          // the socket stuck in the "already sending" state.
+          Owned<SendRequest> failed;
+
+          synchronized (self->lock) {
+            std::swap(failed, self->send_request);
+          }
+
+          // Fail the promise outside of the object lock as the
+          // callbacks can be expensive.
+          if (failed.get() != NULL) {
+            failed->promise.fail("Failed sendfile: evbuffer_add_file");
+          }
+        }
+      },
+      DISALLOW_SHORT_CIRCUIT);
+
+  return future;
+}
+
+
+// Starts listening on socket 's' with the given 'backlog' by creating
+// an 'evconnlistener' on 'base'. Every connection the listener
+// accepts is wrapped in a heap-allocated 'AcceptRequest' and handed
+// to 'accept_callback', which takes ownership of it.
+//
+// Returns an Error if the socket is already listening or if the
+// listener could not be created.
+Try<Nothing> LibeventSSLSocketImpl::listen(int backlog)
+{
+  if (listener != NULL) {
+    return Error("Socket is already listening");
+  }
+
+  CHECK(bev == NULL);
+
+  listener = evconnlistener_new(
+      base,
+      [](evconnlistener* listener,
+         int socket,
+         sockaddr* addr,
+         int addr_length,
+         void* arg) {
+        CHECK(__in_event_loop__);
+
+        std::weak_ptr<LibeventSSLSocketImpl>* handle =
+          reinterpret_cast<std::weak_ptr<LibeventSSLSocketImpl>*>(
+              CHECK_NOTNULL(arg));
+
+        std::shared_ptr<LibeventSSLSocketImpl> impl(handle->lock());
+
+        if (impl != NULL) {
+          Try<net::IP> ip = net::IP::create(*addr);
+          if (ip.isError()) {
+            VLOG(2) << "Could not convert sockaddr to net::IP: " << ip.error();
+          }
+
+          // We pass the 'listener' into the 'AcceptRequest' because
+          // this function could be executed before 'this->listener'
+          // is set.
+          AcceptRequest* request =
+            new AcceptRequest(
+                  socket,
+                  listener,
+                  ip.isSome() ? Option<net::IP>(ip.get()) : None());
+
+          impl->accept_callback(request);
+        } else {
+          // The listening socket is gone, so nobody will ever take
+          // this connection. The listener handed the descriptor to
+          // this callback, so close it here instead of leaking it.
+          Try<Nothing> close = os::close(socket);
+          if (close.isError()) {
+            LOG(WARNING)
+              << "Failed to close socket " << stringify(socket)
+              << ": " << close.error();
+          }
+        }
+      },
+      event_loop_handle,
+      LEV_OPT_REUSEABLE,
+      backlog,
+      s);
+
+  if (listener == NULL) {
+    return Error("Failed to listen on socket");
+  }
+
+  // TODO(jmlvanre): attach an error callback.
+
+  return Nothing();
+}
+
+
+// Returns the next connection from 'accept_queue'. The queue holds
+// 'Future<Socket>' entries (see 'accept_callback', which enqueues the
+// promise's future before the SSL handshake has finished), so the
+// continuation just returns that inner future to chain onto it.
+Future<Socket> LibeventSSLSocketImpl::accept()
+{
+  return accept_queue.get()
+    .then([](const Future<Socket>& future) { return future; });
+}
+
+
+// Releases everything owned by an accepted connection that will not
+// be turned into a 'Socket': the SSL object and the bufferevent (when
+// 'bev' is not NULL) and the file descriptor 'socket'. The
+// bufferevent is created without 'BEV_OPT_CLOSE_ON_FREE' below, which
+// is why the SSL object and descriptor are released explicitly.
+static void discard_accepted_connection(bufferevent* bev, int socket)
+{
+  if (bev != NULL) {
+    SSL* ssl = bufferevent_openssl_get_ssl(bev);
+    SSL_free(ssl);
+    bufferevent_free(bev);
+  }
+
+  // TODO(jmlvanre): Clean up for readability. Consider RAII
+  // or constructing the impl earlier.
+  CHECK(socket >= 0);
+  Try<Nothing> close = os::close(socket);
+  if (close.isError()) {
+    LOG(FATAL)
+      << "Failed to close socket " << stringify(socket)
+      << ": " << close.error();
+  }
+}
+
+
+// Only runs in event loop.
+//
+// Takes ownership of 'request' (allocated by the listener callback in
+// 'listen'), enqueues its future on 'accept_queue' and starts the SSL
+// handshake for the accepted descriptor. The future is satisfied with
+// a new 'Socket' once the handshake and peer verification succeed,
+// and failed otherwise. On every failure path the SSL object, the
+// bufferevent and the descriptor are released.
+void LibeventSSLSocketImpl::accept_callback(AcceptRequest* request)
+{
+  CHECK(__in_event_loop__);
+
+  // Enqueue a potential socket that we will set up SSL state for and
+  // verify.
+  accept_queue.put(request->promise.future());
+
+  // Set up SSL object.
+  SSL* ssl = SSL_new(openssl::context());
+  if (ssl == NULL) {
+    request->promise.fail("Accept failed, SSL_new");
+    discard_accepted_connection(NULL, request->socket);
+    delete request;
+    return;
+  }
+
+  // We use 'request->listener' because 'this->listener' may not have
+  // been set by the time this function is executed. See comment in
+  // the lambda for evconnlistener_new in
+  // 'LibeventSSLSocketImpl::listen'.
+  event_base* ev_base = evconnlistener_get_base(request->listener);
+
+  // Construct the bufferevent in the accepting state.
+  bufferevent* bev = bufferevent_openssl_socket_new(
+      ev_base,
+      request->socket,
+      ssl,
+      BUFFEREVENT_SSL_ACCEPTING,
+      BEV_OPT_THREADSAFE);
+
+  if (bev == NULL) {
+    request->promise.fail("Accept failed: bufferevent_openssl_socket_new");
+    SSL_free(ssl);
+    discard_accepted_connection(NULL, request->socket);
+    delete request;
+    return;
+  }
+
+  bufferevent_setcb(
+      bev,
+      NULL,
+      NULL,
+      [](bufferevent* bev, short events, void* arg) {
+        // This handles error states or 'BEV_EVENT_CONNECTED' events
+        // and satisfies the promise by constructing a new socket if
+        // the connection was successfully established.
+        CHECK(__in_event_loop__);
+
+        AcceptRequest* request =
+          reinterpret_cast<AcceptRequest*>(CHECK_NOTNULL(arg));
+
+        if (events & BEV_EVENT_EOF) {
+          // The peer went away during the handshake. Nothing else
+          // references the bufferevent, so release it here; this also
+          // ensures it can not fire again with the 'request' that is
+          // deleted below.
+          discard_accepted_connection(CHECK_NOTNULL(bev), request->socket);
+          request->promise.fail("Failed accept: connection closed");
+        } else if (events & BEV_EVENT_CONNECTED) {
+          // We will receive a 'CONNECTED' state on an accepting socket
+          // once the connection is established. Time to do
+          // post-verification. First, we need to determine the peer
+          // hostname.
+          Option<string> peer_hostname = None();
+          if (request->ip.isSome()) {
+            Try<string> hostname = net::getHostname(request->ip.get());
+            if (hostname.isError()) {
+              VLOG(2) << "Could not determine hostname of peer: "
+                      << hostname.error();
+            } else {
+              VLOG(2) << "Accepting from " << hostname.get();
+              peer_hostname = hostname.get();
+            }
+          }
+
+          SSL* ssl = bufferevent_openssl_get_ssl(bev);
+          CHECK_NOTNULL(ssl);
+
+          Try<Nothing> verify = openssl::verify(ssl, peer_hostname);
+          if (verify.isError()) {
+            VLOG(1) << "Failed accept, verification error: " << verify.error();
+            request->promise.fail(verify.error());
+            discard_accepted_connection(bev, request->socket);
+            delete request;
+            return;
+          }
+
+          auto impl = std::shared_ptr<LibeventSSLSocketImpl>(
+              new LibeventSSLSocketImpl(
+                  request->socket,
+                  bev,
+                  std::move(peer_hostname)));
+
+          // See comment at 'initialize' declaration for why we call
+          // this.
+          impl->initialize();
+
+          // We have to wait till after 'initialize()' is invoked for
+          // event_loop_handle to be valid as a callback argument for
+          // the callbacks.
+          bufferevent_setcb(
+              CHECK_NOTNULL(impl->bev),
+              &LibeventSSLSocketImpl::recv_callback,
+              &LibeventSSLSocketImpl::send_callback,
+              &LibeventSSLSocketImpl::event_callback,
+              CHECK_NOTNULL(impl->event_loop_handle));
+
+          Socket socket = Socket::Impl::socket(std::move(impl));
+
+          request->promise.set(socket);
+        } else if (events & BEV_EVENT_ERROR) {
+          // Build the message before any cleanup so that closing the
+          // descriptor can not disturb the reported socket error.
+          std::ostringstream stream;
+          if (EVUTIL_SOCKET_ERROR() != 0) {
+            stream << evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR());
+          } else {
+            char buffer[1024] = {};
+            unsigned long error = bufferevent_get_openssl_error(bev);
+            ERR_error_string_n(error, buffer, sizeof(buffer));
+            stream << buffer;
+          }
+
+          // Fail the accept request and log the error.
+          VLOG(1) << "Socket error: " << stream.str();
+
+          discard_accepted_connection(CHECK_NOTNULL(bev), request->socket);
+
+          request->promise.fail(
+              "Failed accept: connection error: " + stream.str());
+        }
+
+        delete request;
+      },
+      request);
+}
+
+} // namespace network {
+} // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/654cabf9/3rdparty/libprocess/src/libevent_ssl_socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent_ssl_socket.hpp b/3rdparty/libprocess/src/libevent_ssl_socket.hpp
new file mode 100644
index 0000000..d65638b
--- /dev/null
+++ b/3rdparty/libprocess/src/libevent_ssl_socket.hpp
@@ -0,0 +1,165 @@
+#ifndef __LIBEVENT_SSL_SOCKET_HPP__
+#define __LIBEVENT_SSL_SOCKET_HPP__
+
+#include <event2/buffer.h>
+#include <event2/bufferevent_ssl.h>
+#include <event2/event.h>
+#include <event2/listener.h>
+#include <event2/util.h>
+
+#include <atomic>
+#include <memory>
+
+#include <process/queue.hpp>
+#include <process/socket.hpp>
+
+namespace process {
+namespace network {
+
+// 'Socket::Impl' whose 'kind()' is 'Socket::SSL'. I/O goes through a
+// libevent 'bufferevent' ('bev'); listening goes through an
+// 'evconnlistener' ('listener'). At most one receive, one send and
+// one connect request are tracked at a time (see 'recv_request',
+// 'send_request' and 'connect_request').
+class LibeventSSLSocketImpl : public Socket::Impl
+{
+public:
+  // See 'Socket::create()'.
+  static Try<std::shared_ptr<Socket::Impl>> create(int s);
+
+  LibeventSSLSocketImpl(int _s);
+
+  virtual ~LibeventSSLSocketImpl();
+
+  // Implement 'Socket::Impl' interface.
+  virtual Future<Nothing> connect(const Address& address);
+  virtual Future<size_t> recv(char* data, size_t size);
+  // Send does not currently support discard. See implementation.
+  virtual Future<size_t> send(const char* data, size_t size);
+  virtual Future<size_t> sendfile(int fd, off_t offset, size_t size);
+  virtual Try<Nothing> listen(int backlog);
+  virtual Future<Socket> accept();
+  virtual Socket::Kind kind() const { return Socket::SSL; }
+
+  // This call is used to do the equivalent of shutting down the read
+  // end. This means finishing the future of any outstanding read
+  // request.
+  virtual void shutdown();
+
+  // We need a post-initializer because 'shared_from_this()' is not
+  // valid until the constructor has finished.
+  void initialize();
+
+private:
+  // A set of helper functions that transitions an accepted socket to
+  // an SSL connected socket. With the libevent-openssl library, once
+  // we return from the 'accept_callback()' which is scheduled by
+  // 'listen' then we still need to wait for the 'BEV_EVENT_CONNECTED'
+  // state before we know the SSL connection has been established.
+  struct AcceptRequest
+  {
+    AcceptRequest(
+        int _socket,
+        evconnlistener* _listener,
+        const Option<net::IP>& _ip)
+      : listener(_listener),
+        socket(_socket),
+        ip(_ip) {}
+    // Satisfied with the new 'Socket', or failed, by the accept flow.
+    Promise<Socket> promise;
+    // The listener that accepted 'socket'.
+    evconnlistener* listener;
+    // The accepted file descriptor.
+    int socket;
+    // The peer's IP, if it could be derived from the peer address.
+    Option<net::IP> ip;
+  };
+
+  // An outstanding 'recv': the caller's buffer and its capacity.
+  struct RecvRequest
+  {
+    RecvRequest(char* _data, size_t _size)
+      : data(_data), size(_size) {}
+    Promise<size_t> promise;
+    char* data;
+    size_t size;
+  };
+
+  // An outstanding 'send' or 'sendfile' of 'size' bytes.
+  struct SendRequest
+  {
+    SendRequest(size_t _size)
+      : size(_size) {}
+    Promise<size_t> promise;
+    size_t size;
+  };
+
+  // An outstanding 'connect'.
+  struct ConnectRequest
+  {
+    Promise<Nothing> promise;
+  };
+
+  // This is a private constructor used by the accept helper
+  // functions.
+  LibeventSSLSocketImpl(
+      int _s,
+      bufferevent* bev,
+      Option<std::string>&& peer_hostname);
+
+  // This is called when the equivalent of 'accept' returns. The role
+  // of this function is to set up the SSL object and bev.
+  void accept_callback(AcceptRequest* request);
+
+  // The following are function pairs of static functions to member
+  // functions. The static functions test and hold the weak pointer to
+  // the socket before calling the member functions. This protects
+  // against the socket being destroyed before the event loop calls
+  // the callbacks.
+  static void recv_callback(bufferevent* bev, void* arg);
+  void recv_callback();
+
+  static void send_callback(bufferevent* bev, void* arg);
+  void send_callback();
+
+  static void event_callback(bufferevent* bev, short events, void* arg);
+  void event_callback(short events);
+
+  // The bufferevent that 'recv', 'send' and 'sendfile' operate on.
+  // 'listen' requires it to be NULL.
+  bufferevent* bev;
+
+  // Set by 'listen'; NULL while the socket is not listening.
+  evconnlistener* listener;
+
+  // Protects the following instance variables.
+  std::atomic_flag lock;
+  Owned<RecvRequest> recv_request;
+  Owned<SendRequest> send_request;
+  Owned<ConnectRequest> connect_request;
+
+  // This is a weak pointer to 'this', i.e., ourselves, this class
+  // instance. We need this for our event loop callbacks because it's
+  // possible that we'll actually want to cleanup this socket impl
+  // before the event loop callback gets executed ... and we'll check
+  // in each event loop callback whether or not this weak_ptr is valid
+  // by attempting to upgrade it to shared_ptr. It is the
+  // responsibility of the event loop through the deferred lambda in
+  // the destructor to clean up this pointer.
+  // 1) It is a 'weak_ptr' as opposed to a 'shared_ptr' because we
+  // want to test whether the object is still around from within the
+  // event loop. If it was a 'shared_ptr' then we would be
+  // contributing to the lifetime of the object and would no longer be
+  // able to test the lifetime correctly.
+  // 2) This is a pointer to a 'weak_ptr' so that we can pass this
+  // through to the event loop through the C-interface. We need access
+  // to the 'weak_ptr' from outside the object (in the event loop) to
+  // test if the object is still alive. By maintaining this 'weak_ptr'
+  // on the heap we can be sure it is safe to access from the
+  // event loop until it is destroyed.
+  std::weak_ptr<LibeventSSLSocketImpl>* event_loop_handle;
+
+  // This queue stores buffered accepted sockets. 'Queue' is a thread
+  // safe queue implementation, and the event loop pushes connected
+  // sockets onto it, the 'accept()' call pops them off. We wrap these
+  // sockets with futures so that we can pass errors through and chain
+  // futures as well.
+  Queue<Future<Socket>> accept_queue;
+
+  // This bool represents whether this socket was created through an
+  // 'accept' flow. We use this in the destructor to change
+  // the clean-up behavior for the SSL context object.
+  bool accepted;
+
+  // Hostname of the peer, if known. For accepted sockets it is
+  // passed in through the private constructor.
+  Option<std::string> peer_hostname;
+};
+
+} // namespace network {
+} // namespace process {
+
+#endif // __LIBEVENT_SSL_SOCKET_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/654cabf9/3rdparty/libprocess/src/openssl.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/openssl.cpp b/3rdparty/libprocess/src/openssl.cpp
new file mode 100644
index 0000000..090e985
--- /dev/null
+++ b/3rdparty/libprocess/src/openssl.cpp
@@ -0,0 +1,563 @@
+#include "openssl.hpp"
+
+#include <sys/param.h>
+
+#include <openssl/err.h>
+#include <openssl/rand.h>
+#include <openssl/ssl.h>
+#include <openssl/x509v3.h>
+
+#include <mutex>
+#include <new>
+#include <string>
+
+#include <process/once.hpp>
+
+#include <stout/flags.hpp>
+
+using std::ostringstream;
+using std::string;
+
+// Must be defined by us for OpenSSL in order to capture the necessary
+// data for doing locking. Note, this needs to be defined in the
+// global namespace as well.
+struct CRYPTO_dynlock_value
+{
+  // Locked and unlocked by 'dyn_lock_function'; the enclosing value
+  // is created by 'dyn_create_function' and deleted by
+  // 'dyn_destroy_function'.
+  std::mutex mutex;
+};
+
+
+namespace process {
+namespace network {
+namespace openssl {
+
+// _Global_ OpenSSL context, initialized via 'initialize'.
+static SSL_CTX* ctx = NULL;
+
+
+// Registers every SSL related flag together with its help text and,
+// where one exists, its default value. The values themselves are
+// loaded later by 'initialize' (from environment variables prefixed
+// with "SSL_").
+Flags::Flags()
+{
+  add(&Flags::enabled,
+      "enabled",
+      "Whether SSL is enabled.",
+      false);
+
+  add(&Flags::cert_file,
+      "cert_file",
+      "Path to certificate.");
+
+  add(&Flags::key_file,
+      "key_file",
+      "Path to key.");
+
+  add(&Flags::verify_cert,
+      "verify_cert",
+      "Whether or not to verify peer certificates.",
+      false);
+
+  add(&Flags::require_cert,
+      "require_cert",
+      "Whether or not to require peer certificates. Requiring a peer "
+      "certificate implies verifying it.",
+      false);
+
+  add(&Flags::verification_depth,
+      "verification_depth",
+      "Maximum depth for the certificate chain verification that shall be "
+      "allowed.",
+      4);
+
+  add(&Flags::ca_dir,
+      "ca_dir",
+      "Path to certificate authority (CA) directory.");
+
+  add(&Flags::ca_file,
+      "ca_file",
+      "Path to certificate authority (CA) file.");
+
+  add(&Flags::ciphers,
+      "ciphers",
+      "Cryptographic ciphers to use.",
+      // Default TLSv1 ciphers chosen based on Amazon's security
+      // policy, see:
+      // http://docs.aws.amazon.com/ElasticLoadBalancing/latest/
+      // DeveloperGuide/elb-security-policy-table.html
+      "AES128-SHA:AES256-SHA:RC4-SHA:DHE-RSA-AES128-SHA:DHE-DSS-AES128-SHA:"
+      "DHE-RSA-AES256-SHA:DHE-DSS-AES256-SHA");
+
+  add(&Flags::enable_ssl_v2,
+      "enable_ssl_v2",
+      "Enable SSLV2.",
+      false);
+
+  add(&Flags::enable_ssl_v3,
+      "enable_ssl_v3",
+      "Enable SSLV3.",
+      false);
+
+  // The help texts below previously read "SSLV1.x" although these
+  // flags control the TLS protocol versions.
+  add(&Flags::enable_tls_v1_0,
+      "enable_tls_v1_0",
+      "Enable TLSV1.0.",
+      false);
+
+  add(&Flags::enable_tls_v1_1,
+      "enable_tls_v1_1",
+      "Enable TLSV1.1.",
+      false);
+
+  // TLSv1.2 is the only protocol enabled by default.
+  add(&Flags::enable_tls_v1_2,
+      "enable_tls_v1_2",
+      "Enable TLSV1.2.",
+      true);
+}
+
+
+static Flags* ssl_flags = new Flags();
+
+
+// Returns the process-wide SSL flags. 'initialize' is invoked first
+// so that the flags have been loaded before they are handed out.
+const Flags& flags()
+{
+  openssl::initialize();
+  return *ssl_flags;
+}
+
+
+// Mutexes necessary to support OpenSSL locking on shared data
+// structures. See 'locking_function' for more information.
+static std::mutex* mutexes = NULL;
+
+
+// Callback needed to perform locking on shared data structures. From
+// the OpenSSL documentation:
+//
+// OpenSSL uses a number of global data structures that will be
+// implicitly shared whenever multiple threads use OpenSSL.
+// Multi-threaded applications will crash at random if [the locking
+// function] is not set.
+void locking_function(int mode, int n, const char* /*file*/, int /*line*/)
+{
+  // 'n' selects one of the mutexes allocated in 'initialize'.
+  std::mutex& mutex = mutexes[n];
+
+  // Anything without the CRYPTO_LOCK bit set is an unlock request.
+  if ((mode & CRYPTO_LOCK) == 0) {
+    mutex.unlock();
+    return;
+  }
+
+  mutex.lock();
+}
+
+
+// OpenSSL callback that returns the current thread ID, necessary for
+// OpenSSL threading.
+unsigned long id_function()
+{
+  // On OS X the mach thread port of the calling thread is used as
+  // its id, everywhere else the 'pthread_t' itself.
+#ifdef __APPLE__
+  return static_cast<unsigned long>(pthread_mach_thread_np(pthread_self()));
+#else
+  return static_cast<unsigned long>(pthread_self());
+#endif // __APPLE__
+}
+
+
+// OpenSSL callback for creating new dynamic "locks", abstracted by
+// the CRYPTO_dynlock_value structure.
+CRYPTO_dynlock_value* dyn_create_function(const char* /*file*/, int /*line*/)
+{
+  // This is called back from OpenSSL (C code), so report an
+  // allocation failure through a NULL return value instead of an
+  // exception. With a plain 'new' a NULL check could never trigger.
+  return new (std::nothrow) CRYPTO_dynlock_value();
+}
+
+
+// OpenSSL callback for locking and unlocking dynamic "locks",
+// abstracted by the CRYPTO_dynlock_value structure.
+void dyn_lock_function(
+    int mode,
+    CRYPTO_dynlock_value* value,
+    const char* /*file*/,
+    int /*line*/)
+{
+  std::mutex& mutex = value->mutex;
+
+  // Anything without the CRYPTO_LOCK bit set is an unlock request.
+  if ((mode & CRYPTO_LOCK) == 0) {
+    mutex.unlock();
+    return;
+  }
+
+  mutex.lock();
+}
+
+
+// OpenSSL callback for destroying dynamic "locks", abstracted by the
+// CRYPTO_dynlock_value structure.
+void dyn_destroy_function(
+    CRYPTO_dynlock_value* value,
+    const char* /*file*/,
+    int /*line*/)
+{
+  // Counterpart of the allocation done in 'dyn_create_function'.
+  delete value;
+}
+
+
+// Callback for OpenSSL peer certificate verification.
+int verify_callback(int ok, X509_STORE_CTX* store)
+{
+  // Nothing to report unless verification of this certificate failed.
+  // The verdict itself is always passed through unchanged.
+  if (ok == 1) {
+    return ok;
+  }
+
+  X509* cert = X509_STORE_CTX_get_current_cert(store);
+  const int error = X509_STORE_CTX_get_error(store);
+  const int depth = X509_STORE_CTX_get_error_depth(store);
+
+  // Zero-initialized, and one byte is held back from OpenSSL, so both
+  // names are always NUL terminated.
+  // TODO(jmlvanre): use X509_NAME_print_ex instead.
+  char issuer[256] {};
+  X509_NAME_oneline(X509_get_issuer_name(cert), issuer, sizeof(issuer) - 1);
+
+  // TODO(jmlvanre): use X509_NAME_print_ex instead.
+  char subject[256] {};
+  X509_NAME_oneline(X509_get_subject_name(cert), subject, sizeof(subject) - 1);
+
+  // Construct and log a warning message.
+  ostringstream message;
+  message
+    << "Error with certificate at depth: " << stringify(depth) << "\n"
+    << "Issuer: " << stringify(issuer) << "\n"
+    << "Subject: " << stringify(subject) << "\n"
+    << "Error (" << stringify(error) << "): "
+    << stringify(X509_verify_cert_error_string(error));
+
+  LOG(WARNING) << message.str();
+
+  return ok;
+}
+
+
+string error_string(unsigned long code)
+{
+  // SSL library guarantees to stay within 120 bytes.
+  char buffer[128];
+
+  ERR_error_string_n(code, buffer, sizeof(buffer));
+  string s(buffer);
+
+  if (code == SSL_ERROR_SYSCALL) {
+    s += error_string(ERR_get_error());
+  }
+
+  return s;
+}
+
+
+// Loads the SSL flags from the environment and, if SSL is enabled,
+// sets up the OpenSSL library, its threading callbacks and the global
+// context 'ctx' (verification mode, CA locations, certificate, key,
+// ciphers and protocol versions). Guarded by a 'Once', so the body is
+// meant to run a single time. Invalid configuration terminates the
+// process via EXIT.
+void initialize()
+{
+  static Once* initialized = new Once();
+
+  if (initialized->once()) {
+    return;
+  }
+
+  // Load all the flags prefixed by SSL_ from the environment. See
+  // comment at top of openssl.hpp for a full list.
+  Try<Nothing> load = ssl_flags->load("SSL_");
+  if (load.isError()) {
+    EXIT(EXIT_FAILURE)
+      << "Failed to load flags from environment variables (prefixed by SSL_):"
+      << load.error();
+  }
+
+  // Exit early if SSL is not enabled.
+  if (!ssl_flags->enabled) {
+    initialized->done();
+    return;
+  }
+
+  // We MUST have entropy, or else there's no point to crypto.
+  if (!RAND_poll()) {
+    EXIT(EXIT_FAILURE) << "SSL socket requires entropy";
+  }
+
+  // Initialize the OpenSSL library.
+  SSL_library_init();
+  SSL_load_error_strings();
+
+  // Prepare mutexes for threading callbacks.
+  mutexes = new std::mutex[CRYPTO_num_locks()];
+
+  // Install SSL threading callbacks.
+  // TODO(jmlvanre): the id mechanism is deprecated in OpenSSL.
+  CRYPTO_set_id_callback(&id_function);
+  CRYPTO_set_locking_callback(&locking_function);
+  CRYPTO_set_dynlock_create_callback(&dyn_create_function);
+  CRYPTO_set_dynlock_lock_callback(&dyn_lock_function);
+  CRYPTO_set_dynlock_destroy_callback(&dyn_destroy_function);
+
+  // The protocol versions actually offered are restricted further
+  // down through 'SSL_CTX_set_options'.
+  ctx = SSL_CTX_new(SSLv23_method());
+  CHECK(ctx) << "Failed to create SSL context: "
+             << ERR_error_string(ERR_get_error(), NULL);
+
+  // Disable SSL session caching.
+  SSL_CTX_set_session_cache_mode(ctx, SSL_SESS_CACHE_OFF);
+
+  // Set a session id to avoid connection termination upon
+  // re-connect. We can use something more relevant when we care
+  // about session caching.
+  const uint64_t session_ctx = 7;
+
+  const unsigned char* session_id =
+    reinterpret_cast<const unsigned char*>(&session_ctx);
+
+  if (SSL_CTX_set_session_id_context(
+          ctx,
+          session_id,
+          sizeof(session_ctx)) != 1) {
+    LOG(FATAL) << "Session id context size exceeds maximum";
+  }
+
+  // Now do some validation of the flags/environment variables.
+  if (ssl_flags->key_file.isNone()) {
+    EXIT(EXIT_FAILURE) << "SSL requires key! NOTE: Set path with SSL_KEY_FILE";
+  }
+
+  if (ssl_flags->cert_file.isNone()) {
+    EXIT(EXIT_FAILURE)
+      << "SSL requires certificate! NOTE: Set path with SSL_CERT_FILE";
+  }
+
+  if (ssl_flags->ca_file.isNone()) {
+    VLOG(2) << "CA file path is unspecified! NOTE: "
+            << "Set CA file path with SSL_CA_FILE=<filepath>";
+  }
+
+  if (ssl_flags->ca_dir.isNone()) {
+    VLOG(2) << "CA directory path unspecified! NOTE: "
+            << "Set CA directory path with SSL_CA_DIR=<dirpath>";
+  }
+
+  if (!ssl_flags->verify_cert) {
+    VLOG(2) << "Will not verify peer certificate!\n"
+            << "NOTE: Set SSL_VERIFY_CERT=1 to enable peer certificate "
+            << "verification";
+  }
+
+  if (!ssl_flags->require_cert) {
+    VLOG(2) << "Will only verify peer certificate if presented!\n"
+            << "NOTE: Set SSL_REQUIRE_CERT=1 to require peer certificate "
+            << "verification";
+  }
+
+  if (ssl_flags->require_cert && !ssl_flags->verify_cert) {
+    // Requiring a certificate implies that it should be verified.
+    ssl_flags->verify_cert = true;
+
+    VLOG(2) << "SSL_REQUIRE_CERT implies peer certificate verification.\n"
+            << "SSL_VERIFY_CERT set to true";
+  }
+
+  // Initialize OpenSSL if we've been asked to do verification of peer
+  // certificates.
+  if (ssl_flags->verify_cert) {
+    // Set CA locations.
+    if (ssl_flags->ca_file.isSome() || ssl_flags->ca_dir.isSome()) {
+      // Either location may be absent; the absent one is passed as
+      // NULL.
+      const char* ca_file = ssl_flags->ca_file.isSome()
+        ? ssl_flags->ca_file.get().c_str()
+        : NULL;
+
+      const char* ca_dir = ssl_flags->ca_dir.isSome()
+        ? ssl_flags->ca_dir.get().c_str()
+        : NULL;
+
+      if (SSL_CTX_load_verify_locations(ctx, ca_file, ca_dir) != 1) {
+        unsigned long error = ERR_get_error();
+        EXIT(EXIT_FAILURE)
+          << "Could not load CA file and/or directory ("
+          << stringify(error)  << "): "
+          << error_string(error) << " -> "
+          << (ca_file != NULL ? (stringify("FILE: ") + ca_file) : "")
+          << (ca_dir != NULL ? (stringify("DIR: ") + ca_dir) : "");
+      }
+
+      if (ca_file != NULL) {
+        VLOG(2) << "Using CA file: " << ca_file;
+      }
+      if (ca_dir != NULL) {
+        VLOG(2) << "Using CA dir: " << ca_dir;
+      }
+    } else {
+      // Neither a CA file nor a CA directory was configured.
+      if (SSL_CTX_set_default_verify_paths(ctx) != 1) {
+        EXIT(EXIT_FAILURE) << "Could not load default CA file and/or directory";
+      }
+
+      VLOG(2) << "Using default CA file and/or directory";
+    }
+
+    // Set SSL peer verification callback.
+    int mode = SSL_VERIFY_PEER;
+
+    if (ssl_flags->require_cert) {
+      mode |= SSL_VERIFY_FAIL_IF_NO_PEER_CERT;
+    }
+
+    SSL_CTX_set_verify(ctx, mode, &verify_callback);
+
+    SSL_CTX_set_verify_depth(ctx, ssl_flags->verification_depth);
+  } else {
+    SSL_CTX_set_verify(ctx, SSL_VERIFY_NONE, NULL);
+  }
+
+  // Set certificate chain.
+  if (SSL_CTX_use_certificate_chain_file(
+          ctx,
+          ssl_flags->cert_file.get().c_str()) != 1) {
+    EXIT(EXIT_FAILURE) << "Could not load cert file";
+  }
+
+  // Set private key.
+  if (SSL_CTX_use_PrivateKey_file(
+          ctx,
+          ssl_flags->key_file.get().c_str(),
+          SSL_FILETYPE_PEM) != 1) {
+    EXIT(EXIT_FAILURE) << "Could not load key file";
+  }
+
+  // Validate key.
+  if (SSL_CTX_check_private_key(ctx) != 1) {
+    EXIT(EXIT_FAILURE)
+      << "Private key does not match the certificate public key";
+  }
+
+  VLOG(2) << "Using ciphers: " << ssl_flags->ciphers;
+
+  if (SSL_CTX_set_cipher_list(ctx, ssl_flags->ciphers.c_str()) == 0) {
+    EXIT(EXIT_FAILURE) << "Could not set ciphers: " << ssl_flags->ciphers;
+  }
+
+  // Use server preference for cipher.
+  long ssl_options = SSL_OP_CIPHER_SERVER_PREFERENCE;
+  // Each protocol version is switched off unless its 'enable_*' flag
+  // is set.
+  // Disable SSLv2.
+  if (!ssl_flags->enable_ssl_v2) { ssl_options |= SSL_OP_NO_SSLv2; }
+  // Disable SSLv3.
+  if (!ssl_flags->enable_ssl_v3) { ssl_options |= SSL_OP_NO_SSLv3; }
+  // Disable TLSv1.
+  if (!ssl_flags->enable_tls_v1_0) { ssl_options |= SSL_OP_NO_TLSv1; }
+  // Disable TLSv1.1.
+  if (!ssl_flags->enable_tls_v1_1) { ssl_options |= SSL_OP_NO_TLSv1_1; }
+  // Disable TLSv1.2.
+  if (!ssl_flags->enable_tls_v1_2) { ssl_options |= SSL_OP_NO_TLSv1_2; }
+
+  SSL_CTX_set_options(ctx, ssl_options);
+
+  initialized->done();
+}
+
+
+// Returns the global context 'ctx'. This does not call 'initialize',
+// so the result is NULL until 'initialize' has created the context
+// (which it only does when SSL is enabled).
+SSL_CTX* context()
+{
+  // TODO(benh): Always call 'initialize' just in case?
+  return ctx;
+}
+
+
+// Post-handshake verification of the peer of 'ssl'.
+//
+// Returns Nothing() when verification is disabled, when the peer
+// presented no certificate (or 'hostname' is unknown) and
+// certificates are not required, or when the verified certificate
+// names 'hostname' either in a DNS entry of its subjectAltName
+// extension or, failing that, in the common name of its subject.
+// Returns an Error in every other case.
+Try<Nothing> verify(const SSL* const ssl, const Option<string>& hostname)
+{
+  // Return early if we don't need to verify.
+  if (!ssl_flags->verify_cert) {
+    return Nothing();
+  }
+
+  // The X509 object must be freed if this call succeeds.
+  // TODO(jmlvanre): handle this better. How about RAII?
+  X509* cert = SSL_get_peer_certificate(ssl);
+
+  if (cert == NULL) {
+    return ssl_flags->require_cert
+      ? Error("Peer did not provide certificate")
+      : Try<Nothing>(Nothing());
+  }
+
+  if (SSL_get_verify_result(ssl) != X509_V_OK) {
+    X509_free(cert);
+    return Error("Could not verify peer certificate");
+  }
+
+  if (hostname.isNone()) {
+    X509_free(cert);
+    return ssl_flags->require_cert
+      ? Error("Cannot verify peer certificate: peer hostname unknown")
+      : Try<Nothing>(Nothing());
+  }
+
+  // First look for a matching DNS entry in the subjectAltName
+  // extension. We let OpenSSL decode the extension for us: the
+  // extension method of subjectAltName is ASN1 item based and has no
+  // 'd2i' function, so decoding it by hand through 'method->d2i' is
+  // not possible. A certificate without the extension (or without
+  // any extension at all) simply falls through to the subject name
+  // check below.
+  STACK_OF(GENERAL_NAME)* san_names =
+    static_cast<STACK_OF(GENERAL_NAME)*>(
+        X509_get_ext_d2i(cert, NID_subject_alt_name, NULL, NULL));
+
+  if (san_names != NULL) {
+    bool matched = false;
+
+    const int count = sk_GENERAL_NAME_num(san_names);
+    for (int i = 0; i < count && !matched; i++) {
+      GENERAL_NAME* san_name = sk_GENERAL_NAME_value(san_names, i);
+      if (san_name->type != GEN_DNS) {
+        continue;
+      }
+
+      const unsigned char* data = ASN1_STRING_data(san_name->d.dNSName);
+      const int length = ASN1_STRING_length(san_name->d.dNSName);
+      if (data == NULL || length <= 0) {
+        continue;
+      }
+
+      // Compare using the encoded length, so that an entry with an
+      // embedded NUL character can never match a prefix of itself.
+      const string dns_name(reinterpret_cast<const char*>(data), length);
+
+      matched = (dns_name == hostname.get());
+    }
+
+    // The decoded names are owned by us.
+    sk_GENERAL_NAME_pop_free(san_names, GENERAL_NAME_free);
+
+    if (matched) {
+      X509_free(cert);
+      return Nothing();
+    }
+  }
+
+  // If we still haven't verified the hostname, try doing it via
+  // the certificate subject name.
+  X509_NAME* name = X509_get_subject_name(cert);
+
+  if (name != NULL) {
+    char text[_POSIX_HOST_NAME_MAX] {};
+
+    if (X509_NAME_get_text_by_NID(
+            name,
+            NID_commonName,
+            text,
+            sizeof(text)) > 0) {
+      if (hostname.get() != text) {
+        X509_free(cert);
+        return Error(
+          "Presented Certificate Name: " + stringify(text) +
+          " does not match peer hostname name: " + hostname.get());
+      }
+
+      X509_free(cert);
+      return Nothing();
+    }
+  }
+
+  // If we still haven't exited, we haven't verified it, and we give up.
+  X509_free(cert);
+  return Error(
+    "Could not verify presented certificate with hostname " + hostname.get());
+}
+
+} // namespace openssl {
+} // namespace network {
+} // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/654cabf9/3rdparty/libprocess/src/openssl.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/openssl.hpp b/3rdparty/libprocess/src/openssl.hpp
new file mode 100644
index 0000000..60c7b07
--- /dev/null
+++ b/3rdparty/libprocess/src/openssl.hpp
@@ -0,0 +1,79 @@
+#ifndef __OPENSSL_HPP__
+#define __OPENSSL_HPP__
+
+#include <openssl/ssl.h>
+
+#include <string>
+
+#include <stout/flags.hpp>
+#include <stout/nothing.hpp>
+#include <stout/option.hpp>
+#include <stout/try.hpp>
+
+namespace process {
+namespace network {
+namespace openssl {
+
+// Flags capturing the SSL_* environment variables (listed on initialize()
+// below) that influence how we initialize the OpenSSL library.
+class Flags : public virtual flags::FlagsBase
+{
+public:
+  Flags(); // NOTE(review): variables named below inferred by name/order.
+
+  bool enabled;                     // SSL_ENABLED
+  Option<std::string> cert_file;    // SSL_CERT_FILE (path to certificate)
+  Option<std::string> key_file;     // SSL_KEY_FILE (path to key)
+  bool verify_cert;                 // SSL_VERIFY_CERT
+  bool require_cert;                // SSL_REQUIRE_CERT
+  unsigned int verification_depth;  // SSL_VERIFY_DEPTH
+  Option<std::string> ca_dir;       // SSL_CA_DIR (path to CA directory)
+  Option<std::string> ca_file;      // SSL_CA_FILE (path to CA file)
+  std::string ciphers;              // SSL_CIPHERS (':'-separated list)
+  bool enable_ssl_v2;               // SSL_ENABLE_SSL_V2
+  bool enable_ssl_v3;               // SSL_ENABLE_SSL_V3
+  bool enable_tls_v1_0;             // SSL_ENABLE_TLS_V1_0
+  bool enable_tls_v1_1;             // SSL_ENABLE_TLS_V1_1
+  bool enable_tls_v1_2;             // SSL_ENABLE_TLS_V1_2
+};
+
+const Flags& flags();
+
+// Initializes the _global_ OpenSSL context (SSL_CTX) as well as the
+// crypto library in order to support multi-threading. The global
+// context gets initialized using the environment variables:
+//
+//    SSL_ENABLED=(false|0,true|1)
+//    SSL_CERT_FILE=(path to certificate)
+//    SSL_KEY_FILE=(path to key)
+//    SSL_VERIFY_CERT=(false|0,true|1)
+//    SSL_REQUIRE_CERT=(false|0,true|1)
+//    SSL_VERIFY_DEPTH=(4)
+//    SSL_CA_DIR=(path to CA directory)
+//    SSL_CA_FILE=(path to CA file)
+//    SSL_CIPHERS=(accepted ciphers separated by ':')
+//    SSL_ENABLE_SSL_V2=(false|0,true|1)
+//    SSL_ENABLE_SSL_V3=(false|0,true|1)
+//    SSL_ENABLE_TLS_V1_0=(false|0,true|1)
+//    SSL_ENABLE_TLS_V1_1=(false|0,true|1)
+//    SSL_ENABLE_TLS_V1_2=(false|0,true|1)
+//
+// TODO(benh): When/If we need to support multiple contexts in the
+// same process, for example for Server Name Indication (SNI), then
+// we'll add other functions for initializing an SSL_CTX based on
+// these environment variables.
+// TODO(nneilsen): Support certificate revocation.
+void initialize();
+
+// Returns the _global_ OpenSSL context.
+SSL_CTX* context();
+
+// Verify that the hostname is properly associated with the peer
+// certificate associated with the specified SSL connection.
+Try<Nothing> verify(const SSL* const ssl, const Option<std::string>& hostname);
+
+} // namespace openssl {
+} // namespace network {
+} // namespace process {
+
+#endif // __OPENSSL_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/654cabf9/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index c2baa6c..f919b99 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -1646,15 +1646,22 @@ Encoder* SocketManager::next(int s)
           }
 
           dispose.erase(s);
+
           auto iterator = sockets.find(s);
-          delete iterator->second;
-          sockets.erase(iterator);
 
           // We don't actually close the socket (we wait for the Socket
           // abstraction to close it once there are no more references),
           // but we do shutdown the receiving end so any DataDecoder
           // will get cleaned up (which might have the last reference).
-          shutdown(s, SHUT_RD);
+
+          // Hold on to the Socket and remove it from the 'sockets'
+          // map so that in the case where 'shutdown()' ends up
+          // calling close the termination logic is not run twice.
+          Socket* socket = iterator->second;
+          sockets.erase(iterator);
+          socket->shutdown();
+
+          delete socket;
         }
       }
     }
@@ -1714,6 +1721,9 @@ void SocketManager::close(int s)
         proxies.erase(s);
       }
 
+      dispose.erase(s);
+      auto iterator = sockets.find(s);
+
       // We need to stop any 'ignore_data' receivers as they may have
       // the last Socket reference so we shutdown recvs but don't do a
       // full close (since that will be taken care of by ~Socket, see
@@ -1722,12 +1732,16 @@ void SocketManager::close(int s)
       // from the socket. Note we need to do this before we call
       // 'sockets.erase(s)' to avoid the potential race with the last
       // reference being in 'sockets'.
-      shutdown(s, SHUT_RD);
 
-      dispose.erase(s);
-      auto iterator = sockets.find(s);
-      delete iterator->second;
+
+      // Hold on to the Socket and remove it from the 'sockets' map so
+      // that in the case where 'shutdown()' ends up calling close the
+      // termination logic is not run twice.
+      Socket* socket = iterator->second;
       sockets.erase(iterator);
+      socket->shutdown();
+
+      delete socket;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/654cabf9/3rdparty/libprocess/src/socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/socket.cpp b/3rdparty/libprocess/src/socket.cpp
index 0e1cebb..b2a27b5 100644
--- a/3rdparty/libprocess/src/socket.cpp
+++ b/3rdparty/libprocess/src/socket.cpp
@@ -9,6 +9,10 @@
 #include <process/owned.hpp>
 #include <process/socket.hpp>
 
+#ifdef USE_SSL_SOCKET
+#include "libevent_ssl_socket.hpp"
+#include "openssl.hpp"
+#endif
 #include "poll_socket.hpp"
 
 using std::string;
@@ -55,6 +59,16 @@ Try<Socket> Socket::create(Kind kind, int s)
       }
       return Socket(socket.get());
     }
+#ifdef USE_SSL_SOCKET
+    case SSL: {
+      Try<std::shared_ptr<Socket::Impl>> socket =
+        LibeventSSLSocketImpl::create(s);
+      if (socket.isError()) {
+        return Error(socket.error());
+      }
+      return Socket(socket.get());
+    }
+#endif
     // By not setting a default we leverage the compiler errors when
     // the enumeration is augmented to find all the cases we need to
     // provide.
@@ -64,9 +78,13 @@ Try<Socket> Socket::create(Kind kind, int s)
 
 const Socket::Kind& Socket::DEFAULT_KIND()
 {
-  // TODO(jmlvanre): Change the default based on configure or
-  // environment flags.
-  static const Kind DEFAULT = POLL;
+  static const Kind DEFAULT =
+#ifdef USE_SSL_SOCKET
+      network::openssl::flags().enabled ? Socket::SSL : Socket::POLL;
+#else
+      Socket::POLL;
+#endif
+
   return DEFAULT;
 }