You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by an...@apache.org on 2018/06/28 18:22:24 UTC

[10/16] mesos git commit: Organized POSIX and Windows libprocess implementations.

Organized POSIX and Windows libprocess implementations.

In preparation of the new Windows IOCP library, the POSIX and Windows
specific files in libprocess have been moved to their own directories
for better code organization.

Review: https://reviews.apache.org/r/67388/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/78244b53
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/78244b53
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/78244b53

Branch: refs/heads/master
Commit: 78244b533a50e775791025c0c2b9d13506c71e60
Parents: 379e56b
Author: Akash Gupta <ak...@hotmail.com>
Authored: Wed Jun 27 14:30:14 2018 -0700
Committer: Andrew Schwartzmeyer <an...@schwartzmeyer.com>
Committed: Wed Jun 27 15:06:10 2018 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am                 |   41 +-
 3rdparty/libprocess/src/CMakeLists.txt          |   18 +-
 3rdparty/libprocess/src/io.cpp                  |  105 +-
 3rdparty/libprocess/src/io_internal.hpp         |   36 +
 3rdparty/libprocess/src/libev.cpp               |  166 ---
 3rdparty/libprocess/src/libev.hpp               |   96 --
 3rdparty/libprocess/src/libev_poll.cpp          |  142 --
 3rdparty/libprocess/src/libevent.cpp            |  211 ---
 3rdparty/libprocess/src/libevent.hpp            |   48 -
 3rdparty/libprocess/src/libevent_poll.cpp       |  112 --
 3rdparty/libprocess/src/libevent_ssl_socket.cpp | 1249 ------------------
 3rdparty/libprocess/src/libevent_ssl_socket.hpp |  198 ---
 3rdparty/libprocess/src/poll_socket.cpp         |  278 ----
 3rdparty/libprocess/src/posix/io.cpp            |  140 ++
 3rdparty/libprocess/src/posix/libev/libev.cpp   |  166 +++
 3rdparty/libprocess/src/posix/libev/libev.hpp   |   96 ++
 .../libprocess/src/posix/libev/libev_poll.cpp   |  142 ++
 .../libprocess/src/posix/libevent/libevent.cpp  |  211 +++
 .../libprocess/src/posix/libevent/libevent.hpp  |   48 +
 .../src/posix/libevent/libevent_poll.cpp        |  112 ++
 .../src/posix/libevent/libevent_ssl_socket.cpp  | 1249 ++++++++++++++++++
 .../src/posix/libevent/libevent_ssl_socket.hpp  |  198 +++
 3rdparty/libprocess/src/posix/poll_socket.cpp   |  278 ++++
 3rdparty/libprocess/src/posix/subprocess.cpp    |  102 ++
 3rdparty/libprocess/src/posix/subprocess.hpp    |  355 +++++
 3rdparty/libprocess/src/socket.cpp              |    2 +-
 3rdparty/libprocess/src/subprocess.cpp          |    4 +-
 3rdparty/libprocess/src/subprocess_posix.cpp    |  102 --
 3rdparty/libprocess/src/subprocess_posix.hpp    |  355 -----
 3rdparty/libprocess/src/subprocess_windows.cpp  |  101 --
 3rdparty/libprocess/src/subprocess_windows.hpp  |  118 --
 3rdparty/libprocess/src/windows/subprocess.cpp  |  101 ++
 3rdparty/libprocess/src/windows/subprocess.hpp  |  118 ++
 33 files changed, 3391 insertions(+), 3307 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/78244b53/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index de69d04..2d356aa 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -31,7 +31,7 @@ endif
 
 ACLOCAL_AMFLAGS = -I m4
 
-AUTOMAKE_OPTIONS = foreign
+AUTOMAKE_OPTIONS = foreign subdir-objects
 LIBPROCESS_BUILD_DIR=@abs_builddir@
 
 if STANDALONE_LIBPROCESS
@@ -204,6 +204,7 @@ libprocess_la_SOURCES =		\
   src/http_proxy.cpp		\
   src/http_proxy.hpp		\
   src/io.cpp			\
+  src/io_internal.hpp		\
   src/latch.cpp			\
   src/logging.cpp		\
   src/memory_profiler.cpp	\
@@ -211,7 +212,10 @@ libprocess_la_SOURCES =		\
   src/metrics/metrics.cpp	\
   src/mime.cpp			\
   src/pid.cpp			\
-  src/poll_socket.cpp		\
+  src/posix/io.cpp		\
+  src/posix/poll_socket.cpp	\
+  src/posix/subprocess.cpp	\
+  src/posix/subprocess.hpp	\
   src/poll_socket.hpp		\
   src/process.cpp		\
   src/process_reference.hpp	\
@@ -222,18 +226,16 @@ libprocess_la_SOURCES =		\
   src/socket.cpp		\
   src/socket_manager.hpp	\
   src/subprocess.cpp		\
-  src/subprocess_posix.cpp	\
-  src/subprocess_posix.hpp	\
   src/time.cpp
 
 if ENABLE_SSL
-libprocess_la_SOURCES +=	\
-  src/jwt.cpp			\
-  src/jwt_authenticator.cpp	\
-  src/libevent_ssl_socket.cpp	\
-  src/libevent_ssl_socket.hpp	\
-  src/openssl.cpp		\
-  src/openssl.hpp		\
+libprocess_la_SOURCES +=			\
+  src/jwt.cpp					\
+  src/jwt_authenticator.cpp			\
+  src/posix/libevent/libevent_ssl_socket.cpp	\
+  src/posix/libevent/libevent_ssl_socket.hpp	\
+  src/openssl.cpp				\
+  src/openssl.hpp				\
   src/ssl/utilities.cpp
 endif
 
@@ -241,6 +243,7 @@ endif
 libprocess_la_CPPFLAGS =			\
   -DBUILD_DIR=\"$(LIBPROCESS_BUILD_DIR)\"	\
   -I$(srcdir)/include				\
+  -I$(srcdir)/src				\
   $(BOOST_INCLUDE_FLAGS)			\
   $(CONCURRENTQUEUE_INCLUDE_FLAGS)		\
   $(ELFIO_INCLUDE_FLAGS)			\
@@ -260,15 +263,15 @@ libprocess_la_SOURCES +=	\
 endif
 
 if ENABLE_LIBEVENT
-libprocess_la_SOURCES +=	\
-  src/libevent.hpp		\
-  src/libevent.cpp		\
-  src/libevent_poll.cpp
+libprocess_la_SOURCES +=			\
+  src/posix/libevent/libevent.hpp		\
+  src/posix/libevent/libevent.cpp		\
+  src/posix/libevent/libevent_poll.cpp
 else
-libprocess_la_SOURCES +=	\
-  src/libev.hpp			\
-  src/libev.cpp			\
-  src/libev_poll.cpp
+libprocess_la_SOURCES +=			\
+  src/posix/libev/libev.hpp			\
+  src/posix/libev/libev.cpp			\
+  src/posix/libev/libev_poll.cpp
 endif
 
 if ENABLE_STATIC_LIBPROCESS

http://git-wip-us.apache.org/repos/asf/mesos/blob/78244b53/3rdparty/libprocess/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/CMakeLists.txt b/3rdparty/libprocess/src/CMakeLists.txt
index 619183e..3544e91 100644
--- a/3rdparty/libprocess/src/CMakeLists.txt
+++ b/3rdparty/libprocess/src/CMakeLists.txt
@@ -40,7 +40,8 @@ set(PROCESS_SRC
   metrics/metrics.cpp
   mime.cpp
   pid.cpp
-  poll_socket.cpp
+  posix/io.cpp
+  posix/poll_socket.cpp
   process.cpp
   profiler.cpp
   reap.cpp
@@ -48,12 +49,13 @@ set(PROCESS_SRC
   subprocess.cpp
   time.cpp)
 
+
 if (WIN32)
   list(APPEND PROCESS_SRC
-    subprocess_windows.cpp)
+    windows/subprocess.cpp)
 else ()
   list(APPEND PROCESS_SRC
-    subprocess_posix.cpp)
+    posix/subprocess.cpp)
 endif ()
 
 if (ENABLE_GRPC)
@@ -63,12 +65,12 @@ endif ()
 
 if (ENABLE_LIBEVENT)
   list(APPEND PROCESS_SRC
-    libevent.cpp
-    libevent_poll.cpp)
+    posix/libevent/libevent.cpp
+    posix/libevent/libevent_poll.cpp)
 else ()
   list(APPEND PROCESS_SRC
-    libev.cpp
-    libev_poll.cpp)
+    posix/libev/libev.cpp
+    posix/libev/libev_poll.cpp)
 endif ()
 
 if (ENABLE_SSL)
@@ -80,7 +82,7 @@ if (ENABLE_SSL)
 
   if (ENABLE_LIBEVENT)
     list(APPEND PROCESS_SRC
-      libevent_ssl_socket.cpp)
+      posix/libevent/libevent_ssl_socket.cpp)
   endif ()
 endif ()
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/78244b53/3rdparty/libprocess/src/io.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/io.cpp b/3rdparty/libprocess/src/io.cpp
index 70715e2..7288d5f 100644
--- a/3rdparty/libprocess/src/io.cpp
+++ b/3rdparty/libprocess/src/io.cpp
@@ -30,120 +30,23 @@
 #include <stout/os/strerror.hpp>
 #include <stout/os/write.hpp>
 
+#include "io_internal.hpp"
+
 using std::string;
 using std::vector;
 
 namespace process {
 namespace io {
-namespace internal {
-
-Future<size_t> read(int_fd fd, void* data, size_t size)
-{
-  // TODO(benh): Let the system calls do what ever they're supposed to
-  // rather than return 0 here?
-  if (size == 0) {
-    return 0;
-  }
-
-  return loop(
-      None(),
-      [=]() -> Future<Option<size_t>> {
-        // Because the file descriptor is non-blocking, we call
-        // read()/recv() immediately. If no data is available than
-        // we'll call `poll` and block. We also observed that for some
-        // combination of libev and Linux kernel versions, the poll
-        // would block for non-deterministically long periods of
-        // time. This may be fixed in a newer version of libev (we use
-        // 3.8 at the time of writing this comment).
-        ssize_t length = os::read(fd, data, size);
-        if (length < 0) {
-#ifdef __WINDOWS__
-          WindowsSocketError error;
-#else
-          ErrnoError error;
-#endif // __WINDOWS__
-
-          if (!net::is_restartable_error(error.code) &&
-              !net::is_retryable_error(error.code)) {
-            return Failure(error.message);
-          }
-
-          return None();
-        }
-
-        return length;
-      },
-      [=](const Option<size_t>& length) -> Future<ControlFlow<size_t>> {
-        // Restart/retry if we don't yet have a result.
-        if (length.isNone()) {
-          return io::poll(fd, io::READ)
-            .then([](short event) -> ControlFlow<size_t> {
-              CHECK_EQ(io::READ, event);
-              return Continue();
-            });
-        }
-        return Break(length.get());
-      });
-}
-
-
-Future<size_t> write(int_fd fd, const void* data, size_t size)
-{
-  // TODO(benh): Let the system calls do what ever they're supposed to
-  // rather than return 0 here?
-  if (size == 0) {
-    return 0;
-  }
-
-  return loop(
-      None(),
-      [=]() -> Future<Option<size_t>> {
-        ssize_t length = os::write(fd, data, size);
-
-        if (length < 0) {
-#ifdef __WINDOWS__
-          WindowsSocketError error;
-#else
-          ErrnoError error;
-#endif // __WINDOWS__
-
-          if (!net::is_restartable_error(error.code) &&
-              !net::is_retryable_error(error.code)) {
-            return Failure(error.message);
-          }
-
-          return None();
-        }
-
-        return length;
-      },
-      [=](const Option<size_t>& length) -> Future<ControlFlow<size_t>> {
-        // Restart/retry if we don't yet have a result.
-        if (length.isNone()) {
-          return io::poll(fd, io::WRITE)
-            .then([](short event) -> ControlFlow<size_t> {
-              CHECK_EQ(io::WRITE, event);
-              return Continue();
-            });
-        }
-        return Break(length.get());
-      });
-}
-
-} // namespace internal {
-
 
 Try<Nothing> prepare_async(int_fd fd)
 {
-  // TODO(akagup): Add windows iocp.
-  return os::nonblock(fd);
+  return internal::prepare_async(fd);
 }
 
 
 Try<bool> is_async(int_fd fd)
 {
-  // TODO(akagup): Add windows iocp.
-  return os::isNonblock(fd);
+  return internal::is_async(fd);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/78244b53/3rdparty/libprocess/src/io_internal.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/io_internal.hpp b/3rdparty/libprocess/src/io_internal.hpp
new file mode 100644
index 0000000..09bfa9a
--- /dev/null
+++ b/3rdparty/libprocess/src/io_internal.hpp
@@ -0,0 +1,36 @@
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License
+
+#ifndef __IO_INTERNAL_HPP__
+#define __IO_INTERNAL_HPP__
+
+#include <process/future.hpp>
+
+#include <stout/os/int_fd.hpp>
+
+namespace process {
+namespace io {
+namespace internal {
+
+Future<size_t> read(int_fd fd, void* data, size_t size);
+
+Future<size_t> write(int_fd fd, const void* data, size_t size);
+
+Try<Nothing> prepare_async(int_fd fd);
+
+Try<bool> is_async(int_fd fd);
+
+} // namespace internal {
+} // namespace io {
+} // namespace process {
+
+#endif // __IO_INTERNAL_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/78244b53/3rdparty/libprocess/src/libev.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libev.cpp b/3rdparty/libprocess/src/libev.cpp
deleted file mode 100644
index 173ee46..0000000
--- a/3rdparty/libprocess/src/libev.cpp
+++ /dev/null
@@ -1,166 +0,0 @@
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License
-
-#include <ev.h>
-
-#include <mutex>
-#include <queue>
-
-#include <stout/duration.hpp>
-#include <stout/lambda.hpp>
-#include <stout/nothing.hpp>
-
-#include "event_loop.hpp"
-#include "libev.hpp"
-
-namespace process {
-
-ev_async async_watcher;
-// We need an asynchronous watcher to receive the request to shutdown.
-ev_async shutdown_watcher;
-
-// Define the initial values for all of the declarations made in
-// libev.hpp (since these need to live in the static data space).
-struct ev_loop* loop = nullptr;
-
-std::queue<ev_io*>* watchers = new std::queue<ev_io*>();
-
-std::mutex* watchers_mutex = new std::mutex();
-
-std::queue<lambda::function<void()>>* functions =
-  new std::queue<lambda::function<void()>>();
-
-thread_local bool* _in_event_loop_ = nullptr;
-
-
-void handle_async(struct ev_loop* loop, ev_async* _, int revents)
-{
-  std::queue<lambda::function<void()>> run_functions;
-  synchronized (watchers_mutex) {
-    // Start all the new I/O watchers.
-    while (!watchers->empty()) {
-      ev_io* watcher = watchers->front();
-      watchers->pop();
-      ev_io_start(loop, watcher);
-    }
-
-    // Swap the functions into a temporary queue so that we can invoke
-    // them outside of the mutex.
-    std::swap(run_functions, *functions);
-  }
-
-  // Running the functions outside of the mutex reduces locking
-  // contention as these are arbitrary functions that can take a long
-  // time to execute. Doing this also avoids a deadlock scenario where
-  // (A) mutexes are acquired before calling `run_in_event_loop`,
-  // followed by locking (B) `watchers_mutex`. If we executed the
-  // functions inside the mutex, then the locking order violation
-  // would be this function acquiring the (B) `watchers_mutex`
-  // followed by the arbitrary function acquiring the (A) mutexes.
-  while (!run_functions.empty()) {
-    (run_functions.front())();
-    run_functions.pop();
-  }
-}
-
-
-void handle_shutdown(struct ev_loop* loop, ev_async* _, int revents)
-{
-  ev_unloop(loop, EVUNLOOP_ALL);
-}
-
-
-void EventLoop::initialize()
-{
-  loop = ev_default_loop(EVFLAG_AUTO);
-
-  ev_async_init(&async_watcher, handle_async);
-  ev_async_init(&shutdown_watcher, handle_shutdown);
-
-  ev_async_start(loop, &async_watcher);
-  ev_async_start(loop, &shutdown_watcher);
-}
-
-
-namespace internal {
-
-void handle_delay(struct ev_loop* loop, ev_timer* timer, int revents)
-{
-  lambda::function<void()>* function =
-    reinterpret_cast<lambda::function<void()>*>(timer->data);
-  (*function)();
-  delete function;
-  ev_timer_stop(loop, timer);
-  delete timer;
-}
-
-
-Future<Nothing> delay(
-    const Duration& duration,
-    const lambda::function<void()>& function)
-{
-  ev_timer* timer = new ev_timer();
-  timer->data = reinterpret_cast<void*>(new lambda::function<void()>(function));
-
-  // Determine the 'after' parameter to pass to libev and set it to 0
-  // in the event that it's negative so that we always make sure to
-  // invoke 'function' even if libev doesn't support negative 'after'
-  // values.
-  double after = duration.secs();
-
-  if (after < 0) {
-    after = 0;
-  }
-
-  const double repeat = 0.0;
-
-  ev_timer_init(timer, handle_delay, after, repeat);
-  ev_timer_start(loop, timer);
-
-  return Nothing();
-}
-
-} // namespace internal {
-
-
-void EventLoop::delay(
-    const Duration& duration,
-    const lambda::function<void()>& function)
-{
-  run_in_event_loop<Nothing>(
-      lambda::bind(&internal::delay, duration, function));
-}
-
-
-double EventLoop::time()
-{
-  // TODO(benh): Versus ev_now()?
-  return ev_time();
-}
-
-
-void EventLoop::run()
-{
-  __in_event_loop__ = true;
-
-  ev_loop(loop, 0);
-
-  __in_event_loop__ = false;
-}
-
-
-void EventLoop::stop()
-{
-  ev_async_send(loop, &shutdown_watcher);
-}
-
-} // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/78244b53/3rdparty/libprocess/src/libev.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libev.hpp b/3rdparty/libprocess/src/libev.hpp
deleted file mode 100644
index d451931..0000000
--- a/3rdparty/libprocess/src/libev.hpp
+++ /dev/null
@@ -1,96 +0,0 @@
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License
-
-#ifndef __LIBEV_HPP__
-#define __LIBEV_HPP__
-
-#include <ev.h>
-
-#include <mutex>
-#include <queue>
-
-#include <process/future.hpp>
-#include <process/owned.hpp>
-
-#include <stout/lambda.hpp>
-#include <stout/synchronized.hpp>
-
-namespace process {
-
-// Event loop.
-extern struct ev_loop* loop;
-
-// Asynchronous watcher for interrupting loop to specifically deal
-// with IO watchers and functions (via run_in_event_loop).
-extern ev_async async_watcher;
-
-// Queue of I/O watchers to be asynchronously added to the event loop
-// (protected by 'watchers' below).
-// TODO(benh): Replace this queue with functions that we put in
-// 'functions' below that perform the ev_io_start themselves.
-extern std::queue<ev_io*>* watchers;
-extern std::mutex* watchers_mutex;
-
-// Queue of functions to be invoked asynchronously within the vent
-// loop (protected by 'watchers' above).
-extern std::queue<lambda::function<void()>>* functions;
-
-// Per thread bool pointer. We use a pointer to lazily construct the
-// actual bool.
-extern thread_local bool* _in_event_loop_;
-
-#define __in_event_loop__ *(_in_event_loop_ == nullptr ?                \
-  _in_event_loop_ = new bool(false) : _in_event_loop_)
-
-
-// Wrapper around function we want to run in the event loop.
-template <typename T>
-void _run_in_event_loop(
-    const lambda::function<Future<T>()>& f,
-    const Owned<Promise<T>>& promise)
-{
-  // Don't bother running the function if the future has been discarded.
-  if (promise->future().hasDiscard()) {
-    promise->discard();
-  } else {
-    promise->set(f());
-  }
-}
-
-
-// Helper for running a function in the event loop.
-template <typename T>
-Future<T> run_in_event_loop(const lambda::function<Future<T>()>& f)
-{
-  // If this is already the event loop then just run the function.
-  if (__in_event_loop__) {
-    return f();
-  }
-
-  Owned<Promise<T>> promise(new Promise<T>());
-
-  Future<T> future = promise->future();
-
-  // Enqueue the function.
-  synchronized (watchers_mutex) {
-    functions->push(lambda::bind(&_run_in_event_loop<T>, f, promise));
-  }
-
-  // Interrupt the loop.
-  ev_async_send(loop, &async_watcher);
-
-  return future;
-}
-
-} // namespace process {
-
-#endif // __LIBEV_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/78244b53/3rdparty/libprocess/src/libev_poll.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libev_poll.cpp b/3rdparty/libprocess/src/libev_poll.cpp
deleted file mode 100644
index 96913a6..0000000
--- a/3rdparty/libprocess/src/libev_poll.cpp
+++ /dev/null
@@ -1,142 +0,0 @@
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License
-
-#include <ev.h>
-
-#include <memory>
-
-#include <process/future.hpp>
-#include <process/process.hpp> // For process::initialize.
-
-#include <stout/lambda.hpp>
-
-#include "libev.hpp"
-
-namespace process {
-
-// Data necessary for polling so we can discard polling and actually
-// stop it in the event loop.
-struct Poll
-{
-  Poll()
-  {
-    // Need to explicitly instantiate the watchers.
-    watcher.io.reset(new ev_io());
-    watcher.async.reset(new ev_async());
-  }
-
-  // An I/O watcher for checking for readability or writeability and
-  // an async watcher for being able to discard the polling.
-  struct {
-    std::shared_ptr<ev_io> io;
-    std::shared_ptr<ev_async> async;
-  } watcher;
-
-  Promise<short> promise;
-};
-
-
-// Event loop callback when I/O is ready on polling file descriptor.
-void polled(struct ev_loop* loop, ev_io* watcher, int revents)
-{
-  Poll* poll = (Poll*) watcher->data;
-
-  ev_io_stop(loop, poll->watcher.io.get());
-
-  // Stop the async watcher (also clears if pending so 'discard_poll'
-  // will not get invoked and we can delete 'poll' here).
-  ev_async_stop(loop, poll->watcher.async.get());
-
-  poll->promise.set(revents);
-
-  delete poll;
-}
-
-
-// Event loop callback when future associated with polling file
-// descriptor has been discarded.
-void discard_poll(struct ev_loop* loop, ev_async* watcher, int revents)
-{
-  Poll* poll = (Poll*) watcher->data;
-
-  // Check and see if we have a pending 'polled' callback and if so
-  // let it "win".
-  if (ev_is_pending(poll->watcher.io.get())) {
-    return;
-  }
-
-  ev_async_stop(loop, poll->watcher.async.get());
-
-  // Stop the I/O watcher (but note we check if pending above) so it
-  // won't get invoked and we can delete 'poll' here.
-  ev_io_stop(loop, poll->watcher.io.get());
-
-  poll->promise.discard();
-
-  delete poll;
-}
-
-
-namespace io {
-namespace internal {
-
-// Helper/continuation of 'poll' on future discard.
-void _poll(const std::shared_ptr<ev_async>& async)
-{
-  ev_async_send(loop, async.get());
-}
-
-
-Future<short> poll(int_fd fd, short events)
-{
-  Poll* poll = new Poll();
-
-  // Have the watchers data point back to the struct.
-  poll->watcher.async->data = poll;
-  poll->watcher.io->data = poll;
-
-  // Get a copy of the future to avoid any races with the event loop.
-  Future<short> future = poll->promise.future();
-
-  // Initialize and start the async watcher.
-  ev_async_init(poll->watcher.async.get(), discard_poll);
-  ev_async_start(loop, poll->watcher.async.get());
-
-  // Make sure we stop polling if a discard occurs on our future.
-  // Note that it's possible that we'll invoke '_poll' when someone
-  // does a discard even after the polling has already completed, but
-  // in this case while we will interrupt the event loop since the
-  // async watcher has already been stopped we won't cause
-  // 'discard_poll' to get invoked.
-  future.onDiscard(lambda::bind(&_poll, poll->watcher.async));
-
-  // Initialize and start the I/O watcher.
-  ev_io_init(poll->watcher.io.get(), polled, fd, events);
-  ev_io_start(loop, poll->watcher.io.get());
-
-  return future;
-}
-
-} // namespace internal {
-
-
-Future<short> poll(int_fd fd, short events)
-{
-  process::initialize();
-
-  // TODO(benh): Check if the file descriptor is non-blocking?
-
-  return run_in_event_loop<short>(lambda::bind(&internal::poll, fd, events));
-}
-
-} // namespace io {
-} // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/78244b53/3rdparty/libprocess/src/libevent.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent.cpp b/3rdparty/libprocess/src/libevent.cpp
deleted file mode 100644
index fb595bc..0000000
--- a/3rdparty/libprocess/src/libevent.cpp
+++ /dev/null
@@ -1,211 +0,0 @@
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License
-
-#ifndef __WINDOWS__
-#include <unistd.h>
-#endif // __WINDOWS__
-
-#include <mutex>
-
-#include <event2/event.h>
-#include <event2/thread.h>
-#include <event2/util.h>
-
-#include <process/logging.hpp>
-#include <process/once.hpp>
-
-#include <stout/synchronized.hpp>
-
-#include "event_loop.hpp"
-#include "libevent.hpp"
-
-namespace process {
-
-event_base* base = nullptr;
-
-
-static std::mutex* functions_mutex = new std::mutex();
-std::queue<lambda::function<void()>>* functions =
-  new std::queue<lambda::function<void()>>();
-
-
-thread_local bool* _in_event_loop_ = nullptr;
-
-
-void async_function(evutil_socket_t socket, short which, void* arg)
-{
-  event* ev = reinterpret_cast<event*>(arg);
-  event_free(ev);
-
-  std::queue<lambda::function<void()>> q;
-
-  synchronized (functions_mutex) {
-    std::swap(q, *functions);
-  }
-
-  while (!q.empty()) {
-    q.front()();
-    q.pop();
-  }
-}
-
-
-void run_in_event_loop(
-    const lambda::function<void()>& f,
-    EventLoopLogicFlow event_loop_logic_flow)
-{
-  if (__in_event_loop__ && event_loop_logic_flow == ALLOW_SHORT_CIRCUIT) {
-    f();
-    return;
-  }
-
-  synchronized (functions_mutex) {
-    functions->push(f);
-
-    // Add an event and activate it to interrupt the event loop.
-    // TODO(jmlvanre): after libevent v 2.1 we can use
-    // event_self_cbarg instead of re-assigning the event. For now we
-    // manually re-assign the event to pass in the pointer to the
-    // event itself as the callback argument.
-    event* ev = evtimer_new(base, async_function, nullptr);
-
-    // 'event_assign' is only valid on non-pending AND non-active
-    // events. This means we have to assign the callback before
-    // calling 'event_active'.
-    if (evtimer_assign(ev, base, async_function, ev) < 0) {
-      LOG(FATAL) << "Failed to assign callback on event";
-    }
-
-    event_active(ev, EV_TIMEOUT, 0);
-  }
-}
-
-
-void EventLoop::run()
-{
-  __in_event_loop__ = true;
-
-  do {
-    int result = event_base_loop(base, EVLOOP_ONCE);
-    if (result < 0) {
-      LOG(FATAL) << "Failed to run event loop";
-    } else if (result > 0) {
-      // All events are handled, continue event loop.
-      continue;
-    } else {
-      CHECK_EQ(0, result);
-      if (event_base_got_break(base)) {
-        break;
-      } else if (event_base_got_exit(base)) {
-        break;
-      }
-    }
-  } while (true);
-
-  __in_event_loop__ = false;
-}
-
-
-void EventLoop::stop()
-{
-  event_base_loopexit(base, nullptr);
-}
-
-
-namespace internal {
-
-struct Delay
-{
-  lambda::function<void()> function;
-  event* timer;
-};
-
-void handle_delay(evutil_socket_t, short, void* arg)
-{
-  Delay* delay = reinterpret_cast<Delay*>(arg);
-  delay->function();
-  event_free(delay->timer);
-  delete delay;
-}
-
-}  // namespace internal {
-
-
-void EventLoop::delay(
-    const Duration& duration,
-    const lambda::function<void()>& function)
-{
-  internal::Delay* delay = new internal::Delay();
-  delay->timer = evtimer_new(base, &internal::handle_delay, delay);
-  if (delay->timer == nullptr) {
-    LOG(FATAL) << "Failed to delay, evtimer_new";
-  }
-
-  delay->function = function;
-
-  timeval t{0, 0};
-  if (duration > Seconds(0)) {
-    t = duration.timeval();
-  }
-
-  evtimer_add(delay->timer, &t);
-}
-
-
-double EventLoop::time()
-{
-  // We explicitly call `evutil_gettimeofday()` for now to avoid any
-  // issues that may be introduced by using the cached value provided
-  // by `event_base_gettimeofday_cached()`. Since a lot of logic in
-  // libprocess depends on time math, we want to log fatal rather than
-  // cause logic errors if the time fails.
-  timeval t;
-  if (evutil_gettimeofday(&t, nullptr) < 0) {
-    LOG(FATAL) << "Failed to get time, evutil_gettimeofday";
-  }
-
-  return Duration(t).secs();
-}
-
-
-void EventLoop::initialize()
-{
-  static Once* initialized = new Once();
-
-  if (initialized->once()) {
-    return;
-  }
-
-  // We need to initialize Libevent differently depending on the
-  // operating system threading support.
-#if defined(EVTHREAD_USE_PTHREADS_IMPLEMENTED)
-  if (evthread_use_pthreads() < 0) {
-    LOG(FATAL) << "Failed to initialize, evthread_use_pthreads";
-  }
-#elif defined(EVTHREAD_USE_WINDOWS_THREADS_IMPLEMENTED)
-  if (evthread_use_windows_threads() < 0) {
-    LOG(FATAL) << "Failed to initialize, evthread_use_windows_threads";
-  }
-#else
-#error "Libevent must be compiled with either pthread or Windows thread support"
-#endif
-
-  base = event_base_new();
-
-  if (base == nullptr) {
-    LOG(FATAL) << "Failed to initialize, event_base_new";
-  }
-
-  initialized->done();
-}
-
-} // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/78244b53/3rdparty/libprocess/src/libevent.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent.hpp b/3rdparty/libprocess/src/libevent.hpp
deleted file mode 100644
index 2eb9790..0000000
--- a/3rdparty/libprocess/src/libevent.hpp
+++ /dev/null
@@ -1,48 +0,0 @@
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License
-
-#ifndef __LIBEVENT_HPP__
-#define __LIBEVENT_HPP__
-
-#include <event2/event.h>
-
-#include <stout/lambda.hpp>
-
-namespace process {
-
-// Event loop.
-extern event_base* base;
-
-
-// Per thread bool pointer. We use a pointer to lazily construct the
-// actual bool.
-extern thread_local bool* _in_event_loop_;
-
-
-#define __in_event_loop__ *(_in_event_loop_ == nullptr ?                \
-  _in_event_loop_ = new bool(false) : _in_event_loop_)
-
-
-enum EventLoopLogicFlow
-{
-  ALLOW_SHORT_CIRCUIT,
-  DISALLOW_SHORT_CIRCUIT
-};
-
-
-void run_in_event_loop(
-    const lambda::function<void()>& f,
-    EventLoopLogicFlow event_loop_logic_flow = ALLOW_SHORT_CIRCUIT);
-
-} // namespace process {
-
-#endif // __LIBEVENT_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/78244b53/3rdparty/libprocess/src/libevent_poll.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent_poll.cpp b/3rdparty/libprocess/src/libevent_poll.cpp
deleted file mode 100644
index 038dde2..0000000
--- a/3rdparty/libprocess/src/libevent_poll.cpp
+++ /dev/null
@@ -1,112 +0,0 @@
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License
-
-#include <event2/event.h>
-
-#include <memory>
-
-#include <process/future.hpp>
-#include <process/io.hpp>
-#include <process/process.hpp> // For process::initialize.
-
-#include "libevent.hpp"
-
-namespace process {
-
-namespace io {
-namespace internal {
-
-struct Poll
-{
-  Promise<short> promise;
-  std::shared_ptr<event> ev;
-};
-
-
-void pollCallback(evutil_socket_t, short what, void* arg)
-{
-  Poll* poll = reinterpret_cast<Poll*>(arg);
-
-  if (poll->promise.future().hasDiscard()) {
-    poll->promise.discard();
-  } else {
-    // Convert libevent specific EV_READ / EV_WRITE to io::* specific
-    // values of these enumerations.
-    short events =
-      ((what & EV_READ) ? io::READ : 0) | ((what & EV_WRITE) ? io::WRITE : 0);
-
-    poll->promise.set(events);
-  }
-
-  // Deleting the `poll` also destructs `ev` and hence triggers `event_free`,
-  // which makes the event non-pending.
-  delete poll;
-}
-
-
-void pollDiscard(const std::weak_ptr<event>& ev, short events)
-{
-  // Discarding inside the event loop prevents `pollCallback()` from being
-  // called twice if the future is discarded.
-  run_in_event_loop([=]() {
-    std::shared_ptr<event> shared = ev.lock();
-    // If `ev` cannot be locked `pollCallback` already ran. If it was locked
-    // but not pending, `pollCallback` is scheduled to be executed.
-    if (static_cast<bool>(shared) &&
-        event_pending(shared.get(), events, nullptr)) {
-      // `event_active` will trigger the `pollCallback` to be executed.
-      event_active(shared.get(), EV_READ, 0);
-    }
-  });
-}
-
-} // namespace internal {
-
-
-Future<short> poll(int_fd fd, short events)
-{
-  process::initialize();
-
-  internal::Poll* poll = new internal::Poll();
-
-  Future<short> future = poll->promise.future();
-
-  // Convert io::READ / io::WRITE to libevent specific values of these
-  // enumerations.
-  short what =
-    ((events & io::READ) ? EV_READ : 0) | ((events & io::WRITE) ? EV_WRITE : 0);
-
-  // Bind `event_free` to the destructor of the `ev` shared pointer
-  // guaranteeing that the event will be freed only once.
-  poll->ev.reset(
-      event_new(base, fd, what, &internal::pollCallback, poll),
-      event_free);
-
-  if (poll->ev == nullptr) {
-    LOG(FATAL) << "Failed to poll, event_new";
-  }
-
-  // Using a `weak_ptr` prevents `ev` to become a dangling pointer if
-  // the returned future is discarded after the event is triggered.
-  // The `weak_ptr` needs to be created before `event_add` in case
-  // the event is ready and the callback is executed before creating
-  // `ev`.
-  std::weak_ptr<event> ev(poll->ev);
-
-  event_add(poll->ev.get(), nullptr);
-
-  return future
-    .onDiscard(lambda::bind(&internal::pollDiscard, ev, what));
-}
-
-} // namespace io {
-} // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/78244b53/3rdparty/libprocess/src/libevent_ssl_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent_ssl_socket.cpp b/3rdparty/libprocess/src/libevent_ssl_socket.cpp
deleted file mode 100644
index 436b389..0000000
--- a/3rdparty/libprocess/src/libevent_ssl_socket.cpp
+++ /dev/null
@@ -1,1249 +0,0 @@
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License
-
-#include <event2/buffer.h>
-#include <event2/bufferevent_ssl.h>
-#include <event2/event.h>
-#include <event2/listener.h>
-#include <event2/thread.h>
-#include <event2/util.h>
-
-#include <openssl/ssl.h>
-#include <openssl/err.h>
-
-#include <process/queue.hpp>
-#include <process/socket.hpp>
-
-#include <process/ssl/flags.hpp>
-
-#include <stout/net.hpp>
-#include <stout/synchronized.hpp>
-
-#include <stout/os/close.hpp>
-#include <stout/os/dup.hpp>
-#include <stout/os/fcntl.hpp>
-
-#include "libevent.hpp"
-#include "libevent_ssl_socket.hpp"
-#include "openssl.hpp"
-#include "poll_socket.hpp"
-
-// Locking:
-//
-// We use the BEV_OPT_THREADSAFE flag when constructing bufferevents
-// so that all **functions that are called from the event loop that
-// take a bufferevent as a parameter will automatically have the
-// lock acquired**.
-//
-// This means that everywhere that the libevent library does not
-// already lock the bev, we need to manually 'synchronize (bev) {'.
-// To further complicate matters, due to a deadlock scneario in
-// libevent-openssl (v 2.0.21) we currently modify bufferevents using
-// continuations in the event loop, but these functions, while run
-// from within the event loop, are not passed the 'bev' as a parameter
-// and thus MUST use 'synchronized (bev)'. See 'Continuation' comment
-// below for more details on why we need to invoke these continuations
-// from within the event loop.
-
-// Continuations via 'run_in_event_loop(...)':
-//
-// There is a deadlock scenario in libevent-openssl (v 2.0.21) when
-// modifying the bufferevent (bev) from another thread (not the event
-// loop). To avoid this we run all bufferevent manipulation logic in
-// continuations that are executed within the event loop.
-
-// DISALLOW_SHORT_CIRCUIT:
-//
-// We disallow short-circuiting in 'run_in_event_loop' due to a bug in
-// libevent_openssl with deferred callbacks still being called (still
-// in the run queue) even though a bev has been disabled.
-
-using std::queue;
-using std::string;
-
-// Specialization of 'synchronize' to use bufferevent with the
-// 'synchronized' macro.
-static Synchronized<bufferevent> synchronize(bufferevent* bev)
-{
-  return Synchronized<bufferevent>(
-      bev,
-      [](bufferevent* bev) { bufferevent_lock(bev); },
-      [](bufferevent* bev) { bufferevent_unlock(bev); });
-}
-
-namespace process {
-namespace network {
-namespace internal {
-
-Try<std::shared_ptr<SocketImpl>> LibeventSSLSocketImpl::create(int_fd s)
-{
-  openssl::initialize();
-
-  if (!openssl::flags().enabled) {
-    return Error("SSL is disabled");
-  }
-
-  auto socket = std::make_shared<LibeventSSLSocketImpl>(s);
-  // See comment at 'initialize' declaration for why we call this.
-  socket->initialize();
-  return socket;
-}
-
-
-LibeventSSLSocketImpl::~LibeventSSLSocketImpl()
-{
-  // We defer termination and destruction of all event loop specific
-  // calls and structures. This is a safety against the socket being
-  // destroyed before existing event loop calls have completed since
-  // they require valid data structures (the weak pointer).
-  //
-  // Release ownership of the file descriptor so that
-  // we can defer closing the socket.
-  int_fd fd = release();
-  CHECK(fd >= 0);
-
-  evconnlistener* _listener = listener;
-  bufferevent* _bev = bev;
-  std::weak_ptr<LibeventSSLSocketImpl>* _event_loop_handle = event_loop_handle;
-
-  run_in_event_loop(
-      [_listener, _bev, _event_loop_handle, fd]() {
-        // Once this lambda is called, it should not be possible for
-        // more event loop callbacks to be triggered with 'this->bev'.
-        // This is important because we delete event_loop_handle which
-        // is the callback argument for any event loop callbacks.
-        // This lambda is responsible for ensuring 'this->bev' is
-        // disabled, and cleaning up any remaining state associated
-        // with the event loop.
-
-        CHECK(__in_event_loop__);
-
-        if (_listener != nullptr) {
-          evconnlistener_free(_listener);
-        }
-
-        if (_bev != nullptr) {
-          // NOTE: Removes all future callbacks using 'this->bev'.
-          bufferevent_disable(_bev, EV_READ | EV_WRITE);
-
-          SSL* ssl = bufferevent_openssl_get_ssl(_bev);
-          SSL_free(ssl);
-          bufferevent_free(_bev);
-        }
-
-        CHECK_SOME(os::close(fd)) << "Failed to close socket";
-
-        delete _event_loop_handle;
-      },
-      DISALLOW_SHORT_CIRCUIT);
-}
-
-
-void LibeventSSLSocketImpl::initialize()
-{
-  event_loop_handle = new std::weak_ptr<LibeventSSLSocketImpl>(shared(this));
-}
-
-
-Try<Nothing, SocketError> LibeventSSLSocketImpl::shutdown(int how)
-{
-  // Nothing to do if this socket was never initialized.
-  synchronized (lock) {
-    if (bev == nullptr) {
-      // If it was not initialized, then there should also be no
-      // requests.
-      CHECK(connect_request.get() == nullptr);
-      CHECK(recv_request.get() == nullptr);
-      CHECK(send_request.get() == nullptr);
-
-      // We expect this to fail and generate an 'ENOTCONN' failure as
-      // no connection should exist at this point.
-      if (::shutdown(s, how) < 0) {
-        return SocketError();
-      }
-
-      return Nothing();
-    }
-  }
-
-  // Extend the life-time of 'this' through the execution of the
-  // lambda in the event loop. Note: The 'self' needs to be explicitly
-  // captured because we're not using it in the body of the lambda. We
-  // can use a 'shared_ptr' because run_in_event_loop is guaranteed to
-  // execute.
-  auto self = shared(this);
-
-  run_in_event_loop(
-      [self]() {
-        CHECK(__in_event_loop__);
-        CHECK(self);
-
-        CHECK_NOTNULL(self->bev);
-
-        synchronized (self->bev) {
-          Owned<RecvRequest> request;
-
-          // Swap the 'recv_request' under the object lock.
-          synchronized (self->lock) {
-            std::swap(request, self->recv_request);
-          }
-
-          // If there is still a pending receive request then close it.
-          if (request.get() != nullptr) {
-            request->promise
-              .set(bufferevent_read(self->bev, request->data, request->size));
-          }
-
-          // Workaround for SSL shutdown, see http://www.wangafu.net/~nickm/libevent-book/Ref6a_advanced_bufferevents.html // NOLINT
-          SSL* ssl = bufferevent_openssl_get_ssl(self->bev);
-          SSL_set_shutdown(ssl, SSL_RECEIVED_SHUTDOWN);
-          SSL_shutdown(ssl);
-        }
-      },
-      DISALLOW_SHORT_CIRCUIT);
-
-  return Nothing();
-}
-
-
-// Only runs in event loop. No locks required. See 'Locking' note at
-// top of file.
-void LibeventSSLSocketImpl::recv_callback(bufferevent* /*bev*/, void* arg)
-{
-  CHECK(__in_event_loop__);
-
-  std::weak_ptr<LibeventSSLSocketImpl>* handle =
-    reinterpret_cast<std::weak_ptr<LibeventSSLSocketImpl>*>(CHECK_NOTNULL(arg));
-
-  std::shared_ptr<LibeventSSLSocketImpl> impl(handle->lock());
-
-  // Don't call the 'recv_callback' unless the socket is still valid.
-  if (impl != nullptr) {
-    impl->recv_callback();
-  }
-}
-
-
-// Only runs in event loop. Member function continuation of static
-// 'recv_callback'. This function can be called from two places -
-// a) `LibeventSSLSocketImpl::recv` when a new Socket::recv is called and there
-//    is buffer available to read.
-// b) `LibeventSSLSocketImpl::recv_callback when libevent calls the deferred
-//    read callback.
-void LibeventSSLSocketImpl::recv_callback()
-{
-  CHECK(__in_event_loop__);
-
-  Owned<RecvRequest> request;
-
-  const size_t buffer_length = evbuffer_get_length(bufferevent_get_input(bev));
-
-  // Swap out the request object IFF there is buffer available to read. We check
-  // this here because it is possible that when the libevent deferred callback
-  // was called, a Socket::recv context already read the buffer from the event.
-  // Following sequence is possible:
-  // a. libevent finds a buffer ready to be read.
-  // b. libevent queues buffer event to be dispatched.
-  // c. Socket::recv is called that creates a new request.
-  // d. Socket::recv finds buffer length > 0.
-  // e. Socket::recv reads the buffer.
-  // f. A new request Socket::recv is called which creates a new request.
-  // g. libevent callback is called for the event queued at step b.
-  // h. libevent callback finds the length of the buffer as 0 but the request is
-  //    a non-nullptr due to step f.
-  if (buffer_length > 0 || received_eof) {
-    synchronized (lock) {
-      std::swap(request, recv_request);
-    }
-  }
-
-  if (request.get() != nullptr) {
-    if (buffer_length > 0) {
-      size_t length = bufferevent_read(bev, request->data, request->size);
-      CHECK(length > 0);
-
-      request->promise.set(length);
-    } else {
-      CHECK(received_eof);
-      request->promise.set(0);
-    }
-  }
-}
-
-
-// Only runs in event loop. No locks required. See 'Locking' note at
-// top of file.
-void LibeventSSLSocketImpl::send_callback(bufferevent* /*bev*/, void* arg)
-{
-  CHECK(__in_event_loop__);
-
-  std::weak_ptr<LibeventSSLSocketImpl>* handle =
-    reinterpret_cast<std::weak_ptr<LibeventSSLSocketImpl>*>(CHECK_NOTNULL(arg));
-
-  std::shared_ptr<LibeventSSLSocketImpl> impl(handle->lock());
-
-  // Don't call the 'send_callback' unless the socket is still valid.
-  if (impl != nullptr) {
-    impl->send_callback();
-  }
-}
-
-
-// Only runs in event loop. Member function continuation of static
-// 'recv_callback'.
-void LibeventSSLSocketImpl::send_callback()
-{
-  CHECK(__in_event_loop__);
-
-  Owned<SendRequest> request;
-
-  synchronized (lock) {
-    std::swap(request, send_request);
-  }
-
-  if (request.get() != nullptr) {
-    request->promise.set(request->size);
-  }
-}
-
-
-// Only runs in event loop. No locks required. See 'Locking' note at
-// top of file.
-void LibeventSSLSocketImpl::event_callback(
-    bufferevent* /*bev*/,
-    short events,
-    void* arg)
-{
-  CHECK(__in_event_loop__);
-
-  std::weak_ptr<LibeventSSLSocketImpl>* handle =
-    reinterpret_cast<std::weak_ptr<LibeventSSLSocketImpl>*>(CHECK_NOTNULL(arg));
-
-  std::shared_ptr<LibeventSSLSocketImpl> impl(handle->lock());
-
-  // Don't call the 'event_callback' unless the socket is still valid.
-  if (impl != nullptr) {
-    impl->event_callback(events);
-  }
-}
-
-
-// Only runs in event loop. Member function continuation of static
-// 'recv_callback'.
-void LibeventSSLSocketImpl::event_callback(short events)
-{
-  CHECK(__in_event_loop__);
-
-  // TODO(bmahler): Libevent's invariant is that `events` contains:
-  //
-  //   (1) one of BEV_EVENT_READING or BEV_EVENT_WRITING to
-  //       indicate whether the event was on the read or write path.
-  //
-  //   (2) one of BEV_EVENT_EOF, BEV_EVENT_ERROR, BEV_EVENT_TIMEOUT,
-  //       BEV_EVENT_CONNECTED.
-  //
-  // (1) allows us to handle read and write errors separately.
-  // HOWEVER, for SSL bufferevents in 2.0.x, libevent never seems
-  // to tell us about BEV_EVENT_READING or BEV_EVENT_WRITING,
-  // which forces us to write incorrect logic here by treating all
-  // events as affecting both reads and writes.
-  //
-  // This has been fixed in 2.1.x:
-  //   2.1 "What's New":
-  //     https://github.com/libevent/libevent/blob/release-2.1.8-stable/whatsnew-2.1.txt#L333-L335 // NOLINT
-  //   Commit:
-  //     https://github.com/libevent/libevent/commit/f7eb69ace
-  //
-  // We should require 2.1.x so that we can correctly distinguish
-  // between the read and write errors, and not have two code paths
-  // depending on the libevent version, see MESOS-5999, MESOS-6770.
-
-  Owned<RecvRequest> current_recv_request;
-  Owned<SendRequest> current_send_request;
-  Owned<ConnectRequest> current_connect_request;
-
-  if (events & BEV_EVENT_EOF ||
-      events & BEV_EVENT_CONNECTED ||
-      events & BEV_EVENT_ERROR) {
-    synchronized (lock) {
-      std::swap(current_recv_request, recv_request);
-      std::swap(current_send_request, send_request);
-      std::swap(current_connect_request, connect_request);
-    }
-  }
-
-  // First handle EOF, we also look for `BEV_EVENT_ERROR` with
-  // `EVUTIL_SOCKET_ERROR() == 0` since this occurs as a result
-  // of a "dirty" SSL shutdown (i.e. TCP close before SSL close)
-  // or when this socket has been shut down and further sends
-  // are performed.
-  //
-  // TODO(bmahler): We don't expose "dirty" SSL shutdowns as
-  // recv errors, but perhaps we should?
-  if (events & BEV_EVENT_EOF ||
-     (events & BEV_EVENT_ERROR && EVUTIL_SOCKET_ERROR() == 0)) {
-    received_eof = true;
-
-    if (current_recv_request.get() != nullptr) {
-      // Drain any remaining data from the bufferevent or complete the
-      // promise with 0 to signify EOF. Because we set `received_eof`,
-      // subsequent calls to `recv` will return 0 if there is no data
-      // remaining on the buffer.
-      if (evbuffer_get_length(bufferevent_get_input(bev)) > 0) {
-        size_t length =
-          bufferevent_read(
-              bev,
-              current_recv_request->data,
-              current_recv_request->size);
-        CHECK(length > 0);
-
-        current_recv_request->promise.set(length);
-      } else {
-        current_recv_request->promise.set(0);
-      }
-    }
-
-    if (current_send_request.get() != nullptr) {
-      current_send_request->promise.fail("Failed send: connection closed");
-    }
-
-    if (current_connect_request.get() != nullptr) {
-      SSL* ssl = bufferevent_openssl_get_ssl(CHECK_NOTNULL(bev));
-      SSL_free(ssl);
-      bufferevent_free(CHECK_NOTNULL(bev));
-      bev = nullptr;
-      current_connect_request->promise.fail(
-          "Failed connect: connection closed");
-    }
-  } else if (events & BEV_EVENT_CONNECTED) {
-    // We should not have receiving or sending request while still
-    // connecting.
-    CHECK(current_recv_request.get() == nullptr);
-    CHECK(current_send_request.get() == nullptr);
-    CHECK_NOTNULL(current_connect_request.get());
-
-    // If we're connecting, then we've succeeded. Time to do
-    // post-verification.
-    CHECK_NOTNULL(bev);
-
-    // Do post-validation of connection.
-    SSL* ssl = bufferevent_openssl_get_ssl(bev);
-
-    Try<Nothing> verify = openssl::verify(ssl, peer_hostname, peer_ip);
-    if (verify.isError()) {
-      VLOG(1) << "Failed connect, verification error: " << verify.error();
-      SSL_free(ssl);
-      bufferevent_free(bev);
-      bev = nullptr;
-      current_connect_request->promise.fail(verify.error());
-      return;
-    }
-
-    current_connect_request->promise.set(Nothing());
-  } else if (events & BEV_EVENT_ERROR) {
-    CHECK(EVUTIL_SOCKET_ERROR() != 0);
-    std::ostringstream error_stream;
-    error_stream << evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR());
-
-    // If there is a valid error, fail any requests and log the error.
-    VLOG(1) << "Socket error: " << error_stream.str();
-
-    if (current_recv_request.get() != nullptr) {
-      current_recv_request->promise.fail(
-          "Failed recv, connection error: " +
-          error_stream.str());
-    }
-
-    if (current_send_request.get() != nullptr) {
-      current_send_request->promise.fail(
-          "Failed send, connection error: " +
-          error_stream.str());
-    }
-
-    if (current_connect_request.get() != nullptr) {
-      SSL* ssl = bufferevent_openssl_get_ssl(CHECK_NOTNULL(bev));
-      SSL_free(ssl);
-      bufferevent_free(CHECK_NOTNULL(bev));
-      bev = nullptr;
-      current_connect_request->promise.fail(
-          "Failed connect, connection error: " +
-          error_stream.str());
-    }
-  }
-}
-
-
-LibeventSSLSocketImpl::LibeventSSLSocketImpl(int_fd _s)
-  : SocketImpl(_s),
-    bev(nullptr),
-    listener(nullptr),
-    recv_request(nullptr),
-    send_request(nullptr),
-    connect_request(nullptr),
-    event_loop_handle(nullptr) {}
-
-
-LibeventSSLSocketImpl::LibeventSSLSocketImpl(
-    int_fd _s,
-    bufferevent* _bev,
-    Option<string>&& _peer_hostname)
-  : SocketImpl(_s),
-    bev(_bev),
-    listener(nullptr),
-    recv_request(nullptr),
-    send_request(nullptr),
-    connect_request(nullptr),
-    event_loop_handle(nullptr),
-    peer_hostname(std::move(_peer_hostname)) {}
-
-
-Future<Nothing> LibeventSSLSocketImpl::connect(const Address& address)
-{
-  if (bev != nullptr) {
-    return Failure("Socket is already connected");
-  }
-
-  if (connect_request.get() != nullptr) {
-    return Failure("Socket is already connecting");
-  }
-
-  SSL* ssl = SSL_new(openssl::context());
-  if (ssl == nullptr) {
-    return Failure("Failed to connect: SSL_new");
-  }
-
-  // Construct the bufferevent in the connecting state.
-  // We set 'BEV_OPT_DEFER_CALLBACKS' to avoid calling the
-  // 'event_callback' before 'bufferevent_socket_connect' returns.
-  CHECK(bev == nullptr);
-  bev = bufferevent_openssl_socket_new(
-      base,
-      s,
-      ssl,
-      BUFFEREVENT_SSL_CONNECTING,
-      BEV_OPT_THREADSAFE | BEV_OPT_DEFER_CALLBACKS);
-
-  if (bev == nullptr) {
-    // We need to free 'ssl' here because the bev won't clean it up
-    // for us.
-    SSL_free(ssl);
-    return Failure("Failed to connect: bufferevent_openssl_socket_new");
-  }
-
-  if (address.family() == Address::Family::INET4 ||
-      address.family() == Address::Family::INET6) {
-    // Try and determine the 'peer_hostname' from the address we're
-    // connecting to in order to properly verify the certificate
-    // later.
-    const Try<string> hostname =
-      network::convert<inet::Address>(address)->hostname();
-
-    if (hostname.isError()) {
-      VLOG(2) << "Could not determine hostname of peer: " << hostname.error();
-    } else {
-      VLOG(2) << "Connecting to " << hostname.get();
-      peer_hostname = hostname.get();
-    }
-
-    // Determine the 'peer_ip' from the address we're connecting to in
-    // order to properly verify the certificate later.
-    peer_ip = network::convert<inet::Address>(address)->ip;
-  }
-
-  // Optimistically construct a 'ConnectRequest' and future.
-  Owned<ConnectRequest> request(new ConnectRequest());
-  Future<Nothing> future = request->promise.future();
-
-  // Assign 'connect_request' under lock, fail on error.
-  synchronized (lock) {
-    if (connect_request.get() != nullptr) {
-      SSL_free(ssl);
-      bufferevent_free(bev);
-      bev = nullptr;
-      return Failure("Socket is already connecting");
-    }
-    std::swap(request, connect_request);
-  }
-
-  // Extend the life-time of 'this' through the execution of the
-  // lambda in the event loop. Note: The 'self' needs to be explicitly
-  // captured because we're not using it in the body of the lambda. We
-  // can use a 'shared_ptr' because run_in_event_loop is guaranteed to
-  // execute.
-  auto self = shared(this);
-
-  run_in_event_loop(
-      [self, address]() {
-        sockaddr_storage addr = address;
-
-          // Assign the callbacks for the bufferevent. We do this
-          // before the 'bufferevent_socket_connect()' call to avoid
-          // any race on the underlying buffer events becoming ready.
-          bufferevent_setcb(
-              self->bev,
-              &LibeventSSLSocketImpl::recv_callback,
-              &LibeventSSLSocketImpl::send_callback,
-              &LibeventSSLSocketImpl::event_callback,
-              CHECK_NOTNULL(self->event_loop_handle));
-
-          if (bufferevent_socket_connect(
-                  self->bev,
-                  reinterpret_cast<sockaddr*>(&addr),
-                  address.size()) < 0) {
-            SSL* ssl = bufferevent_openssl_get_ssl(CHECK_NOTNULL(self->bev));
-            SSL_free(ssl);
-            bufferevent_free(self->bev);
-            self->bev = nullptr;
-
-            Owned<ConnectRequest> request;
-
-            // Swap out the 'connect_request' so we can destroy it
-            // outside of the lock.
-            synchronized (self->lock) {
-              std::swap(request, self->connect_request);
-            }
-
-            CHECK_NOTNULL(request.get());
-
-            // Fail the promise since we failed to connect.
-            request->promise.fail(
-                "Failed to connect: bufferevent_socket_connect");
-          }
-      },
-      DISALLOW_SHORT_CIRCUIT);
-
-  return future;
-}
-
-
-Future<size_t> LibeventSSLSocketImpl::recv(char* data, size_t size)
-{
-  // Optimistically construct a 'RecvRequest' and future.
-  Owned<RecvRequest> request(new RecvRequest(data, size));
-  std::weak_ptr<LibeventSSLSocketImpl> weak_self(shared(this));
-
-  // If the user of the future decides to 'discard', then we want to
-  // test whether the request was already satisfied.
-  // We capture a 'weak_ptr' to 'this' (as opposed to a 'shared_ptr')
-  // because the socket could be destroyed before this lambda is
-  // executed. If we used a 'shared_ptr' then this lambda could extend
-  // the life-time of 'this' unnecessarily.
-  Future<size_t> future = request->promise.future()
-    .onDiscard([weak_self]() {
-      // Extend the life-time of 'this' through the execution of the
-      // lambda in the event loop. Note: The 'self' needs to be
-      // explicitly captured because we're not using it in the body of
-      // the lambda. We can use a 'shared_ptr' because
-      // run_in_event_loop is guaranteed to execute.
-      std::shared_ptr<LibeventSSLSocketImpl> self(weak_self.lock());
-
-      if (self != nullptr) {
-        run_in_event_loop(
-            [self]() {
-              CHECK(__in_event_loop__);
-              CHECK(self);
-
-              Owned<RecvRequest> request;
-
-              synchronized (self->lock) {
-                std::swap(request, self->recv_request);
-              }
-
-              // Only discard if the request hasn't already been
-              // satisfied.
-              if (request.get() != nullptr) {
-                // Discard the promise outside of the object lock as
-                // the callbacks can be expensive.
-                request->promise.discard();
-              }
-            },
-            DISALLOW_SHORT_CIRCUIT);
-      }
-    });
-
-  // Assign 'recv_request' under lock, fail on error.
-  synchronized (lock) {
-    if (recv_request.get() != nullptr) {
-      return Failure("Socket is already receiving");
-    }
-    std::swap(request, recv_request);
-  }
-
-  // Extend the life-time of 'this' through the execution of the
-  // lambda in the event loop. Note: The 'self' needs to be explicitly
-  // captured because we're not using it in the body of the lambda. We
-  // can use a 'shared_ptr' because run_in_event_loop is guaranteed to
-  // execute.
-  auto self = shared(this);
-
-  run_in_event_loop(
-      [self]() {
-        CHECK(__in_event_loop__);
-        CHECK(self);
-
-        bool recv = false;
-
-        // We check to see if 'recv_request' is null. It would be null
-        // if a 'discard' happened before this lambda was executed.
-        synchronized (self->lock) {
-          recv = self->recv_request.get() != nullptr;
-        }
-
-        // Only try to read existing data from the bufferevent if the
-        // request has not already been discarded.
-        if (recv) {
-          synchronized (self->bev) {
-            evbuffer* input = bufferevent_get_input(self->bev);
-            size_t length = evbuffer_get_length(input);
-
-            // If there is already data in the buffer or an EOF has
-            // been received, fulfill the 'recv_request' by calling
-            // 'recv_callback()'. Otherwise do nothing and wait for
-            // the 'recv_callback' to run when we receive data over
-            // the network.
-            if (length > 0 || self->received_eof) {
-              self->recv_callback();
-            }
-          }
-        }
-      },
-      DISALLOW_SHORT_CIRCUIT);
-
-  return future;
-}
-
-
-Future<size_t> LibeventSSLSocketImpl::send(const char* data, size_t size)
-{
-  // Optimistically construct a 'SendRequest' and future.
-  Owned<SendRequest> request(new SendRequest(size));
-  Future<size_t> future = request->promise.future();
-
-  // We don't add an 'onDiscard' continuation to send because we can
-  // not accurately detect how many bytes have been sent. Once we pass
-  // the data to the bufferevent, there is the possibility that parts
-  // of it have been sent. Another reason is that if we send partial
-  // messages (discard only a part of the data), then it is likely
-  // that the receiving end will fail parsing the message.
-
-  // Assign 'send_request' under lock, fail on error.
-  synchronized (lock) {
-    if (send_request.get() != nullptr) {
-      return Failure("Socket is already sending");
-    }
-    std::swap(request, send_request);
-  }
-
-  evbuffer* buffer = CHECK_NOTNULL(evbuffer_new());
-
-  int result = evbuffer_add(buffer, data, size);
-  CHECK_EQ(0, result);
-
-  // Extend the life-time of 'this' through the execution of the
-  // lambda in the event loop. Note: The 'self' needs to be explicitly
-  // captured because we're not using it in the body of the lambda. We
-  // can use a 'shared_ptr' because run_in_event_loop is guaranteed to
-  // execute.
-  auto self = shared(this);
-
-  run_in_event_loop(
-      [self, buffer]() {
-        CHECK(__in_event_loop__);
-        CHECK(self);
-
-        // Check if the socket is closed or the write end has
-        // encountered an error in the interim (i.e. we received
-        // a BEV_EVENT_ERROR with BEV_EVENT_WRITING).
-        bool write = false;
-
-        synchronized (self->lock) {
-          if (self->send_request.get() != nullptr) {
-            write = true;
-          }
-        }
-
-        if (write) {
-          int result = bufferevent_write_buffer(self->bev, buffer);
-          CHECK_EQ(0, result);
-        }
-
-        evbuffer_free(buffer);
-      },
-      DISALLOW_SHORT_CIRCUIT);
-
-  return future;
-}
-
-
-Future<size_t> LibeventSSLSocketImpl::sendfile(
-    int_fd fd,
-    off_t offset,
-    size_t size)
-{
-  // Optimistically construct a 'SendRequest' and future.
-  Owned<SendRequest> request(new SendRequest(size));
-  Future<size_t> future = request->promise.future();
-
-  // Assign 'send_request' under lock, fail on error.
-  synchronized (lock) {
-    if (send_request.get() != nullptr) {
-      return Failure("Socket is already sending");
-    }
-    std::swap(request, send_request);
-  }
-
-  // Duplicate the file descriptor because Libevent will take ownership
-  // and control the lifecycle separately.
-  //
-  // TODO(josephw): We can avoid duplicating the file descriptor in
-  // future versions of Libevent. In Libevent versions 2.1.2 and later,
-  // we may use `evbuffer_file_segment_new` and `evbuffer_add_file_segment`
-  // instead of `evbuffer_add_file`.
-  Try<int_fd> dup = os::dup(fd);
-  if (dup.isError()) {
-    return Failure(dup.error());
-  }
-
-  // NOTE: This is *not* an `int_fd` because `libevent` requires a CRT
-  // integer file descriptor, which we allocate and then use
-  // exclusively here.
-#ifdef __WINDOWS__
-  int owned_fd = dup->crt();
-  // The `os::cloexec` and `os::nonblock` functions do nothing on
-  // Windows, and cannot be called because they take `int_fd`.
-#else
-  int owned_fd = dup.get();
-
-  // Set the close-on-exec flag.
-  Try<Nothing> cloexec = os::cloexec(owned_fd);
-  if (cloexec.isError()) {
-    os::close(owned_fd);
-    return Failure(
-        "Failed to set close-on-exec on duplicated file descriptor: " +
-        cloexec.error());
-  }
-
-  // Make the file descriptor non-blocking.
-  Try<Nothing> nonblock = os::nonblock(owned_fd);
-  if (nonblock.isError()) {
-    os::close(owned_fd);
-    return Failure(
-        "Failed to make duplicated file descriptor non-blocking: " +
-        nonblock.error());
-  }
-#endif // __WINDOWS__
-
-  // Extend the life-time of 'this' through the execution of the
-  // lambda in the event loop. Note: The 'self' needs to be explicitly
-  // captured because we're not using it in the body of the lambda. We
-  // can use a 'shared_ptr' because run_in_event_loop is guaranteed to
-  // execute.
-  auto self = shared(this);
-
-  run_in_event_loop(
-      [self, owned_fd, offset, size]() {
-        CHECK(__in_event_loop__);
-        CHECK(self);
-
-        // Check if the socket is closed or the write end has
-        // encountered an error in the interim (i.e. we received
-        // a BEV_EVENT_ERROR with BEV_EVENT_WRITING).
-        bool write = false;
-
-        synchronized (self->lock) {
-          if (self->send_request.get() != nullptr) {
-            write = true;
-          }
-        }
-
-        if (write) {
-          // NOTE: `evbuffer_add_file` will take ownership of the file
-          // descriptor and close it after it has finished reading it.
-          int result = evbuffer_add_file(
-              bufferevent_get_output(self->bev),
-              owned_fd,
-              offset,
-              size);
-          CHECK_EQ(0, result);
-        } else {
-#ifdef __WINDOWS__
-          // NOTE: `os::close()` on Windows is not compatible with CRT
-          // file descriptors, only `HANDLE` and `SOCKET` types.
-          ::_close(owned_fd);
-#else
-          os::close(owned_fd);
-#endif // __WINDOWS__
-        }
-      },
-      DISALLOW_SHORT_CIRCUIT);
-
-  return future;
-}
-
-
-Try<Nothing> LibeventSSLSocketImpl::listen(int backlog)
-{
-  if (listener != nullptr) {
-    return Error("Socket is already listening");
-  }
-
-  CHECK(bev == nullptr);
-
-  // NOTE: Accepted sockets are nonblocking by default in libevent, but
-  // can be set to block via the `LEV_OPT_LEAVE_SOCKETS_BLOCKING`
-  // flag for `evconnlistener_new`.
-  listener = evconnlistener_new(
-      base,
-      [](evconnlistener* listener,
-         evutil_socket_t socket,
-         sockaddr* addr,
-         int addr_length,
-         void* arg) {
-        CHECK(__in_event_loop__);
-
-        std::weak_ptr<LibeventSSLSocketImpl>* handle =
-          reinterpret_cast<std::weak_ptr<LibeventSSLSocketImpl>*>(
-              CHECK_NOTNULL(arg));
-
-        std::shared_ptr<LibeventSSLSocketImpl> impl(handle->lock());
-
-#ifndef __WINDOWS__
-        // NOTE: Passing the flag `LEV_OPT_CLOSE_ON_EXEC` into
-        // `evconnlistener_new` would atomically set `SOCK_CLOEXEC`
-        // on the accepted socket. However, this flag is not supported
-        // in the minimum recommended version of libevent (2.0.22).
-        Try<Nothing> cloexec = os::cloexec(socket);
-        if (cloexec.isError()) {
-          VLOG(2) << "Failed to accept, cloexec: " << cloexec.error();
-
-          // Propagate the error through the listener's `accept_queue`.
-          if (impl != nullptr) {
-            impl->accept_queue.put(
-                Failure("Failed to accept, cloexec: " + cloexec.error()));
-          }
-
-          os::close(socket);
-          return;
-        }
-#endif // __WINDOWS__
-
-        if (impl != nullptr) {
-          Try<net::IP> ip = net::IP::create(*addr);
-          if (ip.isError()) {
-            VLOG(2) << "Could not convert sockaddr to net::IP: " << ip.error();
-          }
-
-          // We pass the 'listener' into the 'AcceptRequest' because
-          // this function could be executed before 'this->listener'
-          // is set.
-          AcceptRequest* request =
-            new AcceptRequest(
-                  // NOTE: The `int_fd` must be explicitly constructed
-                  // to avoid the `intptr_t` being casted to an `int`,
-                  // resulting in a `HANDLE` instead of a `SOCKET` on
-                  // Windows.
-                  int_fd(socket),
-                  listener,
-                  ip.isSome() ? Option<net::IP>(ip.get()) : None());
-
-          impl->accept_callback(request);
-        }
-      },
-      event_loop_handle,
-      LEV_OPT_REUSEABLE,
-      backlog,
-      s);
-
-  if (listener == nullptr) {
-    return Error("Failed to listen on socket");
-  }
-
-  // TODO(jmlvanre): attach an error callback.
-
-  return Nothing();
-}
-
-
-Future<std::shared_ptr<SocketImpl>> LibeventSSLSocketImpl::accept()
-{
-  // Note that due to MESOS-8448, when the caller discards, it's
-  // possible that we pull an accepted socket out of the queue but
-  // drop it when `.then` transitions to discarded rather than
-  // executing the continuation. This is currently acceptable since
-  // callers only discard when they're breaking their accept loop.
-  // However, from an API perspective, we shouldn't be dropping
-  // the socket on the floor.
-  //
-  // We explicitly specify the return type to avoid a type deduction
-  // issue in some versions of clang. See MESOS-2943.
-  return accept_queue.get()
-    .then([](const Future<std::shared_ptr<SocketImpl>>& impl)
-      -> Future<std::shared_ptr<SocketImpl>> {
-      CHECK(!impl.isPending());
-      return impl;
-    });
-}
-
-
-void LibeventSSLSocketImpl::peek_callback(
-    evutil_socket_t fd,
-    short what,
-    void* arg)
-{
-  CHECK(__in_event_loop__);
-
-  CHECK(what & EV_READ);
-  char data[6];
-
-  // Try to peek the first 6 bytes of the message.
-  ssize_t size = ::recv(fd, data, 6, MSG_PEEK);
-
-  // Based on the function 'ssl23_get_client_hello' in openssl, we
-  // test whether to dispatch to the SSL or non-SSL based accept based
-  // on the following rules:
-  //   1. If there are fewer than 3 bytes: non-SSL.
-  //   2. If the 1st bit of the 1st byte is set AND the 3rd byte is
-  //          equal to SSL2_MT_CLIENT_HELLO: SSL.
-  //   3. If the 1st byte is equal to SSL3_RT_HANDSHAKE AND the 2nd
-  //      byte is equal to SSL3_VERSION_MAJOR and the 6th byte is
-  //      equal to SSL3_MT_CLIENT_HELLO: SSL.
-  //   4. Otherwise: non-SSL.
-
-  // For an ascii based protocol to falsely get dispatched to SSL it
-  // needs to:
-  //   1. Start with an invalid ascii character (0x80).
-  //   2. OR have the first 2 characters be a SYN followed by ETX, and
-  //          then the 6th character be SOH.
-  // These conditions clearly do not constitute valid HTTP requests,
-  // and are unlikely to collide with other existing protocols.
-
-  bool ssl = false; // Default to rule 4.
-
-  if (size < 2) { // Rule 1.
-    ssl = false;
-  } else if ((data[0] & 0x80) && data[2] == SSL2_MT_CLIENT_HELLO) { // Rule 2.
-    ssl = true;
-  } else if (data[0] == SSL3_RT_HANDSHAKE &&
-             data[1] == SSL3_VERSION_MAJOR &&
-             data[5] == SSL3_MT_CLIENT_HELLO) { // Rule 3.
-    ssl = true;
-  }
-
-  AcceptRequest* request = reinterpret_cast<AcceptRequest*>(arg);
-
-  // We call 'event_free()' here because it ensures the event is made
-  // non-pending and inactive before it gets deallocated.
-  event_free(request->peek_event);
-  request->peek_event = nullptr;
-
-  if (ssl) {
-    accept_SSL_callback(request);
-  } else {
-    // Downgrade to a non-SSL socket implementation.
-    //
-    // NOTE: The `int_fd` must be explicitly constructed to avoid the
-    // `intptr_t` being casted to an `int`, resulting in a `HANDLE`
-    // instead of a `SOCKET` on Windows.
-    Try<std::shared_ptr<SocketImpl>> impl = PollSocketImpl::create(int_fd(fd));
-    if (impl.isError()) {
-      request->promise.fail(impl.error());
-    } else {
-      request->promise.set(impl.get());
-    }
-
-    delete request;
-  }
-}
-
-
-void LibeventSSLSocketImpl::accept_callback(AcceptRequest* request)
-{
-  CHECK(__in_event_loop__);
-
-  Queue<Future<std::shared_ptr<SocketImpl>>> accept_queue_ = accept_queue;
-
-  // After the socket is accepted, it must complete the SSL
-  // handshake (or be downgraded to a regular socket) before
-  // we put it in the queue of connected sockets.
-  request->promise.future()
-    .onAny([accept_queue_](Future<std::shared_ptr<SocketImpl>> impl) mutable {
-      accept_queue_.put(impl);
-    });
-
-  // If we support downgrading the connection, first wait for this
-  // socket to become readable. We will then MSG_PEEK it to test
-  // whether we want to dispatch as SSL or non-SSL.
-  if (openssl::flags().support_downgrade) {
-    request->peek_event = event_new(
-        base,
-        request->socket,
-        EV_READ,
-        &LibeventSSLSocketImpl::peek_callback,
-        request);
-    event_add(request->peek_event, nullptr);
-  } else {
-    accept_SSL_callback(request);
-  }
-}
-
-
-void LibeventSSLSocketImpl::accept_SSL_callback(AcceptRequest* request)
-{
-  CHECK(__in_event_loop__);
-
-  // Set up SSL object.
-  SSL* ssl = SSL_new(openssl::context());
-  if (ssl == nullptr) {
-    request->promise.fail("Accept failed, SSL_new");
-    delete request;
-    return;
-  }
-
-  // We use 'request->listener' because 'this->listener' may not have
-  // been set by the time this function is executed. See comment in
-  // the lambda for evconnlistener_new in
-  // 'LibeventSSLSocketImpl::listen'.
-  event_base* ev_base = evconnlistener_get_base(request->listener);
-
-  // Construct the bufferevent in the accepting state.
-  bufferevent* bev = bufferevent_openssl_socket_new(
-      ev_base,
-      request->socket,
-      ssl,
-      BUFFEREVENT_SSL_ACCEPTING,
-      BEV_OPT_THREADSAFE);
-
-  if (bev == nullptr) {
-    request->promise.fail("Accept failed: bufferevent_openssl_socket_new");
-    SSL_free(ssl);
-    delete request;
-    return;
-  }
-
-  bufferevent_setcb(
-      bev,
-      nullptr,
-      nullptr,
-      [](bufferevent* bev, short events, void* arg) {
-        // This handles error states or 'BEV_EVENT_CONNECTED' events
-        // and satisfies the promise by constructing a new socket if
-        // the connection was successfuly established.
-        CHECK(__in_event_loop__);
-
-        AcceptRequest* request =
-          reinterpret_cast<AcceptRequest*>(CHECK_NOTNULL(arg));
-
-        if (events & BEV_EVENT_EOF) {
-          request->promise.fail("Failed accept: connection closed");
-        } else if (events & BEV_EVENT_CONNECTED) {
-          // We will receive a 'CONNECTED' state on an accepting socket
-          // once the connection is established. Time to do
-          // post-verification. First, we need to determine the peer
-          // hostname.
-          Option<string> peer_hostname = None();
-
-          if (request->ip.isSome()) {
-            Try<string> hostname = net::getHostname(request->ip.get());
-
-            if (hostname.isError()) {
-              VLOG(2) << "Could not determine hostname of peer: "
-                      << hostname.error();
-            } else {
-              VLOG(2) << "Accepting from " << hostname.get();
-              peer_hostname = hostname.get();
-            }
-          }
-
-          SSL* ssl = bufferevent_openssl_get_ssl(bev);
-          CHECK_NOTNULL(ssl);
-
-          Try<Nothing> verify =
-            openssl::verify(ssl, peer_hostname, request->ip);
-
-          if (verify.isError()) {
-            VLOG(1) << "Failed accept, verification error: " << verify.error();
-            request->promise.fail(verify.error());
-            SSL_free(ssl);
-            bufferevent_free(bev);
-            // TODO(jmlvanre): Clean up for readability. Consider RAII
-            // or constructing the impl earlier.
-            CHECK(request->socket >= 0);
-            Try<Nothing> close = os::close(request->socket);
-            if (close.isError()) {
-              LOG(FATAL)
-                << "Failed to close socket " << stringify(request->socket)
-                << ": " << close.error();
-            }
-            delete request;
-            return;
-          }
-
-          auto impl = std::shared_ptr<LibeventSSLSocketImpl>(
-              new LibeventSSLSocketImpl(
-                  request->socket,
-                  bev,
-                  std::move(peer_hostname)));
-
-          // See comment at 'initialize' declaration for why we call
-          // this.
-          impl->initialize();
-
-          // We have to wait till after 'initialize()' is invoked for
-          // event_loop_handle to be valid as a callback argument for
-          // the callbacks.
-          bufferevent_setcb(
-              CHECK_NOTNULL(impl->bev),
-              &LibeventSSLSocketImpl::recv_callback,
-              &LibeventSSLSocketImpl::send_callback,
-              &LibeventSSLSocketImpl::event_callback,
-              CHECK_NOTNULL(impl->event_loop_handle));
-
-          request->promise.set(std::dynamic_pointer_cast<SocketImpl>(impl));
-        } else if (events & BEV_EVENT_ERROR) {
-          std::ostringstream stream;
-          if (EVUTIL_SOCKET_ERROR() != 0) {
-            stream << evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR());
-          } else {
-            char buffer[1024] = {};
-            unsigned long error = bufferevent_get_openssl_error(bev);
-            ERR_error_string_n(error, buffer, sizeof(buffer));
-            stream << buffer;
-          }
-
-          // Fail the accept request and log the error.
-          VLOG(1) << "Socket error: " << stream.str();
-
-          SSL* ssl = bufferevent_openssl_get_ssl(CHECK_NOTNULL(bev));
-          SSL_free(ssl);
-          bufferevent_free(bev);
-
-          // TODO(jmlvanre): Clean up for readability. Consider RAII
-          // or constructing the impl earlier.
-          CHECK(request->socket >= 0);
-          Try<Nothing> close = os::close(request->socket);
-          if (close.isError()) {
-            LOG(FATAL)
-              << "Failed to close socket " << stringify(request->socket)
-              << ": " << close.error();
-          }
-          request->promise.fail(
-              "Failed accept: connection error: " + stream.str());
-        }
-
-        delete request;
-      },
-      request);
-}
-
-} // namespace internal {
-} // namespace network {
-} // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/78244b53/3rdparty/libprocess/src/libevent_ssl_socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent_ssl_socket.hpp b/3rdparty/libprocess/src/libevent_ssl_socket.hpp
deleted file mode 100644
index 6ef5a86..0000000
--- a/3rdparty/libprocess/src/libevent_ssl_socket.hpp
+++ /dev/null
@@ -1,198 +0,0 @@
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License
-
-#ifndef __LIBEVENT_SSL_SOCKET_HPP__
-#define __LIBEVENT_SSL_SOCKET_HPP__
-
-#include <event2/buffer.h>
-#include <event2/bufferevent_ssl.h>
-#include <event2/event.h>
-#include <event2/listener.h>
-#include <event2/util.h>
-
-#include <atomic>
-#include <memory>
-
-#include <process/queue.hpp>
-#include <process/socket.hpp>
-
-namespace process {
-namespace network {
-namespace internal {
-
-class LibeventSSLSocketImpl : public SocketImpl
-{
-public:
-  // See 'Socket::create()'.
-  static Try<std::shared_ptr<SocketImpl>> create(int_fd s);
-
-  LibeventSSLSocketImpl(int_fd _s);
-
-  ~LibeventSSLSocketImpl() override;
-
-  // Implement 'SocketImpl' interface.
-  Future<Nothing> connect(const Address& address) override;
-  Future<size_t> recv(char* data, size_t size) override;
-  // Send does not currently support discard. See implementation.
-  Future<size_t> send(const char* data, size_t size) override;
-  Future<size_t> sendfile(int_fd fd, off_t offset, size_t size) override;
-  Try<Nothing> listen(int backlog) override;
-  Future<std::shared_ptr<SocketImpl>> accept() override;
-  SocketImpl::Kind kind() const override { return SocketImpl::Kind::SSL; }
-
-  // Shuts down the socket.
-  //
-  // NOTE: Although this method accepts an integer which specifies the
-  // shutdown mode, this parameter is ignored because SSL connections
-  // do not have a concept of read/write-only shutdown. If either end
-  // of the socket is closed, then the futures of any outstanding read
-  // requests will be completed (possibly as failures).
-  Try<Nothing, SocketError> shutdown(int how) override;
-
-  // We need a post-initializer because 'shared_from_this()' is not
-  // valid until the constructor has finished.
-  void initialize();
-
-private:
-  // A set of helper functions that transitions an accepted socket to
-  // an SSL connected socket. With the libevent-openssl library, once
-  // we return from the 'accept_callback()' which is scheduled by
-  // 'listen' then we still need to wait for the 'BEV_EVENT_CONNECTED'
-  // state before we know the SSL connection has been established.
-  struct AcceptRequest
-  {
-    AcceptRequest(
-        int_fd _socket,
-        evconnlistener* _listener,
-        const Option<net::IP>& _ip)
-      : peek_event(nullptr),
-        listener(_listener),
-        socket(_socket),
-        ip(_ip) {}
-    event* peek_event;
-    Promise<std::shared_ptr<SocketImpl>> promise;
-    evconnlistener* listener;
-    int_fd socket;
-    Option<net::IP> ip;
-  };
-
-  struct RecvRequest
-  {
-    RecvRequest(char* _data, size_t _size)
-      : data(_data), size(_size) {}
-    Promise<size_t> promise;
-    char* data;
-    size_t size;
-  };
-
-  struct SendRequest
-  {
-    SendRequest(size_t _size)
-      : size(_size) {}
-    Promise<size_t> promise;
-    size_t size;
-  };
-
-  struct ConnectRequest
-  {
-    Promise<Nothing> promise;
-  };
-
-  // This is a private constructor used by the accept helper
-  // functions.
-  LibeventSSLSocketImpl(
-      int_fd _s,
-      bufferevent* bev,
-      Option<std::string>&& peer_hostname);
-
-  // This is called when the equivalent of 'accept' returns. The role
-  // of this function is to set up the SSL object and bev. If we
-  // support both SSL and non-SSL traffic simultaneously then we first
-  // wait for data to be ready and test the hello handshake to
-  // disambiguate between the kinds of traffic.
-  void accept_callback(AcceptRequest* request);
-
-  // This is the continuation of 'accept_callback' that handles an SSL
-  // connection.
-  static void accept_SSL_callback(AcceptRequest* request);
-
-  // This function peeks at the data on an accepted socket to see if
-  // there is an SSL handshake or not. It then dispatches to the
-  // SSL handling function or creates a non-SSL socket.
-  static void peek_callback(evutil_socket_t fd, short what, void* arg);
-
-  // The following are function pairs of static functions to member
-  // functions. The static functions test and hold the weak pointer to
-  // the socket before calling the member functions. This protects
-  // against the socket being destroyed before the event loop calls
-  // the callbacks.
-  static void recv_callback(bufferevent* bev, void* arg);
-  void recv_callback();
-
-  static void send_callback(bufferevent* bev, void* arg);
-  void send_callback();
-
-  static void event_callback(bufferevent* bev, short events, void* arg);
-  void event_callback(short events);
-
-  bufferevent* bev;
-
-  evconnlistener* listener;
-
-  // Protects the following instance variables.
-  std::atomic_flag lock = ATOMIC_FLAG_INIT;
-  Owned<RecvRequest> recv_request;
-  Owned<SendRequest> send_request;
-  Owned<ConnectRequest> connect_request;
-
-  // Indicates whether or not an EOF has been received on this socket.
-  // Our accesses to this member are not synchronized because they all
-  // occur within the event loop, which runs on a single thread.
-  bool received_eof = false;
-
-  // This is a weak pointer to 'this', i.e., ourselves, this class
-  // instance. We need this for our event loop callbacks because it's
-  // possible that we'll actually want to cleanup this socket impl
-  // before the event loop callback gets executed ... and we'll check
-  // in each event loop callback whether or not this weak_ptr is valid
-  // by attempting to upgrade it to shared_ptr. It is the
-  // responsibility of the event loop through the deferred lambda in
-  // the destructor to clean up this pointer.
-  // 1) It is a 'weak_ptr' as opposed to a 'shared_ptr' because we
-  // want to test whether the object is still around from within the
-  // event loop. If it was a 'shared_ptr' then we would be
-  // contributing to the lifetime of the object and would no longer be
-  // able to test the lifetime correctly.
-  // 2) This is a pointer to a 'weak_ptr' so that we can pass this
-  // through to the event loop through the C-interface. We need access
-  // to the 'weak_ptr' from outside the object (in the event loop) to
-  // test if the object is still alive. By maintaining this 'weak_ptr'
-  // on the heap we can be sure it is safe to access from the
-  // event loop until it is destroyed.
-  std::weak_ptr<LibeventSSLSocketImpl>* event_loop_handle;
-
-  // This queue stores accepted sockets that are considered connected
-  // (either the SSL handshake has completed or the socket has been
-  // downgraded). The 'accept()' call returns sockets from this queue.
-  // We wrap the socket in a 'Future' so that we can pass failures or
-  // discards through.
-  Queue<Future<std::shared_ptr<SocketImpl>>> accept_queue;
-
-  Option<std::string> peer_hostname;
-  Option<net::IP> peer_ip;
-};
-
-} // namespace internal {
-} // namespace network {
-} // namespace process {
-
-#endif // __LIBEVENT_SSL_SOCKET_HPP__