Posted to commits@mesos.apache.org by bm...@apache.org on 2018/02/08 02:59:37 UTC

[3/6] mesos git commit: Fixed a bug relating to lingering executors [1/2].

Fixed a bug relating to lingering executors [1/2].

An executor should be shut down if all of its tasks are
killed while the executor is launching.

This patch fixes an issue where the executor is left
running when its task(s) are killed between the executor's
registration/subscription and `Slave::___run()`. See
MESOS-8411 for more details. An additional race in the
agent failover case is also addressed in this patch.

The fix is to close the race by checking an executor's task
queues during task kill and executor (re-)registration, and
shutting down executors that have never received any tasks.
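
In outline, the kill-path check looks like the following simplified
sketch; the names `everSentTask()`, `queuedTasks`, and
`_shutdownExecutor()` are taken from the patch below, while the
surrounding control flow is abridged:

    // After all queued copies of the task have been transitioned
    // to TASK_KILLED, check whether any work was ever delivered
    // to this executor; if not, it has nothing left to run and
    // would otherwise linger.
    if (!executor->everSentTask() && executor->queuedTasks.empty()) {
      LOG(WARNING) << "Shutting down executor " << *executor
                   << " because it has never been sent a task and all of"
                   << " its queued tasks have been killed before delivery";

      _shutdownExecutor(framework, executor);
    }

The same condition guards the executor registration, subscription,
and re-registration paths, so an executor that comes up after all of
its initial tasks were killed is shut down immediately.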

Review: https://reviews.apache.org/r/65109/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/81d4bb20
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/81d4bb20
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/81d4bb20

Branch: refs/heads/master
Commit: 81d4bb20d189c93d935a807b7ea401689159a887
Parents: 93f48fd
Author: Meng Zhu <mz...@mesosphere.io>
Authored: Wed Feb 7 17:01:58 2018 -0800
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Wed Feb 7 18:41:15 2018 -0800

----------------------------------------------------------------------
 src/slave/constants.hpp |   8 +++
 src/slave/slave.cpp     | 118 +++++++++++++++++++++++++++++++++++--------
 src/slave/slave.hpp     |  24 +++++++++
 3 files changed, 129 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/81d4bb20/src/slave/constants.hpp
----------------------------------------------------------------------
diff --git a/src/slave/constants.hpp b/src/slave/constants.hpp
index e6cb7cc..030fb05 100644
--- a/src/slave/constants.hpp
+++ b/src/slave/constants.hpp
@@ -86,6 +86,14 @@ constexpr size_t MAX_COMPLETED_FRAMEWORKS = 50;
 constexpr size_t DEFAULT_MAX_COMPLETED_EXECUTORS_PER_FRAMEWORK = 150;
 
 // Maximum number of completed tasks per executor to store in memory.
+//
+// NOTE: This should be greater than zero because the agent looks
+// for completed tasks to determine (with false positives) whether
+// an executor ever received tasks. See MESOS-8411.
+//
+// TODO(mzhu): Remove this note once we can determine whether an
+// executor ever received tasks without looking through the
+// completed tasks.
 constexpr size_t MAX_COMPLETED_TASKS_PER_EXECUTOR = 200;
 
 // Default cpus offered by the slave.

http://git-wip-us.apache.org/repos/asf/mesos/blob/81d4bb20/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 22383cc..98ee5e6 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2817,6 +2817,12 @@ void Slave::___run(
     return;
   }
 
+  // At this point, we must either have sent some tasks to the running
+  // executor or have queued tasks that still need to be delivered.
+  // Otherwise, the executor state would have been synchronously
+  // transitioned to TERMINATING when the queued tasks were killed.
+  CHECK(executor->everSentTask() || !executor->queuedTasks.empty());
+
   foreach (const TaskInfo& task, tasks) {
     // This is the case where the task is killed. No need to send
     // status update because it should be handled in 'killTask'.
@@ -3351,6 +3357,10 @@ void Slave::killTask(
         statusUpdate(update, UPID());
       }
 
+      // TODO(mzhu): Consider shutting down the executor here
+      // if all of its initial tasks are killed rather than
+      // waiting for it to register.
+
       break;
     }
     case Executor::TERMINATING:
@@ -3407,6 +3417,19 @@ void Slave::killTask(
           // a later point in time, it won't get this task.
           statusUpdate(update, UPID());
         }
+
+        // Shut down the executor if all of its initial tasks are killed.
+        // See MESOS-8411. This is a workaround for those executors (e.g.,
+        // command executor, default executor) that do not have proper
+        // self-terminating logic for when they haven't received the task
+        // or task group within a timeout.
+        if (!executor->everSentTask() && executor->queuedTasks.empty()) {
+          LOG(WARNING) << "Shutting down executor " << *executor
+                       << " because it has never been sent a task and all of"
+                       << " its queued tasks have been killed before delivery";
+
+          _shutdownExecutor(framework, executor);
+        }
       } else {
         // Send a message to the executor and wait for
         // it to send us a status update.
@@ -4220,15 +4243,10 @@ void Slave::subscribe(
       // those executors (e.g., command executor, default executor) that do not
       // have a proper self terminating logic when they haven't received the
       // task or task group within a timeout.
-      if (state != RECOVERING &&
-          executor->queuedTasks.empty() &&
-          executor->queuedTaskGroups.empty()) {
-        CHECK(executor->launchedTasks.empty())
-            << " Newly registered executor '" << executor->id
-            << "' has launched tasks";
-
-        LOG(WARNING) << "Shutting down the executor " << *executor
-                     << " because it has no tasks to run";
+      if (!executor->everSentTask() && executor->queuedTasks.empty()) {
+        LOG(WARNING) << "Shutting down subscribing executor " << *executor
+                     << " because it was never sent a task and"
+                     << " has no tasks to run";
 
         _shutdownExecutor(framework, executor);
 
@@ -4303,7 +4321,7 @@ void Slave::subscribe(
       // TODO(vinod): Consider checkpointing 'TaskInfo' instead of
       // 'Task' so that we can relaunch such tasks! Currently we don't
       // do it because 'TaskInfo.data' could be huge.
-      foreachvalue (Task* task, executor->launchedTasks) {
+      foreach (Task* task, executor->launchedTasks.values()) {
         if (task->state() == TASK_STAGING &&
             !unackedTasks.contains(task->task_id())) {
           mesos::TaskState newTaskState = TASK_DROPPED;
@@ -4447,12 +4465,8 @@ void Slave::registerExecutor(
       // this shutdown message, it is safe because the executor driver shuts
       // down the executor if it gets disconnected from the agent before
       // registration.
-      if (executor->queuedTasks.empty()) {
-        CHECK(executor->launchedTasks.empty())
-            << " Newly registered executor '" << executor->id
-            << "' has launched tasks";
-
-        LOG(WARNING) << "Shutting down the executor " << *executor
+      if (!executor->everSentTask() && executor->queuedTasks.empty()) {
+        LOG(WARNING) << "Shutting down registering executor " << *executor
                      << " because it has no tasks to run";
 
         _shutdownExecutor(framework, executor);
@@ -4648,7 +4662,7 @@ void Slave::reregisterExecutor(
       // TODO(vinod): Consider checkpointing 'TaskInfo' instead of
       // 'Task' so that we can relaunch such tasks! Currently we
       // don't do it because 'TaskInfo.data' could be huge.
-      foreachvalue (Task* task, executor->launchedTasks) {
+      foreach (Task* task, executor->launchedTasks.values()) {
         if (task->state() == TASK_STAGING &&
             !unackedTasks.contains(task->task_id())) {
           mesos::TaskState newTaskState = TASK_DROPPED;
@@ -4678,8 +4692,21 @@ void Slave::reregisterExecutor(
         }
       }
 
-      // TODO(vinod): Similar to what we do in `registerExecutor()` the executor
-      // should be shutdown if it hasn't received any tasks.
+      // Shut down the executor if all of its initial tasks are killed.
+      // This is a workaround for those executors (e.g., command
+      // executor, default executor) that do not have proper
+      // self-terminating logic for when they haven't received the
+      // task or task group within a timeout.
+      if (!executor->everSentTask() && executor->queuedTasks.empty()) {
+        LOG(WARNING) << "Shutting down re-registering executor " << *executor
+                     << " because it has no tasks to run and"
+                     << " has never been sent a task";
+
+        _shutdownExecutor(framework, executor);
+
+        return;
+      }
+
       break;
     }
   }
@@ -4987,11 +5014,21 @@ void Slave::statusUpdate(StatusUpdate update, const Option<UPID>& pid)
   // do not need to send the container status and we must
   // synchronously transition the task to ensure that it is removed
   // from the queued tasks before the run task path continues.
+  //
+  // Also if the task is in `launchedTasks` but was dropped by the
+  // agent, we know that the task did not reach the executor. We
+  // will synchronously transition the task to ensure that the
+  // agent re-registration logic can call `everSentTask()` after
+  // dropping tasks.
   if (executor->queuedTasks.contains(status.task_id())) {
     CHECK(protobuf::isTerminalState(status.state()))
         << "Queued tasks can only be transitioned to terminal states";
 
     _statusUpdate(update, pid, executor->id, None());
+  } else if (executor->launchedTasks.contains(status.task_id()) &&
+            (status.state() == TASK_DROPPED || status.state() == TASK_LOST) &&
+            status.source() == TaskStatus::SOURCE_SLAVE) {
+    _statusUpdate(update, pid, executor->id, None());
   } else {
     // NOTE: If the executor sets the ContainerID inside the
     // ContainerStatus, that indicates that the Task this status update
@@ -8948,11 +8985,24 @@ Executor::Executor(
     user(_user),
     checkpoint(_checkpoint),
     http(None()),
-    pid(None()),
-    completedTasks(MAX_COMPLETED_TASKS_PER_EXECUTOR)
+    pid(None())
 {
   CHECK_NOTNULL(slave);
 
+  // NOTE: This should be greater than zero because the agent looks
+  // for completed tasks to determine (with false positives) whether
+  // an executor ever received tasks. See MESOS-8411.
+  //
+  // TODO(mzhu): Remove this check once we can determine whether an
+  // executor ever received tasks without looking through the
+  // completed tasks.
+  static_assert(
+      MAX_COMPLETED_TASKS_PER_EXECUTOR > 0,
+      "Max completed tasks per executor should be greater than zero");
+
+  completedTasks =
+    boost::circular_buffer<shared_ptr<Task>>(MAX_COMPLETED_TASKS_PER_EXECUTOR);
+
   // TODO(jieyu): The way we determine if an executor is generated for
   // a command task (either command or docker executor) is really
   // hacky. We rely on the fact that docker executor launch command is
@@ -9260,6 +9310,32 @@ bool Executor::incompleteTasks()
 }
 
 
+bool Executor::everSentTask() const
+{
+  if (!launchedTasks.empty()) {
+    return true;
+  }
+
+  foreachvalue (Task* task, terminatedTasks) {
+    foreach (const TaskStatus& status, task->statuses()) {
+      if (status.source() == TaskStatus::SOURCE_EXECUTOR) {
+        return true;
+      }
+    }
+  }
+
+  foreach (const shared_ptr<Task>& task, completedTasks) {
+    foreach (const TaskStatus& status, task->statuses()) {
+      if (status.source() == TaskStatus::SOURCE_EXECUTOR) {
+        return true;
+      }
+    }
+  }
+
+  return false;
+}
+
+
 bool Executor::isGeneratedForCommandTask() const
 {
   return isGeneratedForCommandTask_;

http://git-wip-us.apache.org/repos/asf/mesos/blob/81d4bb20/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 09c01eb..38808cb 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -862,6 +862,30 @@ public:
   // Returns true if there are any queued/launched/terminated tasks.
   bool incompleteTasks();
 
+  // Returns true if the agent ever sent any tasks to this executor.
+  // More precisely, this function returns whether either:
+  //
+  //  (1) There are terminated/completed tasks with a
+  //      SOURCE_EXECUTOR status.
+  //
+  //  (2) `launchedTasks` is not empty.
+  //
+  // If this function returns false and there are no queued tasks,
+  // we probably (see TODO below) have killed or dropped all of its
+  // initial tasks, in which case the agent will shut down the executor.
+  //
+  // TODO(mzhu): Note, however, that since we look through the cache
+  // of completed tasks, we can have false positives when a task
+  // that was delivered to the executor has been evicted from the
+  // completed task cache by tasks that were killed by the agent
+  // before delivery. This should be fixed.
+  //
+  // NOTE: This function checks whether any task has ever been sent;
+  // it does not necessarily mean the executor has ever received
+  // any tasks. Specifically, tasks in `launchedTasks` may be dropped
+  // before being received by the executor if the agent restarts.
+  bool everSentTask() const;
+
   // Sends a message to the connected executor.
   template <typename Message>
   void send(const Message& message)