You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2017/08/24 00:56:08 UTC

[2/8] mesos git commit: Updated Framework::removePendingTask to take only a TaskID.

Updated Framework::removePendingTask to take only a TaskID.

This allows the function to be used in the kill task path, where
neither the TaskInfo nor the ExecutorInfo is available. With this
change, all pending task removals go through this function.

Review: https://reviews.apache.org/r/61641


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/080cbb6d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/080cbb6d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/080cbb6d

Branch: refs/heads/1.3.x
Commit: 080cbb6d8209da328f7305aede99f8e4b2b8ea1f
Parents: 0ced46f
Author: Benjamin Mahler <bm...@apache.org>
Authored: Fri Aug 4 15:46:21 2017 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Wed Aug 23 17:37:31 2017 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp | 80 ++++++++++++++++++++++--------------------------
 src/slave/slave.hpp |  5 ++-
 2 files changed, 38 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/080cbb6d/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 97114e5..34e7842 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1785,8 +1785,6 @@ void Slave::_run(
     return;
   }
 
-  const ExecutorID& executorId = executorInfo.executor_id();
-
   // We don't send a status update here because a terminating
   // framework cannot send acknowledgements.
   if (framework->state == Framework::TERMINATING) {
@@ -1797,7 +1795,7 @@ void Slave::_run(
     // Although we cannot send a status update in this case, we remove
     // the affected tasks from the pending tasks.
     foreach (const TaskInfo& _task, tasks) {
-      framework->removePendingTask(_task, executorInfo);
+      framework->removePendingTask(_task.task_id());
     }
 
     if (framework->executors.empty() && framework->pending.empty()) {
@@ -1807,6 +1805,8 @@ void Slave::_run(
     return;
   }
 
+  const ExecutorID& executorId = executorInfo.executor_id();
+
   // If any of the tasks in the task group have been killed in the interim,
   // we send a TASK_KILLED for all the other tasks in the group.
   bool killed = false;
@@ -1824,7 +1824,7 @@ void Slave::_run(
                  << " because it has been killed in the meantime";
 
     foreach (const TaskInfo& _task, tasks) {
-      framework->removePendingTask(_task, executorInfo);
+      framework->removePendingTask(_task.task_id());
 
       const StatusUpdate update = protobuf::createStatusUpdate(
           frameworkId,
@@ -1866,7 +1866,7 @@ void Slave::_run(
     }
 
     foreach (const TaskInfo& _task, tasks) {
-      framework->removePendingTask(_task, executorInfo);
+      framework->removePendingTask(_task.task_id());
 
       const StatusUpdate update = protobuf::createStatusUpdate(
           frameworkId,
@@ -1956,7 +1956,7 @@ void Slave::__run(
     // Although we cannot send a status update in this case, we remove
     // the affected tasks from the list of pending tasks.
     foreach (const TaskInfo& _task, tasks) {
-      framework->removePendingTask(_task, executorInfo);
+      framework->removePendingTask(_task.task_id());
     }
 
     if (framework->executors.empty() && framework->pending.empty()) {
@@ -1971,7 +1971,7 @@ void Slave::__run(
   // send a TASK_KILLED for all the other tasks in the group.
   bool killed = false;
   foreach (const TaskInfo& _task, tasks) {
-    if (framework->removePendingTask(_task, executorInfo)) {
+    if (framework->removePendingTask(_task.task_id())) {
       // NOTE: Ideally we would perform the following check here:
       //
       //   if (framework->executors.empty() &&
@@ -2871,23 +2871,20 @@ void Slave::killTask(
     return;
   }
 
-  foreachkey (const ExecutorID& executorId, framework->pending) {
-    if (framework->pending[executorId].contains(taskId)) {
-      LOG(WARNING) << "Killing task " << taskId
-                   << " of framework " << frameworkId
-                   << " before it was launched";
+  // TODO(bmahler): Removing the task here is a bug: MESOS-7783.
+  bool removedWhilePending = framework->removePendingTask(taskId);
 
-      // We send the TASK_KILLED status update in `_run()` as the
-      // task being killed could be part of a task group and we
-      // don't store this information in `framework->pending`.
-      // We don't invoke `removeFramework()` here since we need the
-      // framework to be valid for sending the status update later.
-     framework->pending[executorId].erase(taskId);
-     if (framework->pending[executorId].empty()) {
-       framework->pending.erase(executorId);
-     }
-     return;
-    }
+  if (removedWhilePending) {
+    LOG(WARNING) << "Killing task " << taskId
+                 << " of framework " << frameworkId
+                 << " before it was launched";
+
+    // We send the TASK_KILLED status update in `_run()` as the
+    // task being killed could be part of a task group and we
+    // don't store this information in `framework->pending`.
+    // We don't invoke `removeFramework()` here since we need the
+    // framework to be valid for sending the status update later.
+    return;
   }
 
   Executor* executor = framework->getExecutor(taskId);
@@ -7114,27 +7111,6 @@ Executor* Framework::getExecutor(const TaskID& taskId) const
 }
 
 
-// Return `true` if `task` was a pending task of this framework
-// before the removal; `false` otherwise.
-bool Framework::removePendingTask(
-    const TaskInfo& task,
-    const ExecutorInfo& executorInfo)
-{
-  const ExecutorID executorId = executorInfo.executor_id();
-
-  if (pending.contains(executorId) &&
-      pending.at(executorId).contains(task.task_id())) {
-    pending.at(executorId).erase(task.task_id());
-    if (pending.at(executorId).empty()) {
-      pending.erase(executorId);
-    }
-    return true;
-  }
-
-  return false;
-}
-
-
 Executor* Slave::getExecutor(const ContainerID& containerId) const
 {
   const ContainerID rootContainerId = protobuf::getRootContainerId(containerId);
@@ -7320,6 +7296,22 @@ void Framework::recoverExecutor(
 }
 
 
+bool Framework::removePendingTask(const TaskID& taskId)
+{
+  foreachkey (const ExecutorID& executorId, pending) {
+    if (pending.at(executorId).contains(taskId)) {
+      pending.at(executorId).erase(taskId);
+      if (pending.at(executorId).empty()) {
+        pending.erase(executorId);
+      }
+      return true;
+    }
+  }
+
+  return false;
+}
+
+
 Executor::Executor(
     Slave* _slave,
     const FrameworkID& _frameworkId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/080cbb6d/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 6a86d09..d5a4b81 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -1124,9 +1124,8 @@ struct Framework
 
   void checkpointFramework() const;
 
-  bool removePendingTask(
-      const TaskInfo& task,
-      const ExecutorInfo& executorInfo);
+  // Returns whether the pending task was removed.
+  bool removePendingTask(const TaskID& taskId);
 
   const FrameworkID id() const { return info.id(); }