You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/12/25 22:05:52 UTC

[01/12] mesos git commit: Add multiple include guard to libev.

Repository: mesos
Updated Branches:
  refs/heads/master c51312665 -> 82fdbc7fe


Add multiple include guard to libev.

Review: https://reviews.apache.org/r/28320


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3347a2b7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3347a2b7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3347a2b7

Branch: refs/heads/master
Commit: 3347a2b794bfaf2b02981bdd9e9eee017134201e
Parents: 252d3c7
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Dec 20 12:03:12 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Dec 20 17:59:55 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/libev.hpp | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3347a2b7/3rdparty/libprocess/src/libev.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libev.hpp b/3rdparty/libprocess/src/libev.hpp
index a43e7d7..e4a403d 100644
--- a/3rdparty/libprocess/src/libev.hpp
+++ b/3rdparty/libprocess/src/libev.hpp
@@ -1,3 +1,6 @@
+#ifndef __LIBEV_HPP__
+#define __LIBEV_HPP__
+
 #include <ev.h>
 
 #include <queue>
@@ -80,3 +83,5 @@ Future<T> run_in_event_loop(const lambda::function<Future<T>(void)>& f)
 }
 
 } // namespace process {
+
+#endif // __LIBEV_HPP__


[10/12] mesos git commit: Fixed compilation issue on OS X with libev.

Posted by be...@apache.org.
Fixed compilation issue on OS X with libev.


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/82fdbc7f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/82fdbc7f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/82fdbc7f

Branch: refs/heads/master
Commit: 82fdbc7fe6e4eaeeceeb05124e17a9079f4f1699
Parents: 2d46e85
Author: Benjamin Hindman <be...@gmail.com>
Authored: Thu Dec 25 11:53:26 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Dec 25 11:54:13 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/process.cpp | 5 -----
 1 file changed, 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/82fdbc7f/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 25981ca..67b6b3b 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -81,11 +81,6 @@
 #include "encoder.hpp"
 #include "event_loop.hpp"
 #include "gate.hpp"
-#ifndef USE_LIBEVENT
-#include "libev.hpp"
-#else
-#include "libevent.hpp"
-#endif // USE_LIBEVENT
 #include "process_reference.hpp"
 #include "synchronized.hpp"
 


[04/12] mesos git commit: Initialized EventLoop and Clock earlier.

Posted by be...@apache.org.
Initialized EventLoop and Clock earlier.

Review: https://reviews.apache.org/r/29225


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/37be6031
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/37be6031
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/37be6031

Branch: refs/heads/master
Commit: 37be603106f601c6e656955ef8c903995379da9e
Parents: dd37344
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Dec 20 12:02:27 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Dec 20 17:59:55 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/process.cpp | 48 ++++++++++++++++----------------
 1 file changed, 24 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/37be6031/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index d3dac4c..2aeb815 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -798,6 +798,30 @@ void initialize(const string& delegate)
     }
   }
 
+  // Initialize the event loop.
+  EventLoop::initialize();
+  Clock::initialize(lambda::bind(&timedout, lambda::_1));
+
+//   ev_child_init(&child_watcher, child_exited, pid, 0);
+//   ev_child_start(loop, &cw);
+
+//   /* Install signal handler. */
+//   struct sigaction sa;
+
+//   sa.sa_handler = ev_sighandler;
+//   sigfillset (&sa.sa_mask);
+//   sa.sa_flags = SA_RESTART; /* if restarting works we save one iteration */
+//   sigaction (w->signum, &sa, 0);
+
+//   sigemptyset (&sa.sa_mask);
+//   sigaddset (&sa.sa_mask, w->signum);
+//   sigprocmask (SIG_UNBLOCK, &sa.sa_mask, 0);
+
+  pthread_t thread; // For now, not saving handles on our threads.
+  if (pthread_create(&thread, NULL, &EventLoop::run, NULL) != 0) {
+    LOG(FATAL) << "Failed to initialize, pthread_create";
+  }
+
   __node__.ip = 0;
   __node__.port = 0;
 
@@ -866,30 +890,6 @@ void initialize(const string& delegate)
     PLOG(FATAL) << "Failed to initialize: " << listen.error();
   }
 
-  // Initialize the event loop.
-  EventLoop::initialize();
-  Clock::initialize(lambda::bind(&timedout, lambda::_1));
-
-//   ev_child_init(&child_watcher, child_exited, pid, 0);
-//   ev_child_start(loop, &cw);
-
-//   /* Install signal handler. */
-//   struct sigaction sa;
-
-//   sa.sa_handler = ev_sighandler;
-//   sigfillset (&sa.sa_mask);
-//   sa.sa_flags = SA_RESTART; /* if restarting works we save one iteration */
-//   sigaction (w->signum, &sa, 0);
-
-//   sigemptyset (&sa.sa_mask);
-//   sigaddset (&sa.sa_mask, w->signum);
-//   sigprocmask (SIG_UNBLOCK, &sa.sa_mask, 0);
-
-  pthread_t thread; // For now, not saving handles on our threads.
-  if (pthread_create(&thread, NULL, &EventLoop::run, NULL) != 0) {
-    LOG(FATAL) << "Failed to initialize, pthread_create";
-  }
-
   // Need to set initialzing here so that we can actually invoke
   // 'spawn' below for the garbage collector.
   initializing = false;


[07/12] mesos git commit: Introduce libevent poll implementation.

Posted by be...@apache.org.
Introduce libevent poll implementation.

Review: https://reviews.apache.org/r/28323


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f4a41805
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f4a41805
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f4a41805

Branch: refs/heads/master
Commit: f4a41805774afb658cfd30c1b280599448e00b1b
Parents: a027dd1
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Dec 20 12:43:54 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Dec 25 11:54:12 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am           |  3 +-
 3rdparty/libprocess/src/libevent_poll.cpp | 74 ++++++++++++++++++++++++++
 2 files changed, 76 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f4a41805/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index b95c0a9..09fce46 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -60,7 +60,8 @@ libprocess_la_CPPFLAGS =		\
 if ENABLE_LIBEVENT
 libprocess_la_SOURCES +=	\
     src/libevent.hpp		\
-    src/libevent.cpp
+    src/libevent.cpp		\
+    src/libevent_poll.cpp
 else
   libprocess_la_SOURCES +=	\
     src/libev.hpp		\

http://git-wip-us.apache.org/repos/asf/mesos/blob/f4a41805/3rdparty/libprocess/src/libevent_poll.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent_poll.cpp b/3rdparty/libprocess/src/libevent_poll.cpp
new file mode 100644
index 0000000..d0b8946
--- /dev/null
+++ b/3rdparty/libprocess/src/libevent_poll.cpp
@@ -0,0 +1,74 @@
+#include <event2/event.h>
+
+#include <process/future.hpp>
+#include <process/io.hpp>
+#include <process/process.hpp> // For process::initialize.
+
+#include "libevent.hpp"
+
+namespace process {
+
+namespace io {
+namespace internal {
+
+struct Poll
+{
+  Promise<short> promise;
+  event* ev;
+};
+
+
+void pollCallback(evutil_socket_t, short what, void* arg)
+{
+  Poll* poll = reinterpret_cast<Poll*>(arg);
+
+  if (poll->promise.future().hasDiscard()) {
+    poll->promise.discard();
+  } else {
+    // Convert libevent specific EV_READ / EV_WRITE to io::* specific
+    // values of these enumerations.
+    short events =
+      ((what & EV_READ) ? io::READ : 0) | ((what & EV_WRITE) ? io::WRITE : 0);
+
+    poll->promise.set(events);
+  }
+
+  event_free(poll->ev);
+  delete poll;
+}
+
+
+void pollDiscard(event* ev)
+{
+  event_active(ev, EV_READ, 0);
+}
+
+} // namespace internal {
+
+
+Future<short> poll(int fd, short events)
+{
+  process::initialize();
+
+  internal::Poll* poll = new internal::Poll();
+
+  Future<short> future = poll->promise.future();
+
+  // Convert io::READ / io::WRITE to libevent specific values of these
+  // enumerations.
+  short what =
+    ((events & io::READ) ? EV_READ : 0) | ((events & io::WRITE) ? EV_WRITE : 0);
+
+  poll->ev = event_new(base, fd, what, &internal::pollCallback, poll);
+  if (poll->ev == NULL) {
+    LOG(FATAL) << "Failed to poll, event_new";
+  }
+
+  event_add(poll->ev, NULL);
+
+  return future
+    .onDiscard(lambda::bind(&internal::pollDiscard, poll->ev));
+}
+
+} // namespace io {
+} // namespace process {


[08/12] mesos git commit: Introduce libevent header and implementation.

Posted by be...@apache.org.
Introduce libevent header and implementation.

Review: https://reviews.apache.org/r/28321


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0e29981b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0e29981b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0e29981b

Branch: refs/heads/master
Commit: 0e29981b8bac58886f515485403f3b0c585e18e6
Parents: 3347a2b
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Dec 20 12:03:28 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Dec 25 11:54:12 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am      | 18 ++++++++-----
 3rdparty/libprocess/configure.ac     | 15 +++++++++++
 3rdparty/libprocess/src/libevent.cpp | 44 +++++++++++++++++++++++++++++++
 3rdparty/libprocess/src/libevent.hpp | 11 ++++++++
 3rdparty/libprocess/src/process.cpp  |  2 +-
 5 files changed, 83 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/0e29981b/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 5c92ee5..b95c0a9 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -58,7 +58,9 @@ libprocess_la_CPPFLAGS =		\
   $(AM_CPPFLAGS)
 
 if ENABLE_LIBEVENT
-# Place for libevent implementations.
+libprocess_la_SOURCES +=	\
+    src/libevent.hpp		\
+    src/libevent.cpp
 else
   libprocess_la_SOURCES +=	\
     src/libev.hpp		\
@@ -82,16 +84,20 @@ else
   HTTP_PARSER_LIB = -lhttp_parser
 endif
 
+if ENABLE_LIBEVENT
+  EVENT_LIB = -levent -levent_pthreads
+else
 if WITH_BUNDLED_LIBEV
-  LIBEV_LIB = $(LIBEV)/libev.la
+  EVENT_LIB = $(LIBEV)/libev.la
 else
-  LIBEV_LIB = -lev
+  EVENT_LIB = -lev
+endif
 endif
 
 libprocess_la_LIBADD =			\
   $(LIBGLOG)				\
   $(HTTP_PARSER_LIB)			\
-  $(LIBEV_LIB)
+  $(EVENT_LIB)
 
 if HAS_GPERFTOOLS
 libprocess_la_CPPFLAGS += -I$(GPERFTOOLS)/src
@@ -132,7 +138,7 @@ tests_LDADD =				\
   libprocess.la				\
   $(LIBGLOG)				\
   $(HTTP_PARSER_LIB)			\
-  $(LIBEV_LIB)
+  $(EVENT_LIB)
 
 benchmarks_SOURCES =			\
   src/tests/benchmarks.cpp
@@ -148,7 +154,7 @@ benchmarks_LDADD =			\
   libprocess.la				\
   $(LIBGLOG)				\
   $(HTTP_PARSER_LIB)			\
-  $(LIBEV_LIB)
+  $(EVENT_LIB)
 
 # We use a check-local target for now to avoid the parallel test
 # runner that ships with newer versions of autotools.

http://git-wip-us.apache.org/repos/asf/mesos/blob/0e29981b/3rdparty/libprocess/configure.ac
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/configure.ac b/3rdparty/libprocess/configure.ac
index 2f01a3b..19bc601 100644
--- a/3rdparty/libprocess/configure.ac
+++ b/3rdparty/libprocess/configure.ac
@@ -589,6 +589,21 @@ libevent is required for mesos to build.
   if test "x$found_libevent" = "xyes"; then
     AC_DEFINE([USE_LIBEVENT], [1])
   fi
+  AC_CHECK_HEADERS([event2/thread.h],
+                   [AC_CHECK_LIB([event_pthreads],
+                                 [evthread_use_pthreads],
+                                 [found_libevent_thread=yes],
+                                 [AC_MSG_ERROR([cannot find libevent_pthreads
+-------------------------------------------------------------------
+libevent_pthreads is required for mesos to build.
+-------------------------------------------------------------------
+                          ])],
+                                 [-levent])],
+                                 [AC_MSG_ERROR([cannot find libevent_pthreads headers
+-------------------------------------------------------------------
+libevent_pthreads is required for mesos to build.
+-------------------------------------------------------------------
+  ])])
 fi
 
 AS_IF([test "x${ac_cv_env_CFLAGS_set}" = "x"],

http://git-wip-us.apache.org/repos/asf/mesos/blob/0e29981b/3rdparty/libprocess/src/libevent.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent.cpp b/3rdparty/libprocess/src/libevent.cpp
new file mode 100644
index 0000000..524d811
--- /dev/null
+++ b/3rdparty/libprocess/src/libevent.cpp
@@ -0,0 +1,44 @@
+#include <unistd.h>
+
+#include <event2/event.h>
+#include <event2/thread.h>
+
+#include <process/logging.hpp>
+
+#include "event_loop.hpp"
+#include "libevent.hpp"
+
+namespace process {
+
+struct event_base* base = NULL;
+
+void* EventLoop::run(void*)
+{
+  int result = event_base_loop(base, 0);
+  if (result < 0) {
+    LOG(FATAL) << "Failed to run event loop";
+  } else if (result == 1) {
+    VLOG(1) << "Finished running event loop due to lack of events";
+  }
+
+  return NULL;
+}
+
+
+void EventLoop::initialize()
+{
+  if (evthread_use_pthreads() < 0) {
+    LOG(FATAL) << "Failed to initialize, evthread_use_pthreads";
+  }
+
+  // This enables debugging of libevent calls. We can remove this
+  // when the implementation settles and after we gain confidence.
+  event_enable_debug_mode();
+
+  base = event_base_new();
+  if (base == NULL) {
+    LOG(FATAL) << "Failed to initialize, event_base_new";
+  }
+}
+
+} // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/0e29981b/3rdparty/libprocess/src/libevent.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent.hpp b/3rdparty/libprocess/src/libevent.hpp
new file mode 100644
index 0000000..f6cc721
--- /dev/null
+++ b/3rdparty/libprocess/src/libevent.hpp
@@ -0,0 +1,11 @@
+#ifndef __LIBEVENT_HPP__
+#define __LIBEVENT_HPP__
+
+namespace process {
+
+// Event loop.
+extern struct event_base* base;
+
+} // namespace process {
+
+#endif // __LIBEVENT_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/0e29981b/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index fabcca4..441ce48 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -84,7 +84,7 @@
 #ifndef USE_LIBEVENT
 #include "libev.hpp"
 #else
-// Place for libevent header.
+#include "libevent.hpp"
 #endif // USE_LIBEVENT
 #include "process_reference.hpp"
 #include "synchronized.hpp"


[09/12] mesos git commit: Add a factory pattern for constructing a Socket.

Posted by be...@apache.org.
Add a factory pattern for constructing a Socket.

Add a factory pattern to allow construction of different
implementations of the socket interface. Remove default constructor
from Socket.

Review: https://reviews.apache.org/r/28670


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b3bb74c2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b3bb74c2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b3bb74c2

Branch: refs/heads/master
Commit: b3bb74c2359c6cf3d73bcb0e57a4732af804a15b
Parents: 62ad3f2
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Dec 20 17:26:13 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Dec 25 11:54:13 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am                 |  1 +
 3rdparty/libprocess/include/process/socket.hpp  | 76 +++++-----------
 3rdparty/libprocess/src/poll_socket.cpp         |  6 +-
 3rdparty/libprocess/src/process.cpp             | 92 +++++++++++++-------
 3rdparty/libprocess/src/socket.cpp              | 56 ++++++++++++
 3rdparty/libprocess/src/tests/decoder_tests.cpp | 12 ++-
 6 files changed, 152 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b3bb74c2/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index f401232..817355e 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -46,6 +46,7 @@ libprocess_la_SOURCES =		\
   src/process.cpp		\
   src/process_reference.hpp	\
   src/reap.cpp			\
+  src/socket.cpp		\
   src/subprocess.cpp		\
   src/synchronized.hpp		\
   src/timeseries.cpp

http://git-wip-us.apache.org/repos/asf/mesos/blob/b3bb74c2/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index 7e1e3f2..436761b 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -109,6 +109,20 @@ inline Try<Node> getsockname(int s, sa_family_t family)
 class Socket
 {
 public:
+  // Available kinds of implementations.
+  enum Kind {
+    POLL
+    // TODO(jmlvanre): Add libevent socket.
+  };
+
+  // Returns an instance of a Socket using the specified kind of
+  // implementation and potentially wrapping the specified file
+  // descriptor.
+  static Try<Socket> create(Kind kind = DEFAULT_KIND(), int s = -1);
+
+  // Returns the default kind of implementation of Socket.
+  static const Kind& DEFAULT_KIND();
+
   // Each socket is a reference counted, shared by default, concurrent
   // object. However, since we want to support multiple
   // implementations we use the Pimpl pattern (often called the
@@ -117,24 +131,21 @@ public:
   class Impl : public std::enable_shared_from_this<Impl>
   {
   public:
-    Impl() : s(-1) {}
-
-    explicit Impl(int _s) : s(_s) {}
+    explicit Impl(int _s) : s(_s) { CHECK(s >= 0); }
 
     ~Impl()
     {
-      if (s >= 0) {
-        Try<Nothing> close = os::close(s);
-        if (close.isError()) {
-          ABORT("Failed to close socket " +
-                stringify(s) + ": " + close.error());
-        }
+      CHECK(s >= 0);
+      Try<Nothing> close = os::close(s);
+      if (close.isError()) {
+        ABORT("Failed to close socket " +
+              stringify(s) + ": " + close.error());
       }
     }
 
     int get() const
     {
-      return s >= 0 ? s : create().get();
+      return s;
     }
 
     Future<Nothing> connect(const Node& node);
@@ -152,52 +163,9 @@ public:
     Future<Socket> accept();
 
   private:
-    const Impl& create() const
-    {
-      CHECK(s < 0);
-
-      // Supported in Linux >= 2.6.27.
-#if defined(SOCK_NONBLOCK) && defined(SOCK_CLOEXEC)
-      Try<int> fd =
-        network::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
-
-      if (fd.isError()) {
-        ABORT("Failed to create socket: " + fd.error());
-      }
-#else
-      Try<int> fd = network::socket(AF_INET, SOCK_STREAM, 0);
-      if (fd.isError()) {
-        ABORT("Failed to create socket: " + fd.error());
-      }
-
-      Try<Nothing> nonblock = os::nonblock(fd.get());
-      if (nonblock.isError()) {
-        ABORT("Failed to create socket, nonblock: " + nonblock.error());
-      }
-
-      Try<Nothing> cloexec = os::cloexec(fd.get());
-      if (cloexec.isError()) {
-        ABORT("Failed to create socket, cloexec: " + cloexec.error());
-      }
-#endif
-
-      s = fd.get();
-      return *this;
-    }
-
-    // Mutable so that the socket can be lazily created.
-    //
-    // TODO(benh): Create a factory for sockets and don't lazily
-    // create but instead return a Try<Socket> from the factory in
-    // order to eliminate the need for a mutable member or the call to
-    // ABORT above.
-    mutable int s;
+    int s;
   };
 
-  Socket() : impl(std::make_shared<Impl>()) {}
-
-  explicit Socket(int s) : impl(std::make_shared<Impl>(s)) {}
-
   bool operator == (const Socket& that) const
   {
     return impl == that.impl;

http://git-wip-us.apache.org/repos/asf/mesos/blob/b3bb74c2/3rdparty/libprocess/src/poll_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.cpp b/3rdparty/libprocess/src/poll_socket.cpp
index 5202750..09cd5a2 100644
--- a/3rdparty/libprocess/src/poll_socket.cpp
+++ b/3rdparty/libprocess/src/poll_socket.cpp
@@ -200,7 +200,11 @@ Future<Socket> accept(int fd)
       "Failed to turn off the Nagle algorithm: " + stringify(error));
   }
 
-  return Socket(s);
+  Try<Socket> socket = Socket::create(Socket::DEFAULT_KIND(), s);
+  if (socket.isError()) {
+    return Failure("Failed to accept, create socket: " + socket.error());
+  }
+  return socket.get();
 }
 
 } // namespace internal {

http://git-wip-us.apache.org/repos/asf/mesos/blob/b3bb74c2/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 72c0ad4..25981ca 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -306,7 +306,7 @@ private:
   } links;
 
   // Collection of all actice sockets.
-  map<int, Socket> sockets;
+  map<int, Socket*> sockets;
 
   // Collection of sockets that should be disposed when they are
   // finished being used (e.g., when there is no more data to send on
@@ -452,7 +452,7 @@ static uint32_t __id__ = 0;
 static const int LISTEN_BACKLOG = 500000;
 
 // Local server socket.
-static Socket __s__;
+static Socket* __s__ = NULL;
 
 // Local node.
 static Node __node__;
@@ -721,7 +721,7 @@ void on_accept(const Future<Socket>& socket)
           decoder));
   }
 
-  __s__.accept()
+  __s__->accept()
     .onAny(lambda::bind(&on_accept, lambda::_1));
 }
 
@@ -853,14 +853,19 @@ void initialize(const string& delegate)
   }
 
   // Create a "server" socket for communicating with other nodes.
+  Try<Socket> create = Socket::create();
+  if (create.isError()) {
+    PLOG(FATAL) << "Failed to construct server socket:" << create.error();
+  }
+  __s__ = new Socket(create.get());
 
   // Allow address reuse.
   int on = 1;
-  if (setsockopt(__s__, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) {
+  if (setsockopt(__s__->get(), SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) {
     PLOG(FATAL) << "Failed to initialize, setsockopt(SO_REUSEADDR)";
   }
 
-  Try<Node> bind = __s__.bind(__node__);
+  Try<Node> bind = __s__->bind(__node__);
   if (bind.isError()) {
     PLOG(FATAL) << "Failed to initialize: " << bind.error();
   }
@@ -889,7 +894,7 @@ void initialize(const string& delegate)
     __node__.ip = ip.get();
   }
 
-  Try<Nothing> listen = __s__.listen(LISTEN_BACKLOG);
+  Try<Nothing> listen = __s__->listen(LISTEN_BACKLOG);
   if (listen.isError()) {
     PLOG(FATAL) << "Failed to initialize: " << listen.error();
   }
@@ -898,7 +903,7 @@ void initialize(const string& delegate)
   // 'spawn' below for the garbage collector.
   initializing = false;
 
-  __s__.accept()
+  __s__->accept()
     .onAny(lambda::bind(&internal::on_accept, lambda::_1));
 
   // TODO(benh): Make sure creating the garbage collector, logging
@@ -1232,7 +1237,7 @@ SocketManager::~SocketManager() {}
 void SocketManager::accepted(const Socket& socket)
 {
   synchronized (this) {
-    sockets[socket] = socket;
+    sockets[socket] = new Socket(socket);
   }
 }
 
@@ -1324,22 +1329,22 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
 
   CHECK(process != NULL);
 
-  Socket socket;
+  Option<Socket> socket = None();
   bool connect = false;
 
   synchronized (this) {
-    links.linkers[to].insert(process);
-    links.linkees[process].insert(to);
-    if (to.node != __node__) {
-      links.remotes[to.node].insert(to);
-    }
-
     // Check if node is remote and there isn't a persistant link.
     if (to.node != __node__  && persists.count(to.node) == 0) {
       // Okay, no link, let's create a socket.
-      int s = socket;
+      Try<Socket> create = Socket::create();
+      if (create.isError()) {
+        VLOG(1) << "Failed to link, create socket: " << create.error();
+        return;
+      }
+      socket = create.get();
+      int s = socket.get().get();
 
-      sockets[s] = socket;
+      sockets[s] = new Socket(socket.get());
       nodes[s] = to.node;
 
       persists[to.node] = s;
@@ -1353,14 +1358,21 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
 
       connect = true;
     }
+
+    links.linkers[to].insert(process);
+    links.linkees[process].insert(to);
+    if (to.node != __node__) {
+      links.remotes[to.node].insert(to);
+    }
   }
 
   if (connect) {
-    socket.connect(to.node)
+    CHECK_SOME(socket);
+    Socket(socket.get()).connect(to.node) // Copy to drop const.
       .onAny(lambda::bind(
           &internal::link_connect,
           lambda::_1,
-          new Socket(socket)));
+          new Socket(socket.get())));
   }
 }
 
@@ -1377,7 +1389,7 @@ PID<HttpProxy> SocketManager::proxy(const Socket& socket)
       if (proxies.count(socket) > 0) {
         return proxies[socket]->self();
       } else {
-        proxy = new HttpProxy(sockets[socket]);
+        proxy = new HttpProxy(*sockets[socket]);
         proxies[socket] = proxy;
       }
     }
@@ -1569,7 +1581,7 @@ void SocketManager::send(Message* message)
 
   const Node& node = message->to.node;
 
-  Socket socket;
+  Option<Socket> socket = None();
   bool connect = false;
 
   synchronized (this) {
@@ -1579,28 +1591,35 @@ void SocketManager::send(Message* message)
     if (persist || temp) {
       int s = persist ? persists[node] : temps[node];
       CHECK(sockets.count(s) > 0);
-      socket = sockets[s];
+      socket = *sockets[s];
 
       // Update whether or not this socket should get disposed after
       // there is no more data to send.
       if (!persist) {
-        dispose.insert(socket);
+        dispose.insert(socket.get());
       }
 
-      if (outgoing.count(socket) > 0) {
-        outgoing[socket].push(new MessageEncoder(socket, message));
+      if (outgoing.count(socket.get()) > 0) {
+        outgoing[socket.get()].push(new MessageEncoder(socket.get(), message));
         return;
       } else {
         // Initialize the outgoing queue.
-        outgoing[socket];
+        outgoing[socket.get()];
       }
 
     } else {
       // No peristent or temporary socket to the node currently
       // exists, so we create a temporary one.
-      int s = socket;
+      Try<Socket> create = Socket::create();
+      if (create.isError()) {
+        VLOG(1) << "Failed to send, create socket: " << create.error();
+        delete message;
+        return;
+      }
+      socket = create.get();
+      int s = socket.get();
 
-      sockets[s] = socket;
+      sockets[s] = new Socket(socket.get());
       nodes[s] = node;
       temps[node] = s;
 
@@ -1614,16 +1633,19 @@ void SocketManager::send(Message* message)
   }
 
   if (connect) {
-    socket.connect(node)
+    CHECK_SOME(socket);
+    Socket(socket.get()).connect(node) // Copy to drop const.
       .onAny(lambda::bind(
           &internal::send_connect,
           lambda::_1,
-          new Socket(socket),
+          new Socket(socket.get()),
           message));
   } else {
     // If we're not connecting and we haven't added the encoder to
     // the 'outgoing' queue then schedule it to be sent.
-    internal::send(new MessageEncoder(socket, message), new Socket(socket));
+    internal::send(
+        new MessageEncoder(socket.get(), message),
+        new Socket(socket.get()));
   }
 }
 
@@ -1675,7 +1697,9 @@ Encoder* SocketManager::next(int s)
           }
 
           dispose.erase(s);
-          sockets.erase(s);
+          auto iterator = sockets.find(s);
+          delete iterator->second;
+          sockets.erase(iterator);
 
           // We don't actually close the socket (we wait for the Socket
           // abstraction to close it once there are no more references),
@@ -1752,7 +1776,9 @@ void SocketManager::close(int s)
       shutdown(s, SHUT_RD);
 
       dispose.erase(s);
-      sockets.erase(s);
+      auto iterator = sockets.find(s);
+      delete iterator->second;
+      sockets.erase(iterator);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/b3bb74c2/3rdparty/libprocess/src/socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/socket.cpp b/3rdparty/libprocess/src/socket.cpp
new file mode 100644
index 0000000..842a6ec
--- /dev/null
+++ b/3rdparty/libprocess/src/socket.cpp
@@ -0,0 +1,56 @@
+#include <process/socket.hpp>
+
+namespace process {
+namespace network {
+
+const Socket::Kind& Socket::DEFAULT_KIND()
+{
+  // TODO(jmlvanre): Change the default based on configure or
+  // environment flags.
+  static const Kind DEFAULT = POLL;
+  return DEFAULT;
+}
+
+Try<Socket> Socket::create(Kind kind, int s)
+{
+  if (s < 0) {
+    // Supported in Linux >= 2.6.27.
+#if defined(SOCK_NONBLOCK) && defined(SOCK_CLOEXEC)
+    Try<int> fd =
+      network::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
+
+    if (fd.isError()) {
+      return Error("Failed to create socket: " + fd.error());
+    }
+#else
+    Try<int> fd = network::socket(AF_INET, SOCK_STREAM, 0);
+    if (fd.isError()) {
+      return Error("Failed to create socket: " + fd.error());
+    }
+
+    Try<Nothing> nonblock = os::nonblock(fd.get());
+    if (nonblock.isError()) {
+      return Error("Failed to create socket, nonblock: " + nonblock.error());
+    }
+
+    Try<Nothing> cloexec = os::cloexec(fd.get());
+    if (cloexec.isError()) {
+      return Error("Failed to create socket, cloexec: " + cloexec.error());
+    }
+#endif
+
+    s = fd.get();
+  }
+
+  switch (kind) {
+    case POLL: {
+      return Socket(std::make_shared<Socket::Impl>(s));
+    }
+    // By not setting a default we leverage the compiler errors when
+    // the enumeration is augmented to find all the cases we need to
+    // provide.
+  }
+}
+
+} // namespace network {
+} // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/b3bb74c2/3rdparty/libprocess/src/tests/decoder_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/decoder_tests.cpp b/3rdparty/libprocess/src/tests/decoder_tests.cpp
index ef52798..d65f5cf 100644
--- a/3rdparty/libprocess/src/tests/decoder_tests.cpp
+++ b/3rdparty/libprocess/src/tests/decoder_tests.cpp
@@ -19,7 +19,9 @@ using process::network::Socket;
 
 TEST(Decoder, Request)
 {
-  DataDecoder decoder = DataDecoder(Socket());
+  Try<Socket> socket = Socket::create();
+  ASSERT_SOME(socket);
+  DataDecoder decoder = DataDecoder(socket.get());
 
   const string& data =
     "GET /path/file.json?key1=value1&key2=value2#fragment HTTP/1.1\r\n"
@@ -55,7 +57,9 @@ TEST(Decoder, Request)
 
 TEST(Decoder, RequestHeaderContinuation)
 {
-  DataDecoder decoder = DataDecoder(Socket());
+  Try<Socket> socket = Socket::create();
+  ASSERT_SOME(socket);
+  DataDecoder decoder = DataDecoder(socket.get());
 
   const string& data =
     "GET /path/file.json HTTP/1.1\r\n"
@@ -79,7 +83,9 @@ TEST(Decoder, RequestHeaderContinuation)
 // This is expected to fail for now, see my TODO(bmahler) on http::Request.
 TEST(Decoder, DISABLED_RequestHeaderCaseInsensitive)
 {
-  DataDecoder decoder = DataDecoder(Socket());
+  Try<Socket> socket = Socket::create();
+  ASSERT_SOME(socket);
+  DataDecoder decoder = DataDecoder(socket.get());
 
   const string& data =
     "GET /path/file.json HTTP/1.1\r\n"


[06/12] mesos git commit: Introduce libevent clock implementation.

Posted by be...@apache.org.
Introduce libevent clock implementation.

Review: https://reviews.apache.org/r/28322


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a027dd13
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a027dd13
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a027dd13

Branch: refs/heads/master
Commit: a027dd1360325ac489961f6317ba8c385dcf995c
Parents: 25d068f
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Dec 20 12:35:57 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Dec 25 11:54:12 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/libevent.cpp | 72 ++++++++++++++++++++++++++++---
 1 file changed, 66 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a027dd13/3rdparty/libprocess/src/libevent.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent.cpp b/3rdparty/libprocess/src/libevent.cpp
index 524d811..28c2cf7 100644
--- a/3rdparty/libprocess/src/libevent.cpp
+++ b/3rdparty/libprocess/src/libevent.cpp
@@ -7,21 +7,81 @@
 
 #include "event_loop.hpp"
 #include "libevent.hpp"
+#include "synchronized.hpp"
 
 namespace process {
 
 struct event_base* base = NULL;
 
+
 void* EventLoop::run(void*)
 {
-  int result = event_base_loop(base, 0);
-  if (result < 0) {
-    LOG(FATAL) << "Failed to run event loop";
-  } else if (result == 1) {
-    VLOG(1) << "Finished running event loop due to lack of events";
+  do {
+    int result = event_base_loop(base, EVLOOP_ONCE);
+    if (result < 0) {
+      LOG(FATAL) << "Failed to run event loop";
+    } else if (result == 1) {
+      VLOG(1) << "All events handled, continuing event loop";
+      continue;
+    } else if (event_base_got_break(base)) {
+      break;
+    } else if (event_base_got_exit(base)) {
+      break;
+    }
+  } while (true);
+  return NULL;
+}
+
+
+namespace internal {
+
+struct Delay
+{
+  void(*function)(void);
+  event* timer;
+};
+
+void handle_delay(int, short, void* arg)
+{
+  Delay* delay = reinterpret_cast<Delay*>(arg);
+  delay->function();
+  delete delay;
+}
+
+}  // namespace internal {
+
+
+void EventLoop::delay(const Duration& duration, void(*function)(void))
+{
+  internal::Delay* delay = new internal::Delay();
+  delay->timer = evtimer_new(base, &internal::handle_delay, delay);
+  if (delay->timer == NULL) {
+    LOG(FATAL) << "Failed to delay, evtimer_new";
   }
 
-  return NULL;
+  delay->function = function;
+
+  timeval t{0, 0};
+  if (duration > Seconds(0)) {
+    t = duration.timeval();
+  }
+
+  evtimer_add(delay->timer, &t);
+}
+
+
+double EventLoop::time()
+{
+  // Get the cached time if running the event loop, or call
+  // gettimeofday() to get the current time. Since a lot of logic in
+  // libprocess depends on time math, we want to log fatal rather than
+  // cause logic errors if the time fails.
+  timeval t;
+  if (event_base_gettimeofday_cached(base, &t) < 0) {
+    LOG(FATAL) << "Failed to get time, event_base_gettimeofday_cached";
+  }
+
+  return Duration(t).secs();
 }
 
 


[05/12] mesos git commit: Add construction and conversion from / to timeval for Duration.

Posted by be...@apache.org.
Add construction and conversion from / to timeval for Duration.

Review: https://reviews.apache.org/r/29226


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/25d068fe
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/25d068fe
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/25d068fe

Branch: refs/heads/master
Commit: 25d068fe340fe7c8907567f51d1cf2e58f66018a
Parents: 0e29981
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Dec 20 12:05:19 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Dec 25 11:54:12 2014 -0800

----------------------------------------------------------------------
 .../3rdparty/stout/include/stout/duration.hpp   | 16 ++++++++-
 .../3rdparty/stout/tests/duration_tests.cpp     | 34 ++++++++++++++++++++
 2 files changed, 49 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/25d068fe/3rdparty/libprocess/3rdparty/stout/include/stout/duration.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/duration.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/duration.hpp
index f344705..8a1626c 100644
--- a/3rdparty/libprocess/3rdparty/stout/include/stout/duration.hpp
+++ b/3rdparty/libprocess/3rdparty/stout/include/stout/duration.hpp
@@ -15,7 +15,8 @@
 #define __STOUT_DURATION_HPP__
 
 #include <ctype.h> // For 'isdigit'.
-#include <limits.h> // For 'LLONG_(MAX|MIN)'
+#include <limits.h> // For 'LLONG_(MAX|MIN)'.
+#include <time.h> // For 'timeval'.
 
 #include <iomanip>
 #include <iostream>
@@ -73,6 +74,11 @@ public:
 
   Duration() : nanos(0) {}
 
+  explicit Duration(const timeval& t)
+  {
+    nanos = t.tv_sec * SECONDS + t.tv_usec * MICROSECONDS;
+  }
+
   int64_t ns() const   { return nanos; }
   double us() const    { return static_cast<double>(nanos) / MICROSECONDS; }
   double ms() const    { return static_cast<double>(nanos) / MILLISECONDS; }
@@ -82,6 +88,14 @@ public:
   double days() const  { return static_cast<double>(nanos) / DAYS; }
   double weeks() const { return static_cast<double>(nanos) / WEEKS; }
 
+  struct timeval timeval() const
+  {
+    struct timeval t;
+    t.tv_sec = secs();
+    t.tv_usec = us() - (t.tv_sec * MILLISECONDS);
+    return t;
+  }
+
   bool operator <  (const Duration& d) const { return nanos <  d.nanos; }
   bool operator <= (const Duration& d) const { return nanos <= d.nanos; }
   bool operator >  (const Duration& d) const { return nanos >  d.nanos; }

http://git-wip-us.apache.org/repos/asf/mesos/blob/25d068fe/3rdparty/libprocess/3rdparty/stout/tests/duration_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/tests/duration_tests.cpp b/3rdparty/libprocess/3rdparty/stout/tests/duration_tests.cpp
index 4269d3c..724c5fe 100644
--- a/3rdparty/libprocess/3rdparty/stout/tests/duration_tests.cpp
+++ b/3rdparty/libprocess/3rdparty/stout/tests/duration_tests.cpp
@@ -98,3 +98,37 @@ TEST(DurationTest, OutputFormat)
   EXPECT_EQ("15250.2844524715weeks", stringify(Duration::max()));
   EXPECT_EQ("-15250.2844524715weeks", stringify(Duration::min()));
 }
+
+TEST(DurationTest, Timeval)
+{
+  EXPECT_EQ(Duration(timeval{10, 0}), Seconds(10));
+  EXPECT_EQ(Duration(timeval{0, 7}), Microseconds(7));
+  EXPECT_EQ(Duration(timeval{2, 123}), Seconds(2) + Microseconds(123));
+
+  timeval t{2, 123};
+  Duration d(t);
+  EXPECT_EQ(d.timeval().tv_sec, t.tv_sec);
+  EXPECT_EQ(d.timeval().tv_usec, t.tv_usec);
+
+  t.tv_usec = 0;
+  d = Duration(t);
+  EXPECT_EQ(d.timeval().tv_sec, t.tv_sec);
+  EXPECT_EQ(d.timeval().tv_usec, t.tv_usec);
+
+  // Negative times.
+  t.tv_sec = 0;
+  t.tv_usec = -1;
+  d = Duration(t);
+  EXPECT_EQ(d.timeval().tv_sec, t.tv_sec);
+  EXPECT_EQ(d.timeval().tv_usec, t.tv_usec);
+
+  d = Microseconds(-1);
+  EXPECT_EQ(d.timeval().tv_sec, t.tv_sec);
+  EXPECT_EQ(d.timeval().tv_usec, t.tv_usec);
+
+  t.tv_sec = -1;
+  t.tv_usec = -30;
+  d = Duration(t);
+  EXPECT_EQ(d.timeval().tv_sec, t.tv_sec);
+  EXPECT_EQ(d.timeval().tv_usec, t.tv_usec);
+}


[03/12] mesos git commit: Add enable-libevent flag to Libprocess.

Posted by be...@apache.org.
Add enable-libevent flag to Libprocess.

This flag will be used to distinguish between libev (default) and
libevent.

Review: https://reviews.apache.org/r/28319


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/252d3c75
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/252d3c75
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/252d3c75

Branch: refs/heads/master
Commit: 252d3c7575a9a688e6446e4ab302021f09849b4b
Parents: 37be603
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Dec 20 12:01:51 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Dec 20 17:59:55 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am     | 12 +++++++++---
 3rdparty/libprocess/configure.ac    | 26 ++++++++++++++++++++++++++
 3rdparty/libprocess/src/process.cpp |  4 ++++
 3 files changed, 39 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/252d3c75/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 472b5f4..5c92ee5 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -40,9 +40,6 @@ libprocess_la_SOURCES =		\
   src/http.cpp			\
   src/io.cpp			\
   src/latch.cpp			\
-  src/libev.hpp			\
-  src/libev.cpp			\
-  src/libev_poll.cpp		\
   src/metrics/metrics.cpp	\
   src/pid.cpp			\
   src/process.cpp		\
@@ -60,6 +57,15 @@ libprocess_la_CPPFLAGS =		\
   -I$(PICOJSON)				\
   $(AM_CPPFLAGS)
 
+if ENABLE_LIBEVENT
+# Place for libevent implementations.
+else
+  libprocess_la_SOURCES +=	\
+    src/libev.hpp		\
+    src/libev.cpp		\
+    src/libev_poll.cpp
+endif
+
 if WITH_BUNDLED_GLOG
   libprocess_la_CPPFLAGS += -I$(GLOG)/src
   LIBGLOG = $(GLOG)/libglog.la

http://git-wip-us.apache.org/repos/asf/mesos/blob/252d3c75/3rdparty/libprocess/configure.ac
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/configure.ac b/3rdparty/libprocess/configure.ac
index 024f892..2f01a3b 100644
--- a/3rdparty/libprocess/configure.ac
+++ b/3rdparty/libprocess/configure.ac
@@ -71,6 +71,11 @@ AC_ARG_ENABLE([bundled],
                              of bundled libraries]),
               [enable_bundled=no], [enable_bundled=yes])
 
+AC_ARG_ENABLE([libevent],
+              AS_HELP_STRING([--enable-libevent],
+                             [use libevent instead of libev default: no]),
+              [enable_libevent=yes], [])
+
 AC_ARG_WITH([boost],
             AS_HELP_STRING([--with-boost@<:@=DIR@:>@],
                            [excludes building and using the bundled Boost
@@ -565,6 +570,27 @@ else
   optimize_flags="-O0"
 fi
 
+AM_CONDITIONAL([ENABLE_LIBEVENT], [test x"$enable_libevent" = "xyes"])
+
+if test "x$enable_libevent" = "xyes"; then
+  AC_CHECK_HEADERS([event2/event.h],
+                   [AC_CHECK_LIB([event],
+                                 [event_base_new],
+                                 [found_libevent=yes],
+                                 [AC_MSG_ERROR([cannot find libevent
+-------------------------------------------------------------------
+libevent is required for mesos to build.
+-------------------------------------------------------------------
+                          ])])], [AC_MSG_ERROR([cannot find libevent headers
+-------------------------------------------------------------------
+libevent is required for mesos to build.
+-------------------------------------------------------------------
+  ])])
+  if test "x$found_libevent" = "xyes"; then
+    AC_DEFINE([USE_LIBEVENT], [1])
+  fi
+fi
+
 AS_IF([test "x${ac_cv_env_CFLAGS_set}" = "x"],
       [CFLAGS="$debug_flags $optimize_flags"])
 AS_IF([test "x${ac_cv_env_CXXFLAGS_set}" = "x"],

http://git-wip-us.apache.org/repos/asf/mesos/blob/252d3c75/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 2aeb815..fabcca4 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -81,7 +81,11 @@
 #include "encoder.hpp"
 #include "event_loop.hpp"
 #include "gate.hpp"
+#ifndef USE_LIBEVENT
 #include "libev.hpp"
+#else
+// Place for libevent header.
+#endif // USE_LIBEVENT
 #include "process_reference.hpp"
 #include "synchronized.hpp"
 


[12/12] mesos git commit: Move poll based socket implementation into poll_socket.cpp.

Posted by be...@apache.org.
Move poll based socket implementation into poll_socket.cpp.

Review: https://reviews.apache.org/r/28467


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/62ad3f27
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/62ad3f27
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/62ad3f27

Branch: refs/heads/master
Commit: 62ad3f27c44dfbad8cb68f952ed2d076e349fd6a
Parents: f4a4180
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Dec 20 12:44:46 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Dec 25 11:54:13 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am         |   1 +
 3rdparty/libprocess/src/poll_socket.cpp | 216 +++++++++++++++++++++++++++
 3rdparty/libprocess/src/process.cpp     | 203 -------------------------
 3 files changed, 217 insertions(+), 203 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/62ad3f27/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 09fce46..f401232 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -42,6 +42,7 @@ libprocess_la_SOURCES =		\
   src/latch.cpp			\
   src/metrics/metrics.cpp	\
   src/pid.cpp			\
+  src/poll_socket.cpp		\
   src/process.cpp		\
   src/process_reference.hpp	\
   src/reap.cpp			\

http://git-wip-us.apache.org/repos/asf/mesos/blob/62ad3f27/3rdparty/libprocess/src/poll_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.cpp b/3rdparty/libprocess/src/poll_socket.cpp
new file mode 100644
index 0000000..5202750
--- /dev/null
+++ b/3rdparty/libprocess/src/poll_socket.cpp
@@ -0,0 +1,216 @@
+#include <netinet/tcp.h>
+
+#include <process/io.hpp>
+#include <process/socket.hpp>
+
+#include "config.hpp"
+
+using std::string;
+
+namespace process {
+namespace network {
+
+namespace internal {
+
+Future<Nothing> connect(const Socket& socket)
+{
+  // Now check that a successful connection was made.
+  int opt;
+  socklen_t optlen = sizeof(opt);
+  int s = socket.get();
+
+  if (getsockopt(s, SOL_SOCKET, SO_ERROR, &opt, &optlen) < 0 || opt != 0) {
+    // Connect failure.
+    VLOG(1) << "Socket error while connecting";
+    return Failure("Socket error while connecting");
+  }
+
+  return Nothing();
+}
+
+} // namespace internal {
+
+
+Future<Nothing> Socket::Impl::connect(const Node& node)
+{
+  Try<int> connect = network::connect(get(), node);
+  if (connect.isError()) {
+    if (errno == EINPROGRESS) {
+      return io::poll(get(), io::WRITE)
+        .then(lambda::bind(&internal::connect, Socket(shared_from_this())));
+    }
+
+    return Failure(connect.error());
+  }
+
+  return Nothing();
+}
+
+
+Future<size_t> Socket::Impl::recv(char* data, size_t size)
+{
+  return io::read(get(), data, size);
+}
+
+
+namespace internal {
+
+Future<size_t> socket_send_data(int s, const char* data, size_t size)
+{
+  CHECK(size > 0);
+
+  while (true) {
+    ssize_t length = send(s, data, size, MSG_NOSIGNAL);
+
+    if (length < 0 && (errno == EINTR)) {
+      // Interrupted, try again now.
+      continue;
+    } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+      // Might block, try again later.
+      return io::poll(s, io::WRITE)
+        .then(lambda::bind(&internal::socket_send_data, s, data, size));
+    } else if (length <= 0) {
+      // Socket error or closed.
+      if (length < 0) {
+        const char* error = strerror(errno);
+        VLOG(1) << "Socket error while sending: " << error;
+      } else {
+        VLOG(1) << "Socket closed while sending";
+      }
+      if (length == 0) {
+        return length;
+      } else {
+        return Failure(ErrnoError("Socket send failed"));
+      }
+    } else {
+      CHECK(length > 0);
+
+      return length;
+    }
+  }
+}
+
+
+Future<size_t> socket_send_file(int s, int fd, off_t offset, size_t size)
+{
+  CHECK(size > 0);
+
+  while (true) {
+    ssize_t length = os::sendfile(s, fd, offset, size);
+
+    if (length < 0 && (errno == EINTR)) {
+      // Interrupted, try again now.
+      continue;
+    } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+      // Might block, try again later.
+      return io::poll(s, io::WRITE)
+        .then(lambda::bind(&internal::socket_send_file, s, fd, offset, size));
+    } else if (length <= 0) {
+      // Socket error or closed.
+      if (length < 0) {
+        const char* error = strerror(errno);
+        VLOG(1) << "Socket error while sending: " << error;
+      } else {
+        VLOG(1) << "Socket closed while sending";
+      }
+      if (length == 0) {
+        return length;
+      } else {
+        return Failure(ErrnoError("Socket sendfile failed"));
+      }
+    } else {
+      CHECK(length > 0);
+
+      return length;
+    }
+  }
+}
+
+} // namespace internal {
+
+
+Future<size_t> Socket::Impl::send(const char* data, size_t size)
+{
+  return io::poll(get(), io::WRITE)
+    .then(lambda::bind(&internal::socket_send_data, get(), data, size));
+}
+
+
+Future<size_t> Socket::Impl::sendfile(int fd, off_t offset, size_t size)
+{
+  return io::poll(get(), io::WRITE)
+    .then(lambda::bind(&internal::socket_send_file, get(), fd, offset, size));
+}
+
+
+Try<Node> Socket::Impl::bind(const Node& node)
+{
+  Try<int> bind = network::bind(get(), node);
+  if (bind.isError()) {
+    return Error(bind.error());
+  }
+
+  // Lookup and store assigned ip and assigned port.
+  return network::getsockname(get(), AF_INET);
+}
+
+
+Try<Nothing> Socket::Impl::listen(int backlog)
+{
+  if (::listen(get(), backlog) < 0) {
+    return ErrnoError();
+  }
+  return Nothing();
+}
+
+
+namespace internal {
+
+Future<Socket> accept(int fd)
+{
+  Try<int> accepted = network::accept(fd, AF_INET);
+  if (accepted.isError()) {
+    return Failure(accepted.error());
+  }
+
+  int s = accepted.get();
+  Try<Nothing> nonblock = os::nonblock(s);
+  if (nonblock.isError()) {
+    LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, nonblock: "
+                                << nonblock.error();
+    os::close(s);
+    return Failure("Failed to accept, nonblock: " + nonblock.error());
+  }
+
+  Try<Nothing> cloexec = os::cloexec(s);
+  if (cloexec.isError()) {
+    LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, cloexec: "
+                                << cloexec.error();
+    os::close(s);
+    return Failure("Failed to accept, cloexec: " + cloexec.error());
+  }
+
+  // Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait.
+  int on = 1;
+  if (setsockopt(s, SOL_TCP, TCP_NODELAY, &on, sizeof(on)) < 0) {
+    const char* error = strerror(errno);
+    VLOG(1) << "Failed to turn off the Nagle algorithm: " << error;
+    os::close(s);
+    return Failure(
+      "Failed to turn off the Nagle algorithm: " + stringify(error));
+  }
+
+  return Socket(s);
+}
+
+} // namespace internal {
+
+
+Future<Socket> Socket::Impl::accept()
+{
+  return io::poll(get(), io::READ)
+    .then(lambda::bind(&internal::accept, get()));
+}
+
+} // namespace network {
+} // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/62ad3f27/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 441ce48..72c0ad4 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -1220,209 +1220,6 @@ void HttpProxy::stream(const Future<short>& poll, const Request& request)
 }
 
 
-namespace internal {
-
-Future<Nothing> connect(const Socket& socket)
-{
-  // Now check that a successful connection was made.
-  int opt;
-  socklen_t optlen = sizeof(opt);
-  int s = socket.get();
-
-  if (getsockopt(s, SOL_SOCKET, SO_ERROR, &opt, &optlen) < 0 || opt != 0) {
-    // Connect failure.
-    VLOG(1) << "Socket error while connecting";
-    return Failure("Socket error while connecting");
-  }
-
-  return Nothing();
-}
-
-} // namespace internal {
-
-
-Future<Nothing> Socket::Impl::connect(const Node& node)
-{
-  Try<int> connect = network::connect(get(), node);
-  if (connect.isError()) {
-    if (errno == EINPROGRESS) {
-      return io::poll(get(), io::WRITE)
-        .then(lambda::bind(&internal::connect, Socket(shared_from_this())));
-    }
-
-    return Failure(connect.error());
-  }
-
-  return Nothing();
-}
-
-
-Future<size_t> Socket::Impl::recv(char* data, size_t size)
-{
-  return io::read(get(), data, size);
-}
-
-
-namespace internal {
-
-Future<size_t> socket_send_data(int s, const char* data, size_t size)
-{
-  CHECK(size > 0);
-
-  while (true) {
-    ssize_t length = send(s, data, size, MSG_NOSIGNAL);
-
-    if (length < 0 && (errno == EINTR)) {
-      // Interrupted, try again now.
-      continue;
-    } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
-      // Might block, try again later.
-      return io::poll(s, io::WRITE)
-        .then(lambda::bind(&internal::socket_send_data, s, data, size));
-    } else if (length <= 0) {
-      // Socket error or closed.
-      if (length < 0) {
-        const char* error = strerror(errno);
-        VLOG(1) << "Socket error while sending: " << error;
-      } else {
-        VLOG(1) << "Socket closed while sending";
-      }
-      if (length == 0) {
-        return length;
-      } else {
-        return Failure(ErrnoError("Socket send failed"));
-      }
-    } else {
-      CHECK(length > 0);
-
-      return length;
-    }
-  }
-}
-
-
-Future<size_t> socket_send_file(int s, int fd, off_t offset, size_t size)
-{
-  CHECK(size > 0);
-
-  while (true) {
-    ssize_t length = os::sendfile(s, fd, offset, size);
-
-    if (length < 0 && (errno == EINTR)) {
-      // Interrupted, try again now.
-      continue;
-    } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
-      // Might block, try again later.
-      return io::poll(s, io::WRITE)
-        .then(lambda::bind(&internal::socket_send_file, s, fd, offset, size));
-    } else if (length <= 0) {
-      // Socket error or closed.
-      if (length < 0) {
-        const char* error = strerror(errno);
-        VLOG(1) << "Socket error while sending: " << error;
-      } else {
-        VLOG(1) << "Socket closed while sending";
-      }
-      if (length == 0) {
-        return length;
-      } else {
-        return Failure(ErrnoError("Socket sendfile failed"));
-      }
-    } else {
-      CHECK(length > 0);
-
-      return length;
-    }
-  }
-}
-
-} // namespace internal {
-
-
-Future<size_t> Socket::Impl::send(const char* data, size_t size)
-{
-  return io::poll(get(), io::WRITE)
-    .then(lambda::bind(&internal::socket_send_data, get(), data, size));
-}
-
-
-Future<size_t> Socket::Impl::sendfile(int fd, off_t offset, size_t size)
-{
-  return io::poll(get(), io::WRITE)
-    .then(lambda::bind(&internal::socket_send_file, get(), fd, offset, size));
-}
-
-
-Try<Node> Socket::Impl::bind(const Node& node)
-{
-  Try<int> bind = network::bind(get(), node);
-  if (bind.isError()) {
-    return Error(bind.error());
-  }
-
-  // Lookup and store assigned ip and assigned port.
-  return network::getsockname(get(), AF_INET);
-}
-
-
-Try<Nothing> Socket::Impl::listen(int backlog)
-{
-  if (::listen(get(), backlog) < 0) {
-    return ErrnoError();
-  }
-  return Nothing();
-}
-
-
-namespace internal {
-
-Future<Socket> accept(int fd)
-{
-  Try<int> accepted = network::accept(fd, AF_INET);
-  if (accepted.isError()) {
-    return Failure(accepted.error());
-  }
-
-  int s = accepted.get();
-  Try<Nothing> nonblock = os::nonblock(s);
-  if (nonblock.isError()) {
-    LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, nonblock: "
-                                << nonblock.error();
-    os::close(s);
-    return Failure("Failed to accept, nonblock: " + nonblock.error());
-  }
-
-  Try<Nothing> cloexec = os::cloexec(s);
-  if (cloexec.isError()) {
-    LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, cloexec: "
-                                << cloexec.error();
-    os::close(s);
-    return Failure("Failed to accept, cloexec: " + cloexec.error());
-  }
-
-  // Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait.
-  int on = 1;
-  if (setsockopt(s, SOL_TCP, TCP_NODELAY, &on, sizeof(on)) < 0) {
-    const char* error = strerror(errno);
-    VLOG(1) << "Failed to turn off the Nagle algorithm: " << error;
-    os::close(s);
-    return Failure(
-      "Failed to turn off the Nagle algorithm: " + stringify(error));
-  }
-
-  return Socket(s);
-}
-
-} // namespace internal {
-
-
-Future<Socket> Socket::Impl::accept()
-{
-  return io::poll(get(), io::READ)
-    .then(lambda::bind(&internal::accept, get()));
-}
-
-
 SocketManager::SocketManager()
 {
   synchronizer(this) = SYNCHRONIZED_INITIALIZER_RECURSIVE;


[02/12] mesos git commit: Refactor clock to simplify libev cut.

Posted by be...@apache.org.
Refactor clock to simplify libev cut.

Review: https://reviews.apache.org/r/28315


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/dd37344d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/dd37344d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/dd37344d

Branch: refs/heads/master
Commit: dd37344d27b8ebc5f5e01e5efb2cb92929b97704
Parents: c513126
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Dec 20 11:11:08 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Dec 20 17:59:55 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am        |   1 +
 3rdparty/libprocess/src/clock.cpp      | 233 ++++++++++++----------------
 3rdparty/libprocess/src/event_loop.hpp |  30 ++++
 3rdparty/libprocess/src/libev.cpp      |  95 ++++++++++++
 3rdparty/libprocess/src/process.cpp    |  53 +------
 5 files changed, 228 insertions(+), 184 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/dd37344d/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 75870ac..472b5f4 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -34,6 +34,7 @@ libprocess_la_SOURCES =		\
   src/config.hpp		\
   src/decoder.hpp		\
   src/encoder.hpp		\
+  src/event_loop.hpp		\
   src/gate.hpp			\
   src/help.cpp			\
   src/http.cpp			\

http://git-wip-us.apache.org/repos/asf/mesos/blob/dd37344d/3rdparty/libprocess/src/clock.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/clock.cpp b/3rdparty/libprocess/src/clock.cpp
index 2bc7fa9..fcc1eb0 100644
--- a/3rdparty/libprocess/src/clock.cpp
+++ b/3rdparty/libprocess/src/clock.cpp
@@ -1,5 +1,3 @@
-#include <ev.h>
-
 #include <glog/logging.h>
 
 #include <list>
@@ -17,6 +15,7 @@
 #include <stout/try.hpp>
 #include <stout/unreachable.hpp>
 
+#include "event_loop.hpp"
 #include "synchronized.hpp"
 
 using std::list;
@@ -24,24 +23,11 @@ using std::map;
 
 namespace process {
 
-// Event loop.
-extern struct ev_loop* loop;
-
-// Asynchronous watcher for interrupting loop to specifically deal
-// with updating timers.
-static ev_async async_update_timer_watcher;
-
-// Watcher for timeouts.
-static ev_timer timeouts_watcher;
-
 // We store the timers in a map of lists indexed by the timeout of the
 // timer so that we can have two timers that have the same timeout. We
 // exploit that the map is SORTED!
-static map<Time, list<Timer>>* timeouts = new map<Time, list<Timer>>();
-static synchronizable(timeouts) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
-
-// Flag to indicate whether or to update the timer on async interrupt.
-static bool update_timer = false;
+static map<Time, list<Timer>>* timers = new map<Time, list<Timer>>();
+static synchronizable(timers) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
 
 
 // We namespace the clock related variables to keep them well
@@ -69,51 +55,46 @@ bool settling = false;
 // Lambda function to invoke when timers have expired.
 lambda::function<void(const list<Timer>&)> callback;
 
-} // namespace clock {
-
 
-void handle_async_update_timer(struct ev_loop* loop, ev_async* _, int revents)
+// Helper for determining the duration until the next timer elapses,
+// or None if no timers are pending. Note that we don't manipulate
+// 'timer's directly so that it's clear from the callsite that the use
+// of 'timers' is within a 'synchronized' block.
+//
+// TODO(benh): Create a generic 'Timer's abstraction which hides this
+// and more away (i.e., all manipulations of 'timers' below).
+Option<Duration> next(const map<Time, list<Timer>>& timers)
 {
-  synchronized (timeouts) {
-    if (update_timer) {
-      if (!timeouts->empty()) {
-        // Determine when the next timer should fire.
-        timeouts_watcher.repeat =
-          (timeouts->begin()->first - Clock::now()).secs();
-
-        if (timeouts_watcher.repeat <= 0) {
-          // Feed the event now!
-          timeouts_watcher.repeat = 0;
-          ev_timer_again(loop, &timeouts_watcher);
-          ev_feed_event(loop, &timeouts_watcher, EV_TIMEOUT);
-        } else {
-          // Don't fire the timer if the clock is paused since we
-          // don't want time to advance (instead a call to
-          // clock::advance() will handle the timer).
-          if (Clock::paused() && timeouts_watcher.repeat > 0) {
-            timeouts_watcher.repeat = 0;
-          }
-
-          ev_timer_again(loop, &timeouts_watcher);
-        }
-      }
-
-      update_timer = false;
+  if (!timers.empty()) {
+    // Determine when the next "tick" should occur.
+    Duration duration = (timers.begin()->first - Clock::now());
+
+    // Force a duration of 0 seconds (i.e., fire timers now) if the
+    // clock is paused and the duration is greater than 0 since we
+    // want to handle timers right away.
+    if (Clock::paused() && duration > Seconds(0)) {
+      return Seconds(0);
     }
+
+    return duration;
   }
+
+  return None();
 }
 
+} // namespace clock {
+
 
-void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
+void tick()
 {
-  list<Timer> timers;
+  list<Timer> timedout;
 
-  synchronized (timeouts) {
+  synchronized (timers) {
     Time now = Clock::now();
 
-    VLOG(3) << "Handling timeouts up to " << now;
+    VLOG(3) << "Handling timers up to " << now;
 
-    foreachkey (const Time& timeout, *timeouts) {
+    foreachkey (const Time& timeout, *timers) {
       if (timeout > now) {
         break;
       }
@@ -127,52 +108,33 @@ void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
         clock::settling = true;
       }
 
-      foreach (const Timer& timer, (*timeouts)[timeout]) {
-        timers.push_back(timer);
+      foreach (const Timer& timer, (*timers)[timeout]) {
+        timedout.push_back(timer);
       }
     }
 
-    // Now erase the range of timeouts that timed out.
-    timeouts->erase(timeouts->begin(), timeouts->upper_bound(now));
+    // Now erase the range of timers that timed out.
+    timers->erase(timers->begin(), timers->upper_bound(now));
 
     // Okay, so the timeout for the next timer should not have fired.
-    CHECK(timeouts->empty() || (timeouts->begin()->first > now));
-
-    // Update the timer as necessary.
-    if (!timeouts->empty()) {
-      // Determine when the next timer should fire.
-      timeouts_watcher.repeat =
-        (timeouts->begin()->first - Clock::now()).secs();
-
-      if (timeouts_watcher.repeat <= 0) {
-        // Feed the event now!
-        timeouts_watcher.repeat = 0;
-        ev_timer_again(loop, &timeouts_watcher);
-        ev_feed_event(loop, &timeouts_watcher, EV_TIMEOUT);
-      } else {
-        // Don't fire the timer if the clock is paused since we don't
-        // want time to advance (instead a call to Clock::advance()
-        // will handle the timer).
-        if (Clock::paused() && timeouts_watcher.repeat > 0) {
-          timeouts_watcher.repeat = 0;
-        }
+    CHECK(timers->empty() || (timers->begin()->first > now));
 
-        ev_timer_again(loop, &timeouts_watcher);
-      }
+    // Schedule another "tick" if necessary.
+    Option<Duration> duration = clock::next(*timers);
+    if (duration.isSome()) {
+      EventLoop::delay(duration.get(), &tick);
     }
-
-    update_timer = false; // Since we might have a queued update_timer.
   }
 
-  clock::callback(timers);
+  clock::callback(timedout);
 
-  // Mark 'settling' as false since there are not any more timeouts
+  // Mark 'settling' as false since there are not any more timers
   // that will expire before the paused time and we've finished
   // executing expired timers.
-  synchronized (timeouts) {
+  synchronized (timers) {
     if (clock::paused &&
-        (timeouts->size() == 0 ||
-         timeouts->begin()->first > clock::current)) {
+        (timers->size() == 0 ||
+         timers->begin()->first > clock::current)) {
       VLOG(3) << "Clock has settled";
       clock::settling = false;
     }
@@ -182,20 +144,7 @@ void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
 
 void Clock::initialize(lambda::function<void(const list<Timer>&)>&& callback)
 {
-  // TODO(benh): Currently this function is expected to get called
-  // just after initializing libev in process::initialize. But that is
-  // too tightly coupled so and we really need to move libev specific
-  // intialization outside of process::initialize that both
-  // process::initialize and Clock::initialize can depend on (and thus
-  // call).
-
   clock::callback = callback;
-
-  ev_async_init(&async_update_timer_watcher, handle_async_update_timer);
-  ev_async_start(loop, &async_update_timer_watcher);
-
-  ev_timer_init(&timeouts_watcher, handle_timeouts, 0., 2100000.0);
-  ev_timer_again(loop, &timeouts_watcher);
 }
 
 
@@ -207,7 +156,7 @@ Time Clock::now()
 
 Time Clock::now(ProcessBase* process)
 {
-  synchronized (timeouts) {
+  synchronized (timers) {
     if (Clock::paused()) {
       if (process != NULL) {
         if (clock::currents->count(process) != 0) {
@@ -221,8 +170,7 @@ Time Clock::now(ProcessBase* process)
     }
   }
 
-  // TODO(benh): Versus ev_now()?
-  double d = ev_time();
+  double d = EventLoop::time();
   Try<Time> time = Time::create(d); // Compensates for clock::advanced.
 
   // TODO(xujyan): Move CHECK_SOME to libprocess and add CHECK_SOME
@@ -252,17 +200,22 @@ Timer Clock::timer(
           << " in the future (" << timeout.time() << ")";
 
   // Add the timer.
-  synchronized (timeouts) {
-    if (timeouts->size() == 0 ||
-        timer.timeout().time() < timeouts->begin()->first) {
+  synchronized (timers) {
+    if (timers->size() == 0 ||
+        timer.timeout().time() < timers->begin()->first) {
       // Need to interrupt the loop to update/set timer repeat.
-      (*timeouts)[timer.timeout().time()].push_back(timer);
-      update_timer = true;
-      ev_async_send(loop, &async_update_timer_watcher);
+
+      (*timers)[timer.timeout().time()].push_back(timer);
+
+      // Schedule another "tick" if necessary.
+      Option<Duration> duration = clock::next(*timers);
+      if (duration.isSome()) {
+        EventLoop::delay(duration.get(), &tick);
+      }
     } else {
       // Timer repeat is adequate, just add the timeout.
-      CHECK(timeouts->size() >= 1);
-      (*timeouts)[timer.timeout().time()].push_back(timer);
+      CHECK(timers->size() >= 1);
+      (*timers)[timer.timeout().time()].push_back(timer);
     }
   }
 
@@ -273,16 +226,16 @@ Timer Clock::timer(
 bool Clock::cancel(const Timer& timer)
 {
   bool canceled = false;
-  synchronized (timeouts) {
+  synchronized (timers) {
     // Check if the timeout is still pending, and if so, erase it. In
     // addition, erase an empty list if we just removed the last
     // timeout.
     Time time = timer.timeout().time();
-    if (timeouts->count(time) > 0) {
+    if (timers->count(time) > 0) {
       canceled = true;
-      (*timeouts)[time].remove(timer);
-      if ((*timeouts)[time].empty()) {
-        timeouts->erase(time);
+      (*timers)[time].remove(timer);
+      if ((*timers)[time].empty()) {
+        timers->erase(time);
       }
     }
   }
@@ -293,9 +246,9 @@ bool Clock::cancel(const Timer& timer)
 
 void Clock::pause()
 {
-  process::initialize(); // To make sure the libev watchers are ready.
+  process::initialize(); // To make sure the event loop is ready.
 
-  synchronized (timeouts) {
+  synchronized (timers) {
     if (!clock::paused) {
       clock::initial = clock::current = now();
       clock::paused = true;
@@ -303,8 +256,8 @@ void Clock::pause()
     }
   }
 
-  // Note that after pausing the clock an existing libev timer might
-  // still fire (invoking handle_timeout), but since paused == true no
+  // Note that after pausing the clock an existing event loop delay
+  // might still fire (invoking tick), but since paused == true no
   // "time" will actually have passed, so no timer will actually fire.
 }
 
@@ -317,16 +270,21 @@ bool Clock::paused()
 
 void Clock::resume()
 {
-  process::initialize(); // To make sure the libev watchers are ready.
+  process::initialize(); // To make sure the event loop is ready.
 
-  synchronized (timeouts) {
+  synchronized (timers) {
     if (clock::paused) {
       VLOG(2) << "Clock resumed at " << clock::current;
+
       clock::paused = false;
       clock::settling = false;
       clock::currents->clear();
-      update_timer = true;
-      ev_async_send(loop, &async_update_timer_watcher);
+
+      // Schedule another "tick" if necessary.
+      Option<Duration> duration = clock::next(*timers);
+      if (duration.isSome()) {
+        EventLoop::delay(duration.get(), &tick);
+      }
     }
   }
 }
@@ -334,14 +292,17 @@ void Clock::resume()
 
 void Clock::advance(const Duration& duration)
 {
-  synchronized (timeouts) {
+  synchronized (timers) {
     if (clock::paused) {
       clock::advanced += duration;
       clock::current += duration;
+
       VLOG(2) << "Clock advanced ("  << duration << ") to " << clock::current;
-      if (!update_timer) {
-        update_timer = true;
-        ev_async_send(loop, &async_update_timer_watcher);
+
+      // Schedule another "tick" if necessary.
+      Option<Duration> duration = clock::next(*timers);
+      if (duration.isSome()) {
+        EventLoop::delay(duration.get(), &tick);
       }
     }
   }
@@ -350,7 +311,7 @@ void Clock::advance(const Duration& duration)
 
 void Clock::advance(ProcessBase* process, const Duration& duration)
 {
-  synchronized (timeouts) {
+  synchronized (timers) {
     if (clock::paused) {
       Time current = now(process);
       current += duration;
@@ -364,15 +325,17 @@ void Clock::advance(ProcessBase* process, const Duration& duration)
 
 void Clock::update(const Time& time)
 {
-  synchronized (timeouts) {
+  synchronized (timers) {
     if (clock::paused) {
       if (clock::current < time) {
         clock::advanced += (time - clock::current);
         clock::current = Time(time);
         VLOG(2) << "Clock updated to " << clock::current;
-        if (!update_timer) {
-          update_timer = true;
-          ev_async_send(loop, &async_update_timer_watcher);
+
+        // Schedule another "tick" if necessary.
+        Option<Duration> duration = clock::next(*timers);
+        if (duration.isSome()) {
+          EventLoop::delay(duration.get(), &tick);
         }
       }
     }
@@ -382,7 +345,7 @@ void Clock::update(const Time& time)
 
 void Clock::update(ProcessBase* process, const Time& time, Update update)
 {
-  synchronized (timeouts) {
+  synchronized (timers) {
     if (clock::paused) {
       if (now(process) < time || update == Clock::FORCE) {
         VLOG(2) << "Clock of " << process->self() << " updated to " << time;
@@ -402,16 +365,14 @@ void Clock::order(ProcessBase* from, ProcessBase* to)
 
 bool Clock::settled()
 {
-  synchronized (timeouts) {
+  synchronized (timers) {
     CHECK(clock::paused);
 
-    if (update_timer) {
-      return false;
-    } else if (clock::settling) {
+    if (clock::settling) {
       VLOG(3) << "Clock still not settled";
       return false;
-    } else if (timeouts->size() == 0 ||
-               timeouts->begin()->first > clock::current) {
+    } else if (timers->size() == 0 ||
+               timers->begin()->first > clock::current) {
       VLOG(3) << "Clock is settled";
       return true;
     }

http://git-wip-us.apache.org/repos/asf/mesos/blob/dd37344d/3rdparty/libprocess/src/event_loop.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/event_loop.hpp b/3rdparty/libprocess/src/event_loop.hpp
new file mode 100644
index 0000000..34e9f1d
--- /dev/null
+++ b/3rdparty/libprocess/src/event_loop.hpp
@@ -0,0 +1,30 @@
+#ifndef __EVENT_LOOP_HPP__
+#define __EVENT_LOOP_HPP__
+
+#include <stout/duration.hpp>
+
+namespace process {
+
+// The interface that must be implemented by an event management
+// system. This is a class to cleanly isolate the interface and so
+// that in the future we can support multiple implementations.
+class EventLoop
+{
+public:
+  // Initializes the event loop.
+  static void initialize();
+
+  // Invoke the specified function in the event loop after the
+  // specified duration.
+  static void delay(const Duration& duration, void(*function)(void));
+
+  // Returns the current time w.r.t. the event loop.
+  static double time();
+
+  // Runs the event loop.
+  static void* run(void*);
+};
+
+} // namespace process {
+
+#endif // __EVENT_LOOP_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/dd37344d/3rdparty/libprocess/src/libev.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libev.cpp b/3rdparty/libprocess/src/libev.cpp
index 8a557ce..0e8d44c 100644
--- a/3rdparty/libprocess/src/libev.cpp
+++ b/3rdparty/libprocess/src/libev.cpp
@@ -2,8 +2,11 @@
 
 #include <queue>
 
+#include <stout/duration.hpp>
 #include <stout/lambda.hpp>
+#include <stout/nothing.hpp>
 
+#include "event_loop.hpp"
 #include "libev.hpp"
 
 namespace process {
@@ -23,4 +26,96 @@ std::queue<lambda::function<void(void)>>* functions =
 
 ThreadLocal<bool>* _in_event_loop_ = new ThreadLocal<bool>();
 
+
+void handle_async(struct ev_loop* loop, ev_async* _, int revents)
+{
+  synchronized (watchers) {
+    // Start all the new I/O watchers.
+    while (!watchers->empty()) {
+      ev_io* watcher = watchers->front();
+      watchers->pop();
+      ev_io_start(loop, watcher);
+    }
+
+    while (!functions->empty()) {
+      (functions->front())();
+      functions->pop();
+    }
+  }
+}
+
+
+void EventLoop::initialize()
+{
+  synchronizer(watchers) = SYNCHRONIZED_INITIALIZER;
+
+  loop = ev_default_loop(EVFLAG_AUTO);
+
+  ev_async_init(&async_watcher, handle_async);
+  ev_async_start(loop, &async_watcher);
+}
+
+
+namespace internal {
+
+void handle_delay(struct ev_loop* loop, ev_timer* timer, int revents)
+{
+  void(*function)(void) = reinterpret_cast<void(*)(void)>(timer->data);
+  function();
+  ev_timer_stop(loop, timer);
+  delete timer;
+}
+
+
+Future<Nothing> delay(const Duration& duration, void(*function)(void))
+{
+  ev_timer* timer = new ev_timer();
+  timer->data = reinterpret_cast<void*>(function);
+
+  // Determine the 'after' parameter to pass to libev and set it to 0
+  // in the event that it's negative so that we always make sure to
+  // invoke 'function' even if libev doesn't support negative 'after'
+  // values.
+  double after = duration.secs();
+
+  if (after < 0) {
+    after = 0;
+  }
+
+  const double repeat = 0.0;
+
+  ev_timer_init(timer, handle_delay, after, repeat);
+  ev_timer_start(loop, timer);
+
+  return Nothing();
+}
+
+} // namespace internal {
+
+
+void EventLoop::delay(const Duration& duration, void(*function)(void))
+{
+  run_in_event_loop<Nothing>(
+      lambda::bind(&internal::delay, duration, function));
+}
+
+
+double EventLoop::time()
+{
+  // TODO(benh): Versus ev_now()?
+  return ev_time();
+}
+
+
+void* EventLoop::run(void*)
+{
+  __in_event_loop__ = true;
+
+  ev_loop(loop, 0);
+
+  __in_event_loop__ = false;
+
+  return NULL;
+}
+
 } // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/dd37344d/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index fdba949..d3dac4c 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -1,5 +1,4 @@
 #include <errno.h>
-#include <ev.h>
 #include <limits.h>
 #include <libgen.h>
 #include <netdb.h>
@@ -80,6 +79,7 @@
 #include "config.hpp"
 #include "decoder.hpp"
 #include "encoder.hpp"
+#include "event_loop.hpp"
 #include "gate.hpp"
 #include "libev.hpp"
 #include "process_reference.hpp"
@@ -585,24 +585,6 @@ static Message* parse(Request* request)
 }
 
 
-void handle_async(struct ev_loop* loop, ev_async* _, int revents)
-{
-  synchronized (watchers) {
-    // Start all the new I/O watchers.
-    while (!watchers->empty()) {
-      ev_io* watcher = watchers->front();
-      watchers->pop();
-      ev_io_start(loop, watcher);
-    }
-
-    while (!functions->empty()) {
-      (functions->front())();
-      functions->pop();
-    }
-  }
-}
-
-
 namespace internal {
 
 void decode_recv(
@@ -654,18 +636,6 @@ void decode_recv(
 } // namespace internal {
 
 
-void* serve(void* arg)
-{
-  __in_event_loop__ = true;
-
-  ev_loop(((struct ev_loop*) arg), 0);
-
-  __in_event_loop__ = false;
-
-  return NULL;
-}
-
-
 void* schedule(void* arg)
 {
   do {
@@ -896,21 +866,8 @@ void initialize(const string& delegate)
     PLOG(FATAL) << "Failed to initialize: " << listen.error();
   }
 
-  // Initialize libev.
-  //
-  // TODO(benh): Eventually move this all out of process.cpp after
-  // more is disentangled.
-  synchronizer(watchers) = SYNCHRONIZED_INITIALIZER;
-
-#ifdef __sun__
-  loop = ev_default_loop(EVBACKEND_POLL | EVBACKEND_SELECT);
-#else
-  loop = ev_default_loop(EVFLAG_AUTO);
-#endif // __sun__
-
-  ev_async_init(&async_watcher, handle_async);
-  ev_async_start(loop, &async_watcher);
-
+  // Initialize the event loop.
+  EventLoop::initialize();
   Clock::initialize(lambda::bind(&timedout, lambda::_1));
 
 //   ev_child_init(&child_watcher, child_exited, pid, 0);
@@ -929,7 +886,7 @@ void initialize(const string& delegate)
 //   sigprocmask (SIG_UNBLOCK, &sa.sa_mask, 0);
 
   pthread_t thread; // For now, not saving handles on our threads.
-  if (pthread_create(&thread, NULL, serve, loop) != 0) {
+  if (pthread_create(&thread, NULL, &EventLoop::run, NULL) != 0) {
     LOG(FATAL) << "Failed to initialize, pthread_create";
   }
 
@@ -2020,7 +1977,7 @@ void SocketManager::close(int s)
   // 'sockets' any attempt to send with it will just get ignored.
   // TODO(benh): Always do a 'shutdown(s, SHUT_RDWR)' since that
   // should keep the file descriptor valid until the last Socket
-  // reference does a close but force all libev watchers to stop?
+  // reference does a close but force all event loop watchers to stop?
 }
 
 


[11/12] mesos git commit: Virtualize Socket::Impl interface.

Posted by be...@apache.org.
Virtualize Socket::Impl interface.

Review: https://reviews.apache.org/r/28671


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2d46e853
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2d46e853
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2d46e853

Branch: refs/heads/master
Commit: 2d46e853321469bcd7e2d3727254205c19809b7f
Parents: b3bb74c
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Thu Dec 25 10:55:06 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Dec 25 11:54:13 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am                |   1 +
 3rdparty/libprocess/include/process/socket.hpp |  62 ++++----
 3rdparty/libprocess/src/poll_socket.cpp        | 153 ++++++++++----------
 3rdparty/libprocess/src/poll_socket.hpp        |  29 ++++
 3rdparty/libprocess/src/socket.cpp             |  37 +++--
 5 files changed, 163 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2d46e853/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 817355e..6ab9cb8 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -43,6 +43,7 @@ libprocess_la_SOURCES =		\
   src/metrics/metrics.cpp	\
   src/pid.cpp			\
   src/poll_socket.cpp		\
+  src/poll_socket.hpp		\
   src/process.cpp		\
   src/process_reference.hpp	\
   src/reap.cpp			\

http://git-wip-us.apache.org/repos/asf/mesos/blob/2d46e853/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index 436761b..ddb9e36 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -111,8 +111,8 @@ class Socket
 public:
   // Available kinds of implementations.
   enum Kind {
-    POLL
-    // TODO(jmlvanre): Add libevent socket.
+    POLL,
+    // TODO(jmlvanre): Add libevent SSL socket.
   };
 
   // Returns an instance of a Socket using the specified kind of
@@ -131,9 +131,7 @@ public:
   class Impl : public std::enable_shared_from_this<Impl>
   {
   public:
-    explicit Impl(int _s) : s(_s) { CHECK(s >= 0); }
-
-    ~Impl()
+    virtual ~Impl()
     {
       CHECK(s >= 0);
       Try<Nothing> close = os::close(s);
@@ -148,21 +146,21 @@ public:
       return s;
     }
 
-    Future<Nothing> connect(const Node& node);
-
-    Future<size_t> recv(char* data, size_t size);
-
-    Future<size_t> send(const char* data, size_t size);
-
-    Future<size_t> sendfile(int fd, off_t offset, size_t size);
+    // Socket::Impl interface.
+    virtual Try<Node> bind(const Node& node);
+    virtual Try<Nothing> listen(int backlog) = 0;
+    virtual Future<Socket> accept() = 0;
+    virtual Future<Nothing> connect(const Node& node) = 0;
+    virtual Future<size_t> recv(char* data, size_t size) = 0;
+    virtual Future<size_t> send(const char* data, size_t size) = 0;
+    virtual Future<size_t> sendfile(int fd, off_t offset, size_t size) = 0;
 
-    Try<Node> bind(const Node& node);
-
-    Try<Nothing> listen(int backlog);
+  protected:
+    explicit Impl(int _s) : s(_s) { CHECK(s >= 0); }
 
-    Future<Socket> accept();
+    // Construct a Socket wrapper from this implementation.
+    Socket socket() { return Socket(shared_from_this()); }
 
-  private:
     int s;
   };
 
@@ -181,44 +179,46 @@ public:
     return impl->get();
   }
 
-  Future<Nothing> connect(const Node& node)
+  Try<Node> bind(const Node& node)
   {
-    return impl->connect(node);
+    return impl->bind(node);
   }
 
-  Future<size_t> recv(char* data, size_t size) const
+  Try<Nothing> listen(int backlog)
   {
-    return impl->recv(data, size);
+    return impl->listen(backlog);
   }
 
-  Future<size_t> send(const char* data, size_t size) const
+  Future<Socket> accept()
   {
-    return impl->send(data, size);
+    return impl->accept();
   }
 
-  Future<size_t> sendfile(int fd, off_t offset, size_t size) const
+  Future<Nothing> connect(const Node& node)
   {
-    return impl->sendfile(fd, offset, size);
+    return impl->connect(node);
   }
 
-  Try<Node> bind(const Node& node)
+  Future<size_t> recv(char* data, size_t size) const
   {
-    return impl->bind(node);
+    return impl->recv(data, size);
   }
 
-  Try<Nothing> listen(int backlog)
+  Future<size_t> send(const char* data, size_t size) const
   {
-    return impl->listen(backlog);
+    return impl->send(data, size);
   }
 
-  Future<Socket> accept()
+  Future<size_t> sendfile(int fd, off_t offset, size_t size) const
   {
-    return impl->accept();
+    return impl->sendfile(fd, offset, size);
   }
 
 private:
   explicit Socket(std::shared_ptr<Impl>&& that) : impl(std::move(that)) {}
 
+  explicit Socket(const std::shared_ptr<Impl>& that) : impl(that) {}
+
   std::shared_ptr<Impl> impl;
 };
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/2d46e853/3rdparty/libprocess/src/poll_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.cpp b/3rdparty/libprocess/src/poll_socket.cpp
index 09cd5a2..2e70c6c 100644
--- a/3rdparty/libprocess/src/poll_socket.cpp
+++ b/3rdparty/libprocess/src/poll_socket.cpp
@@ -4,12 +4,81 @@
 #include <process/socket.hpp>
 
 #include "config.hpp"
+#include "poll_socket.hpp"
 
 using std::string;
 
 namespace process {
 namespace network {
 
+Try<std::shared_ptr<Socket::Impl>> PollSocketImpl::create(int s)
+{
+  return std::make_shared<PollSocketImpl>(s);
+}
+
+
+Try<Nothing> PollSocketImpl::listen(int backlog)
+{
+  if (::listen(get(), backlog) < 0) {
+    return ErrnoError();
+  }
+  return Nothing();
+}
+
+
+namespace internal {
+
+Future<Socket> accept(int fd)
+{
+  Try<int> accepted = network::accept(fd, AF_INET);
+  if (accepted.isError()) {
+    return Failure(accepted.error());
+  }
+
+  int s = accepted.get();
+  Try<Nothing> nonblock = os::nonblock(s);
+  if (nonblock.isError()) {
+    LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, nonblock: "
+                                << nonblock.error();
+    os::close(s);
+    return Failure("Failed to accept, nonblock: " + nonblock.error());
+  }
+
+  Try<Nothing> cloexec = os::cloexec(s);
+  if (cloexec.isError()) {
+    LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, cloexec: "
+                                << cloexec.error();
+    os::close(s);
+    return Failure("Failed to accept, cloexec: " + cloexec.error());
+  }
+
+  // Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait.
+  int on = 1;
+  if (setsockopt(s, SOL_TCP, TCP_NODELAY, &on, sizeof(on)) < 0) {
+    const char* error = strerror(errno);
+    VLOG(1) << "Failed to turn off the Nagle algorithm: " << error;
+    os::close(s);
+    return Failure(
+      "Failed to turn off the Nagle algorithm: " + stringify(error));
+  }
+
+  Try<Socket> socket = Socket::create(Socket::DEFAULT_KIND(), s);
+  if (socket.isError()) {
+    return Failure("Failed to accept, create socket: " + socket.error());
+  }
+  return socket.get();
+}
+
+} // namespace internal {
+
+
+Future<Socket> PollSocketImpl::accept()
+{
+  return io::poll(get(), io::READ)
+    .then(lambda::bind(&internal::accept, get()));
+}
+
+
 namespace internal {
 
 Future<Nothing> connect(const Socket& socket)
@@ -31,13 +100,13 @@ Future<Nothing> connect(const Socket& socket)
 } // namespace internal {
 
 
-Future<Nothing> Socket::Impl::connect(const Node& node)
+Future<Nothing> PollSocketImpl::connect(const Node& node)
 {
   Try<int> connect = network::connect(get(), node);
   if (connect.isError()) {
     if (errno == EINPROGRESS) {
       return io::poll(get(), io::WRITE)
-        .then(lambda::bind(&internal::connect, Socket(shared_from_this())));
+        .then(lambda::bind(&internal::connect, socket()));
     }
 
     return Failure(connect.error());
@@ -47,7 +116,7 @@ Future<Nothing> Socket::Impl::connect(const Node& node)
 }
 
 
-Future<size_t> Socket::Impl::recv(char* data, size_t size)
+Future<size_t> PollSocketImpl::recv(char* data, size_t size)
 {
   return io::read(get(), data, size);
 }
@@ -129,92 +198,18 @@ Future<size_t> socket_send_file(int s, int fd, off_t offset, size_t size)
 } // namespace internal {
 
 
-Future<size_t> Socket::Impl::send(const char* data, size_t size)
+Future<size_t> PollSocketImpl::send(const char* data, size_t size)
 {
   return io::poll(get(), io::WRITE)
     .then(lambda::bind(&internal::socket_send_data, get(), data, size));
 }
 
 
-Future<size_t> Socket::Impl::sendfile(int fd, off_t offset, size_t size)
+Future<size_t> PollSocketImpl::sendfile(int fd, off_t offset, size_t size)
 {
   return io::poll(get(), io::WRITE)
     .then(lambda::bind(&internal::socket_send_file, get(), fd, offset, size));
 }
 
-
-Try<Node> Socket::Impl::bind(const Node& node)
-{
-  Try<int> bind = network::bind(get(), node);
-  if (bind.isError()) {
-    return Error(bind.error());
-  }
-
-  // Lookup and store assigned ip and assigned port.
-  return network::getsockname(get(), AF_INET);
-}
-
-
-Try<Nothing> Socket::Impl::listen(int backlog)
-{
-  if (::listen(get(), backlog) < 0) {
-    return ErrnoError();
-  }
-  return Nothing();
-}
-
-
-namespace internal {
-
-Future<Socket> accept(int fd)
-{
-  Try<int> accepted = network::accept(fd, AF_INET);
-  if (accepted.isError()) {
-    return Failure(accepted.error());
-  }
-
-  int s = accepted.get();
-  Try<Nothing> nonblock = os::nonblock(s);
-  if (nonblock.isError()) {
-    LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, nonblock: "
-                                << nonblock.error();
-    os::close(s);
-    return Failure("Failed to accept, nonblock: " + nonblock.error());
-  }
-
-  Try<Nothing> cloexec = os::cloexec(s);
-  if (cloexec.isError()) {
-    LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, cloexec: "
-                                << cloexec.error();
-    os::close(s);
-    return Failure("Failed to accept, cloexec: " + cloexec.error());
-  }
-
-  // Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait.
-  int on = 1;
-  if (setsockopt(s, SOL_TCP, TCP_NODELAY, &on, sizeof(on)) < 0) {
-    const char* error = strerror(errno);
-    VLOG(1) << "Failed to turn off the Nagle algorithm: " << error;
-    os::close(s);
-    return Failure(
-      "Failed to turn off the Nagle algorithm: " + stringify(error));
-  }
-
-  Try<Socket> socket = Socket::create(Socket::DEFAULT_KIND(), s);
-  if (socket.isError()) {
-    return Failure("Failed to accept, create socket: " + socket.error());
-  }
-  return socket.get();
-}
-
-} // namespace internal {
-
-
-Future<Socket> Socket::Impl::accept()
-{
-  return io::poll(get(), io::READ)
-    .then(lambda::bind(&internal::accept, get()));
-}
-
 } // namespace network {
 } // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/2d46e853/3rdparty/libprocess/src/poll_socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.hpp b/3rdparty/libprocess/src/poll_socket.hpp
new file mode 100644
index 0000000..f7ca08e
--- /dev/null
+++ b/3rdparty/libprocess/src/poll_socket.hpp
@@ -0,0 +1,29 @@
+#include <memory>
+
+#include <process/socket.hpp>
+
+#include <stout/try.hpp>
+
+namespace process {
+namespace network {
+
+class PollSocketImpl : public Socket::Impl
+{
+public:
+  static Try<std::shared_ptr<Socket::Impl>> create(int s);
+
+  PollSocketImpl(int s) : Socket::Impl(s) {}
+
+  virtual ~PollSocketImpl() {}
+
+  // Implementation of the Socket::Impl interface.
+  virtual Try<Nothing> listen(int backlog);
+  virtual Future<Socket> accept();
+  virtual Future<Nothing> connect(const Node& node);
+  virtual Future<size_t> recv(char* data, size_t size);
+  virtual Future<size_t> send(const char* data, size_t size);
+  virtual Future<size_t> sendfile(int fd, off_t offset, size_t size);
+};
+
+} // namespace network {
+} // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/2d46e853/3rdparty/libprocess/src/socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/socket.cpp b/3rdparty/libprocess/src/socket.cpp
index 842a6ec..4b0f6be 100644
--- a/3rdparty/libprocess/src/socket.cpp
+++ b/3rdparty/libprocess/src/socket.cpp
@@ -1,16 +1,10 @@
 #include <process/socket.hpp>
 
+#include "poll_socket.hpp"
+
 namespace process {
 namespace network {
 
-const Socket::Kind& Socket::DEFAULT_KIND()
-{
-  // TODO(jmlvanre): Change the default based on configure or
-  // environment flags.
-  static const Kind DEFAULT = POLL;
-  return DEFAULT;
-}
-
 Try<Socket> Socket::create(Kind kind, int s)
 {
   if (s < 0) {
@@ -44,7 +38,11 @@ Try<Socket> Socket::create(Kind kind, int s)
 
   switch (kind) {
     case POLL: {
-      return Socket(std::make_shared<Socket::Impl>(s));
+      Try<std::shared_ptr<Socket::Impl>> socket = PollSocketImpl::create(s);
+      if (socket.isError()) {
+        return Error(socket.error());
+      }
+      return Socket(socket.get());
     }
     // By not setting a default we leverage the compiler errors when
     // the enumeration is augmented to find all the cases we need to
@@ -52,5 +50,26 @@ Try<Socket> Socket::create(Kind kind, int s)
   }
 }
 
+
+const Socket::Kind& Socket::DEFAULT_KIND()
+{
+  // TODO(jmlvanre): Change the default based on configure or
+  // environment flags.
+  static const Kind DEFAULT = POLL;
+  return DEFAULT;
+}
+
+
+Try<Node> Socket::Impl::bind(const Node& node)
+{
+  Try<int> bind = network::bind(get(), node);
+  if (bind.isError()) {
+    return Error(bind.error());
+  }
+
+  // Lookup and store assigned IP and assigned port.
+  return network::getsockname(get(), AF_INET);
+}
+
 } // namespace network {
 } // namespace process {