You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2017/08/23 22:23:44 UTC

[01/13] mesos git commit: Fixed a bug where TASK_KILLED updates can be dropped by the agent.

Repository: mesos
Updated Branches:
  refs/heads/master f961c9d4a -> 0ce87cb46


Fixed a bug where TASK_KILLED updates can be dropped by the agent.

Per the description of MESOS-7863, there is currently an assumption
that when a pending task is killed, the framework will be stored in
the agent when the launch proceeds for the killed task. When this
assumption does not hold, the TASK_KILLED update will be dropped
due to the framework being unknown when the launch proceeds. This
assumption doesn't hold in two cases:

  (1) Another pending task was killed and we removed the framework
      in 'Slave::run' thinking it was idle, because pending tasks
      were empty (we remove from pending tasks when processing the
      kill). (MESOS-7783 is an example instance of this).

  (2) The last executor terminated without tasks to send terminal
      updates for, or the last terminated executor received its
      last acknowledgement. At this point, we remove the framework
      thinking there were no pending tasks if the task was killed
      (removed from pending).

The fix applied here is to send the status updates from the kill
path rather than the launch path, to be consistent with how we kill
tasks queued within the Executor struct. We ensure that the task
is removed synchronously within the kill path to prevent its launch.

Review: https://reviews.apache.org/r/61645


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/45c2444a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/45c2444a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/45c2444a

Branch: refs/heads/master
Commit: 45c2444ad2eb7988813adae41325c9acd87e46d8
Parents: edd9b13
Author: Benjamin Mahler <bm...@apache.org>
Authored: Mon Aug 14 14:59:08 2017 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Wed Aug 23 15:09:07 2017 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp       | 286 ++++++++++++++++++++++++++++-------------
 src/slave/slave.hpp       |  20 +++
 src/tests/slave_tests.cpp |  33 +++--
 3 files changed, 234 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/45c2444a/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index def507d..eab3617 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1813,12 +1813,18 @@ void Slave::run(
     }
   }
 
-  // We add the task/task group to 'pending' to ensure the framework is not
-  // removed and the framework and top level executor directories
-  // are not scheduled for deletion before '_run()' is called.
   CHECK_NOTNULL(framework);
-  foreach (const TaskInfo& _task, tasks) {
-    framework->pendingTasks[executorId][_task.task_id()] = _task;
+
+  // Track the pending task / task group to ensure the framework is
+  // not removed and the framework and top level executor directories
+  // are not scheduled for deletion before '_run()' is called.
+  //
+  // TODO(bmahler): Can we instead track pending tasks within the
+  // `Executor` struct by creating it earlier?
+  if (task.isSome()) {
+    framework->addPendingTask(executorId, task.get());
+  } else {
+    framework->addPendingTaskGroup(executorId, taskGroup.get());
   }
 
   // If we are about to create a new executor, unschedule the top
@@ -1903,48 +1909,27 @@ void Slave::_run(
     return;
   }
 
-  const ExecutorID& executorId = executorInfo.executor_id();
-
-  // If any of the tasks in the task group have been killed in the interim,
-  // we send a TASK_KILLED for all the other tasks in the group.
-  bool killed = false;
+  // Ignore the launch if killed in the interim. The invariant here
+  // is that all tasks in the group are still pending, or all were
+  // removed due to a kill arriving for one of the tasks in the group.
+  bool allPending = true;
+  bool allRemoved = true;
   foreach (const TaskInfo& _task, tasks) {
-    if (!framework->pendingTasks.contains(executorId) ||
-        !framework->pendingTasks.at(executorId).contains(_task.task_id())) {
-      killed = true;
-      break;
+    if (framework->isPending(_task.task_id())) {
+      allRemoved = false;
+    } else {
+      allPending = false;
     }
   }
 
-  if (killed) {
+  CHECK(allPending != allRemoved)
+    << "BUG: The task group " << taskOrTaskGroup(task, taskGroup)
+    << " was killed partially";
+
+  if (allRemoved) {
     LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
                  << " of framework " << frameworkId
                  << " because it has been killed in the meantime";
-
-    foreach (const TaskInfo& _task, tasks) {
-      framework->removePendingTask(_task.task_id());
-
-      const StatusUpdate update = protobuf::createStatusUpdate(
-          frameworkId,
-          info.id(),
-          _task.task_id(),
-          TASK_KILLED,
-          TaskStatus::SOURCE_SLAVE,
-          UUID::random(),
-          "Killed before delivery to the executor",
-          TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH);
-
-      // TODO(vinod): Ensure that the status update manager reliably
-      // delivers this update. Currently, we don't guarantee this
-      // because removal of the framework causes the status update
-      // manager to stop retrying for its un-acked updates.
-      statusUpdate(update, UPID());
-    }
-
-    if (framework->idle()) {
-      removeFramework(framework);
-    }
-
     return;
   }
 
@@ -2064,53 +2049,34 @@ void Slave::__run(
     return;
   }
 
-  // Remove the task/task group from being pending. If any of the
-  // tasks in the task group have been killed in the interim, we
-  // send a TASK_KILLED for all the other tasks in the group.
-  bool killed = false;
+  // Ignore the launch if killed in the interim. The invariant here
+  // is that all tasks in the group are still pending, or all were
+  // removed due to a kill arriving for one of the tasks in the group.
+  bool allPending = true;
+  bool allRemoved = true;
   foreach (const TaskInfo& _task, tasks) {
-    if (framework->removePendingTask(_task.task_id())) {
-      // NOTE: Ideally we would perform the following check here:
-      //
-      //   if (framework->idle()) {
-      //     removeFramework(framework);
-      //   }
-      //
-      // However, we need 'framework' to stay valid for the rest of
-      // this function. As such, we perform the check before each of
-      // the 'return' statements below.
+    if (framework->isPending(_task.task_id())) {
+      allRemoved = false;
     } else {
-      killed = true;
+      allPending = false;
     }
   }
 
-  if (killed) {
+  CHECK(allPending != allRemoved)
+    << "BUG: The task group " << taskOrTaskGroup(task, taskGroup)
+    << " was killed partially";
+
+  if (allRemoved) {
     LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
                  << " of framework " << frameworkId
                  << " because it has been killed in the meantime";
-
-    foreach (const TaskInfo& _task, tasks) {
-      const StatusUpdate update = protobuf::createStatusUpdate(
-          frameworkId,
-          info.id(),
-          _task.task_id(),
-          TASK_KILLED,
-          TaskStatus::SOURCE_SLAVE,
-          UUID::random(),
-          "Killed before delivery to the executor",
-          TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH);
-      statusUpdate(update, UPID());
-    }
-
-    // Refer to the comment after 'framework->removePendingTask' above
-    // for why we need this.
-    if (framework->idle()) {
-      removeFramework(framework);
-    }
-
     return;
   }
 
+  foreach (const TaskInfo& _task, tasks) {
+    CHECK(framework->removePendingTask(_task.task_id()));
+  }
+
   CHECK(!future.isDiscarded());
 
   // Validate that the task (or tasks in case of task group) are authorized
@@ -2982,20 +2948,54 @@ void Slave::killTask(
     return;
   }
 
-  // TODO(bmahler): Removing the task here is a bug: MESOS-7783.
-  bool removedWhilePending = framework->removePendingTask(taskId);
-
-  if (removedWhilePending) {
+  // If the task is pending, we send a TASK_KILLED immediately.
+  // This will trigger a synchronous removal of the pending task,
+  // which prevents it from being launched.
+  if (framework->isPending(taskId)) {
     LOG(WARNING) << "Killing task " << taskId
                  << " of framework " << frameworkId
                  << " before it was launched";
 
-    // We send the TASK_KILLED status update in `_run()` as the
-    // task being killed could be part of a task group and we
-    // don't store this information in `framework->pending`.
-    // We don't invoke `removeFramework()` here since we need the
-    // framework to be valid for sending the status update later.
-    return;
+    Option<TaskGroupInfo> taskGroup =
+      framework->getTaskGroupForPendingTask(taskId);
+
+    list<StatusUpdate> updates;
+    if (taskGroup.isSome()) {
+      foreach (const TaskInfo& task, taskGroup->tasks()) {
+        updates.push_back(protobuf::createStatusUpdate(
+            frameworkId,
+            info.id(),
+            task.task_id(),
+            TASK_KILLED,
+            TaskStatus::SOURCE_SLAVE,
+            UUID::random(),
+            "A task within the task group was killed before"
+            " delivery to the executor",
+            TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH,
+            CHECK_NOTNONE(
+                framework->getExecutorIdForPendingTask(task.task_id()))));
+      }
+    } else {
+      updates.push_back(protobuf::createStatusUpdate(
+          frameworkId,
+          info.id(),
+          taskId,
+          TASK_KILLED,
+          TaskStatus::SOURCE_SLAVE,
+          UUID::random(),
+          "Killed before delivery to the executor",
+          TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH,
+          CHECK_NOTNONE(
+              framework->getExecutorIdForPendingTask(taskId))));
+    }
+
+    foreach (const StatusUpdate& update, updates) {
+      // NOTE: Sending a terminal update (TASK_KILLED) synchronously
+      // removes the task/task group from 'framework->pendingTasks'
+      // and 'framework->pendingTaskGroups', so that it will not be
+      // launched.
+      statusUpdate(update, UPID());
+    }
   }
 
   Executor* executor = framework->getExecutor(taskId);
@@ -4451,6 +4451,25 @@ void Slave::statusUpdate(StatusUpdate update, const Option<UPID>& pid)
 
   const TaskStatus& status = update.status();
 
+  // For pending tasks, we must synchronously remove them
+  // to guarantee that the launch is prevented.
+  //
+  // TODO(bmahler): Ideally we store this task as terminated
+  // but with unacknowledged updates (same as the `Executor`
+  // struct does).
+  if (framework->isPending(status.task_id())) {
+    CHECK(framework->removePendingTask(status.task_id()));
+
+    if (framework->idle()) {
+      removeFramework(framework);
+    }
+
+    metrics.valid_status_updates++;
+
+    statusUpdateManager->update(update, info.id())
+      .onAny(defer(self(), &Slave::___statusUpdate, lambda::_1, update, pid));
+  }
+
   Executor* executor = framework->getExecutor(status.task_id());
   if (executor == nullptr) {
     LOG(WARNING)  << "Could not find the executor for "
@@ -7447,6 +7466,26 @@ void Framework::recoverExecutor(
 }
 
 
+void Framework::addPendingTask(
+    const ExecutorID& executorId,
+    const TaskInfo& task)
+{
+  pendingTasks[executorId][task.task_id()] = task;
+}
+
+
+void Framework::addPendingTaskGroup(
+    const ExecutorID& executorId,
+    const TaskGroupInfo& taskGroup)
+{
+  foreach (const TaskInfo& task, taskGroup.tasks()) {
+    pendingTasks[executorId][task.task_id()] = task;
+  }
+
+  pendingTaskGroups.push_back(taskGroup);
+}
+
+
 bool Framework::hasTask(const TaskID& taskId)
 {
   foreachkey (const ExecutorID& executorId, pendingTasks) {
@@ -7467,19 +7506,90 @@ bool Framework::hasTask(const TaskID& taskId)
 }
 
 
+bool Framework::isPending(const TaskID& taskId) const
+{
+  foreachkey (const ExecutorID& executorId, pendingTasks) {
+    if (pendingTasks.at(executorId).contains(taskId)) {
+      return true;
+    }
+  }
+
+  return false;
+}
+
+
+Option<TaskGroupInfo> Framework::getTaskGroupForPendingTask(
+    const TaskID& taskId)
+{
+  foreach (const TaskGroupInfo& taskGroup, pendingTaskGroups) {
+    foreach (const TaskInfo& taskInfo, taskGroup.tasks()) {
+      if (taskInfo.task_id() == taskId) {
+        return taskGroup;
+      }
+    }
+  }
+
+  return None();
+}
+
+
 bool Framework::removePendingTask(const TaskID& taskId)
 {
+  bool removed = false;
+
   foreachkey (const ExecutorID& executorId, pendingTasks) {
     if (pendingTasks.at(executorId).contains(taskId)) {
       pendingTasks.at(executorId).erase(taskId);
       if (pendingTasks.at(executorId).empty()) {
         pendingTasks.erase(executorId);
       }
-      return true;
+
+      removed = true;
+      break;
     }
   }
 
-  return false;
+  // We also remove the pending task group if all of its
+  // tasks have been removed.
+  for (auto it = pendingTaskGroups.begin();
+       it != pendingTaskGroups.end();
+       ++it) {
+    foreach (const TaskInfo& t, it->tasks()) {
+      if (t.task_id() == taskId) {
+        // Found its task group, check if all tasks within
+        // the group have been removed.
+        bool allRemoved = true;
+
+        foreach (const TaskInfo& t_, it->tasks()) {
+          if (hasTask(t_.task_id())) {
+            allRemoved = false;
+            break;
+          }
+        }
+
+        if (allRemoved) {
+          pendingTaskGroups.erase(it);
+        }
+
+        return removed;
+      }
+    }
+  }
+
+  return removed;
+}
+
+
+Option<ExecutorID> Framework::getExecutorIdForPendingTask(
+    const TaskID& taskId) const
+{
+  foreachkey (const ExecutorID& executorId, pendingTasks) {
+    if (pendingTasks.at(executorId).contains(taskId)) {
+      return executorId;
+    }
+  }
+
+  return None();
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/45c2444a/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index f909463..4058381 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -875,11 +875,26 @@ public:
       bool recheckpointExecutor,
       const hashset<TaskID>& tasksToRecheckpoint);
 
+  void addPendingTask(
+      const ExecutorID& executorId,
+      const TaskInfo& task);
+
+  // Note that these tasks will also be tracked within `pendingTasks`.
+  void addPendingTaskGroup(
+      const ExecutorID& executorId,
+      const TaskGroupInfo& taskGroup);
+
   bool hasTask(const TaskID& taskId);
+  bool isPending(const TaskID& taskId) const;
+
+  // Returns the task group associated with a pending task.
+  Option<TaskGroupInfo> getTaskGroupForPendingTask(const TaskID& taskId);
 
   // Returns whether the pending task was removed.
   bool removePendingTask(const TaskID& taskId);
 
+  Option<ExecutorID> getExecutorIdForPendingTask(const TaskID& taskId) const;
+
   Resources allocatedResources() const;
 
   enum State
@@ -914,6 +929,11 @@ public:
   // Executors with pending tasks.
   hashmap<ExecutorID, hashmap<TaskID, TaskInfo>> pendingTasks;
 
+  // Pending task groups. This is needed for correctly sending
+  // TASK_KILLED status updates for all tasks in the group if
+  // any of the tasks are killed while pending.
+  std::list<TaskGroupInfo> pendingTaskGroups;
+
   // Current running executors.
   hashmap<ExecutorID, Executor*> executors;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/45c2444a/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 8b85f93..e650f31 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -4065,23 +4065,22 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
     .WillOnce(DoAll(Invoke(&slave, &MockSlave::unmocked_killTask),
                     FutureSatisfy(&killTask)));
 
-  driver.killTask(task.task_id());
-
-  AWAIT_READY(killTask);
-
-  // Since this is the only task ever for this framework, the
-  // framework should get removed in Slave::_run().
-  // Thus we can observe that this happens before Shutdown().
   Future<Nothing> removeFramework;
   EXPECT_CALL(slave, removeFramework(_))
     .WillOnce(DoAll(Invoke(&slave, &MockSlave::unmocked_removeFramework),
                     FutureSatisfy(&removeFramework)));
 
-  slave.unmocked__run(
-      future, frameworkInfo, executorInfo, task_, taskGroup);
+  driver.killTask(task.task_id());
 
+  AWAIT_READY(killTask);
+
+  // The agent will remove the framework when killing this task
+  // since there remain no more tasks.
   AWAIT_READY(removeFramework);
 
+  slave.unmocked__run(
+      future, frameworkInfo, executorInfo, task_, taskGroup);
+
   AWAIT_READY(status);
   EXPECT_EQ(TASK_KILLED, status->state());
 
@@ -6967,6 +6966,13 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts)
     .WillOnce(DoAll(Invoke(&slave, &MockSlave::unmocked_killTask),
                     FutureSatisfy(&killTask)));
 
+  // Since this is the only task group for this framework, the
+  // framework should get removed when the task is killed.
+  Future<Nothing> removeFramework;
+  EXPECT_CALL(slave, removeFramework(_))
+    .WillOnce(DoAll(Invoke(&slave, &MockSlave::unmocked_removeFramework),
+                    FutureSatisfy(&removeFramework)));
+
   {
     Call call;
     call.mutable_framework_id()->CopyFrom(frameworkId);
@@ -6981,18 +6987,11 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts)
 
   AWAIT_READY(killTask);
 
-  // Since this is the only task group for this framework, the
-  // framework should get removed in `Slave::_run()`.
-  Future<Nothing> removeFramework;
-  EXPECT_CALL(slave, removeFramework(_))
-    .WillOnce(DoAll(Invoke(&slave, &MockSlave::unmocked_removeFramework),
-                    FutureSatisfy(&removeFramework)));
+  AWAIT_READY(removeFramework);
 
   slave.unmocked__run(
       future, frameworkInfo, executorInfo_, task_, taskGroup_);
 
-  AWAIT_READY(removeFramework);
-
   AWAIT_READY(update1);
   AWAIT_READY(update2);
 


[07/13] mesos git commit: Renamed Framework::pending to Framework::pendingTasks in agent.

Posted by bm...@apache.org.
Renamed Framework::pending to Framework::pendingTasks in agent.

Review: https://reviews.apache.org/r/61643


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a6e748c5
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a6e748c5
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a6e748c5

Branch: refs/heads/master
Commit: a6e748c58c6f342b63191440303ac24697636660
Parents: 509147d
Author: Benjamin Mahler <bm...@apache.org>
Authored: Fri Aug 4 16:56:40 2017 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Wed Aug 23 15:09:07 2017 -0700

----------------------------------------------------------------------
 src/slave/http.cpp  |  2 +-
 src/slave/slave.cpp | 30 +++++++++++++++---------------
 src/slave/slave.hpp |  2 +-
 3 files changed, 17 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a6e748c5/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index a828c5f..7fb0948 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -1703,7 +1703,7 @@ mesos::agent::Response::GetTasks Http::_getTasks(
   foreach (const Framework* framework, frameworks) {
     // Pending tasks.
     typedef hashmap<TaskID, TaskInfo> TaskMap;
-    foreachvalue (const TaskMap& taskInfos, framework->pending) {
+    foreachvalue (const TaskMap& taskInfos, framework->pendingTasks) {
       foreachvalue (const TaskInfo& taskInfo, taskInfos) {
         // Skip unauthorized tasks.
         if (!approveViewTaskInfo(tasksApprover, taskInfo, framework->info)) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/a6e748c5/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 05505a6..def507d 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1489,7 +1489,7 @@ void Slave::doReliableRegistration(Duration maxBackoff)
       // pending tasks, and we need to send exited events if they
       // cannot be launched, see MESOS-1715, MESOS-1720, MESOS-1800.
       typedef hashmap<TaskID, TaskInfo> TaskMap;
-      foreachvalue (const TaskMap& tasks, framework->pending) {
+      foreachvalue (const TaskMap& tasks, framework->pendingTasks) {
         foreachvalue (const TaskInfo& task, tasks) {
           message.add_tasks()->CopyFrom(protobuf::createTask(
               task, TASK_STAGING, framework->id()));
@@ -1818,7 +1818,7 @@ void Slave::run(
   // are not scheduled for deletion before '_run()' is called.
   CHECK_NOTNULL(framework);
   foreach (const TaskInfo& _task, tasks) {
-    framework->pending[executorId][_task.task_id()] = _task;
+    framework->pendingTasks[executorId][_task.task_id()] = _task;
   }
 
   // If we are about to create a new executor, unschedule the top
@@ -1909,8 +1909,8 @@ void Slave::_run(
   // we send a TASK_KILLED for all the other tasks in the group.
   bool killed = false;
   foreach (const TaskInfo& _task, tasks) {
-    if (!framework->pending.contains(executorId) ||
-        !framework->pending.at(executorId).contains(_task.task_id())) {
+    if (!framework->pendingTasks.contains(executorId) ||
+        !framework->pendingTasks.at(executorId).contains(_task.task_id())) {
       killed = true;
       break;
     }
@@ -5500,7 +5500,7 @@ void Slave::removeExecutor(Framework* framework, Executor* executor)
 
   // Schedule the top level executor work directory, only if the
   // framework doesn't have any 'pending' tasks for this executor.
-  if (!framework->pending.contains(executor->id)) {
+  if (!framework->pendingTasks.contains(executor->id)) {
     const string path = paths::getExecutorPath(
         flags.work_dir, info.id(), framework->id(), executor->id);
 
@@ -5530,7 +5530,7 @@ void Slave::removeExecutor(Framework* framework, Executor* executor)
 
     // Schedule the top level executor meta directory, only if the
     // framework doesn't have any 'pending' tasks for this executor.
-    if (!framework->pending.contains(executor->id)) {
+    if (!framework->pendingTasks.contains(executor->id)) {
       const string path = paths::getExecutorPath(
           metaDir, info.id(), framework->id(), executor->id);
 
@@ -6884,7 +6884,7 @@ double Slave::_tasks_staging()
   double count = 0.0;
   foreachvalue (Framework* framework, frameworks) {
     typedef hashmap<TaskID, TaskInfo> TaskMap;
-    foreachvalue (const TaskMap& tasks, framework->pending) {
+    foreachvalue (const TaskMap& tasks, framework->pendingTasks) {
       count += tasks.size();
     }
 
@@ -7449,8 +7449,8 @@ void Framework::recoverExecutor(
 
 bool Framework::hasTask(const TaskID& taskId)
 {
-  foreachkey (const ExecutorID& executorId, pending) {
-    if (pending[executorId].contains(taskId)) {
+  foreachkey (const ExecutorID& executorId, pendingTasks) {
+    if (pendingTasks[executorId].contains(taskId)) {
       return true;
     }
   }
@@ -7469,11 +7469,11 @@ bool Framework::hasTask(const TaskID& taskId)
 
 bool Framework::removePendingTask(const TaskID& taskId)
 {
-  foreachkey (const ExecutorID& executorId, pending) {
-    if (pending.at(executorId).contains(taskId)) {
-      pending.at(executorId).erase(taskId);
-      if (pending.at(executorId).empty()) {
-        pending.erase(executorId);
+  foreachkey (const ExecutorID& executorId, pendingTasks) {
+    if (pendingTasks.at(executorId).contains(taskId)) {
+      pendingTasks.at(executorId).erase(taskId);
+      if (pendingTasks.at(executorId).empty()) {
+        pendingTasks.erase(executorId);
       }
       return true;
     }
@@ -7494,7 +7494,7 @@ Resources Framework::allocatedResources() const
   hashset<ExecutorID> pendingExecutors;
 
   typedef hashmap<TaskID, TaskInfo> TaskMap;
-  foreachvalue (const TaskMap& pendingTasks, pending) {
+  foreachvalue (const TaskMap& pendingTasks, pendingTasks) {
     foreachvalue (const TaskInfo& task, pendingTasks) {
       allocated += task.resources();
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/a6e748c5/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 67ffdc9..f909463 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -912,7 +912,7 @@ public:
   // being bypassed, and provide public views into them.
 
   // Executors with pending tasks.
-  hashmap<ExecutorID, hashmap<TaskID, TaskInfo>> pending;
+  hashmap<ExecutorID, hashmap<TaskID, TaskInfo>> pendingTasks;
 
   // Current running executors.
   hashmap<ExecutorID, Executor*> executors;


[13/13] mesos git commit: Marked Framework::hasTask in agent as const.

Posted by bm...@apache.org.
Marked Framework::hasTask in agent as const.

Review: https://reviews.apache.org/r/61648


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/12026b8b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/12026b8b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/12026b8b

Branch: refs/heads/master
Commit: 12026b8bf2322d4b64bb087bd8c3972972632de2
Parents: 10b2a45
Author: Benjamin Mahler <bm...@apache.org>
Authored: Fri Aug 4 16:32:03 2017 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Wed Aug 23 15:09:45 2017 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp | 4 ++--
 src/slave/slave.hpp | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/12026b8b/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index eab3617..6ee8187 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -7486,10 +7486,10 @@ void Framework::addPendingTaskGroup(
 }
 
 
-bool Framework::hasTask(const TaskID& taskId)
+bool Framework::hasTask(const TaskID& taskId) const
 {
   foreachkey (const ExecutorID& executorId, pendingTasks) {
-    if (pendingTasks[executorId].contains(taskId)) {
+    if (pendingTasks.at(executorId).contains(taskId)) {
       return true;
     }
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/12026b8b/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 1f0e826..28fa671 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -892,7 +892,7 @@ public:
       const ExecutorID& executorId,
       const TaskGroupInfo& taskGroup);
 
-  bool hasTask(const TaskID& taskId);
+  bool hasTask(const TaskID& taskId) const;
   bool isPending(const TaskID& taskId) const;
 
   // Returns the task group associated with a pending task.


[09/13] mesos git commit: Added a test to verify a fix for MESOS-7863.

Posted by bm...@apache.org.
Added a test to verify a fix for MESOS-7863.

This test ensures that the agent does not drop the TASK_KILLED
update when all pending tasks from a framework are killed.
See MESOS-7863 for the motivating issue.

Review: https://reviews.apache.org/r/61646


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/658992ca
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/658992ca
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/658992ca

Branch: refs/heads/master
Commit: 658992cad0174f223d7eaeb30d2eca13845ca6a7
Parents: 45c2444
Author: Benjamin Mahler <bm...@apache.org>
Authored: Fri Aug 4 19:32:12 2017 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Wed Aug 23 15:09:45 2017 -0700

----------------------------------------------------------------------
 src/tests/slave_tests.cpp | 130 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 130 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/658992ca/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index e650f31..57679bd 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -32,6 +32,7 @@
 #include <mesos/authentication/http/basic_authenticator_factory.hpp>
 
 #include <process/clock.hpp>
+#include <process/collect.hpp>
 #include <process/future.hpp>
 #include <process/gmock.hpp>
 #include <process/http.hpp>
@@ -4092,6 +4093,135 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
 }
 
 
+// This test was added due to MESOS-7863, where the
+// agent previously dropped TASK_KILLED in the cases outlined
+// in the issue.
+TEST_F(SlaveTest, KillMultiplePendingTasks)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+
+  StandaloneMasterDetector detector(master.get()->pid);
+
+  MockSlave slave(CreateSlaveFlags(), &detector, &containerizer);
+  spawn(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers->size());
+
+  // We only pause the clock after receiving the offer since the
+  // agent uses a delay to re-register.
+  //
+  // TODO(bmahler): Remove the initial agent delay within the tests.
+  Clock::pause();
+
+  Resources taskResources = Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+  TaskInfo task1 = createTask(
+      offers->at(0).slave_id(), taskResources, "echo hi");
+
+  TaskInfo task2 = createTask(
+      offers->at(0).slave_id(), taskResources, "echo hi");
+
+  EXPECT_CALL(containerizer, launch(_, _, _, _))
+    .Times(0);
+
+  Future<TaskStatus> status1, status2;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status1))
+    .WillOnce(FutureArg<1>(&status2));
+
+  EXPECT_CALL(slave, runTask(_, _, _, _, _))
+    .WillOnce(Invoke(&slave, &MockSlave::unmocked_runTask))
+    .WillOnce(Invoke(&slave, &MockSlave::unmocked_runTask));
+
+  // Skip what Slave::_run() normally does, save its arguments for
+  // later, tie reaching the critical moment when to kill the task to
+  // a future.
+  Future<bool> future1, future2;
+  FrameworkInfo frameworkInfo1, frameworkInfo2;
+  ExecutorInfo executorInfo1, executorInfo2;
+  Option<TaskGroupInfo> taskGroup1, taskGroup2;
+  Option<TaskInfo> task_1, task_2;
+
+  Future<Nothing> _run1, _run2;
+  EXPECT_CALL(slave, _run(_, _, _, _, _))
+    .WillOnce(DoAll(FutureSatisfy(&_run1),
+                    SaveArg<0>(&future1),
+                    SaveArg<1>(&frameworkInfo1),
+                    SaveArg<2>(&executorInfo1),
+                    SaveArg<3>(&task_1),
+                    SaveArg<4>(&taskGroup1)))
+    .WillOnce(DoAll(FutureSatisfy(&_run2),
+                    SaveArg<0>(&future2),
+                    SaveArg<1>(&frameworkInfo2),
+                    SaveArg<2>(&executorInfo2),
+                    SaveArg<3>(&task_2),
+                    SaveArg<4>(&taskGroup2)));
+
+  driver.launchTasks(offers.get()[0].id(), {task1, task2});
+
+  AWAIT_READY(process::await(_run1, _run2));
+
+  Future<Nothing> killTask1, killTask2;
+  EXPECT_CALL(slave, killTask(_, _))
+    .WillOnce(DoAll(Invoke(&slave, &MockSlave::unmocked_killTask),
+                    FutureSatisfy(&killTask1)))
+    .WillOnce(DoAll(Invoke(&slave, &MockSlave::unmocked_killTask),
+                    FutureSatisfy(&killTask2)));
+
+  Future<Nothing> removeFramework;
+  EXPECT_CALL(slave, removeFramework(_))
+    .WillOnce(DoAll(Invoke(&slave, &MockSlave::unmocked_removeFramework),
+                    FutureSatisfy(&removeFramework)));
+
+  driver.killTask(task1.task_id());
+  driver.killTask(task2.task_id());
+
+  AWAIT_READY(process::await(killTask1, killTask2));
+
+  // We expect the tasks to be killed and framework removed.
+  AWAIT_READY(status1);
+  EXPECT_EQ(TASK_KILLED, status1->state());
+
+  AWAIT_READY(status2);
+  EXPECT_EQ(TASK_KILLED, status2->state());
+
+  AWAIT_READY(removeFramework);
+
+  // The `__run` continuations should have no effect.
+  slave.unmocked__run(
+      future1, frameworkInfo1, executorInfo1, task_1, taskGroup1);
+
+  slave.unmocked__run(
+      future2, frameworkInfo2, executorInfo2, task_2, taskGroup2);
+
+  Clock::settle();
+
+  driver.stop();
+  driver.join();
+
+  terminate(slave);
+  wait(slave);
+}
+
+
 // This test verifies that when the agent gets a `killTask`
 // message for a queued task on a registering executor, a
 // the agent will generate a TASK_KILLED and will shut down


[02/13] mesos git commit: Improved the reason and message for tasks killed before delivery.

Posted by bm...@apache.org.
Improved the reason and message for tasks killed before delivery.

Previously, if a task was killed before delivery to a registering
executor, we sent REASON_EXECUTOR_UNREGISTERED with a message of
"Unregistered executor" in the agent, and the master sent no reason.
This introduces a REASON_TASK_KILLED_DURING_LAUNCH to clarify
this case to frameworks.

Review: https://reviews.apache.org/r/61640


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ba45457e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ba45457e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ba45457e

Branch: refs/heads/master
Commit: ba45457e915331b56b5fb1c76868fa8931bc783f
Parents: 0b9c3de
Author: Benjamin Mahler <bm...@apache.org>
Authored: Fri Aug 11 13:38:08 2017 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Wed Aug 23 15:09:07 2017 -0700

----------------------------------------------------------------------
 include/mesos/mesos.proto                |  3 ++-
 include/mesos/v1/mesos.proto             |  3 ++-
 src/master/master.cpp                    |  6 ++++--
 src/slave/slave.cpp                      | 23 +++++++++++++----------
 src/tests/master_authorization_tests.cpp |  7 ++++++-
 src/tests/slave_recovery_tests.cpp       | 11 ++++++-----
 src/tests/slave_tests.cpp                | 10 ++++++----
 7 files changed, 39 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ba45457e/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index d91c968..86936cf 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -2163,7 +2163,7 @@ message TaskStatus {
     REASON_EXECUTOR_REGISTRATION_TIMEOUT = 23;
     REASON_EXECUTOR_REREGISTRATION_TIMEOUT = 24;
     REASON_EXECUTOR_TERMINATED = 1;
-    REASON_EXECUTOR_UNREGISTERED = 2;
+    REASON_EXECUTOR_UNREGISTERED = 2; // No longer used.
     REASON_FRAMEWORK_REMOVED = 3;
     REASON_GC_ERROR = 4;
     REASON_INVALID_FRAMEWORKID = 5;
@@ -2176,6 +2176,7 @@ message TaskStatus {
     REASON_SLAVE_REMOVED = 11;
     REASON_SLAVE_RESTARTED = 12;
     REASON_SLAVE_UNKNOWN = 13;
+    REASON_TASK_KILLED_DURING_LAUNCH = 30;
     REASON_TASK_CHECK_STATUS_UPDATED = 28;
     REASON_TASK_HEALTH_CHECK_STATUS_UPDATED = 29;
     REASON_TASK_GROUP_INVALID = 25;

http://git-wip-us.apache.org/repos/asf/mesos/blob/ba45457e/include/mesos/v1/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/mesos.proto b/include/mesos/v1/mesos.proto
index 6a1d011..20d9ecf 100644
--- a/include/mesos/v1/mesos.proto
+++ b/include/mesos/v1/mesos.proto
@@ -2146,7 +2146,7 @@ message TaskStatus {
     REASON_EXECUTOR_REGISTRATION_TIMEOUT = 23;
     REASON_EXECUTOR_REREGISTRATION_TIMEOUT = 24;
     REASON_EXECUTOR_TERMINATED = 1;
-    REASON_EXECUTOR_UNREGISTERED = 2;
+    REASON_EXECUTOR_UNREGISTERED = 2; // No longer used.
     REASON_FRAMEWORK_REMOVED = 3;
     REASON_GC_ERROR = 4;
     REASON_INVALID_FRAMEWORKID = 5;
@@ -2159,6 +2159,7 @@ message TaskStatus {
     REASON_AGENT_REMOVED = 11;
     REASON_AGENT_RESTARTED = 12;
     REASON_AGENT_UNKNOWN = 13;
+    REASON_TASK_KILLED_DURING_LAUNCH = 30;
     REASON_TASK_CHECK_STATUS_UPDATED = 28;
     REASON_TASK_HEALTH_CHECK_STATUS_UPDATED = 29;
     REASON_TASK_GROUP_INVALID = 25;

http://git-wip-us.apache.org/repos/asf/mesos/blob/ba45457e/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 7f38a5e..a9d2191 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -4951,7 +4951,8 @@ void Master::_accept(
                   TaskStatus::SOURCE_MASTER,
                   None(),
                   "A task within the task group was killed before"
-                  " delivery to the agent");
+                  " delivery to the agent",
+                  TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH);
 
               metrics->tasks_killed++;
 
@@ -5320,7 +5321,8 @@ void Master::kill(Framework* framework, const scheduler::Call::Kill& kill)
         TASK_KILLED,
         TaskStatus::SOURCE_MASTER,
         None(),
-        "Killed pending task");
+        "Killed before delivery to the agent",
+        TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH);
 
     forward(update, UPID(), framework);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/ba45457e/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 1521d5d..6ffecdc 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1931,7 +1931,8 @@ void Slave::_run(
           TASK_KILLED,
           TaskStatus::SOURCE_SLAVE,
           UUID::random(),
-          "Task killed before it was launched");
+          "Killed before delivery to the executor",
+          TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH);
 
       // TODO(vinod): Ensure that the status update manager reliably
       // delivers this update. Currently, we don't guarantee this
@@ -2097,7 +2098,8 @@ void Slave::__run(
           TASK_KILLED,
           TaskStatus::SOURCE_SLAVE,
           UUID::random(),
-          "Task killed before it was launched");
+          "Killed before delivery to the executor",
+          TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH);
       statusUpdate(update, UPID());
     }
 
@@ -3048,8 +3050,9 @@ void Slave::killTask(
               TASK_KILLED,
               TaskStatus::SOURCE_SLAVE,
               UUID::random(),
-              "Unregistered executor",
-              TaskStatus::REASON_EXECUTOR_UNREGISTERED,
+              "A task within the task group was killed before"
+              " delivery to the executor",
+              TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH,
               executor->id));
         }
       } else {
@@ -3060,8 +3063,8 @@ void Slave::killTask(
             TASK_KILLED,
             TaskStatus::SOURCE_SLAVE,
             UUID::random(),
-            "Unregistered executor",
-            TaskStatus::REASON_EXECUTOR_UNREGISTERED,
+            "Killed before delivery to the executor",
+            TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH,
             executor->id));
       }
 
@@ -3105,8 +3108,8 @@ void Slave::killTask(
                 TASK_KILLED,
                 TaskStatus::SOURCE_SLAVE,
                 UUID::random(),
-                "Task killed while it was queued",
-                None(),
+                "Killed before delivery to the executor",
+                TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH,
                 executor->id));
           }
         } else {
@@ -3117,8 +3120,8 @@ void Slave::killTask(
               TASK_KILLED,
               TaskStatus::SOURCE_SLAVE,
               UUID::random(),
-              "Task killed while it was queued",
-              None(),
+              "Killed before delivery to the executor",
+              TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH,
               executor->id));
         }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/ba45457e/src/tests/master_authorization_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_authorization_tests.cpp b/src/tests/master_authorization_tests.cpp
index b41b4a1..a776a58 100644
--- a/src/tests/master_authorization_tests.cpp
+++ b/src/tests/master_authorization_tests.cpp
@@ -408,6 +408,7 @@ TEST_F(MasterAuthorizationTest, KillTask)
   // Framework should get a TASK_KILLED right away.
   AWAIT_READY(status);
   EXPECT_EQ(TASK_KILLED, status->state());
+  EXPECT_EQ(TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH, status->reason());
 
   Future<Nothing> recoverResources =
     FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources);
@@ -530,7 +531,9 @@ TEST_F(MasterAuthorizationTest, KillPendingTaskInTaskGroup)
   AWAIT_READY(task1Status);
   EXPECT_EQ(TASK_KILLED, task1Status->state());
   EXPECT_TRUE(strings::contains(
-      task1Status->message(), "Killed pending task"));
+      task1Status->message(), "Killed before delivery to the agent"));
+  EXPECT_EQ(TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH,
+            task1Status->reason());
 
   Future<Nothing> recoverResources =
     FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources);
@@ -544,6 +547,8 @@ TEST_F(MasterAuthorizationTest, KillPendingTaskInTaskGroup)
   EXPECT_TRUE(strings::contains(
       task2Status->message(),
       "A task within the task group was killed before delivery to the agent"));
+  EXPECT_EQ(TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH,
+            task2Status->reason());
 
   // No task launch should happen resulting in all resources being
   // returned to the allocator.

http://git-wip-us.apache.org/repos/asf/mesos/blob/ba45457e/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 4b3069f..9aa0a51 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -1298,10 +1298,11 @@ TYPED_TEST(SlaveRecoveryTest, RecoverUnregisteredExecutor)
 }
 
 
-// This test verifies that when the agent gets a `killTask` message and restarts
-// before the executor registers, a TASK_KILLED update is sent and the executor
-// shuts down.
-TYPED_TEST(SlaveRecoveryTest, KillTaskUnregisteredExecutor)
+// This test verifies that when the agent gets a `killTask`
+// message for a queued task on a registering executor, a
+// restart of the agent will generate a TASK_KILLED and
+// will shut down the executor.
+TYPED_TEST(SlaveRecoveryTest, KillQueuedTaskDuringExecutorRegistration)
 {
   Try<Owned<cluster::Master>> master = this->StartMaster();
   ASSERT_SOME(master);
@@ -1361,7 +1362,7 @@ TYPED_TEST(SlaveRecoveryTest, KillTaskUnregisteredExecutor)
 
   AWAIT_READY(status);
   EXPECT_EQ(TASK_KILLED, status->state());
-  EXPECT_EQ(TaskStatus::REASON_EXECUTOR_UNREGISTERED, status->reason());
+  EXPECT_EQ(TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH, status->reason());
 
   slave.get()->terminate();
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/ba45457e/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index c138914..8b85f93 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -4093,9 +4093,11 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
 }
 
 
-// This test ensures that if a `killTask()` for an executor is received by the
-// agent before the executor registers, the executor is properly cleaned up.
-TEST_F(SlaveTest, KillTaskUnregisteredExecutor)
+// This test verifies that when the agent gets a `killTask`
+// message for a queued task on a registering executor, a
+// the agent will generate a TASK_KILLED and will shut down
+// the executor.
+TEST_F(SlaveTest, KillQueuedTaskDuringExecutorRegistration)
 {
   // Start a master.
   Try<Owned<cluster::Master>> master = StartMaster();
@@ -4163,7 +4165,7 @@ TEST_F(SlaveTest, KillTaskUnregisteredExecutor)
 
   AWAIT_READY(status);
   EXPECT_EQ(TASK_KILLED, status->state());
-  EXPECT_EQ(TaskStatus::REASON_EXECUTOR_UNREGISTERED, status->reason());
+  EXPECT_EQ(TaskStatus::REASON_TASK_KILLED_DURING_LAUNCH, status->reason());
 
   // Now let the executor register by spoofing the message.
   RegisterExecutorMessage registerExecutor;


[03/13] mesos git commit: Fixed a bug where the agent kills and still launches a task.

Posted by bm...@apache.org.
Fixed a bug where the agent kills and still launches a task.

The following race leads to the agent both killing and launching a task:

  (1) Slave::__run completes, task is now within Executor::queuedTasks.
  (2) Slave::killTask locates the executor based on the task ID residing
      in queuedTasks, calls Slave::statusUpdate() with TASK_KILLED.
  (3) Slave::___run assumes that killed tasks have been removed from
      Executor::queuedTasks, but this now occurs asynchronously in
      Slave::_statusUpdate. So, the agent still sees the queued task
      and delivers it and adds the task to Executor::launchedTasks.
  (4) Slave::_statusUpdate runs, removes the task from
      Executor::launchedTasks and adds it to Executor::terminatedTasks.

The fix applied here is to synchronously transition queued tasks to
a terminal state when statusUpdate is called. This can be done because
for queued tasks, we do not need to retrieve the container status (the
task never reached the container).

Review: https://reviews.apache.org/r/61639


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0b9c3ded
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0b9c3ded
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0b9c3ded

Branch: refs/heads/master
Commit: 0b9c3dedb04e9bf2c3d1f1663cf9cd4f47cb674b
Parents: f961c9d
Author: Benjamin Mahler <bm...@apache.org>
Authored: Thu Aug 10 18:34:15 2017 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Wed Aug 23 15:09:07 2017 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp | 82 ++++++++++++++++++++++++++++--------------------
 src/slave/slave.hpp |  2 +-
 2 files changed, 49 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/0b9c3ded/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 50d2a10..1521d5d 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -4516,26 +4516,36 @@ void Slave::statusUpdate(StatusUpdate update, const Option<UPID>& pid)
 
   metrics.valid_status_updates++;
 
-  // Before sending update, we need to retrieve the container status.
-  //
-  // NOTE: If the executor sets the ContainerID inside the
-  // ContainerStatus, that indicates that the Task this status update
-  // is associated with is tied to that container (could be nested).
-  // Therefore, we need to get the status of that container, instead
-  // of the top level executor container.
-  ContainerID containerId = executor->containerId;
-  if (update.status().has_container_status() &&
-      update.status().container_status().has_container_id()) {
-    containerId = update.status().container_status().container_id();
-  }
-
-  containerizer->status(containerId)
-    .onAny(defer(self(),
-                 &Slave::_statusUpdate,
-                 update,
-                 pid,
-                 executor->id,
-                 lambda::_1));
+  // Before sending update, we need to retrieve the container status
+  // if the task reached the executor. For tasks that are queued, we
+  // do not need to send the container status and we must
+  // synchronously transition the task to ensure that it is removed
+  // from the queued tasks before the run task path continues.
+  if (executor->queuedTasks.contains(status.task_id())) {
+    CHECK(protobuf::isTerminalState(status.state()))
+        << "Queued tasks can only be transitioned to terminal states";
+
+    _statusUpdate(update, pid, executor->id, None());
+  } else {
+    // NOTE: If the executor sets the ContainerID inside the
+    // ContainerStatus, that indicates that the Task this status update
+    // is associated with is tied to that container (could be nested).
+    // Therefore, we need to get the status of that container, instead
+    // of the top level executor container.
+    ContainerID containerId = executor->containerId;
+    if (update.status().has_container_status() &&
+        update.status().container_status().has_container_id()) {
+      containerId = update.status().container_status().container_id();
+    }
+
+    containerizer->status(containerId)
+      .onAny(defer(self(),
+                   &Slave::_statusUpdate,
+                   update,
+                   pid,
+                   executor->id,
+                   lambda::_1));
+  }
 }
 
 
@@ -4543,26 +4553,26 @@ void Slave::_statusUpdate(
     StatusUpdate update,
     const Option<process::UPID>& pid,
     const ExecutorID& executorId,
-    const Future<ContainerStatus>& future)
+    const Option<Future<ContainerStatus>>& containerStatus)
 {
-  ContainerStatus* containerStatus =
-    update.mutable_status()->mutable_container_status();
-
   // There can be cases where a container is already removed from the
   // containerizer before the `status` call is dispatched to the
   // containerizer, leading to the failure of the returned `Future`.
   // In such a case we should simply not update the `ContainerStatus`
   // with the return `Future` but continue processing the
   // `StatusUpdate`.
-  if (future.isReady()) {
-    containerStatus->MergeFrom(future.get());
+  if (containerStatus.isSome() && containerStatus->isReady()) {
+    ContainerStatus* status =
+      update.mutable_status()->mutable_container_status();
+
+    status->MergeFrom(containerStatus->get());
 
     // Fill in the container IP address with the IP from the agent
     // PID, if not already filled in.
     //
     // TODO(karya): Fill in the IP address by looking up the executor PID.
-    if (containerStatus->network_infos().size() == 0) {
-      NetworkInfo* networkInfo = containerStatus->add_network_infos();
+    if (status->network_infos().size() == 0) {
+      NetworkInfo* networkInfo = status->add_network_infos();
       NetworkInfo::IPAddress* ipAddress = networkInfo->add_ip_addresses();
 
       // Set up IPv4 address.
@@ -5385,18 +5395,22 @@ void Slave::executorTerminated(
       // the status update manager should have already cleaned up all the
       // status update streams for a framework that is terminating.
       if (framework->state != Framework::TERMINATING) {
-        // Transition all live launched tasks.
-        foreachvalue (Task* task, executor->launchedTasks) {
+        // Transition all live launched tasks. Note that entries are
+        // removed from the map within the loop due to terminal status updates.
+        foreach (const TaskID& taskId, executor->launchedTasks.keys()) {
+          Task* task = executor->launchedTasks.at(taskId);
+
           if (!protobuf::isTerminalState(task->state())) {
             sendExecutorTerminatedStatusUpdate(
-                task->task_id(), termination, frameworkId, executor);
+                taskId, termination, frameworkId, executor);
           }
         }
 
-        // Transition all queued tasks.
-        foreachvalue (const TaskInfo& task, executor->queuedTasks) {
+        // Transition all queued tasks. Note that entries are removed
+        // from the map within the loop due to terminal status updates.
+        foreach (const TaskID& taskId, executor->queuedTasks.keys()) {
           sendExecutorTerminatedStatusUpdate(
-              task.task_id(), termination, frameworkId, executor);
+              taskId, termination, frameworkId, executor);
         }
       }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/0b9c3ded/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 0e07a1a..ca9f3da 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -252,7 +252,7 @@ public:
       StatusUpdate update,
       const Option<process::UPID>& pid,
       const ExecutorID& executorId,
-      const process::Future<ContainerStatus>& future);
+      const Option<process::Future<ContainerStatus>>& containerStatus);
 
   // Continue handling the status update after optionally updating the
   // container's resources.


[08/13] mesos git commit: Unschedule directories for GC in parallel in the agent.

Posted by bm...@apache.org.
Unschedule directories for GC in parallel in the agent.

Previously, when unscheduling directories for garbage collection,
a dispatch back to the agent was required between each gc result.

Review: https://reviews.apache.org/r/61649


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d7c62b84
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d7c62b84
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d7c62b84

Branch: refs/heads/master
Commit: d7c62b8413f9018371e6e1dc30a292a54973d19d
Parents: 12026b8
Author: Benjamin Mahler <bm...@apache.org>
Authored: Mon Aug 14 14:54:22 2017 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Wed Aug 23 15:09:45 2017 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp       | 43 ++++++++++++++++++------------------------
 src/slave/slave.hpp       |  4 +---
 src/tests/mock_slave.cpp  |  4 ++--
 src/tests/mock_slave.hpp  |  4 ++--
 src/tests/slave_tests.cpp | 23 +++++++++++-----------
 5 files changed, 35 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d7c62b84/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 6ee8187..b5564bb 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1596,14 +1596,6 @@ void Slave::doReliableRegistration(Duration maxBackoff)
 }
 
 
-// Helper to unschedule the path.
-// TODO(vinod): Can we avoid this helper?
-Future<bool> Slave::unschedule(const string& path)
-{
-  return gc->unschedule(path);
-}
-
-
 // TODO(vinod): Instead of crashing the slave on checkpoint errors,
 // send TASK_LOST to the framework.
 void Slave::runTask(
@@ -1742,7 +1734,7 @@ void Slave::run(
     return;
   }
 
-  Future<bool> unschedule = true;
+  list<Future<bool>> unschedules;
 
   // If we are about to create a new framework, unschedule the work
   // and meta directories from getting gc'ed.
@@ -1753,13 +1745,13 @@ void Slave::run(
         flags.work_dir, info.id(), frameworkId);
 
     if (os::exists(path)) {
-      unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
+      unschedules.push_back(gc->unschedule(path));
     }
 
     // Unschedule framework meta directory.
     path = paths::getFrameworkPath(metaDir, info.id(), frameworkId);
     if (os::exists(path)) {
-      unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
+      unschedules.push_back(gc->unschedule(path));
     }
 
     Option<UPID> frameworkPid = None();
@@ -1836,31 +1828,31 @@ void Slave::run(
         flags.work_dir, info.id(), frameworkId, executorId);
 
     if (os::exists(path)) {
-      unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
+      unschedules.push_back(gc->unschedule(path));
     }
 
     // Unschedule executor meta directory.
     path = paths::getExecutorPath(metaDir, info.id(), frameworkId, executorId);
 
     if (os::exists(path)) {
-      unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
+      unschedules.push_back(gc->unschedule(path));
     }
   }
 
   // Run the task after the unschedules are done.
-  unschedule.onAny(defer(
-      self(),
-      &Self::_run,
-      lambda::_1,
-      frameworkInfo,
-      executorInfo,
-      task,
-      taskGroup));
+  collect(unschedules)
+    .onAny(defer(self(),
+                 &Self::_run,
+                 lambda::_1,
+                 frameworkInfo,
+                 executorInfo,
+                 task,
+                 taskGroup));
 }
 
 
 void Slave::_run(
-    const Future<bool>& future,
+    const Future<list<bool>>& unschedules,
     const FrameworkInfo& frameworkInfo,
     const ExecutorInfo& executorInfo,
     const Option<TaskInfo>& task,
@@ -1933,11 +1925,12 @@ void Slave::_run(
     return;
   }
 
-  CHECK(!future.isDiscarded());
+  CHECK(!unschedules.isDiscarded());
 
-  if (!future.isReady()) {
+  if (!unschedules.isReady()) {
     LOG(ERROR) << "Failed to unschedule directories scheduled for gc: "
-               << (future.isFailed() ? future.failure() : "future discarded");
+               << (unschedules.isFailed() ?
+                   unschedules.failure() : "future discarded");
 
     // We report TASK_DROPPED to the framework because the task was
     // never launched. For non-partition-aware frameworks, we report

http://git-wip-us.apache.org/repos/asf/mesos/blob/d7c62b84/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 28fa671..80fb1ab 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -158,7 +158,7 @@ public:
 
   // Made 'virtual' for Slave mocking.
   virtual void _run(
-      const process::Future<bool>& future,
+      const process::Future<std::list<bool>>& unschedules,
       const FrameworkInfo& frameworkInfo,
       const ExecutorInfo& executorInfo,
       const Option<TaskInfo>& task,
@@ -171,8 +171,6 @@ public:
       const ExecutorInfo& executorInfo,
       const TaskGroupInfo& taskGroupInfo);
 
-  process::Future<bool> unschedule(const std::string& path);
-
   // Made 'virtual' for Slave mocking.
   virtual void killTask(
       const process::UPID& from,

http://git-wip-us.apache.org/repos/asf/mesos/blob/d7c62b84/src/tests/mock_slave.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mock_slave.cpp b/src/tests/mock_slave.cpp
index c435ec7..db24f9e 100644
--- a/src/tests/mock_slave.cpp
+++ b/src/tests/mock_slave.cpp
@@ -178,14 +178,14 @@ void MockSlave::unmocked_runTask(
 
 
 void MockSlave::unmocked__run(
-    const Future<bool>& future,
+    const Future<list<bool>>& unschedules,
     const FrameworkInfo& frameworkInfo,
     const ExecutorInfo& executorInfo,
     const Option<TaskInfo>& taskInfo,
     const Option<TaskGroupInfo>& taskGroup)
 {
   slave::Slave::_run(
-      future, frameworkInfo, executorInfo, taskInfo, taskGroup);
+      unschedules, frameworkInfo, executorInfo, taskInfo, taskGroup);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/d7c62b84/src/tests/mock_slave.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mock_slave.hpp b/src/tests/mock_slave.hpp
index 767ed3d..57189ce 100644
--- a/src/tests/mock_slave.hpp
+++ b/src/tests/mock_slave.hpp
@@ -132,14 +132,14 @@ public:
       const TaskInfo& task);
 
   MOCK_METHOD5(_run, void(
-      const process::Future<bool>& future,
+      const process::Future<std::list<bool>>& unschedules,
       const FrameworkInfo& frameworkInfo,
       const ExecutorInfo& executorInfo,
       const Option<TaskInfo>& task,
       const Option<TaskGroupInfo>& taskGroup));
 
   void unmocked__run(
-      const process::Future<bool>& future,
+      const process::Future<std::list<bool>>& unschedules,
       const FrameworkInfo& frameworkInfo,
       const ExecutorInfo& executorInfo,
       const Option<TaskInfo>& task,

http://git-wip-us.apache.org/repos/asf/mesos/blob/d7c62b84/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 57679bd..1bdadce 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -122,6 +122,7 @@ using process::http::Unauthorized;
 
 using process::http::authentication::Principal;
 
+using std::list;
 using std::map;
 using std::shared_ptr;
 using std::string;
@@ -4040,7 +4041,7 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
     .WillOnce(Invoke(&slave, &MockSlave::unmocked_runTask));
 
   // Saved arguments from Slave::_run().
-  Future<bool> future;
+  Future<list<bool>> unschedules;
   FrameworkInfo frameworkInfo;
   ExecutorInfo executorInfo;
   Option<TaskGroupInfo> taskGroup;
@@ -4051,7 +4052,7 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
   Future<Nothing> _run;
   EXPECT_CALL(slave, _run(_, _, _, _, _))
     .WillOnce(DoAll(FutureSatisfy(&_run),
-                    SaveArg<0>(&future),
+                    SaveArg<0>(&unschedules),
                     SaveArg<1>(&frameworkInfo),
                     SaveArg<2>(&executorInfo),
                     SaveArg<3>(&task_),
@@ -4080,7 +4081,7 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
   AWAIT_READY(removeFramework);
 
   slave.unmocked__run(
-      future, frameworkInfo, executorInfo, task_, taskGroup);
+      unschedules, frameworkInfo, executorInfo, task_, taskGroup);
 
   AWAIT_READY(status);
   EXPECT_EQ(TASK_KILLED, status->state());
@@ -4154,7 +4155,7 @@ TEST_F(SlaveTest, KillMultiplePendingTasks)
   // Skip what Slave::_run() normally does, save its arguments for
   // later, tie reaching the critical moment when to kill the task to
   // a future.
-  Future<bool> future1, future2;
+  Future<list<bool>> unschedules1, unschedules2;
   FrameworkInfo frameworkInfo1, frameworkInfo2;
   ExecutorInfo executorInfo1, executorInfo2;
   Option<TaskGroupInfo> taskGroup1, taskGroup2;
@@ -4163,13 +4164,13 @@ TEST_F(SlaveTest, KillMultiplePendingTasks)
   Future<Nothing> _run1, _run2;
   EXPECT_CALL(slave, _run(_, _, _, _, _))
     .WillOnce(DoAll(FutureSatisfy(&_run1),
-                    SaveArg<0>(&future1),
+                    SaveArg<0>(&unschedules1),
                     SaveArg<1>(&frameworkInfo1),
                     SaveArg<2>(&executorInfo1),
                     SaveArg<3>(&task_1),
                     SaveArg<4>(&taskGroup1)))
     .WillOnce(DoAll(FutureSatisfy(&_run2),
-                    SaveArg<0>(&future2),
+                    SaveArg<0>(&unschedules2),
                     SaveArg<1>(&frameworkInfo2),
                     SaveArg<2>(&executorInfo2),
                     SaveArg<3>(&task_2),
@@ -4207,10 +4208,10 @@ TEST_F(SlaveTest, KillMultiplePendingTasks)
 
   // The `__run` continuations should have no effect.
   slave.unmocked__run(
-      future1, frameworkInfo1, executorInfo1, task_1, taskGroup1);
+      unschedules1, frameworkInfo1, executorInfo1, task_1, taskGroup1);
 
   slave.unmocked__run(
-      future2, frameworkInfo2, executorInfo2, task_2, taskGroup2);
+      unschedules2, frameworkInfo2, executorInfo2, task_2, taskGroup2);
 
   Clock::settle();
 
@@ -7036,7 +7037,7 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts)
     .WillOnce(Invoke(&slave, &MockSlave::unmocked_runTaskGroup));
 
   // Saved arguments from `Slave::_run()`.
-  Future<bool> future;
+  Future<list<bool>> unschedules;
   FrameworkInfo frameworkInfo;
   ExecutorInfo executorInfo_;
   Option<TaskGroupInfo> taskGroup_;
@@ -7048,7 +7049,7 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts)
   Future<Nothing> _run;
   EXPECT_CALL(slave, _run(_, _, _, _, _))
     .WillOnce(DoAll(FutureSatisfy(&_run),
-                    SaveArg<0>(&future),
+                    SaveArg<0>(&unschedules),
                     SaveArg<1>(&frameworkInfo),
                     SaveArg<2>(&executorInfo_),
                     SaveArg<3>(&task_),
@@ -7120,7 +7121,7 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts)
   AWAIT_READY(removeFramework);
 
   slave.unmocked__run(
-      future, frameworkInfo, executorInfo_, task_, taskGroup_);
+      unschedules, frameworkInfo, executorInfo_, task_, taskGroup_);
 
   AWAIT_READY(update1);
   AWAIT_READY(update2);


[05/13] mesos git commit: Introduced a Framework::idle function in the agent.

Posted by bm...@apache.org.
Introduced a Framework::idle function in the agent.

This ensures that all call sites check the framework's idleness
consistently. It also aids readability by making it clear that
we remove idle frameworks.

Review: https://reviews.apache.org/r/61642


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/509147d8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/509147d8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/509147d8

Branch: refs/heads/master
Commit: 509147d85f2e60af33434d42a8ebf5d41209604c
Parents: 95aa9e5
Author: Benjamin Mahler <bm...@apache.org>
Authored: Fri Aug 4 15:51:05 2017 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Wed Aug 23 15:09:07 2017 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp | 39 +++++++++++++++++++++------------------
 src/slave/slave.hpp |  9 +++++++++
 2 files changed, 30 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/509147d8/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 534ea23..05505a6 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1896,7 +1896,7 @@ void Slave::_run(
       framework->removePendingTask(_task.task_id());
     }
 
-    if (framework->executors.empty() && framework->pending.empty()) {
+    if (framework->idle()) {
       removeFramework(framework);
     }
 
@@ -1941,7 +1941,7 @@ void Slave::_run(
       statusUpdate(update, UPID());
     }
 
-    if (framework->executors.empty() && framework->pending.empty()) {
+    if (framework->idle()) {
       removeFramework(framework);
     }
 
@@ -1984,7 +1984,7 @@ void Slave::_run(
       statusUpdate(update, UPID());
     }
 
-    if (framework->executors.empty() && framework->pending.empty()) {
+    if (framework->idle()) {
       removeFramework(framework);
     }
 
@@ -2057,7 +2057,7 @@ void Slave::__run(
       framework->removePendingTask(_task.task_id());
     }
 
-    if (framework->executors.empty() && framework->pending.empty()) {
+    if (framework->idle()) {
       removeFramework(framework);
     }
 
@@ -2072,8 +2072,7 @@ void Slave::__run(
     if (framework->removePendingTask(_task.task_id())) {
       // NOTE: Ideally we would perform the following check here:
       //
-      //   if (framework->executors.empty() &&
-      //       framework->pending.empty()) {
+      //   if (framework->idle()) {
       //     removeFramework(framework);
       //   }
       //
@@ -2105,7 +2104,7 @@ void Slave::__run(
 
     // Refer to the comment after 'framework->removePendingTask' above
     // for why we need this.
-    if (framework->executors.empty() && framework->pending.empty()) {
+    if (framework->idle()) {
       removeFramework(framework);
     }
 
@@ -2177,7 +2176,7 @@ void Slave::__run(
 
     // Refer to the comment after 'framework->removePendingTask' above
     // for why we need this.
-    if (framework->executors.empty() && framework->pending.empty()) {
+    if (framework->idle()) {
       removeFramework(framework);
     }
 
@@ -2246,7 +2245,7 @@ void Slave::__run(
 
     // Refer to the comment after 'framework->removePendingTask' above
     // for why we need this.
-    if (framework->executors.empty() && framework->pending.empty()) {
+    if (framework->idle()) {
       removeFramework(framework);
     }
 
@@ -2299,7 +2298,7 @@ void Slave::__run(
 
     // Refer to the comment after 'framework->removePendingTask' above
     // for why we need this.
-    if (framework->executors.empty() && framework->pending.empty()) {
+    if (framework->idle()) {
       removeFramework(framework);
     }
 
@@ -2318,7 +2317,7 @@ void Slave::__run(
 
     // Refer to the comment after 'framework->removePendingTask' above
     // for why we need this.
-    if (framework->executors.empty() && framework->pending.empty()) {
+    if (framework->idle()) {
       removeFramework(framework);
     }
 
@@ -3226,7 +3225,7 @@ void Slave::shutdownFramework(
       }
 
       // Remove this framework if it has no pending executors and tasks.
-      if (framework->executors.empty() && framework->pending.empty()) {
+      if (framework->idle()) {
         removeFramework(framework);
       }
       break;
@@ -3671,7 +3670,7 @@ void Slave::_statusUpdateAcknowledgement(
   }
 
   // Remove this framework if it has no pending executors and tasks.
-  if (framework->executors.empty() && framework->pending.empty()) {
+  if (framework->idle()) {
     removeFramework(framework);
   }
 }
@@ -5437,7 +5436,7 @@ void Slave::executorTerminated(
       }
 
       // Remove this framework if it has no pending executors and tasks.
-      if (framework->executors.empty() && framework->pending.empty()) {
+      if (framework->idle()) {
         removeFramework(framework);
       }
       break;
@@ -5557,10 +5556,8 @@ void Slave::removeFramework(Framework* framework)
   CHECK(framework->state == Framework::RUNNING ||
         framework->state == Framework::TERMINATING);
 
-  // The invariant here is that a framework should not be removed
-  // if it has either pending executors or pending tasks.
-  CHECK(framework->executors.empty());
-  CHECK(framework->pending.empty());
+  // We only remove frameworks once they become idle.
+  CHECK(framework->idle());
 
   // Close all status update streams for this framework.
   statusUpdateManager->cleanup(framework->id());
@@ -7105,6 +7102,12 @@ Framework::~Framework()
 }
 
 
+bool Framework::idle() const
+{
+  return executors.empty() && pendingTasks.empty();
+}
+
+
 void Framework::checkpointFramework() const
 {
   // Checkpoint the framework info.

http://git-wip-us.apache.org/repos/asf/mesos/blob/509147d8/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 3965fec..67ffdc9 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -851,6 +851,15 @@ public:
 
   ~Framework();
 
+  // Returns whether the framework is idle, where idle is
+  // defined as having no activity:
+  //   (1) The framework has no non-terminal tasks and executors.
+  //   (2) All status updates have been acknowledged.
+  //
+  // TODO(bmahler): The framework should also not be considered
+  // idle if there are unacknowledged updates for "pending" tasks.
+  bool idle() const;
+
   void checkpointFramework() const;
 
   const FrameworkID id() const { return info.id(); }


[10/13] mesos git commit: Controlled queued task handling with helpers in the agent.

Posted by bm...@apache.org.
Controlled queued task handling with helpers in the agent.

With this patch, the enqueueing and dequeueing of tasks in the
agent are controlled solely by Executor member functions
(enqueueTask, enqueueTaskGroup, and dequeueTask), which simplifies
the agent code.

Review: https://reviews.apache.org/r/61650


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0ce87cb4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0ce87cb4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0ce87cb4

Branch: refs/heads/master
Commit: 0ce87cb46bdb53ed58156712770e3a62e7f6092c
Parents: a663be9
Author: Benjamin Mahler <bm...@apache.org>
Authored: Fri Aug 11 13:37:26 2017 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Wed Aug 23 15:09:45 2017 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp | 203 ++++++++++++++++++++++++-----------------------
 src/slave/slave.hpp |   9 ++-
 2 files changed, 110 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/0ce87cb4/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index b5564bb..6dcd195 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2356,19 +2356,18 @@ void Slave::__run(
       break;
     }
     case Executor::REGISTERING:
-      foreach (const TaskInfo& _task, tasks) {
-        // Checkpoint the task before we do anything else.
-        if (executor->checkpoint) {
+      if (executor->checkpoint) {
+        foreach (const TaskInfo& _task, tasks) {
           executor->checkpointTask(_task);
         }
-
-        // Queue task if the executor has not yet registered.
-        executor->queuedTasks[_task.task_id()] = _task;
       }
 
       if (taskGroup.isSome()) {
-        // Queue task group if the executor has not yet registered.
-        executor->queuedTaskGroups.push_back(taskGroup.get());
+        executor->enqueueTaskGroup(taskGroup.get());
+      } else {
+        foreach (const TaskInfo& _task, tasks) {
+          executor->enqueueTask(_task);
+        }
       }
 
       LOG(INFO) << "Queued " << taskOrTaskGroup(task, taskGroup)
@@ -2376,21 +2375,20 @@ void Slave::__run(
 
       break;
     case Executor::RUNNING: {
-      foreach (const TaskInfo& _task, tasks) {
-        // Checkpoint the task before we do anything else.
-        if (executor->checkpoint) {
+      if (executor->checkpoint) {
+        foreach (const TaskInfo& _task, tasks) {
           executor->checkpointTask(_task);
         }
-
-        // Queue task until the containerizer is updated with new
-        // resource limits (MESOS-998).
-        executor->queuedTasks[_task.task_id()] = _task;
       }
 
+      // Queue tasks until the containerizer is updated
+      // with new resource limits (MESOS-998).
       if (taskGroup.isSome()) {
-        // Queue task group until the containerizer is updated with new
-        // resource limits (MESOS-998).
-        executor->queuedTaskGroups.push_back(taskGroup.get());
+        executor->enqueueTaskGroup(taskGroup.get());
+      } else {
+        foreach (const TaskInfo& _task, tasks) {
+          executor->enqueueTask(_task);
+        }
       }
 
       LOG(INFO) << "Queued " << taskOrTaskGroup(task, taskGroup)
@@ -2571,10 +2569,8 @@ void Slave::___run(
       continue;
     }
 
-    executor->queuedTasks.erase(task.task_id());
-
-    // Add the task and send it to the executor.
-    executor->addTask(task);
+    CHECK_SOME(executor->dequeueTask(task.task_id()));
+    executor->addLaunchedTask(task);
 
     LOG(INFO) << "Sending queued task '" << task.task_id()
               << "' to executor " << *executor;
@@ -2591,12 +2587,24 @@ void Slave::___run(
   }
 
   foreach (const TaskGroupInfo& taskGroup, taskGroups) {
-    auto it = find(
-        executor->queuedTaskGroups.begin(),
-        executor->queuedTaskGroups.end(),
-        taskGroup);
+    // The invariant here is that all queued tasks in the group
+    // are still queued, or all were removed due to a kill arriving
+    // for one of the tasks in the group.
+    bool allQueued = true;
+    bool allRemoved = true;
+    foreach (const TaskInfo& task, taskGroup.tasks()) {
+      if (executor->queuedTasks.contains(task.task_id())) {
+        allRemoved = false;
+      } else {
+        allQueued = false;
+      }
+    }
 
-    if (it == executor->queuedTaskGroups.end()) {
+    CHECK(allQueued != allRemoved)
+      << "BUG: The task group " << taskOrTaskGroup(None(), taskGroup)
+      << " was killed partially";
+
+    if (allRemoved) {
       // This is the case where the task group is killed. No need to send
       // status update because it should be handled in 'killTask'.
       LOG(WARNING) << "Ignoring sending queued task group "
@@ -2605,19 +2613,15 @@ void Slave::___run(
       continue;
     }
 
-    LOG(INFO) << "Sending queued task group " << taskOrTaskGroup(None(), *it)
+    LOG(INFO) << "Sending queued task group"
+              << " " << taskOrTaskGroup(None(), taskGroup)
               << " to executor " << *executor;
 
-    // Add the tasks and send the task group to the executor. Since, the
-    // queued tasks also include tasks from the queued task group, we
-    // remove them from queued tasks.
-    foreach (const TaskInfo& task, it->tasks()) {
-      executor->addTask(task);
-      executor->queuedTasks.erase(task.task_id());
+    foreach (const TaskInfo& task, taskGroup.tasks()) {
+      CHECK_SOME(executor->dequeueTask(task.task_id()));
+      executor->addLaunchedTask(task);
     }
 
-    executor->queuedTaskGroups.erase(it);
-
     executor::Event event;
     event.set_type(executor::Event::LAUNCH_GROUP);
 
@@ -3794,22 +3798,12 @@ void Slave::subscribe(
             None());
       }
 
-      // We maintain a copy of the tasks in `queuedTaskGroups` also in
-      // `queuedTasks`. Hence, we need to ensure that we don't send the same
-      // tasks to the executor twice.
-      LinkedHashMap<TaskID, TaskInfo> queuedTasks;
-      foreachpair (const TaskID& taskId,
-                   const TaskInfo& taskInfo,
-                   executor->queuedTasks) {
-        queuedTasks[taskId] = taskInfo;
-      }
+      // Split the queued tasks between the task groups and tasks.
+      LinkedHashMap<TaskID, TaskInfo> queuedTasks = executor->queuedTasks;
 
       foreach (const TaskGroupInfo& taskGroup, executor->queuedTaskGroups) {
         foreach (const TaskInfo& task, taskGroup.tasks()) {
-          const TaskID& taskId = task.task_id();
-          if (queuedTasks.contains(taskId)) {
-            queuedTasks.erase(taskId);
-          }
+          queuedTasks.erase(task.task_id());
         }
       }
 
@@ -4008,22 +4002,12 @@ void Slave::registerExecutor(
       message.mutable_slave_info()->MergeFrom(info);
       executor->send(message);
 
-      // We maintain a copy of the tasks in `queuedTaskGroups` also in
-      // `queuedTasks`. Hence, we need to ensure that we don't send the same
-      // tasks to the executor twice.
-      LinkedHashMap<TaskID, TaskInfo> queuedTasks;
-      foreachpair (const TaskID& taskId,
-                   const TaskInfo& taskInfo,
-                   executor->queuedTasks) {
-        queuedTasks[taskId] = taskInfo;
-      }
+      // Split the queued tasks between the task groups and tasks.
+      LinkedHashMap<TaskID, TaskInfo> queuedTasks = executor->queuedTasks;
 
       foreach (const TaskGroupInfo& taskGroup, executor->queuedTaskGroups) {
         foreach (const TaskInfo& task, taskGroup.tasks()) {
-          const TaskID& taskId = task.task_id();
-          if (queuedTasks.contains(taskId)) {
-            queuedTasks.erase(taskId);
-          }
+          queuedTasks.erase(task.task_id());
         }
       }
 
@@ -7665,8 +7649,61 @@ Executor::~Executor()
 }
 
 
-Task* Executor::addTask(const TaskInfo& task)
+void Executor::enqueueTask(const TaskInfo& task)
+{
+  queuedTasks[task.task_id()] = task;
+}
+
+
+void Executor::enqueueTaskGroup(const TaskGroupInfo& taskGroup)
+{
+  foreach (const TaskInfo& task, taskGroup.tasks()) {
+    queuedTasks[task.task_id()] = task;
+  }
+
+  queuedTaskGroups.push_back(taskGroup);
+}
+
+
+Option<TaskInfo> Executor::dequeueTask(const TaskID& taskId)
+{
+  Option<TaskInfo> taskInfo = queuedTasks.get(taskId);
+
+  queuedTasks.erase(taskId);
+
+  // Remove the task group if all of its tasks have been dequeued.
+  for (auto it = queuedTaskGroups.begin(); it != queuedTaskGroups.end(); ++it) {
+    foreach (const TaskInfo& t, it->tasks()) {
+      if (t.task_id() == taskId) {
+        // Found its task group, check if all tasks within
+        // the group have been removed.
+        bool allRemoved = true;
+
+        foreach (const TaskInfo& t_, it->tasks()) {
+          if (queuedTasks.contains(t_.task_id())) {
+            allRemoved = false;
+            break;
+          }
+        }
+
+        if (allRemoved) {
+          queuedTaskGroups.erase(it);
+        }
+
+        return taskInfo;
+      }
+    }
+  }
+
+  return taskInfo;
+}
+
+
+Task* Executor::addLaunchedTask(const TaskInfo& task)
 {
+  CHECK(!queuedTasks.contains(task.task_id()))
+    << "Task " << task.task_id() << " was not dequeued";
+
   // The master should enforce unique task IDs, but just in case
   // maybe we shouldn't make this a fatal error.
   CHECK(!launchedTasks.contains(task.task_id()))
@@ -7800,52 +7837,20 @@ Try<Nothing> Executor::updateTaskState(const TaskStatus& status)
   bool terminal = protobuf::isTerminalState(status.state());
 
   const TaskID& taskId = status.task_id();
-  Option<TaskGroupInfo> taskGroup = getQueuedTaskGroup(taskId);
 
   Task* task = nullptr;
 
-  if (taskGroup.isSome()) {
-    if (!terminal) {
-      return Error("Cannot send non-terminal update for queued task group");
-    }
-
-    // Since, the queued tasks also include tasks from the queued task group, we
-    // remove them from queued tasks.
-    queuedTasks.erase(taskId);
-
-    foreach (const TaskInfo& task_, taskGroup->tasks()) {
-      if (task_.task_id() == taskId) {
-        task = new Task(protobuf::createTask(
-            task_,
-            status.state(),
-            frameworkId));
-        break;
-      }
-    }
-
-    size_t nonTerminalTasks = 0;
-    foreach (const TaskInfo& task_, taskGroup->tasks()) {
-      if (!terminatedTasks.contains(task_.task_id())) {
-        nonTerminalTasks++;
-      }
-    }
-
-    // We remove the task group when all the other tasks in the task group
-    // are terminal.
-    if (nonTerminalTasks == 1) {
-      queuedTaskGroups.remove(taskGroup.get());
-    }
-  } else if (queuedTasks.contains(taskId)) {
+  if (queuedTasks.contains(taskId)) {
     if (!terminal) {
       return Error("Cannot send non-terminal update for queued task");
     }
 
+    TaskInfo taskInfo = CHECK_NOTNONE(dequeueTask(taskId));
+
     task = new Task(protobuf::createTask(
-        queuedTasks.at(taskId),
+        taskInfo,
         status.state(),
         frameworkId));
-
-    queuedTasks.erase(taskId);
   } else if (launchedTasks.contains(taskId)) {
     task = launchedTasks.at(status.task_id());
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/0ce87cb4/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 80fb1ab..7d07868 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -712,7 +712,12 @@ public:
 
   ~Executor();
 
-  Task* addTask(const TaskInfo& task);
+  // Note that these tasks will also be tracked within `queuedTasks`.
+  void enqueueTaskGroup(const TaskGroupInfo& taskGroup);
+
+  void enqueueTask(const TaskInfo& task);
+  Option<TaskInfo> dequeueTask(const TaskID& taskId);
+  Task* addLaunchedTask(const TaskInfo& task);
   void completeTask(const TaskID& taskId);
   void checkpointExecutor();
   void checkpointTask(const TaskInfo& task);
@@ -814,8 +819,6 @@ public:
   // Not yet launched task groups. This is needed for correctly sending
   // TASK_KILLED status updates for all tasks in the group if any of the
   // tasks were killed before the executor could register with the agent.
-  //
-  // TODO(anand): Replace this with `LinkedHashSet` when it is available.
   std::list<TaskGroupInfo> queuedTaskGroups;
 
   // Running.


[04/13] mesos git commit: Introduced a CHECK_NOTNONE.

Posted by bm...@apache.org.
Introduced a CHECK_NOTNONE.

This is similar to glog's CHECK_NOTNULL: it fails fatally when the
given Option is None and otherwise yields the contained value.

Review: https://reviews.apache.org/r/61644


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/edd9b13a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/edd9b13a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/edd9b13a

Branch: refs/heads/master
Commit: edd9b13af889785eff2c56b09c1ebdae788bf1ce
Parents: a6e748c
Author: Benjamin Mahler <bm...@apache.org>
Authored: Thu Aug 10 18:04:48 2017 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Wed Aug 23 15:09:07 2017 -0700

----------------------------------------------------------------------
 3rdparty/stout/include/stout/check.hpp | 50 ++++++++++++++++++++++++++++-
 1 file changed, 49 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/edd9b13a/3rdparty/stout/include/stout/check.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/stout/include/stout/check.hpp b/3rdparty/stout/include/stout/check.hpp
index e3cabd4..6a33579 100644
--- a/3rdparty/stout/include/stout/check.hpp
+++ b/3rdparty/stout/include/stout/check.hpp
@@ -53,10 +53,58 @@
   CHECK_STATE(CHECK_ERROR, _check_error, expression)
 
 
-// Private structs/functions used for CHECK_*.
+// A private helper for CHECK_NOTNONE which is similar to the
+// CHECK_NOTNULL provided by glog.
+template <typename T>
+T&& _check_not_none(
+    const char* file,
+    int line,
+    const char* message,
+    Option<T>&& t) {
+  if (t.isNone()) {
+    google::LogMessageFatal(file, line, new std::string(message));
+  }
+  return std::move(t).get();
+}
+
+
+template <typename T>
+T& _check_not_none(
+    const char* file,
+    int line,
+    const char* message,
+    Option<T>& t) {
+  if (t.isNone()) {
+    google::LogMessageFatal(file, line, new std::string(message));
+  }
+  return t.get();
+}
 
 
 template <typename T>
+const T& _check_not_none(
+    const char* file,
+    int line,
+    const char* message,
+    const Option<T>& t) {
+  if (t.isNone()) {
+    google::LogMessageFatal(file, line, new std::string(message));
+  }
+  return t.get();
+}
+
+
+#define CHECK_NOTNONE(expression) \
+  _check_not_none( \
+      __FILE__, \
+      __LINE__, \
+      "'" #expression "' Must be SOME", \
+      (expression))
+
+
+// Private structs/functions used for CHECK_*.
+
+template <typename T>
 Option<Error> _check_some(const Option<T>& o)
 {
   if (o.isNone()) {


[12/13] mesos git commit: Added a TODO to correctly represent framework lifecycle in agent.

Posted by bm...@apache.org.
Added a TODO to correctly represent framework lifecycle in agent.

Currently, we store frameworks as "completed" in the agent when
the agent no longer has any active tasks or executors for the
framework. However, the framework may not be completed according
to the master. Ideally, the master would inform the agent when a
framework is _actually_ completed, and the agent would represent
these "idle" frameworks as being in a non-completed state.

Review: https://reviews.apache.org/r/61647


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/10b2a453
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/10b2a453
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/10b2a453

Branch: refs/heads/master
Commit: 10b2a45358198e092a8d2e64171e028e533fb0b6
Parents: 658992c
Author: Benjamin Mahler <bm...@apache.org>
Authored: Fri Aug 4 15:07:46 2017 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Wed Aug 23 15:09:45 2017 -0700

----------------------------------------------------------------------
 src/slave/slave.hpp | 8 ++++++++
 1 file changed, 8 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/10b2a453/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 4058381..1f0e826 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -577,6 +577,14 @@ private:
 
   hashmap<FrameworkID, Framework*> frameworks;
 
+  // Note that these frameworks are "completed" only in that
+  // they no longer have any active tasks or executors on this
+  // particular agent.
+  //
+  // TODO(bmahler): Implement a more accurate framework lifecycle
+  // in the agent code, ideally the master can inform the agent
+  // when a framework is actually completed, and the agent can
+  // perhaps store a cache of "idle" frameworks. See MESOS-7890.
   BoundedHashMap<FrameworkID, process::Owned<Framework>> completedFrameworks;
 
   mesos::master::detector::MasterDetector* detector;


[06/13] mesos git commit: Updated Framework::removePendingTask to take only a TaskID.

Posted by bm...@apache.org.
Updated Framework::removePendingTask to take only a TaskID.

This allows the function to be used in the kill-task path, where
the TaskInfo and ExecutorInfo are not available. With this change,
all removals of pending tasks now go through this function.

Review: https://reviews.apache.org/r/61641


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/95aa9e5d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/95aa9e5d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/95aa9e5d

Branch: refs/heads/master
Commit: 95aa9e5d2d997b6af9d6e9ef104dde61b7836866
Parents: ba45457
Author: Benjamin Mahler <bm...@apache.org>
Authored: Fri Aug 4 15:46:21 2017 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Wed Aug 23 15:09:07 2017 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp | 64 +++++++++++++++++++++---------------------------
 src/slave/slave.hpp |  5 ++--
 2 files changed, 30 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/95aa9e5d/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 6ffecdc..534ea23 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1883,8 +1883,6 @@ void Slave::_run(
     return;
   }
 
-  const ExecutorID& executorId = executorInfo.executor_id();
-
   // We don't send a status update here because a terminating
   // framework cannot send acknowledgements.
   if (framework->state == Framework::TERMINATING) {
@@ -1895,7 +1893,7 @@ void Slave::_run(
     // Although we cannot send a status update in this case, we remove
     // the affected tasks from the pending tasks.
     foreach (const TaskInfo& _task, tasks) {
-      framework->removePendingTask(_task, executorInfo);
+      framework->removePendingTask(_task.task_id());
     }
 
     if (framework->executors.empty() && framework->pending.empty()) {
@@ -1905,6 +1903,8 @@ void Slave::_run(
     return;
   }
 
+  const ExecutorID& executorId = executorInfo.executor_id();
+
   // If any of the tasks in the task group have been killed in the interim,
   // we send a TASK_KILLED for all the other tasks in the group.
   bool killed = false;
@@ -1922,7 +1922,7 @@ void Slave::_run(
                  << " because it has been killed in the meantime";
 
     foreach (const TaskInfo& _task, tasks) {
-      framework->removePendingTask(_task, executorInfo);
+      framework->removePendingTask(_task.task_id());
 
       const StatusUpdate update = protobuf::createStatusUpdate(
           frameworkId,
@@ -1964,7 +1964,7 @@ void Slave::_run(
     }
 
     foreach (const TaskInfo& _task, tasks) {
-      framework->removePendingTask(_task, executorInfo);
+      framework->removePendingTask(_task.task_id());
 
       const StatusUpdate update = protobuf::createStatusUpdate(
           frameworkId,
@@ -2054,7 +2054,7 @@ void Slave::__run(
     // Although we cannot send a status update in this case, we remove
     // the affected tasks from the list of pending tasks.
     foreach (const TaskInfo& _task, tasks) {
-      framework->removePendingTask(_task, executorInfo);
+      framework->removePendingTask(_task.task_id());
     }
 
     if (framework->executors.empty() && framework->pending.empty()) {
@@ -2069,7 +2069,7 @@ void Slave::__run(
   // send a TASK_KILLED for all the other tasks in the group.
   bool killed = false;
   foreach (const TaskInfo& _task, tasks) {
-    if (framework->removePendingTask(_task, executorInfo)) {
+    if (framework->removePendingTask(_task.task_id())) {
       // NOTE: Ideally we would perform the following check here:
       //
       //   if (framework->executors.empty() &&
@@ -2983,23 +2983,20 @@ void Slave::killTask(
     return;
   }
 
-  foreachkey (const ExecutorID& executorId, framework->pending) {
-    if (framework->pending[executorId].contains(taskId)) {
-      LOG(WARNING) << "Killing task " << taskId
-                   << " of framework " << frameworkId
-                   << " before it was launched";
+  // TODO(bmahler): Removing the task here is a bug: MESOS-7783.
+  bool removedWhilePending = framework->removePendingTask(taskId);
 
-      // We send the TASK_KILLED status update in `_run()` as the
-      // task being killed could be part of a task group and we
-      // don't store this information in `framework->pending`.
-      // We don't invoke `removeFramework()` here since we need the
-      // framework to be valid for sending the status update later.
-     framework->pending[executorId].erase(taskId);
-     if (framework->pending[executorId].empty()) {
-       framework->pending.erase(executorId);
-     }
-     return;
-    }
+  if (removedWhilePending) {
+    LOG(WARNING) << "Killing task " << taskId
+                 << " of framework " << frameworkId
+                 << " before it was launched";
+
+    // We send the TASK_KILLED status update in `_run()` as the
+    // task being killed could be part of a task group and we
+    // don't store this information in `framework->pending`.
+    // We don't invoke `removeFramework()` here since we need the
+    // framework to be valid for sending the status update later.
+    return;
   }
 
   Executor* executor = framework->getExecutor(taskId);
@@ -7467,21 +7464,16 @@ bool Framework::hasTask(const TaskID& taskId)
 }
 
 
-// Return `true` if `task` was a pending task of this framework
-// before the removal; `false` otherwise.
-bool Framework::removePendingTask(
-    const TaskInfo& task,
-    const ExecutorInfo& executorInfo)
+bool Framework::removePendingTask(const TaskID& taskId)
 {
-  const ExecutorID executorId = executorInfo.executor_id();
-
-  if (pending.contains(executorId) &&
-      pending.at(executorId).contains(task.task_id())) {
-    pending.at(executorId).erase(task.task_id());
-    if (pending.at(executorId).empty()) {
-      pending.erase(executorId);
+  foreachkey (const ExecutorID& executorId, pending) {
+    if (pending.at(executorId).contains(taskId)) {
+      pending.at(executorId).erase(taskId);
+      if (pending.at(executorId).empty()) {
+        pending.erase(executorId);
+      }
+      return true;
     }
-    return true;
   }
 
   return false;

http://git-wip-us.apache.org/repos/asf/mesos/blob/95aa9e5d/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index ca9f3da..3965fec 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -868,9 +868,8 @@ public:
 
   bool hasTask(const TaskID& taskId);
 
-  bool removePendingTask(
-      const TaskInfo& task,
-      const ExecutorInfo& executorInfo);
+  // Returns whether the pending task was removed.
+  bool removePendingTask(const TaskID& taskId);
 
   Resources allocatedResources() const;
 


[11/13] mesos git commit: Implemented LinkedHashMap copy construction and assignment.

Posted by bm...@apache.org.
Implemented LinkedHashMap copy construction and assignment.

The compiler-generated defaults were incorrect: they copied the index
of list iterators, which kept pointing into the source map's list. No
code relies on them today; this was discovered when writing new code.

Review: https://reviews.apache.org/r/61638


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a663be90
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a663be90
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a663be90

Branch: refs/heads/master
Commit: a663be9021e2aabe372db7b47696c31d8b7c5436
Parents: d7c62b8
Author: Benjamin Mahler <bm...@apache.org>
Authored: Mon Aug 14 22:08:31 2017 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Wed Aug 23 15:09:45 2017 -0700

----------------------------------------------------------------------
 3rdparty/stout/include/stout/linkedhashmap.hpp | 29 +++++++++++
 3rdparty/stout/tests/linkedhashmap_tests.cpp   | 57 +++++++++++++++++++++
 2 files changed, 86 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a663be90/3rdparty/stout/include/stout/linkedhashmap.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/stout/include/stout/linkedhashmap.hpp b/3rdparty/stout/include/stout/linkedhashmap.hpp
index 35684e4..33cc86a 100644
--- a/3rdparty/stout/include/stout/linkedhashmap.hpp
+++ b/3rdparty/stout/include/stout/linkedhashmap.hpp
@@ -33,6 +33,35 @@ public:
   typedef std::list<entry> list;
   typedef hashmap<Key, typename list::iterator> map;
 
+  LinkedHashMap() = default;
+
+  LinkedHashMap(const LinkedHashMap<Key, Value>& other)
+    : entries_(other.entries_)
+  {
+    // Build up the index.
+    for (auto it = entries_.begin(); it != entries_.end(); ++it) {
+      keys_[it->first] = it;
+    }
+  }
+
+  LinkedHashMap& operator=(const LinkedHashMap<Key, Value>& other)
+  {
+    clear();
+
+    entries_ = other.entries_;
+
+    // Build up the index.
+    for (auto it = entries_.begin(); it != entries_.end(); ++it) {
+      keys_[it->first] = it;
+    }
+
+    return *this;
+  }
+
+  // TODO(bmahler): Implement move construction / assignment.
+  LinkedHashMap(LinkedHashMap<Key, Value>&&) = delete;
+  LinkedHashMap& operator=(LinkedHashMap&&) = delete;
+
   Value& operator[] (const Key& key)
   {
     if (!keys_.contains(key)) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/a663be90/3rdparty/stout/tests/linkedhashmap_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/stout/tests/linkedhashmap_tests.cpp b/3rdparty/stout/tests/linkedhashmap_tests.cpp
index 267022e..712752a 100644
--- a/3rdparty/stout/tests/linkedhashmap_tests.cpp
+++ b/3rdparty/stout/tests/linkedhashmap_tests.cpp
@@ -182,3 +182,60 @@ TEST(LinkedHashMapTest, ForeachMutate)
   list<string> values = {"foo", "qux", "caz"};
   EXPECT_EQ(values, map.values());
 }
+
+
+// TODO(bmahler): Simplify this test once LinkedHashMap
+// has equality operators.
+TEST(LinkedHashMapTest, CopyConstruction)
+{
+  LinkedHashMap<int, string> map;
+
+  map[1] = "1";
+  map[2] = "2";
+  map[3] = "3";
+
+  LinkedHashMap<int, string> copy(map);
+
+  EXPECT_EQ(map.keys(), copy.keys());
+  EXPECT_EQ(map.values(), copy.values());
+
+  EXPECT_EQ(1u, map.erase(1));
+  EXPECT_EQ(1u, copy.erase(1));
+
+  EXPECT_EQ(map.keys(), copy.keys());
+  EXPECT_EQ(map.values(), copy.values());
+
+  copy[4] = "4";
+
+  EXPECT_NE(map.keys(), copy.keys());
+  EXPECT_NE(map.values(), copy.values());
+}
+
+
+// TODO(bmahler): Simplify this test once LinkedHashMap
+// has equality operators.
+TEST(LinkedHashMapTest, Assignment)
+{
+  LinkedHashMap<int, string> map;
+
+  map[1] = "1";
+  map[2] = "2";
+  map[3] = "3";
+
+  LinkedHashMap<int, string> copy;
+  copy = map;
+
+  EXPECT_EQ(map.keys(), copy.keys());
+  EXPECT_EQ(map.values(), copy.values());
+
+  EXPECT_EQ(1u, map.erase(1));
+  EXPECT_EQ(1u, copy.erase(1));
+
+  EXPECT_EQ(map.keys(), copy.keys());
+  EXPECT_EQ(map.values(), copy.values());
+
+  copy[4] = "4";
+
+  EXPECT_NE(map.keys(), copy.keys());
+  EXPECT_NE(map.values(), copy.values());
+}