You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jp...@apache.org on 2018/05/03 16:16:32 UTC

[3/8] mesos git commit: Added `max_completion_time` support to command executor.

Added `max_completion_time` support to command executor.

If `TaskInfo.max_completion_time` is set, the command executor will kill
the task with `SIGKILL` immediately once that duration has elapsed. Note
that no KillPolicy will be observed. The framework should only receive a
`TASK_FAILED` state with the `REASON_MAX_COMPLETION_TIME_REACHED` reason.

Review: https://reviews.apache.org/r/66259/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/197d1ea3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/197d1ea3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/197d1ea3

Branch: refs/heads/master
Commit: 197d1ea395e76961615e30f5b03550b1a4a4e779
Parents: 95bf46b
Author: Zhitao Li <zh...@gmail.com>
Authored: Thu May 3 08:33:32 2018 -0700
Committer: James Peach <jp...@apache.org>
Committed: Thu May 3 08:33:32 2018 -0700

----------------------------------------------------------------------
 src/launcher/executor.cpp | 64 ++++++++++++++++++++++++++++++++++++++----
 1 file changed, 59 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/197d1ea3/src/launcher/executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp
index 8d0869c..541ca5b 100644
--- a/src/launcher/executor.cpp
+++ b/src/launcher/executor.cpp
@@ -140,6 +140,7 @@ public:
       launched(false),
       killed(false),
       killedByHealthCheck(false),
+      killedByMaxCompletionTimer(false),
       terminated(false),
       pid(None()),
       shutdownGracePeriod(_shutdownGracePeriod),
@@ -646,6 +647,21 @@ protected:
       launchEnvironment.add_variables()->CopyFrom(variable);
     }
 
+    // Setup timer for max_completion_time.
+    if (task.max_completion_time().nanoseconds() > 0) {
+      Duration duration = Nanoseconds(task.max_completion_time().nanoseconds());
+
+      LOG(INFO) << "Task " << taskId.get() << " has a max completion time of "
+                << duration;
+
+      taskCompletionTimer = delay(
+          duration,
+          self(),
+          &Self::taskCompletionTimeout,
+          task.task_id(),
+          duration);
+    }
+
     LOG(INFO) << "Starting task " << taskId.get();
 
     pid = launchTaskSubprocess(
@@ -734,6 +750,12 @@ protected:
 
   void kill(const TaskID& _taskId, const Option<KillPolicy>& override = None())
   {
+    // Cancel the taskCompletionTimer if it is set and ongoing.
+    if (taskCompletionTimer.isSome()) {
+      Clock::cancel(taskCompletionTimer.get());
+      taskCompletionTimer = None();
+    }
+
     // Default grace period is set to 3s for backwards compatibility.
     //
     // TODO(alexr): Replace it with a more meaningful default, e.g.
@@ -846,10 +868,13 @@ private:
       CHECK_SOME(taskId);
       CHECK(taskId.get() == _taskId);
 
-      if (protobuf::frameworkHasCapability(
+      if (!killedByMaxCompletionTimer &&
+          protobuf::frameworkHasCapability(
               frameworkInfo.get(),
               FrameworkInfo::Capability::TASK_KILLING_STATE)) {
-        TaskStatus status = createTaskStatus(taskId.get(), TASK_KILLING);
+        TaskStatus status =
+          createTaskStatus(taskId.get(), TASK_KILLING);
+
         forward(status);
       }
 
@@ -915,6 +940,13 @@ private:
       Clock::cancel(killGracePeriodTimer.get());
     }
 
+    if (taskCompletionTimer.isSome()) {
+      Clock::cancel(taskCompletionTimer.get());
+      taskCompletionTimer = None();
+    }
+
+    Option<TaskStatus::Reason> reason = None();
+
     if (!status_.isReady()) {
       taskState = TASK_FAILED;
       message =
@@ -928,7 +960,10 @@ private:
       CHECK(WIFEXITED(status) || WIFSIGNALED(status))
         << "Unexpected wait status " << status;
 
-      if (killed) {
+      if (killedByMaxCompletionTimer) {
+        taskState = TASK_FAILED;
+        reason = TaskStatus::REASON_MAX_COMPLETION_TIME_REACHED;
+      } else if (killed) {
         // Send TASK_KILLED if the task was killed as a result of
         // kill() or shutdown().
         taskState = TASK_KILLED;
@@ -948,7 +983,7 @@ private:
     TaskStatus status = createTaskStatus(
         taskId.get(),
         taskState,
-        None(),
+        reason,
         message);
 
     // Indicate that a kill occurred due to a failing health check.
@@ -1007,6 +1042,23 @@ private:
     }
   }
 
+
+  void taskCompletionTimeout(const TaskID& taskId, const Duration& duration)
+  {
+    CHECK(!terminated);
+    CHECK(!killed);
+
+    LOG(INFO) << "Killing task " << taskId
+              << " which exceeded its maximum completion time of " << duration;
+
+    taskCompletionTimer = None();
+    killedByMaxCompletionTimer = true;
+
+    // Use a zero gracePeriod to kill the task.
+    kill(taskId, Duration::zero());
+  }
+
+
   // Use this helper to create a status update from scratch, i.e., without
   // previously attached extra information like `data` or `check_status`.
   TaskStatus createTaskStatus(
@@ -1130,11 +1182,13 @@ private:
   bool launched;
   bool killed;
   bool killedByHealthCheck;
+  bool killedByMaxCompletionTimer;
+
   bool terminated;
 
   Option<Time> killGracePeriodStart;
   Option<Timer> killGracePeriodTimer;
-
+  Option<Timer> taskCompletionTimer;
   Option<pid_t> pid;
   Duration shutdownGracePeriod;
   Option<KillPolicy> killPolicy;