You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/12/25 22:05:53 UTC
[02/12] mesos git commit: Refactor clock to simplify libev cut.
Refactor clock to simplify libev cut.
Review: https://reviews.apache.org/r/28315
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/dd37344d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/dd37344d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/dd37344d
Branch: refs/heads/master
Commit: dd37344d27b8ebc5f5e01e5efb2cb92929b97704
Parents: c513126
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Dec 20 11:11:08 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Dec 20 17:59:55 2014 -0800
----------------------------------------------------------------------
3rdparty/libprocess/Makefile.am | 1 +
3rdparty/libprocess/src/clock.cpp | 233 ++++++++++++----------------
3rdparty/libprocess/src/event_loop.hpp | 30 ++++
3rdparty/libprocess/src/libev.cpp | 95 ++++++++++++
3rdparty/libprocess/src/process.cpp | 53 +------
5 files changed, 228 insertions(+), 184 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/dd37344d/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 75870ac..472b5f4 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -34,6 +34,7 @@ libprocess_la_SOURCES = \
src/config.hpp \
src/decoder.hpp \
src/encoder.hpp \
+ src/event_loop.hpp \
src/gate.hpp \
src/help.cpp \
src/http.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/dd37344d/3rdparty/libprocess/src/clock.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/clock.cpp b/3rdparty/libprocess/src/clock.cpp
index 2bc7fa9..fcc1eb0 100644
--- a/3rdparty/libprocess/src/clock.cpp
+++ b/3rdparty/libprocess/src/clock.cpp
@@ -1,5 +1,3 @@
-#include <ev.h>
-
#include <glog/logging.h>
#include <list>
@@ -17,6 +15,7 @@
#include <stout/try.hpp>
#include <stout/unreachable.hpp>
+#include "event_loop.hpp"
#include "synchronized.hpp"
using std::list;
@@ -24,24 +23,11 @@ using std::map;
namespace process {
-// Event loop.
-extern struct ev_loop* loop;
-
-// Asynchronous watcher for interrupting loop to specifically deal
-// with updating timers.
-static ev_async async_update_timer_watcher;
-
-// Watcher for timeouts.
-static ev_timer timeouts_watcher;
-
// We store the timers in a map of lists indexed by the timeout of the
// timer so that we can have two timers that have the same timeout. We
// exploit that the map is SORTED!
-static map<Time, list<Timer>>* timeouts = new map<Time, list<Timer>>();
-static synchronizable(timeouts) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
-
-// Flag to indicate whether or to update the timer on async interrupt.
-static bool update_timer = false;
+static map<Time, list<Timer>>* timers = new map<Time, list<Timer>>();
+static synchronizable(timers) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
// We namespace the clock related variables to keep them well
@@ -69,51 +55,46 @@ bool settling = false;
// Lambda function to invoke when timers have expired.
lambda::function<void(const list<Timer>&)> callback;
-} // namespace clock {
-
-void handle_async_update_timer(struct ev_loop* loop, ev_async* _, int revents)
+// Helper for determining the duration until the next timer elapses,
+// or None if no timers are pending. Note that we don't manipulate
+// 'timer's directly so that it's clear from the callsite that the use
+// of 'timers' is within a 'synchronized' block.
+//
+// TODO(benh): Create a generic 'Timer's abstraction which hides this
+// and more away (i.e., all manipulations of 'timers' below).
+Option<Duration> next(const map<Time, list<Timer>>& timers)
{
- synchronized (timeouts) {
- if (update_timer) {
- if (!timeouts->empty()) {
- // Determine when the next timer should fire.
- timeouts_watcher.repeat =
- (timeouts->begin()->first - Clock::now()).secs();
-
- if (timeouts_watcher.repeat <= 0) {
- // Feed the event now!
- timeouts_watcher.repeat = 0;
- ev_timer_again(loop, &timeouts_watcher);
- ev_feed_event(loop, &timeouts_watcher, EV_TIMEOUT);
- } else {
- // Don't fire the timer if the clock is paused since we
- // don't want time to advance (instead a call to
- // clock::advance() will handle the timer).
- if (Clock::paused() && timeouts_watcher.repeat > 0) {
- timeouts_watcher.repeat = 0;
- }
-
- ev_timer_again(loop, &timeouts_watcher);
- }
- }
-
- update_timer = false;
+ if (!timers.empty()) {
+ // Determine when the next "tick" should occur.
+ Duration duration = (timers.begin()->first - Clock::now());
+
+ // Force a duration of 0 seconds (i.e., fire timers now) if the
+ // clock is paused and the duration is greater than 0 since we
+ // want to handle timers right away.
+ if (Clock::paused() && duration > Seconds(0)) {
+ return Seconds(0);
}
+
+ return duration;
}
+
+ return None();
}
+} // namespace clock {
+
-void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
+void tick()
{
- list<Timer> timers;
+ list<Timer> timedout;
- synchronized (timeouts) {
+ synchronized (timers) {
Time now = Clock::now();
- VLOG(3) << "Handling timeouts up to " << now;
+ VLOG(3) << "Handling timers up to " << now;
- foreachkey (const Time& timeout, *timeouts) {
+ foreachkey (const Time& timeout, *timers) {
if (timeout > now) {
break;
}
@@ -127,52 +108,33 @@ void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
clock::settling = true;
}
- foreach (const Timer& timer, (*timeouts)[timeout]) {
- timers.push_back(timer);
+ foreach (const Timer& timer, (*timers)[timeout]) {
+ timedout.push_back(timer);
}
}
- // Now erase the range of timeouts that timed out.
- timeouts->erase(timeouts->begin(), timeouts->upper_bound(now));
+ // Now erase the range of timers that timed out.
+ timers->erase(timers->begin(), timers->upper_bound(now));
// Okay, so the timeout for the next timer should not have fired.
- CHECK(timeouts->empty() || (timeouts->begin()->first > now));
-
- // Update the timer as necessary.
- if (!timeouts->empty()) {
- // Determine when the next timer should fire.
- timeouts_watcher.repeat =
- (timeouts->begin()->first - Clock::now()).secs();
-
- if (timeouts_watcher.repeat <= 0) {
- // Feed the event now!
- timeouts_watcher.repeat = 0;
- ev_timer_again(loop, &timeouts_watcher);
- ev_feed_event(loop, &timeouts_watcher, EV_TIMEOUT);
- } else {
- // Don't fire the timer if the clock is paused since we don't
- // want time to advance (instead a call to Clock::advance()
- // will handle the timer).
- if (Clock::paused() && timeouts_watcher.repeat > 0) {
- timeouts_watcher.repeat = 0;
- }
+ CHECK(timers->empty() || (timers->begin()->first > now));
- ev_timer_again(loop, &timeouts_watcher);
- }
+ // Schedule another "tick" if necessary.
+ Option<Duration> duration = clock::next(*timers);
+ if (duration.isSome()) {
+ EventLoop::delay(duration.get(), &tick);
}
-
- update_timer = false; // Since we might have a queued update_timer.
}
- clock::callback(timers);
+ clock::callback(timedout);
- // Mark 'settling' as false since there are not any more timeouts
+ // Mark 'settling' as false since there are not any more timers
// that will expire before the paused time and we've finished
// executing expired timers.
- synchronized (timeouts) {
+ synchronized (timers) {
if (clock::paused &&
- (timeouts->size() == 0 ||
- timeouts->begin()->first > clock::current)) {
+ (timers->size() == 0 ||
+ timers->begin()->first > clock::current)) {
VLOG(3) << "Clock has settled";
clock::settling = false;
}
@@ -182,20 +144,7 @@ void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
void Clock::initialize(lambda::function<void(const list<Timer>&)>&& callback)
{
- // TODO(benh): Currently this function is expected to get called
- // just after initializing libev in process::initialize. But that is
- // too tightly coupled so and we really need to move libev specific
- // intialization outside of process::initialize that both
- // process::initialize and Clock::initialize can depend on (and thus
- // call).
-
clock::callback = callback;
-
- ev_async_init(&async_update_timer_watcher, handle_async_update_timer);
- ev_async_start(loop, &async_update_timer_watcher);
-
- ev_timer_init(&timeouts_watcher, handle_timeouts, 0., 2100000.0);
- ev_timer_again(loop, &timeouts_watcher);
}
@@ -207,7 +156,7 @@ Time Clock::now()
Time Clock::now(ProcessBase* process)
{
- synchronized (timeouts) {
+ synchronized (timers) {
if (Clock::paused()) {
if (process != NULL) {
if (clock::currents->count(process) != 0) {
@@ -221,8 +170,7 @@ Time Clock::now(ProcessBase* process)
}
}
- // TODO(benh): Versus ev_now()?
- double d = ev_time();
+ double d = EventLoop::time();
Try<Time> time = Time::create(d); // Compensates for clock::advanced.
// TODO(xujyan): Move CHECK_SOME to libprocess and add CHECK_SOME
@@ -252,17 +200,22 @@ Timer Clock::timer(
<< " in the future (" << timeout.time() << ")";
// Add the timer.
- synchronized (timeouts) {
- if (timeouts->size() == 0 ||
- timer.timeout().time() < timeouts->begin()->first) {
+ synchronized (timers) {
+ if (timers->size() == 0 ||
+ timer.timeout().time() < timers->begin()->first) {
// Need to interrupt the loop to update/set timer repeat.
- (*timeouts)[timer.timeout().time()].push_back(timer);
- update_timer = true;
- ev_async_send(loop, &async_update_timer_watcher);
+
+ (*timers)[timer.timeout().time()].push_back(timer);
+
+ // Schedule another "tick" if necessary.
+ Option<Duration> duration = clock::next(*timers);
+ if (duration.isSome()) {
+ EventLoop::delay(duration.get(), &tick);
+ }
} else {
// Timer repeat is adequate, just add the timeout.
- CHECK(timeouts->size() >= 1);
- (*timeouts)[timer.timeout().time()].push_back(timer);
+ CHECK(timers->size() >= 1);
+ (*timers)[timer.timeout().time()].push_back(timer);
}
}
@@ -273,16 +226,16 @@ Timer Clock::timer(
bool Clock::cancel(const Timer& timer)
{
bool canceled = false;
- synchronized (timeouts) {
+ synchronized (timers) {
// Check if the timeout is still pending, and if so, erase it. In
// addition, erase an empty list if we just removed the last
// timeout.
Time time = timer.timeout().time();
- if (timeouts->count(time) > 0) {
+ if (timers->count(time) > 0) {
canceled = true;
- (*timeouts)[time].remove(timer);
- if ((*timeouts)[time].empty()) {
- timeouts->erase(time);
+ (*timers)[time].remove(timer);
+ if ((*timers)[time].empty()) {
+ timers->erase(time);
}
}
}
@@ -293,9 +246,9 @@ bool Clock::cancel(const Timer& timer)
void Clock::pause()
{
- process::initialize(); // To make sure the libev watchers are ready.
+ process::initialize(); // To make sure the event loop is ready.
- synchronized (timeouts) {
+ synchronized (timers) {
if (!clock::paused) {
clock::initial = clock::current = now();
clock::paused = true;
@@ -303,8 +256,8 @@ void Clock::pause()
}
}
- // Note that after pausing the clock an existing libev timer might
- // still fire (invoking handle_timeout), but since paused == true no
+ // Note that after pausing the clock an existing event loop delay
+ // might still fire (invoking tick), but since paused == true no
// "time" will actually have passed, so no timer will actually fire.
}
@@ -317,16 +270,21 @@ bool Clock::paused()
void Clock::resume()
{
- process::initialize(); // To make sure the libev watchers are ready.
+ process::initialize(); // To make sure the event loop is ready.
- synchronized (timeouts) {
+ synchronized (timers) {
if (clock::paused) {
VLOG(2) << "Clock resumed at " << clock::current;
+
clock::paused = false;
clock::settling = false;
clock::currents->clear();
- update_timer = true;
- ev_async_send(loop, &async_update_timer_watcher);
+
+ // Schedule another "tick" if necessary.
+ Option<Duration> duration = clock::next(*timers);
+ if (duration.isSome()) {
+ EventLoop::delay(duration.get(), &tick);
+ }
}
}
}
@@ -334,14 +292,17 @@ void Clock::resume()
void Clock::advance(const Duration& duration)
{
- synchronized (timeouts) {
+ synchronized (timers) {
if (clock::paused) {
clock::advanced += duration;
clock::current += duration;
+
VLOG(2) << "Clock advanced (" << duration << ") to " << clock::current;
- if (!update_timer) {
- update_timer = true;
- ev_async_send(loop, &async_update_timer_watcher);
+
+ // Schedule another "tick" if necessary.
+ Option<Duration> duration = clock::next(*timers);
+ if (duration.isSome()) {
+ EventLoop::delay(duration.get(), &tick);
}
}
}
@@ -350,7 +311,7 @@ void Clock::advance(const Duration& duration)
void Clock::advance(ProcessBase* process, const Duration& duration)
{
- synchronized (timeouts) {
+ synchronized (timers) {
if (clock::paused) {
Time current = now(process);
current += duration;
@@ -364,15 +325,17 @@ void Clock::advance(ProcessBase* process, const Duration& duration)
void Clock::update(const Time& time)
{
- synchronized (timeouts) {
+ synchronized (timers) {
if (clock::paused) {
if (clock::current < time) {
clock::advanced += (time - clock::current);
clock::current = Time(time);
VLOG(2) << "Clock updated to " << clock::current;
- if (!update_timer) {
- update_timer = true;
- ev_async_send(loop, &async_update_timer_watcher);
+
+ // Schedule another "tick" if necessary.
+ Option<Duration> duration = clock::next(*timers);
+ if (duration.isSome()) {
+ EventLoop::delay(duration.get(), &tick);
}
}
}
@@ -382,7 +345,7 @@ void Clock::update(const Time& time)
void Clock::update(ProcessBase* process, const Time& time, Update update)
{
- synchronized (timeouts) {
+ synchronized (timers) {
if (clock::paused) {
if (now(process) < time || update == Clock::FORCE) {
VLOG(2) << "Clock of " << process->self() << " updated to " << time;
@@ -402,16 +365,14 @@ void Clock::order(ProcessBase* from, ProcessBase* to)
bool Clock::settled()
{
- synchronized (timeouts) {
+ synchronized (timers) {
CHECK(clock::paused);
- if (update_timer) {
- return false;
- } else if (clock::settling) {
+ if (clock::settling) {
VLOG(3) << "Clock still not settled";
return false;
- } else if (timeouts->size() == 0 ||
- timeouts->begin()->first > clock::current) {
+ } else if (timers->size() == 0 ||
+ timers->begin()->first > clock::current) {
VLOG(3) << "Clock is settled";
return true;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/dd37344d/3rdparty/libprocess/src/event_loop.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/event_loop.hpp b/3rdparty/libprocess/src/event_loop.hpp
new file mode 100644
index 0000000..34e9f1d
--- /dev/null
+++ b/3rdparty/libprocess/src/event_loop.hpp
@@ -0,0 +1,30 @@
+#ifndef __EVENT_LOOP_HPP__
+#define __EVENT_LOOP_HPP__
+
+#include <stout/duration.hpp>
+
+namespace process {
+
+// The interface that must be implemented by an event management
+// system. This is a class to cleanly isolate the interface and so
+// that in the future we can support multiple implementations.
+class EventLoop
+{
+public:
+ // Initializes the event loop.
+ static void initialize();
+
+ // Invoke the specified function in the event loop after the
+ // specified duration.
+ static void delay(const Duration& duration, void(*function)(void));
+
+ // Returns the current time w.r.t. the event loop.
+ static double time();
+
+ // Runs the event loop.
+ static void* run(void*);
+};
+
+} // namespace process {
+
+#endif // __EVENT_LOOP_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/dd37344d/3rdparty/libprocess/src/libev.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libev.cpp b/3rdparty/libprocess/src/libev.cpp
index 8a557ce..0e8d44c 100644
--- a/3rdparty/libprocess/src/libev.cpp
+++ b/3rdparty/libprocess/src/libev.cpp
@@ -2,8 +2,11 @@
#include <queue>
+#include <stout/duration.hpp>
#include <stout/lambda.hpp>
+#include <stout/nothing.hpp>
+#include "event_loop.hpp"
#include "libev.hpp"
namespace process {
@@ -23,4 +26,96 @@ std::queue<lambda::function<void(void)>>* functions =
ThreadLocal<bool>* _in_event_loop_ = new ThreadLocal<bool>();
+
+void handle_async(struct ev_loop* loop, ev_async* _, int revents)
+{
+ synchronized (watchers) {
+ // Start all the new I/O watchers.
+ while (!watchers->empty()) {
+ ev_io* watcher = watchers->front();
+ watchers->pop();
+ ev_io_start(loop, watcher);
+ }
+
+ while (!functions->empty()) {
+ (functions->front())();
+ functions->pop();
+ }
+ }
+}
+
+
+void EventLoop::initialize()
+{
+ synchronizer(watchers) = SYNCHRONIZED_INITIALIZER;
+
+ loop = ev_default_loop(EVFLAG_AUTO);
+
+ ev_async_init(&async_watcher, handle_async);
+ ev_async_start(loop, &async_watcher);
+}
+
+
+namespace internal {
+
+void handle_delay(struct ev_loop* loop, ev_timer* timer, int revents)
+{
+ void(*function)(void) = reinterpret_cast<void(*)(void)>(timer->data);
+ function();
+ ev_timer_stop(loop, timer);
+ delete timer;
+}
+
+
+Future<Nothing> delay(const Duration& duration, void(*function)(void))
+{
+ ev_timer* timer = new ev_timer();
+ timer->data = reinterpret_cast<void*>(function);
+
+ // Determine the 'after' parameter to pass to libev and set it to 0
+ // in the event that it's negative so that we always make sure to
+ // invoke 'function' even if libev doesn't support negative 'after'
+ // values.
+ double after = duration.secs();
+
+ if (after < 0) {
+ after = 0;
+ }
+
+ const double repeat = 0.0;
+
+ ev_timer_init(timer, handle_delay, after, repeat);
+ ev_timer_start(loop, timer);
+
+ return Nothing();
+}
+
+} // namespace internal {
+
+
+void EventLoop::delay(const Duration& duration, void(*function)(void))
+{
+ run_in_event_loop<Nothing>(
+ lambda::bind(&internal::delay, duration, function));
+}
+
+
+double EventLoop::time()
+{
+ // TODO(benh): Versus ev_now()?
+ return ev_time();
+}
+
+
+void* EventLoop::run(void*)
+{
+ __in_event_loop__ = true;
+
+ ev_loop(loop, 0);
+
+ __in_event_loop__ = false;
+
+ return NULL;
+}
+
} // namespace process {
http://git-wip-us.apache.org/repos/asf/mesos/blob/dd37344d/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index fdba949..d3dac4c 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -1,5 +1,4 @@
#include <errno.h>
-#include <ev.h>
#include <limits.h>
#include <libgen.h>
#include <netdb.h>
@@ -80,6 +79,7 @@
#include "config.hpp"
#include "decoder.hpp"
#include "encoder.hpp"
+#include "event_loop.hpp"
#include "gate.hpp"
#include "libev.hpp"
#include "process_reference.hpp"
@@ -585,24 +585,6 @@ static Message* parse(Request* request)
}
-void handle_async(struct ev_loop* loop, ev_async* _, int revents)
-{
- synchronized (watchers) {
- // Start all the new I/O watchers.
- while (!watchers->empty()) {
- ev_io* watcher = watchers->front();
- watchers->pop();
- ev_io_start(loop, watcher);
- }
-
- while (!functions->empty()) {
- (functions->front())();
- functions->pop();
- }
- }
-}
-
-
namespace internal {
void decode_recv(
@@ -654,18 +636,6 @@ void decode_recv(
} // namespace internal {
-void* serve(void* arg)
-{
- __in_event_loop__ = true;
-
- ev_loop(((struct ev_loop*) arg), 0);
-
- __in_event_loop__ = false;
-
- return NULL;
-}
-
-
void* schedule(void* arg)
{
do {
@@ -896,21 +866,8 @@ void initialize(const string& delegate)
PLOG(FATAL) << "Failed to initialize: " << listen.error();
}
- // Initialize libev.
- //
- // TODO(benh): Eventually move this all out of process.cpp after
- // more is disentangled.
- synchronizer(watchers) = SYNCHRONIZED_INITIALIZER;
-
-#ifdef __sun__
- loop = ev_default_loop(EVBACKEND_POLL | EVBACKEND_SELECT);
-#else
- loop = ev_default_loop(EVFLAG_AUTO);
-#endif // __sun__
-
- ev_async_init(&async_watcher, handle_async);
- ev_async_start(loop, &async_watcher);
-
+ // Initialize the event loop.
+ EventLoop::initialize();
Clock::initialize(lambda::bind(&timedout, lambda::_1));
// ev_child_init(&child_watcher, child_exited, pid, 0);
@@ -929,7 +886,7 @@ void initialize(const string& delegate)
// sigprocmask (SIG_UNBLOCK, &sa.sa_mask, 0);
pthread_t thread; // For now, not saving handles on our threads.
- if (pthread_create(&thread, NULL, serve, loop) != 0) {
+ if (pthread_create(&thread, NULL, &EventLoop::run, NULL) != 0) {
LOG(FATAL) << "Failed to initialize, pthread_create";
}
@@ -2020,7 +1977,7 @@ void SocketManager::close(int s)
// 'sockets' any attempt to send with it will just get ignored.
// TODO(benh): Always do a 'shutdown(s, SHUT_RDWR)' since that
// should keep the file descriptor valid until the last Socket
- // reference does a close but force all libev watchers to stop?
+ // reference does a close but force all event loop watchers to stop?
}