You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 07:37:30 UTC
svn commit: r1131822 - in /incubator/mesos/trunk/src/third_party/libprocess: process.cpp process.hpp
Author: benh
Date: Sun Jun 5 05:37:30 2011
New Revision: 1131822
URL: http://svn.apache.org/viewvc?rev=1131822&view=rev
Log:
Fixed concurrency bug caused by racing exiting processes. Closes #58.
Modified:
incubator/mesos/trunk/src/third_party/libprocess/process.cpp
incubator/mesos/trunk/src/third_party/libprocess/process.hpp
Modified: incubator/mesos/trunk/src/third_party/libprocess/process.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/process.cpp?rev=1131822&r1=1131821&r2=1131822&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/process.cpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/process.cpp Sun Jun 5 05:37:30 2011
@@ -896,7 +896,6 @@ public:
Process *process = (Process *) task->tls;
assert(process->state == Process::RUNNING);
- process->state = Process::EXITED;
ProcessManager::instance()->cleanup(process);
@@ -947,8 +946,8 @@ public:
<< " exited due to unknown exception" << endl;
}
- process->state = Process::EXITED;
cleanup(process);
+
proc_process = NULL;
setcontext(&proc_uctx_schedule);
}
@@ -961,7 +960,6 @@ public:
Process *process = (Process *) task->tls;
assert(process->state == Process::RUNNING);
- process->state = Process::EXITED; // s/EXITED/KILLED ?
ProcessManager::instance()->cleanup(process);
@@ -977,8 +975,8 @@ public:
#else
void kill(Process *process)
{
- process->state = Process::EXITED; // s/EXITED/KILLED ?
cleanup(process);
+
proc_process = NULL;
setcontext(&proc_uctx_schedule);
}
@@ -1063,7 +1061,6 @@ public:
void cleanup(Process *process)
{
//cout << "cleanup for " << process->pid << endl;
- assert(process->state == Process::EXITED);
#ifdef USE_LITHE
/* TODO(benh): Assert that we are on the transition stack. */
@@ -1111,7 +1108,7 @@ public:
foreachpair (_, set<Process *> &waiting, waiters)
assert(waiting.find(process) == waiting.end());
- /* Record any waiting processes. */
+ /* Grab all the waiting processes that are now resumable. */
foreach (Process *waiter, waiters[process])
resumable.push_back(waiter);
@@ -1124,6 +1121,8 @@ public:
/* N.B. The last thread that leaves the gate also free's it. */
gates.erase(it);
}
+
+ process->state = Process::EXITED;
}
process->unlock();
}
@@ -1141,6 +1140,9 @@ public:
foreach (Process *p, resumable) {
p->lock();
{
+ // Process 'p' might be RUNNING because it is racing to become
+ // WAITING while we are actually trying to get it to become
+ // RUNNING again.
assert(p->state == Process::RUNNING || p->state == Process::WAITING);
if (p->state == Process::RUNNING) {
p->state = Process::INTERRUPTED;
@@ -1419,17 +1421,16 @@ public:
if (lithe_task_gettls((void **) &process) < 0)
abort();
- bool waited = true;
+ bool waited = false;
/* Now we can add the process to the waiters. */
acquire(processes);
{
map<uint32_t, Process *>::iterator it = processes.find(pid.pipe);
if (it != processes.end()) {
- if (it->second->state != Process::EXITED)
- waiters[it->second].insert(process);
- else
- waited = false;
+ assert(it->second->state != Process::EXITED);
+ waiters[it->second].insert(process);
+ waited = true;
}
}
release(processes);
@@ -1451,17 +1452,16 @@ public:
Process *process = proc_process;
- bool waited = true;
+ bool waited = false;
/* Now we can add the process to the waiters. */
acquire(processes);
{
map<uint32_t, Process *>::iterator it = processes.find(pid.pipe);
if (it != processes.end()) {
- if (it->second->state != Process::EXITED)
- waiters[it->second].insert(process);
- else
- waited = false;
+ assert(it->second->state != Process::EXITED);
+ waiters[it->second].insert(process);
+ waited = true;
}
}
release(processes);
@@ -1492,6 +1492,15 @@ public:
bool external_wait(PID pid)
{
+ // We use a gate for external waiters. A gate is single use. That
+ // is, a new gate is created when the first external thread shows
+ // up and wants to wait for a process that currently has no
+ // gate. Once that process exits, the last external thread to
+ // leave the gate will also clean it up. Note that a gate will
+ // never get more external threads waiting on it after it has been
+ // opened, since the process should no longer be valid and
+ // therefore will not have an entry in 'processes'.
+
Gate *gate = NULL;
Gate::state_t old;
@@ -1500,6 +1509,7 @@ public:
{
map<uint32_t, Process *>::iterator it = processes.find(pid.pipe);
if (it != processes.end()) {
+ assert(it->second->state != Process::EXITED);
Process *process = it->second;
/* Check and see if a gate already exists. */
if (gates.find(process) == gates.end())
@@ -2869,26 +2879,26 @@ void Process::enqueue(struct msg *msg)
assert(msg != NULL);
lock();
{
- if (state != EXITED) {
- //cout << "enqueing pending message: " << msg << endl;
- msgs.push_back(msg);
-
- if (state == RECEIVING) {
- state = READY;
- ProcessManager::instance()->enqueue(this);
- } else if (state == AWAITING) {
- state = INTERRUPTED;
- ProcessManager::instance()->enqueue(this);
- }
-
- assert(state == INIT ||
- state == READY ||
- state == RUNNING ||
- state == PAUSED ||
- state == WAITING ||
- state == INTERRUPTED ||
- state == TIMEDOUT);
- }
+ assert (state != EXITED);
+
+ //cout << "enqueing pending message: " << msg << endl;
+ msgs.push_back(msg);
+
+ if (state == RECEIVING) {
+ state = READY;
+ ProcessManager::instance()->enqueue(this);
+ } else if (state == AWAITING) {
+ state = INTERRUPTED;
+ ProcessManager::instance()->enqueue(this);
+ }
+
+ assert(state == INIT ||
+ state == READY ||
+ state == RUNNING ||
+ state == PAUSED ||
+ state == WAITING ||
+ state == INTERRUPTED ||
+ state == TIMEDOUT);
}
unlock();
}
Modified: incubator/mesos/trunk/src/third_party/libprocess/process.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/process.hpp?rev=1131822&r1=1131821&r2=1131822&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/process.hpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/process.hpp Sun Jun 5 05:37:30 2011
@@ -222,10 +222,10 @@ public:
/* Spawn a new process. */
static PID spawn(Process *process);
- /* Wait for PID to exit (returns true if actually waited). */
+ /* Wait for PID to exit (returns true if actually waited on a process). */
static bool wait(PID pid);
- /* Wait for PID to exit (returns true if actually waited). */
+ /* Wait for PID to exit (returns true if actually waited on a process). */
static bool wait(Process *process);
/* Invoke the thunk in a legacy safe way. */