You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/07/19 23:20:27 UTC
[2/2] git commit: Refactored Command Executor to use libprocess.
Refactored Command Executor to use libprocess.
Review: https://reviews.apache.org/r/12754
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6d8aca41
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6d8aca41
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6d8aca41
Branch: refs/heads/master
Commit: 6d8aca413d14fd81770c9adc7d8ee510701d4b5b
Parents: 898e906
Author: Vinod Kone <vi...@twitter.com>
Authored: Thu Jul 18 19:59:41 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Fri Jul 19 14:19:50 2013 -0700
----------------------------------------------------------------------
src/launcher/executor.cpp | 242 +++++++++++++++++++++++++++++------------
src/slave/reaper.hpp | 4 +-
2 files changed, 174 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/6d8aca41/src/launcher/executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp
index 2907ec5..c5fd78e 100644
--- a/src/launcher/executor.cpp
+++ b/src/launcher/executor.cpp
@@ -26,105 +26,81 @@
#include <mesos/executor.hpp>
+#include <process/defer.hpp>
+#include <process/future.hpp>
+#include <process/process.hpp>
+
#include <stout/duration.hpp>
+#include <stout/lambda.hpp>
#include <stout/os.hpp>
#include <stout/strings.hpp>
-#include "common/thread.hpp"
+#include "common/type_utils.hpp"
#include "logging/logging.hpp"
+#include "slave/reaper.hpp"
+
+using process::wait; // Necessary on some OS's to disambiguate.
+
+using std::cout;
+using std::cerr;
+using std::endl;
using std::string;
namespace mesos {
namespace internal {
-// Waits for command to finish. Note that we currently launch a thread
-// that calls this function and thus the ExecutorDriver pointer might
-// be used by multiple threads. This is not ideal, but should be
-// sufficient for now (see the comment below where we instantiate the
-// MesosSchedulerDriver for how we "get around" any issues related to
-// the driver pointer becoming a dangling reference).
-static void waiter(pid_t pid, const TaskID& taskId, ExecutorDriver* driver)
-{
- int status;
- while (wait(&status) != pid || WIFSTOPPED(status));
-
- CHECK(WIFEXITED(status) || WIFSIGNALED(status));
-
- std::cout << "Waited on process " << pid
- << ", returned status " << status << std::endl;
-
- TaskStatus taskStatus;
- taskStatus.mutable_task_id()->MergeFrom(taskId);
-
- if (WIFEXITED(status) && WEXITSTATUS(status) == 0) {
- taskStatus.set_state(TASK_FINISHED);
- } else {
- taskStatus.set_state(TASK_FAILED);
- }
-
- Try<string> message = WIFEXITED(status)
- ? strings::format("Command exited with status %d", WEXITSTATUS(status))
- : strings::format("Command terminated with signal '%s'",
- strsignal(WTERMSIG(status)));
-
- if (message.isSome()) {
- taskStatus.set_message(message.get());
- }
-
- driver->sendStatusUpdate(taskStatus);
-
- // A hack for now ... but we need to wait until for the status
- // update to get sent to the slave before we shut ourselves down.
- os::sleep(Seconds(1));
- driver->stop();
-}
+using namespace process;
-class CommandExecutor : public Executor
+class CommandExecutorProcess : public Process<CommandExecutorProcess>
{
public:
- CommandExecutor()
+ CommandExecutorProcess()
: launched(false),
killed(false),
pid(-1) {}
- virtual ~CommandExecutor() {}
+ virtual ~CommandExecutorProcess() {}
- virtual void registered(
+ void registered(
ExecutorDriver* driver,
const ExecutorInfo& executorInfo,
const FrameworkInfo& frameworkInfo,
const SlaveInfo& slaveInfo)
{
- std::cout << "Registered executor on " << slaveInfo.hostname() << std::endl;
+ cout << "Registered executor on " << slaveInfo.hostname() << endl;
}
- virtual void reregistered(ExecutorDriver* driver,
- const SlaveInfo& slaveInfo)
+ void reregistered(
+ ExecutorDriver* driver,
+ const SlaveInfo& slaveInfo)
{
- std::cout << "Re-registered executor on " << slaveInfo.hostname()
- << std::endl;
+ cout << "Re-registered executor on " << slaveInfo.hostname() << endl;
}
- virtual void disconnected(ExecutorDriver* driver) {}
+ void disconnected(ExecutorDriver* driver) {}
- virtual void launchTask(ExecutorDriver* driver, const TaskInfo& task)
+ void launchTask(ExecutorDriver* driver, const TaskInfo& task)
{
if (launched) {
TaskStatus status;
status.mutable_task_id()->MergeFrom(task.task_id());
status.set_state(TASK_FAILED);
- status.set_message("Attempted to run tasks using a \"command\" executor");
+ status.set_message(
+ "Attempted to run multiple tasks using a \"command\" executor");
+
driver->sendStatusUpdate(status);
return;
}
- CHECK(task.has_command()) << "Expecting task to have a command!";
+ CHECK(task.has_command()) << "Expecting task " << task.task_id()
+ << " to have a command!";
- std::cout << "Starting task " << task.task_id().value() << std::endl;
+ std::cout << "Starting task " << task.task_id() << std::endl;
+ // TODO(benh): Clean this up with the new 'Fork' abstraction.
// Use pipes to determine which child has successfully changed
// session. This is needed as the setsid call can fail from other
// processes having the same group id.
@@ -208,8 +184,14 @@ public:
std::cout << "Forked command at " << pid << std::endl;
- // In parent process, fork a thread to wait for this process.
- thread::start(std::tr1::bind(&waiter, pid, task.task_id(), driver));
+ // Monitor this process.
+ reaper.monitor(pid)
+ .onAny(defer(self(),
+ &Self::reaped,
+ driver,
+ task.task_id(),
+ pid,
+ lambda::_1));
TaskStatus status;
status.mutable_task_id()->MergeFrom(task.task_id());
@@ -219,7 +201,7 @@ public:
launched = true;
}
- virtual void killTask(ExecutorDriver* driver, const TaskID& taskId)
+ void killTask(ExecutorDriver* driver, const TaskID& taskId)
{
std::cout << "Killing command at pid " << pid << std::endl;
@@ -231,9 +213,9 @@ public:
}
}
- virtual void frameworkMessage(ExecutorDriver* driver, const string& data) {}
+ void frameworkMessage(ExecutorDriver* driver, const string& data) {}
- virtual void shutdown(ExecutorDriver* driver)
+ void shutdown(ExecutorDriver* driver)
{
std::cout << "Shutting down" << std::endl;
@@ -248,9 +230,137 @@ public:
virtual void error(ExecutorDriver* driver, const string& message) {}
private:
+ void reaped( // Callback registered through reaper.monitor(pid).onAny(defer(self(), ...)); turns the reaped result into a final TaskStatus and stops the driver.
+ ExecutorDriver* driver, // Driver used to send the status update and then stopped.
+ const TaskID& taskId, // Task the status update is reported for.
+ pid_t pid, // Pid of the forked command; only used in the log line below.
+ const Future<Option<int> >& status_) // Result of monitoring 'pid'; may be failed, discarded, None, or a wait-style status.
+ {
+ TaskState state;
+ string message;
+
+ if (!status_.isReady()) { // The future failed or was discarded: no exit status is available.
+ state = TASK_FAILED;
+ message =
+ "Failed to get exit status for Command: " +
+ (status_.isFailed() ? status_.failure() : "future discarded");
+ } else if (status_.get().isNone()) { // The future is ready but carries no status.
+ state = TASK_FAILED;
+ message = "Failed to get exit status for Command";
+ } else { // A status is available; decode it with the W* macros.
+ int status = status_.get().get();
+ CHECK(WIFEXITED(status) || WIFSIGNALED(status)) << status; // Asserts the status describes an exit or a termination by signal.
+
+ if (WIFEXITED(status) && WEXITSTATUS(status) == 0) { // Only a normal exit with code 0 counts as success.
+ state = TASK_FINISHED;
+ } else {
+ // TODO(vinod): Send TASK_KILLED if 'killed' is true.
+ state = TASK_FAILED;
+ }
+
+ message = string("Command") + // Builds "Command exited with status <code>" or "Command terminated with signal <name>".
+ (WIFEXITED(status)
+ ? " exited with status "
+ : " terminated with signal ") +
+ (WIFEXITED(status)
+ ? stringify(WEXITSTATUS(status))
+ : strsignal(WTERMSIG(status)));
+ }
+
+ cout << message << " (pid: " << pid << ")" << endl;
+
+ TaskStatus taskStatus;
+ taskStatus.mutable_task_id()->MergeFrom(taskId);
+ taskStatus.set_state(state);
+ taskStatus.set_message(message);
+
+ driver->sendStatusUpdate(taskStatus); // Report the state and message computed above.
+
+ // A hack for now ... but we need to wait until the status update
+ // is sent to the slave before we shut ourselves down.
+ os::sleep(Seconds(1)); // NOTE(review): this sleeps inside the callback itself; presumably it holds up anything else queued on this process for that second - confirm.
+ driver->stop();
+ }
+
bool launched;
bool killed;
pid_t pid;
+ slave::Reaper reaper;
+};
+
+
+class CommandExecutor: public Executor // Executor facade: each callback below is forwarded with dispatch() to a CommandExecutorProcess, which holds the actual logic.
+{
+public:
+ CommandExecutor()
+ {
+ process = new CommandExecutorProcess(); // Owned by this object; released in the destructor.
+ spawn(process);
+ }
+
+ virtual ~CommandExecutor()
+ {
+ terminate(process); // Terminate first, then wait for the process before deleting it.
+ wait(process); // Relies on the 'using process::wait' declaration near the top of the file.
+ delete process;
+ }
+
+ virtual void registered( // Forwards to CommandExecutorProcess::registered.
+ ExecutorDriver* driver,
+ const ExecutorInfo& executorInfo,
+ const FrameworkInfo& frameworkInfo,
+ const SlaveInfo& slaveInfo)
+ {
+ dispatch(process,
+ &CommandExecutorProcess::registered,
+ driver,
+ executorInfo,
+ frameworkInfo,
+ slaveInfo);
+ }
+
+ virtual void reregistered( // Forwards to CommandExecutorProcess::reregistered.
+ ExecutorDriver* driver,
+ const SlaveInfo& slaveInfo)
+ {
+ dispatch(process,
+ &CommandExecutorProcess::reregistered,
+ driver,
+ slaveInfo);
+ }
+
+ virtual void disconnected(ExecutorDriver* driver) // Forwards to CommandExecutorProcess::disconnected.
+ {
+ dispatch(process, &CommandExecutorProcess::disconnected, driver);
+ }
+
+ virtual void launchTask(ExecutorDriver* driver, const TaskInfo& task) // Forwards to CommandExecutorProcess::launchTask.
+ {
+ dispatch(process, &CommandExecutorProcess::launchTask, driver, task);
+ }
+
+ virtual void killTask(ExecutorDriver* driver, const TaskID& taskId) // Forwards to CommandExecutorProcess::killTask.
+ {
+ dispatch(process, &CommandExecutorProcess::killTask, driver, taskId);
+ }
+
+ virtual void frameworkMessage(ExecutorDriver* driver, const string& data) // Forwards to CommandExecutorProcess::frameworkMessage.
+ {
+ dispatch(process, &CommandExecutorProcess::frameworkMessage, driver, data);
+ }
+
+ virtual void shutdown(ExecutorDriver* driver) // Forwards to CommandExecutorProcess::shutdown.
+ {
+ dispatch(process, &CommandExecutorProcess::shutdown, driver);
+ }
+
+ virtual void error(ExecutorDriver* driver, const string& data) // Forwards to CommandExecutorProcess::error.
+ {
+ dispatch(process, &CommandExecutorProcess::error, driver, data);
+ }
+
+private:
+ CommandExecutorProcess* process; // Spawned in the constructor; terminated, waited on and deleted in the destructor.
+};
} // namespace internal {
@@ -260,12 +370,6 @@ private:
int main(int argc, char** argv)
{
mesos::internal::CommandExecutor executor;
-
- // Note that we currently put the MesosSchedulerDriver on the heap
- // so that we don't have to deal with issues created because the
- // thread we launched is trying to use the pointer.
- mesos::MesosExecutorDriver* driver =
- new mesos::MesosExecutorDriver(&executor);
-
- return driver->run() == mesos::DRIVER_STOPPED ? 0 : 1;
+ mesos::MesosExecutorDriver driver(&executor); // The driver is now a local object; this commit removes the waiter thread that the heap allocation was working around.
+ return driver.run() == mesos::DRIVER_STOPPED ? 0 : 1; // Exit code is 0 only when run() returns DRIVER_STOPPED.
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/6d8aca41/src/slave/reaper.hpp
----------------------------------------------------------------------
diff --git a/src/slave/reaper.hpp b/src/slave/reaper.hpp
index 6eb9a3a..4498139 100644
--- a/src/slave/reaper.hpp
+++ b/src/slave/reaper.hpp
@@ -38,9 +38,7 @@ namespace slave {
class ReaperProcess;
-// TODO(vinod): Refactor the Reaper into 2 components:
-// 1) Reaps the status of child processes.
-// 2) Checks the exit status of requested processes.
+// TODO(vinod): Pull reaper into common or libprocess.
class Reaper
{
public: