You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2016/03/24 18:24:31 UTC

[6/9] mesos git commit: Used `KillPolicy` and shutdown grace period in command executor.

Used `KillPolicy` and shutdown grace period in command executor.

The command executor determines how much time it allots the
underlying task to clean up (effectively how long to wait for
the task to comply with SIGTERM before sending SIGKILL) based
on both the task's optional `KillPolicy` and the optional
`shutdown_grace_period` field in `ExecutorInfo`.

Manual testing was performed to ensure the newly introduced protobuf
fields are respected. To do that, "mesos-execute" was modified to
support `KillPolicy` and `CommandInfo.shell=false`. To simulate a
task that does not exit in the allotted period, a tiny app
(https://github.com/rukletsov/unresponsive-process) that ignores
SIGTERM was used. More details on testing are in the review request.

Review: https://reviews.apache.org/r/44657/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d13de4c4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d13de4c4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d13de4c4

Branch: refs/heads/master
Commit: d13de4c42b39037c8bd8f79122e7a9ac0d82317f
Parents: 1fe6221
Author: Alexander Rukletsov <ru...@gmail.com>
Authored: Thu Mar 24 17:30:52 2016 +0100
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Thu Mar 24 18:21:03 2016 +0100

----------------------------------------------------------------------
 src/launcher/executor.cpp | 84 ++++++++++++++++++++++++++++++++++--------
 src/slave/slave.cpp       | 18 +++++++++
 src/tests/slave_tests.cpp | 76 ++++++++++++++++++++++++++++++++++++++
 3 files changed, 162 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d13de4c4/src/launcher/executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp
index 2df62f0..4e9b4d9 100644
--- a/src/launcher/executor.cpp
+++ b/src/launcher/executor.cpp
@@ -84,14 +84,15 @@ public:
       const Option<string>& _sandboxDirectory,
       const Option<string>& _workingDirectory,
       const Option<string>& _user,
-      const Option<string>& _taskCommand)
+      const Option<string>& _taskCommand,
+      const Duration& _shutdownGracePeriod)
     : state(REGISTERING),
       launched(false),
       killed(false),
       killedByHealthCheck(false),
       pid(-1),
       healthPid(-1),
-      escalationTimeout(slave::EXECUTOR_SIGNAL_ESCALATION_TIMEOUT),
+      shutdownGracePeriod(_shutdownGracePeriod),
       driver(None()),
       frameworkInfo(None()),
       taskId(None()),
@@ -151,6 +152,11 @@ public:
     // Capture the TaskID.
     taskId = task.task_id();
 
+    // Capture the kill policy.
+    if (task.has_kill_policy()) {
+      killPolicy = task.kill_policy();
+    }
+
     // Determine the command to launch the task.
     CommandInfo command;
 
@@ -471,6 +477,34 @@ public:
 
   void shutdown(ExecutorDriver* driver)
   {
+    // If the kill policy's grace period is set, we use it for the signal
+    // escalation timeout. The agent adjusts executor's shutdown grace
+    // period based on it, hence the executor will be given enough time
+    // to clean up. If the kill policy is not specified, the executor's
+    // shutdown grace period is used, which is set to some default value.
+    //
+    // NOTE: We leave a small buffer of time to do the forced kill, otherwise
+    // the agent may destroy the container before we can send `TASK_KILLED`.
+    //
+    // TODO(alexr): Remove `MAX_REAP_INTERVAL` once the reaper signals
+    // immediately after the watched process has exited.
+    Duration gracePeriod =
+      shutdownGracePeriod - process::MAX_REAP_INTERVAL() - Seconds(1);
+
+    if (killPolicy.isSome() && killPolicy->has_grace_period()) {
+      gracePeriod = Nanoseconds(killPolicy->grace_period().nanoseconds());
+    }
+
+    // TODO(bmahler): If a shutdown arrives after a kill task within
+    // the grace period of the `KillPolicy`, we may need to escalate
+    // more quickly (e.g. the shutdown grace period allotted by the
+    // agent is smaller than the kill grace period).
+
+    shutdown(driver, gracePeriod);
+  }
+
+  void shutdown(ExecutorDriver* driver, const Duration& gracePeriod)
+  {
     cout << "Shutting down" << endl;
 
     if (launched && !killed) {
@@ -509,12 +543,8 @@ public:
              << stringify(trees.get()) << endl;
       }
 
-      // TODO(nnielsen): Make escalationTimeout configurable through
-      // slave flags and/or per-framework/executor.
-      escalationTimer = delay(
-          escalationTimeout,
-          self(),
-          &Self::escalated);
+      escalationTimer =
+        delay(gracePeriod, self(), &Self::escalated, gracePeriod);
 
       killed = true;
     }
@@ -625,11 +655,10 @@ private:
     driver->stop();
   }
 
-  void escalated()
+  void escalated(Duration timeout)
   {
-    cout << "Process " << pid << " did not terminate after "
-         << escalationTimeout << ", sending SIGKILL to "
-         << "process tree at " << pid << endl;
+    cout << "Process " << pid << " did not terminate after " << timeout
+         << ", sending SIGKILL to process tree at " << pid << endl;
 
     // TODO(nnielsen): Sending SIGTERM in the first stage of the
     // shutdown may leave orphan processes hanging off init. This
@@ -702,7 +731,8 @@ private:
   bool killedByHealthCheck;
   pid_t pid;
   pid_t healthPid;
-  Duration escalationTimeout;
+  Duration shutdownGracePeriod;
+  Option<KillPolicy> killPolicy;
   Timer escalationTimer;
   Option<ExecutorDriver*> driver;
   Option<FrameworkInfo> frameworkInfo;
@@ -725,7 +755,8 @@ public:
       const Option<string>& sandboxDirectory,
       const Option<string>& workingDirectory,
       const Option<string>& user,
-      const Option<string>& taskCommand)
+      const Option<string>& taskCommand,
+      const Duration& shutdownGracePeriod)
   {
     process = new CommandExecutorProcess(
         override,
@@ -733,7 +764,8 @@ public:
         sandboxDirectory,
         workingDirectory,
         user,
-        taskCommand);
+        taskCommand,
+        shutdownGracePeriod);
 
     spawn(process);
   }
@@ -889,13 +921,33 @@ int main(int argc, char** argv)
     ? envPath.get()
     : os::realpath(Path(argv[0]).dirname()).get();
 
+  // Get executor shutdown grace period from the environment.
+  //
+  // NOTE: We avoided introducing a command executor flag for this
+  // because the command executor exits if it sees an unknown flag.
+  // This makes it difficult to add or remove command executor flags
+  // that are unconditionally set by the agent.
+  Duration shutdownGracePeriod = DEFAULT_EXECUTOR_SHUTDOWN_GRACE_PERIOD;
+  Option<string> value = os::getenv("MESOS_EXECUTOR_SHUTDOWN_GRACE_PERIOD");
+  if (value.isSome()) {
+    Try<Duration> parse = Duration::parse(value.get());
+    if (parse.isError()) {
+      cerr << "Failed to parse value '" << value.get() << "'"
+           << " of 'MESOS_EXECUTOR_SHUTDOWN_GRACE_PERIOD': " << parse.error();
+      return EXIT_FAILURE;
+    }
+
+    shutdownGracePeriod = parse.get();
+  }
+
   mesos::internal::CommandExecutor executor(
       override,
       path,
       flags.sandbox_directory,
       flags.working_directory,
       flags.user,
-      flags.task_command);
+      flags.task_command,
+      shutdownGracePeriod);
 
   mesos::MesosExecutorDriver driver(&executor);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/d13de4c4/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index c6fd810..f383605 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -43,6 +43,7 @@
 #include <process/dispatch.hpp>
 #include <process/http.hpp>
 #include <process/id.hpp>
+#include <process/reap.hpp>
 #include <process/time.hpp>
 
 #include <stout/bytes.hpp>
@@ -3812,6 +3813,23 @@ ExecutorInfo Slave::getExecutorInfo(
           task.command().environment());
     }
 
+    // Adjust the executor shutdown grace period if the kill policy is
+    // set. We add a small buffer of time to avoid destroying the
+    // container before `TASK_KILLED` is sent by the executor.
+    //
+    // TODO(alexr): Remove `MAX_REAP_INTERVAL` once the reaper signals
+    // immediately after the watched process has exited.
+    if (task.has_kill_policy() &&
+        task.kill_policy().has_grace_period()) {
+      Duration gracePeriod =
+        Nanoseconds(task.kill_policy().grace_period().nanoseconds()) +
+        process::MAX_REAP_INTERVAL() +
+        Seconds(1);
+
+      executor.mutable_shutdown_grace_period()->set_nanoseconds(
+          gracePeriod.ns());
+    }
+
     // We skip setting the user for the command executor that has
     // a rootfs image since we need root permissions to chroot.
     // We assume command executor will change to the correct user

http://git-wip-us.apache.org/repos/asf/mesos/blob/d13de4c4/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 69d4894..1f1a310 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -545,6 +545,82 @@ TEST_F(SlaveTest, ComamndTaskWithArguments)
 }
 
 
+// Tests that task's kill policy grace period does not extend the time
+// a task responsive to SIGTERM needs to exit and the terminal status
+// to be delivered to the master.
+TEST_F(SlaveTest, CommandTaskWithKillPolicy)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers->size());
+  Offer offer = offers.get()[0];
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offer.slave_id());
+  task.mutable_resources()->MergeFrom(offer.resources());
+
+  CommandInfo command;
+  command.set_value("sleep 1000");
+  task.mutable_command()->MergeFrom(command);
+
+  // Set task's kill policy grace period to a large value.
+  Duration gracePeriod = Seconds(100);
+  task.mutable_kill_policy()->mutable_grace_period()->set_nanoseconds(
+      gracePeriod.ns());
+
+  Future<TaskStatus> statusRunning;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusRunning));
+
+  driver.launchTasks(offer.id(), {task});
+
+  AWAIT_READY(statusRunning);
+  EXPECT_EQ(TASK_RUNNING, statusRunning->state());
+
+  // Kill the task.
+  Future<TaskStatus> statusKilled;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusKilled));
+
+  driver.killTask(statusRunning->task_id());
+
+  // Since "sleep 1000" task is responsive to SIGTERM, we should
+  // observe TASK_KILLED update sooner than after `gracePeriod`
+  // elapses. This indicates that extended grace period does not
+  // influence the time a task and its command executor need to
+  // exit. We add a small buffer for a task to clean up and the
+  // update to be processed by the master.
+  AWAIT_READY_FOR(statusKilled, Seconds(1) + process::MAX_REAP_INTERVAL());
+
+  EXPECT_EQ(TASK_KILLED, statusKilled->state());
+  EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR,
+            statusKilled->source());
+
+  driver.stop();
+  driver.join();
+}
+
+
 // Don't let args from the CommandInfo struct bleed over into
 // mesos-executor forking. For more details of this see MESOS-1873.
 TEST_F(SlaveTest, GetExecutorInfo)