You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 05:27:37 UTC

svn commit: r1131596 - in /incubator/mesos/trunk/src/third_party/libprocess: process.cpp process.hpp

Author: benh
Date: Sun Jun  5 03:27:37 2011
New Revision: 1131596

URL: http://svn.apache.org/viewvc?rev=1131596&view=rev
Log:
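Added timeout/interval support to Process::await: it now takes a timeval, and a positive interval arms a timeout that can end the wait with the process in the TIMEDOUT state. Really helpful.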

Modified:
    incubator/mesos/trunk/src/third_party/libprocess/process.cpp
    incubator/mesos/trunk/src/third_party/libprocess/process.hpp

Modified: incubator/mesos/trunk/src/third_party/libprocess/process.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/process.cpp?rev=1131596&r1=1131595&r2=1131596&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/process.cpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/process.cpp Sun Jun  5 03:27:37 2011
@@ -276,7 +276,7 @@ void trampoline(int process0, int proces
 
 PID make_pid(const char *str)
 {
-  PID pid;
+  PID pid = { 0 };
   std::istringstream iss(str);
   iss >> pid;
   return pid;
@@ -1600,7 +1600,7 @@ public:
     return !interrupted;
   }
 #else
-  bool await(Process *process, int fd, int op)
+  bool await(Process *process, int fd, int op, ev_tstamp interval)
   {
     assert(process != NULL);
 
@@ -1624,6 +1624,15 @@ public:
 
     process->lock();
     {
+      timeout_t timeout;
+
+      if (interval > 0) {
+	/* Create timeout. */
+	timeout = create_timeout(process, interval);
+	/* Start the timeout. */
+	start_timeout(timeout);
+      }
+
       /* Enqueue the watcher. */
       acquire(io_watchersq);
       {
@@ -1638,8 +1647,14 @@ public:
       process->state = Process::AWAITING;
       swapcontext(&process->uctx, &proc_uctx_running);
       assert(process->state == Process::READY ||
+	     process->state == Process::TIMEDOUT ||
 	     process->state == Process::INTERRUPTED);
 
+      /* Attempt to cancel the timer if necessary. */
+      if (interval > 0)
+	if (process->state != Process::TIMEDOUT)
+	  cancel_timeout(timeout);
+
       if (process->state == Process::INTERRUPTED)
 	interrupted = true;
 
@@ -1697,10 +1712,13 @@ public:
 	/* N.B. Process may be RUNNING due to "outside" thread 'receive'. */
 	assert(process->state == Process::RUNNING ||
 	       process->state == Process::RECEIVING ||
+	       process->state == Process::AWAITING ||
+	       process->state == Process::INTERRUPTED ||
 	       process->state == Process::PAUSED);
-	process->state = Process::TIMEDOUT;
-	if (process->state != Process::RUNNING)
+	if (process->state != Process::RUNNING ||
+	    process->state != Process::INTERRUPTED)
 	  ProcessManager::instance()->enqueue(process);
+	process->state = Process::TIMEDOUT;
       }
     }
     process->unlock();
@@ -1720,10 +1738,10 @@ public:
   }
 
 
-  timeout_t create_timeout(Process *process, time_t secs)
+  timeout_t create_timeout(Process *process, ev_tstamp interval)
   {
     assert(process != NULL);
-    ev_tstamp tstamp = ev_time() + secs;
+    ev_tstamp tstamp = ev_now(loop) + interval;
     return make_tuple(tstamp, process, process->generation);
   }
 
@@ -3056,9 +3074,10 @@ PID Process::link(const PID &to)
 }
 
 
-bool Process::await(int fd, int op)
+bool Process::await(int fd, int op, const timeval& tv)
 {
-  return ProcessManager::instance()->await(this, fd, op);
+  ev_tstamp interval = tv.tv_sec + (tv.tv_usec * 1e-6);
+  return ProcessManager::instance()->await(this, fd, op, interval);
 }
 
 

Modified: incubator/mesos/trunk/src/third_party/libprocess/process.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/process.hpp?rev=1131596&r1=1131595&r2=1131596&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/process.hpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/process.hpp Sun Jun  5 03:27:37 2011
@@ -5,6 +5,8 @@
 #include <stdlib.h>
 #include <ucontext.h>
 
+#include <sys/time.h>
+
 #ifdef USE_LITHE
 #include <lithe.hh>
 
@@ -195,11 +197,11 @@ protected:
   /* Links with the specified PID. */
   PID link(const PID &);
 
-  /* IO operations for awaiting. */
+  /* IO events for awaiting. */
   enum { RDONLY = 01, WRONLY = 02, RDWR = 03 };
 
   /* Wait until operation is ready for file descriptor (or message received). */
-  bool await(int fd, int op);
+  bool await(int fd, int op, const timeval& tv);
 
   /* Returns true if operation on file descriptor is ready. */
   bool ready(int fd, int op);