You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2016/02/20 17:37:45 UTC
[09/11] mesos git commit: Updated the command / docker executors to send TASK_KILLING.
Updated the command / docker executors to send TASK_KILLING.
Review: https://reviews.apache.org/r/43489/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a30233b9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a30233b9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a30233b9
Branch: refs/heads/master
Commit: a30233b994dd1a77eb8ef37525b5aa7b6ecdf3bd
Parents: c893c9f
Author: Abhishek Dasgupta <a1...@linux.vnet.ibm.com>
Authored: Sat Feb 20 14:25:15 2016 +0100
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Sat Feb 20 17:32:26 2016 +0100
----------------------------------------------------------------------
src/docker/executor.cpp | 51 ++++++++++++++++++++++++-------------
src/launcher/executor.cpp | 58 +++++++++++++++++++++++++++++++-----------
2 files changed, 77 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a30233b9/src/docker/executor.cpp
----------------------------------------------------------------------
diff --git a/src/docker/executor.cpp b/src/docker/executor.cpp
index 654a41d..cab9d80 100644
--- a/src/docker/executor.cpp
+++ b/src/docker/executor.cpp
@@ -91,11 +91,12 @@ public:
void registered(
ExecutorDriver* _driver,
const ExecutorInfo& executorInfo,
- const FrameworkInfo& frameworkInfo,
+ const FrameworkInfo& _frameworkInfo,
const SlaveInfo& slaveInfo)
{
cout << "Registered docker executor on " << slaveInfo.hostname() << endl;
driver = _driver;
+ frameworkInfo = _frameworkInfo;
}
void reregistered(
@@ -123,9 +124,10 @@ public:
return;
}
- TaskID taskId = task.task_id();
+ // Capture the TaskID.
+ taskId = task.task_id();
- cout << "Starting task " << taskId.value() << endl;
+ cout << "Starting task " << taskId.get() << endl;
CHECK(task.has_container());
CHECK(task.has_command());
@@ -147,13 +149,9 @@ public:
task.resources() + task.executor().resources(),
None(),
Subprocess::FD(STDOUT_FILENO),
- Subprocess::FD(STDERR_FILENO))
- .onAny(defer(
- self(),
- &Self::reaped,
- driver,
- taskId,
- lambda::_1));
+ Subprocess::FD(STDERR_FILENO));
+
+ run->onAny(defer(self(), &Self::reaped, driver, lambda::_1));
// Delay sending TASK_RUNNING status update until we receive
// inspect output.
@@ -161,7 +159,7 @@ public:
.then(defer(self(), [=](const Docker::Container& container) {
if (!killed) {
TaskStatus status;
- status.mutable_task_id()->CopyFrom(taskId);
+ status.mutable_task_id()->CopyFrom(taskId.get());
status.set_state(TASK_RUNNING);
status.set_data(container.output);
if (container.ipAddress.isSome()) {
@@ -192,7 +190,10 @@ public:
void killTask(ExecutorDriver* driver, const TaskID& taskId)
{
- cout << "Killing docker task" << endl;
+ cout << "Received killTask" << endl;
+
+ // Since the docker executor manages a single task, we
+ // shut down completely when we receive a killTask.
shutdown(driver);
if (healthPid != -1) {
// Cleanup health check process.
@@ -207,12 +208,25 @@ public:
cout << "Shutting down" << endl;
if (run.isSome() && !killed) {
+ // Send TASK_KILLING if the framework can handle it.
+ CHECK_SOME(frameworkInfo);
+ CHECK_SOME(taskId);
+
+ foreach (const FrameworkInfo::Capability& c,
+ frameworkInfo->capabilities()) {
+ if (c.type() == FrameworkInfo::Capability::TASK_KILLING_STATE) {
+ TaskStatus status;
+ status.mutable_task_id()->CopyFrom(taskId.get());
+ status.set_state(TASK_KILLING);
+ driver->sendStatusUpdate(status);
+ break;
+ }
+ }
+
// The docker daemon might still be in progress starting the
// container, therefore we kill both the docker run process
// and also ask the daemon to stop the container.
-
- // Making a mutable copy of the future so we can call discard.
- Future<Nothing>(run.get()).discard();
+ run->discard();
stop = docker->stop(containerName, stopTimeout);
killed = true;
}
@@ -257,7 +271,6 @@ protected:
private:
void reaped(
ExecutorDriver* _driver,
- const TaskID& taskId,
const Future<Nothing>& run)
{
// Wait for docker->stop to finish, and best effort wait for the
@@ -287,8 +300,10 @@ private:
state = TASK_FINISHED;
}
+ CHECK_SOME(taskId);
+
TaskStatus taskStatus;
- taskStatus.mutable_task_id()->CopyFrom(taskId);
+ taskStatus.mutable_task_id()->CopyFrom(taskId.get());
taskStatus.set_state(state);
taskStatus.set_message(message);
if (killed && killedByHealthCheck) {
@@ -415,6 +430,8 @@ private:
Future<Nothing> stop;
Future<Nothing> inspect;
Option<ExecutorDriver*> driver;
+ Option<FrameworkInfo> frameworkInfo;
+ Option<TaskID> taskId;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/a30233b9/src/launcher/executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp
index c27e079..b65f0ab 100644
--- a/src/launcher/executor.cpp
+++ b/src/launcher/executor.cpp
@@ -93,6 +93,8 @@ public:
healthPid(-1),
escalationTimeout(slave::EXECUTOR_SIGNAL_ESCALATION_TIMEOUT),
driver(None()),
+ frameworkInfo(None()),
+ taskId(None()),
healthCheckDir(_healthCheckDir),
override(override),
sandboxDirectory(_sandboxDirectory),
@@ -105,13 +107,16 @@ public:
void registered(
ExecutorDriver* _driver,
const ExecutorInfo& _executorInfo,
- const FrameworkInfo& frameworkInfo,
+ const FrameworkInfo& _frameworkInfo,
const SlaveInfo& slaveInfo)
{
CHECK_EQ(REGISTERING, state);
cout << "Registered executor on " << slaveInfo.hostname() << endl;
+
driver = _driver;
+ frameworkInfo = _frameworkInfo;
+
state = REGISTERED;
}
@@ -122,6 +127,7 @@ public:
CHECK(state == REGISTERED || state == REGISTERING) << state;
cout << "Re-registered executor on " << slaveInfo.hostname() << endl;
+
state = REGISTERED;
}
@@ -142,6 +148,9 @@ public:
return;
}
+ // Capture the TaskID.
+ taskId = task.task_id();
+
// Determine the command to launch the task.
CommandInfo command;
@@ -438,12 +447,7 @@ public:
// Monitor this process.
process::reap(pid)
- .onAny(defer(self(),
- &Self::reaped,
- driver,
- task.task_id(),
- pid,
- lambda::_1));
+ .onAny(defer(self(), &Self::reaped, driver, pid, lambda::_1));
TaskStatus status;
status.mutable_task_id()->MergeFrom(task.task_id());
@@ -455,6 +459,10 @@ public:
void killTask(ExecutorDriver* driver, const TaskID& taskId)
{
+ cout << "Received killTask" << endl;
+
+ // Since the command executor manages a single task, we
+ // shut down completely when we receive a killTask.
shutdown(driver);
if (healthPid != -1) {
// Cleanup health check process.
@@ -468,22 +476,39 @@ public:
{
cout << "Shutting down" << endl;
- if (pid > 0 && !killed) {
- cout << "Sending SIGTERM to process tree at pid "
- << pid << endl;
+ if (launched && !killed) {
+ // Send TASK_KILLING if the framework can handle it.
+ CHECK_SOME(frameworkInfo);
+ CHECK_SOME(taskId);
+
+ foreach (const FrameworkInfo::Capability& c,
+ frameworkInfo->capabilities()) {
+ if (c.type() == FrameworkInfo::Capability::TASK_KILLING_STATE) {
+ TaskStatus status;
+ status.mutable_task_id()->CopyFrom(taskId.get());
+ status.set_state(TASK_KILLING);
+ driver->sendStatusUpdate(status);
+ break;
+ }
+ }
+
+ // Now perform signal escalation to begin killing the task.
+ CHECK_GT(pid, 0);
+
+ cout << "Sending SIGTERM to process tree at pid " << pid << endl;
Try<std::list<os::ProcessTree> > trees =
os::killtree(pid, SIGTERM, true, true);
if (trees.isError()) {
- cerr << "Failed to kill the process tree rooted at pid "
- << pid << ": " << trees.error() << endl;
+ cerr << "Failed to kill the process tree rooted at pid " << pid
+ << ": " << trees.error() << endl;
// Send SIGTERM directly to process 'pid' as it may not have
// received signal before os::killtree() failed.
::kill(pid, SIGTERM);
} else {
- cout << "Killing the following process trees:\n"
+ cout << "Sent SIGTERM to the following process trees:\n"
<< stringify(trees.get()) << endl;
}
@@ -538,7 +563,6 @@ protected:
private:
void reaped(
ExecutorDriver* driver,
- const TaskID& taskId,
pid_t pid,
const Future<Option<int> >& status_)
{
@@ -574,8 +598,10 @@ private:
cout << message << " (pid: " << pid << ")" << endl;
+ CHECK_SOME(taskId);
+
TaskStatus taskStatus;
- taskStatus.mutable_task_id()->MergeFrom(taskId);
+ taskStatus.mutable_task_id()->MergeFrom(taskId.get());
taskStatus.set_state(taskState);
taskStatus.set_message(message);
if (killed && killedByHealthCheck) {
@@ -671,6 +697,8 @@ private:
Duration escalationTimeout;
Timer escalationTimer;
Option<ExecutorDriver*> driver;
+ Option<FrameworkInfo> frameworkInfo;
+ Option<TaskID> taskId;
string healthCheckDir;
Option<char**> override;
Option<string> sandboxDirectory;