You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2017/08/24 00:56:12 UTC

[6/8] mesos git commit: Fixed a bug where TASK_KILLED updates can be dropped by the agent.

Fixed a bug where TASK_KILLED updates can be dropped by the agent.

Per the description of MESOS-7863, there is currently an assumption
that when a pending task is killed, the framework will still be stored
in the agent when the launch proceeds for the killed task. When this
assumption does not hold, the TASK_KILLED update will be dropped
because the framework is unknown when the launch proceeds. This
assumption doesn't hold in two cases:

  (1) Another pending task was killed and we removed the framework
      in 'Slave::run' thinking it was idle, because its pending tasks
      were empty (we remove from the pending tasks when processing
      the kill). (MESOS-7783 is an instance of this.)

  (2) The last executor terminated without tasks to send terminal
      updates for, or the last terminated executor received its
      last acknowledgement. At this point, if the task had been
      killed (and thus removed from the pending tasks), we remove
      the framework thinking there were no pending tasks.

The fix applied here is to send the status updates from the kill
path rather than the launch path, to be consistent with how we kill
tasks queued within the Executor struct. We ensure that the task
is removed synchronously within the kill path to prevent its launch.

Review: https://reviews.apache.org/r/61645


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b6ca5e41
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b6ca5e41
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b6ca5e41

Branch: refs/heads/1.3.x
Commit: b6ca5e41b7ec4de27c3deb4ccd23cc43bcc6b80c
Parents: dc8b06a
Author: Benjamin Mahler <bm...@apache.org>
Authored: Mon Aug 14 14:59:08 2017 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Wed Aug 23 17:39:35 2017 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp       | 286 ++++++++++++++++++++++++++++-------------
 src/slave/slave.hpp       |  23 +++-
 src/tests/slave_tests.cpp |  33 +++--
 3 files changed, 236 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b6ca5e41/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 1c88ddc..e066b25 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1715,12 +1715,18 @@ void Slave::run(
     }
   }
 
-  // We add the task/task group to 'pending' to ensure the framework is not
-  // removed and the framework and top level executor directories
-  // are not scheduled for deletion before '_run()' is called.
   CHECK_NOTNULL(framework);
-  foreach (const TaskInfo& _task, tasks) {
-    framework->pendingTasks[executorId][_task.task_id()] = _task;
+
+  // Track the pending task / task group to ensure the framework is
+  // not removed and the framework and top level executor directories
+  // are not scheduled for deletion before '_run()' is called.
+  //
+  // TODO(bmahler): Can we instead track pending tasks within the
+  // `Executor` struct by creating it earlier?
+  if (task.isSome()) {
+    framework->addPendingTask(executorId, task.get());
+  } else {
+    framework->addPendingTaskGroup(executorId, taskGroup.get());
   }
 
   // If we are about to create a new executor, unschedule the top
@@ -1805,48 +1811,27 @@ void Slave::_run(
     return;
   }
 
-  const ExecutorID& executorId = executorInfo.executor_id();
-
-  // If any of the tasks in the task group have been killed in the interim,
-  // we send a TASK_KILLED for all the other tasks in the group.
-  bool killed = false;
+  // Ignore the launch if killed in the interim. The invariant here
+  // is that all tasks in the group are still pending, or all were
+  // removed due to a kill arriving for one of the tasks in the group.
+  bool allPending = true;
+  bool allRemoved = true;
   foreach (const TaskInfo& _task, tasks) {
-    if (!framework->pendingTasks.contains(executorId) ||
-        !framework->pendingTasks.at(executorId).contains(_task.task_id())) {
-      killed = true;
-      break;
+    if (framework->isPending(_task.task_id())) {
+      allRemoved = false;
+    } else {
+      allPending = false;
     }
   }
 
-  if (killed) {
+  CHECK(allPending != allRemoved)
+    << "BUG: The task group " << taskOrTaskGroup(task, taskGroup)
+    << " was killed partially";
+
+  if (allRemoved) {
     LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
                  << " of framework " << frameworkId
                  << " because it has been killed in the meantime";
-
-    foreach (const TaskInfo& _task, tasks) {
-      framework->removePendingTask(_task.task_id());
-
-      const StatusUpdate update = protobuf::createStatusUpdate(
-          frameworkId,
-          info.id(),
-          _task.task_id(),
-          TASK_KILLED,
-          TaskStatus::SOURCE_SLAVE,
-          UUID::random(),
-          "Killed before delivery to the executor",
-          TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH);
-
-      // TODO(vinod): Ensure that the status update manager reliably
-      // delivers this update. Currently, we don't guarantee this
-      // because removal of the framework causes the status update
-      // manager to stop retrying for its un-acked updates.
-      statusUpdate(update, UPID());
-    }
-
-    if (framework->idle()) {
-      removeFramework(framework);
-    }
-
     return;
   }
 
@@ -1966,53 +1951,34 @@ void Slave::__run(
     return;
   }
 
-  // Remove the task/task group from being pending. If any of the
-  // tasks in the task group have been killed in the interim, we
-  // send a TASK_KILLED for all the other tasks in the group.
-  bool killed = false;
+  // Ignore the launch if killed in the interim. The invariant here
+  // is that all tasks in the group are still pending, or all were
+  // removed due to a kill arriving for one of the tasks in the group.
+  bool allPending = true;
+  bool allRemoved = true;
   foreach (const TaskInfo& _task, tasks) {
-    if (framework->removePendingTask(_task.task_id())) {
-      // NOTE: Ideally we would perform the following check here:
-      //
-      //   if (framework->idle()) {
-      //     removeFramework(framework);
-      //   }
-      //
-      // However, we need 'framework' to stay valid for the rest of
-      // this function. As such, we perform the check before each of
-      // the 'return' statements below.
+    if (framework->isPending(_task.task_id())) {
+      allRemoved = false;
     } else {
-      killed = true;
+      allPending = false;
     }
   }
 
-  if (killed) {
+  CHECK(allPending != allRemoved)
+    << "BUG: The task group " << taskOrTaskGroup(task, taskGroup)
+    << " was killed partially";
+
+  if (allRemoved) {
     LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
                  << " of framework " << frameworkId
                  << " because it has been killed in the meantime";
-
-    foreach (const TaskInfo& _task, tasks) {
-      const StatusUpdate update = protobuf::createStatusUpdate(
-          frameworkId,
-          info.id(),
-          _task.task_id(),
-          TASK_KILLED,
-          TaskStatus::SOURCE_SLAVE,
-          UUID::random(),
-          "Killed before delivery to the executor",
-          TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH);
-      statusUpdate(update, UPID());
-    }
-
-    // Refer to the comment after 'framework->removePendingTask' above
-    // for why we need this.
-    if (framework->idle()) {
-      removeFramework(framework);
-    }
-
     return;
   }
 
+  foreach (const TaskInfo& _task, tasks) {
+    CHECK(framework->removePendingTask(_task.task_id()));
+  }
+
   CHECK(!future.isDiscarded());
 
   // Validate that the task (or tasks in case of task group) are authorized
@@ -2870,20 +2836,54 @@ void Slave::killTask(
     return;
   }
 
-  // TODO(bmahler): Removing the task here is a bug: MESOS-7783.
-  bool removedWhilePending = framework->removePendingTask(taskId);
-
-  if (removedWhilePending) {
+  // If the task is pending, we send a TASK_KILLED immediately.
+  // This will trigger a synchronous removal of the pending task,
+  // which prevents it from being launched.
+  if (framework->isPending(taskId)) {
     LOG(WARNING) << "Killing task " << taskId
                  << " of framework " << frameworkId
                  << " before it was launched";
 
-    // We send the TASK_KILLED status update in `_run()` as the
-    // task being killed could be part of a task group and we
-    // don't store this information in `framework->pendingTasks`.
-    // We don't invoke `removeFramework()` here since we need the
-    // framework to be valid for sending the status update later.
-    return;
+    Option<TaskGroupInfo> taskGroup =
+      framework->getTaskGroupForPendingTask(taskId);
+
+    list<StatusUpdate> updates;
+    if (taskGroup.isSome()) {
+      foreach (const TaskInfo& task, taskGroup->tasks()) {
+        updates.push_back(protobuf::createStatusUpdate(
+            frameworkId,
+            info.id(),
+            task.task_id(),
+            TASK_KILLED,
+            TaskStatus::SOURCE_SLAVE,
+            UUID::random(),
+            "A task within the task group was killed before"
+            " delivery to the executor",
+            TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH,
+            CHECK_NOTNONE(
+                framework->getExecutorIdForPendingTask(task.task_id()))));
+      }
+    } else {
+      updates.push_back(protobuf::createStatusUpdate(
+          frameworkId,
+          info.id(),
+          taskId,
+          TASK_KILLED,
+          TaskStatus::SOURCE_SLAVE,
+          UUID::random(),
+          "Killed before delivery to the executor",
+          TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH,
+          CHECK_NOTNONE(
+              framework->getExecutorIdForPendingTask(taskId))));
+    }
+
+    foreach (const StatusUpdate& update, updates) {
+      // NOTE: Sending a terminal update (TASK_KILLED) synchronously
+      // removes the task/task group from 'framework->pendingTasks'
+      // and 'framework->pendingTaskGroups', so that it will not be
+      // launched.
+      statusUpdate(update, UPID());
+    }
   }
 
   Executor* executor = framework->getExecutor(taskId);
@@ -4354,6 +4354,25 @@ void Slave::statusUpdate(StatusUpdate update, const Option<UPID>& pid)
 
   const TaskStatus& status = update.status();
 
+  // For pending tasks, we must synchronously remove them
+  // to guarantee that the launch is prevented.
+  //
+  // TODO(bmahler): Ideally we store this task as terminated
+  // but with unacknowledged updates (same as the `Executor`
+  // struct does).
+  if (framework->isPending(status.task_id())) {
+    CHECK(framework->removePendingTask(status.task_id()));
+
+    if (framework->idle()) {
+      removeFramework(framework);
+    }
+
+    metrics.valid_status_updates++;
+
+    statusUpdateManager->update(update, info.id())
+      .onAny(defer(self(), &Slave::___statusUpdate, lambda::_1, update, pid));
+  }
+
   Executor* executor = framework->getExecutor(status.task_id());
   if (executor == nullptr) {
     LOG(WARNING)  << "Could not find the executor for "
@@ -7299,19 +7318,110 @@ void Framework::recoverExecutor(
 }
 
 
+void Framework::addPendingTask(
+    const ExecutorID& executorId,
+    const TaskInfo& task)
+{
+  pendingTasks[executorId][task.task_id()] = task;
+}
+
+
+void Framework::addPendingTaskGroup(
+    const ExecutorID& executorId,
+    const TaskGroupInfo& taskGroup)
+{
+  foreach (const TaskInfo& task, taskGroup.tasks()) {
+    pendingTasks[executorId][task.task_id()] = task;
+  }
+
+  pendingTaskGroups.push_back(taskGroup);
+}
+
+
+bool Framework::isPending(const TaskID& taskId) const
+{
+  foreachkey (const ExecutorID& executorId, pendingTasks) {
+    if (pendingTasks.at(executorId).contains(taskId)) {
+      return true;
+    }
+  }
+
+  return false;
+}
+
+
+Option<TaskGroupInfo> Framework::getTaskGroupForPendingTask(
+    const TaskID& taskId)
+{
+  foreach (const TaskGroupInfo& taskGroup, pendingTaskGroups) {
+    foreach (const TaskInfo& taskInfo, taskGroup.tasks()) {
+      if (taskInfo.task_id() == taskId) {
+        return taskGroup;
+      }
+    }
+  }
+
+  return None();
+}
+
+
 bool Framework::removePendingTask(const TaskID& taskId)
 {
+  bool removed = false;
+
   foreachkey (const ExecutorID& executorId, pendingTasks) {
     if (pendingTasks.at(executorId).contains(taskId)) {
       pendingTasks.at(executorId).erase(taskId);
       if (pendingTasks.at(executorId).empty()) {
         pendingTasks.erase(executorId);
       }
-      return true;
+
+      removed = true;
+      break;
     }
   }
 
-  return false;
+  // We also remove the pending task group if all of its
+  // tasks have been removed.
+  for (auto it = pendingTaskGroups.begin();
+       it != pendingTaskGroups.end();
+       ++it) {
+    foreach (const TaskInfo& t, it->tasks()) {
+      if (t.task_id() == taskId) {
+        // Found its task group, check if all tasks within
+        // the group have been removed.
+        bool allRemoved = true;
+
+        foreach (const TaskInfo& t_, it->tasks()) {
+          if (hasTask(t_.task_id())) {
+            allRemoved = false;
+            break;
+          }
+        }
+
+        if (allRemoved) {
+          pendingTaskGroups.erase(it);
+        }
+
+        return removed;
+      }
+    }
+  }
+
+  return removed;
+}
+
+
+Option<ExecutorID> Framework::getExecutorIdForPendingTask(
+    const TaskID& taskId) const
+{
+  foreachkey (const ExecutorID& executorId, pendingTasks) {
+    if (pendingTasks.at(executorId).contains(taskId)) {
+      return executorId;
+    }
+  }
+
+  return None();
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/b6ca5e41/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 54bf2a5..0f4a53c 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -1112,6 +1112,8 @@ struct Framework
 
   ~Framework();
 
+  const FrameworkID id() const { return info.id(); }
+
   // Returns whether the framework is idle, where idle is
   // defined as having no activity:
   //   (1) The framework has no non-terminal tasks and executors.
@@ -1133,10 +1135,24 @@ struct Framework
 
   void checkpointFramework() const;
 
+  void addPendingTask(
+      const ExecutorID& executorId,
+      const TaskInfo& task);
+
+  // Note that these tasks will also be tracked within `pendingTasks`.
+  void addPendingTaskGroup(
+      const ExecutorID& executorId,
+      const TaskGroupInfo& taskGroup);
+
+  bool isPending(const TaskID& taskId) const;
+
+  // Returns the task group associated with a pending task.
+  Option<TaskGroupInfo> getTaskGroupForPendingTask(const TaskID& taskId);
+
   // Returns whether the pending task was removed.
   bool removePendingTask(const TaskID& taskId);
 
-  const FrameworkID id() const { return info.id(); }
+  Option<ExecutorID> getExecutorIdForPendingTask(const TaskID& taskId) const;
 
   enum State
   {
@@ -1170,6 +1186,11 @@ struct Framework
   // Executors with pending tasks.
   hashmap<ExecutorID, hashmap<TaskID, TaskInfo>> pendingTasks;
 
+  // Pending task groups. This is needed for correctly sending
+  // TASK_KILLED status updates for all tasks in the group if
+  // any of the tasks are killed while pending.
+  std::list<TaskGroupInfo> pendingTaskGroups;
+
   // Current running executors.
   hashmap<ExecutorID, Executor*> executors;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/b6ca5e41/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 7a0265b..2a568a9 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -3611,23 +3611,22 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
     .WillOnce(DoAll(Invoke(&slave, &MockSlave::unmocked_killTask),
                     FutureSatisfy(&killTask)));
 
-  driver.killTask(task.task_id());
-
-  AWAIT_READY(killTask);
-
-  // Since this is the only task ever for this framework, the
-  // framework should get removed in Slave::_run().
-  // Thus we can observe that this happens before Shutdown().
   Future<Nothing> removeFramework;
   EXPECT_CALL(slave, removeFramework(_))
     .WillOnce(DoAll(Invoke(&slave, &MockSlave::unmocked_removeFramework),
                     FutureSatisfy(&removeFramework)));
 
-  slave.unmocked__run(
-      future, frameworkInfo, executorInfo, task_, taskGroup);
+  driver.killTask(task.task_id());
 
+  AWAIT_READY(killTask);
+
+  // The agent will remove the framework when killing this task
+  // since there remain no more tasks.
   AWAIT_READY(removeFramework);
 
+  slave.unmocked__run(
+      future, frameworkInfo, executorInfo, task_, taskGroup);
+
   AWAIT_READY(status);
   EXPECT_EQ(TASK_KILLED, status->state());
 
@@ -6514,6 +6513,13 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts)
     .WillOnce(DoAll(Invoke(&slave, &MockSlave::unmocked_killTask),
                     FutureSatisfy(&killTask)));
 
+  // Since this is the only task group for this framework, the
+  // framework should get removed when the task is killed.
+  Future<Nothing> removeFramework;
+  EXPECT_CALL(slave, removeFramework(_))
+    .WillOnce(DoAll(Invoke(&slave, &MockSlave::unmocked_removeFramework),
+                    FutureSatisfy(&removeFramework)));
+
   {
     Call call;
     call.mutable_framework_id()->CopyFrom(frameworkId);
@@ -6528,18 +6534,11 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts)
 
   AWAIT_READY(killTask);
 
-  // Since this is the only task group for this framework, the
-  // framework should get removed in `Slave::_run()`.
-  Future<Nothing> removeFramework;
-  EXPECT_CALL(slave, removeFramework(_))
-    .WillOnce(DoAll(Invoke(&slave, &MockSlave::unmocked_removeFramework),
-                    FutureSatisfy(&removeFramework)));
+  AWAIT_READY(removeFramework);
 
   slave.unmocked__run(
       future, frameworkInfo, executorInfo_, task_, taskGroup_);
 
-  AWAIT_READY(removeFramework);
-
   AWAIT_READY(update1);
   AWAIT_READY(update2);