You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 05:27:37 UTC
svn commit: r1131596 - in /incubator/mesos/trunk/src/third_party/libprocess:
process.cpp process.hpp
Author: benh
Date: Sun Jun 5 03:27:37 2011
New Revision: 1131596
URL: http://svn.apache.org/viewvc?rev=1131596&view=rev
Log:
Added timeout/interval support to Process::await (really helpful).
Modified:
incubator/mesos/trunk/src/third_party/libprocess/process.cpp
incubator/mesos/trunk/src/third_party/libprocess/process.hpp
Modified: incubator/mesos/trunk/src/third_party/libprocess/process.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/process.cpp?rev=1131596&r1=1131595&r2=1131596&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/process.cpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/process.cpp Sun Jun 5 03:27:37 2011
@@ -276,7 +276,7 @@ void trampoline(int process0, int proces
PID make_pid(const char *str)
{
- PID pid;
+ PID pid = { 0 };
std::istringstream iss(str);
iss >> pid;
return pid;
@@ -1600,7 +1600,7 @@ public:
return !interrupted;
}
#else
- bool await(Process *process, int fd, int op)
+ bool await(Process *process, int fd, int op, ev_tstamp interval)
{
assert(process != NULL);
@@ -1624,6 +1624,15 @@ public:
process->lock();
{
+ timeout_t timeout;
+
+ if (interval > 0) {
+ /* Create timeout. */
+ timeout = create_timeout(process, interval);
+ /* Start the timeout. */
+ start_timeout(timeout);
+ }
+
/* Enqueue the watcher. */
acquire(io_watchersq);
{
@@ -1638,8 +1647,14 @@ public:
process->state = Process::AWAITING;
swapcontext(&process->uctx, &proc_uctx_running);
assert(process->state == Process::READY ||
+ process->state == Process::TIMEDOUT ||
process->state == Process::INTERRUPTED);
+ /* Attempt to cancel the timer if necessary. */
+ if (interval > 0)
+ if (process->state != Process::TIMEDOUT)
+ cancel_timeout(timeout);
+
if (process->state == Process::INTERRUPTED)
interrupted = true;
@@ -1697,10 +1712,13 @@ public:
/* N.B. Process may be RUNNING due to "outside" thread 'receive'. */
assert(process->state == Process::RUNNING ||
process->state == Process::RECEIVING ||
+ process->state == Process::AWAITING ||
+ process->state == Process::INTERRUPTED ||
process->state == Process::PAUSED);
- process->state = Process::TIMEDOUT;
- if (process->state != Process::RUNNING)
+ if (process->state != Process::RUNNING ||
+ process->state != Process::INTERRUPTED)
ProcessManager::instance()->enqueue(process);
+ process->state = Process::TIMEDOUT;
}
}
process->unlock();
@@ -1720,10 +1738,10 @@ public:
}
- timeout_t create_timeout(Process *process, time_t secs)
+ timeout_t create_timeout(Process *process, ev_tstamp interval)
{
assert(process != NULL);
- ev_tstamp tstamp = ev_time() + secs;
+ ev_tstamp tstamp = ev_now(loop) + interval;
return make_tuple(tstamp, process, process->generation);
}
@@ -3056,9 +3074,10 @@ PID Process::link(const PID &to)
}
-bool Process::await(int fd, int op)
+bool Process::await(int fd, int op, const timeval& tv)
{
- return ProcessManager::instance()->await(this, fd, op);
+ ev_tstamp interval = tv.tv_sec + (tv.tv_usec * 1e-6);
+ return ProcessManager::instance()->await(this, fd, op, interval);
}
Modified: incubator/mesos/trunk/src/third_party/libprocess/process.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/process.hpp?rev=1131596&r1=1131595&r2=1131596&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/process.hpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/process.hpp Sun Jun 5 03:27:37 2011
@@ -5,6 +5,8 @@
#include <stdlib.h>
#include <ucontext.h>
+#include <sys/time.h>
+
#ifdef USE_LITHE
#include <lithe.hh>
@@ -195,11 +197,11 @@ protected:
/* Links with the specified PID. */
PID link(const PID &);
- /* IO operations for awaiting. */
+ /* IO events for awaiting. */
enum { RDONLY = 01, WRONLY = 02, RDWR = 03 };
/* Wait until operation is ready for file descriptor (or message received). */
- bool await(int fd, int op);
+ bool await(int fd, int op, const timeval& tv);
/* Returns true if operation on file descriptor is ready. */
bool ready(int fd, int op);