You are viewing a plain-text version of this content. (The canonical link to the original was a hyperlink that is not preserved in this plain-text rendering.)
Posted to commits@mesos.apache.org by bm...@apache.org on 2016/02/20 17:37:45 UTC

[09/11] mesos git commit: Updated the command / docker executors to send TASK_KILLING.

Updated the command / docker executors to send TASK_KILLING.

Review: https://reviews.apache.org/r/43489/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a30233b9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a30233b9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a30233b9

Branch: refs/heads/master
Commit: a30233b994dd1a77eb8ef37525b5aa7b6ecdf3bd
Parents: c893c9f
Author: Abhishek Dasgupta <a1...@linux.vnet.ibm.com>
Authored: Sat Feb 20 14:25:15 2016 +0100
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Sat Feb 20 17:32:26 2016 +0100

----------------------------------------------------------------------
 src/docker/executor.cpp   | 51 ++++++++++++++++++++++++-------------
 src/launcher/executor.cpp | 58 +++++++++++++++++++++++++++++++-----------
 2 files changed, 77 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a30233b9/src/docker/executor.cpp
----------------------------------------------------------------------
diff --git a/src/docker/executor.cpp b/src/docker/executor.cpp
index 654a41d..cab9d80 100644
--- a/src/docker/executor.cpp
+++ b/src/docker/executor.cpp
@@ -91,11 +91,12 @@ public:
   void registered(
       ExecutorDriver* _driver,
       const ExecutorInfo& executorInfo,
-      const FrameworkInfo& frameworkInfo,
+      const FrameworkInfo& _frameworkInfo,
       const SlaveInfo& slaveInfo)
   {
     cout << "Registered docker executor on " << slaveInfo.hostname() << endl;
     driver = _driver;
+    frameworkInfo = _frameworkInfo;
   }
 
   void reregistered(
@@ -123,9 +124,10 @@ public:
       return;
     }
 
-    TaskID taskId = task.task_id();
+    // Capture the TaskID.
+    taskId = task.task_id();
 
-    cout << "Starting task " << taskId.value() << endl;
+    cout << "Starting task " << taskId.get() << endl;
 
     CHECK(task.has_container());
     CHECK(task.has_command());
@@ -147,13 +149,9 @@ public:
         task.resources() + task.executor().resources(),
         None(),
         Subprocess::FD(STDOUT_FILENO),
-        Subprocess::FD(STDERR_FILENO))
-      .onAny(defer(
-        self(),
-        &Self::reaped,
-        driver,
-        taskId,
-        lambda::_1));
+        Subprocess::FD(STDERR_FILENO));
+
+    run->onAny(defer(self(), &Self::reaped, driver, lambda::_1));
 
     // Delay sending TASK_RUNNING status update until we receive
     // inspect output.
@@ -161,7 +159,7 @@ public:
       .then(defer(self(), [=](const Docker::Container& container) {
         if (!killed) {
           TaskStatus status;
-          status.mutable_task_id()->CopyFrom(taskId);
+          status.mutable_task_id()->CopyFrom(taskId.get());
           status.set_state(TASK_RUNNING);
           status.set_data(container.output);
           if (container.ipAddress.isSome()) {
@@ -192,7 +190,10 @@ public:
 
   void killTask(ExecutorDriver* driver, const TaskID& taskId)
   {
-    cout << "Killing docker task" << endl;
+    cout << "Received killTask" << endl;
+
+    // Since the docker executor manages a single task, we
+    // shutdown completely when we receive a killTask.
     shutdown(driver);
     if (healthPid != -1) {
       // Cleanup health check process.
@@ -207,12 +208,25 @@ public:
     cout << "Shutting down" << endl;
 
     if (run.isSome() && !killed) {
+      // Send TASK_KILLING if the framework can handle it.
+      CHECK_SOME(frameworkInfo);
+      CHECK_SOME(taskId);
+
+      foreach (const FrameworkInfo::Capability& c,
+               frameworkInfo->capabilities()) {
+        if (c.type() == FrameworkInfo::Capability::TASK_KILLING_STATE) {
+          TaskStatus status;
+          status.mutable_task_id()->CopyFrom(taskId.get());
+          status.set_state(TASK_KILLING);
+          driver->sendStatusUpdate(status);
+          break;
+        }
+      }
+
       // The docker daemon might still be in progress starting the
       // container, therefore we kill both the docker run process
       // and also ask the daemon to stop the container.
-
-      // Making a mutable copy of the future so we can call discard.
-      Future<Nothing>(run.get()).discard();
+      run->discard();
       stop = docker->stop(containerName, stopTimeout);
       killed = true;
     }
@@ -257,7 +271,6 @@ protected:
 private:
   void reaped(
       ExecutorDriver* _driver,
-      const TaskID& taskId,
       const Future<Nothing>& run)
   {
     // Wait for docker->stop to finish, and best effort wait for the
@@ -287,8 +300,10 @@ private:
             state = TASK_FINISHED;
           }
 
+          CHECK_SOME(taskId);
+
           TaskStatus taskStatus;
-          taskStatus.mutable_task_id()->CopyFrom(taskId);
+          taskStatus.mutable_task_id()->CopyFrom(taskId.get());
           taskStatus.set_state(state);
           taskStatus.set_message(message);
           if (killed && killedByHealthCheck) {
@@ -415,6 +430,8 @@ private:
   Future<Nothing> stop;
   Future<Nothing> inspect;
   Option<ExecutorDriver*> driver;
+  Option<FrameworkInfo> frameworkInfo;
+  Option<TaskID> taskId;
 };
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/a30233b9/src/launcher/executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp
index c27e079..b65f0ab 100644
--- a/src/launcher/executor.cpp
+++ b/src/launcher/executor.cpp
@@ -93,6 +93,8 @@ public:
       healthPid(-1),
       escalationTimeout(slave::EXECUTOR_SIGNAL_ESCALATION_TIMEOUT),
       driver(None()),
+      frameworkInfo(None()),
+      taskId(None()),
       healthCheckDir(_healthCheckDir),
       override(override),
       sandboxDirectory(_sandboxDirectory),
@@ -105,13 +107,16 @@ public:
   void registered(
       ExecutorDriver* _driver,
       const ExecutorInfo& _executorInfo,
-      const FrameworkInfo& frameworkInfo,
+      const FrameworkInfo& _frameworkInfo,
       const SlaveInfo& slaveInfo)
   {
     CHECK_EQ(REGISTERING, state);
 
     cout << "Registered executor on " << slaveInfo.hostname() << endl;
+
     driver = _driver;
+    frameworkInfo = _frameworkInfo;
+
     state = REGISTERED;
   }
 
@@ -122,6 +127,7 @@ public:
     CHECK(state == REGISTERED || state == REGISTERING) << state;
 
     cout << "Re-registered executor on " << slaveInfo.hostname() << endl;
+
     state = REGISTERED;
   }
 
@@ -142,6 +148,9 @@ public:
       return;
     }
 
+    // Capture the TaskID.
+    taskId = task.task_id();
+
     // Determine the command to launch the task.
     CommandInfo command;
 
@@ -438,12 +447,7 @@ public:
 
     // Monitor this process.
     process::reap(pid)
-      .onAny(defer(self(),
-                   &Self::reaped,
-                   driver,
-                   task.task_id(),
-                   pid,
-                   lambda::_1));
+      .onAny(defer(self(), &Self::reaped, driver, pid, lambda::_1));
 
     TaskStatus status;
     status.mutable_task_id()->MergeFrom(task.task_id());
@@ -455,6 +459,10 @@ public:
 
   void killTask(ExecutorDriver* driver, const TaskID& taskId)
   {
+    cout << "Received killTask" << endl;
+
+    // Since the command executor manages a single task, we
+    // shutdown completely when we receive a killTask.
     shutdown(driver);
     if (healthPid != -1) {
       // Cleanup health check process.
@@ -468,22 +476,39 @@ public:
   {
     cout << "Shutting down" << endl;
 
-    if (pid > 0 && !killed) {
-      cout << "Sending SIGTERM to process tree at pid "
-           << pid << endl;
+    if (launched && !killed) {
+      // Send TASK_KILLING if the framework can handle it.
+      CHECK_SOME(frameworkInfo);
+      CHECK_SOME(taskId);
+
+      foreach (const FrameworkInfo::Capability& c,
+               frameworkInfo->capabilities()) {
+        if (c.type() == FrameworkInfo::Capability::TASK_KILLING_STATE) {
+          TaskStatus status;
+          status.mutable_task_id()->CopyFrom(taskId.get());
+          status.set_state(TASK_KILLING);
+          driver->sendStatusUpdate(status);
+          break;
+        }
+      }
+
+      // Now perform signal escalation to begin killing the task.
+      CHECK_GT(pid, 0);
+
+      cout << "Sending SIGTERM to process tree at pid " << pid << endl;
 
       Try<std::list<os::ProcessTree> > trees =
         os::killtree(pid, SIGTERM, true, true);
 
       if (trees.isError()) {
-        cerr << "Failed to kill the process tree rooted at pid "
-             << pid << ": " << trees.error() << endl;
+        cerr << "Failed to kill the process tree rooted at pid " << pid
+             << ": " << trees.error() << endl;
 
         // Send SIGTERM directly to process 'pid' as it may not have
         // received signal before os::killtree() failed.
         ::kill(pid, SIGTERM);
       } else {
-        cout << "Killing the following process trees:\n"
+        cout << "Sent SIGTERM to the following process trees:\n"
              << stringify(trees.get()) << endl;
       }
 
@@ -538,7 +563,6 @@ protected:
 private:
   void reaped(
       ExecutorDriver* driver,
-      const TaskID& taskId,
       pid_t pid,
       const Future<Option<int> >& status_)
   {
@@ -574,8 +598,10 @@ private:
 
     cout << message << " (pid: " << pid << ")" << endl;
 
+    CHECK_SOME(taskId);
+
     TaskStatus taskStatus;
-    taskStatus.mutable_task_id()->MergeFrom(taskId);
+    taskStatus.mutable_task_id()->MergeFrom(taskId.get());
     taskStatus.set_state(taskState);
     taskStatus.set_message(message);
     if (killed && killedByHealthCheck) {
@@ -671,6 +697,8 @@ private:
   Duration escalationTimeout;
   Timer escalationTimer;
   Option<ExecutorDriver*> driver;
+  Option<FrameworkInfo> frameworkInfo;
+  Option<TaskID> taskId;
   string healthCheckDir;
   Option<char**> override;
   Option<string> sandboxDirectory;