You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2013/06/21 19:55:17 UTC
[4/4] git commit: Changed reaper to use an Option for the process exit status.
Changed reaper to use an Option for the process exit status.
From: Jiang Yan Xu <ya...@jxu.me>
Review: https://reviews.apache.org/r/11556
Project: http://git-wip-us.apache.org/repos/asf/incubator-mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mesos/commit/ad198fe4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mesos/tree/ad198fe4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mesos/diff/ad198fe4
Branch: refs/heads/master
Commit: ad198fe4876ce2ce027c55e04c198005294d0fc5
Parents: 4dd692f
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Fri Jun 21 10:47:44 2013 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Fri Jun 21 10:47:44 2013 -0700
----------------------------------------------------------------------
src/slave/cgroups_isolator.cpp | 23 ++++++++++---------
src/slave/cgroups_isolator.hpp | 4 ++--
src/slave/process_isolator.cpp | 18 +++++++--------
src/slave/process_isolator.hpp | 2 +-
src/slave/reaper.cpp | 24 ++++++++++----------
src/slave/reaper.hpp | 13 ++++++-----
src/slave/slave.cpp | 45 ++++++++++++++++++++++---------------
src/slave/slave.hpp | 2 +-
src/tests/reaper_tests.cpp | 20 +++++++++--------
9 files changed, 81 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad198fe4/src/slave/cgroups_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/cgroups_isolator.cpp b/src/slave/cgroups_isolator.cpp
index 6b87d08..9d18886 100644
--- a/src/slave/cgroups_isolator.cpp
+++ b/src/slave/cgroups_isolator.cpp
@@ -801,25 +801,26 @@ Future<Nothing> CgroupsIsolator::recover(
}
-void CgroupsIsolator::reaped(pid_t pid, const Future<int>& status)
+void CgroupsIsolator::reaped(pid_t pid, const Future<Option<int> >& status)
{
- if (status.isDiscarded()) {
- LOG(ERROR) << "The status was discarded";
- return;
- }
- if (status.isFailed()) {
- LOG(ERROR) << status.failure();
- return;
- }
-
CgroupInfo* info = findCgroupInfo(pid);
if (info != NULL) {
FrameworkID frameworkId = info->frameworkId;
ExecutorID executorId = info->executorId;
+ if (!status.isReady()) {
+ LOG(ERROR) << "Failed to get the status for executor " << executorId
+ << " of framework " << frameworkId << ": "
+ << (status.isFailed() ? status.failure() : "discarded");
+ return;
+ }
+
LOG(INFO) << "Executor " << executorId
<< " of framework " << frameworkId
- << " terminated with status " << status.get();
+ << " terminated with status "
+ << (status.get().isSome()
+ ? stringify(status.get().get())
+ : "unknown");
// Set the exit status, so that '_killExecutor()' can send it to the slave.
info->status = status.get();
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad198fe4/src/slave/cgroups_isolator.hpp
----------------------------------------------------------------------
diff --git a/src/slave/cgroups_isolator.hpp b/src/slave/cgroups_isolator.hpp
index 06bac98..e86062e 100644
--- a/src/slave/cgroups_isolator.hpp
+++ b/src/slave/cgroups_isolator.hpp
@@ -119,7 +119,7 @@ private:
CgroupsIsolator(const CgroupsIsolator&);
CgroupsIsolator& operator = (const CgroupsIsolator&);
- void reaped(pid_t pid, const Future<int>& status);
+ void reaped(pid_t pid, const Future<Option<int> >& status);
// The cgroup information for each live executor.
struct CgroupInfo
@@ -163,7 +163,7 @@ private:
std::string message; // The reason behind the destruction.
- int status; // Exit status of the executor.
+ Option<int> status; // Exit status of the executor.
Flags flags; // Slave flags.
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad198fe4/src/slave/process_isolator.cpp
----------------------------------------------------------------------
diff --git a/src/slave/process_isolator.cpp b/src/slave/process_isolator.cpp
index a3d31e5..b750b26 100644
--- a/src/slave/process_isolator.cpp
+++ b/src/slave/process_isolator.cpp
@@ -423,22 +423,20 @@ Future<ResourceStatistics> ProcessIsolator::usage(
}
-void ProcessIsolator::reaped(pid_t pid, const Future<int>& status)
+void ProcessIsolator::reaped(pid_t pid, const Future<Option<int> >& status)
{
- if (status.isDiscarded()) {
- LOG(ERROR) << "The status was discarded";
- return;
- }
- if (status.isFailed()) {
- LOG(ERROR) << status.failure();
- return;
- }
-
foreachkey (const FrameworkID& frameworkId, infos) {
foreachkey (const ExecutorID& executorId, infos[frameworkId]) {
ProcessInfo* info = infos[frameworkId][executorId];
if (info->pid.isSome() && info->pid.get() == pid) {
+ if (!status.isReady()) {
+ LOG(ERROR) << "Failed to get the status for executor '" << executorId
+ << "' of framework " << frameworkId << ": "
+ << (status.isFailed() ? status.failure() : "discarded");
+ return;
+ }
+
LOG(INFO) << "Telling slave of terminated executor '" << executorId
<< "' of framework " << frameworkId;
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad198fe4/src/slave/process_isolator.hpp
----------------------------------------------------------------------
diff --git a/src/slave/process_isolator.hpp b/src/slave/process_isolator.hpp
index a3f6c68..4ae093f 100644
--- a/src/slave/process_isolator.hpp
+++ b/src/slave/process_isolator.hpp
@@ -109,7 +109,7 @@ private:
Reaper reaper;
hashmap<FrameworkID, hashmap<ExecutorID, ProcessInfo*> > infos;
- void reaped(pid_t pid, const Future<int>& status);
+ void reaped(pid_t pid, const Future<Option<int> >& status);
};
} // namespace slave {
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad198fe4/src/slave/reaper.cpp
----------------------------------------------------------------------
diff --git a/src/slave/reaper.cpp b/src/slave/reaper.cpp
index 1b19eb2..8d3101d 100644
--- a/src/slave/reaper.cpp
+++ b/src/slave/reaper.cpp
@@ -45,7 +45,7 @@ ReaperProcess::ReaperProcess()
: ProcessBase(ID::generate("reaper")) {}
-Future<int> ReaperProcess::monitor(pid_t pid)
+Future<Option<int> > ReaperProcess::monitor(pid_t pid)
{
// Check to see if the current process has sufficient privileges to
// monitor the liveness of this pid.
@@ -55,14 +55,14 @@ Future<int> ReaperProcess::monitor(pid_t pid)
if (alive.get()) {
// We have permissions to check the validity of the process
// and it's alive, so add it to the promises map.
- Owned<Promise<int> > promise(new Promise<int>());
+ Owned<Promise<Option<int> > > promise(new Promise<Option<int> >());
promises.put(pid, promise);
return promise->future();
} else {
// Process doesn't exist.
LOG(WARNING) << "Cannot monitor process " << pid
<< " because it doesn't exist";
- return -1;
+ return None();
}
}
@@ -75,17 +75,17 @@ Future<int> ReaperProcess::monitor(pid_t pid)
// The process terminated and the status was reaped.
// Notify other listeners and return directly for this caller.
notify(pid, status);
- return status;
+ return Option<int>(status);
} else if (result == 0) {
// Child still active, add to the map.
- Owned<Promise<int> > promise(new Promise<int>());
+ Owned<Promise<Option<int> > > promise(new Promise<Option<int> >());
promises.put(pid, promise);
return promise->future();
} else {
// Not a child nor do we have permission to for os::alive();
// we cannot monitor this pid.
- return Future<int>::failed("Failed to monitor process " +
- stringify(pid) + ": " + strerror(errno));
+ return Future<Option<int> >::failed(
+ "Failed to monitor process " + stringify(pid) + ": " + strerror(errno));
}
}
@@ -96,9 +96,9 @@ void ReaperProcess::initialize()
}
-void ReaperProcess::notify(pid_t pid, int status)
+void ReaperProcess::notify(pid_t pid, Option<int> status)
{
- foreach (const Owned<Promise<int> >& promise, promises.get(pid)) {
+ foreach (const Owned<Promise<Option<int> > >& promise, promises.get(pid)) {
promise->set(status);
}
promises.remove(pid);
@@ -120,7 +120,7 @@ void ReaperProcess::reap()
// Notify the "listeners" only if they have requested to monitor
// this pid. Otherwise the status is discarded.
// This means if a child pid is registered via the monitor() call
- // after it's reaped, an invalid status (-1) will be returned.
+ // after it's reaped, status 'None' will be returned.
if (!WIFSTOPPED(status) && promises.contains(pid)) {
notify(pid, status);
}
@@ -145,7 +145,7 @@ void ReaperProcess::reap()
LOG(WARNING) << "Cannot get the exit status of process " << pid
<< " because it is not a child of the calling "
<< "process: " << strerror(errno);
- notify(pid, -1);
+ notify(pid, None());
}
}
@@ -168,7 +168,7 @@ Reaper::~Reaper()
}
-Future<int> Reaper::monitor(pid_t pid)
+Future<Option<int> > Reaper::monitor(pid_t pid)
{
return dispatch(process, &ReaperProcess::monitor, pid);
}
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad198fe4/src/slave/reaper.hpp
----------------------------------------------------------------------
diff --git a/src/slave/reaper.hpp b/src/slave/reaper.hpp
index 15ebc0d..2d6db03 100644
--- a/src/slave/reaper.hpp
+++ b/src/slave/reaper.hpp
@@ -60,8 +60,10 @@ public:
//
// The exit status of 'pid' can only be correctly captured if the
// calling process is the parent of 'pid' and the process hasn't
- // been reaped yet, otherwise -1 is returned.
- process::Future<int> monitor(pid_t pid);
+ // been reaped yet, otherwise 'None' is returned.
+ // Note that an invalid pid does not cause a failed Future, but an
+ // empty result ('None').
+ process::Future<Option<int> > monitor(pid_t pid);
private:
ReaperProcess* process;
@@ -74,22 +76,21 @@ class ReaperProcess : public process::Process<ReaperProcess>
public:
ReaperProcess();
- process::Future<int> monitor(pid_t pid);
+ process::Future<Option<int> > monitor(pid_t pid);
protected:
virtual void initialize();
void reap();
- // TODO(vinod): Make 'status' an option.
// The notification is sent only if the pid is explicitly registered
// via the monitor() call.
- void notify(pid_t pid, int status);
+ void notify(pid_t pid, Option<int> status);
private:
// Mapping from the monitored pid to all promises the pid exit
// status should be sent to.
- multihashmap<pid_t, Owned<process::Promise<int> > > promises;
+ multihashmap<pid_t, Owned<process::Promise<Option<int> > > > promises;
};
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad198fe4/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 952bd14..309d1ac 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2045,19 +2045,28 @@ void _unwatch(
void Slave::executorTerminated(
const FrameworkID& frameworkId,
const ExecutorID& executorId,
- int status,
+ const Option<int>& status_,
bool destroyed,
const string& message)
{
- LOG(INFO) << "Executor '" << executorId
- << "' of framework " << frameworkId
- << (WIFEXITED(status)
- ? " has exited with status '"
- : " has terminated with signal '")
- << (WIFEXITED(status)
- ? stringify(WEXITSTATUS(status))
- : strsignal(WTERMSIG(status)))
- << "'";
+ int status;
+ if (status_.isNone()) {
+ LOG(INFO) << "Executor '" << executorId
+ << "' of framework " << frameworkId
+ << " has terminated with unknown status";
+ // Set a special status for None.
+ status = -1;
+ } else {
+ status = status_.get();
+ LOG(INFO) << "Executor '" << executorId
+ << "' of framework " << frameworkId
+ << (WIFEXITED(status)
+ ? " has exited with status "
+ : " has terminated with signal ")
+ << (WIFEXITED(status)
+ ? stringify(WEXITSTATUS(status))
+ : strsignal(WTERMSIG(status)));
+ }
Framework* framework = getFramework(frameworkId);
if (framework == NULL) {
@@ -2106,18 +2115,18 @@ void Slave::executorTerminated(
// Transition all live launched tasks.
foreachvalue (Task* task, utils::copy(executor->launchedTasks)) {
if (!protobuf::isTerminalState(task->state())) {
- mesos::TaskState status;
+ mesos::TaskState taskState;
isCommandExecutor = !task->has_executor_id();
if (destroyed || isCommandExecutor.get()) {
- status = TASK_FAILED;
+ taskState = TASK_FAILED;
} else {
- status = TASK_LOST;
+ taskState = TASK_LOST;
}
statusUpdate(protobuf::createStatusUpdate(
frameworkId,
info.id(),
task->task_id(),
- status,
+ taskState,
message,
executorId));
}
@@ -2126,18 +2135,18 @@ void Slave::executorTerminated(
// Transition all queued tasks.
foreachvalue (const TaskInfo& task,
utils::copy(executor->queuedTasks)) {
- mesos::TaskState status;
+ mesos::TaskState taskState;
isCommandExecutor = task.has_command();
if (destroyed || isCommandExecutor.get()) {
- status = TASK_FAILED;
+ taskState = TASK_FAILED;
} else {
- status = TASK_LOST;
+ taskState = TASK_LOST;
}
statusUpdate(protobuf::createStatusUpdate(
frameworkId,
info.id(),
task.task_id(),
- status,
+ taskState,
message,
executorId));
}
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad198fe4/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index d1ba82e..7ef6ad8 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -171,7 +171,7 @@ public:
void executorTerminated(
const FrameworkID& frameworkId,
const ExecutorID& executorId,
- int status,
+ const Option<int>& status,
bool destroyed,
const std::string& message);
http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/ad198fe4/src/tests/reaper_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reaper_tests.cpp b/src/tests/reaper_tests.cpp
index cfda0f4..d55ac24 100644
--- a/src/tests/reaper_tests.cpp
+++ b/src/tests/reaper_tests.cpp
@@ -29,6 +29,7 @@
#include <process/gtest.hpp>
#include <stout/exit.hpp>
+#include <stout/gtest.hpp>
#include <stout/os.hpp>
#include "slave/reaper.hpp"
@@ -99,7 +100,7 @@ TEST(ReaperTest, NonChildProcess)
Future<Nothing> monitor = FUTURE_DISPATCH(_, &ReaperProcess::monitor);
// Ask the reaper to monitor the grand child process.
- Future<int> status = reaper.monitor(pid);
+ Future<Option<int> > status = reaper.monitor(pid);
AWAIT_READY(monitor);
@@ -120,8 +121,8 @@ TEST(ReaperTest, NonChildProcess)
// Ensure the reaper notifies of the terminated process.
AWAIT_READY(status);
- // Status is -1 because pid is not an immediate child.
- ASSERT_EQ(-1, status.get());
+ // Status is None because pid is not an immediate child.
+ ASSERT_NONE(status.get());
Clock::resume();
}
@@ -149,7 +150,7 @@ TEST(ReaperTest, ChildProcess)
Future<Nothing> monitor = FUTURE_DISPATCH(_, &ReaperProcess::monitor);
// Ask the reaper to monitor the grand child process.
- Future<int> status = reaper.monitor(pid);
+ Future<Option<int> > status = reaper.monitor(pid);
AWAIT_READY(monitor);
@@ -168,9 +169,10 @@ TEST(ReaperTest, ChildProcess)
AWAIT_READY(status);
// Check if the status is correct.
- int stat = status.get();
- ASSERT_TRUE(WIFSIGNALED(stat));
- ASSERT_EQ(SIGKILL, WTERMSIG(stat));
+ ASSERT_SOME(status.get());
+ int status_ = status.get().get();
+ ASSERT_TRUE(WIFSIGNALED(status_));
+ ASSERT_EQ(SIGKILL, WTERMSIG(status_));
Clock::resume();
}
@@ -212,7 +214,7 @@ TEST(ReaperTest, TerminatedChildProcess)
Future<Nothing> monitor = FUTURE_DISPATCH(_, &ReaperProcess::monitor);
// Ask the reaper to monitor the child process.
- Future<int> status = reaper.monitor(pid);
+ Future<Option<int> > status = reaper.monitor(pid);
AWAIT_READY(monitor);
@@ -227,7 +229,7 @@ TEST(ReaperTest, TerminatedChildProcess)
// Invalid status is returned because it is reaped before being
// monitored.
- ASSERT_EQ(-1, status.get());
+ ASSERT_NONE(status.get());
Clock::resume();
}