You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2015/06/14 13:17:56 UTC
[3/6] mesos git commit: Introduce libevent SSL socket.
Introduce libevent SSL socket.
Requires:
configure --enable-libevent --enable-libevent-socket --enable-ssl
New environment variables:
```
SSL_ENABLED=(false|0,true|1)
SSL_CERT_FILE=(path to certificate)
SSL_KEY_FILE=(path to key)
SSL_VERIFY_CERT=(false|0,true|1)
SSL_REQUIRE_CERT=(false|0,true|1)
SSL_VERIFY_DEPTH=(4)
SSL_CA_DIR=(path to CA directory)
SSL_CA_FILE=(path to CA file)
SSL_CIPHERS=(accepted ciphers separated by ':')
SSL_ENABLE_SSL_V2=(false|0,true|1)
SSL_ENABLE_SSL_V3=(false|0,true|1)
SSL_ENABLE_TLS_V1_0=(false|0,true|1)
SSL_ENABLE_TLS_V1_1=(false|0,true|1)
SSL_ENABLE_TLS_V1_2=(false|0,true|1)
```
Only TLSV1.2 is enabled by default.
Use the `ENABLE_SSL_V*` and `ENABLE_TLS_V*` environment variables to
open up more protocols.
Use the `SSL_CIPHERS` environment variable to restrict or open up the
supported ciphers.
Review: https://reviews.apache.org/r/29406
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/654cabf9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/654cabf9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/654cabf9
Branch: refs/heads/master
Commit: 654cabf9092f03a1857bd99ee510a204bfaaba51
Parents: 1f6c99b
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sun Jun 14 03:11:00 2015 -0700
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sun Jun 14 04:12:01 2015 -0700
----------------------------------------------------------------------
3rdparty/libprocess/Makefile.am | 26 +-
3rdparty/libprocess/include/process/address.hpp | 21 +
3rdparty/libprocess/include/process/socket.hpp | 49 +-
3rdparty/libprocess/src/libevent.cpp | 94 +-
3rdparty/libprocess/src/libevent.hpp | 27 +-
3rdparty/libprocess/src/libevent_ssl_socket.cpp | 926 +++++++++++++++++++
3rdparty/libprocess/src/libevent_ssl_socket.hpp | 165 ++++
3rdparty/libprocess/src/openssl.cpp | 563 +++++++++++
3rdparty/libprocess/src/openssl.hpp | 79 ++
3rdparty/libprocess/src/process.cpp | 28 +-
3rdparty/libprocess/src/socket.cpp | 24 +-
11 files changed, 1971 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/654cabf9/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 489ce35..c781f6c 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -53,6 +53,22 @@ libprocess_la_SOURCES = \
src/subprocess.cpp \
src/timeseries.cpp
+if ENABLE_LIBEVENT
+else
+if WITH_BUNDLED_LIBEV
+ EVENT_LIB = $(LIBEV)/libev.la
+else
+ EVENT_LIB = -lev
+endif
+endif
+
+if ENABLE_SSL
+libprocess_la_SOURCES += \
+ src/libevent_ssl_socket.cpp \
+ src/openssl.cpp \
+ src/openssl.hpp
+endif
+
libprocess_la_CPPFLAGS = \
-I$(srcdir)/include \
-I$(srcdir)/$(STOUT)/include \
@@ -89,16 +105,6 @@ else
HTTP_PARSER_LIB = -lhttp_parser
endif
-if ENABLE_LIBEVENT
- EVENT_LIB = -levent -levent_pthreads
-else
-if WITH_BUNDLED_LIBEV
- EVENT_LIB = $(LIBEV)/libev.la
-else
- EVENT_LIB = -lev
-endif
-endif
-
libprocess_la_LIBADD = \
$(LIBGLOG) \
$(HTTP_PARSER_LIB) \
http://git-wip-us.apache.org/repos/asf/mesos/blob/654cabf9/3rdparty/libprocess/include/process/address.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/address.hpp b/3rdparty/libprocess/include/process/address.hpp
index 729f5cd..88946d5 100644
--- a/3rdparty/libprocess/include/process/address.hpp
+++ b/3rdparty/libprocess/include/process/address.hpp
@@ -14,6 +14,7 @@
#include <stout/abort.hpp>
#include <stout/ip.hpp>
+#include <stout/net.hpp>
#include <stout/stringify.hpp>
namespace process {
@@ -56,6 +57,26 @@ public:
return ip.family();
}
+ /**
+ * Returns the hostname of this address's IP.
+ *
+ * @returns the hostname of this address's IP.
+ */
+ // TODO(jmlvanre): Consider making this return a Future in order to
+ // deal with slow name resolution.
+ Try<std::string> hostname() const
+ {
+ const Try<std::string> hostname = ip == net::IP(INADDR_ANY)
+ ? net::hostname()
+ : net::getHostname(ip);
+
+ if (hostname.isError()) {
+ return Error(hostname.error());
+ }
+
+ return hostname.get();
+ }
+
// Returns the storage size (i.e., either sizeof(sockaddr_in) or
// sizeof(sockaddr_in6) depending on the family) of this address.
size_t size() const
http://git-wip-us.apache.org/repos/asf/mesos/blob/654cabf9/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index b8c2274..2cf3c10 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -24,7 +24,9 @@ public:
// Available kinds of implementations.
enum Kind {
POLL,
- // TODO(jmlvanre): Add libevent SSL socket.
+#ifdef USE_SSL_SOCKET
+ SSL
+#endif
};
// Returns an instance of a Socket using the specified kind of
@@ -58,9 +60,11 @@ public:
return s;
}
+ // Interface functions implemented by this base class.
+ Try<Address> address() const;
+ Try<Address> bind(const Address& address);
+
// Socket::Impl interface.
- virtual Try<Address> address() const;
- virtual Try<Address> bind(const Address& address);
virtual Try<Nothing> listen(int backlog) = 0;
virtual Future<Socket> accept() = 0;
virtual Future<Nothing> connect(const Address& address) = 0;
@@ -93,12 +97,46 @@ public:
// enabling reuse of a pool of preallocated strings/buffers.
virtual Future<Nothing> send(const std::string& data);
+ virtual void shutdown()
+ {
+ if (::shutdown(s, SHUT_RD) < 0) {
+ PLOG(ERROR) << "Shutdown failed on fd=" << s;
+ }
+ }
+
+ // Construct a new Socket from the given impl. This is a proxy
+ // function, as Impls derived from this won't have access to the
+ // Socket::Socket(...) constructors.
+ //
+ // TODO(jmlvanre): These should be protected; however, gcc
+ // complains when using them from within a lambda of a derived
+ // class.
+ static Socket socket(std::shared_ptr<Impl>&& that)
+ {
+ return Socket(std::move(that));
+ }
+
+ static Socket socket(const std::shared_ptr<Impl>& that)
+ {
+ return Socket(that);
+ }
+
protected:
explicit Impl(int _s) : s(_s) { CHECK(s >= 0); }
// Construct a Socket wrapper from this implementation.
Socket socket() { return Socket(shared_from_this()); }
+ // Returns a std::shared_ptr<T> from this implementation.
+ template <typename T>
+ static std::shared_ptr<T> shared(T* t)
+ {
+ std::shared_ptr<T> pointer =
+ std::dynamic_pointer_cast<T>(CHECK_NOTNULL(t)->shared_from_this());
+ CHECK(pointer);
+ return pointer;
+ }
+
int s;
};
@@ -167,6 +205,11 @@ public:
return impl->send(data);
}
+ void shutdown()
+ {
+ impl->shutdown();
+ }
+
private:
explicit Socket(std::shared_ptr<Impl>&& that) : impl(std::move(that)) {}
http://git-wip-us.apache.org/repos/asf/mesos/blob/654cabf9/3rdparty/libprocess/src/libevent.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent.cpp b/3rdparty/libprocess/src/libevent.cpp
index fb03859..2b82930 100644
--- a/3rdparty/libprocess/src/libevent.cpp
+++ b/3rdparty/libprocess/src/libevent.cpp
@@ -1,10 +1,14 @@
+#include <signal.h>
#include <unistd.h>
+#include <mutex>
+
#include <event2/event.h>
#include <event2/thread.h>
#include <process/logging.hpp>
+#include <stout/os/signals.hpp>
#include <stout/synchronized.hpp>
#include "event_loop.hpp"
@@ -12,24 +16,100 @@
namespace process {
-struct event_base* base = NULL;
+event_base* base = NULL;
+
+
+static std::mutex* functions_mutex = new std::mutex();
+std::queue<lambda::function<void(void)>>* functions =
+ new std::queue<lambda::function<void(void)>>();
+
+
+ThreadLocal<bool>* _in_event_loop_ = new ThreadLocal<bool>();
+
+
+void async_function(int socket, short which, void* arg)
+{
+ event* ev = reinterpret_cast<event*>(arg);
+ event_free(ev);
+
+ std::queue<lambda::function<void(void)>> q;
+
+ synchronized (functions_mutex) {
+ std::swap(q, *functions);
+ }
+
+ while (!q.empty()) {
+ q.front()();
+ q.pop();
+ }
+}
+
+
+void run_in_event_loop(
+ const lambda::function<void(void)>& f,
+ EventLoopLogicFlow event_loop_logic_flow)
+{
+ if (__in_event_loop__ && event_loop_logic_flow == ALLOW_SHORT_CIRCUIT) {
+ f();
+ return;
+ }
+
+ synchronized (functions_mutex) {
+ functions->push(f);
+
+ // Add an event and activate it to interrupt the event loop.
+ // TODO(jmlvanre): after libevent v 2.1 we can use
+ // event_self_cbarg instead of re-assigning the event. For now we
+ // manually re-assign the event to pass in the pointer to the
+ // event itself as the callback argument.
+ event* ev = evtimer_new(base, async_function, NULL);
+
+ // 'event_assign' is only valid on non-pending AND non-active
+ // events. This means we have to assign the callback before
+ // calling 'event_active'.
+ if (evtimer_assign(ev, base, async_function, ev) < 0) {
+ LOG(FATAL) << "Failed to assign callback on event";
+ }
+
+ event_active(ev, EV_TIMEOUT, 0);
+ }
+}
void* EventLoop::run(void*)
{
+ __in_event_loop__ = true;
+
+ // Block SIGPIPE in the event loop because we can not force
+ // underlying implementations such as SSL bufferevents to use
+ // MSG_NOSIGNAL.
+ bool unblock = os::signals::block(SIGPIPE);
+
do {
int result = event_base_loop(base, EVLOOP_ONCE);
if (result < 0) {
LOG(FATAL) << "Failed to run event loop";
- } else if (result == 1) {
- VLOG(1) << "All events handled, continuing event loop";
+ } else if (result > 0) {
+ // All events are handled, continue event loop.
continue;
- } else if (event_base_got_break(base)) {
- break;
- } else if (event_base_got_exit(base)) {
- break;
+ } else {
+ CHECK_EQ(0, result);
+ if (event_base_got_break(base)) {
+ break;
+ } else if (event_base_got_exit(base)) {
+ break;
+ }
}
} while (true);
+
+ __in_event_loop__ = false;
+
+ if (unblock) {
+ if (!os::signals::unblock(SIGPIPE)) {
+ LOG(FATAL) << "Failure to unblock SIGPIPE";
+ }
+ }
+
return NULL;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/654cabf9/3rdparty/libprocess/src/libevent.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent.hpp b/3rdparty/libprocess/src/libevent.hpp
index f6cc721..a20f2c3 100644
--- a/3rdparty/libprocess/src/libevent.hpp
+++ b/3rdparty/libprocess/src/libevent.hpp
@@ -1,10 +1,35 @@
#ifndef __LIBEVENT_HPP__
#define __LIBEVENT_HPP__
+#include <stout/lambda.hpp>
+#include <stout/thread.hpp>
+
namespace process {
// Event loop.
-extern struct event_base* base;
+extern event_base* base;
+
+
+// Per thread bool pointer. The extra level of indirection from
+// _in_event_loop_ to __in_event_loop__ is used in order to take
+// advantage of the ThreadLocal operators without needing the extra
+// dereference as well as lazily construct the actual bool.
+extern ThreadLocal<bool>* _in_event_loop_;
+
+
+#define __in_event_loop__ *(*_in_event_loop_ == NULL ? \
+ *_in_event_loop_ = new bool(false) : *_in_event_loop_)
+
+
+enum EventLoopLogicFlow {
+ ALLOW_SHORT_CIRCUIT,
+ DISALLOW_SHORT_CIRCUIT
+};
+
+
+void run_in_event_loop(
+ const lambda::function<void(void)>& f,
+ EventLoopLogicFlow event_loop_logic_flow = ALLOW_SHORT_CIRCUIT);
} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/654cabf9/3rdparty/libprocess/src/libevent_ssl_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent_ssl_socket.cpp b/3rdparty/libprocess/src/libevent_ssl_socket.cpp
new file mode 100644
index 0000000..5955796
--- /dev/null
+++ b/3rdparty/libprocess/src/libevent_ssl_socket.cpp
@@ -0,0 +1,926 @@
+#include <event2/buffer.h>
+#include <event2/bufferevent_ssl.h>
+#include <event2/event.h>
+#include <event2/listener.h>
+#include <event2/thread.h>
+#include <event2/util.h>
+
+#include <openssl/ssl.h>
+#include <openssl/err.h>
+
+#include <process/queue.hpp>
+#include <process/socket.hpp>
+
+#include <stout/net.hpp>
+#include <stout/synchronized.hpp>
+
+#include "libevent.hpp"
+#include "libevent_ssl_socket.hpp"
+#include "openssl.hpp"
+
+// Locking:
+//
+// We use the BEV_OPT_THREADSAFE flag when constructing bufferevents
+// so that all **functions that are called from the event loop that
+// take a bufferevent as a parameter will automatically have the
+// lock acquired**.
+//
+// This means that everywhere that the libevent library does not
+// already lock the bev, we need to manually 'synchronize (bev) {'.
+// To further complicate matters, due to a deadlock scneario in
+// libevent-openssl (v 2.0.21) we currently modify bufferevents using
+// continuations in the event loop, but these functions, while run
+// from within the event loop, are not passed the 'bev' as a parameter
+// and thus MUST use 'synchronized (bev)'. See 'Continuation' comment
+// below for more details on why we need to invoke these continuations
+// from within the event loop.
+
+// Continuations via 'run_in_event_loop(...)':
+//
+// There is a deadlock scenario in libevent-openssl (v 2.0.21) when
+// modifying the bufferevent (bev) from another thread (not the event
+// loop). To avoid this we run all bufferevent manipulation logic in
+// continuations that are executed within the event loop.
+
+// Connection Extra FD:
+//
+// In libevent-openssl (v 2.0.21) we've had issues using the
+// 'bufferevent_openssl_socket_new' call with the CONNECTING state and
+// an existing socket. Therefore we allow it to construct its own
+// fd and clean it up along with the Impl object when the bev is
+// freed using the BEV_OPT_CLOSE_ON_FREE option.
+
+// DISALLOW_SHORT_CIRCUIT:
+//
+// We disallow short-circuiting in 'run_in_event_loop' due to a bug in
+// libevent_openssl with deferred callbacks still being called (still
+// in the run queue) even though a bev has been disabled.
+
+using std::queue;
+using std::string;
+
+// Specialization of 'synchronize' to use bufferevent with the
+// 'synchronized' macro.
+static Synchronized<bufferevent> synchronize(bufferevent* bev)
+{
+ return Synchronized<bufferevent>(
+ bev,
+ [](bufferevent* bev) { bufferevent_lock(bev); },
+ [](bufferevent* bev) { bufferevent_unlock(bev); });
+}
+
+namespace process {
+namespace network {
+
+Try<std::shared_ptr<Socket::Impl>> LibeventSSLSocketImpl::create(int s)
+{
+ openssl::initialize();
+ auto socket = std::make_shared<LibeventSSLSocketImpl>(s);
+ // See comment at 'initialize' declaration for why we call this.
+ socket->initialize();
+ return socket;
+}
+
+
+LibeventSSLSocketImpl::~LibeventSSLSocketImpl()
+{
+ // We defer termination and destruction of all event loop specific
+ // calls and structures. This is a safety against the socket being
+ // destroyed before existing event loop calls have completed since
+ // they require valid data structures (the weak pointer).
+
+ // Copy the members that we are interested in. This is necessary
+ // because 'this' points to memory that may be re-allocated and
+ // invalidate any reference to 'this->XXX'. We want to manipulate
+ // or use these data structures within the finalization lambda
+ // below.
+ evconnlistener* _listener = listener;
+ bufferevent* _bev = bev;
+ bool _accepted = accepted;
+ std::weak_ptr<LibeventSSLSocketImpl>* _event_loop_handle = event_loop_handle;
+
+ run_in_event_loop(
+ [_listener, _bev, _accepted, _event_loop_handle]() {
+ // Once this lambda is called, it should not be possible for
+ // more event loop callbacks to be triggered with 'this->bev'.
+ // This is important because we delete event_loop_handle which
+ // is the callback argument for any event loop callbacks.
+ // This lambda is responsible for ensuring 'this->bev' is
+ // disabled, and cleaning up any remaining state associated
+ // with the event loop.
+
+ CHECK(__in_event_loop__);
+
+ if (_listener != NULL) {
+ evconnlistener_free(_listener);
+ }
+
+ if (_bev != NULL) {
+ SSL* ssl = bufferevent_openssl_get_ssl(_bev);
+ // Workaround for SSL shutdown, see http://www.wangafu.net/~nickm/libevent-book/Ref6a_advanced_bufferevents.html // NOLINT
+ SSL_set_shutdown(ssl, SSL_RECEIVED_SHUTDOWN);
+ SSL_shutdown(ssl);
+
+ // NOTE: Removes all future callbacks using 'this->bev'.
+ bufferevent_disable(_bev, EV_READ | EV_WRITE);
+
+ // Since we are using a separate fd for the connecting socket we
+ // end up using BEV_OPT_CLOSE_ON_FREE for the connecting, but
+ // not for the accepting side. since the BEV_OPT_CLOSE_ON_FREE
+ // also frees the SSL object, we need to manually free it for
+ // the accepting case. See the 'Connection Extra FD' note at top
+ // of file.
+ if (_accepted) {
+ SSL_free(ssl);
+ }
+
+ // For the connecting socket BEV_OPT_CLOSE_ON_FREE will close
+ // the fd. See note below.
+ bufferevent_free(_bev);
+ }
+
+ delete _event_loop_handle;
+ },
+ DISALLOW_SHORT_CIRCUIT);
+}
+
+
+void LibeventSSLSocketImpl::initialize()
+{
+ event_loop_handle = new std::weak_ptr<LibeventSSLSocketImpl>(shared(this));
+}
+
+
+void LibeventSSLSocketImpl::shutdown()
+{
+ // Nothing to do if this socket was never initialized.
+ synchronized (lock) {
+ if (bev == NULL) {
+ // If it was not initialized, then there should also be no
+ // requests.
+ CHECK(connect_request.get() == NULL);
+ CHECK(recv_request.get() == NULL);
+ CHECK(send_request.get() == NULL);
+
+ return;
+ }
+ }
+
+ // Extend the life-time of 'this' through the execution of the
+ // lambda in the event loop. Note: The 'self' needs to be explicitly
+ // captured because we're not using it in the body of the lambda. We
+ // can use a 'shared_ptr' because run_in_event_loop is guaranteed to
+ // execute.
+ auto self = shared(this);
+
+ run_in_event_loop(
+ [self]() {
+ CHECK(__in_event_loop__);
+ CHECK(self);
+
+ CHECK_NOTNULL(self->bev);
+
+ synchronized (self->bev) {
+ Owned<RecvRequest> request;
+
+ // Swap the 'recv_request' under the object lock.
+ synchronized (self->lock) {
+ std::swap(request, self->recv_request);
+ }
+
+ // If there is still a pending receive request then close it.
+ if (request.get() != NULL) {
+ request->promise
+ .set(bufferevent_read(self->bev, request->data, request->size));
+ }
+ }
+ },
+ DISALLOW_SHORT_CIRCUIT);
+}
+
+
+// Only runs in event loop. No locks required. See 'Locking' note at
+// top of file.
+void LibeventSSLSocketImpl::recv_callback(bufferevent* /*bev*/, void* arg)
+{
+ CHECK(__in_event_loop__);
+
+ std::weak_ptr<LibeventSSLSocketImpl>* handle =
+ reinterpret_cast<std::weak_ptr<LibeventSSLSocketImpl>*>(CHECK_NOTNULL(arg));
+
+ std::shared_ptr<LibeventSSLSocketImpl> impl(handle->lock());
+
+ // Don't call the 'recv_callback' unless the socket is still valid.
+ if (impl != NULL) {
+ impl->recv_callback();
+ }
+}
+
+
+// Only runs in event loop. Member function continuation of static
+// 'recv_callback'.
+void LibeventSSLSocketImpl::recv_callback()
+{
+ CHECK(__in_event_loop__);
+
+ Owned<RecvRequest> request;
+
+ synchronized (lock) {
+ std::swap(request, recv_request);
+ }
+
+ if (request.get() != NULL) {
+ // There is an invariant that if we are executing a
+ // 'recv_callback' and we have a request there must be data here
+ // because we should not be getting a spurrious receive callback
+ // invocation. Even if we discarded a request, the manual
+ // invocation of 'recv_callback' guarantees that there is a
+ // non-zero amount of data available in the bufferevent.
+ size_t length = bufferevent_read(bev, request->data, request->size);
+ CHECK(length > 0);
+
+ request->promise.set(length);
+ }
+}
+
+
+// Only runs in event loop. No locks required. See 'Locking' note at
+// top of file.
+void LibeventSSLSocketImpl::send_callback(bufferevent* /*bev*/, void* arg)
+{
+ CHECK(__in_event_loop__);
+
+ std::weak_ptr<LibeventSSLSocketImpl>* handle =
+ reinterpret_cast<std::weak_ptr<LibeventSSLSocketImpl>*>(CHECK_NOTNULL(arg));
+
+ std::shared_ptr<LibeventSSLSocketImpl> impl(handle->lock());
+
+ // Don't call the 'send_callback' unless the socket is still valid.
+ if (impl != NULL) {
+ impl->send_callback();
+ }
+}
+
+
+// Only runs in event loop. Member function continuation of static
+// 'recv_callback'.
+void LibeventSSLSocketImpl::send_callback()
+{
+ CHECK(__in_event_loop__);
+
+ Owned<SendRequest> request;
+
+ synchronized (lock) {
+ std::swap(request, send_request);
+ }
+
+ if (request.get() != NULL) {
+ request->promise.set(request->size);
+ }
+}
+
+
+// Only runs in event loop. No locks required. See 'Locking' note at
+// top of file.
+void LibeventSSLSocketImpl::event_callback(
+ bufferevent* /*bev*/,
+ short events,
+ void* arg)
+{
+ CHECK(__in_event_loop__);
+
+ std::weak_ptr<LibeventSSLSocketImpl>* handle =
+ reinterpret_cast<std::weak_ptr<LibeventSSLSocketImpl>*>(CHECK_NOTNULL(arg));
+
+ std::shared_ptr<LibeventSSLSocketImpl> impl(handle->lock());
+
+ // Don't call the 'event_callback' unless the socket is still valid.
+ if (impl != NULL) {
+ impl->event_callback(events);
+ }
+}
+
+
+// Only runs in event loop. Member function continuation of static
+// 'recv_callback'.
+void LibeventSSLSocketImpl::event_callback(short events)
+{
+ CHECK(__in_event_loop__);
+
+ Owned<RecvRequest> current_recv_request;
+ Owned<SendRequest> current_send_request;
+ Owned<ConnectRequest> current_connect_request;
+
+ // In all of the following conditions, we're interested in swapping
+ // the value of the requests with null (if they are already null,
+ // then there's no harm).
+ if (events & BEV_EVENT_EOF ||
+ events & BEV_EVENT_CONNECTED ||
+ events & BEV_EVENT_ERROR) {
+ synchronized (lock) {
+ std::swap(current_recv_request, recv_request);
+ std::swap(current_send_request, send_request);
+ std::swap(current_connect_request, connect_request);
+ }
+ }
+
+ // If a request below is null, then no such request is in progress,
+ // either because it was never created, it has already been
+ // completed, or it has been discarded.
+
+ if (events & BEV_EVENT_EOF ||
+ (events & BEV_EVENT_ERROR && EVUTIL_SOCKET_ERROR() == 0)) {
+ // At end of file, close the connection.
+ if (current_recv_request.get() != NULL) {
+ current_recv_request->promise.set(0);
+ }
+
+ if (current_send_request.get() != NULL) {
+ current_send_request->promise.set(0);
+ }
+
+ if (current_connect_request.get() != NULL) {
+ bufferevent_free(CHECK_NOTNULL(bev));
+ bev = NULL;
+ current_connect_request->promise.fail(
+ "Failed connect: connection closed");
+ }
+ } else if (events & BEV_EVENT_CONNECTED) {
+ // We should not have receiving or sending request while still
+ // connecting.
+ CHECK(current_recv_request.get() == NULL);
+ CHECK(current_send_request.get() == NULL);
+ CHECK_NOTNULL(current_connect_request.get());
+
+ // If we're connecting, then we've succeeded. Time to do
+ // post-verification.
+ CHECK_NOTNULL(bev);
+
+ // Do post-validation of connection.
+ SSL* ssl = bufferevent_openssl_get_ssl(bev);
+
+ Try<Nothing> verify = openssl::verify(ssl, peer_hostname);
+ if (verify.isError()) {
+ VLOG(1) << "Failed connect, verification error: " << verify.error();
+ bufferevent_free(bev);
+ bev = NULL;
+ current_connect_request->promise.fail(verify.error());
+ return;
+ }
+
+ current_connect_request->promise.set(Nothing());
+ } else if (events & BEV_EVENT_ERROR) {
+ CHECK(EVUTIL_SOCKET_ERROR() != 0);
+ std::ostringstream error_stream;
+ error_stream << evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR());
+
+ // If there is a valid error, fail any requests and log the error.
+ VLOG(1) << "Socket error: " << error_stream.str();
+
+ if (current_recv_request.get() != NULL) {
+ current_recv_request->promise.fail(
+ "Failed recv, connection error: " +
+ error_stream.str());
+ }
+
+ if (current_send_request.get() != NULL) {
+ current_send_request->promise.fail(
+ "Failed send, connection error: " +
+ error_stream.str());
+ }
+
+ if (current_connect_request.get() != NULL) {
+ bufferevent_free(CHECK_NOTNULL(bev));
+ bev = NULL;
+ current_connect_request->promise.fail(
+ "Failed connect, connection error: " +
+ error_stream.str());
+ }
+ }
+}
+
+
+// For the connecting socket we currently don't use the fd associated
+// with 'Socket'. See the 'Connection Extra FD' note at top of file.
+LibeventSSLSocketImpl::LibeventSSLSocketImpl(int _s)
+ : Socket::Impl(_s),
+ bev(NULL),
+ listener(NULL),
+ lock(ATOMIC_FLAG_INIT),
+ recv_request(NULL),
+ send_request(NULL),
+ connect_request(NULL),
+ event_loop_handle(NULL),
+ accepted(false) {}
+
+
+// For the connecting socket we currently don't use the fd associated
+// with 'Socket'. See the 'Connection Extra FD' note at top of file.
+LibeventSSLSocketImpl::LibeventSSLSocketImpl(
+ int _s,
+ bufferevent* _bev,
+ Option<std::string>&& _peer_hostname)
+ : Socket::Impl(_s),
+ bev(_bev),
+ listener(NULL),
+ lock(ATOMIC_FLAG_INIT),
+ recv_request(NULL),
+ send_request(NULL),
+ connect_request(NULL),
+ event_loop_handle(NULL),
+ accepted(true),
+ peer_hostname(std::move(_peer_hostname)) {}
+
+
+Future<Nothing> LibeventSSLSocketImpl::connect(const Address& address)
+{
+ if (bev != NULL) {
+ return Failure("Socket is already connected");
+ }
+
+ if (connect_request.get() != NULL) {
+ return Failure("Socket is already connecting");
+ }
+
+ SSL* ssl = SSL_new(openssl::context());
+ if (ssl == NULL) {
+ return Failure("Failed to connect: SSL_new");
+ }
+
+ // Construct the bufferevent in the connecting state. We don't use
+ // the existing FD due to an issue in libevent-openssl. See the
+ // 'Connection Extra FD' note at top of file.
+ CHECK(bev == NULL);
+ bev = bufferevent_openssl_socket_new(
+ base,
+ -1,
+ ssl,
+ BUFFEREVENT_SSL_CONNECTING,
+ BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE);
+
+ if (bev == NULL) {
+ // We need to free 'ssl' here because the bev won't clean it up
+ // for us.
+ SSL_free(ssl);
+ return Failure("Failed to connect: bufferevent_openssl_socket_new");
+ }
+
+ // From this point on, as long as 'bev' is freed properly, it will
+ // free 'ssl' along with it due to the BEV_OPT_CLOSE_ON_FREE' flag.
+
+ // Assign the callbacks for the bufferevent.
+ bufferevent_setcb(
+ bev,
+ &LibeventSSLSocketImpl::recv_callback,
+ &LibeventSSLSocketImpl::send_callback,
+ &LibeventSSLSocketImpl::event_callback,
+ CHECK_NOTNULL(event_loop_handle));
+
+ // Try and determine the 'peer_hostname' from the address we're
+ // connecting to in order to properly verify the SSL connection later.
+ const Try<string> hostname = address.hostname();
+
+ if (hostname.isError()) {
+ VLOG(2) << "Could not determine hostname of peer: " << hostname.error();
+ } else {
+ VLOG(2) << "Connecting to " << hostname.get();
+ peer_hostname = hostname.get();
+ }
+
+ // Optimistically construct a 'ConnectRequest' and future.
+ Owned<ConnectRequest> request(new ConnectRequest());
+ Future<Nothing> future = request->promise.future();
+
+ // Assign 'connect_request' under lock, fail on error.
+ synchronized (lock) {
+ if (connect_request.get() != NULL) {
+ bufferevent_free(bev);
+ bev = NULL;
+ return Failure("Socket is already connecting");
+ }
+ std::swap(request, connect_request);
+ }
+
+ sockaddr_storage addr = net::createSockaddrStorage(address.ip, address.port);
+
+ if (bufferevent_socket_connect(
+ bev,
+ reinterpret_cast<sockaddr*>(&addr),
+ sizeof(addr)) < 0) {
+ bufferevent_free(bev);
+ bev = NULL;
+ return Failure("Failed to connect: bufferevent_socket_connect");
+ }
+
+ return future;
+}
+
+
+Future<size_t> LibeventSSLSocketImpl::recv(char* data, size_t size)
+{
+ // Optimistically construct a 'RecvRequest' and future.
+ Owned<RecvRequest> request(new RecvRequest(data, size));
+ std::weak_ptr<LibeventSSLSocketImpl> weak_self(shared(this));
+
+ // If the user of the future decides to 'discard', then we want to
+ // test whether the request was already satisfied.
+ // We capture a 'weak_ptr' to 'this' (as opposed to a 'shared_ptr')
+ // because the socket could be destroyed before this lambda is
+ // executed. If we used a 'shared_ptr' then this lambda could extend
+ // the life-time of 'this' unnecessarily.
+ Future<size_t> future = request->promise.future()
+ .onDiscard([weak_self]() {
+ // Extend the life-time of 'this' through the execution of the
+ // lambda in the event loop. Note: The 'self' needs to be
+ // explicitly captured because we're not using it in the body of
+ // the lambda. We can use a 'shared_ptr' because
+ // run_in_event_loop is guaranteed to execute.
+ std::shared_ptr<LibeventSSLSocketImpl> self(weak_self.lock());
+
+ if (self != NULL) {
+ run_in_event_loop(
+ [self]() {
+ CHECK(__in_event_loop__);
+ CHECK(self);
+
+ Owned<RecvRequest> request;
+
+ synchronized (self->lock) {
+ std::swap(request, self->recv_request);
+ }
+
+ // Only discard if the request hasn't already been
+ // satisfied.
+ if (request.get() != NULL) {
+ // Discard the promise outside of the object lock as
+ // the callbacks can be expensive.
+ request->promise.discard();
+ }
+ },
+ DISALLOW_SHORT_CIRCUIT);
+ }
+ });
+
+ // Assign 'recv_request' under lock, fail on error.
+ synchronized (lock) {
+ if (recv_request.get() != NULL) {
+ return Failure("Socket is already receiving");
+ }
+ std::swap(request, recv_request);
+ }
+
+ // Extend the life-time of 'this' through the execution of the
+ // lambda in the event loop. Note: The 'self' needs to be explicitly
+ // captured because we're not using it in the body of the lambda. We
+ // can use a 'shared_ptr' because run_in_event_loop is guaranteed to
+ // execute.
+ auto self = shared(this);
+
+ run_in_event_loop(
+ [self]() {
+ CHECK(__in_event_loop__);
+ CHECK(self);
+
+ bool recv = false;
+
+ // We check to see if 'recv_request' is null. It would be null
+ // if a 'discard' happened before this lambda was executed.
+ synchronized (self->lock) {
+ recv = self->recv_request.get() != NULL;
+ }
+
+ // Only try to read existing data from the bufferevent if the
+ // request has not already been discarded.
+ if (recv) {
+ synchronized (self->bev) {
+ evbuffer* input = bufferevent_get_input(self->bev);
+ size_t length = evbuffer_get_length(input);
+
+ // If there is already data in the buffer, fulfill the
+ // 'recv_request' by calling 'recv_callback()'. Otherwise
+ // do nothing and wait for the 'recv_callback' to run when
+ // we receive data over the network.
+ if (length > 0) {
+ self->recv_callback();
+ }
+ }
+ }
+ },
+ DISALLOW_SHORT_CIRCUIT);
+
+ return future;
+}
+
+
+Future<size_t> LibeventSSLSocketImpl::send(const char* data, size_t size)
+{
+ // Optimistically construct a 'SendRequest' and future.
+ Owned<SendRequest> request(new SendRequest(size));
+ Future<size_t> future = request->promise.future();
+
+ // We don't add an 'onDiscard' continuation to send because we can
+ // not accurately detect how many bytes have been sent. Once we pass
+ // the data to the bufferevent, there is the possibility that parts
+ // of it have been sent. Another reason is that if we send partial
+ // messages (discard only a part of the data), then it is likely
+ // that the receiving end will fail parsing the message.
+
+ // Assign 'send_request' under lock, fail on error.
+ synchronized (lock) {
+ if (send_request.get() != NULL) {
+ return Failure("Socket is already sending");
+ }
+ std::swap(request, send_request);
+ }
+
+ // Extend the life-time of 'this' through the execution of the
+ // lambda in the event loop. Note: The 'self' needs to be explicitly
+ // captured because we're not using it in the body of the lambda. We
+ // can use a 'shared_ptr' because run_in_event_loop is guaranteed to
+ // execute.
+ auto self = shared(this);
+
+ run_in_event_loop(
+ [self, data, size]() {
+ CHECK(__in_event_loop__);
+ CHECK(self);
+
+ // We check that send_request is valid, because we do not
+ // allow discards. This means there is no race between the
+ // entry of 'send' and the execution of this lambda.
+ synchronized (self->lock) {
+ CHECK_NOTNULL(self->send_request.get());
+ }
+
+ bufferevent_write(self->bev, data, size);
+ },
+ DISALLOW_SHORT_CIRCUIT);
+
+ return future;
+}
+
+
+Future<size_t> LibeventSSLSocketImpl::sendfile(
+ int fd,
+ off_t offset,
+ size_t size)
+{
+ // Optimistically construct a 'SendRequest' and future.
+ Owned<SendRequest> request(new SendRequest(size));
+ Future<size_t> future = request->promise.future();
+
+ // Assign 'send_request' under lock, fail on error.
+ synchronized (lock) {
+ if (send_request.get() != NULL) {
+ return Failure("Socket is already sending");
+ }
+ std::swap(request, send_request);
+ }
+
+ // Extend the life-time of 'this' through the execution of the
+ // lambda in the event loop. Note: The 'self' needs to be explicitly
+ // captured because we're not using it in the body of the lambda. We
+ // can use a 'shared_ptr' because run_in_event_loop is guaranteed to
+ // execute.
+ auto self = shared(this);
+
+ run_in_event_loop(
+ [self, fd, offset, size]() {
+ CHECK(__in_event_loop__);
+ CHECK(self);
+
+ // We check that send_request is valid, because we do not
+ // allow discards. This means there is no race between the
+ // entry of 'sendfile' and the execution of this lambda.
+ synchronized (self->lock) {
+ CHECK_NOTNULL(self->send_request.get());
+ }
+
+ evbuffer_add_file(
+ bufferevent_get_output(self->bev),
+ fd,
+ offset,
+ size);
+ },
+ DISALLOW_SHORT_CIRCUIT);
+
+ return future;
+}
+
+
+Try<Nothing> LibeventSSLSocketImpl::listen(int backlog)
+{
+ if (listener != NULL) {
+ return Error("Socket is already listening");
+ }
+
+ CHECK(bev == NULL);
+
+ listener = evconnlistener_new(
+ base,
+ [](evconnlistener* listener,
+ int socket,
+ sockaddr* addr,
+ int addr_length,
+ void* arg) {
+ CHECK(__in_event_loop__);
+
+ std::weak_ptr<LibeventSSLSocketImpl>* handle =
+ reinterpret_cast<std::weak_ptr<LibeventSSLSocketImpl>*>(
+ CHECK_NOTNULL(arg));
+
+ std::shared_ptr<LibeventSSLSocketImpl> impl(handle->lock());
+
+ if (impl != NULL) {
+ Try<net::IP> ip = net::IP::create(*addr);
+ if (ip.isError()) {
+ VLOG(2) << "Could not convert sockaddr to net::IP: " << ip.error();
+ }
+
+ // We pass the 'listener' into the 'AcceptRequest' because
+ // this function could be executed before 'this->listener'
+ // is set.
+ AcceptRequest* request =
+ new AcceptRequest(
+ socket,
+ listener,
+ ip.isSome() ? Option<net::IP>(ip.get()) : None());
+
+ impl->accept_callback(request);
+ }
+ },
+ event_loop_handle,
+ LEV_OPT_REUSEABLE,
+ backlog,
+ s);
+
+ if (listener == NULL) {
+ return Error("Failed to listen on socket");
+ }
+
+ // TODO(jmlvanre): attach an error callback.
+
+ return Nothing();
+}
+
+
+Future<Socket> LibeventSSLSocketImpl::accept()
+{
+ return accept_queue.get()
+ .then([](const Future<Socket>& future) { return future; });
+}
+
+
+// Only runs in event loop.
+void LibeventSSLSocketImpl::accept_callback(AcceptRequest* request)
+{
+ CHECK(__in_event_loop__);
+
+ // Enqueue a potential socket that we will set up SSL state for and
+ // verify.
+ accept_queue.put(request->promise.future());
+
+ // Set up SSL object.
+ SSL* ssl = SSL_new(openssl::context());
+ if (ssl == NULL) {
+ request->promise.fail("Accept failed, SSL_new");
+ delete request;
+ return;
+ }
+
+ // We use 'request->listener' because 'this->listener' may not have
+ // been set by the time this function is executed. See comment in
+ // the lambda for evconnlistener_new in
+ // 'LibeventSSLSocketImpl::listen'.
+ event_base* ev_base = evconnlistener_get_base(request->listener);
+
+ // Construct the bufferevent in the accepting state.
+ bufferevent* bev = bufferevent_openssl_socket_new(
+ ev_base,
+ request->socket,
+ ssl,
+ BUFFEREVENT_SSL_ACCEPTING,
+ BEV_OPT_THREADSAFE);
+
+ if (bev == NULL) {
+ request->promise.fail("Accept failed: bufferevent_openssl_socket_new");
+ SSL_free(ssl);
+ delete request;
+ return;
+ }
+
+ bufferevent_setcb(
+ bev,
+ NULL,
+ NULL,
+ [](bufferevent* bev, short events, void* arg) {
+ // This handles error states or 'BEV_EVENT_CONNECTED' events
+ // and satisfies the promise by constructing a new socket if
+ // the connection was successfuly established.
+ CHECK(__in_event_loop__);
+
+ AcceptRequest* request =
+ reinterpret_cast<AcceptRequest*>(CHECK_NOTNULL(arg));
+
+ if (events & BEV_EVENT_EOF) {
+ request->promise.fail("Failed accept: connection closed");
+ } else if (events & BEV_EVENT_CONNECTED) {
+ // We will receive a 'CONNECTED' state on an accepting socket
+ // once the connection is established. Time to do
+ // post-verification. First, we need to determine the peer
+ // hostname.
+ Option<string> peer_hostname = None();
+ if (request->ip.isSome()) {
+ Try<string> hostname = net::getHostname(request->ip.get());
+ if (hostname.isError()) {
+ VLOG(2) << "Could not determine hostname of peer: "
+ << hostname.error();
+ } else {
+ VLOG(2) << "Accepting from " << hostname.get();
+ peer_hostname = hostname.get();
+ }
+ }
+
+ SSL* ssl = bufferevent_openssl_get_ssl(bev);
+ CHECK_NOTNULL(ssl);
+
+ Try<Nothing> verify = openssl::verify(ssl, peer_hostname);
+ if (verify.isError()) {
+ VLOG(1) << "Failed accept, verification error: " << verify.error();
+ request->promise.fail(verify.error());
+ SSL_free(ssl);
+ bufferevent_free(bev);
+ // TODO(jmlvanre): Clean up for readability. Consider RAII
+ // or constructing the impl earlier.
+ CHECK(request->socket >= 0);
+ Try<Nothing> close = os::close(request->socket);
+ if (close.isError()) {
+ LOG(FATAL)
+ << "Failed to close socket " << stringify(request->socket)
+ << ": " << close.error();
+ }
+ delete request;
+ return;
+ }
+
+ auto impl = std::shared_ptr<LibeventSSLSocketImpl>(
+ new LibeventSSLSocketImpl(
+ request->socket,
+ bev,
+ std::move(peer_hostname)));
+
+ // See comment at 'initialize' declaration for why we call
+ // this.
+ impl->initialize();
+
+ // We have to wait till after 'initialize()' is invoked for
+ // event_loop_handle to be valid as a callback argument for
+ // the callbacks.
+ bufferevent_setcb(
+ CHECK_NOTNULL(impl->bev),
+ &LibeventSSLSocketImpl::recv_callback,
+ &LibeventSSLSocketImpl::send_callback,
+ &LibeventSSLSocketImpl::event_callback,
+ CHECK_NOTNULL(impl->event_loop_handle));
+
+ Socket socket = Socket::Impl::socket(std::move(impl));
+
+ request->promise.set(socket);
+ } else if (events & BEV_EVENT_ERROR) {
+ std::ostringstream stream;
+ if (EVUTIL_SOCKET_ERROR() != 0) {
+ stream << evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR());
+ } else {
+ char buffer[1024] = {};
+ unsigned long error = bufferevent_get_openssl_error(bev);
+ ERR_error_string_n(error, buffer, sizeof(buffer));
+ stream << buffer;
+ }
+
+ // Fail the accept request and log the error.
+ VLOG(1) << "Socket error: " << stream.str();
+
+ SSL* ssl = bufferevent_openssl_get_ssl(CHECK_NOTNULL(bev));
+ SSL_free(ssl);
+ bufferevent_free(bev);
+
+ // TODO(jmlvanre): Clean up for readability. Consider RAII
+ // or constructing the impl earlier.
+ CHECK(request->socket >= 0);
+ Try<Nothing> close = os::close(request->socket);
+ if (close.isError()) {
+ LOG(FATAL)
+ << "Failed to close socket " << stringify(request->socket)
+ << ": " << close.error();
+ }
+ request->promise.fail(
+ "Failed accept: connection error: " + stream.str());
+ }
+
+ delete request;
+ },
+ request);
+}
+
+} // namespace network {
+} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/654cabf9/3rdparty/libprocess/src/libevent_ssl_socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent_ssl_socket.hpp b/3rdparty/libprocess/src/libevent_ssl_socket.hpp
new file mode 100644
index 0000000..d65638b
--- /dev/null
+++ b/3rdparty/libprocess/src/libevent_ssl_socket.hpp
@@ -0,0 +1,165 @@
+#ifndef __LIBEVENT_SSL_SOCKET_HPP__
+#define __LIBEVENT_SSL_SOCKET_HPP__
+
+#include <event2/buffer.h>
+#include <event2/bufferevent_ssl.h>
+#include <event2/event.h>
+#include <event2/listener.h>
+#include <event2/util.h>
+
+#include <atomic>
+#include <memory>
+
+#include <process/queue.hpp>
+#include <process/socket.hpp>
+
+namespace process {
+namespace network {
+
+class LibeventSSLSocketImpl : public Socket::Impl
+{
+public:
+ // See 'Socket::create()'.
+ static Try<std::shared_ptr<Socket::Impl>> create(int s);
+
+ LibeventSSLSocketImpl(int _s);
+
+ virtual ~LibeventSSLSocketImpl();
+
+ // Implement 'Socket::Impl' interface.
+ virtual Future<Nothing> connect(const Address& address);
+ virtual Future<size_t> recv(char* data, size_t size);
+ // Send does not currently support discard. See implementation.
+ virtual Future<size_t> send(const char* data, size_t size);
+ virtual Future<size_t> sendfile(int fd, off_t offset, size_t size);
+ virtual Try<Nothing> listen(int backlog);
+ virtual Future<Socket> accept();
+ virtual Socket::Kind kind() const { return Socket::SSL; }
+
+ // This call is used to do the equivalent of shutting down the read
+ // end. This means finishing the future of any outstanding read
+ // request.
+ virtual void shutdown();
+
+ // We need a post-initializer because 'shared_from_this()' is not
+ // valid until the constructor has finished.
+ void initialize();
+
+private:
+ // A set of helper functions that transitions an accepted socket to
+ // an SSL connected socket. With the libevent-openssl library, once
+ // we return from the 'accept_callback()' which is scheduled by
+ // 'listen' then we still need to wait for the 'BEV_EVENT_CONNECTED'
+ // state before we know the SSL connection has been established.
+ struct AcceptRequest
+ {
+ AcceptRequest(
+ int _socket,
+ evconnlistener* _listener,
+ const Option<net::IP>& _ip)
+ : listener(_listener),
+ socket(_socket),
+ ip(_ip) {}
+ Promise<Socket> promise;
+ evconnlistener* listener;
+ int socket;
+ Option<net::IP> ip;
+ };
+
+ struct RecvRequest
+ {
+ RecvRequest(char* _data, size_t _size)
+ : data(_data), size(_size) {}
+ Promise<size_t> promise;
+ char* data;
+ size_t size;
+ };
+
+ struct SendRequest
+ {
+ SendRequest(size_t _size)
+ : size(_size) {}
+ Promise<size_t> promise;
+ size_t size;
+ };
+
+ struct ConnectRequest
+ {
+ Promise<Nothing> promise;
+ };
+
+ // This is a private constructor used by the accept helper
+ // functions.
+ LibeventSSLSocketImpl(
+ int _s,
+ bufferevent* bev,
+ Option<std::string>&& peer_hostname);
+
+ // This is called when the equivalent of 'accept' returns. The role
+ // of this function is to set up the SSL object and bev.
+ void accept_callback(AcceptRequest* request);
+
+ // The following are function pairs of static functions to member
+ // functions. The static functions test and hold the weak pointer to
+ // the socket before calling the member functions. This protects
+ // against the socket being destroyed before the event loop calls
+ // the callbacks.
+ static void recv_callback(bufferevent* bev, void* arg);
+ void recv_callback();
+
+ static void send_callback(bufferevent* bev, void* arg);
+ void send_callback();
+
+ static void event_callback(bufferevent* bev, short events, void* arg);
+ void event_callback(short events);
+
+ bufferevent* bev;
+
+ evconnlistener* listener;
+
+ // Protects the following instance variables.
+ std::atomic_flag lock;
+ Owned<RecvRequest> recv_request;
+ Owned<SendRequest> send_request;
+ Owned<ConnectRequest> connect_request;
+
+ // This is a weak pointer to 'this', i.e., ourselves, this class
+ // instance. We need this for our event loop callbacks because it's
+ // possible that we'll actually want to cleanup this socket impl
+ // before the event loop callback gets executed ... and we'll check
+ // in each event loop callback whether or not this weak_ptr is valid
+ // by attempting to upgrade it to shared_ptr. It is the
+ // responsibility of the event loop through the deferred lambda in
+ // the destructor to clean up this pointer.
+ // 1) It is a 'weak_ptr' as opposed to a 'shared_ptr' because we
+ // want to test whether the object is still around from within the
+ // event loop. If it was a 'shared_ptr' then we would be
+ // contributing to the lifetime of the object and would no longer be
+ // able to test the lifetime correctly.
+ // 2) This is a pointer to a 'weak_ptr' so that we can pass this
+ // through to the event loop through the C-interface. We need access
+ // to the 'weak_ptr' from outside the object (in the event loop) to
+ // test if the object is still alive. By maintaining this 'weak_ptr'
+ // on the heap we can be sure it is safe to access from the
+ // event loop until it is destroyed.
+ std::weak_ptr<LibeventSSLSocketImpl>* event_loop_handle;
+
+ // This queue stores buffered accepted sockets. 'Queue' is a thread
+ // safe queue implementation, and the event loop pushes connected
+ // sockets onto it, the 'accept()' call pops them off. We wrap these
+ // sockets with futures so that we can pass errors through and chain
+ // futures as well.
+ Queue<Future<Socket>> accept_queue;
+
+ // This bool represents whether this socket was created through an
+ // 'accept' flow. We use this in the destructor to change
+ // the clean-up behavior for the SSL context object.
+ bool accepted;
+
+ Option<std::string> peer_hostname;
+};
+
+} // namespace network {
+} // namespace process {
+
+#endif // __LIBEVENT_SSL_SOCKET_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/654cabf9/3rdparty/libprocess/src/openssl.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/openssl.cpp b/3rdparty/libprocess/src/openssl.cpp
new file mode 100644
index 0000000..090e985
--- /dev/null
+++ b/3rdparty/libprocess/src/openssl.cpp
@@ -0,0 +1,563 @@
+#include "openssl.hpp"
+
+#include <sys/param.h>
+
+#include <openssl/err.h>
+#include <openssl/rand.h>
+#include <openssl/ssl.h>
+#include <openssl/x509v3.h>
+
+#include <mutex>
+#include <string>
+
+#include <process/once.hpp>
+
+#include <stout/flags.hpp>
+
+using std::ostringstream;
+using std::string;
+
+// Must be defined by us for OpenSSL in order to capture the necessary
+// data for doing locking. Note, this needs to be defined in the
+// global namespace as well.
+struct CRYPTO_dynlock_value
+{
+ std::mutex mutex;
+};
+
+
+namespace process {
+namespace network {
+namespace openssl {
+
+// _Global_ OpenSSL context, initialized via 'initialize'.
+static SSL_CTX* ctx = NULL;
+
+
+Flags::Flags()
+{
+ add(&Flags::enabled,
+ "enabled",
+ "Whether SSL is enabled.",
+ false);
+
+ add(&Flags::cert_file,
+ "cert_file",
+ "Path to certifcate.");
+
+ add(&Flags::key_file,
+ "key_file",
+ "Path to key.");
+
+ add(&Flags::verify_cert,
+ "verify_cert",
+ "Whether or not to verify peer certificates.",
+ false);
+
+ add(&Flags::require_cert,
+ "require_cert",
+ "Whether or not to require peer certificates. Requiring a peer "
+ "certificate implies verifying it.",
+ false);
+
+ add(&Flags::verification_depth,
+ "verification_depth",
+ "Maximum depth for the certificate chain verification that shall be "
+ "allowed.",
+ 4);
+
+ add(&Flags::ca_dir,
+ "ca_dir",
+ "Path to certifcate authority (CA) directory.");
+
+ add(&Flags::ca_file,
+ "ca_file",
+ "Path to certifcate authority (CA) file.");
+
+ add(&Flags::ciphers,
+ "ciphers",
+ "Cryptographic ciphers to use.",
+ // Default TLSv1 ciphers chosen based on Amazon's security
+ // policy, see:
+ // http://docs.aws.amazon.com/ElasticLoadBalancing/latest/
+ // DeveloperGuide/elb-security-policy-table.html
+ "AES128-SHA:AES256-SHA:RC4-SHA:DHE-RSA-AES128-SHA:DHE-DSS-AES128-SHA:"
+ "DHE-RSA-AES256-SHA:DHE-DSS-AES256-SHA");
+
+ add(&Flags::enable_ssl_v2,
+ "enable_ssl_v2",
+ "Enable SSLV2.",
+ false);
+
+ add(&Flags::enable_ssl_v3,
+ "enable_ssl_v3",
+ "Enable SSLV3.",
+ false);
+
+ add(&Flags::enable_tls_v1_0,
+ "enable_tls_v1_0",
+ "Enable SSLV1.0.",
+ false);
+
+ add(&Flags::enable_tls_v1_1,
+ "enable_tls_v1_1",
+ "Enable SSLV1.1.",
+ false);
+
+ add(&Flags::enable_tls_v1_2,
+ "enable_tls_v1_2",
+ "Enable SSLV1.2.",
+ true);
+}
+
+
+static Flags* ssl_flags = new Flags();
+
+
+const Flags& flags()
+{
+ openssl::initialize();
+ return *ssl_flags;
+}
+
+
+// Mutexes necessary to support OpenSSL locking on shared data
+// structures. See 'locking_function' for more information.
+static std::mutex* mutexes = NULL;
+
+
+// Callback needed to perform locking on shared data structures. From
+// the OpenSSL documentation:
+//
+// OpenSSL uses a number of global data structures that will be
+// implicitly shared whenever multiple threads use OpenSSL.
+// Multi-threaded applications will crash at random if [the locking
+// function] is not set.
+void locking_function(int mode, int n, const char* /*file*/, int /*line*/)
+{
+ if (mode & CRYPTO_LOCK) {
+ mutexes[n].lock();
+ } else {
+ mutexes[n].unlock();
+ }
+}
+
+
+// OpenSSL callback that returns the current thread ID, necessary for
+// OpenSSL threading.
+unsigned long id_function()
+{
+ pthread_t pthread = pthread_self();
+#ifdef __APPLE__
+ mach_port_t id = pthread_mach_thread_np(pthread);
+#else
+ pthread_t id = pthread;
+#endif // __APPLE__
+ return static_cast<unsigned long>(id);
+}
+
+
+// OpenSSL callback for creating new dynamic "locks", abstracted by
+// the CRYPTO_dynlock_value structure.
+CRYPTO_dynlock_value* dyn_create_function(const char* /*file*/, int /*line*/)
+{
+ CRYPTO_dynlock_value* value = new CRYPTO_dynlock_value();
+
+ if (value == NULL) {
+ return NULL;
+ }
+
+ return value;
+}
+
+
+// OpenSSL callback for locking and unlocking dynamic "locks",
+// abstracted by the CRYPTO_dynlock_value structure.
+void dyn_lock_function(
+ int mode,
+ CRYPTO_dynlock_value* value,
+ const char* /*file*/,
+ int /*line*/)
+{
+ if (mode & CRYPTO_LOCK) {
+ value->mutex.lock();
+ } else {
+ value->mutex.unlock();
+ }
+}
+
+
+// OpenSSL callback for destroying dynamic "locks", abstracted by the
+// CRYPTO_dynlock_value structure.
+void dyn_destroy_function(
+ CRYPTO_dynlock_value* value,
+ const char* /*file*/,
+ int /*line*/)
+{
+ delete value;
+}
+
+
+// Callback for OpenSSL peer certificate verification.
+int verify_callback(int ok, X509_STORE_CTX* store)
+{
+ if (ok != 1) {
+ // Construct and log a warning message.
+ ostringstream message;
+
+ X509* cert = X509_STORE_CTX_get_current_cert(store);
+ int error = X509_STORE_CTX_get_error(store);
+ int depth = X509_STORE_CTX_get_error_depth(store);
+
+ message << "Error with certificate at depth: " << stringify(depth) << "\n";
+
+ char buffer[256] {};
+
+ // TODO(jmlvanre): use X509_NAME_print_ex instead.
+ X509_NAME_oneline(X509_get_issuer_name(cert), buffer, sizeof(buffer) - 1);
+
+ message << "Issuer: " << stringify(buffer) << "\n";
+
+ // TODO(jmlvanre): use X509_NAME_print_ex instead.
+ memset(buffer, 0, sizeof(buffer));
+ X509_NAME_oneline(X509_get_subject_name(cert), buffer, sizeof(buffer) - 1);
+
+ message << "Subject: " << stringify(buffer) << "\n";
+
+ message << "Error (" << stringify(error) << "): " <<
+ stringify(X509_verify_cert_error_string(error));
+
+ LOG(WARNING) << message.str();
+ }
+
+ return ok;
+}
+
+
+string error_string(unsigned long code)
+{
+ // SSL library guarantees to stay within 120 bytes.
+ char buffer[128];
+
+ ERR_error_string_n(code, buffer, sizeof(buffer));
+ string s(buffer);
+
+ if (code == SSL_ERROR_SYSCALL) {
+ s += error_string(ERR_get_error());
+ }
+
+ return s;
+}
+
+
+void initialize()
+{
+ static Once* initialized = new Once();
+
+ if (initialized->once()) {
+ return;
+ }
+
+ // Load all the flags prefixed by SSL_ from the environment. See
+ // comment at top of openssl.hpp for a full list.
+ Try<Nothing> load = ssl_flags->load("SSL_");
+ if (load.isError()) {
+ EXIT(EXIT_FAILURE)
+ << "Failed to load flags from environment variables (prefixed by SSL_):"
+ << load.error();
+ }
+
+ // Exit early if SSL is not enabled.
+ if (!ssl_flags->enabled) {
+ initialized->done();
+ return;
+ }
+
+ // We MUST have entropy, or else there's no point to crypto.
+ if (!RAND_poll()) {
+ EXIT(EXIT_FAILURE) << "SSL socket requires entropy";
+ }
+
+ // Initialize the OpenSSL library.
+ SSL_library_init();
+ SSL_load_error_strings();
+
+ // Prepare mutexes for threading callbacks.
+ mutexes = new std::mutex[CRYPTO_num_locks()];
+
+ // Install SSL threading callbacks.
+ // TODO(jmlvanre): the id mechanism is deprecated in OpenSSL.
+ CRYPTO_set_id_callback(&id_function);
+ CRYPTO_set_locking_callback(&locking_function);
+ CRYPTO_set_dynlock_create_callback(&dyn_create_function);
+ CRYPTO_set_dynlock_lock_callback(&dyn_lock_function);
+ CRYPTO_set_dynlock_destroy_callback(&dyn_destroy_function);
+
+ ctx = SSL_CTX_new(SSLv23_method());
+ CHECK(ctx) << "Failed to create SSL context: "
+ << ERR_error_string(ERR_get_error(), NULL);
+
+ // Disable SSL session caching.
+ SSL_CTX_set_session_cache_mode(ctx, SSL_SESS_CACHE_OFF);
+
+ // Set a session id to avoid connection termination upon
+ // re-connect. We can use something more relevant when we care
+ // about session caching.
+ const uint64_t session_ctx = 7;
+
+ const unsigned char* session_id =
+ reinterpret_cast<const unsigned char*>(&session_ctx);
+
+ if (SSL_CTX_set_session_id_context(
+ ctx,
+ session_id,
+ sizeof(session_ctx)) != 1) {
+ LOG(FATAL) << "Session id context size exceeds maximum";
+ }
+
+ // Now do some validation of the flags/environment variables.
+ if (ssl_flags->key_file.isNone()) {
+ EXIT(EXIT_FAILURE) << "SSL requires key! NOTE: Set path with SSL_KEY_FILE";
+ }
+
+ if (ssl_flags->cert_file.isNone()) {
+ EXIT(EXIT_FAILURE)
+ << "SSL requires certificate! NOTE: Set path with SSL_CERT_FILE";
+ }
+
+ if (ssl_flags->ca_file.isNone()) {
+ VLOG(2) << "CA file path is unspecified! NOTE: "
+ << "Set CA file path with SSL_CA_FILE=<filepath>";
+ }
+
+ if (ssl_flags->ca_dir.isNone()) {
+ VLOG(2) << "CA directory path unspecified! NOTE: "
+ << "Set CA directory path with SSL_CA_DIR=<dirpath>";
+ }
+
+ if (!ssl_flags->verify_cert) {
+ VLOG(2) << "Will not verify peer certificate!\n"
+ << "NOTE: Set SSL_VERIFY_CERT=1 to enable peer certificate "
+ << "verification";
+ }
+
+ if (!ssl_flags->require_cert) {
+ VLOG(2) << "Will only verify peer certificate if presented!\n"
+ << "NOTE: Set SSL_REQUIRE_CERT=1 to require peer certificate "
+ << "verification";
+ }
+
+ if (ssl_flags->require_cert && !ssl_flags->verify_cert) {
+ // Requiring a certificate implies that is should be verified.
+ ssl_flags->verify_cert = true;
+
+ VLOG(2) << "SSL_REQUIRE_CERT implies peer certificate verification.\n"
+ << "SSL_VERIFY_CERT set to true";
+ }
+
+ // Initialize OpenSSL if we've been asked to do verification of peer
+ // certificates.
+ if (ssl_flags->verify_cert) {
+ // Set CA locations.
+ if (ssl_flags->ca_file.isSome() || ssl_flags->ca_dir.isSome()) {
+ const char* ca_file = ssl_flags->ca_file.isSome()
+ ? ssl_flags->ca_file.get().c_str()
+ : NULL;
+
+ const char* ca_dir = ssl_flags->ca_dir.isSome()
+ ? ssl_flags->ca_dir.get().c_str()
+ : NULL;
+
+ if (SSL_CTX_load_verify_locations(ctx, ca_file, ca_dir) != 1) {
+ unsigned long error = ERR_get_error();
+ EXIT(EXIT_FAILURE)
+ << "Could not load CA file and/or directory ("
+ << stringify(error) << "): "
+ << error_string(error) << " -> "
+ << (ca_file != NULL ? (stringify("FILE: ") + ca_file) : "")
+ << (ca_dir != NULL ? (stringify("DIR: ") + ca_dir) : "");
+ }
+
+ if (ca_file != NULL) {
+ VLOG(2) << "Using CA file: " << ca_file;
+ }
+ if (ca_dir != NULL) {
+ VLOG(2) << "Using CA dir: " << ca_dir;
+ }
+ } else {
+ if (SSL_CTX_set_default_verify_paths(ctx) != 1) {
+ EXIT(EXIT_FAILURE) << "Could not load default CA file and/or directory";
+ }
+
+ VLOG(2) << "Using default CA file and/or directory";
+ }
+
+ // Set SSL peer verification callback.
+ int mode = SSL_VERIFY_PEER;
+
+ if (ssl_flags->require_cert) {
+ mode |= SSL_VERIFY_FAIL_IF_NO_PEER_CERT;
+ }
+
+ SSL_CTX_set_verify(ctx, mode, &verify_callback);
+
+ SSL_CTX_set_verify_depth(ctx, ssl_flags->verification_depth);
+ } else {
+ SSL_CTX_set_verify(ctx, SSL_VERIFY_NONE, NULL);
+ }
+
+ // Set certificate chain.
+ if (SSL_CTX_use_certificate_chain_file(
+ ctx,
+ ssl_flags->cert_file.get().c_str()) != 1) {
+ EXIT(EXIT_FAILURE) << "Could not load cert file";
+ }
+
+ // Set private key.
+ if (SSL_CTX_use_PrivateKey_file(
+ ctx,
+ ssl_flags->key_file.get().c_str(),
+ SSL_FILETYPE_PEM) != 1) {
+ EXIT(EXIT_FAILURE) << "Could not load key file";
+ }
+
+ // Validate key.
+ if (SSL_CTX_check_private_key(ctx) != 1) {
+ EXIT(EXIT_FAILURE)
+ << "Private key does not match the certificate public key";
+ }
+
+ VLOG(2) << "Using ciphers: " << ssl_flags->ciphers;
+
+ if (SSL_CTX_set_cipher_list(ctx, ssl_flags->ciphers.c_str()) == 0) {
+ EXIT(EXIT_FAILURE) << "Could not set ciphers: " << ssl_flags->ciphers;
+ }
+
+ // Use server preference for cipher.
+ long ssl_options = SSL_OP_CIPHER_SERVER_PREFERENCE;
+ // Disable SSLv2.
+ if (!ssl_flags->enable_ssl_v2) { ssl_options |= SSL_OP_NO_SSLv2; }
+ // Disable SSLv3.
+ if (!ssl_flags->enable_ssl_v3) { ssl_options |= SSL_OP_NO_SSLv3; }
+ // Disable TLSv1.
+ if (!ssl_flags->enable_tls_v1_0) { ssl_options |= SSL_OP_NO_TLSv1; }
+ // Disable TLSv1.1.
+ if (!ssl_flags->enable_tls_v1_1) { ssl_options |= SSL_OP_NO_TLSv1_1; }
+ // Disable TLSv1.2.
+ if (!ssl_flags->enable_tls_v1_2) { ssl_options |= SSL_OP_NO_TLSv1_2; }
+
+ SSL_CTX_set_options(ctx, ssl_options);
+
+ initialized->done();
+}
+
+
+SSL_CTX* context()
+{
+ // TODO(benh): Always call 'initialize' just in case?
+ return ctx;
+}
+
+
+Try<Nothing> verify(const SSL* const ssl, const Option<string>& hostname)
+{
+ // Return early if we don't need to verify.
+ if (!ssl_flags->verify_cert) {
+ return Nothing();
+ }
+
+ // The X509 object must be freed if this call succeeds.
+ // TODO(jmlvanre): handle this better. How about RAII?
+ X509* cert = SSL_get_peer_certificate(ssl);
+
+ if (cert == NULL) {
+ return ssl_flags->require_cert
+ ? Error("Peer did not provide certificate")
+ : Try<Nothing>(Nothing());
+ }
+
+ if (SSL_get_verify_result(ssl) != X509_V_OK) {
+ X509_free(cert);
+ return Error("Could not verify peer certificate");
+ }
+
+ if (hostname.isNone()) {
+ X509_free(cert);
+ return ssl_flags->require_cert
+ ? Error("Cannot verify peer certificate: peer hostname unknown")
+ : Try<Nothing>(Nothing());
+ }
+
+ int extcount = X509_get_ext_count(cert);
+ if (extcount <= 0) {
+ X509_free(cert);
+ return Error("X509_get_ext_count failed: " + stringify(extcount));
+ }
+
+ for (int i = 0; i < extcount; i++) {
+ X509_EXTENSION* ext = X509_get_ext(cert, i);
+
+ const string extstr =
+ OBJ_nid2sn(OBJ_obj2nid(X509_EXTENSION_get_object(ext)));
+
+ if (extstr == "subjectAltName") {
+#if OPENSSL_VERSION_NUMBER <= 0x00909000L
+ X509V3_EXT_METHOD* method = X509V3_EXT_get(ext);
+#else
+ const X509V3_EXT_METHOD* method = X509V3_EXT_get(ext);
+#endif
+ if (method == NULL) {
+ break;
+ }
+
+ const unsigned char* data = ext->value->data;
+
+ STACK_OF(CONF_VALUE)* values = method->i2v(
+ method,
+ method->d2i(NULL, &data, ext->value->length),
+ NULL);
+
+ for (int j = 0; j < sk_CONF_VALUE_num(values); j++) {
+ CONF_VALUE* value = sk_CONF_VALUE_value(values, j);
+ if ((strcmp(value->name, "DNS") == 0) &&
+ (value->value == hostname.get())) {
+ X509_free(cert);
+ return Nothing();
+ }
+ }
+ }
+ }
+
+ // If we still haven't verified the hostname, try doing it via
+ // the certificate subject name.
+ X509_NAME* name = X509_get_subject_name(cert);
+
+ if (name != NULL) {
+ char text[_POSIX_HOST_NAME_MAX] {};
+
+ if (X509_NAME_get_text_by_NID(
+ name,
+ NID_commonName,
+ text,
+ sizeof(text)) > 0) {
+ if (hostname.get() != text) {
+ X509_free(cert);
+ return Error(
+ "Presented Certificate Name: " + stringify(text) +
+ " does not match peer hostname name: " + hostname.get());
+ }
+
+ X509_free(cert);
+ return Nothing();
+ }
+ }
+
+ // If we still haven't exited, we haven't verified it, and we give up.
+ X509_free(cert);
+ return Error(
+ "Could not verify presented certificate with hostname " + hostname.get());
+}
+
+} // namespace openssl {
+} // namespace network {
+} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/654cabf9/3rdparty/libprocess/src/openssl.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/openssl.hpp b/3rdparty/libprocess/src/openssl.hpp
new file mode 100644
index 0000000..60c7b07
--- /dev/null
+++ b/3rdparty/libprocess/src/openssl.hpp
@@ -0,0 +1,79 @@
+#ifndef __OPENSSL_HPP__
+#define __OPENSSL_HPP__
+
+#include <openssl/ssl.h>
+
+#include <string>
+
+#include <stout/flags.hpp>
+#include <stout/nothing.hpp>
+#include <stout/option.hpp>
+#include <stout/try.hpp>
+
+namespace process {
+namespace network {
+namespace openssl {
+
+// Capture the environment variables that influence how we initialize
+// the OpenSSL library via flags.
+class Flags : public virtual flags::FlagsBase
+{
+public:
+ Flags();
+
+ bool enabled;
+ Option<std::string> cert_file;
+ Option<std::string> key_file;
+ bool verify_cert;
+ bool require_cert;
+ unsigned int verification_depth;
+ Option<std::string> ca_dir;
+ Option<std::string> ca_file;
+ std::string ciphers;
+ bool enable_ssl_v2;
+ bool enable_ssl_v3;
+ bool enable_tls_v1_0;
+ bool enable_tls_v1_1;
+ bool enable_tls_v1_2;
+};
+
+const Flags& flags();
+
+// Initializes the _global_ OpenSSL context (SSL_CTX) as well as the
+// crypto library in order to support multi-threading. The global
+// context gets initialized using the environment variables:
+//
+// SSL_ENABLED=(false|0,true|1)
+// SSL_CERT_FILE=(path to certificate)
+// SSL_KEY_FILE=(path to key)
+// SSL_VERIFY_CERT=(false|0,true|1)
+// SSL_REQUIRE_CERT=(false|0,true|1)
+// SSL_VERIFY_DEPTH=(4)
+// SSL_CA_DIR=(path to CA directory)
+// SSL_CA_FILE=(path to CA file)
+// SSL_CIPHERS=(accepted ciphers separated by ':')
+// SSL_ENABLE_SSL_V2=(false|0,true|1)
+// SSL_ENABLE_SSL_V3=(false|0,true|1)
+// SSL_ENABLE_TLS_V1_0=(false|0,true|1)
+// SSL_ENABLE_TLS_V1_1=(false|0,true|1)
+// SSL_ENABLE_TLS_V1_2=(false|0,true|1)
+//
+// TODO(benh): When/If we need to support multiple contexts in the
+// same process, for example for Server Name Indication (SNI), then
+// we'll add other functions for initializing an SSL_CTX based on
+// these environment variables.
+// TODO(nneilsen): Support certification revocation.
+void initialize();
+
+// Returns the _global_ OpenSSL context.
+SSL_CTX* context();
+
+// Verify that the hostname is properly associated with the peer
+// certificate associated with the specified SSL connection.
+Try<Nothing> verify(const SSL* const ssl, const Option<std::string>& hostname);
+
+} // namespace openssl {
+} // namespace network {
+} // namespace process {
+
+#endif // __OPENSSL_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/654cabf9/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index c2baa6c..f919b99 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -1646,15 +1646,22 @@ Encoder* SocketManager::next(int s)
}
dispose.erase(s);
+
auto iterator = sockets.find(s);
- delete iterator->second;
- sockets.erase(iterator);
// We don't actually close the socket (we wait for the Socket
// abstraction to close it once there are no more references),
// but we do shutdown the receiving end so any DataDecoder
// will get cleaned up (which might have the last reference).
- shutdown(s, SHUT_RD);
+
+ // Hold on to the Socket and remove it from the 'sockets'
+ // map so that in the case where 'shutdown()' ends up
+ // calling close the termination logic is not run twice.
+ Socket* socket = iterator->second;
+ sockets.erase(iterator);
+ socket->shutdown();
+
+ delete socket;
}
}
}
@@ -1714,6 +1721,9 @@ void SocketManager::close(int s)
proxies.erase(s);
}
+ dispose.erase(s);
+ auto iterator = sockets.find(s);
+
// We need to stop any 'ignore_data' receivers as they may have
// the last Socket reference so we shutdown recvs but don't do a
// full close (since that will be taken care of by ~Socket, see
@@ -1722,12 +1732,16 @@ void SocketManager::close(int s)
// from the socket. Note we need to do this before we call
// 'sockets.erase(s)' to avoid the potential race with the last
// reference being in 'sockets'.
- shutdown(s, SHUT_RD);
- dispose.erase(s);
- auto iterator = sockets.find(s);
- delete iterator->second;
+
+ // Hold on to the Socket and remove it from the 'sockets' map so
+ // that in the case where 'shutdown()' ends up calling close the
+ // termination logic is not run twice.
+ Socket* socket = iterator->second;
sockets.erase(iterator);
+ socket->shutdown();
+
+ delete socket;
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/654cabf9/3rdparty/libprocess/src/socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/socket.cpp b/3rdparty/libprocess/src/socket.cpp
index 0e1cebb..b2a27b5 100644
--- a/3rdparty/libprocess/src/socket.cpp
+++ b/3rdparty/libprocess/src/socket.cpp
@@ -9,6 +9,10 @@
#include <process/owned.hpp>
#include <process/socket.hpp>
+#ifdef USE_SSL_SOCKET
+#include "libevent_ssl_socket.hpp"
+#include "openssl.hpp"
+#endif
#include "poll_socket.hpp"
using std::string;
@@ -55,6 +59,16 @@ Try<Socket> Socket::create(Kind kind, int s)
}
return Socket(socket.get());
}
+#ifdef USE_SSL_SOCKET
+ case SSL: {
+ Try<std::shared_ptr<Socket::Impl>> socket =
+ LibeventSSLSocketImpl::create(s);
+ if (socket.isError()) {
+ return Error(socket.error());
+ }
+ return Socket(socket.get());
+ }
+#endif
// By not setting a default we leverage the compiler errors when
// the enumeration is augmented to find all the cases we need to
// provide.
@@ -64,9 +78,13 @@ Try<Socket> Socket::create(Kind kind, int s)
const Socket::Kind& Socket::DEFAULT_KIND()
{
- // TODO(jmlvanre): Change the default based on configure or
- // environment flags.
- static const Kind DEFAULT = POLL;
+ static const Kind DEFAULT =
+#ifdef USE_SSL_SOCKET
+ network::openssl::flags().enabled ? Socket::SSL : Socket::POLL;
+#else
+ Socket::POLL;
+#endif
+
return DEFAULT;
}