You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2013/06/21 19:55:14 UTC
[1/4] git commit: Refactored Reaper with an abstraction that hides
the dispatch syntax.
Updated Branches:
refs/heads/master 464192eee -> ad198fe48
Refactored Reaper with an abstraction that hides the dispatch syntax.
From: Jiang Yan Xu <ya...@jxu.me>
Review: https://reviews.apache.org/r/11552
Project: http://git-wip-us.apache.org/repos/asf/incubator-mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mesos/commit/93dfc5f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mesos/tree/93dfc5f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mesos/diff/93dfc5f0
Branch: refs/heads/master
Commit: 93dfc5f03d32b7b01f5c26fceeb9f869dd11bbb0
Parents: 464192e
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Fri Jun 21 10:43:02 2013 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Fri Jun 21 10:43:02 2013 -0700
----------------------------------------------------------------------
src/slave/cgroups_isolator.cpp | 15 ++----------
src/slave/cgroups_isolator.hpp | 4 +---
src/slave/process_isolator.cpp | 15 ++----------
src/slave/process_isolator.hpp | 4 +---
src/slave/reaper.cpp | 48 +++++++++++++++++++++++++++----------
src/slave/reaper.hpp | 40 +++++++++++++++++++++----------
src/tests/reaper_tests.cpp | 8 ++-----
7 files changed, 72 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/93dfc5f0/src/slave/cgroups_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/cgroups_isolator.cpp b/src/slave/cgroups_isolator.cpp
index 553844b..9f5de60 100644
--- a/src/slave/cgroups_isolator.cpp
+++ b/src/slave/cgroups_isolator.cpp
@@ -200,18 +200,7 @@ CgroupsIsolator::CgroupsIsolator()
// Spawn the reaper, note that it might send us a message before we
// actually get spawned ourselves, but that's okay, the message will
// just get dropped.
- reaper = new Reaper();
- spawn(reaper);
- dispatch(reaper, &Reaper::addListener, this);
-}
-
-
-CgroupsIsolator::~CgroupsIsolator()
-{
- CHECK(reaper != NULL);
- terminate(reaper);
- process::wait(reaper); // Necessary for disambiguation.
- delete reaper;
+ reaper.addListener(this);
}
@@ -780,7 +769,7 @@ Future<Nothing> CgroupsIsolator::recover(
// Add the pid to the reaper to monitor exit status.
if (run.forkedPid.isSome()) {
- dispatch(reaper, &Reaper::monitor, run.forkedPid.get());
+ reaper.monitor(run.forkedPid.get());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/93dfc5f0/src/slave/cgroups_isolator.hpp
----------------------------------------------------------------------
diff --git a/src/slave/cgroups_isolator.hpp b/src/slave/cgroups_isolator.hpp
index 124a4b3..df6069e 100644
--- a/src/slave/cgroups_isolator.hpp
+++ b/src/slave/cgroups_isolator.hpp
@@ -81,8 +81,6 @@ class CgroupsIsolator : public Isolator, public ProcessExitedListener
public:
CgroupsIsolator();
- virtual ~CgroupsIsolator();
-
virtual void initialize(
const Flags& flags,
const Resources& resources,
@@ -292,7 +290,7 @@ private:
bool local;
process::PID<Slave> slave;
bool initialized;
- Reaper* reaper;
+ Reaper reaper;
// File descriptor to 'mesos/tasks' file in the cgroup on which we place
// an advisory lock.
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/93dfc5f0/src/slave/process_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/process_isolator.cpp b/src/slave/process_isolator.cpp
index 97a017e..a3eace3 100644
--- a/src/slave/process_isolator.cpp
+++ b/src/slave/process_isolator.cpp
@@ -68,18 +68,7 @@ ProcessIsolator::ProcessIsolator()
// Spawn the reaper, note that it might send us a message before we
// actually get spawned ourselves, but that's okay, the message will
// just get dropped.
- reaper = new Reaper();
- spawn(reaper);
- dispatch(reaper, &Reaper::addListener, this);
-}
-
-
-ProcessIsolator::~ProcessIsolator()
-{
- CHECK(reaper != NULL);
- terminate(reaper);
- wait(reaper);
- delete reaper;
+ reaper.addListener(this);
}
@@ -345,7 +334,7 @@ Future<Nothing> ProcessIsolator::recover(
// Add the pid to the reaper to monitor exit status.
if (run.forkedPid.isSome()) {
- dispatch(reaper, &Reaper::monitor, run.forkedPid.get());
+ reaper.monitor(run.forkedPid.get());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/93dfc5f0/src/slave/process_isolator.hpp
----------------------------------------------------------------------
diff --git a/src/slave/process_isolator.hpp b/src/slave/process_isolator.hpp
index ee693f6..c5abc6b 100644
--- a/src/slave/process_isolator.hpp
+++ b/src/slave/process_isolator.hpp
@@ -46,8 +46,6 @@ class ProcessIsolator : public Isolator, public ProcessExitedListener
public:
ProcessIsolator();
- virtual ~ProcessIsolator();
-
virtual void initialize(
const Flags& flags,
const Resources& resources,
@@ -109,7 +107,7 @@ private:
bool local;
process::PID<Slave> slave;
bool initialized;
- Reaper* reaper;
+ Reaper reaper;
hashmap<FrameworkID, hashmap<ExecutorID, ProcessInfo*> > infos;
};
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/93dfc5f0/src/slave/reaper.cpp
----------------------------------------------------------------------
diff --git a/src/slave/reaper.cpp b/src/slave/reaper.cpp
index 1d826d1..c2e606e 100644
--- a/src/slave/reaper.cpp
+++ b/src/slave/reaper.cpp
@@ -41,28 +41,25 @@ namespace mesos {
namespace internal {
namespace slave {
-Reaper::Reaper()
+ReaperProcess::ReaperProcess()
: ProcessBase(ID::generate("reaper")) {}
-Reaper::~Reaper() {}
-
-
-void Reaper::addListener(
+void ReaperProcess::addListener(
const PID<ProcessExitedListener>& listener)
{
listeners.push_back(listener);
}
-Try<Nothing> Reaper::monitor(pid_t pid)
+Future<Nothing> ReaperProcess::monitor(pid_t pid)
{
// Check to see if the current process has sufficient privileges to
// monitor the liveness of this pid.
Try<bool> alive = os::alive(pid);
if (alive.isError()) {
- return Error("Failed to monitor process " + stringify(pid) +
- ": " + alive.error());
+ return Future<Nothing>::failed("Failed to monitor process " +
+ stringify(pid) + ": " + alive.error());
} else {
pids.insert(pid);
}
@@ -70,13 +67,13 @@ Try<Nothing> Reaper::monitor(pid_t pid)
}
-void Reaper::initialize()
+void ReaperProcess::initialize()
{
reap();
}
-void Reaper::notify(pid_t pid, int status)
+void ReaperProcess::notify(pid_t pid, int status)
{
foreach (const PID<ProcessExitedListener>& listener, listeners) {
dispatch(listener, &ProcessExitedListener::processExited, pid, status);
@@ -84,7 +81,7 @@ void Reaper::notify(pid_t pid, int status)
}
-void Reaper::reap()
+void ReaperProcess::reap()
{
// Check whether any monitored process has exited.
foreach (pid_t pid, utils::copy(pids)) {
@@ -117,7 +114,34 @@ void Reaper::reap()
pids.erase(pid);
}
}
- delay(Seconds(1), self(), &Reaper::reap); // Reap forever!
+ delay(Seconds(1), self(), &ReaperProcess::reap); // Reap forever!
+}
+
+
+Reaper::Reaper()
+{
+ process = new ReaperProcess();
+ spawn(process);
+}
+
+
+Reaper::~Reaper()
+{
+ terminate(process);
+ wait(process);
+ delete process;
+}
+
+
+void Reaper::addListener(const process::PID<ProcessExitedListener>& listener)
+{
+ dispatch(process, &ReaperProcess::addListener, listener);
+}
+
+
+Future<Nothing> Reaper::monitor(pid_t pid)
+{
+ return dispatch(process, &ReaperProcess::monitor, pid);
}
} // namespace slave {
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/93dfc5f0/src/slave/reaper.hpp
----------------------------------------------------------------------
diff --git a/src/slave/reaper.hpp b/src/slave/reaper.hpp
index 09844d8..d599e9c 100644
--- a/src/slave/reaper.hpp
+++ b/src/slave/reaper.hpp
@@ -27,6 +27,7 @@
#include <stout/nothing.hpp>
#include <stout/try.hpp>
+
namespace mesos {
namespace internal {
namespace slave {
@@ -38,11 +39,35 @@ public:
};
+// Reaper implementation. See comments of the Reaper class.
+class ReaperProcess : public process::Process<ReaperProcess>
+{
+public:
+ ReaperProcess();
+
+ void addListener(const process::PID<ProcessExitedListener>&);
+
+ process::Future<Nothing> monitor(pid_t pid);
+
+protected:
+ virtual void initialize();
+
+ void reap();
+
+ // TODO(vinod): Make 'status' an option.
+ void notify(pid_t pid, int status);
+
+private:
+ std::list<process::PID<ProcessExitedListener> > listeners;
+ std::set<pid_t> pids;
+};
+
+
// TODO(vinod): Refactor the Reaper into 2 components:
// 1) Reaps the status of child processes.
// 2) Checks the exit status of requested processes.
// Also, use Futures instead of callbacks to notify process exits.
-class Reaper : public process::Process<Reaper>
+class Reaper
{
public:
Reaper();
@@ -55,19 +80,10 @@ public:
// 1) is the parent of 'pid' or
// 2) has the same real/effective UID as that of 'pid' or
// 3) is run as a privileged user.
- Try<Nothing> monitor(pid_t pid);
-
-protected:
- virtual void initialize();
-
- void reap();
-
- // TOOD(vinod): Make 'status' an option.
- void notify(pid_t pid, int status);
+ process::Future<Nothing> monitor(pid_t pid);
private:
- std::list<process::PID<ProcessExitedListener> > listeners;
- std::set<pid_t> pids;
+ ReaperProcess* process;
};
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/93dfc5f0/src/tests/reaper_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reaper_tests.cpp b/src/tests/reaper_tests.cpp
index fbb6066..6849419 100644
--- a/src/tests/reaper_tests.cpp
+++ b/src/tests/reaper_tests.cpp
@@ -102,16 +102,15 @@ TEST(ReaperTest, NonChildProcess)
// Spawn the reaper.
Reaper reaper;
- spawn(reaper);
// Ignore the exit of the child process.
EXPECT_CALL(listener, processExited(_,_))
.WillRepeatedly(DoDefault());
- dispatch(reaper, &Reaper::addListener, listener.self());
+ reaper.addListener(listener.self());
// Ask the reaper to monitor the grand child process.
- dispatch(reaper, &Reaper::monitor, pid);
+ reaper.monitor(pid);
// Catch the exit of the grand child process.
Future<Nothing> processExited;
@@ -134,9 +133,6 @@ TEST(ReaperTest, NonChildProcess)
// Ensure the reaper notifies of the terminated process.
AWAIT_READY(processExited);
- terminate(reaper);
- wait(reaper);
-
terminate(listener);
wait(listener);
[4/4] git commit: Changed reaper to use an Option for the process
exit status.
Posted by bm...@apache.org.
Changed reaper to use an Option for the process exit status.
From: Jiang Yan Xu <ya...@jxu.me>
Review: https://reviews.apache.org/r/11556
Project: http://git-wip-us.apache.org/repos/asf/incubator-mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mesos/commit/ad198fe4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mesos/tree/ad198fe4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mesos/diff/ad198fe4
Branch: refs/heads/master
Commit: ad198fe4876ce2ce027c55e04c198005294d0fc5
Parents: 4dd692f
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Fri Jun 21 10:47:44 2013 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Fri Jun 21 10:47:44 2013 -0700
----------------------------------------------------------------------
src/slave/cgroups_isolator.cpp | 23 ++++++++++---------
src/slave/cgroups_isolator.hpp | 4 ++--
src/slave/process_isolator.cpp | 18 +++++++--------
src/slave/process_isolator.hpp | 2 +-
src/slave/reaper.cpp | 24 ++++++++++----------
src/slave/reaper.hpp | 13 ++++++-----
src/slave/slave.cpp | 45 ++++++++++++++++++++++---------------
src/slave/slave.hpp | 2 +-
src/tests/reaper_tests.cpp | 20 +++++++++--------
9 files changed, 81 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad198fe4/src/slave/cgroups_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/cgroups_isolator.cpp b/src/slave/cgroups_isolator.cpp
index 6b87d08..9d18886 100644
--- a/src/slave/cgroups_isolator.cpp
+++ b/src/slave/cgroups_isolator.cpp
@@ -801,25 +801,26 @@ Future<Nothing> CgroupsIsolator::recover(
}
-void CgroupsIsolator::reaped(pid_t pid, const Future<int>& status)
+void CgroupsIsolator::reaped(pid_t pid, const Future<Option<int> >& status)
{
- if (status.isDiscarded()) {
- LOG(ERROR) << "The status was discarded";
- return;
- }
- if (status.isFailed()) {
- LOG(ERROR) << status.failure();
- return;
- }
-
CgroupInfo* info = findCgroupInfo(pid);
if (info != NULL) {
FrameworkID frameworkId = info->frameworkId;
ExecutorID executorId = info->executorId;
+ if (!status.isReady()) {
+ LOG(ERROR) << "Failed to get the status for executor " << executorId
+ << " of framework " << frameworkId << ": "
+ << (status.isFailed() ? status.failure() : "discarded");
+ return;
+ }
+
LOG(INFO) << "Executor " << executorId
<< " of framework " << frameworkId
- << " terminated with status " << status.get();
+ << " terminated with status "
+ << (status.get().isSome()
+ ? stringify(status.get().get())
+ : "unknown");
// Set the exit status, so that '_killExecutor()' can send it to the slave.
info->status = status.get();
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad198fe4/src/slave/cgroups_isolator.hpp
----------------------------------------------------------------------
diff --git a/src/slave/cgroups_isolator.hpp b/src/slave/cgroups_isolator.hpp
index 06bac98..e86062e 100644
--- a/src/slave/cgroups_isolator.hpp
+++ b/src/slave/cgroups_isolator.hpp
@@ -119,7 +119,7 @@ private:
CgroupsIsolator(const CgroupsIsolator&);
CgroupsIsolator& operator = (const CgroupsIsolator&);
- void reaped(pid_t pid, const Future<int>& status);
+ void reaped(pid_t pid, const Future<Option<int> >& status);
// The cgroup information for each live executor.
struct CgroupInfo
@@ -163,7 +163,7 @@ private:
std::string message; // The reason behind the destruction.
- int status; // Exit status of the executor.
+ Option<int> status; // Exit status of the executor.
Flags flags; // Slave flags.
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad198fe4/src/slave/process_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/process_isolator.cpp b/src/slave/process_isolator.cpp
index a3d31e5..b750b26 100644
--- a/src/slave/process_isolator.cpp
+++ b/src/slave/process_isolator.cpp
@@ -423,22 +423,20 @@ Future<ResourceStatistics> ProcessIsolator::usage(
}
-void ProcessIsolator::reaped(pid_t pid, const Future<int>& status)
+void ProcessIsolator::reaped(pid_t pid, const Future<Option<int> >& status)
{
- if (status.isDiscarded()) {
- LOG(ERROR) << "The status was discarded";
- return;
- }
- if (status.isFailed()) {
- LOG(ERROR) << status.failure();
- return;
- }
-
foreachkey (const FrameworkID& frameworkId, infos) {
foreachkey (const ExecutorID& executorId, infos[frameworkId]) {
ProcessInfo* info = infos[frameworkId][executorId];
if (info->pid.isSome() && info->pid.get() == pid) {
+ if (!status.isReady()) {
+ LOG(ERROR) << "Failed to get the status for executor '" << executorId
+ << "' of framework " << frameworkId << ": "
+ << (status.isFailed() ? status.failure() : "discarded");
+ return;
+ }
+
LOG(INFO) << "Telling slave of terminated executor '" << executorId
<< "' of framework " << frameworkId;
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad198fe4/src/slave/process_isolator.hpp
----------------------------------------------------------------------
diff --git a/src/slave/process_isolator.hpp b/src/slave/process_isolator.hpp
index a3f6c68..4ae093f 100644
--- a/src/slave/process_isolator.hpp
+++ b/src/slave/process_isolator.hpp
@@ -109,7 +109,7 @@ private:
Reaper reaper;
hashmap<FrameworkID, hashmap<ExecutorID, ProcessInfo*> > infos;
- void reaped(pid_t pid, const Future<int>& status);
+ void reaped(pid_t pid, const Future<Option<int> >& status);
};
} // namespace slave {
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad198fe4/src/slave/reaper.cpp
----------------------------------------------------------------------
diff --git a/src/slave/reaper.cpp b/src/slave/reaper.cpp
index 1b19eb2..8d3101d 100644
--- a/src/slave/reaper.cpp
+++ b/src/slave/reaper.cpp
@@ -45,7 +45,7 @@ ReaperProcess::ReaperProcess()
: ProcessBase(ID::generate("reaper")) {}
-Future<int> ReaperProcess::monitor(pid_t pid)
+Future<Option<int> > ReaperProcess::monitor(pid_t pid)
{
// Check to see if the current process has sufficient privileges to
// monitor the liveness of this pid.
@@ -55,14 +55,14 @@ Future<int> ReaperProcess::monitor(pid_t pid)
if (alive.get()) {
// We have permissions to check the validity of the process
// and it's alive, so add it to the promises map.
- Owned<Promise<int> > promise(new Promise<int>());
+ Owned<Promise<Option<int> > > promise(new Promise<Option<int> >());
promises.put(pid, promise);
return promise->future();
} else {
// Process doesn't exist.
LOG(WARNING) << "Cannot monitor process " << pid
<< " because it doesn't exist";
- return -1;
+ return None();
}
}
@@ -75,17 +75,17 @@ Future<int> ReaperProcess::monitor(pid_t pid)
// The process terminated and the status was reaped.
// Notify other listeners and return directly for this caller.
notify(pid, status);
- return status;
+ return Option<int>(status);
} else if (result == 0) {
// Child still active, add to the map.
- Owned<Promise<int> > promise(new Promise<int>());
+ Owned<Promise<Option<int> > > promise(new Promise<Option<int> >());
promises.put(pid, promise);
return promise->future();
} else {
// Not a child nor do we have permission to for os::alive();
// we cannot monitor this pid.
- return Future<int>::failed("Failed to monitor process " +
- stringify(pid) + ": " + strerror(errno));
+ return Future<Option<int> >::failed(
+ "Failed to monitor process " + stringify(pid) + ": " + strerror(errno));
}
}
@@ -96,9 +96,9 @@ void ReaperProcess::initialize()
}
-void ReaperProcess::notify(pid_t pid, int status)
+void ReaperProcess::notify(pid_t pid, Option<int> status)
{
- foreach (const Owned<Promise<int> >& promise, promises.get(pid)) {
+ foreach (const Owned<Promise<Option<int> > >& promise, promises.get(pid)) {
promise->set(status);
}
promises.remove(pid);
@@ -120,7 +120,7 @@ void ReaperProcess::reap()
// Notify the "listeners" only if they have requested to monitor
// this pid. Otherwise the status is discarded.
// This means if a child pid is registered via the monitor() call
- // after it's reaped, an invalid status (-1) will be returned.
+ // after it's reaped, status 'None' will be returned.
if (!WIFSTOPPED(status) && promises.contains(pid)) {
notify(pid, status);
}
@@ -145,7 +145,7 @@ void ReaperProcess::reap()
LOG(WARNING) << "Cannot get the exit status of process " << pid
<< " because it is not a child of the calling "
<< "process: " << strerror(errno);
- notify(pid, -1);
+ notify(pid, None());
}
}
@@ -168,7 +168,7 @@ Reaper::~Reaper()
}
-Future<int> Reaper::monitor(pid_t pid)
+Future<Option<int> > Reaper::monitor(pid_t pid)
{
return dispatch(process, &ReaperProcess::monitor, pid);
}
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad198fe4/src/slave/reaper.hpp
----------------------------------------------------------------------
diff --git a/src/slave/reaper.hpp b/src/slave/reaper.hpp
index 15ebc0d..2d6db03 100644
--- a/src/slave/reaper.hpp
+++ b/src/slave/reaper.hpp
@@ -60,8 +60,10 @@ public:
//
// The exit status of 'pid' can only be correctly captured if the
// calling process is the parent of 'pid' and the process hasn't
- // been reaped yet, otherwise -1 is returned.
- process::Future<int> monitor(pid_t pid);
+ // been reaped yet, otherwise 'None' is returned.
+ // Note that an invalid pid does not cause a failed Future, but an
+ // empty result ('None').
+ process::Future<Option<int> > monitor(pid_t pid);
private:
ReaperProcess* process;
@@ -74,22 +76,21 @@ class ReaperProcess : public process::Process<ReaperProcess>
public:
ReaperProcess();
- process::Future<int> monitor(pid_t pid);
+ process::Future<Option<int> > monitor(pid_t pid);
protected:
virtual void initialize();
void reap();
- // TODO(vinod): Make 'status' an option.
// The notification is sent only if the pid is explicitly registered
// via the monitor() call.
- void notify(pid_t pid, int status);
+ void notify(pid_t pid, Option<int> status);
private:
// Mapping from the monitored pid to all promises the pid exit
// status should be sent to.
- multihashmap<pid_t, Owned<process::Promise<int> > > promises;
+ multihashmap<pid_t, Owned<process::Promise<Option<int> > > > promises;
};
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad198fe4/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 952bd14..309d1ac 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2045,19 +2045,28 @@ void _unwatch(
void Slave::executorTerminated(
const FrameworkID& frameworkId,
const ExecutorID& executorId,
- int status,
+ const Option<int>& status_,
bool destroyed,
const string& message)
{
- LOG(INFO) << "Executor '" << executorId
- << "' of framework " << frameworkId
- << (WIFEXITED(status)
- ? " has exited with status '"
- : " has terminated with signal '")
- << (WIFEXITED(status)
- ? stringify(WEXITSTATUS(status))
- : strsignal(WTERMSIG(status)))
- << "'";
+ int status;
+ if (status_.isNone()) {
+ LOG(INFO) << "Executor '" << executorId
+ << "' of framework " << frameworkId
+ << " has terminated with unknown status";
+ // Set a special status for None.
+ status = -1;
+ } else {
+ status = status_.get();
+ LOG(INFO) << "Executor '" << executorId
+ << "' of framework " << frameworkId
+ << (WIFEXITED(status)
+ ? " has exited with status "
+ : " has terminated with signal ")
+ << (WIFEXITED(status)
+ ? stringify(WEXITSTATUS(status))
+ : strsignal(WTERMSIG(status)));
+ }
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
@@ -2106,18 +2115,18 @@ void Slave::executorTerminated(
// Transition all live launched tasks.
foreachvalue (Task* task, utils::copy(executor->launchedTasks)) {
if (!protobuf::isTerminalState(task->state())) {
- mesos::TaskState status;
+ mesos::TaskState taskState;
isCommandExecutor = !task->has_executor_id();
if (destroyed || isCommandExecutor.get()) {
- status = TASK_FAILED;
+ taskState = TASK_FAILED;
} else {
- status = TASK_LOST;
+ taskState = TASK_LOST;
}
statusUpdate(protobuf::createStatusUpdate(
frameworkId,
info.id(),
task->task_id(),
- status,
+ taskState,
message,
executorId));
}
@@ -2126,18 +2135,18 @@ void Slave::executorTerminated(
// Transition all queued tasks.
foreachvalue (const TaskInfo& task,
utils::copy(executor->queuedTasks)) {
- mesos::TaskState status;
+ mesos::TaskState taskState;
isCommandExecutor = task.has_command();
if (destroyed || isCommandExecutor.get()) {
- status = TASK_FAILED;
+ taskState = TASK_FAILED;
} else {
- status = TASK_LOST;
+ taskState = TASK_LOST;
}
statusUpdate(protobuf::createStatusUpdate(
frameworkId,
info.id(),
task.task_id(),
- status,
+ taskState,
message,
executorId));
}
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad198fe4/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index d1ba82e..7ef6ad8 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -171,7 +171,7 @@ public:
void executorTerminated(
const FrameworkID& frameworkId,
const ExecutorID& executorId,
- int status,
+ const Option<int>& status,
bool destroyed,
const std::string& message);
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad198fe4/src/tests/reaper_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reaper_tests.cpp b/src/tests/reaper_tests.cpp
index cfda0f4..d55ac24 100644
--- a/src/tests/reaper_tests.cpp
+++ b/src/tests/reaper_tests.cpp
@@ -29,6 +29,7 @@
#include <process/gtest.hpp>
#include <stout/exit.hpp>
+#include <stout/gtest.hpp>
#include <stout/os.hpp>
#include "slave/reaper.hpp"
@@ -99,7 +100,7 @@ TEST(ReaperTest, NonChildProcess)
Future<Nothing> monitor = FUTURE_DISPATCH(_, &ReaperProcess::monitor);
// Ask the reaper to monitor the grand child process.
- Future<int> status = reaper.monitor(pid);
+ Future<Option<int> > status = reaper.monitor(pid);
AWAIT_READY(monitor);
@@ -120,8 +121,8 @@ TEST(ReaperTest, NonChildProcess)
// Ensure the reaper notifies of the terminated process.
AWAIT_READY(status);
- // Status is -1 because pid is not an immediate child.
- ASSERT_EQ(-1, status.get());
+ // Status is None because pid is not an immediate child.
+ ASSERT_NONE(status.get());
Clock::resume();
}
@@ -149,7 +150,7 @@ TEST(ReaperTest, ChildProcess)
Future<Nothing> monitor = FUTURE_DISPATCH(_, &ReaperProcess::monitor);
// Ask the reaper to monitor the grand child process.
- Future<int> status = reaper.monitor(pid);
+ Future<Option<int> > status = reaper.monitor(pid);
AWAIT_READY(monitor);
@@ -168,9 +169,10 @@ TEST(ReaperTest, ChildProcess)
AWAIT_READY(status);
// Check if the status is correct.
- int stat = status.get();
- ASSERT_TRUE(WIFSIGNALED(stat));
- ASSERT_EQ(SIGKILL, WTERMSIG(stat));
+ ASSERT_SOME(status.get());
+ int status_ = status.get().get();
+ ASSERT_TRUE(WIFSIGNALED(status_));
+ ASSERT_EQ(SIGKILL, WTERMSIG(status_));
Clock::resume();
}
@@ -212,7 +214,7 @@ TEST(ReaperTest, TerminatedChildProcess)
Future<Nothing> monitor = FUTURE_DISPATCH(_, &ReaperProcess::monitor);
// Ask the reaper to monitor the child process.
- Future<int> status = reaper.monitor(pid);
+ Future<Option<int> > status = reaper.monitor(pid);
AWAIT_READY(monitor);
@@ -227,7 +229,7 @@ TEST(ReaperTest, TerminatedChildProcess)
// Invalid status is returned because it is reaped before being
// monitored.
- ASSERT_EQ(-1, status.get());
+ ASSERT_NONE(status.get());
Clock::resume();
}
[3/4] git commit: Changed reaper to use a Future as the notification
mechanism. The notification is now sent only if the pid is explicitly
registered via the monitor() call.
Posted by bm...@apache.org.
Changed reaper to use a Future as the notification mechanism.
The notification is now sent only if the pid is explicitly registered
via the monitor() call.
From: Jiang Yan Xu <ya...@jxu.me>
Review: https://reviews.apache.org/r/11554
Project: http://git-wip-us.apache.org/repos/asf/incubator-mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mesos/commit/4dd692f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mesos/tree/4dd692f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mesos/diff/4dd692f1
Branch: refs/heads/master
Commit: 4dd692f19973482e145d67fc429ab931d0ecb882
Parents: ad842c3
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Fri Jun 21 10:46:33 2013 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Fri Jun 21 10:46:33 2013 -0700
----------------------------------------------------------------------
src/slave/cgroups_isolator.cpp | 35 ++++++---
src/slave/cgroups_isolator.hpp | 6 +-
src/slave/process_isolator.cpp | 36 ++++++---
src/slave/process_isolator.hpp | 5 +-
src/slave/reaper.cpp | 121 ++++++++++++++++++-----------
src/slave/reaper.hpp | 72 +++++++++--------
src/tests/reaper_tests.cpp | 149 +++++++++++++++++++++++++++++-------
7 files changed, 291 insertions(+), 133 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/4dd692f1/src/slave/cgroups_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/cgroups_isolator.cpp b/src/slave/cgroups_isolator.cpp
index 9f5de60..6b87d08 100644
--- a/src/slave/cgroups_isolator.cpp
+++ b/src/slave/cgroups_isolator.cpp
@@ -195,13 +195,7 @@ std::ostream& operator << (std::ostream& out, const Cpuset& cpuset)
CgroupsIsolator::CgroupsIsolator()
: ProcessBase(ID::generate("cgroups-isolator")),
initialized(false),
- lockFile(None())
-{
- // Spawn the reaper, note that it might send us a message before we
- // actually get spawned ourselves, but that's okay, the message will
- // just get dropped.
- reaper.addListener(this);
-}
+ lockFile(None()) {}
void CgroupsIsolator::initialize(
@@ -540,6 +534,12 @@ void CgroupsIsolator::launchExecutor(
// Store the pid of the leading process of the executor.
info->pid = pid;
+ reaper.monitor(pid)
+ .onAny(defer(PID<CgroupsIsolator>(this),
+ &CgroupsIsolator::reaped,
+ pid,
+ lambda::_1));
+
// Tell the slave this executor has started.
dispatch(slave,
&Slave::executorStarted,
@@ -769,7 +769,11 @@ Future<Nothing> CgroupsIsolator::recover(
// Add the pid to the reaper to monitor exit status.
if (run.forkedPid.isSome()) {
- reaper.monitor(run.forkedPid.get());
+ reaper.monitor(run.forkedPid.get())
+ .onAny(defer(PID<CgroupsIsolator>(this),
+ &CgroupsIsolator::reaped,
+ run.forkedPid.get(),
+ lambda::_1));
}
}
}
@@ -797,8 +801,17 @@ Future<Nothing> CgroupsIsolator::recover(
}
-void CgroupsIsolator::processExited(pid_t pid, int status)
+void CgroupsIsolator::reaped(pid_t pid, const Future<int>& status)
{
+ if (status.isDiscarded()) {
+ LOG(ERROR) << "The status was discarded";
+ return;
+ }
+ if (status.isFailed()) {
+ LOG(ERROR) << status.failure();
+ return;
+ }
+
CgroupInfo* info = findCgroupInfo(pid);
if (info != NULL) {
FrameworkID frameworkId = info->frameworkId;
@@ -806,10 +819,10 @@ void CgroupsIsolator::processExited(pid_t pid, int status)
LOG(INFO) << "Executor " << executorId
<< " of framework " << frameworkId
- << " terminated with status " << status;
+ << " terminated with status " << status.get();
// Set the exit status, so that '_killExecutor()' can send it to the slave.
- info->status = status;
+ info->status = status.get();
if (!info->killed) {
killExecutor(frameworkId, executorId);
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/4dd692f1/src/slave/cgroups_isolator.hpp
----------------------------------------------------------------------
diff --git a/src/slave/cgroups_isolator.hpp b/src/slave/cgroups_isolator.hpp
index df6069e..06bac98 100644
--- a/src/slave/cgroups_isolator.hpp
+++ b/src/slave/cgroups_isolator.hpp
@@ -76,7 +76,7 @@ private:
};
-class CgroupsIsolator : public Isolator, public ProcessExitedListener
+class CgroupsIsolator : public Isolator
{
public:
CgroupsIsolator();
@@ -114,13 +114,13 @@ public:
virtual process::Future<Nothing> recover(
const Option<state::SlaveState>& state);
- virtual void processExited(pid_t pid, int status);
-
private:
// No copying, no assigning.
CgroupsIsolator(const CgroupsIsolator&);
CgroupsIsolator& operator = (const CgroupsIsolator&);
+ void reaped(pid_t pid, const Future<int>& status);
+
// The cgroup information for each live executor.
struct CgroupInfo
{
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/4dd692f1/src/slave/process_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/process_isolator.cpp b/src/slave/process_isolator.cpp
index a3eace3..a3d31e5 100644
--- a/src/slave/process_isolator.cpp
+++ b/src/slave/process_isolator.cpp
@@ -25,12 +25,14 @@
#include <set>
#include <process/clock.hpp>
+#include <process/defer.hpp>
#include <process/dispatch.hpp>
#include <process/id.hpp>
#include <stout/check.hpp>
#include <stout/exit.hpp>
#include <stout/foreach.hpp>
+#include <stout/lambda.hpp>
#include <stout/nothing.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
@@ -48,6 +50,7 @@ using std::map;
using std::set;
using std::string;
+using process::defer;
using process::wait; // Necessary on some OS's to disambiguate.
namespace mesos {
@@ -63,13 +66,7 @@ using state::RunState;
ProcessIsolator::ProcessIsolator()
: ProcessBase(ID::generate("process-isolator")),
- initialized(false)
-{
- // Spawn the reaper, note that it might send us a message before we
- // actually get spawned ourselves, but that's okay, the message will
- // just get dropped.
- reaper.addListener(this);
-}
+ initialized(false) {}
void ProcessIsolator::initialize(
@@ -166,6 +163,12 @@ void ProcessIsolator::launchExecutor(
// Record the pid (should also be the pgid since we setsid below).
infos[frameworkId][executorId]->pid = pid;
+ reaper.monitor(pid)
+ .onAny(defer(PID<ProcessIsolator>(this),
+ &ProcessIsolator::reaped,
+ pid,
+ lambda::_1));
+
// Tell the slave this executor has started.
dispatch(slave, &Slave::executorStarted, frameworkId, executorId, pid);
} else {
@@ -334,7 +337,11 @@ Future<Nothing> ProcessIsolator::recover(
// Add the pid to the reaper to monitor exit status.
if (run.forkedPid.isSome()) {
- reaper.monitor(run.forkedPid.get());
+ reaper.monitor(run.forkedPid.get())
+ .onAny(defer(PID<ProcessIsolator>(this),
+ &ProcessIsolator::reaped,
+ run.forkedPid.get(),
+ lambda::_1));
}
}
}
@@ -416,8 +423,17 @@ Future<ResourceStatistics> ProcessIsolator::usage(
}
-void ProcessIsolator::processExited(pid_t pid, int status)
+void ProcessIsolator::reaped(pid_t pid, const Future<int>& status)
{
+ if (status.isDiscarded()) {
+ LOG(ERROR) << "The status was discarded";
+ return;
+ }
+ if (status.isFailed()) {
+ LOG(ERROR) << status.failure();
+ return;
+ }
+
foreachkey (const FrameworkID& frameworkId, infos) {
foreachkey (const ExecutorID& executorId, infos[frameworkId]) {
ProcessInfo* info = infos[frameworkId][executorId];
@@ -430,7 +446,7 @@ void ProcessIsolator::processExited(pid_t pid, int status)
&Slave::executorTerminated,
frameworkId,
executorId,
- status,
+ status.get(),
false,
"Executor terminated");
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/4dd692f1/src/slave/process_isolator.hpp
----------------------------------------------------------------------
diff --git a/src/slave/process_isolator.hpp b/src/slave/process_isolator.hpp
index c5abc6b..a3f6c68 100644
--- a/src/slave/process_isolator.hpp
+++ b/src/slave/process_isolator.hpp
@@ -41,7 +41,7 @@ namespace mesos {
namespace internal {
namespace slave {
-class ProcessIsolator : public Isolator, public ProcessExitedListener
+class ProcessIsolator : public Isolator
{
public:
ProcessIsolator();
@@ -77,7 +77,6 @@ public:
virtual process::Future<Nothing> recover(
const Option<state::SlaveState>& state);
- virtual void processExited(pid_t pid, int status);
private:
// No copying, no assigning.
@@ -109,6 +108,8 @@ private:
bool initialized;
Reaper reaper;
hashmap<FrameworkID, hashmap<ExecutorID, ProcessInfo*> > infos;
+
+ void reaped(pid_t pid, const Future<int>& status);
};
} // namespace slave {
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/4dd692f1/src/slave/reaper.cpp
----------------------------------------------------------------------
diff --git a/src/slave/reaper.cpp b/src/slave/reaper.cpp
index c2e606e..1b19eb2 100644
--- a/src/slave/reaper.cpp
+++ b/src/slave/reaper.cpp
@@ -45,25 +45,48 @@ ReaperProcess::ReaperProcess()
: ProcessBase(ID::generate("reaper")) {}
-void ReaperProcess::addListener(
- const PID<ProcessExitedListener>& listener)
-{
- listeners.push_back(listener);
-}
-
-
-Future<Nothing> ReaperProcess::monitor(pid_t pid)
+Future<int> ReaperProcess::monitor(pid_t pid)
{
// Check to see if the current process has sufficient privileges to
// monitor the liveness of this pid.
Try<bool> alive = os::alive(pid);
- if (alive.isError()) {
- return Future<Nothing>::failed("Failed to monitor process " +
- stringify(pid) + ": " + alive.error());
+
+ if (alive.isSome()) {
+ if (alive.get()) {
+ // We have permissions to check the validity of the process
+ // and it's alive, so add it to the promises map.
+ Owned<Promise<int> > promise(new Promise<int>());
+ promises.put(pid, promise);
+ return promise->future();
+ } else {
+ // Process doesn't exist.
+ LOG(WARNING) << "Cannot monitor process " << pid
+ << " because it doesn't exist";
+ return -1;
+ }
+ }
+
+ // Now we know we don't have permission for alive(), but we can
+ // still monitor it if it is our child.
+ int status;
+ pid_t result = waitpid(pid, &status, WNOHANG);
+
+ if (result > 0) {
+ // The process terminated and the status was reaped.
+ // Notify other listeners and return directly for this caller.
+ notify(pid, status);
+ return status;
+ } else if (result == 0) {
+ // Child still active, add to the map.
+ Owned<Promise<int> > promise(new Promise<int>());
+ promises.put(pid, promise);
+ return promise->future();
} else {
- pids.insert(pid);
+ // Not a child, nor do we have permission to call os::alive();
+ // we cannot monitor this pid.
+ return Future<int>::failed("Failed to monitor process " +
+ stringify(pid) + ": " + strerror(errno));
}
- return Nothing();
}
@@ -75,45 +98,57 @@ void ReaperProcess::initialize()
void ReaperProcess::notify(pid_t pid, int status)
{
- foreach (const PID<ProcessExitedListener>& listener, listeners) {
- dispatch(listener, &ProcessExitedListener::processExited, pid, status);
+ foreach (const Owned<Promise<int> >& promise, promises.get(pid)) {
+ promise->set(status);
}
+ promises.remove(pid);
}
void ReaperProcess::reap()
{
- // Check whether any monitored process has exited.
- foreach (pid_t pid, utils::copy(pids)) {
- Try<bool> alive = os::alive(pid);
- CHECK_SOME(alive);
-
- if (!alive.get()) { // The process has terminated.
- // Attempt to reap the status.
- // If pid is not a child process of the current process, this is a no-op.
- int status = -1;
- if (waitpid(pid, &status, WNOHANG) < 0) {
- LOG(WARNING) << "Cannot get the exit status of process " << pid
- << " because it either does not exist or"
- << " is not a child of the calling process: "
- << strerror(errno);
- }
-
- notify(pid, status); // Notify the listeners.
- pids.erase(pid);
- }
- }
+ // This method assumes that the registered PIDs are
+ // 1) children, or
+ // 2) non-children that we have permission to check liveness, or
+ // 3) nonexistent / reaped elsewhere.
- // Check whether any child processes have exited.
+ // Reap all child processes first.
pid_t pid;
int status;
while ((pid = waitpid(-1, &status, WNOHANG)) > 0) {
// Ignore this if the process has only stopped.
- if (!WIFSTOPPED(status)) {
- notify(pid, status); // Notify the listeners.
- pids.erase(pid);
+ // Notify the "listeners" only if they have requested to monitor
+ // this pid. Otherwise the status is discarded.
+ // This means if a child pid is registered via the monitor() call
+ // after it's reaped, an invalid status (-1) will be returned.
+ if (!WIFSTOPPED(status) && promises.contains(pid)) {
+ notify(pid, status);
+ }
+ }
+
+ // Check whether any monitored process has exited and been reaped.
+ // 1) If a child terminates before the foreach loop but after the
+ // while loop, it won't be reaped until the next reap() cycle
+ // and the alive() check below returns true.
+ // 2) If a non-child process terminates and is reaped elsewhere,
+ // e.g. by init, we notify the listeners. (We know we have
+ // permission to check its liveness in this case.)
+ // 3) If a non-child process terminates and is not yet reaped,
+ // alive() returns true and no notification is sent.
+ // 4) If a child terminates before the while loop above, then we've
+ // already reaped it and have the listeners notified!
+ foreach (pid_t pid, utils::copy(promises.keys())) {
+ Try<bool> alive = os::alive(pid);
+
+ if (alive.isSome() && !alive.get()) {
+ // The process has been reaped.
+ LOG(WARNING) << "Cannot get the exit status of process " << pid
+ << " because it is not a child of the calling "
+ << "process: " << strerror(errno);
+ notify(pid, -1);
}
}
+
delay(Seconds(1), self(), &ReaperProcess::reap); // Reap forever!
}
@@ -133,13 +168,7 @@ Reaper::~Reaper()
}
-void Reaper::addListener(const process::PID<ProcessExitedListener>& listener)
-{
- dispatch(process, &ReaperProcess::addListener, listener);
-}
-
-
-Future<Nothing> Reaper::monitor(pid_t pid)
+Future<int> Reaper::monitor(pid_t pid)
{
return dispatch(process, &ReaperProcess::monitor, pid);
}
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/4dd692f1/src/slave/reaper.hpp
----------------------------------------------------------------------
diff --git a/src/slave/reaper.hpp b/src/slave/reaper.hpp
index d599e9c..15ebc0d 100644
--- a/src/slave/reaper.hpp
+++ b/src/slave/reaper.hpp
@@ -22,32 +22,59 @@
#include <list>
#include <set>
+#include <process/future.hpp>
#include <process/process.hpp>
+#include <stout/multihashmap.hpp>
#include <stout/nothing.hpp>
+#include <stout/owned.hpp>
#include <stout/try.hpp>
-
namespace mesos {
namespace internal {
namespace slave {
-class ProcessExitedListener : public process::Process<ProcessExitedListener>
+// Forward declaration.
+class ReaperProcess;
+
+
+// TODO(vinod): Refactor the Reaper into 2 components:
+// 1) Reaps the status of child processes.
+// 2) Checks the exit status of requested processes.
+class Reaper
{
public:
- virtual void processExited(pid_t pid, int status) = 0;
+ Reaper();
+ virtual ~Reaper();
+
+ // Monitor the given process and notify the caller if it terminates
+ // via a Future of the exit status.
+ //
+ // NOTE: The termination of pid can only be monitored if the
+ // calling process:
+ // 1) has the same real or effective user ID as the real or saved
+ // set-user-ID of 'pid', or
+ // 2) is run as a privileged user, or
+ // 3) pid is a child of the current process.
+ // Otherwise a failed Future is returned.
+ //
+ // The exit status of 'pid' can only be correctly captured if the
+ // calling process is the parent of 'pid' and the process hasn't
+ // been reaped yet, otherwise -1 is returned.
+ process::Future<int> monitor(pid_t pid);
+
+private:
+ ReaperProcess* process;
};
-// Reaper implementation. See comments of the Reaper class.
+// Reaper implementation.
class ReaperProcess : public process::Process<ReaperProcess>
{
public:
ReaperProcess();
- void addListener(const process::PID<ProcessExitedListener>&);
-
- process::Future<Nothing> monitor(pid_t pid);
+ process::Future<int> monitor(pid_t pid);
protected:
virtual void initialize();
@@ -55,35 +82,14 @@ protected:
void reap();
// TODO(vinod): Make 'status' an option.
+ // The notification is sent only if the pid is explicitly registered
+ // via the monitor() call.
void notify(pid_t pid, int status);
private:
- std::list<process::PID<ProcessExitedListener> > listeners;
- std::set<pid_t> pids;
-};
-
-
-// TODO(vinod): Refactor the Reaper into 2 components:
-// 1) Reaps the status of child processes.
-// 2) Checks the exit status of requested processes.
-// Also, use Futures instead of callbacks to notify process exits.
-class Reaper
-{
-public:
- Reaper();
- virtual ~Reaper();
-
- void addListener(const process::PID<ProcessExitedListener>&);
-
- // Monitor the given process and notify the listener if it terminates.
- // NOTE: A notification is only sent if the calling process:
- // 1) is the parent of 'pid' or
- // 2) has the same real/effective UID as that of 'pid' or
- // 3) is run as a privileged user.
- process::Future<Nothing> monitor(pid_t pid);
-
-private:
- ReaperProcess* process;
+ // Mapping from the monitored pid to all promises the pid exit
+ // status should be sent to.
+ multihashmap<pid_t, Owned<process::Promise<int> > > promises;
};
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/4dd692f1/src/tests/reaper_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reaper_tests.cpp b/src/tests/reaper_tests.cpp
index 6849419..cfda0f4 100644
--- a/src/tests/reaper_tests.cpp
+++ b/src/tests/reaper_tests.cpp
@@ -19,6 +19,8 @@
#include <signal.h>
#include <unistd.h>
+#include <sys/wait.h>
+
#include <gtest/gtest.h>
#include <process/clock.hpp>
@@ -26,6 +28,9 @@
#include <process/gmock.hpp>
#include <process/gtest.hpp>
+#include <stout/exit.hpp>
+#include <stout/os.hpp>
+
#include "slave/reaper.hpp"
using namespace mesos;
@@ -39,13 +44,6 @@ using testing::_;
using testing::DoDefault;
-class MockProcessListener : public ProcessExitedListener
-{
-public:
- MOCK_METHOD2(processExited, void(pid_t, int));
-};
-
-
// This test checks that the Reaper can monitor a non-child process.
TEST(ReaperTest, NonChildProcess)
{
@@ -93,48 +91,143 @@ TEST(ReaperTest, NonChildProcess)
}
}
+ // In parent process.
LOG(INFO) << "Grand child process " << pid;
- MockProcessListener listener;
+ Reaper reaper;
- // Spawn the listener.
- spawn(listener);
+ Future<Nothing> monitor = FUTURE_DISPATCH(_, &ReaperProcess::monitor);
- // Spawn the reaper.
- Reaper reaper;
+ // Ask the reaper to monitor the grand child process.
+ Future<int> status = reaper.monitor(pid);
+
+ AWAIT_READY(monitor);
+
+ // Now kill the grand child.
+ // NOTE: We send a SIGKILL here because sometimes the grand child
+ // process seems to be in a hung state and not responding to
+ // SIGTERM/SIGINT.
+ EXPECT_EQ(0, kill(pid, SIGKILL));
+
+ Clock::pause();
+
+ // Now advance time until the reaper reports the grand child's exit.
+ while (status.isPending()) {
+ Clock::advance(Seconds(1));
+ Clock::settle();
+ }
+
+ // Ensure the reaper notifies of the terminated process.
+ AWAIT_READY(status);
+
+ // Status is -1 because pid is not an immediate child.
+ ASSERT_EQ(-1, status.get());
+
+ Clock::resume();
+}
+
+
+// This test checks that the Reaper can monitor a child process with
+// accurate exit status returned.
+TEST(ReaperTest, ChildProcess)
+{
+ ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+ pid_t pid = fork();
+ ASSERT_NE(-1, pid);
+
+ if (pid == 0) {
+ // In child process. Keep waiting till we get a signal.
+ while (true);
+ }
+
+ // In parent process.
+ LOG(INFO) << "Child process " << pid;
- // Ignore the exit of the child process.
- EXPECT_CALL(listener, processExited(_,_))
- .WillRepeatedly(DoDefault());
+ Reaper reaper;
- reaper.addListener(listener.self());
+ Future<Nothing> monitor = FUTURE_DISPATCH(_, &ReaperProcess::monitor);
// Ask the reaper to monitor the grand child process.
- reaper.monitor(pid);
+ Future<int> status = reaper.monitor(pid);
- // Catch the exit of the grand child process.
- Future<Nothing> processExited;
- EXPECT_CALL(listener, processExited(pid, _))
- .WillOnce(FutureSatisfy(&processExited));
+ AWAIT_READY(monitor);
- // Now kill the grand child.
- // NOTE: We send a SIGKILL here because sometimes the grand child process
- // seems to be in a hung state and not responding to SIGTERM/SIGINT.
+ // Now kill the child.
EXPECT_EQ(0, kill(pid, SIGKILL));
Clock::pause();
// Now advance time until the reaper reaps the executor.
- while (processExited.isPending()) {
+ while (status.isPending()) {
+ Clock::advance(Seconds(1));
+ Clock::settle();
+ }
+
+ // Ensure the reaper notifies of the terminated process.
+ AWAIT_READY(status);
+
+ // Check if the status is correct.
+ int stat = status.get();
+ ASSERT_TRUE(WIFSIGNALED(stat));
+ ASSERT_EQ(SIGKILL, WTERMSIG(stat));
+
+ Clock::resume();
+}
+
+
+// Check that the Reaper can monitor a child process that exits
+// before monitor() is called on it.
+TEST(ReaperTest, TerminatedChildProcess)
+{
+ ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+ pid_t pid = fork();
+ ASSERT_NE(-1, pid);
+
+ if (pid == 0) {
+ // In child process. Exit immediately.
+ exit(EXIT_SUCCESS);
+ }
+
+ // In parent process.
+ LOG(INFO) << "Child process " << pid;
+
+ Reaper reaper;
+
+ Clock::pause();
+
+ ASSERT_SOME(os::alive(pid));
+
+ // Because reaper reaps all child processes even if they aren't
+ // registered, we advance time until that happens.
+ while (os::alive(pid).get()) {
+ Clock::advance(Seconds(1));
+ Clock::settle();
+ }
+
+ // Now we request to monitor the child process which is already
+ // reaped.
+
+ Future<Nothing> monitor = FUTURE_DISPATCH(_, &ReaperProcess::monitor);
+
+ // Ask the reaper to monitor the child process.
+ Future<int> status = reaper.monitor(pid);
+
+ AWAIT_READY(monitor);
+
+ // Now advance time until the reaper sends the notification.
+ while (status.isPending()) {
Clock::advance(Seconds(1));
Clock::settle();
}
// Ensure the reaper notifies of the terminated process.
- AWAIT_READY(processExited);
+ AWAIT_READY(status);
- terminate(listener);
- wait(listener);
+ // Invalid status is returned because it is reaped before being
+ // monitored.
+ ASSERT_EQ(-1, status.get());
Clock::resume();
}
[2/4] git commit: Added (EXPECT|ASSERT)_NONE in gtest.hpp.
Posted by bm...@apache.org.
Added (EXPECT|ASSERT)_NONE in gtest.hpp.
From: Jiang Yan Xu <ya...@jxu.me>
Review: https://reviews.apache.org/r/11555
Project: http://git-wip-us.apache.org/repos/asf/incubator-mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mesos/commit/ad842c39
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mesos/tree/ad842c39
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mesos/diff/ad842c39
Branch: refs/heads/master
Commit: ad842c3998fe5e97fe178cf7ae9ffe1f890f7b4f
Parents: 93dfc5f
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Fri Jun 21 10:45:53 2013 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Fri Jun 21 10:45:53 2013 -0700
----------------------------------------------------------------------
3rdparty/libprocess/3rdparty/stout/include/stout/gtest.hpp | 8 ++++++++
1 file changed, 8 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad842c39/3rdparty/libprocess/3rdparty/stout/include/stout/gtest.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/3rdparty/stout/include/stout/gtest.hpp b/3rdparty/libprocess/3rdparty/stout/include/stout/gtest.hpp
index 3c34124..1f10834 100644
--- a/3rdparty/libprocess/3rdparty/stout/include/stout/gtest.hpp
+++ b/3rdparty/libprocess/3rdparty/stout/include/stout/gtest.hpp
@@ -119,4 +119,12 @@ template <typename T1, typename T2>
#define EXPECT_ERROR(actual) \
EXPECT_TRUE(actual.isError())
+
+#define ASSERT_NONE(actual) \
+ ASSERT_TRUE(actual.isNone())
+
+
+#define EXPECT_NONE(actual) \
+ EXPECT_TRUE(actual.isNone())
+
#endif // __STOUT_GTEST_HPP__