Posted to commits@mesos.apache.org by bm...@apache.org on 2017/08/24 20:11:08 UTC

[2/8] mesos git commit: Fixed a bug where TASK_KILLED updates can be dropped by the agent.

Fixed a bug where TASK_KILLED updates can be dropped by the agent.

Per the description of MESOS-7863, there is currently an assumption
that when a pending task is killed, the framework will still be
stored in the agent when the launch proceeds for the killed task.
When this assumption does not hold, the TASK_KILLED update is
dropped because the framework is unknown when the launch proceeds.
This assumption doesn't hold in two cases (a sketch of case (1)
follows the list):

  (1) Another pending task was killed and we removed the framework
      in 'Slave::_run' thinking it was idle, because the set of
      pending tasks was empty (we remove a task from pending when
      processing its kill). (MESOS-7783 is an example instance of
      this.)

  (2) The last executor terminated without any tasks to send
      terminal updates for, or the last terminated executor received
      its final acknowledgement. At this point, we remove the
      framework thinking there are no pending tasks, because the
      killed task was already removed from pending.
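
A standalone sketch of case (1), using a plain map as a stand-in for
the agent's 'pendingTasks' bookkeeping; the names and the compressed
kill/launch interleaving here are illustrative assumptions, not the
agent's actual data structures or call paths:

    #include <iostream>
    #include <map>
    #include <set>
    #include <string>

    int main()
    {
      // Framework id -> pending task ids (stand-in for 'pendingTasks').
      std::map<std::string, std::set<std::string>> frameworks;

      // Slave::run() queues two pending tasks for one framework.
      frameworks["fw"] = {"task1", "task2"};

      // Kills for both tasks arrive before their launches proceed;
      // each kill removes its task from pending.
      frameworks["fw"].erase("task1");
      frameworks["fw"].erase("task2");

      // With no pending tasks left, the framework looks idle and is
      // removed.
      if (frameworks["fw"].empty()) {
        frameworks.erase("fw");
      }

      // When the launch for 'task1' proceeds, the framework is now
      // unknown, so prior to this fix the TASK_KILLED update was
      // dropped here.
      if (frameworks.count("fw") == 0) {
        std::cout << "framework unknown: TASK_KILLED dropped" << std::endl;
      }

      return 0;
    }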

The fix applied here is to send the status updates from the kill
path rather than the launch path, to be consistent with how we kill
tasks queued within the Executor struct. We ensure that the task
is removed synchronously within the kill path to prevent its launch.
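
A minimal sketch of the new ordering, assuming illustrative names
('killTask', 'run', 'pending') rather than the agent's actual API:
the kill path sends TASK_KILLED while the framework is still known
and synchronously removes the task from pending, so the launch path
simply ignores a task it no longer finds:

    #include <iostream>
    #include <set>
    #include <string>

    // Stand-in for 'framework->pendingTasks'.
    static std::set<std::string> pending;

    // Kill path: send the update and synchronously remove the task
    // so that a later launch cannot proceed for it.
    void killTask(const std::string& taskId)
    {
      if (pending.count(taskId) > 0) {
        std::cout << "TASK_KILLED for " << taskId << std::endl;
        pending.erase(taskId);
      }
    }

    // Launch path: a task killed in the interim is no longer pending,
    // so the launch becomes a no-op instead of dropping the update.
    void run(const std::string& taskId)
    {
      if (pending.count(taskId) == 0) {
        std::cout << "ignoring launch of " << taskId << std::endl;
        return;
      }
      // ... proceed with the launch ...
    }

    int main()
    {
      pending.insert("task");
      killTask("task"); // TASK_KILLED is sent from the kill path.
      run("task");      // The launch is ignored.
      return 0;
    }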

Review: https://reviews.apache.org/r/61645


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ccdbf887
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ccdbf887
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ccdbf887

Branch: refs/heads/1.2.x
Commit: ccdbf88755f6add61cf358a2cc18d6783f4f5b47
Parents: 1d56db0
Author: Benjamin Mahler <bm...@apache.org>
Authored: Mon Aug 14 14:59:08 2017 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Thu Aug 24 13:10:29 2017 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp       | 252 ++++++++++++++++++++++++++++++++---------
 src/slave/slave.hpp       |  23 +++-
 src/tests/slave_tests.cpp |  33 +++---
 3 files changed, 237 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ccdbf887/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 0a57be5..36c380e 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1727,12 +1727,18 @@ void Slave::run(
     }
   }
 
-  // We add the task/task group to 'pending' to ensure the framework is not
-  // removed and the framework and top level executor directories
-  // are not scheduled for deletion before '_run()' is called.
   CHECK_NOTNULL(framework);
-  foreach (const TaskInfo& _task, tasks) {
-    framework->pendingTasks[executorId][_task.task_id()] = _task;
+
+  // Track the pending task / task group to ensure the framework is
+  // not removed and the framework and top level executor directories
+  // are not scheduled for deletion before '_run()' is called.
+  //
+  // TODO(bmahler): Can we instead track pending tasks within the
+  // `Executor` struct by creating it earlier?
+  if (task.isSome()) {
+    framework->addPendingTask(executorId, task.get());
+  } else {
+    framework->addPendingTaskGroup(executorId, taskGroup.get());
   }
 
   // If we are about to create a new executor, unschedule the top
@@ -1799,34 +1805,17 @@ void Slave::_run(
     return;
   }
 
-  const ExecutorID& executorId = executorInfo.executor_id();
-
-  // If any of the tasks in the task group have been killed in the interim,
-  // we send a TASK_KILLED for all the other tasks in the group.
-  bool killed = false;
-  foreach (const TaskInfo& _task, tasks) {
-    if (!framework->removePendingTask(_task.task_id())) {
-      killed = true;
-    }
-  }
-
-  if (killed) {
+  // We don't send a status update here because a terminating
+  // framework cannot send acknowledgements.
+  if (framework->state == Framework::TERMINATING) {
     LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
                  << " of framework " << frameworkId
-                 << " because it has been killed in the meantime";
+                 << " because the framework is terminating";
 
+    // Although we cannot send a status update in this case, we remove
+    // the affected tasks from the pending tasks.
     foreach (const TaskInfo& _task, tasks) {
-      const StatusUpdate update = protobuf::createStatusUpdate(
-          frameworkId,
-          info.id(),
-          _task.task_id(),
-          TASK_KILLED,
-          TaskStatus::SOURCE_SLAVE,
-          UUID::random(),
-          "Killed before delivery to the executor",
-          TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH);
-
-      statusUpdate(update, UPID());
+      framework->removePendingTask(_task.task_id());
     }
 
     if (framework->idle()) {
@@ -1836,22 +1825,34 @@ void Slave::_run(
     return;
   }
 
-  // We don't send a status update here because a terminating
-  // framework cannot send acknowledgements.
-  if (framework->state == Framework::TERMINATING) {
-    LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
-                 << " of framework " << frameworkId
-                 << " because the framework is terminating";
-
-    // Refer to the comment after 'framework->pending.erase' above
-    // for why we need this.
-    if (framework->idle()) {
-      removeFramework(framework);
+  // Ignore the launch if killed in the interim. The invariant here
+  // is that all tasks in the group are still pending, or all were
+  // removed due to a kill arriving for one of the tasks in the group.
+  bool allPending = true;
+  bool allRemoved = true;
+  foreach (const TaskInfo& _task, tasks) {
+    if (framework->isPending(_task.task_id())) {
+      allRemoved = false;
+    } else {
+      allPending = false;
     }
+  }
 
+  CHECK(allPending != allRemoved)
+    << "BUG: The task group " << taskOrTaskGroup(task, taskGroup)
+    << " was killed partially";
+
+  if (allRemoved) {
+    LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
+                 << " of framework " << frameworkId
+                 << " because it has been killed in the meantime";
     return;
   }
 
+  foreach (const TaskInfo& _task, tasks) {
+    CHECK(framework->removePendingTask(_task.task_id()));
+  }
+
   if (!future.isReady()) {
     LOG(ERROR) << "Failed to unschedule directories scheduled for gc: "
                << (future.isFailed() ? future.failure() : "future discarded");
@@ -1884,7 +1885,6 @@ void Slave::_run(
       statusUpdate(update, UPID());
     }
 
-    // for why we need this.
     if (framework->idle()) {
       removeFramework(framework);
     }
@@ -1960,6 +1960,8 @@ void Slave::_run(
 
   CHECK_EQ(kill, false);
 
+  const ExecutorID& executorId = executorInfo.executor_id();
+
   // Refer to the comment above when looping across tasks on
   // why we need to unallocate resources.
   Resources checkpointedExecutorResources =
@@ -2452,20 +2454,54 @@ void Slave::killTask(
     return;
   }
 
-  // TODO(bmahler): Removing the task here is a bug: MESOS-7783.
-  bool removedWhilePending = framework->removePendingTask(taskId);
-
-  if (removedWhilePending) {
+  // If the task is pending, we send a TASK_KILLED immediately.
+  // This will trigger a synchronous removal of the pending task,
+  // which prevents it from being launched.
+  if (framework->isPending(taskId)) {
     LOG(WARNING) << "Killing task " << taskId
                  << " of framework " << frameworkId
                  << " before it was launched";
 
-    // We send the TASK_KILLED status update in `_run()` as the
-    // task being killed could be part of a task group and we
-    // don't store this information in `framework->pendingTasks`.
-    // We don't invoke `removeFramework()` here since we need the
-    // framework to be valid for sending the status update later.
-    return;
+    Option<TaskGroupInfo> taskGroup =
+      framework->getTaskGroupForPendingTask(taskId);
+
+    list<StatusUpdate> updates;
+    if (taskGroup.isSome()) {
+      foreach (const TaskInfo& task, taskGroup->tasks()) {
+        updates.push_back(protobuf::createStatusUpdate(
+            frameworkId,
+            info.id(),
+            task.task_id(),
+            TASK_KILLED,
+            TaskStatus::SOURCE_SLAVE,
+            UUID::random(),
+            "A task within the task group was killed before"
+            " delivery to the executor",
+            TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH,
+            CHECK_NOTNONE(
+                framework->getExecutorIdForPendingTask(task.task_id()))));
+      }
+    } else {
+      updates.push_back(protobuf::createStatusUpdate(
+          frameworkId,
+          info.id(),
+          taskId,
+          TASK_KILLED,
+          TaskStatus::SOURCE_SLAVE,
+          UUID::random(),
+          "Killed before delivery to the executor",
+          TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH,
+          CHECK_NOTNONE(
+              framework->getExecutorIdForPendingTask(taskId))));
+    }
+
+    foreach (const StatusUpdate& update, updates) {
+      // NOTE: Sending a terminal update (TASK_KILLED) synchronously
+      // removes the task/task group from 'framework->pendingTasks'
+      // and 'framework->pendingTaskGroups', so that it will not be
+      // launched.
+      statusUpdate(update, UPID());
+    }
   }
 
   Executor* executor = framework->getExecutor(taskId);
@@ -3936,6 +3972,27 @@ void Slave::statusUpdate(StatusUpdate update, const Option<UPID>& pid)
 
   const TaskStatus& status = update.status();
 
+  // For pending tasks, we must synchronously remove them
+  // to guarantee that the launch is prevented.
+  //
+  // TODO(bmahler): Ideally we store this task as terminated
+  // but with unacknowledged updates (same as the `Executor`
+  // struct does).
+  if (framework->isPending(status.task_id())) {
+    CHECK(framework->removePendingTask(status.task_id()));
+
+    if (framework->idle()) {
+      removeFramework(framework);
+    }
+
+    metrics.valid_status_updates++;
+
+    statusUpdateManager->update(update, info.id())
+      .onAny(defer(self(), &Slave::___statusUpdate, lambda::_1, update, pid));
+
+    return;
+  }
+
   Executor* executor = framework->getExecutor(status.task_id());
   if (executor == nullptr) {
     LOG(WARNING)  << "Could not find the executor for "
@@ -6906,19 +6961,110 @@ void Framework::recoverExecutor(const ExecutorState& state)
 }
 
 
+void Framework::addPendingTask(
+    const ExecutorID& executorId,
+    const TaskInfo& task)
+{
+  pendingTasks[executorId][task.task_id()] = task;
+}
+
+
+void Framework::addPendingTaskGroup(
+    const ExecutorID& executorId,
+    const TaskGroupInfo& taskGroup)
+{
+  foreach (const TaskInfo& task, taskGroup.tasks()) {
+    pendingTasks[executorId][task.task_id()] = task;
+  }
+
+  pendingTaskGroups.push_back(taskGroup);
+}
+
+
+bool Framework::isPending(const TaskID& taskId) const
+{
+  foreachkey (const ExecutorID& executorId, pendingTasks) {
+    if (pendingTasks.at(executorId).contains(taskId)) {
+      return true;
+    }
+  }
+
+  return false;
+}
+
+
+Option<TaskGroupInfo> Framework::getTaskGroupForPendingTask(
+    const TaskID& taskId)
+{
+  foreach (const TaskGroupInfo& taskGroup, pendingTaskGroups) {
+    foreach (const TaskInfo& taskInfo, taskGroup.tasks()) {
+      if (taskInfo.task_id() == taskId) {
+        return taskGroup;
+      }
+    }
+  }
+
+  return None();
+}
+
+
 bool Framework::removePendingTask(const TaskID& taskId)
 {
+  bool removed = false;
+
   foreachkey (const ExecutorID& executorId, pendingTasks) {
     if (pendingTasks.at(executorId).contains(taskId)) {
       pendingTasks.at(executorId).erase(taskId);
       if (pendingTasks.at(executorId).empty()) {
         pendingTasks.erase(executorId);
       }
-      return true;
+
+      removed = true;
+      break;
     }
   }
 
-  return false;
+  // We also remove the pending task group if all of its
+  // tasks have been removed.
+  for (auto it = pendingTaskGroups.begin();
+       it != pendingTaskGroups.end();
+       ++it) {
+    foreach (const TaskInfo& t, it->tasks()) {
+      if (t.task_id() == taskId) {
+        // Found its task group, check if all tasks within
+        // the group have been removed.
+        bool allRemoved = true;
+
+        foreach (const TaskInfo& t_, it->tasks()) {
+          if (hasTask(t_.task_id())) {
+            allRemoved = false;
+            break;
+          }
+        }
+
+        if (allRemoved) {
+          pendingTaskGroups.erase(it);
+        }
+
+        return removed;
+      }
+    }
+  }
+
+  return removed;
+}
+
+
+Option<ExecutorID> Framework::getExecutorIdForPendingTask(
+    const TaskID& taskId) const
+{
+  foreachkey (const ExecutorID& executorId, pendingTasks) {
+    if (pendingTasks.at(executorId).contains(taskId)) {
+      return executorId;
+    }
+  }
+
+  return None();
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/ccdbf887/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 1c4f625..11dd272 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -1047,6 +1047,8 @@ struct Framework
       const ExecutorInfo& executorInfo,
       const Option<TaskInfo>& taskInfo);
 
+  const FrameworkID id() const { return info.id(); }
+
   // Returns whether the framework is idle, where idle is
   // defined as having no activity:
   //   (1) The framework has no non-terminal tasks and executors.
@@ -1062,10 +1064,24 @@ struct Framework
   void recoverExecutor(const state::ExecutorState& state);
   void checkpointFramework() const;
 
+  void addPendingTask(
+      const ExecutorID& executorId,
+      const TaskInfo& task);
+
+  // Note that these tasks will also be tracked within `pendingTasks`.
+  void addPendingTaskGroup(
+      const ExecutorID& executorId,
+      const TaskGroupInfo& taskGroup);
+
+  bool isPending(const TaskID& taskId) const;
+
+  // Returns the task group associated with a pending task.
+  Option<TaskGroupInfo> getTaskGroupForPendingTask(const TaskID& taskId);
+
   // Returns whether the pending task was removed.
   bool removePendingTask(const TaskID& taskId);
 
-  const FrameworkID id() const { return info.id(); }
+  Option<ExecutorID> getExecutorIdForPendingTask(const TaskID& taskId) const;
 
   enum State
   {
@@ -1099,6 +1115,11 @@ struct Framework
   // Executors with pending tasks.
   hashmap<ExecutorID, hashmap<TaskID, TaskInfo>> pendingTasks;
 
+  // Pending task groups. This is needed for correctly sending
+  // TASK_KILLED status updates for all tasks in the group if
+  // any of the tasks are killed while pending.
+  std::list<TaskGroupInfo> pendingTaskGroups;
+
   // Current running executors.
   hashmap<ExecutorID, Executor*> executors;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/ccdbf887/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 635fb97..2ffdca4 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -3381,23 +3381,22 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
     .WillOnce(DoAll(Invoke(&slave, &MockSlave::unmocked_killTask),
                     FutureSatisfy(&killTask)));
 
-  driver.killTask(task.task_id());
-
-  AWAIT_READY(killTask);
-
-  // Since this is the only task ever for this framework, the
-  // framework should get removed in Slave::_run().
-  // Thus we can observe that this happens before Shutdown().
   Future<Nothing> removeFramework;
   EXPECT_CALL(slave, removeFramework(_))
     .WillOnce(DoAll(Invoke(&slave, &MockSlave::unmocked_removeFramework),
                     FutureSatisfy(&removeFramework)));
 
-  slave.unmocked__run(
-      future, frameworkInfo, executorInfo, task_, taskGroup);
+  driver.killTask(task.task_id());
 
+  AWAIT_READY(killTask);
+
+  // The agent will remove the framework when killing this task,
+  // since no tasks remain after the kill.
   AWAIT_READY(removeFramework);
 
+  slave.unmocked__run(
+      future, frameworkInfo, executorInfo, task_, taskGroup);
+
   AWAIT_READY(status);
   EXPECT_EQ(TASK_KILLED, status.get().state());
 
@@ -5244,6 +5243,13 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts)
     .WillOnce(DoAll(Invoke(&slave, &MockSlave::unmocked_killTask),
                     FutureSatisfy(&killTask)));
 
+  // Since this is the only task group for this framework, the
+  // framework should get removed when the task is killed.
+  Future<Nothing> removeFramework;
+  EXPECT_CALL(slave, removeFramework(_))
+    .WillOnce(DoAll(Invoke(&slave, &MockSlave::unmocked_removeFramework),
+                    FutureSatisfy(&removeFramework)));
+
   {
     Call call;
     call.mutable_framework_id()->CopyFrom(frameworkId);
@@ -5258,18 +5264,11 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts)
 
   AWAIT_READY(killTask);
 
-  // Since this is the only task group for this framework, the
-  // framework should get removed in `Slave::_run()`.
-  Future<Nothing> removeFramework;
-  EXPECT_CALL(slave, removeFramework(_))
-    .WillOnce(DoAll(Invoke(&slave, &MockSlave::unmocked_removeFramework),
-                    FutureSatisfy(&removeFramework)));
+  AWAIT_READY(removeFramework);
 
   slave.unmocked__run(
       future, frameworkInfo, executorInfo_, task_, taskGroup_);
 
-  AWAIT_READY(removeFramework);
-
   AWAIT_READY(update1);
   AWAIT_READY(update2);