You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/12/25 22:05:52 UTC
[01/12] mesos git commit: Add multiple include guard to libev.
Repository: mesos
Updated Branches:
refs/heads/master c51312665 -> 82fdbc7fe
Add multiple include guard to libev.
Review: https://reviews.apache.org/r/28320
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3347a2b7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3347a2b7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3347a2b7
Branch: refs/heads/master
Commit: 3347a2b794bfaf2b02981bdd9e9eee017134201e
Parents: 252d3c7
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Dec 20 12:03:12 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Dec 20 17:59:55 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/src/libev.hpp | 5 +++++
1 file changed, 5 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/3347a2b7/3rdparty/libprocess/src/libev.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libev.hpp b/3rdparty/libprocess/src/libev.hpp
index a43e7d7..e4a403d 100644
--- a/3rdparty/libprocess/src/libev.hpp
+++ b/3rdparty/libprocess/src/libev.hpp
@@ -1,3 +1,6 @@
+#ifndef __LIBEV_HPP__
+#define __LIBEV_HPP__
+
#include <ev.h>
#include <queue>
@@ -80,3 +83,5 @@ Future<T> run_in_event_loop(const lambda::function<Future<T>(void)>& f)
}
} // namespace process {
+
+#endif // __LIBEV_HPP__
[10/12] mesos git commit: Fixed compilation issue on OS X with libev.
Posted by be...@apache.org.
Fixed compilation issue on OS X with libev.
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/82fdbc7f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/82fdbc7f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/82fdbc7f
Branch: refs/heads/master
Commit: 82fdbc7fe6e4eaeeceeb05124e17a9079f4f1699
Parents: 2d46e85
Author: Benjamin Hindman <be...@gmail.com>
Authored: Thu Dec 25 11:53:26 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Dec 25 11:54:13 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/src/process.cpp | 5 -----
1 file changed, 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/82fdbc7f/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 25981ca..67b6b3b 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -81,11 +81,6 @@
#include "encoder.hpp"
#include "event_loop.hpp"
#include "gate.hpp"
-#ifndef USE_LIBEVENT
-#include "libev.hpp"
-#else
-#include "libevent.hpp"
-#endif // USE_LIBEVENT
#include "process_reference.hpp"
#include "synchronized.hpp"
[04/12] mesos git commit: Initialized EventLoop and Clock earlier.
Posted by be...@apache.org.
Initialized EventLoop and Clock earlier.
Review: https://reviews.apache.org/r/29225
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/37be6031
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/37be6031
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/37be6031
Branch: refs/heads/master
Commit: 37be603106f601c6e656955ef8c903995379da9e
Parents: dd37344
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Dec 20 12:02:27 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Dec 20 17:59:55 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/src/process.cpp | 48 ++++++++++++++++----------------
1 file changed, 24 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/37be6031/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index d3dac4c..2aeb815 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -798,6 +798,30 @@ void initialize(const string& delegate)
}
}
+ // Initialize the event loop.
+ EventLoop::initialize();
+ Clock::initialize(lambda::bind(&timedout, lambda::_1));
+
+// ev_child_init(&child_watcher, child_exited, pid, 0);
+// ev_child_start(loop, &cw);
+
+// /* Install signal handler. */
+// struct sigaction sa;
+
+// sa.sa_handler = ev_sighandler;
+// sigfillset (&sa.sa_mask);
+// sa.sa_flags = SA_RESTART; /* if restarting works we save one iteration */
+// sigaction (w->signum, &sa, 0);
+
+// sigemptyset (&sa.sa_mask);
+// sigaddset (&sa.sa_mask, w->signum);
+// sigprocmask (SIG_UNBLOCK, &sa.sa_mask, 0);
+
+ pthread_t thread; // For now, not saving handles on our threads.
+ if (pthread_create(&thread, NULL, &EventLoop::run, NULL) != 0) {
+ LOG(FATAL) << "Failed to initialize, pthread_create";
+ }
+
__node__.ip = 0;
__node__.port = 0;
@@ -866,30 +890,6 @@ void initialize(const string& delegate)
PLOG(FATAL) << "Failed to initialize: " << listen.error();
}
- // Initialize the event loop.
- EventLoop::initialize();
- Clock::initialize(lambda::bind(&timedout, lambda::_1));
-
-// ev_child_init(&child_watcher, child_exited, pid, 0);
-// ev_child_start(loop, &cw);
-
-// /* Install signal handler. */
-// struct sigaction sa;
-
-// sa.sa_handler = ev_sighandler;
-// sigfillset (&sa.sa_mask);
-// sa.sa_flags = SA_RESTART; /* if restarting works we save one iteration */
-// sigaction (w->signum, &sa, 0);
-
-// sigemptyset (&sa.sa_mask);
-// sigaddset (&sa.sa_mask, w->signum);
-// sigprocmask (SIG_UNBLOCK, &sa.sa_mask, 0);
-
- pthread_t thread; // For now, not saving handles on our threads.
- if (pthread_create(&thread, NULL, &EventLoop::run, NULL) != 0) {
- LOG(FATAL) << "Failed to initialize, pthread_create";
- }
-
// Need to set initialzing here so that we can actually invoke
// 'spawn' below for the garbage collector.
initializing = false;
[07/12] mesos git commit: Introduce libevent poll implementation.
Posted by be...@apache.org.
Introduce libevent poll implementation.
Review: https://reviews.apache.org/r/28323
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f4a41805
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f4a41805
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f4a41805
Branch: refs/heads/master
Commit: f4a41805774afb658cfd30c1b280599448e00b1b
Parents: a027dd1
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Dec 20 12:43:54 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Dec 25 11:54:12 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/Makefile.am | 3 +-
3rdparty/libprocess/src/libevent_poll.cpp | 74 ++++++++++++++++++++++++++
2 files changed, 76 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/f4a41805/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index b95c0a9..09fce46 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -60,7 +60,8 @@ libprocess_la_CPPFLAGS = \
if ENABLE_LIBEVENT
libprocess_la_SOURCES += \
src/libevent.hpp \
- src/libevent.cpp
+ src/libevent.cpp \
+ src/libevent_poll.cpp
else
libprocess_la_SOURCES += \
src/libev.hpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/f4a41805/3rdparty/libprocess/src/libevent_poll.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent_poll.cpp b/3rdparty/libprocess/src/libevent_poll.cpp
new file mode 100644
index 0000000..d0b8946
--- /dev/null
+++ b/3rdparty/libprocess/src/libevent_poll.cpp
@@ -0,0 +1,74 @@
+#include <event2/event.h>
+
+#include <process/future.hpp>
+#include <process/io.hpp>
+#include <process/process.hpp> // For process::initialize.
+
+#include "libevent.hpp"
+
+namespace process {
+
+namespace io {
+namespace internal {
+
+struct Poll
+{
+ Promise<short> promise;
+ event* ev;
+};
+
+
+void pollCallback(evutil_socket_t, short what, void* arg)
+{
+ Poll* poll = reinterpret_cast<Poll*>(arg);
+
+ if (poll->promise.future().hasDiscard()) {
+ poll->promise.discard();
+ } else {
+ // Convert libevent specific EV_READ / EV_WRITE to io::* specific
+ // values of these enumerations.
+ short events =
+ ((what & EV_READ) ? io::READ : 0) | ((what & EV_WRITE) ? io::WRITE : 0);
+
+ poll->promise.set(events);
+ }
+
+ event_free(poll->ev);
+ delete poll;
+}
+
+
+void pollDiscard(event* ev)
+{
+ event_active(ev, EV_READ, 0);
+}
+
+} // namespace internal {
+
+
+Future<short> poll(int fd, short events)
+{
+ process::initialize();
+
+ internal::Poll* poll = new internal::Poll();
+
+ Future<short> future = poll->promise.future();
+
+ // Convert io::READ / io::WRITE to libevent specific values of these
+ // enumerations.
+ short what =
+ ((events & io::READ) ? EV_READ : 0) | ((events & io::WRITE) ? EV_WRITE : 0);
+
+ poll->ev = event_new(base, fd, what, &internal::pollCallback, poll);
+ if (poll->ev == NULL) {
+ LOG(FATAL) << "Failed to poll, event_new";
+ }
+
+ event_add(poll->ev, NULL);
+
+ return future
+ .onDiscard(lambda::bind(&internal::pollDiscard, poll->ev));
+}
+
+} // namespace io {
+} // namespace process {
[08/12] mesos git commit: Introduce libevent header and
implementation.
Posted by be...@apache.org.
Introduce libevent header and implementation.
Review: https://reviews.apache.org/r/28321
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0e29981b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0e29981b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0e29981b
Branch: refs/heads/master
Commit: 0e29981b8bac58886f515485403f3b0c585e18e6
Parents: 3347a2b
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Dec 20 12:03:28 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Dec 25 11:54:12 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/Makefile.am | 18 ++++++++-----
3rdparty/libprocess/configure.ac | 15 +++++++++++
3rdparty/libprocess/src/libevent.cpp | 44 +++++++++++++++++++++++++++++++
3rdparty/libprocess/src/libevent.hpp | 11 ++++++++
3rdparty/libprocess/src/process.cpp | 2 +-
5 files changed, 83 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/0e29981b/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 5c92ee5..b95c0a9 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -58,7 +58,9 @@ libprocess_la_CPPFLAGS = \
$(AM_CPPFLAGS)
if ENABLE_LIBEVENT
-# Place for libevent implementations.
+libprocess_la_SOURCES += \
+ src/libevent.hpp \
+ src/libevent.cpp
else
libprocess_la_SOURCES += \
src/libev.hpp \
@@ -82,16 +84,20 @@ else
HTTP_PARSER_LIB = -lhttp_parser
endif
+if ENABLE_LIBEVENT
+ EVENT_LIB = -levent -levent_pthreads
+else
if WITH_BUNDLED_LIBEV
- LIBEV_LIB = $(LIBEV)/libev.la
+ EVENT_LIB = $(LIBEV)/libev.la
else
- LIBEV_LIB = -lev
+ EVENT_LIB = -lev
+endif
endif
libprocess_la_LIBADD = \
$(LIBGLOG) \
$(HTTP_PARSER_LIB) \
- $(LIBEV_LIB)
+ $(EVENT_LIB)
if HAS_GPERFTOOLS
libprocess_la_CPPFLAGS += -I$(GPERFTOOLS)/src
@@ -132,7 +138,7 @@ tests_LDADD = \
libprocess.la \
$(LIBGLOG) \
$(HTTP_PARSER_LIB) \
- $(LIBEV_LIB)
+ $(EVENT_LIB)
benchmarks_SOURCES = \
src/tests/benchmarks.cpp
@@ -148,7 +154,7 @@ benchmarks_LDADD = \
libprocess.la \
$(LIBGLOG) \
$(HTTP_PARSER_LIB) \
- $(LIBEV_LIB)
+ $(EVENT_LIB)
# We use a check-local target for now to avoid the parallel test
# runner that ships with newer versions of autotools.
http://git-wip-us.apache.org/repos/asf/mesos/blob/0e29981b/3rdparty/libprocess/configure.ac
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/configure.ac b/3rdparty/libprocess/configure.ac
index 2f01a3b..19bc601 100644
--- a/3rdparty/libprocess/configure.ac
+++ b/3rdparty/libprocess/configure.ac
@@ -589,6 +589,21 @@ libevent is required for mesos to build.
if test "x$found_libevent" = "xyes"; then
AC_DEFINE([USE_LIBEVENT], [1])
fi
+ AC_CHECK_HEADERS([event2/thread.h],
+ [AC_CHECK_LIB([event_pthreads],
+ [evthread_use_pthreads],
+ [found_libevent_thread=yes],
+ [AC_MSG_ERROR([cannot find libevent_pthreads
+-------------------------------------------------------------------
+libevent_pthreads is required for mesos to build.
+-------------------------------------------------------------------
+ ])],
+ [-levent])],
+ [AC_MSG_ERROR([cannot find libevent_pthreads headers
+-------------------------------------------------------------------
+libevent_pthreads is required for mesos to build.
+-------------------------------------------------------------------
+ ])])
fi
AS_IF([test "x${ac_cv_env_CFLAGS_set}" = "x"],
http://git-wip-us.apache.org/repos/asf/mesos/blob/0e29981b/3rdparty/libprocess/src/libevent.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent.cpp b/3rdparty/libprocess/src/libevent.cpp
new file mode 100644
index 0000000..524d811
--- /dev/null
+++ b/3rdparty/libprocess/src/libevent.cpp
@@ -0,0 +1,44 @@
+#include <unistd.h>
+
+#include <event2/event.h>
+#include <event2/thread.h>
+
+#include <process/logging.hpp>
+
+#include "event_loop.hpp"
+#include "libevent.hpp"
+
+namespace process {
+
+struct event_base* base = NULL;
+
+void* EventLoop::run(void*)
+{
+ int result = event_base_loop(base, 0);
+ if (result < 0) {
+ LOG(FATAL) << "Failed to run event loop";
+ } else if (result == 1) {
+ VLOG(1) << "Finished running event loop due to lack of events";
+ }
+
+ return NULL;
+}
+
+
+void EventLoop::initialize()
+{
+ if (evthread_use_pthreads() < 0) {
+ LOG(FATAL) << "Failed to initialize, evthread_use_pthreads";
+ }
+
+ // This enables debugging of libevent calls. We can remove this
+ // when the implementation settles and after we gain confidence.
+ event_enable_debug_mode();
+
+ base = event_base_new();
+ if (base == NULL) {
+ LOG(FATAL) << "Failed to initialize, event_base_new";
+ }
+}
+
+} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/0e29981b/3rdparty/libprocess/src/libevent.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent.hpp b/3rdparty/libprocess/src/libevent.hpp
new file mode 100644
index 0000000..f6cc721
--- /dev/null
+++ b/3rdparty/libprocess/src/libevent.hpp
@@ -0,0 +1,11 @@
+#ifndef __LIBEVENT_HPP__
+#define __LIBEVENT_HPP__
+
+namespace process {
+
+// Event loop.
+extern struct event_base* base;
+
+} // namespace process {
+
+#endif // __LIBEVENT_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/0e29981b/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index fabcca4..441ce48 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -84,7 +84,7 @@
#ifndef USE_LIBEVENT
#include "libev.hpp"
#else
-// Place for libevent header.
+#include "libevent.hpp"
#endif // USE_LIBEVENT
#include "process_reference.hpp"
#include "synchronized.hpp"
[09/12] mesos git commit: Add a factory pattern for constructing a
Socket.
Posted by be...@apache.org.
Add a factory pattern for constructing a Socket.
Add a factory pattern to allow construction of different
implementations of the socket interface. Remove default constructor
from Socket.
Review: https://reviews.apache.org/r/28670
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b3bb74c2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b3bb74c2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b3bb74c2
Branch: refs/heads/master
Commit: b3bb74c2359c6cf3d73bcb0e57a4732af804a15b
Parents: 62ad3f2
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Dec 20 17:26:13 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Dec 25 11:54:13 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/Makefile.am | 1 +
3rdparty/libprocess/include/process/socket.hpp | 76 +++++-----------
3rdparty/libprocess/src/poll_socket.cpp | 6 +-
3rdparty/libprocess/src/process.cpp | 92 +++++++++++++-------
3rdparty/libprocess/src/socket.cpp | 56 ++++++++++++
3rdparty/libprocess/src/tests/decoder_tests.cpp | 12 ++-
6 files changed, 152 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/b3bb74c2/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index f401232..817355e 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -46,6 +46,7 @@ libprocess_la_SOURCES = \
src/process.cpp \
src/process_reference.hpp \
src/reap.cpp \
+ src/socket.cpp \
src/subprocess.cpp \
src/synchronized.hpp \
src/timeseries.cpp
http://git-wip-us.apache.org/repos/asf/mesos/blob/b3bb74c2/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index 7e1e3f2..436761b 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -109,6 +109,20 @@ inline Try<Node> getsockname(int s, sa_family_t family)
class Socket
{
public:
+ // Available kinds of implementations.
+ enum Kind {
+ POLL
+ // TODO(jmlvanre): Add libevent socket.
+ };
+
+ // Returns an instance of a Socket using the specified kind of
+ // implementation and potentially wrapping the specified file
+ // descriptor.
+ static Try<Socket> create(Kind kind = DEFAULT_KIND(), int s = -1);
+
+ // Returns the default kind of implementation of Socket.
+ static const Kind& DEFAULT_KIND();
+
// Each socket is a reference counted, shared by default, concurrent
// object. However, since we want to support multiple
// implementations we use the Pimpl pattern (often called the
@@ -117,24 +131,21 @@ public:
class Impl : public std::enable_shared_from_this<Impl>
{
public:
- Impl() : s(-1) {}
-
- explicit Impl(int _s) : s(_s) {}
+ explicit Impl(int _s) : s(_s) { CHECK(s >= 0); }
~Impl()
{
- if (s >= 0) {
- Try<Nothing> close = os::close(s);
- if (close.isError()) {
- ABORT("Failed to close socket " +
- stringify(s) + ": " + close.error());
- }
+ CHECK(s >= 0);
+ Try<Nothing> close = os::close(s);
+ if (close.isError()) {
+ ABORT("Failed to close socket " +
+ stringify(s) + ": " + close.error());
}
}
int get() const
{
- return s >= 0 ? s : create().get();
+ return s;
}
Future<Nothing> connect(const Node& node);
@@ -152,52 +163,9 @@ public:
Future<Socket> accept();
private:
- const Impl& create() const
- {
- CHECK(s < 0);
-
- // Supported in Linux >= 2.6.27.
-#if defined(SOCK_NONBLOCK) && defined(SOCK_CLOEXEC)
- Try<int> fd =
- network::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
-
- if (fd.isError()) {
- ABORT("Failed to create socket: " + fd.error());
- }
-#else
- Try<int> fd = network::socket(AF_INET, SOCK_STREAM, 0);
- if (fd.isError()) {
- ABORT("Failed to create socket: " + fd.error());
- }
-
- Try<Nothing> nonblock = os::nonblock(fd.get());
- if (nonblock.isError()) {
- ABORT("Failed to create socket, nonblock: " + nonblock.error());
- }
-
- Try<Nothing> cloexec = os::cloexec(fd.get());
- if (cloexec.isError()) {
- ABORT("Failed to create socket, cloexec: " + cloexec.error());
- }
-#endif
-
- s = fd.get();
- return *this;
- }
-
- // Mutable so that the socket can be lazily created.
- //
- // TODO(benh): Create a factory for sockets and don't lazily
- // create but instead return a Try<Socket> from the factory in
- // order to eliminate the need for a mutable member or the call to
- // ABORT above.
- mutable int s;
+ int s;
};
- Socket() : impl(std::make_shared<Impl>()) {}
-
- explicit Socket(int s) : impl(std::make_shared<Impl>(s)) {}
-
bool operator == (const Socket& that) const
{
return impl == that.impl;
http://git-wip-us.apache.org/repos/asf/mesos/blob/b3bb74c2/3rdparty/libprocess/src/poll_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.cpp b/3rdparty/libprocess/src/poll_socket.cpp
index 5202750..09cd5a2 100644
--- a/3rdparty/libprocess/src/poll_socket.cpp
+++ b/3rdparty/libprocess/src/poll_socket.cpp
@@ -200,7 +200,11 @@ Future<Socket> accept(int fd)
"Failed to turn off the Nagle algorithm: " + stringify(error));
}
- return Socket(s);
+ Try<Socket> socket = Socket::create(Socket::DEFAULT_KIND(), s);
+ if (socket.isError()) {
+ return Failure("Failed to accept, create socket: " + socket.error());
+ }
+ return socket.get();
}
} // namespace internal {
http://git-wip-us.apache.org/repos/asf/mesos/blob/b3bb74c2/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 72c0ad4..25981ca 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -306,7 +306,7 @@ private:
} links;
// Collection of all actice sockets.
- map<int, Socket> sockets;
+ map<int, Socket*> sockets;
// Collection of sockets that should be disposed when they are
// finished being used (e.g., when there is no more data to send on
@@ -452,7 +452,7 @@ static uint32_t __id__ = 0;
static const int LISTEN_BACKLOG = 500000;
// Local server socket.
-static Socket __s__;
+static Socket* __s__ = NULL;
// Local node.
static Node __node__;
@@ -721,7 +721,7 @@ void on_accept(const Future<Socket>& socket)
decoder));
}
- __s__.accept()
+ __s__->accept()
.onAny(lambda::bind(&on_accept, lambda::_1));
}
@@ -853,14 +853,19 @@ void initialize(const string& delegate)
}
// Create a "server" socket for communicating with other nodes.
+ Try<Socket> create = Socket::create();
+ if (create.isError()) {
+ PLOG(FATAL) << "Failed to construct server socket:" << create.error();
+ }
+ __s__ = new Socket(create.get());
// Allow address reuse.
int on = 1;
- if (setsockopt(__s__, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) {
+ if (setsockopt(__s__->get(), SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) {
PLOG(FATAL) << "Failed to initialize, setsockopt(SO_REUSEADDR)";
}
- Try<Node> bind = __s__.bind(__node__);
+ Try<Node> bind = __s__->bind(__node__);
if (bind.isError()) {
PLOG(FATAL) << "Failed to initialize: " << bind.error();
}
@@ -889,7 +894,7 @@ void initialize(const string& delegate)
__node__.ip = ip.get();
}
- Try<Nothing> listen = __s__.listen(LISTEN_BACKLOG);
+ Try<Nothing> listen = __s__->listen(LISTEN_BACKLOG);
if (listen.isError()) {
PLOG(FATAL) << "Failed to initialize: " << listen.error();
}
@@ -898,7 +903,7 @@ void initialize(const string& delegate)
// 'spawn' below for the garbage collector.
initializing = false;
- __s__.accept()
+ __s__->accept()
.onAny(lambda::bind(&internal::on_accept, lambda::_1));
// TODO(benh): Make sure creating the garbage collector, logging
@@ -1232,7 +1237,7 @@ SocketManager::~SocketManager() {}
void SocketManager::accepted(const Socket& socket)
{
synchronized (this) {
- sockets[socket] = socket;
+ sockets[socket] = new Socket(socket);
}
}
@@ -1324,22 +1329,22 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
CHECK(process != NULL);
- Socket socket;
+ Option<Socket> socket = None();
bool connect = false;
synchronized (this) {
- links.linkers[to].insert(process);
- links.linkees[process].insert(to);
- if (to.node != __node__) {
- links.remotes[to.node].insert(to);
- }
-
// Check if node is remote and there isn't a persistant link.
if (to.node != __node__ && persists.count(to.node) == 0) {
// Okay, no link, let's create a socket.
- int s = socket;
+ Try<Socket> create = Socket::create();
+ if (create.isError()) {
+ VLOG(1) << "Failed to link, create socket: " << create.error();
+ return;
+ }
+ socket = create.get();
+ int s = socket.get().get();
- sockets[s] = socket;
+ sockets[s] = new Socket(socket.get());
nodes[s] = to.node;
persists[to.node] = s;
@@ -1353,14 +1358,21 @@ void SocketManager::link(ProcessBase* process, const UPID& to)
connect = true;
}
+
+ links.linkers[to].insert(process);
+ links.linkees[process].insert(to);
+ if (to.node != __node__) {
+ links.remotes[to.node].insert(to);
+ }
}
if (connect) {
- socket.connect(to.node)
+ CHECK_SOME(socket);
+ Socket(socket.get()).connect(to.node) // Copy to drop const.
.onAny(lambda::bind(
&internal::link_connect,
lambda::_1,
- new Socket(socket)));
+ new Socket(socket.get())));
}
}
@@ -1377,7 +1389,7 @@ PID<HttpProxy> SocketManager::proxy(const Socket& socket)
if (proxies.count(socket) > 0) {
return proxies[socket]->self();
} else {
- proxy = new HttpProxy(sockets[socket]);
+ proxy = new HttpProxy(*sockets[socket]);
proxies[socket] = proxy;
}
}
@@ -1569,7 +1581,7 @@ void SocketManager::send(Message* message)
const Node& node = message->to.node;
- Socket socket;
+ Option<Socket> socket = None();
bool connect = false;
synchronized (this) {
@@ -1579,28 +1591,35 @@ void SocketManager::send(Message* message)
if (persist || temp) {
int s = persist ? persists[node] : temps[node];
CHECK(sockets.count(s) > 0);
- socket = sockets[s];
+ socket = *sockets[s];
// Update whether or not this socket should get disposed after
// there is no more data to send.
if (!persist) {
- dispose.insert(socket);
+ dispose.insert(socket.get());
}
- if (outgoing.count(socket) > 0) {
- outgoing[socket].push(new MessageEncoder(socket, message));
+ if (outgoing.count(socket.get()) > 0) {
+ outgoing[socket.get()].push(new MessageEncoder(socket.get(), message));
return;
} else {
// Initialize the outgoing queue.
- outgoing[socket];
+ outgoing[socket.get()];
}
} else {
// No peristent or temporary socket to the node currently
// exists, so we create a temporary one.
- int s = socket;
+ Try<Socket> create = Socket::create();
+ if (create.isError()) {
+ VLOG(1) << "Failed to send, create socket: " << create.error();
+ delete message;
+ return;
+ }
+ socket = create.get();
+ int s = socket.get();
- sockets[s] = socket;
+ sockets[s] = new Socket(socket.get());
nodes[s] = node;
temps[node] = s;
@@ -1614,16 +1633,19 @@ void SocketManager::send(Message* message)
}
if (connect) {
- socket.connect(node)
+ CHECK_SOME(socket);
+ Socket(socket.get()).connect(node) // Copy to drop const.
.onAny(lambda::bind(
&internal::send_connect,
lambda::_1,
- new Socket(socket),
+ new Socket(socket.get()),
message));
} else {
// If we're not connecting and we haven't added the encoder to
// the 'outgoing' queue then schedule it to be sent.
- internal::send(new MessageEncoder(socket, message), new Socket(socket));
+ internal::send(
+ new MessageEncoder(socket.get(), message),
+ new Socket(socket.get()));
}
}
@@ -1675,7 +1697,9 @@ Encoder* SocketManager::next(int s)
}
dispose.erase(s);
- sockets.erase(s);
+ auto iterator = sockets.find(s);
+ delete iterator->second;
+ sockets.erase(iterator);
// We don't actually close the socket (we wait for the Socket
// abstraction to close it once there are no more references),
@@ -1752,7 +1776,9 @@ void SocketManager::close(int s)
shutdown(s, SHUT_RD);
dispose.erase(s);
- sockets.erase(s);
+ auto iterator = sockets.find(s);
+ delete iterator->second;
+ sockets.erase(iterator);
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/b3bb74c2/3rdparty/libprocess/src/socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/socket.cpp b/3rdparty/libprocess/src/socket.cpp
new file mode 100644
index 0000000..842a6ec
--- /dev/null
+++ b/3rdparty/libprocess/src/socket.cpp
@@ -0,0 +1,56 @@
+#include <process/socket.hpp>
+
+namespace process {
+namespace network {
+
+const Socket::Kind& Socket::DEFAULT_KIND()
+{
+ // TODO(jmlvanre): Change the default based on configure or
+ // environment flags.
+ static const Kind DEFAULT = POLL;
+ return DEFAULT;
+}
+
+Try<Socket> Socket::create(Kind kind, int s)
+{
+ if (s < 0) {
+ // Supported in Linux >= 2.6.27.
+#if defined(SOCK_NONBLOCK) && defined(SOCK_CLOEXEC)
+ Try<int> fd =
+ network::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
+
+ if (fd.isError()) {
+ return Error("Failed to create socket: " + fd.error());
+ }
+#else
+ Try<int> fd = network::socket(AF_INET, SOCK_STREAM, 0);
+ if (fd.isError()) {
+ return Error("Failed to create socket: " + fd.error());
+ }
+
+ Try<Nothing> nonblock = os::nonblock(fd.get());
+ if (nonblock.isError()) {
+ return Error("Failed to create socket, nonblock: " + nonblock.error());
+ }
+
+ Try<Nothing> cloexec = os::cloexec(fd.get());
+ if (cloexec.isError()) {
+ return Error("Failed to create socket, cloexec: " + cloexec.error());
+ }
+#endif
+
+ s = fd.get();
+ }
+
+ switch (kind) {
+ case POLL: {
+ return Socket(std::make_shared<Socket::Impl>(s));
+ }
+ // By not setting a default we leverage the compiler errors when
+ // the enumeration is augmented to find all the cases we need to
+ // provide.
+ }
+}
+
+} // namespace network {
+} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/b3bb74c2/3rdparty/libprocess/src/tests/decoder_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/decoder_tests.cpp b/3rdparty/libprocess/src/tests/decoder_tests.cpp
index ef52798..d65f5cf 100644
--- a/3rdparty/libprocess/src/tests/decoder_tests.cpp
+++ b/3rdparty/libprocess/src/tests/decoder_tests.cpp
@@ -19,7 +19,9 @@ using process::network::Socket;
TEST(Decoder, Request)
{
- DataDecoder decoder = DataDecoder(Socket());
+ Try<Socket> socket = Socket::create();
+ ASSERT_SOME(socket);
+ DataDecoder decoder = DataDecoder(socket.get());
const string& data =
"GET /path/file.json?key1=value1&key2=value2#fragment HTTP/1.1\r\n"
@@ -55,7 +57,9 @@ TEST(Decoder, Request)
TEST(Decoder, RequestHeaderContinuation)
{
- DataDecoder decoder = DataDecoder(Socket());
+ Try<Socket> socket = Socket::create();
+ ASSERT_SOME(socket);
+ DataDecoder decoder = DataDecoder(socket.get());
const string& data =
"GET /path/file.json HTTP/1.1\r\n"
@@ -79,7 +83,9 @@ TEST(Decoder, RequestHeaderContinuation)
// This is expected to fail for now, see my TODO(bmahler) on http::Request.
TEST(Decoder, DISABLED_RequestHeaderCaseInsensitive)
{
- DataDecoder decoder = DataDecoder(Socket());
+ Try<Socket> socket = Socket::create();
+ ASSERT_SOME(socket);
+ DataDecoder decoder = DataDecoder(socket.get());
const string& data =
"GET /path/file.json HTTP/1.1\r\n"
[06/12] mesos git commit: Introduce libevent clock implementation.
Posted by be...@apache.org.
Introduce libevent clock implementation.
Review: https://reviews.apache.org/r/28322
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a027dd13
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a027dd13
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a027dd13
Branch: refs/heads/master
Commit: a027dd1360325ac489961f6317ba8c385dcf995c
Parents: 25d068f
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Dec 20 12:35:57 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Dec 25 11:54:12 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/src/libevent.cpp | 72 ++++++++++++++++++++++++++++---
1 file changed, 66 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a027dd13/3rdparty/libprocess/src/libevent.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libevent.cpp b/3rdparty/libprocess/src/libevent.cpp
index 524d811..28c2cf7 100644
--- a/3rdparty/libprocess/src/libevent.cpp
+++ b/3rdparty/libprocess/src/libevent.cpp
@@ -7,21 +7,81 @@
#include "event_loop.hpp"
#include "libevent.hpp"
+#include "synchronized.hpp"
namespace process {
struct event_base* base = NULL;
+
+// Drives the libevent loop: calls 'event_base_loop' with EVLOOP_ONCE
+// over and over until the base reports a break or an exit, then
+// returns NULL. A negative result from 'event_base_loop' is fatal.
+//
+// NOTE(review): a result of 1 makes the loop 'continue' immediately;
+// presumably libevent returns 1 when no events are pending or active,
+// in which case this would spin (and log at VLOG(1)) until an event
+// is registered -- confirm against the libevent documentation.
void* EventLoop::run(void*)
{
- int result = event_base_loop(base, 0);
- if (result < 0) {
- LOG(FATAL) << "Failed to run event loop";
- } else if (result == 1) {
- VLOG(1) << "Finished running event loop due to lack of events";
+ do {
+ int result = event_base_loop(base, EVLOOP_ONCE);
+ if (result < 0) {
+ LOG(FATAL) << "Failed to run event loop";
+ } else if (result == 1) {
+ VLOG(1) << "All events handled, continuing event loop";
+ continue;
+ } else if (event_base_got_break(base)) {
+ break;
+ } else if (event_base_got_exit(base)) {
+ break;
+ }
+ } while (true);
+ return NULL;
+}
+
+
+namespace internal {
+
+// Bookkeeping for one pending EventLoop::delay request. Allocated
+// with 'new' in EventLoop::delay, handed to libevent as the callback
+// argument, and deleted by 'handle_delay' after the callback has run.
+struct Delay
+{
+ // Function to invoke when the timer fires.
+ void(*function)(void);
+ // The libevent timer created with 'evtimer_new' for this request.
+ event* timer;
+};
+
+// Timer callback installed by EventLoop::delay via 'evtimer_new'.
+// 'arg' is the heap-allocated 'Delay' for the request; the fd and
+// event-flag arguments that libevent passes are unused.
+//
+// Runs the delayed function exactly once and then releases everything
+// the request owns: the libevent timer (which would otherwise leak,
+// since nothing else holds a pointer to it) and the 'Delay' itself.
+void handle_delay(int, short, void* arg)
+{
+  Delay* delay = reinterpret_cast<Delay*>(arg);
+  delay->function();
+
+  // The timer is one-shot and has already fired, so it can be freed
+  // here, from inside its own callback.
+  event_free(delay->timer);
+  delete delay;
+}
+
+} // namespace internal {
+
+
+// Schedules 'function' to be invoked by the event loop once
+// 'duration' has elapsed. Durations that are not greater than zero
+// are scheduled with a zero timeout.
+//
+// A 'Delay' record is heap-allocated to carry 'function' and the
+// timer to the callback; 'internal::handle_delay' deletes it after
+// invoking 'function'. Failing to create the timer is fatal.
+void EventLoop::delay(const Duration& duration, void(*function)(void))
+{
+ internal::Delay* delay = new internal::Delay();
+ delay->timer = evtimer_new(base, &internal::handle_delay, delay);
+ if (delay->timer == NULL) {
+ LOG(FATAL) << "Failed to delay, evtimer_new";
}
- return NULL;
+ delay->function = function;
+
+ // Default to "fire immediately"; only a strictly positive duration
+ // replaces the zero timeout.
+ timeval t{0, 0};
+ if (duration > Seconds(0)) {
+ t = duration.timeval();
+ }
+
+ // NOTE(review): the result of 'evtimer_add' is not checked here.
+ evtimer_add(delay->timer, &t);
+}
+
+
+// Returns the time reported by 'event_base_gettimeofday_cached' for
+// 'base', converted to seconds as a double. Aborts via LOG(FATAL) if
+// the time cannot be obtained.
+double EventLoop::time()
+{
+ // Get the cached time if running the event loop, or call
+ // gettimeofday() to get the current time. Since a lot of logic in
+ // libprocess depends on time math, we want to log fatal rather than
+ // cause logic errors if the time fails.
+ timeval t;
+ if (event_base_gettimeofday_cached(base, &t) < 0) {
+ LOG(FATAL) << "Failed to get time, event_base_gettimeofday_cached";
+ }
+
+ // Go through Duration to turn the (sec, usec) pair into seconds.
+ return Duration(t).secs();
}
[05/12] mesos git commit: Add construction and conversion from / to
timeval for Duration.
Posted by be...@apache.org.
Add construction and conversion from / to timeval for Duration.
Review: https://reviews.apache.org/r/29226
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/25d068fe
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/25d068fe
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/25d068fe
Branch: refs/heads/master
Commit: 25d068fe340fe7c8907567f51d1cf2e58f66018a
Parents: 0e29981
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Dec 20 12:05:19 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Dec 25 11:54:12 2014 -0800
----------------------------------------------------------------------
.../3rdparty/stout/include/stout/duration.hpp | 16 ++++++++-
.../3rdparty/stout/tests/duration_tests.cpp | 34 ++++++++++++++++++++
2 files changed, 49 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/25d068fe/3rdparty/libprocess/3rdparty/stout/include/stout/duration.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/duration.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/duration.hpp
index f344705..8a1626c 100644
--- a/3rdparty/libprocess/3rdparty/stout/include/stout/duration.hpp
+++ b/3rdparty/libprocess/3rdparty/stout/include/stout/duration.hpp
@@ -15,7 +15,8 @@
#define __STOUT_DURATION_HPP__
#include <ctype.h> // For 'isdigit'.
-#include <limits.h> // For 'LLONG_(MAX|MIN)'
+#include <limits.h> // For 'LLONG_(MAX|MIN)'.
+#include <time.h> // For 'timeval'.
#include <iomanip>
#include <iostream>
@@ -73,6 +74,11 @@ public:
Duration() : nanos(0) {}
+  // Builds a Duration from a 'timeval': whole seconds plus
+  // microseconds, both converted to nanoseconds. Explicit so that a
+  // 'timeval' is never silently treated as a Duration.
+  explicit Duration(const timeval& t)
+    : nanos(t.tv_sec * SECONDS + t.tv_usec * MICROSECONDS) {}
+
int64_t ns() const { return nanos; }
double us() const { return static_cast<double>(nanos) / MICROSECONDS; }
double ms() const { return static_cast<double>(nanos) / MILLISECONDS; }
@@ -82,6 +88,14 @@ public:
double days() const { return static_cast<double>(nanos) / DAYS; }
double weeks() const { return static_cast<double>(nanos) / WEEKS; }
+  // Converts this duration to a 'timeval' (whole seconds plus the
+  // remaining microseconds). Sub-microsecond precision is dropped.
+  // Both fields truncate toward zero, so a negative duration yields
+  // non-positive 'tv_sec' and 'tv_usec' values.
+  struct timeval timeval() const
+  {
+    struct timeval t;
+
+    // Work on the integral nanosecond count directly rather than on
+    // the 'double' returned by secs()/us(): this stays exact for very
+    // large durations and keeps the units explicit (nanoseconds per
+    // second, nanoseconds per microsecond).
+    t.tv_sec = static_cast<decltype(t.tv_sec)>(nanos / SECONDS);
+    t.tv_usec =
+      static_cast<decltype(t.tv_usec)>((nanos % SECONDS) / MICROSECONDS);
+
+    return t;
+  }
+
bool operator < (const Duration& d) const { return nanos < d.nanos; }
bool operator <= (const Duration& d) const { return nanos <= d.nanos; }
bool operator > (const Duration& d) const { return nanos > d.nanos; }
http://git-wip-us.apache.org/repos/asf/mesos/blob/25d068fe/3rdparty/libprocess/3rdparty/stout/tests/duration_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/tests/duration_tests.cpp b/3rdparty/libprocess/3rdparty/stout/tests/duration_tests.cpp
index 4269d3c..724c5fe 100644
--- a/3rdparty/libprocess/3rdparty/stout/tests/duration_tests.cpp
+++ b/3rdparty/libprocess/3rdparty/stout/tests/duration_tests.cpp
@@ -98,3 +98,37 @@ TEST(DurationTest, OutputFormat)
EXPECT_EQ("15250.2844524715weeks", stringify(Duration::max()));
EXPECT_EQ("-15250.2844524715weeks", stringify(Duration::min()));
}
+
+// Exercises the Duration <-> timeval conversions: construction from a
+// 'timeval' and the round trip back through Duration::timeval(),
+// including negative values.
+TEST(DurationTest, Timeval)
+{
+ // Construction from seconds only, microseconds only, and both.
+ EXPECT_EQ(Duration(timeval{10, 0}), Seconds(10));
+ EXPECT_EQ(Duration(timeval{0, 7}), Microseconds(7));
+ EXPECT_EQ(Duration(timeval{2, 123}), Seconds(2) + Microseconds(123));
+
+ // Round trip with both fields set.
+ timeval t{2, 123};
+ Duration d(t);
+ EXPECT_EQ(d.timeval().tv_sec, t.tv_sec);
+ EXPECT_EQ(d.timeval().tv_usec, t.tv_usec);
+
+ // Round trip with a zero microsecond field.
+ t.tv_usec = 0;
+ d = Duration(t);
+ EXPECT_EQ(d.timeval().tv_sec, t.tv_sec);
+ EXPECT_EQ(d.timeval().tv_usec, t.tv_usec);
+
+ // Negative times.
+ t.tv_sec = 0;
+ t.tv_usec = -1;
+ d = Duration(t);
+ EXPECT_EQ(d.timeval().tv_sec, t.tv_sec);
+ EXPECT_EQ(d.timeval().tv_usec, t.tv_usec);
+
+ // Same value as above, built from Microseconds instead of a timeval.
+ d = Microseconds(-1);
+ EXPECT_EQ(d.timeval().tv_sec, t.tv_sec);
+ EXPECT_EQ(d.timeval().tv_usec, t.tv_usec);
+
+ // Both fields negative.
+ t.tv_sec = -1;
+ t.tv_usec = -30;
+ d = Duration(t);
+ EXPECT_EQ(d.timeval().tv_sec, t.tv_sec);
+ EXPECT_EQ(d.timeval().tv_usec, t.tv_usec);
+}
[03/12] mesos git commit: Add enable-libevent flag to Libprocess.
Posted by be...@apache.org.
Add enable-libevent flag to Libprocess.
This flag will be used to distinguish between libev (default) and
libevent.
Review: https://reviews.apache.org/r/28319
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/252d3c75
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/252d3c75
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/252d3c75
Branch: refs/heads/master
Commit: 252d3c7575a9a688e6446e4ab302021f09849b4b
Parents: 37be603
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Dec 20 12:01:51 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Dec 20 17:59:55 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/Makefile.am | 12 +++++++++---
3rdparty/libprocess/configure.ac | 26 ++++++++++++++++++++++++++
3rdparty/libprocess/src/process.cpp | 4 ++++
3 files changed, 39 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/252d3c75/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 472b5f4..5c92ee5 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -40,9 +40,6 @@ libprocess_la_SOURCES = \
src/http.cpp \
src/io.cpp \
src/latch.cpp \
- src/libev.hpp \
- src/libev.cpp \
- src/libev_poll.cpp \
src/metrics/metrics.cpp \
src/pid.cpp \
src/process.cpp \
@@ -60,6 +57,15 @@ libprocess_la_CPPFLAGS = \
-I$(PICOJSON) \
$(AM_CPPFLAGS)
+if ENABLE_LIBEVENT
+# Place for libevent implementations.
+else
+ libprocess_la_SOURCES += \
+ src/libev.hpp \
+ src/libev.cpp \
+ src/libev_poll.cpp
+endif
+
if WITH_BUNDLED_GLOG
libprocess_la_CPPFLAGS += -I$(GLOG)/src
LIBGLOG = $(GLOG)/libglog.la
http://git-wip-us.apache.org/repos/asf/mesos/blob/252d3c75/3rdparty/libprocess/configure.ac
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/configure.ac b/3rdparty/libprocess/configure.ac
index 024f892..2f01a3b 100644
--- a/3rdparty/libprocess/configure.ac
+++ b/3rdparty/libprocess/configure.ac
@@ -71,6 +71,11 @@ AC_ARG_ENABLE([bundled],
of bundled libraries]),
[enable_bundled=no], [enable_bundled=yes])
+# AC_ARG_ENABLE itself sets $enable_libevent to "yes" for
+# --enable-libevent and to "no" for --disable-libevent, so the
+# action-if-given is left empty (forcing "yes" there would make
+# --disable-libevent turn libevent on). Only the default is supplied.
+AC_ARG_ENABLE([libevent],
+              AS_HELP_STRING([--enable-libevent],
+                             [use libevent instead of libev, default: no]),
+              [], [enable_libevent=no])
+
AC_ARG_WITH([boost],
AS_HELP_STRING([--with-boost@<:@=DIR@:>@],
[excludes building and using the bundled Boost
@@ -565,6 +570,27 @@ else
optimize_flags="-O0"
fi
+AM_CONDITIONAL([ENABLE_LIBEVENT], [test x"$enable_libevent" = "xyes"])
+
+if test "x$enable_libevent" = "xyes"; then
+ AC_CHECK_HEADERS([event2/event.h],
+ [AC_CHECK_LIB([event],
+ [event_base_new],
+ [found_libevent=yes],
+ [AC_MSG_ERROR([cannot find libevent
+-------------------------------------------------------------------
+libevent is required for mesos to build.
+-------------------------------------------------------------------
+ ])])], [AC_MSG_ERROR([cannot find libevent headers
+-------------------------------------------------------------------
+libevent is required for mesos to build.
+-------------------------------------------------------------------
+ ])])
+ if test "x$found_libevent" = "xyes"; then
+ AC_DEFINE([USE_LIBEVENT], [1])
+ fi
+fi
+
AS_IF([test "x${ac_cv_env_CFLAGS_set}" = "x"],
[CFLAGS="$debug_flags $optimize_flags"])
AS_IF([test "x${ac_cv_env_CXXFLAGS_set}" = "x"],
http://git-wip-us.apache.org/repos/asf/mesos/blob/252d3c75/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 2aeb815..fabcca4 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -81,7 +81,11 @@
#include "encoder.hpp"
#include "event_loop.hpp"
#include "gate.hpp"
+#ifndef USE_LIBEVENT
#include "libev.hpp"
+#else
+// Place for libevent header.
+#endif // USE_LIBEVENT
#include "process_reference.hpp"
#include "synchronized.hpp"
[12/12] mesos git commit: Move poll based socket implementation into
poll_socket.cpp.
Posted by be...@apache.org.
Move poll based socket implementation into poll_socket.cpp.
Review: https://reviews.apache.org/r/28467
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/62ad3f27
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/62ad3f27
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/62ad3f27
Branch: refs/heads/master
Commit: 62ad3f27c44dfbad8cb68f952ed2d076e349fd6a
Parents: f4a4180
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Dec 20 12:44:46 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Dec 25 11:54:13 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/Makefile.am | 1 +
3rdparty/libprocess/src/poll_socket.cpp | 216 +++++++++++++++++++++++++++
3rdparty/libprocess/src/process.cpp | 203 -------------------------
3 files changed, 217 insertions(+), 203 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/62ad3f27/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 09fce46..f401232 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -42,6 +42,7 @@ libprocess_la_SOURCES = \
src/latch.cpp \
src/metrics/metrics.cpp \
src/pid.cpp \
+ src/poll_socket.cpp \
src/process.cpp \
src/process_reference.hpp \
src/reap.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/62ad3f27/3rdparty/libprocess/src/poll_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.cpp b/3rdparty/libprocess/src/poll_socket.cpp
new file mode 100644
index 0000000..5202750
--- /dev/null
+++ b/3rdparty/libprocess/src/poll_socket.cpp
@@ -0,0 +1,216 @@
+#include <netinet/tcp.h>
+
+#include <process/io.hpp>
+#include <process/socket.hpp>
+
+#include "config.hpp"
+
+using std::string;
+
+namespace process {
+namespace network {
+
+namespace internal {
+
+// Continuation for a non-blocking connect: reads the socket's pending
+// error (SO_ERROR) and reports whether the connection was established.
+// Fails if the option cannot be read or if it holds a non-zero error.
+Future<Nothing> connect(const Socket& socket)
+{
+  const int fd = socket.get();
+
+  int error;
+  socklen_t length = sizeof(error);
+
+  const bool failed =
+    getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &length) < 0 || error != 0;
+
+  if (!failed) {
+    return Nothing();
+  }
+
+  // Either the lookup itself failed or the connect did.
+  VLOG(1) << "Socket error while connecting";
+  return Failure("Socket error while connecting");
+}
+
+} // namespace internal {
+
+
+// Starts connecting this socket to 'node'. Completes right away when
+// the connect succeeds synchronously; when it is still in progress
+// (EINPROGRESS) the result is decided by 'internal::connect' once the
+// socket polls writable. Any other error fails the future.
+Future<Nothing> Socket::Impl::connect(const Node& node)
+{
+  Try<int> result = network::connect(get(), node);
+
+  if (!result.isError()) {
+    return Nothing();
+  }
+
+  if (errno != EINPROGRESS) {
+    return Failure(result.error());
+  }
+
+  // Hand a Socket (sharing ownership of this Impl) to the
+  // continuation that inspects the outcome.
+  return io::poll(get(), io::WRITE)
+    .then(lambda::bind(&internal::connect, Socket(shared_from_this())));
+}
+
+
+// Reads up to 'size' bytes from this socket into 'data' by delegating
+// to 'io::read' on the underlying descriptor.
+Future<size_t> Socket::Impl::recv(char* data, size_t size)
+{
+  const int fd = get();
+  return io::read(fd, data, size);
+}
+
+
+namespace internal {
+
+// Attempts to send up to 'size' bytes of 'data' on socket 's'.
+//
+// Yields the number of bytes sent, or 0 if 'send' reported 0 bytes
+// (logged as the socket having closed). An interrupted call (EINTR)
+// is retried immediately; a would-block condition waits for the
+// socket to poll writable and then tries again. Any other error fails
+// the future. 'size' must be greater than zero.
+Future<size_t> socket_send_data(int s, const char* data, size_t size)
+{
+  CHECK(size > 0);
+
+  for (;;) {
+    ssize_t length = send(s, data, size, MSG_NOSIGNAL);
+
+    if (length > 0) {
+      return length;
+    }
+
+    if (length == 0) {
+      VLOG(1) << "Socket closed while sending";
+      return length;
+    }
+
+    // From here on 'length' is negative: decide based on errno.
+    if (errno == EINTR) {
+      // Interrupted, try again now.
+      continue;
+    }
+
+    if (errno == EAGAIN || errno == EWOULDBLOCK) {
+      // Might block, try again later.
+      return io::poll(s, io::WRITE)
+        .then(lambda::bind(&internal::socket_send_data, s, data, size));
+    }
+
+    // Socket error.
+    const char* error = strerror(errno);
+    VLOG(1) << "Socket error while sending: " << error;
+    return Failure(ErrnoError("Socket send failed"));
+  }
+}
+
+
+// Attempts to send up to 'size' bytes of file 'fd', starting at
+// 'offset', on socket 's' using 'os::sendfile'.
+//
+// Yields the number of bytes sent, or 0 if 'os::sendfile' reported 0
+// bytes (logged as the socket having closed). An interrupted call
+// (EINTR) is retried immediately; a would-block condition waits for
+// the socket to poll writable and then tries again. Any other error
+// fails the future. 'size' must be greater than zero.
+Future<size_t> socket_send_file(int s, int fd, off_t offset, size_t size)
+{
+  CHECK(size > 0);
+
+  for (;;) {
+    ssize_t length = os::sendfile(s, fd, offset, size);
+
+    if (length > 0) {
+      return length;
+    }
+
+    if (length == 0) {
+      VLOG(1) << "Socket closed while sending";
+      return length;
+    }
+
+    // From here on 'length' is negative: decide based on errno.
+    if (errno == EINTR) {
+      // Interrupted, try again now.
+      continue;
+    }
+
+    if (errno == EAGAIN || errno == EWOULDBLOCK) {
+      // Might block, try again later.
+      return io::poll(s, io::WRITE)
+        .then(lambda::bind(&internal::socket_send_file, s, fd, offset, size));
+    }
+
+    // Socket error.
+    const char* error = strerror(errno);
+    VLOG(1) << "Socket error while sending: " << error;
+    return Failure(ErrnoError("Socket sendfile failed"));
+  }
+}
+
+} // namespace internal {
+
+
+// Sends up to 'size' bytes of 'data' on this socket: waits for the
+// descriptor to poll writable, then lets 'internal::socket_send_data'
+// perform the send.
+Future<size_t> Socket::Impl::send(const char* data, size_t size)
+{
+  const int s = get();
+
+  return io::poll(s, io::WRITE)
+    .then(lambda::bind(&internal::socket_send_data, s, data, size));
+}
+
+
+// Sends up to 'size' bytes of file 'fd', starting at 'offset', on this
+// socket: waits for the descriptor to poll writable, then lets
+// 'internal::socket_send_file' perform the transfer.
+Future<size_t> Socket::Impl::sendfile(int fd, off_t offset, size_t size)
+{
+  const int s = get();
+
+  return io::poll(s, io::WRITE)
+    .then(lambda::bind(&internal::socket_send_file, s, fd, offset, size));
+}
+
+
+// Binds this socket to 'node' and returns the result of looking up
+// the socket's own name afterwards, or an error if the bind failed.
+Try<Node> Socket::Impl::bind(const Node& node)
+{
+  Try<int> bound = network::bind(get(), node);
+
+  if (!bound.isError()) {
+    // Lookup and store assigned ip and assigned port.
+    return network::getsockname(get(), AF_INET);
+  }
+
+  return Error(bound.error());
+}
+
+
+// Marks this socket as listening with the given 'backlog'; returns an
+// ErrnoError if '::listen' fails.
+Try<Nothing> Socket::Impl::listen(int backlog)
+{
+  const bool listening = ::listen(get(), backlog) >= 0;
+
+  if (listening) {
+    return Nothing();
+  }
+
+  return ErrnoError();
+}
+
+
+namespace internal {
+
+// Accepts one connection on the listening descriptor 'fd' and
+// prepares it for use: non-blocking, close-on-exec, and with
+// TCP_NODELAY set. If any preparation step fails the accepted
+// descriptor is closed and the future fails with the reason.
+Future<Socket> accept(int fd)
+{
+  Try<int> accepted = network::accept(fd, AF_INET);
+  if (accepted.isError()) {
+    return Failure(accepted.error());
+  }
+
+  const int client = accepted.get();
+
+  Try<Nothing> nonblocking = os::nonblock(client);
+  if (nonblocking.isError()) {
+    LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, nonblock: "
+                                << nonblocking.error();
+    os::close(client);
+    return Failure("Failed to accept, nonblock: " + nonblocking.error());
+  }
+
+  Try<Nothing> closeOnExec = os::cloexec(client);
+  if (closeOnExec.isError()) {
+    LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, cloexec: "
+                                << closeOnExec.error();
+    os::close(client);
+    return Failure("Failed to accept, cloexec: " + closeOnExec.error());
+  }
+
+  // Disable Nagle's algorithm (TCP_NODELAY) so that pipelined
+  // requests are not held back.
+  int enabled = 1;
+  if (setsockopt(
+          client, SOL_TCP, TCP_NODELAY, &enabled, sizeof(enabled)) < 0) {
+    // Capture the reason before closing, which may change errno.
+    const char* reason = strerror(errno);
+    VLOG(1) << "Failed to turn off the Nagle algorithm: " << reason;
+    os::close(client);
+    return Failure(
+        "Failed to turn off the Nagle algorithm: " + stringify(reason));
+  }
+
+  return Socket(client);
+}
+
+} // namespace internal {
+
+
+// Waits for this (listening) socket to poll readable and then accepts
+// a connection via 'internal::accept'.
+Future<Socket> Socket::Impl::accept()
+{
+  const int fd = get();
+
+  return io::poll(fd, io::READ)
+    .then(lambda::bind(&internal::accept, fd));
+}
+
+} // namespace network {
+} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/62ad3f27/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index 441ce48..72c0ad4 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -1220,209 +1220,6 @@ void HttpProxy::stream(const Future<short>& poll, const Request& request)
}
-namespace internal {
-
-Future<Nothing> connect(const Socket& socket)
-{
- // Now check that a successful connection was made.
- int opt;
- socklen_t optlen = sizeof(opt);
- int s = socket.get();
-
- if (getsockopt(s, SOL_SOCKET, SO_ERROR, &opt, &optlen) < 0 || opt != 0) {
- // Connect failure.
- VLOG(1) << "Socket error while connecting";
- return Failure("Socket error while connecting");
- }
-
- return Nothing();
-}
-
-} // namespace internal {
-
-
-Future<Nothing> Socket::Impl::connect(const Node& node)
-{
- Try<int> connect = network::connect(get(), node);
- if (connect.isError()) {
- if (errno == EINPROGRESS) {
- return io::poll(get(), io::WRITE)
- .then(lambda::bind(&internal::connect, Socket(shared_from_this())));
- }
-
- return Failure(connect.error());
- }
-
- return Nothing();
-}
-
-
-Future<size_t> Socket::Impl::recv(char* data, size_t size)
-{
- return io::read(get(), data, size);
-}
-
-
-namespace internal {
-
-Future<size_t> socket_send_data(int s, const char* data, size_t size)
-{
- CHECK(size > 0);
-
- while (true) {
- ssize_t length = send(s, data, size, MSG_NOSIGNAL);
-
- if (length < 0 && (errno == EINTR)) {
- // Interrupted, try again now.
- continue;
- } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
- // Might block, try again later.
- return io::poll(s, io::WRITE)
- .then(lambda::bind(&internal::socket_send_data, s, data, size));
- } else if (length <= 0) {
- // Socket error or closed.
- if (length < 0) {
- const char* error = strerror(errno);
- VLOG(1) << "Socket error while sending: " << error;
- } else {
- VLOG(1) << "Socket closed while sending";
- }
- if (length == 0) {
- return length;
- } else {
- return Failure(ErrnoError("Socket send failed"));
- }
- } else {
- CHECK(length > 0);
-
- return length;
- }
- }
-}
-
-
-Future<size_t> socket_send_file(int s, int fd, off_t offset, size_t size)
-{
- CHECK(size > 0);
-
- while (true) {
- ssize_t length = os::sendfile(s, fd, offset, size);
-
- if (length < 0 && (errno == EINTR)) {
- // Interrupted, try again now.
- continue;
- } else if (length < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
- // Might block, try again later.
- return io::poll(s, io::WRITE)
- .then(lambda::bind(&internal::socket_send_file, s, fd, offset, size));
- } else if (length <= 0) {
- // Socket error or closed.
- if (length < 0) {
- const char* error = strerror(errno);
- VLOG(1) << "Socket error while sending: " << error;
- } else {
- VLOG(1) << "Socket closed while sending";
- }
- if (length == 0) {
- return length;
- } else {
- return Failure(ErrnoError("Socket sendfile failed"));
- }
- } else {
- CHECK(length > 0);
-
- return length;
- }
- }
-}
-
-} // namespace internal {
-
-
-Future<size_t> Socket::Impl::send(const char* data, size_t size)
-{
- return io::poll(get(), io::WRITE)
- .then(lambda::bind(&internal::socket_send_data, get(), data, size));
-}
-
-
-Future<size_t> Socket::Impl::sendfile(int fd, off_t offset, size_t size)
-{
- return io::poll(get(), io::WRITE)
- .then(lambda::bind(&internal::socket_send_file, get(), fd, offset, size));
-}
-
-
-Try<Node> Socket::Impl::bind(const Node& node)
-{
- Try<int> bind = network::bind(get(), node);
- if (bind.isError()) {
- return Error(bind.error());
- }
-
- // Lookup and store assigned ip and assigned port.
- return network::getsockname(get(), AF_INET);
-}
-
-
-Try<Nothing> Socket::Impl::listen(int backlog)
-{
- if (::listen(get(), backlog) < 0) {
- return ErrnoError();
- }
- return Nothing();
-}
-
-
-namespace internal {
-
-Future<Socket> accept(int fd)
-{
- Try<int> accepted = network::accept(fd, AF_INET);
- if (accepted.isError()) {
- return Failure(accepted.error());
- }
-
- int s = accepted.get();
- Try<Nothing> nonblock = os::nonblock(s);
- if (nonblock.isError()) {
- LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, nonblock: "
- << nonblock.error();
- os::close(s);
- return Failure("Failed to accept, nonblock: " + nonblock.error());
- }
-
- Try<Nothing> cloexec = os::cloexec(s);
- if (cloexec.isError()) {
- LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, cloexec: "
- << cloexec.error();
- os::close(s);
- return Failure("Failed to accept, cloexec: " + cloexec.error());
- }
-
- // Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait.
- int on = 1;
- if (setsockopt(s, SOL_TCP, TCP_NODELAY, &on, sizeof(on)) < 0) {
- const char* error = strerror(errno);
- VLOG(1) << "Failed to turn off the Nagle algorithm: " << error;
- os::close(s);
- return Failure(
- "Failed to turn off the Nagle algorithm: " + stringify(error));
- }
-
- return Socket(s);
-}
-
-} // namespace internal {
-
-
-Future<Socket> Socket::Impl::accept()
-{
- return io::poll(get(), io::READ)
- .then(lambda::bind(&internal::accept, get()));
-}
-
-
SocketManager::SocketManager()
{
synchronizer(this) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
[02/12] mesos git commit: Refactor clock to simplify libev cut.
Posted by be...@apache.org.
Refactor clock to simplify libev cut.
Review: https://reviews.apache.org/r/28315
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/dd37344d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/dd37344d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/dd37344d
Branch: refs/heads/master
Commit: dd37344d27b8ebc5f5e01e5efb2cb92929b97704
Parents: c513126
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Dec 20 11:11:08 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Dec 20 17:59:55 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/Makefile.am | 1 +
3rdparty/libprocess/src/clock.cpp | 233 ++++++++++++----------------
3rdparty/libprocess/src/event_loop.hpp | 30 ++++
3rdparty/libprocess/src/libev.cpp | 95 ++++++++++++
3rdparty/libprocess/src/process.cpp | 53 +------
5 files changed, 228 insertions(+), 184 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/dd37344d/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 75870ac..472b5f4 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -34,6 +34,7 @@ libprocess_la_SOURCES = \
src/config.hpp \
src/decoder.hpp \
src/encoder.hpp \
+ src/event_loop.hpp \
src/gate.hpp \
src/help.cpp \
src/http.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/dd37344d/3rdparty/libprocess/src/clock.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/clock.cpp b/3rdparty/libprocess/src/clock.cpp
index 2bc7fa9..fcc1eb0 100644
--- a/3rdparty/libprocess/src/clock.cpp
+++ b/3rdparty/libprocess/src/clock.cpp
@@ -1,5 +1,3 @@
-#include <ev.h>
-
#include <glog/logging.h>
#include <list>
@@ -17,6 +15,7 @@
#include <stout/try.hpp>
#include <stout/unreachable.hpp>
+#include "event_loop.hpp"
#include "synchronized.hpp"
using std::list;
@@ -24,24 +23,11 @@ using std::map;
namespace process {
-// Event loop.
-extern struct ev_loop* loop;
-
-// Asynchronous watcher for interrupting loop to specifically deal
-// with updating timers.
-static ev_async async_update_timer_watcher;
-
-// Watcher for timeouts.
-static ev_timer timeouts_watcher;
-
// We store the timers in a map of lists indexed by the timeout of the
// timer so that we can have two timers that have the same timeout. We
// exploit that the map is SORTED!
-static map<Time, list<Timer>>* timeouts = new map<Time, list<Timer>>();
-static synchronizable(timeouts) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
-
-// Flag to indicate whether or to update the timer on async interrupt.
-static bool update_timer = false;
+static map<Time, list<Timer>>* timers = new map<Time, list<Timer>>();
+static synchronizable(timers) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
// We namespace the clock related variables to keep them well
@@ -69,51 +55,46 @@ bool settling = false;
// Lambda function to invoke when timers have expired.
lambda::function<void(const list<Timer>&)> callback;
-} // namespace clock {
-
-void handle_async_update_timer(struct ev_loop* loop, ev_async* _, int revents)
+// Helper for determining the duration until the next timer elapses,
+// or None if no timers are pending. Note that we don't manipulate
+// 'timer's directly so that it's clear from the callsite that the use
+// of 'timers' is within a 'synchronized' block.
+//
+// TODO(benh): Create a generic 'Timer's abstraction which hides this
+// and more away (i.e., all manipulations of 'timers' below).
+Option<Duration> next(const map<Time, list<Timer>>& timers)
{
- synchronized (timeouts) {
- if (update_timer) {
- if (!timeouts->empty()) {
- // Determine when the next timer should fire.
- timeouts_watcher.repeat =
- (timeouts->begin()->first - Clock::now()).secs();
-
- if (timeouts_watcher.repeat <= 0) {
- // Feed the event now!
- timeouts_watcher.repeat = 0;
- ev_timer_again(loop, &timeouts_watcher);
- ev_feed_event(loop, &timeouts_watcher, EV_TIMEOUT);
- } else {
- // Don't fire the timer if the clock is paused since we
- // don't want time to advance (instead a call to
- // clock::advance() will handle the timer).
- if (Clock::paused() && timeouts_watcher.repeat > 0) {
- timeouts_watcher.repeat = 0;
- }
-
- ev_timer_again(loop, &timeouts_watcher);
- }
- }
-
- update_timer = false;
+ if (!timers.empty()) {
+ // Determine when the next "tick" should occur.
+ Duration duration = (timers.begin()->first - Clock::now());
+
+ // Force a duration of 0 seconds (i.e., fire timers now) if the
+ // clock is paused and the duration is greater than 0 since we
+ // want to handle timers right away.
+ if (Clock::paused() && duration > Seconds(0)) {
+ return Seconds(0);
}
+
+ return duration;
}
+
+ return None();
}
+} // namespace clock {
+
-void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
+void tick()
{
- list<Timer> timers;
+ list<Timer> timedout;
- synchronized (timeouts) {
+ synchronized (timers) {
Time now = Clock::now();
- VLOG(3) << "Handling timeouts up to " << now;
+ VLOG(3) << "Handling timers up to " << now;
- foreachkey (const Time& timeout, *timeouts) {
+ foreachkey (const Time& timeout, *timers) {
if (timeout > now) {
break;
}
@@ -127,52 +108,33 @@ void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
clock::settling = true;
}
- foreach (const Timer& timer, (*timeouts)[timeout]) {
- timers.push_back(timer);
+ foreach (const Timer& timer, (*timers)[timeout]) {
+ timedout.push_back(timer);
}
}
- // Now erase the range of timeouts that timed out.
- timeouts->erase(timeouts->begin(), timeouts->upper_bound(now));
+ // Now erase the range of timers that timed out.
+ timers->erase(timers->begin(), timers->upper_bound(now));
// Okay, so the timeout for the next timer should not have fired.
- CHECK(timeouts->empty() || (timeouts->begin()->first > now));
-
- // Update the timer as necessary.
- if (!timeouts->empty()) {
- // Determine when the next timer should fire.
- timeouts_watcher.repeat =
- (timeouts->begin()->first - Clock::now()).secs();
-
- if (timeouts_watcher.repeat <= 0) {
- // Feed the event now!
- timeouts_watcher.repeat = 0;
- ev_timer_again(loop, &timeouts_watcher);
- ev_feed_event(loop, &timeouts_watcher, EV_TIMEOUT);
- } else {
- // Don't fire the timer if the clock is paused since we don't
- // want time to advance (instead a call to Clock::advance()
- // will handle the timer).
- if (Clock::paused() && timeouts_watcher.repeat > 0) {
- timeouts_watcher.repeat = 0;
- }
+ CHECK(timers->empty() || (timers->begin()->first > now));
- ev_timer_again(loop, &timeouts_watcher);
- }
+ // Schedule another "tick" if necessary.
+ Option<Duration> duration = clock::next(*timers);
+ if (duration.isSome()) {
+ EventLoop::delay(duration.get(), &tick);
}
-
- update_timer = false; // Since we might have a queued update_timer.
}
- clock::callback(timers);
+ clock::callback(timedout);
- // Mark 'settling' as false since there are not any more timeouts
+ // Mark 'settling' as false since there are not any more timers
// that will expire before the paused time and we've finished
// executing expired timers.
- synchronized (timeouts) {
+ synchronized (timers) {
if (clock::paused &&
- (timeouts->size() == 0 ||
- timeouts->begin()->first > clock::current)) {
+ (timers->size() == 0 ||
+ timers->begin()->first > clock::current)) {
VLOG(3) << "Clock has settled";
clock::settling = false;
}
@@ -182,20 +144,7 @@ void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
void Clock::initialize(lambda::function<void(const list<Timer>&)>&& callback)
{
- // TODO(benh): Currently this function is expected to get called
- // just after initializing libev in process::initialize. But that is
- // too tightly coupled so and we really need to move libev specific
- // intialization outside of process::initialize that both
- // process::initialize and Clock::initialize can depend on (and thus
- // call).
-
clock::callback = callback;
-
- ev_async_init(&async_update_timer_watcher, handle_async_update_timer);
- ev_async_start(loop, &async_update_timer_watcher);
-
- ev_timer_init(&timeouts_watcher, handle_timeouts, 0., 2100000.0);
- ev_timer_again(loop, &timeouts_watcher);
}
@@ -207,7 +156,7 @@ Time Clock::now()
Time Clock::now(ProcessBase* process)
{
- synchronized (timeouts) {
+ synchronized (timers) {
if (Clock::paused()) {
if (process != NULL) {
if (clock::currents->count(process) != 0) {
@@ -221,8 +170,7 @@ Time Clock::now(ProcessBase* process)
}
}
- // TODO(benh): Versus ev_now()?
- double d = ev_time();
+ double d = EventLoop::time();
Try<Time> time = Time::create(d); // Compensates for clock::advanced.
// TODO(xujyan): Move CHECK_SOME to libprocess and add CHECK_SOME
@@ -252,17 +200,22 @@ Timer Clock::timer(
<< " in the future (" << timeout.time() << ")";
// Add the timer.
- synchronized (timeouts) {
- if (timeouts->size() == 0 ||
- timer.timeout().time() < timeouts->begin()->first) {
+ synchronized (timers) {
+ if (timers->size() == 0 ||
+ timer.timeout().time() < timers->begin()->first) {
// Need to interrupt the loop to update/set timer repeat.
- (*timeouts)[timer.timeout().time()].push_back(timer);
- update_timer = true;
- ev_async_send(loop, &async_update_timer_watcher);
+
+ (*timers)[timer.timeout().time()].push_back(timer);
+
+ // Schedule another "tick" if necessary.
+ Option<Duration> duration = clock::next(*timers);
+ if (duration.isSome()) {
+ EventLoop::delay(duration.get(), &tick);
+ }
} else {
// Timer repeat is adequate, just add the timeout.
- CHECK(timeouts->size() >= 1);
- (*timeouts)[timer.timeout().time()].push_back(timer);
+ CHECK(timers->size() >= 1);
+ (*timers)[timer.timeout().time()].push_back(timer);
}
}
@@ -273,16 +226,16 @@ Timer Clock::timer(
bool Clock::cancel(const Timer& timer)
{
bool canceled = false;
- synchronized (timeouts) {
+ synchronized (timers) {
// Check if the timeout is still pending, and if so, erase it. In
// addition, erase an empty list if we just removed the last
// timeout.
Time time = timer.timeout().time();
- if (timeouts->count(time) > 0) {
+ if (timers->count(time) > 0) {
canceled = true;
- (*timeouts)[time].remove(timer);
- if ((*timeouts)[time].empty()) {
- timeouts->erase(time);
+ (*timers)[time].remove(timer);
+ if ((*timers)[time].empty()) {
+ timers->erase(time);
}
}
}
@@ -293,9 +246,9 @@ bool Clock::cancel(const Timer& timer)
void Clock::pause()
{
- process::initialize(); // To make sure the libev watchers are ready.
+ process::initialize(); // To make sure the event loop is ready.
- synchronized (timeouts) {
+ synchronized (timers) {
if (!clock::paused) {
clock::initial = clock::current = now();
clock::paused = true;
@@ -303,8 +256,8 @@ void Clock::pause()
}
}
- // Note that after pausing the clock an existing libev timer might
- // still fire (invoking handle_timeout), but since paused == true no
+ // Note that after pausing the clock an existing event loop delay
+ // might still fire (invoking tick), but since paused == true no
// "time" will actually have passed, so no timer will actually fire.
}
@@ -317,16 +270,21 @@ bool Clock::paused()
void Clock::resume()
{
- process::initialize(); // To make sure the libev watchers are ready.
+ process::initialize(); // To make sure the event loop is ready.
- synchronized (timeouts) {
+ synchronized (timers) {
if (clock::paused) {
VLOG(2) << "Clock resumed at " << clock::current;
+
clock::paused = false;
clock::settling = false;
clock::currents->clear();
- update_timer = true;
- ev_async_send(loop, &async_update_timer_watcher);
+
+ // Schedule another "tick" if necessary.
+ Option<Duration> duration = clock::next(*timers);
+ if (duration.isSome()) {
+ EventLoop::delay(duration.get(), &tick);
+ }
}
}
}
@@ -334,14 +292,17 @@ void Clock::resume()
void Clock::advance(const Duration& duration)
{
- synchronized (timeouts) {
+ synchronized (timers) {
if (clock::paused) {
clock::advanced += duration;
clock::current += duration;
+
VLOG(2) << "Clock advanced (" << duration << ") to " << clock::current;
- if (!update_timer) {
- update_timer = true;
- ev_async_send(loop, &async_update_timer_watcher);
+
+ // Schedule another "tick" if necessary.
+ Option<Duration> duration = clock::next(*timers);
+ if (duration.isSome()) {
+ EventLoop::delay(duration.get(), &tick);
}
}
}
@@ -350,7 +311,7 @@ void Clock::advance(const Duration& duration)
void Clock::advance(ProcessBase* process, const Duration& duration)
{
- synchronized (timeouts) {
+ synchronized (timers) {
if (clock::paused) {
Time current = now(process);
current += duration;
@@ -364,15 +325,17 @@ void Clock::advance(ProcessBase* process, const Duration& duration)
void Clock::update(const Time& time)
{
- synchronized (timeouts) {
+ synchronized (timers) {
if (clock::paused) {
if (clock::current < time) {
clock::advanced += (time - clock::current);
clock::current = Time(time);
VLOG(2) << "Clock updated to " << clock::current;
- if (!update_timer) {
- update_timer = true;
- ev_async_send(loop, &async_update_timer_watcher);
+
+ // Schedule another "tick" if necessary.
+ Option<Duration> duration = clock::next(*timers);
+ if (duration.isSome()) {
+ EventLoop::delay(duration.get(), &tick);
}
}
}
@@ -382,7 +345,7 @@ void Clock::update(const Time& time)
void Clock::update(ProcessBase* process, const Time& time, Update update)
{
- synchronized (timeouts) {
+ synchronized (timers) {
if (clock::paused) {
if (now(process) < time || update == Clock::FORCE) {
VLOG(2) << "Clock of " << process->self() << " updated to " << time;
@@ -402,16 +365,14 @@ void Clock::order(ProcessBase* from, ProcessBase* to)
bool Clock::settled()
{
- synchronized (timeouts) {
+ synchronized (timers) {
CHECK(clock::paused);
- if (update_timer) {
- return false;
- } else if (clock::settling) {
+ if (clock::settling) {
VLOG(3) << "Clock still not settled";
return false;
- } else if (timeouts->size() == 0 ||
- timeouts->begin()->first > clock::current) {
+ } else if (timers->size() == 0 ||
+ timers->begin()->first > clock::current) {
VLOG(3) << "Clock is settled";
return true;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/dd37344d/3rdparty/libprocess/src/event_loop.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/event_loop.hpp b/3rdparty/libprocess/src/event_loop.hpp
new file mode 100644
index 0000000..34e9f1d
--- /dev/null
+++ b/3rdparty/libprocess/src/event_loop.hpp
@@ -0,0 +1,30 @@
+#ifndef __EVENT_LOOP_HPP__
+#define __EVENT_LOOP_HPP__
+
+#include <stout/duration.hpp>
+
+namespace process {
+
+// The interface that must be implemented by an event management
+// system. This is a class to cleanly isolate the interface and so
+// that in the future we can support multiple implementations.
+class EventLoop
+{
+public:
+ // Initializes the event loop.
+ static void initialize();
+
+ // Invoke the specified function in the event loop after the
+ // specified duration.
+ static void delay(const Duration& duration, void(*function)(void));
+
+ // Returns the current time w.r.t. the event loop.
+ static double time();
+
+ // Runs the event loop.
+ static void* run(void*);
+};
+
+} // namespace process {
+
+#endif // __EVENT_LOOP_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/dd37344d/3rdparty/libprocess/src/libev.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libev.cpp b/3rdparty/libprocess/src/libev.cpp
index 8a557ce..0e8d44c 100644
--- a/3rdparty/libprocess/src/libev.cpp
+++ b/3rdparty/libprocess/src/libev.cpp
@@ -2,8 +2,11 @@
#include <queue>
+#include <stout/duration.hpp>
#include <stout/lambda.hpp>
+#include <stout/nothing.hpp>
+#include "event_loop.hpp"
#include "libev.hpp"
namespace process {
@@ -23,4 +26,96 @@ std::queue<lambda::function<void(void)>>* functions =
ThreadLocal<bool>* _in_event_loop_ = new ThreadLocal<bool>();
+
+void handle_async(struct ev_loop* loop, ev_async* _, int revents)
+{
+ synchronized (watchers) {
+ // Start all the new I/O watchers.
+ while (!watchers->empty()) {
+ ev_io* watcher = watchers->front();
+ watchers->pop();
+ ev_io_start(loop, watcher);
+ }
+
+ while (!functions->empty()) {
+ (functions->front())();
+ functions->pop();
+ }
+ }
+}
+
+
+void EventLoop::initialize()
+{
+ synchronizer(watchers) = SYNCHRONIZED_INITIALIZER;
+
+ loop = ev_default_loop(EVFLAG_AUTO);
+
+ ev_async_init(&async_watcher, handle_async);
+ ev_async_start(loop, &async_watcher);
+}
+
+
+namespace internal {
+
+void handle_delay(struct ev_loop* loop, ev_timer* timer, int revents)
+{
+ void(*function)(void) = reinterpret_cast<void(*)(void)>(timer->data);
+ function();
+ ev_timer_stop(loop, timer);
+ delete timer;
+}
+
+
+Future<Nothing> delay(const Duration& duration, void(*function)(void))
+{
+ ev_timer* timer = new ev_timer();
+ timer->data = reinterpret_cast<void*>(function);
+
+ // Determine the 'after' parameter to pass to libev and set it to 0
+ // in the event that it's negative so that we always make sure to
+ // invoke 'function' even if libev doesn't support negative 'after'
+ // values.
+ double after = duration.secs();
+
+ if (after < 0) {
+ after = 0;
+ }
+
+ const double repeat = 0.0;
+
+ ev_timer_init(timer, handle_delay, after, repeat);
+ ev_timer_start(loop, timer);
+
+ return Nothing();
+}
+
+} // namespace internal {
+
+
+void EventLoop::delay(const Duration& duration, void(*function)(void))
+{
+ run_in_event_loop<Nothing>(
+ lambda::bind(&internal::delay, duration, function));
+}
+
+
+double EventLoop::time()
+{
+ // TODO(benh): Versus ev_now()?
+ return ev_time();
+}
+
+
+void* EventLoop::run(void*)
+{
+ __in_event_loop__ = true;
+
+ ev_loop(loop, 0);
+
+ __in_event_loop__ = false;
+
+ return NULL;
+}
+
} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/dd37344d/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index fdba949..d3dac4c 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -1,5 +1,4 @@
#include <errno.h>
-#include <ev.h>
#include <limits.h>
#include <libgen.h>
#include <netdb.h>
@@ -80,6 +79,7 @@
#include "config.hpp"
#include "decoder.hpp"
#include "encoder.hpp"
+#include "event_loop.hpp"
#include "gate.hpp"
#include "libev.hpp"
#include "process_reference.hpp"
@@ -585,24 +585,6 @@ static Message* parse(Request* request)
}
-void handle_async(struct ev_loop* loop, ev_async* _, int revents)
-{
- synchronized (watchers) {
- // Start all the new I/O watchers.
- while (!watchers->empty()) {
- ev_io* watcher = watchers->front();
- watchers->pop();
- ev_io_start(loop, watcher);
- }
-
- while (!functions->empty()) {
- (functions->front())();
- functions->pop();
- }
- }
-}
-
-
namespace internal {
void decode_recv(
@@ -654,18 +636,6 @@ void decode_recv(
} // namespace internal {
-void* serve(void* arg)
-{
- __in_event_loop__ = true;
-
- ev_loop(((struct ev_loop*) arg), 0);
-
- __in_event_loop__ = false;
-
- return NULL;
-}
-
-
void* schedule(void* arg)
{
do {
@@ -896,21 +866,8 @@ void initialize(const string& delegate)
PLOG(FATAL) << "Failed to initialize: " << listen.error();
}
- // Initialize libev.
- //
- // TODO(benh): Eventually move this all out of process.cpp after
- // more is disentangled.
- synchronizer(watchers) = SYNCHRONIZED_INITIALIZER;
-
-#ifdef __sun__
- loop = ev_default_loop(EVBACKEND_POLL | EVBACKEND_SELECT);
-#else
- loop = ev_default_loop(EVFLAG_AUTO);
-#endif // __sun__
-
- ev_async_init(&async_watcher, handle_async);
- ev_async_start(loop, &async_watcher);
-
+ // Initialize the event loop.
+ EventLoop::initialize();
Clock::initialize(lambda::bind(&timedout, lambda::_1));
// ev_child_init(&child_watcher, child_exited, pid, 0);
@@ -929,7 +886,7 @@ void initialize(const string& delegate)
// sigprocmask (SIG_UNBLOCK, &sa.sa_mask, 0);
pthread_t thread; // For now, not saving handles on our threads.
- if (pthread_create(&thread, NULL, serve, loop) != 0) {
+ if (pthread_create(&thread, NULL, &EventLoop::run, NULL) != 0) {
LOG(FATAL) << "Failed to initialize, pthread_create";
}
@@ -2020,7 +1977,7 @@ void SocketManager::close(int s)
// 'sockets' any attempt to send with it will just get ignored.
// TODO(benh): Always do a 'shutdown(s, SHUT_RDWR)' since that
// should keep the file descriptor valid until the last Socket
- // reference does a close but force all libev watchers to stop?
+ // reference does a close but force all event loop watchers to stop?
}
[11/12] mesos git commit: Virtualize Socket::Impl interface.
Posted by be...@apache.org.
Virtualize Socket::Impl interface.
Review: https://reviews.apache.org/r/28671
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2d46e853
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2d46e853
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2d46e853
Branch: refs/heads/master
Commit: 2d46e853321469bcd7e2d3727254205c19809b7f
Parents: b3bb74c
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Thu Dec 25 10:55:06 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Thu Dec 25 11:54:13 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/Makefile.am | 1 +
3rdparty/libprocess/include/process/socket.hpp | 62 ++++----
3rdparty/libprocess/src/poll_socket.cpp | 153 ++++++++++----------
3rdparty/libprocess/src/poll_socket.hpp | 29 ++++
3rdparty/libprocess/src/socket.cpp | 37 +++--
5 files changed, 163 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/2d46e853/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 817355e..6ab9cb8 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -43,6 +43,7 @@ libprocess_la_SOURCES = \
src/metrics/metrics.cpp \
src/pid.cpp \
src/poll_socket.cpp \
+ src/poll_socket.hpp \
src/process.cpp \
src/process_reference.hpp \
src/reap.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/2d46e853/3rdparty/libprocess/include/process/socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/socket.hpp b/3rdparty/libprocess/include/process/socket.hpp
index 436761b..ddb9e36 100644
--- a/3rdparty/libprocess/include/process/socket.hpp
+++ b/3rdparty/libprocess/include/process/socket.hpp
@@ -111,8 +111,8 @@ class Socket
public:
// Available kinds of implementations.
enum Kind {
- POLL
- // TODO(jmlvanre): Add libevent socket.
+ POLL,
+ // TODO(jmlvanre): Add libevent SSL socket.
};
// Returns an instance of a Socket using the specified kind of
@@ -131,9 +131,7 @@ public:
class Impl : public std::enable_shared_from_this<Impl>
{
public:
- explicit Impl(int _s) : s(_s) { CHECK(s >= 0); }
-
- ~Impl()
+ virtual ~Impl()
{
CHECK(s >= 0);
Try<Nothing> close = os::close(s);
@@ -148,21 +146,21 @@ public:
return s;
}
- Future<Nothing> connect(const Node& node);
-
- Future<size_t> recv(char* data, size_t size);
-
- Future<size_t> send(const char* data, size_t size);
-
- Future<size_t> sendfile(int fd, off_t offset, size_t size);
+ // Socket::Impl interface.
+ virtual Try<Node> bind(const Node& node);
+ virtual Try<Nothing> listen(int backlog) = 0;
+ virtual Future<Socket> accept() = 0;
+ virtual Future<Nothing> connect(const Node& node) = 0;
+ virtual Future<size_t> recv(char* data, size_t size) = 0;
+ virtual Future<size_t> send(const char* data, size_t size) = 0;
+ virtual Future<size_t> sendfile(int fd, off_t offset, size_t size) = 0;
- Try<Node> bind(const Node& node);
-
- Try<Nothing> listen(int backlog);
+ protected:
+ explicit Impl(int _s) : s(_s) { CHECK(s >= 0); }
- Future<Socket> accept();
+ // Construct a Socket wrapper from this implementation.
+ Socket socket() { return Socket(shared_from_this()); }
- private:
int s;
};
@@ -181,44 +179,46 @@ public:
return impl->get();
}
- Future<Nothing> connect(const Node& node)
+ Try<Node> bind(const Node& node)
{
- return impl->connect(node);
+ return impl->bind(node);
}
- Future<size_t> recv(char* data, size_t size) const
+ Try<Nothing> listen(int backlog)
{
- return impl->recv(data, size);
+ return impl->listen(backlog);
}
- Future<size_t> send(const char* data, size_t size) const
+ Future<Socket> accept()
{
- return impl->send(data, size);
+ return impl->accept();
}
- Future<size_t> sendfile(int fd, off_t offset, size_t size) const
+ Future<Nothing> connect(const Node& node)
{
- return impl->sendfile(fd, offset, size);
+ return impl->connect(node);
}
- Try<Node> bind(const Node& node)
+ Future<size_t> recv(char* data, size_t size) const
{
- return impl->bind(node);
+ return impl->recv(data, size);
}
- Try<Nothing> listen(int backlog)
+ Future<size_t> send(const char* data, size_t size) const
{
- return impl->listen(backlog);
+ return impl->send(data, size);
}
- Future<Socket> accept()
+ Future<size_t> sendfile(int fd, off_t offset, size_t size) const
{
- return impl->accept();
+ return impl->sendfile(fd, offset, size);
}
private:
explicit Socket(std::shared_ptr<Impl>&& that) : impl(std::move(that)) {}
+ explicit Socket(const std::shared_ptr<Impl>& that) : impl(that) {}
+
std::shared_ptr<Impl> impl;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/2d46e853/3rdparty/libprocess/src/poll_socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.cpp b/3rdparty/libprocess/src/poll_socket.cpp
index 09cd5a2..2e70c6c 100644
--- a/3rdparty/libprocess/src/poll_socket.cpp
+++ b/3rdparty/libprocess/src/poll_socket.cpp
@@ -4,12 +4,81 @@
#include <process/socket.hpp>
#include "config.hpp"
+#include "poll_socket.hpp"
using std::string;
namespace process {
namespace network {
+Try<std::shared_ptr<Socket::Impl>> PollSocketImpl::create(int s)
+{
+ return std::make_shared<PollSocketImpl>(s);
+}
+
+
+Try<Nothing> PollSocketImpl::listen(int backlog)
+{
+ if (::listen(get(), backlog) < 0) {
+ return ErrnoError();
+ }
+ return Nothing();
+}
+
+
+namespace internal {
+
+Future<Socket> accept(int fd)
+{
+ Try<int> accepted = network::accept(fd, AF_INET);
+ if (accepted.isError()) {
+ return Failure(accepted.error());
+ }
+
+ int s = accepted.get();
+ Try<Nothing> nonblock = os::nonblock(s);
+ if (nonblock.isError()) {
+ LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, nonblock: "
+ << nonblock.error();
+ os::close(s);
+ return Failure("Failed to accept, nonblock: " + nonblock.error());
+ }
+
+ Try<Nothing> cloexec = os::cloexec(s);
+ if (cloexec.isError()) {
+ LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, cloexec: "
+ << cloexec.error();
+ os::close(s);
+ return Failure("Failed to accept, cloexec: " + cloexec.error());
+ }
+
+ // Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait.
+ int on = 1;
+ if (setsockopt(s, SOL_TCP, TCP_NODELAY, &on, sizeof(on)) < 0) {
+ const char* error = strerror(errno);
+ VLOG(1) << "Failed to turn off the Nagle algorithm: " << error;
+ os::close(s);
+ return Failure(
+ "Failed to turn off the Nagle algorithm: " + stringify(error));
+ }
+
+ Try<Socket> socket = Socket::create(Socket::DEFAULT_KIND(), s);
+ if (socket.isError()) {
+ return Failure("Failed to accept, create socket: " + socket.error());
+ }
+ return socket.get();
+}
+
+} // namespace internal {
+
+
+Future<Socket> PollSocketImpl::accept()
+{
+ return io::poll(get(), io::READ)
+ .then(lambda::bind(&internal::accept, get()));
+}
+
+
namespace internal {
Future<Nothing> connect(const Socket& socket)
@@ -31,13 +100,13 @@ Future<Nothing> connect(const Socket& socket)
} // namespace internal {
-Future<Nothing> Socket::Impl::connect(const Node& node)
+Future<Nothing> PollSocketImpl::connect(const Node& node)
{
Try<int> connect = network::connect(get(), node);
if (connect.isError()) {
if (errno == EINPROGRESS) {
return io::poll(get(), io::WRITE)
- .then(lambda::bind(&internal::connect, Socket(shared_from_this())));
+ .then(lambda::bind(&internal::connect, socket()));
}
return Failure(connect.error());
@@ -47,7 +116,7 @@ Future<Nothing> Socket::Impl::connect(const Node& node)
}
-Future<size_t> Socket::Impl::recv(char* data, size_t size)
+Future<size_t> PollSocketImpl::recv(char* data, size_t size)
{
return io::read(get(), data, size);
}
@@ -129,92 +198,18 @@ Future<size_t> socket_send_file(int s, int fd, off_t offset, size_t size)
} // namespace internal {
-Future<size_t> Socket::Impl::send(const char* data, size_t size)
+Future<size_t> PollSocketImpl::send(const char* data, size_t size)
{
return io::poll(get(), io::WRITE)
.then(lambda::bind(&internal::socket_send_data, get(), data, size));
}
-Future<size_t> Socket::Impl::sendfile(int fd, off_t offset, size_t size)
+Future<size_t> PollSocketImpl::sendfile(int fd, off_t offset, size_t size)
{
return io::poll(get(), io::WRITE)
.then(lambda::bind(&internal::socket_send_file, get(), fd, offset, size));
}
-
-Try<Node> Socket::Impl::bind(const Node& node)
-{
- Try<int> bind = network::bind(get(), node);
- if (bind.isError()) {
- return Error(bind.error());
- }
-
- // Lookup and store assigned ip and assigned port.
- return network::getsockname(get(), AF_INET);
-}
-
-
-Try<Nothing> Socket::Impl::listen(int backlog)
-{
- if (::listen(get(), backlog) < 0) {
- return ErrnoError();
- }
- return Nothing();
-}
-
-
-namespace internal {
-
-Future<Socket> accept(int fd)
-{
- Try<int> accepted = network::accept(fd, AF_INET);
- if (accepted.isError()) {
- return Failure(accepted.error());
- }
-
- int s = accepted.get();
- Try<Nothing> nonblock = os::nonblock(s);
- if (nonblock.isError()) {
- LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, nonblock: "
- << nonblock.error();
- os::close(s);
- return Failure("Failed to accept, nonblock: " + nonblock.error());
- }
-
- Try<Nothing> cloexec = os::cloexec(s);
- if (cloexec.isError()) {
- LOG_IF(INFO, VLOG_IS_ON(1)) << "Failed to accept, cloexec: "
- << cloexec.error();
- os::close(s);
- return Failure("Failed to accept, cloexec: " + cloexec.error());
- }
-
- // Turn off Nagle (TCP_NODELAY) so pipelined requests don't wait.
- int on = 1;
- if (setsockopt(s, SOL_TCP, TCP_NODELAY, &on, sizeof(on)) < 0) {
- const char* error = strerror(errno);
- VLOG(1) << "Failed to turn off the Nagle algorithm: " << error;
- os::close(s);
- return Failure(
- "Failed to turn off the Nagle algorithm: " + stringify(error));
- }
-
- Try<Socket> socket = Socket::create(Socket::DEFAULT_KIND(), s);
- if (socket.isError()) {
- return Failure("Failed to accept, create socket: " + socket.error());
- }
- return socket.get();
-}
-
-} // namespace internal {
-
-
-Future<Socket> Socket::Impl::accept()
-{
- return io::poll(get(), io::READ)
- .then(lambda::bind(&internal::accept, get()));
-}
-
} // namespace network {
} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/2d46e853/3rdparty/libprocess/src/poll_socket.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/poll_socket.hpp b/3rdparty/libprocess/src/poll_socket.hpp
new file mode 100644
index 0000000..f7ca08e
--- /dev/null
+++ b/3rdparty/libprocess/src/poll_socket.hpp
@@ -0,0 +1,29 @@
+#include <memory>
+
+#include <process/socket.hpp>
+
+#include <stout/try.hpp>
+
+namespace process {
+namespace network {
+
+class PollSocketImpl : public Socket::Impl
+{
+public:
+ static Try<std::shared_ptr<Socket::Impl>> create(int s);
+
+ PollSocketImpl(int s) : Socket::Impl(s) {}
+
+ virtual ~PollSocketImpl() {}
+
+ // Implementation of the Socket::Impl interface.
+ virtual Try<Nothing> listen(int backlog);
+ virtual Future<Socket> accept();
+ virtual Future<Nothing> connect(const Node& node);
+ virtual Future<size_t> recv(char* data, size_t size);
+ virtual Future<size_t> send(const char* data, size_t size);
+ virtual Future<size_t> sendfile(int fd, off_t offset, size_t size);
+};
+
+} // namespace network {
+} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/2d46e853/3rdparty/libprocess/src/socket.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/socket.cpp b/3rdparty/libprocess/src/socket.cpp
index 842a6ec..4b0f6be 100644
--- a/3rdparty/libprocess/src/socket.cpp
+++ b/3rdparty/libprocess/src/socket.cpp
@@ -1,16 +1,10 @@
#include <process/socket.hpp>
+#include "poll_socket.hpp"
+
namespace process {
namespace network {
-const Socket::Kind& Socket::DEFAULT_KIND()
-{
- // TODO(jmlvanre): Change the default based on configure or
- // environment flags.
- static const Kind DEFAULT = POLL;
- return DEFAULT;
-}
-
Try<Socket> Socket::create(Kind kind, int s)
{
if (s < 0) {
@@ -44,7 +38,11 @@ Try<Socket> Socket::create(Kind kind, int s)
switch (kind) {
case POLL: {
- return Socket(std::make_shared<Socket::Impl>(s));
+ Try<std::shared_ptr<Socket::Impl>> socket = PollSocketImpl::create(s);
+ if (socket.isError()) {
+ return Error(socket.error());
+ }
+ return Socket(socket.get());
}
// By not setting a default we leverage the compiler errors when
// the enumeration is augmented to find all the cases we need to
@@ -52,5 +50,26 @@ Try<Socket> Socket::create(Kind kind, int s)
}
}
+
+const Socket::Kind& Socket::DEFAULT_KIND()
+{
+ // TODO(jmlvanre): Change the default based on configure or
+ // environment flags.
+ static const Kind DEFAULT = POLL;
+ return DEFAULT;
+}
+
+
+Try<Node> Socket::Impl::bind(const Node& node)
+{
+ Try<int> bind = network::bind(get(), node);
+ if (bind.isError()) {
+ return Error(bind.error());
+ }
+
+ // Lookup and store assigned IP and assigned port.
+ return network::getsockname(get(), AF_INET);
+}
+
} // namespace network {
} // namespace process {