You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/02/08 20:56:28 UTC

svn commit: r1242055 - in /incubator/mesos/trunk/third_party/libprocess: include/process/clock.hpp src/process.cpp src/tests.cpp

Author: benh
Date: Wed Feb  8 19:56:27 2012
New Revision: 1242055

URL: http://svn.apache.org/viewvc?rev=1242055&view=rev
Log:
Add settle operation to libprocess (contributed by Charles Reiss).

Modified:
    incubator/mesos/trunk/third_party/libprocess/include/process/clock.hpp
    incubator/mesos/trunk/third_party/libprocess/src/process.cpp
    incubator/mesos/trunk/third_party/libprocess/src/tests.cpp

Modified: incubator/mesos/trunk/third_party/libprocess/include/process/clock.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/clock.hpp?rev=1242055&r1=1242054&r2=1242055&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/clock.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/clock.hpp Wed Feb  8 19:56:27 2012
@@ -18,6 +18,7 @@ public:
   static void update(double secs);
   static void update(ProcessBase* process, double secs);
   static void order(ProcessBase* from, ProcessBase* to);
+  static void settle();
 };
 
 } // namespace process {

Modified: incubator/mesos/trunk/third_party/libprocess/src/process.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/process.cpp?rev=1242055&r1=1242054&r2=1242055&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/process.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/process.cpp Wed Feb  8 19:56:27 2012
@@ -282,6 +282,8 @@ public:
   void enqueue(ProcessBase* process);
   ProcessBase* dequeue();
 
+  void settle();
+
 private:
   // Map of all local spawned and running processes.
   map<string, ProcessBase*> processes;
@@ -293,6 +295,9 @@ private:
   // Queue of runnable processes (implemented using list).
   list<ProcessBase*> runq;
   synchronizable(runq);
+
+  // Number of running processes, to support Clock::settle operation.
+  int running;
 };
 
 
@@ -337,6 +342,11 @@ static map<double, list<timer> >* timeou
   new map<double, list<timer> >();
 static synchronizable(timeouts) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
 
+// For supporting Clock::settle(), true if timers have been removed
+// from 'timeouts' but may not have been executed yet. Protected by
+// the timeouts lock. This is only used when the clock is paused.
+static bool pending_timers = false;
+
 // Flag to indicate whether or to update the timer on async interrupt.
 static bool update_timer = false;
 
@@ -516,6 +526,12 @@ void Clock::order(ProcessBase* from, Pro
   update(to, now(from));
 }
 
+void Clock::settle()
+{
+  CHECK(clock::paused); // TODO(benh): Consider returning a bool instead.
+  process_manager->settle();
+}
+
 
 int set_nbio(int fd)
 {
@@ -652,6 +668,10 @@ void handle_timeouts(struct ev_loop* loo
       VLOG(2) << "Have timeout(s) at "
               << std::fixed << std::setprecision(9) << timeout;
 
+      // Record that we have pending timers to execute so the
+      // Clock::settle() operation can wait until we're done.
+      pending_timers = true;
+
       foreach (const timer& timer, (*timeouts)[timeout]) {
         timedout.push_back(timer);
       }
@@ -711,6 +731,13 @@ void handle_timeouts(struct ev_loop* loo
   foreach (const timer& timer, timedout) {
     timer.thunk();
   }
+
+  // Mark ourselves as done executing the timers since it's now safe
+  // for a call to Clock::settle() to check if there will be any
+  // future timeouts reached.
+  synchronized (timeouts) {
+    pending_timers = false;
+  }
 }
 
 
@@ -1633,6 +1660,8 @@ ProcessManager::ProcessManager()
 {
   synchronizer(processes) = SYNCHRONIZED_INITIALIZER;
   synchronizer(runq) = SYNCHRONIZED_INITIALIZER;
+  running = 0;
+  __sync_synchronize(); // Ensure write to 'running' visible in other threads.
 }
 
 
@@ -1862,6 +1891,9 @@ void ProcessManager::resume(ProcessBase*
   }
 
   __process__ = NULL;
+
+  CHECK_GE(running, 1);
+  __sync_fetch_and_sub(&running, 1);
 }
 
 
@@ -2039,6 +2071,7 @@ bool ProcessManager::wait(const UPID& pi
   if (process != NULL) {
     VLOG(1) << "Donating thread to " << process->pid << " while waiting";
     ProcessBase* donator = __process__;
+    __sync_fetch_and_add(&running, 1);
     process_manager->resume(process);
     __process__ = donator;
   }
@@ -2092,6 +2125,10 @@ ProcessBase* ProcessManager::dequeue()
     if (!runq.empty()) {
       process = runq.front();
       runq.pop_front();
+      // Increment the running count of processes in order to support
+      // the Clock::settle() operation (this must be done atomically
+      // with removing the process from the runq).
+      __sync_fetch_and_add(&running, 1);
     }
   }
 
@@ -2099,6 +2136,40 @@ ProcessBase* ProcessManager::dequeue()
 }
 
 
+void ProcessManager::settle()
+{
+  bool done = true;
+  do {
+    usleep(10000);
+    done = true;
+    // Hopefully this is the only place we acquire both these locks.
+    synchronized (runq) {
+      synchronized (timeouts) {
+        CHECK(Clock::paused()); // Since another thread could resume the clock!
+
+        if (!runq.empty()) {
+          done = false;
+        }
+
+        __sync_synchronize(); // Read barrier for 'running'.
+        if (running > 0) {
+          done = false;
+        }
+
+        if (timeouts->size() > 0 &&
+            timeouts->begin()->first <= clock::current) {
+          done = false;
+        }
+
+        if (pending_timers) {
+          done = false;
+        }
+      }
+    }
+  } while (!done);
+}
+
+
 namespace timers {
 
 timer create(double secs, const lambda::function<void(void)>& thunk)

Modified: incubator/mesos/trunk/third_party/libprocess/src/tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/tests.cpp?rev=1242055&r1=1242054&r2=1242055&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/tests.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/tests.cpp Wed Feb  8 19:56:27 2012
@@ -553,6 +553,55 @@ TEST(libprocess, select)
 }
 
 
+class SettleProcess : public Process<SettleProcess>
+{
+public:
+  SettleProcess() : calledDispatch(false) {}
+
+  virtual void initialize()
+  {
+    usleep(10000);
+    delay(0.0, self(), &SettleProcess::afterDelay);
+  }
+
+  void afterDelay()
+  {
+    dispatch(self(), &SettleProcess::afterDispatch);
+    usleep(10000);
+    TimeoutProcess timeoutProcess;
+    spawn(timeoutProcess);
+    terminate(timeoutProcess);
+    wait(timeoutProcess);
+  }
+
+  void afterDispatch()
+  {
+    usleep(10000);
+    calledDispatch = true;
+  }
+
+  volatile bool calledDispatch;
+};
+
+
+TEST(libprocess, settle)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  // Try 100 times to hit a race.
+  for (int i = 0; i < 100; ++i) {
+    Clock::pause();
+    SettleProcess process;
+    spawn(process);
+    Clock::settle();
+    ASSERT_TRUE(process.calledDispatch);
+    terminate(process);
+    wait(process);
+    Clock::resume();
+  }
+}
+
+
 // #define ENUMERATE1(item) item##1
 // #define ENUMERATE2(item) ENUMERATE1(item), item##2
 // #define ENUMERATE3(item) ENUMERATE2(item), item##3