You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2014/12/25 22:05:53 UTC

[02/12] mesos git commit: Refactor clock to simplify libev cut.

Refactor clock to simplify libev cut.

Review: https://reviews.apache.org/r/28315


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/dd37344d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/dd37344d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/dd37344d

Branch: refs/heads/master
Commit: dd37344d27b8ebc5f5e01e5efb2cb92929b97704
Parents: c513126
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sat Dec 20 11:11:08 2014 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Sat Dec 20 17:59:55 2014 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/Makefile.am        |   1 +
 3rdparty/libprocess/src/clock.cpp      | 233 ++++++++++++----------------
 3rdparty/libprocess/src/event_loop.hpp |  30 ++++
 3rdparty/libprocess/src/libev.cpp      |  95 ++++++++++++
 3rdparty/libprocess/src/process.cpp    |  53 +------
 5 files changed, 228 insertions(+), 184 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/dd37344d/3rdparty/libprocess/Makefile.am
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
index 75870ac..472b5f4 100644
--- a/3rdparty/libprocess/Makefile.am
+++ b/3rdparty/libprocess/Makefile.am
@@ -34,6 +34,7 @@ libprocess_la_SOURCES =		\
   src/config.hpp		\
   src/decoder.hpp		\
   src/encoder.hpp		\
+  src/event_loop.hpp		\
   src/gate.hpp			\
   src/help.cpp			\
   src/http.cpp			\

http://git-wip-us.apache.org/repos/asf/mesos/blob/dd37344d/3rdparty/libprocess/src/clock.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/clock.cpp b/3rdparty/libprocess/src/clock.cpp
index 2bc7fa9..fcc1eb0 100644
--- a/3rdparty/libprocess/src/clock.cpp
+++ b/3rdparty/libprocess/src/clock.cpp
@@ -1,5 +1,3 @@
-#include <ev.h>
-
 #include <glog/logging.h>
 
 #include <list>
@@ -17,6 +15,7 @@
 #include <stout/try.hpp>
 #include <stout/unreachable.hpp>
 
+#include "event_loop.hpp"
 #include "synchronized.hpp"
 
 using std::list;
@@ -24,24 +23,11 @@ using std::map;
 
 namespace process {
 
-// Event loop.
-extern struct ev_loop* loop;
-
-// Asynchronous watcher for interrupting loop to specifically deal
-// with updating timers.
-static ev_async async_update_timer_watcher;
-
-// Watcher for timeouts.
-static ev_timer timeouts_watcher;
-
 // We store the timers in a map of lists indexed by the timeout of the
 // timer so that we can have two timers that have the same timeout. We
 // exploit that the map is SORTED!
-static map<Time, list<Timer>>* timeouts = new map<Time, list<Timer>>();
-static synchronizable(timeouts) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
-
-// Flag to indicate whether or to update the timer on async interrupt.
-static bool update_timer = false;
+static map<Time, list<Timer>>* timers = new map<Time, list<Timer>>();
+static synchronizable(timers) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
 
 
 // We namespace the clock related variables to keep them well
@@ -69,51 +55,46 @@ bool settling = false;
 // Lambda function to invoke when timers have expired.
 lambda::function<void(const list<Timer>&)> callback;
 
-} // namespace clock {
-
 
-void handle_async_update_timer(struct ev_loop* loop, ev_async* _, int revents)
+// Helper for determining the duration until the next timer elapses,
+// or None if no timers are pending. Note that we don't manipulate
+// 'timer's directly so that it's clear from the callsite that the use
+// of 'timers' is within a 'synchronized' block.
+//
+// TODO(benh): Create a generic 'Timer's abstraction which hides this
+// and more away (i.e., all manipulations of 'timers' below).
+Option<Duration> next(const map<Time, list<Timer>>& timers)
 {
-  synchronized (timeouts) {
-    if (update_timer) {
-      if (!timeouts->empty()) {
-        // Determine when the next timer should fire.
-        timeouts_watcher.repeat =
-          (timeouts->begin()->first - Clock::now()).secs();
-
-        if (timeouts_watcher.repeat <= 0) {
-          // Feed the event now!
-          timeouts_watcher.repeat = 0;
-          ev_timer_again(loop, &timeouts_watcher);
-          ev_feed_event(loop, &timeouts_watcher, EV_TIMEOUT);
-        } else {
-          // Don't fire the timer if the clock is paused since we
-          // don't want time to advance (instead a call to
-          // clock::advance() will handle the timer).
-          if (Clock::paused() && timeouts_watcher.repeat > 0) {
-            timeouts_watcher.repeat = 0;
-          }
-
-          ev_timer_again(loop, &timeouts_watcher);
-        }
-      }
-
-      update_timer = false;
+  if (!timers.empty()) {
+    // Determine when the next "tick" should occur.
+    Duration duration = (timers.begin()->first - Clock::now());
+
+    // Force a duration of 0 seconds (i.e., fire timers now) if the
+    // clock is paused and the duration is greater than 0 since we
+    // want to handle timers right away.
+    if (Clock::paused() && duration > Seconds(0)) {
+      return Seconds(0);
     }
+
+    return duration;
   }
+
+  return None();
 }
 
+} // namespace clock {
+
 
-void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
+void tick()
 {
-  list<Timer> timers;
+  list<Timer> timedout;
 
-  synchronized (timeouts) {
+  synchronized (timers) {
     Time now = Clock::now();
 
-    VLOG(3) << "Handling timeouts up to " << now;
+    VLOG(3) << "Handling timers up to " << now;
 
-    foreachkey (const Time& timeout, *timeouts) {
+    foreachkey (const Time& timeout, *timers) {
       if (timeout > now) {
         break;
       }
@@ -127,52 +108,33 @@ void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
         clock::settling = true;
       }
 
-      foreach (const Timer& timer, (*timeouts)[timeout]) {
-        timers.push_back(timer);
+      foreach (const Timer& timer, (*timers)[timeout]) {
+        timedout.push_back(timer);
       }
     }
 
-    // Now erase the range of timeouts that timed out.
-    timeouts->erase(timeouts->begin(), timeouts->upper_bound(now));
+    // Now erase the range of timers that timed out.
+    timers->erase(timers->begin(), timers->upper_bound(now));
 
     // Okay, so the timeout for the next timer should not have fired.
-    CHECK(timeouts->empty() || (timeouts->begin()->first > now));
-
-    // Update the timer as necessary.
-    if (!timeouts->empty()) {
-      // Determine when the next timer should fire.
-      timeouts_watcher.repeat =
-        (timeouts->begin()->first - Clock::now()).secs();
-
-      if (timeouts_watcher.repeat <= 0) {
-        // Feed the event now!
-        timeouts_watcher.repeat = 0;
-        ev_timer_again(loop, &timeouts_watcher);
-        ev_feed_event(loop, &timeouts_watcher, EV_TIMEOUT);
-      } else {
-        // Don't fire the timer if the clock is paused since we don't
-        // want time to advance (instead a call to Clock::advance()
-        // will handle the timer).
-        if (Clock::paused() && timeouts_watcher.repeat > 0) {
-          timeouts_watcher.repeat = 0;
-        }
+    CHECK(timers->empty() || (timers->begin()->first > now));
 
-        ev_timer_again(loop, &timeouts_watcher);
-      }
+    // Schedule another "tick" if necessary.
+    Option<Duration> duration = clock::next(*timers);
+    if (duration.isSome()) {
+      EventLoop::delay(duration.get(), &tick);
     }
-
-    update_timer = false; // Since we might have a queued update_timer.
   }
 
-  clock::callback(timers);
+  clock::callback(timedout);
 
-  // Mark 'settling' as false since there are not any more timeouts
+  // Mark 'settling' as false since there are not any more timers
   // that will expire before the paused time and we've finished
   // executing expired timers.
-  synchronized (timeouts) {
+  synchronized (timers) {
     if (clock::paused &&
-        (timeouts->size() == 0 ||
-         timeouts->begin()->first > clock::current)) {
+        (timers->size() == 0 ||
+         timers->begin()->first > clock::current)) {
       VLOG(3) << "Clock has settled";
       clock::settling = false;
     }
@@ -182,20 +144,7 @@ void handle_timeouts(struct ev_loop* loop, ev_timer* _, int revents)
 
 void Clock::initialize(lambda::function<void(const list<Timer>&)>&& callback)
 {
-  // TODO(benh): Currently this function is expected to get called
-  // just after initializing libev in process::initialize. But that is
-  // too tightly coupled so and we really need to move libev specific
-  // intialization outside of process::initialize that both
-  // process::initialize and Clock::initialize can depend on (and thus
-  // call).
-
   clock::callback = callback;
-
-  ev_async_init(&async_update_timer_watcher, handle_async_update_timer);
-  ev_async_start(loop, &async_update_timer_watcher);
-
-  ev_timer_init(&timeouts_watcher, handle_timeouts, 0., 2100000.0);
-  ev_timer_again(loop, &timeouts_watcher);
 }
 
 
@@ -207,7 +156,7 @@ Time Clock::now()
 
 Time Clock::now(ProcessBase* process)
 {
-  synchronized (timeouts) {
+  synchronized (timers) {
     if (Clock::paused()) {
       if (process != NULL) {
         if (clock::currents->count(process) != 0) {
@@ -221,8 +170,7 @@ Time Clock::now(ProcessBase* process)
     }
   }
 
-  // TODO(benh): Versus ev_now()?
-  double d = ev_time();
+  double d = EventLoop::time();
   Try<Time> time = Time::create(d); // Compensates for clock::advanced.
 
   // TODO(xujyan): Move CHECK_SOME to libprocess and add CHECK_SOME
@@ -252,17 +200,22 @@ Timer Clock::timer(
           << " in the future (" << timeout.time() << ")";
 
   // Add the timer.
-  synchronized (timeouts) {
-    if (timeouts->size() == 0 ||
-        timer.timeout().time() < timeouts->begin()->first) {
+  synchronized (timers) {
+    if (timers->size() == 0 ||
+        timer.timeout().time() < timers->begin()->first) {
       // Need to interrupt the loop to update/set timer repeat.
-      (*timeouts)[timer.timeout().time()].push_back(timer);
-      update_timer = true;
-      ev_async_send(loop, &async_update_timer_watcher);
+
+      (*timers)[timer.timeout().time()].push_back(timer);
+
+      // Schedule another "tick" if necessary.
+      Option<Duration> duration = clock::next(*timers);
+      if (duration.isSome()) {
+        EventLoop::delay(duration.get(), &tick);
+      }
     } else {
       // Timer repeat is adequate, just add the timeout.
-      CHECK(timeouts->size() >= 1);
-      (*timeouts)[timer.timeout().time()].push_back(timer);
+      CHECK(timers->size() >= 1);
+      (*timers)[timer.timeout().time()].push_back(timer);
     }
   }
 
@@ -273,16 +226,16 @@ Timer Clock::timer(
 bool Clock::cancel(const Timer& timer)
 {
   bool canceled = false;
-  synchronized (timeouts) {
+  synchronized (timers) {
     // Check if the timeout is still pending, and if so, erase it. In
     // addition, erase an empty list if we just removed the last
     // timeout.
     Time time = timer.timeout().time();
-    if (timeouts->count(time) > 0) {
+    if (timers->count(time) > 0) {
       canceled = true;
-      (*timeouts)[time].remove(timer);
-      if ((*timeouts)[time].empty()) {
-        timeouts->erase(time);
+      (*timers)[time].remove(timer);
+      if ((*timers)[time].empty()) {
+        timers->erase(time);
       }
     }
   }
@@ -293,9 +246,9 @@ bool Clock::cancel(const Timer& timer)
 
 void Clock::pause()
 {
-  process::initialize(); // To make sure the libev watchers are ready.
+  process::initialize(); // To make sure the event loop is ready.
 
-  synchronized (timeouts) {
+  synchronized (timers) {
     if (!clock::paused) {
       clock::initial = clock::current = now();
       clock::paused = true;
@@ -303,8 +256,8 @@ void Clock::pause()
     }
   }
 
-  // Note that after pausing the clock an existing libev timer might
-  // still fire (invoking handle_timeout), but since paused == true no
+  // Note that after pausing the clock an existing event loop delay
+  // might still fire (invoking tick), but since paused == true no
   // "time" will actually have passed, so no timer will actually fire.
 }
 
@@ -317,16 +270,21 @@ bool Clock::paused()
 
 void Clock::resume()
 {
-  process::initialize(); // To make sure the libev watchers are ready.
+  process::initialize(); // To make sure the event loop is ready.
 
-  synchronized (timeouts) {
+  synchronized (timers) {
     if (clock::paused) {
       VLOG(2) << "Clock resumed at " << clock::current;
+
       clock::paused = false;
       clock::settling = false;
       clock::currents->clear();
-      update_timer = true;
-      ev_async_send(loop, &async_update_timer_watcher);
+
+      // Schedule another "tick" if necessary.
+      Option<Duration> duration = clock::next(*timers);
+      if (duration.isSome()) {
+        EventLoop::delay(duration.get(), &tick);
+      }
     }
   }
 }
@@ -334,14 +292,17 @@ void Clock::resume()
 
 void Clock::advance(const Duration& duration)
 {
-  synchronized (timeouts) {
+  synchronized (timers) {
     if (clock::paused) {
       clock::advanced += duration;
       clock::current += duration;
+
       VLOG(2) << "Clock advanced ("  << duration << ") to " << clock::current;
-      if (!update_timer) {
-        update_timer = true;
-        ev_async_send(loop, &async_update_timer_watcher);
+
+      // Schedule another "tick" if necessary.
+      Option<Duration> duration = clock::next(*timers);
+      if (duration.isSome()) {
+        EventLoop::delay(duration.get(), &tick);
       }
     }
   }
@@ -350,7 +311,7 @@ void Clock::advance(const Duration& duration)
 
 void Clock::advance(ProcessBase* process, const Duration& duration)
 {
-  synchronized (timeouts) {
+  synchronized (timers) {
     if (clock::paused) {
       Time current = now(process);
       current += duration;
@@ -364,15 +325,17 @@ void Clock::advance(ProcessBase* process, const Duration& duration)
 
 void Clock::update(const Time& time)
 {
-  synchronized (timeouts) {
+  synchronized (timers) {
     if (clock::paused) {
       if (clock::current < time) {
         clock::advanced += (time - clock::current);
         clock::current = Time(time);
         VLOG(2) << "Clock updated to " << clock::current;
-        if (!update_timer) {
-          update_timer = true;
-          ev_async_send(loop, &async_update_timer_watcher);
+
+        // Schedule another "tick" if necessary.
+        Option<Duration> duration = clock::next(*timers);
+        if (duration.isSome()) {
+          EventLoop::delay(duration.get(), &tick);
         }
       }
     }
@@ -382,7 +345,7 @@ void Clock::update(const Time& time)
 
 void Clock::update(ProcessBase* process, const Time& time, Update update)
 {
-  synchronized (timeouts) {
+  synchronized (timers) {
     if (clock::paused) {
       if (now(process) < time || update == Clock::FORCE) {
         VLOG(2) << "Clock of " << process->self() << " updated to " << time;
@@ -402,16 +365,14 @@ void Clock::order(ProcessBase* from, ProcessBase* to)
 
 bool Clock::settled()
 {
-  synchronized (timeouts) {
+  synchronized (timers) {
     CHECK(clock::paused);
 
-    if (update_timer) {
-      return false;
-    } else if (clock::settling) {
+    if (clock::settling) {
       VLOG(3) << "Clock still not settled";
       return false;
-    } else if (timeouts->size() == 0 ||
-               timeouts->begin()->first > clock::current) {
+    } else if (timers->size() == 0 ||
+               timers->begin()->first > clock::current) {
       VLOG(3) << "Clock is settled";
       return true;
     }

http://git-wip-us.apache.org/repos/asf/mesos/blob/dd37344d/3rdparty/libprocess/src/event_loop.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/event_loop.hpp b/3rdparty/libprocess/src/event_loop.hpp
new file mode 100644
index 0000000..34e9f1d
--- /dev/null
+++ b/3rdparty/libprocess/src/event_loop.hpp
@@ -0,0 +1,30 @@
+#ifndef __EVENT_LOOP_HPP__
+#define __EVENT_LOOP_HPP__
+
+#include <stout/duration.hpp>
+
+namespace process {
+
+// The interface that must be implemented by an event management
+// system. This is a class to cleanly isolate the interface and so
+// that in the future we can support multiple implementations.
+class EventLoop
+{
+public:
+  // Initializes the event loop.
+  static void initialize();
+
+  // Invoke the specified function in the event loop after the
+  // specified duration.
+  static void delay(const Duration& duration, void(*function)(void));
+
+  // Returns the current time w.r.t. the event loop.
+  static double time();
+
+  // Runs the event loop.
+  static void* run(void*);
+};
+
+} // namespace process {
+
+#endif // __EVENT_LOOP_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/dd37344d/3rdparty/libprocess/src/libev.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/libev.cpp b/3rdparty/libprocess/src/libev.cpp
index 8a557ce..0e8d44c 100644
--- a/3rdparty/libprocess/src/libev.cpp
+++ b/3rdparty/libprocess/src/libev.cpp
@@ -2,8 +2,11 @@
 
 #include <queue>
 
+#include <stout/duration.hpp>
 #include <stout/lambda.hpp>
+#include <stout/nothing.hpp>
 
+#include "event_loop.hpp"
 #include "libev.hpp"
 
 namespace process {
@@ -23,4 +26,96 @@ std::queue<lambda::function<void(void)>>* functions =
 
 ThreadLocal<bool>* _in_event_loop_ = new ThreadLocal<bool>();
 
+
+void handle_async(struct ev_loop* loop, ev_async* _, int revents)
+{
+  synchronized (watchers) {
+    // Start all the new I/O watchers.
+    while (!watchers->empty()) {
+      ev_io* watcher = watchers->front();
+      watchers->pop();
+      ev_io_start(loop, watcher);
+    }
+
+    while (!functions->empty()) {
+      (functions->front())();
+      functions->pop();
+    }
+  }
+}
+
+
+void EventLoop::initialize()
+{
+  synchronizer(watchers) = SYNCHRONIZED_INITIALIZER;
+
+  loop = ev_default_loop(EVFLAG_AUTO);
+
+  ev_async_init(&async_watcher, handle_async);
+  ev_async_start(loop, &async_watcher);
+}
+
+
+namespace internal {
+
+void handle_delay(struct ev_loop* loop, ev_timer* timer, int revents)
+{
+  void(*function)(void) = reinterpret_cast<void(*)(void)>(timer->data);
+  function();
+  ev_timer_stop(loop, timer);
+  delete timer;
+}
+
+
+Future<Nothing> delay(const Duration& duration, void(*function)(void))
+{
+  ev_timer* timer = new ev_timer();
+  timer->data = reinterpret_cast<void*>(function);
+
+  // Determine the 'after' parameter to pass to libev and set it to 0
+  // in the event that it's negative so that we always make sure to
+  // invoke 'function' even if libev doesn't support negative 'after'
+  // values.
+  double after = duration.secs();
+
+  if (after < 0) {
+    after = 0;
+  }
+
+  const double repeat = 0.0;
+
+  ev_timer_init(timer, handle_delay, after, repeat);
+  ev_timer_start(loop, timer);
+
+  return Nothing();
+}
+
+} // namespace internal {
+
+
+void EventLoop::delay(const Duration& duration, void(*function)(void))
+{
+  run_in_event_loop<Nothing>(
+      lambda::bind(&internal::delay, duration, function));
+}
+
+
+double EventLoop::time()
+{
+  // TODO(benh): Versus ev_now()?
+  return ev_time();
+}
+
+
+void* EventLoop::run(void*)
+{
+  __in_event_loop__ = true;
+
+  ev_loop(loop, 0);
+
+  __in_event_loop__ = false;
+
+  return NULL;
+}
+
 } // namespace process {

http://git-wip-us.apache.org/repos/asf/mesos/blob/dd37344d/3rdparty/libprocess/src/process.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/process.cpp b/3rdparty/libprocess/src/process.cpp
index fdba949..d3dac4c 100644
--- a/3rdparty/libprocess/src/process.cpp
+++ b/3rdparty/libprocess/src/process.cpp
@@ -1,5 +1,4 @@
 #include <errno.h>
-#include <ev.h>
 #include <limits.h>
 #include <libgen.h>
 #include <netdb.h>
@@ -80,6 +79,7 @@
 #include "config.hpp"
 #include "decoder.hpp"
 #include "encoder.hpp"
+#include "event_loop.hpp"
 #include "gate.hpp"
 #include "libev.hpp"
 #include "process_reference.hpp"
@@ -585,24 +585,6 @@ static Message* parse(Request* request)
 }
 
 
-void handle_async(struct ev_loop* loop, ev_async* _, int revents)
-{
-  synchronized (watchers) {
-    // Start all the new I/O watchers.
-    while (!watchers->empty()) {
-      ev_io* watcher = watchers->front();
-      watchers->pop();
-      ev_io_start(loop, watcher);
-    }
-
-    while (!functions->empty()) {
-      (functions->front())();
-      functions->pop();
-    }
-  }
-}
-
-
 namespace internal {
 
 void decode_recv(
@@ -654,18 +636,6 @@ void decode_recv(
 } // namespace internal {
 
 
-void* serve(void* arg)
-{
-  __in_event_loop__ = true;
-
-  ev_loop(((struct ev_loop*) arg), 0);
-
-  __in_event_loop__ = false;
-
-  return NULL;
-}
-
-
 void* schedule(void* arg)
 {
   do {
@@ -896,21 +866,8 @@ void initialize(const string& delegate)
     PLOG(FATAL) << "Failed to initialize: " << listen.error();
   }
 
-  // Initialize libev.
-  //
-  // TODO(benh): Eventually move this all out of process.cpp after
-  // more is disentangled.
-  synchronizer(watchers) = SYNCHRONIZED_INITIALIZER;
-
-#ifdef __sun__
-  loop = ev_default_loop(EVBACKEND_POLL | EVBACKEND_SELECT);
-#else
-  loop = ev_default_loop(EVFLAG_AUTO);
-#endif // __sun__
-
-  ev_async_init(&async_watcher, handle_async);
-  ev_async_start(loop, &async_watcher);
-
+  // Initialize the event loop.
+  EventLoop::initialize();
   Clock::initialize(lambda::bind(&timedout, lambda::_1));
 
 //   ev_child_init(&child_watcher, child_exited, pid, 0);
@@ -929,7 +886,7 @@ void initialize(const string& delegate)
 //   sigprocmask (SIG_UNBLOCK, &sa.sa_mask, 0);
 
   pthread_t thread; // For now, not saving handles on our threads.
-  if (pthread_create(&thread, NULL, serve, loop) != 0) {
+  if (pthread_create(&thread, NULL, &EventLoop::run, NULL) != 0) {
     LOG(FATAL) << "Failed to initialize, pthread_create";
   }
 
@@ -2020,7 +1977,7 @@ void SocketManager::close(int s)
   // 'sockets' any attempt to send with it will just get ignored.
   // TODO(benh): Always do a 'shutdown(s, SHUT_RDWR)' since that
   // should keep the file descriptor valid until the last Socket
-  // reference does a close but force all libev watchers to stop?
+  // reference does a close but force all event loop watchers to stop?
 }