You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2015/02/18 07:19:18 UTC

[2/2] mesos git commit: Fixed a CPU-intensive bug in libprocess' clock.

Fixed a CPU-intensive bug in libprocess' clock.

Review: https://reviews.apache.org/r/31145


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0d2c29b9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0d2c29b9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0d2c29b9

Branch: refs/heads/master
Commit: 0d2c29b9f3e940de467b25773840194ad43392cf
Parents: d6e62a8
Author: Benjamin Mahler <be...@gmail.com>
Authored: Tue Feb 17 17:11:33 2015 -0800
Committer: Benjamin Mahler <be...@gmail.com>
Committed: Tue Feb 17 22:11:50 2015 -0800

----------------------------------------------------------------------
 3rdparty/libprocess/src/clock.cpp | 122 +++++++++++++++++++++++----------
 1 file changed, 85 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/0d2c29b9/3rdparty/libprocess/src/clock.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/clock.cpp b/3rdparty/libprocess/src/clock.cpp
index de4afb3..bff5310 100644
--- a/3rdparty/libprocess/src/clock.cpp
+++ b/3rdparty/libprocess/src/clock.cpp
@@ -2,6 +2,7 @@
 
 #include <list>
 #include <map>
+#include <set>
 
 #include <process/clock.hpp>
 #include <process/pid.hpp>
@@ -20,6 +21,7 @@
 
 using std::list;
 using std::map;
+using std::set;
 
 namespace process {
 
@@ -54,8 +56,14 @@ bool settling = false;
 lambda::function<void(const list<Timer>&)>* callback =
     new lambda::function<void(const list<Timer>&)>();
 
+// Keep track of 'ticks' that have been scheduled. To reduce the
+// number of outstanding delays on the EventLoop system, we only
+// schedule a _new_ 'tick' when it's earlier than all currently
+// scheduled 'ticks'.
+set<Time>* ticks = new set<Time>();
 
-// Helper for determining the duration until the next timer elapses,
+
+// Helper for determining the time when the next timer elapses,
 // or None if no timers are pending, or the clock is paused and no
 // timers are expired. Note that we don't manipulate 'timers' directly
 // so that it's clear from the callsite that the use of 'timers' is
@@ -63,31 +71,58 @@ lambda::function<void(const list<Timer>&)>* callback =
 //
 // TODO(benh): Create a generic 'Timers' abstraction which hides this
 // and more away (i.e., all manipulations of 'timers' below).
-Option<Duration> next(const map<Time, list<Timer>>& timers)
+Option<Time> next(const map<Time, list<Timer>>& timers)
 {
   if (!timers.empty()) {
-    // Determine when the next "tick" should occur. We pass NULL
-    // to ensure that this looks at the global clock, since this
-    // can be called from a Process context through Clock::timer.
-    Duration duration = (timers.begin()->first - Clock::now(NULL));
+    Time first = timers.begin()->first;
 
     // If the clock is paused and no timers are expired, the
     // timers cannot fire until the clock is advanced, so we
-    // return None() here.
-    if (Clock::paused() && duration > Seconds(0)) {
+    // return None() here. Note that we pass NULL to ensure
+    // that this looks at the global clock, since this can be
+    // called from a Process context through Clock::timer.
+    if (Clock::paused() && first > Clock::now(NULL)) {
       return None();
     }
 
-    return duration;
+    return first;
   }
 
   return None();
 }
 
-} // namespace clock {
+
+// Forward declaration for scheduleTick.
+void tick(const Time& time);
 
 
-void tick()
+// Helper for scheduling the next clock tick, if applicable. Note
+// that we don't manipulate 'timers' or 'ticks' directly so that
+// it's clear from the callsite that this needs to be called within
+// a 'synchronized' block.
+// TODO(bmahler): Consider taking an optional 'now' to avoid
+// excessive syscalls via Clock::now(NULL).
+void scheduleTick(const map<Time, list<Timer>>& timers, set<Time>* ticks)
+{
+  // Determine when the next 'tick' should fire.
+  const Option<Time> next = clock::next(timers);
+
+  if (next.isSome()) {
+    // Don't schedule a 'tick' if there is a 'tick' scheduled for
+    // an earlier time, to avoid excessive pending timers.
+    if (ticks->empty() || next.get() < (*ticks->begin())) {
+      ticks->insert(next.get());
+
+      // The delay can be negative if the timer is expired; this is
+      // expected and will result in a 'tick' firing immediately.
+      const Duration delay = next.get() - Clock::now(NULL);
+      EventLoop::delay(delay, lambda::bind(tick, next.get()));
+    }
+  }
+}
+
+
+void tick(const Time& time)
 {
   list<Timer> timedout;
 
@@ -124,11 +159,13 @@ void tick()
     // Okay, so the timeout for the next timer should not have fired.
     CHECK(timers->empty() || (timers->begin()->first > now));
 
+    // Remove this tick from the scheduled 'ticks'; it may have
+    // been removed already if the clock was paused / manipulated
+    // in the interim.
+    ticks->erase(time);
+
     // Schedule another "tick" if necessary.
-    Option<Duration> duration = clock::next(*timers);
-    if (duration.isSome()) {
-      EventLoop::delay(duration.get(), &tick);
-    }
+    scheduleTick(*timers, ticks);
   }
 
   (*clock::callback)(timedout);
@@ -146,6 +183,8 @@ void tick()
   }
 }
 
+} // namespace clock {
+
 
 void Clock::initialize(lambda::function<void(const list<Timer>&)>&& callback)
 {
@@ -209,14 +248,10 @@ Timer Clock::timer(
     if (timers->size() == 0 ||
         timer.timeout().time() < timers->begin()->first) {
       // Need to interrupt the loop to update/set timer repeat.
-
       (*timers)[timer.timeout().time()].push_back(timer);
 
       // Schedule another "tick" if necessary.
-      Option<Duration> duration = clock::next(*timers);
-      if (duration.isSome()) {
-        EventLoop::delay(duration.get(), &tick);
-      }
+      clock::scheduleTick(*timers, clock::ticks);
     } else {
       // Timer repeat is adequate, just add the timeout.
       CHECK(timers->size() >= 1);
@@ -258,12 +293,20 @@ void Clock::pause()
       *clock::initial = *clock::current = now();
       clock::paused = true;
       VLOG(2) << "Clock paused at " << clock::initial;
+
+      // When the clock is paused, we clear the scheduled 'ticks'
+      // since they no longer accurately represent when a 'tick'
+      // will fire (our notion of "time" is now moving differently
+      // from that of the event loop). Note that only 'ticks'
+      // that fire immediately will be scheduled while the clock
+      // is paused.
+      clock::ticks->clear();
     }
   }
 
-  // Note that after pausing the clock an existing event loop delay
-  // might still fire (invoking tick), but since paused == true no
-  // "time" will actually have passed, so no timer will actually fire.
+  // Note that after pausing the clock, the existing scheduled
+  // 'ticks' might still fire, but since 'paused' == true no "time"
+  // will actually have passed, so no timer will actually fire.
 }
 
 
@@ -286,10 +329,7 @@ void Clock::resume()
       clock::currents->clear();
 
       // Schedule another "tick" if necessary.
-      Option<Duration> duration = clock::next(*timers);
-      if (duration.isSome()) {
-        EventLoop::delay(duration.get(), &tick);
-      }
+      clock::scheduleTick(*timers, clock::ticks);
     }
   }
 }
@@ -304,11 +344,10 @@ void Clock::advance(const Duration& duration)
 
       VLOG(2) << "Clock advanced ("  << duration << ") to " << clock::current;
 
-      // Schedule another "tick" if necessary.
-      Option<Duration> duration = clock::next(*timers);
-      if (duration.isSome()) {
-        EventLoop::delay(duration.get(), &tick);
-      }
+      // Schedule another "tick" if necessary. Only "ticks" that
+      // fire immediately will be scheduled here, since the clock
+      // is paused.
+      clock::scheduleTick(*timers, clock::ticks);
     }
   }
 }
@@ -323,6 +362,11 @@ void Clock::advance(ProcessBase* process, const Duration& duration)
       (*clock::currents)[process] = current;
       VLOG(2) << "Clock of " << process->self() << " advanced (" << duration
               << ") to " << current;
+
+      // When the clock is advanced for a specific process, we do not
+      // need to schedule another "tick", as done in the global
+      // advance() above. This is because the clock ticks are based
+      // on global time, not per-Process time.
     }
   }
 }
@@ -337,11 +381,10 @@ void Clock::update(const Time& time)
         *clock::current = Time(time);
         VLOG(2) << "Clock updated to " << clock::current;
 
-        // Schedule another "tick" if necessary.
-        Option<Duration> duration = clock::next(*timers);
-        if (duration.isSome()) {
-          EventLoop::delay(duration.get(), &tick);
-        }
+        // Schedule another "tick" if necessary. Only "ticks" that
+        // fire immediately will be scheduled here, since the clock
+        // is paused.
+        clock::scheduleTick(*timers, clock::ticks);
       }
     }
   }
@@ -355,6 +398,11 @@ void Clock::update(ProcessBase* process, const Time& time, Update update)
       if (now(process) < time || update == Clock::FORCE) {
         VLOG(2) << "Clock of " << process->self() << " updated to " << time;
         (*clock::currents)[process] = Time(time);
+
+        // When the clock is updated for a specific process, we do not
+        // need to schedule another "tick", as done in the global
+        // update() above. This is because the clock ticks are based
+        // on global time, not per-Process time.
       }
     }
   }