You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2012/02/08 20:56:28 UTC
svn commit: r1242055 - in /incubator/mesos/trunk/third_party/libprocess:
include/process/clock.hpp src/process.cpp src/tests.cpp
Author: benh
Date: Wed Feb 8 19:56:27 2012
New Revision: 1242055
URL: http://svn.apache.org/viewvc?rev=1242055&view=rev
Log:
Add settle operation to libprocess (contributed by Charles Reiss).
Modified:
incubator/mesos/trunk/third_party/libprocess/include/process/clock.hpp
incubator/mesos/trunk/third_party/libprocess/src/process.cpp
incubator/mesos/trunk/third_party/libprocess/src/tests.cpp
Modified: incubator/mesos/trunk/third_party/libprocess/include/process/clock.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/include/process/clock.hpp?rev=1242055&r1=1242054&r2=1242055&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/include/process/clock.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/include/process/clock.hpp Wed Feb 8 19:56:27 2012
@@ -18,6 +18,7 @@ public:
static void update(double secs);
static void update(ProcessBase* process, double secs);
static void order(ProcessBase* from, ProcessBase* to);
+ static void settle();
};
} // namespace process {
Modified: incubator/mesos/trunk/third_party/libprocess/src/process.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/process.cpp?rev=1242055&r1=1242054&r2=1242055&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/process.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/process.cpp Wed Feb 8 19:56:27 2012
@@ -282,6 +282,8 @@ public:
void enqueue(ProcessBase* process);
ProcessBase* dequeue();
+ void settle();
+
private:
// Map of all local spawned and running processes.
map<string, ProcessBase*> processes;
@@ -293,6 +295,9 @@ private:
// Queue of runnable processes (implemented using list).
list<ProcessBase*> runq;
synchronizable(runq);
+
+ // Number of running processes, to support Clock::settle operation.
+ int running;
};
@@ -337,6 +342,11 @@ static map<double, list<timer> >* timeou
new map<double, list<timer> >();
static synchronizable(timeouts) = SYNCHRONIZED_INITIALIZER_RECURSIVE;
+// For supporting Clock::settle(), true if timers have been removed
+// from 'timeouts' but may not have been executed yet. Protected by
+// the timeouts lock. This is only used when the clock is paused.
+static bool pending_timers = false;
+
// Flag to indicate whether or not to update the timer on async interrupt.
static bool update_timer = false;
@@ -516,6 +526,12 @@ void Clock::order(ProcessBase* from, Pro
update(to, now(from));
}
+void Clock::settle()
+{
+ CHECK(clock::paused); // Settling requires a paused clock. TODO(benh): Consider returning a bool instead.
+ process_manager->settle(); // The actual waiting is done by ProcessManager::settle().
+}
+
int set_nbio(int fd)
{
@@ -652,6 +668,10 @@ void handle_timeouts(struct ev_loop* loo
VLOG(2) << "Have timeout(s) at "
<< std::fixed << std::setprecision(9) << timeout;
+ // Record that we have pending timers to execute so the
+ // Clock::settle() operation can wait until we're done.
+ pending_timers = true;
+
foreach (const timer& timer, (*timeouts)[timeout]) {
timedout.push_back(timer);
}
@@ -711,6 +731,13 @@ void handle_timeouts(struct ev_loop* loo
foreach (const timer& timer, timedout) {
timer.thunk();
}
+
+ // Mark ourselves as done executing the timers since it's now safe
+ // for a call to Clock::settle() to check if there will be any
+ // future timeouts reached.
+ synchronized (timeouts) {
+ pending_timers = false;
+ }
}
@@ -1633,6 +1660,8 @@ ProcessManager::ProcessManager()
{
synchronizer(processes) = SYNCHRONIZED_INITIALIZER;
synchronizer(runq) = SYNCHRONIZED_INITIALIZER;
+ running = 0;
+ __sync_synchronize(); // Ensure write to 'running' visible in other threads.
}
@@ -1862,6 +1891,9 @@ void ProcessManager::resume(ProcessBase*
}
__process__ = NULL;
+
+ CHECK_GE(running, 1);
+ __sync_fetch_and_sub(&running, 1);
}
@@ -2039,6 +2071,7 @@ bool ProcessManager::wait(const UPID& pi
if (process != NULL) {
VLOG(1) << "Donating thread to " << process->pid << " while waiting";
ProcessBase* donator = __process__;
+ __sync_fetch_and_add(&running, 1);
process_manager->resume(process);
__process__ = donator;
}
@@ -2092,6 +2125,10 @@ ProcessBase* ProcessManager::dequeue()
if (!runq.empty()) {
process = runq.front();
runq.pop_front();
+ // Increment the running count of processes in order to support
+ // the Clock::settle() operation (this must be done atomically
+ // with removing the process from the runq).
+ __sync_fetch_and_add(&running, 1);
}
}
@@ -2099,6 +2136,40 @@ ProcessBase* ProcessManager::dequeue()
}
+void ProcessManager::settle()
+{
+ bool done = true;
+ do { // Poll until one pass finds no queued or running processes and no due or pending timers.
+ usleep(10000); // Sleep 10ms before every check, including the very first one.
+ done = true; // Assume settled; any check below that finds outstanding work clears this.
+ // Takes runq first, then timeouts. Hopefully this is the only place we acquire both these locks.
+ synchronized (runq) {
+ synchronized (timeouts) {
+ CHECK(Clock::paused()); // Since another thread could resume the clock!
+
+ if (!runq.empty()) { // Processes are still queued waiting to run.
+ done = false;
+ }
+
+ __sync_synchronize(); // Read barrier for 'running'.
+ if (running > 0) { // Incremented in dequeue()/wait(), decremented at the end of resume().
+ done = false;
+ }
+
+ if (timeouts->size() > 0 && // 'timeouts' is an ordered map, so begin() is the earliest timeout.
+ timeouts->begin()->first <= clock::current) { // Earliest timeout is already due.
+ done = false;
+ }
+
+ if (pending_timers) { // Timers were removed from 'timeouts' but may not have executed yet.
+ done = false;
+ }
+ }
+ }
+ } while (!done); // Retry until a single pass finds nothing outstanding.
+}
+
+
namespace timers {
timer create(double secs, const lambda::function<void(void)>& thunk)
Modified: incubator/mesos/trunk/third_party/libprocess/src/tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/src/tests.cpp?rev=1242055&r1=1242054&r2=1242055&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/src/tests.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/src/tests.cpp Wed Feb 8 19:56:27 2012
@@ -553,6 +553,55 @@ TEST(libprocess, select)
}
+class SettleProcess : public Process<SettleProcess> // Test helper: chains initialize() -> afterDelay() -> afterDispatch().
+{
+public:
+ SettleProcess() : calledDispatch(false) {}
+
+ virtual void initialize()
+ {
+ usleep(10000); // 10ms pause; presumably widens the window for the race the test tries to hit.
+ delay(0.0, self(), &SettleProcess::afterDelay); // Schedule afterDelay() on ourselves with a zero-second delay.
+ }
+
+ void afterDelay()
+ {
+ dispatch(self(), &SettleProcess::afterDispatch); // Queue afterDispatch() on ourselves.
+ usleep(10000);
+ TimeoutProcess timeoutProcess; // Spawn, terminate and wait on a second process from inside this one.
+ spawn(timeoutProcess);
+ terminate(timeoutProcess);
+ wait(timeoutProcess); // NOTE(review): may exercise the thread-donation path in ProcessManager::wait -- confirm.
+ }
+
+ void afterDispatch()
+ {
+ usleep(10000);
+ calledDispatch = true; // Last step of the chain; the test asserts on this flag.
+ }
+
+ volatile bool calledDispatch; // Starts false; set in afterDispatch() and read by the test body.
+};
+
+
+TEST(libprocess, settle)
+{
+ ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+ // Try 100 times to hit a race (each iteration uses a fresh SettleProcess).
+ for (int i = 0; i < 100; ++i) {
+ Clock::pause(); // Clock::settle() CHECKs that the clock is paused.
+ SettleProcess process;
+ spawn(process);
+ Clock::settle(); // Expected to return only after the delay/dispatch chain has finished.
+ ASSERT_TRUE(process.calledDispatch); // Fails if settle() returned before afterDispatch() ran.
+ terminate(process);
+ wait(process);
+ Clock::resume(); // Unpause before the next iteration pauses again.
+ }
+}
+
+
// #define ENUMERATE1(item) item##1
// #define ENUMERATE2(item) ENUMERATE1(item), item##2
// #define ENUMERATE3(item) ENUMERATE2(item), item##3