You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 07:37:30 UTC

svn commit: r1131822 - in /incubator/mesos/trunk/src/third_party/libprocess: process.cpp process.hpp

Author: benh
Date: Sun Jun  5 05:37:30 2011
New Revision: 1131822

URL: http://svn.apache.org/viewvc?rev=1131822&view=rev
Log:
Fixed concurrency bug caused by racing exiting processes. Closes #58.

Modified:
    incubator/mesos/trunk/src/third_party/libprocess/process.cpp
    incubator/mesos/trunk/src/third_party/libprocess/process.hpp

Modified: incubator/mesos/trunk/src/third_party/libprocess/process.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/process.cpp?rev=1131822&r1=1131821&r2=1131822&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/process.cpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/process.cpp Sun Jun  5 05:37:30 2011
@@ -896,7 +896,6 @@ public:
     Process *process = (Process *) task->tls;
 
     assert(process->state == Process::RUNNING);
-    process->state = Process::EXITED;
 
     ProcessManager::instance()->cleanup(process);
 
@@ -947,8 +946,8 @@ public:
 	   << " exited due to unknown exception" << endl;
     }
 
-    process->state = Process::EXITED;
     cleanup(process);
+
     proc_process = NULL;
     setcontext(&proc_uctx_schedule);
   }
@@ -961,7 +960,6 @@ public:
     Process *process = (Process *) task->tls;
 
     assert(process->state == Process::RUNNING);
-    process->state = Process::EXITED; // s/EXITED/KILLED ?
 
     ProcessManager::instance()->cleanup(process);
 
@@ -977,8 +975,8 @@ public:
 #else
   void kill(Process *process)
   {
-    process->state = Process::EXITED; // s/EXITED/KILLED ?
     cleanup(process);
+
     proc_process = NULL;
     setcontext(&proc_uctx_schedule);
   }
@@ -1063,7 +1061,6 @@ public:
   void cleanup(Process *process)
   {
     //cout << "cleanup for " << process->pid << endl;
-    assert(process->state == Process::EXITED);
 
 #ifdef USE_LITHE
     /* TODO(benh): Assert that we are on the transition stack. */
@@ -1111,7 +1108,7 @@ public:
 	foreachpair (_, set<Process *> &waiting, waiters)
 	  assert(waiting.find(process) == waiting.end());
 
-	/* Record any waiting processes. */
+	/* Grab all the waiting processes that are now resumable. */
 	foreach (Process *waiter, waiters[process])
 	  resumable.push_back(waiter);
 
@@ -1124,6 +1121,8 @@ public:
 	  /* N.B. The last thread that leaves the gate also free's it. */
 	  gates.erase(it);
 	}
+
+	process->state = Process::EXITED;
       }
       process->unlock();
     }
@@ -1141,6 +1140,9 @@ public:
     foreach (Process *p, resumable) {
       p->lock();
       {
+	// Process 'p' might be RUNNING because it is racing to become
+	// WAITING while we are actually trying to get it to become
+	// RUNNING again.
 	assert(p->state == Process::RUNNING || p->state == Process::WAITING);
 	if (p->state == Process::RUNNING) {
 	  p->state = Process::INTERRUPTED;
@@ -1419,17 +1421,16 @@ public:
     if (lithe_task_gettls((void **) &process) < 0)
       abort();
 
-    bool waited = true;
+    bool waited = false;
 
     /* Now we can add the process to the waiters. */
     acquire(processes);
     {
       map<uint32_t, Process *>::iterator it = processes.find(pid.pipe);
       if (it != processes.end()) {
-	if (it->second->state != Process::EXITED)
-	  waiters[it->second].insert(process);
-	else
-	  waited = false;
+	assert(it->second->state != Process::EXITED);
+	waiters[it->second].insert(process);
+	waited = true;
       }
     }
     release(processes);
@@ -1451,17 +1452,16 @@ public:
 
     Process *process = proc_process;
 
-    bool waited = true;
+    bool waited = false;
 
     /* Now we can add the process to the waiters. */
     acquire(processes);
     {
       map<uint32_t, Process *>::iterator it = processes.find(pid.pipe);
       if (it != processes.end()) {
-	if (it->second->state != Process::EXITED)
-	  waiters[it->second].insert(process);
-	else
-	  waited = false;
+	assert(it->second->state != Process::EXITED);
+	waiters[it->second].insert(process);
+	waited = true;
       }
     }
     release(processes);
@@ -1492,6 +1492,15 @@ public:
 
   bool external_wait(PID pid)
   {
+    // We use a gate for external waiters. A gate is single use. That
+    // is, a new gate is created when the first external thread shows
+    // up and wants to wait for a process that currently has no
+    // gate. Once that process exits, the last external thread to
+    // leave the gate will also clean it up. Note that a gate will
+    // never get more external threads waiting on it after it has been
+    // opened, since the process should no longer be valid and
+    // therefore will not have an entry in 'processes'.
+
     Gate *gate = NULL;
     Gate::state_t old;
 
@@ -1500,6 +1509,7 @@ public:
     {
       map<uint32_t, Process *>::iterator it = processes.find(pid.pipe);
       if (it != processes.end()) {
+	assert(it->second->state != Process::EXITED);
 	Process *process = it->second;
 	/* Check and see if a gate already exists. */
 	if (gates.find(process) == gates.end())
@@ -2869,26 +2879,26 @@ void Process::enqueue(struct msg *msg)
   assert(msg != NULL);
   lock();
   {
-    if (state != EXITED) {
-      //cout << "enqueing pending message: " << msg << endl;
-      msgs.push_back(msg);
-
-      if (state == RECEIVING) {
-	state = READY;
-	ProcessManager::instance()->enqueue(this);
-      } else if (state == AWAITING) {
-	state = INTERRUPTED;
-	ProcessManager::instance()->enqueue(this);
-      }
-
-      assert(state == INIT ||
-	     state == READY ||
-	     state == RUNNING ||
-	     state == PAUSED ||
-	     state == WAITING ||
-	     state == INTERRUPTED ||
-	     state == TIMEDOUT);
-    }
+    assert (state != EXITED);
+
+    //cout << "enqueing pending message: " << msg << endl;
+    msgs.push_back(msg);
+
+    if (state == RECEIVING) {
+      state = READY;
+      ProcessManager::instance()->enqueue(this);
+    } else if (state == AWAITING) {
+      state = INTERRUPTED;
+      ProcessManager::instance()->enqueue(this);
+    }
+
+    assert(state == INIT ||
+	   state == READY ||
+	   state == RUNNING ||
+	   state == PAUSED ||
+	   state == WAITING ||
+	   state == INTERRUPTED ||
+	   state == TIMEDOUT);
   }
   unlock();
 }

Modified: incubator/mesos/trunk/src/third_party/libprocess/process.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/process.hpp?rev=1131822&r1=1131821&r2=1131822&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/process.hpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/process.hpp Sun Jun  5 05:37:30 2011
@@ -222,10 +222,10 @@ public:
   /* Spawn a new process. */
   static PID spawn(Process *process);
 
-  /* Wait for PID to exit (returns true if actually waited). */
+  /* Wait for PID to exit (returns true if actually waited on a process). */
   static bool wait(PID pid);
 
-  /* Wait for PID to exit (returns true if actually waited). */
+  /* Wait for PID to exit (returns true if actually waited on a process). */
   static bool wait(Process *process);
 
   /* Invoke the thunk in a legacy safe way. */