Posted to commits@mesos.apache.org by bm...@apache.org on 2017/08/23 22:23:46 UTC

[03/13] mesos git commit: Fixed a bug where the agent kills and still launches a task.

Fixed a bug where the agent kills and still launches a task.

The following race leads to the agent both killing and launching a task:

  (1) Slave::__run completes; the task is now within
      Executor::queuedTasks.
  (2) Slave::killTask locates the executor based on the task ID
      residing in queuedTasks, and calls Slave::statusUpdate() with
      TASK_KILLED.
  (3) Slave::___run assumes that killed tasks have been removed from
      Executor::queuedTasks, but this now occurs asynchronously in
      Slave::_statusUpdate. So, the agent still sees the queued task,
      delivers it to the executor, and adds it to
      Executor::launchedTasks.
  (4) Slave::_statusUpdate runs, removes the task from
      Executor::launchedTasks, and adds it to
      Executor::terminatedTasks.
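
To make the interleaving concrete, here is a minimal, self-contained
C++ sketch of the race. The maps, counter, and free functions below
are hypothetical stand-ins for the agent's data structures and
asynchronous dispatch, not the actual Mesos code:

  #include <cassert>
  #include <map>
  #include <string>

  std::map<std::string, std::string> queuedTasks;   // task ID -> task
  std::map<std::string, std::string> launchedTasks; // task ID -> task
  int tasksDeliveredToExecutor = 0;

  // (2) killTask sends TASK_KILLED, but the removal from queuedTasks
  //     is deferred to the asynchronously dispatched _statusUpdate.
  void killTask(const std::string& /* taskId */) { /* removal deferred */ }

  // (3) ___run assumes killed tasks are gone from queuedTasks, so it
  //     delivers the still-present task and records it as launched.
  void ___run()
  {
    for (const auto& [taskId, task] : queuedTasks) {
      launchedTasks.emplace(taskId, task);
      ++tasksDeliveredToExecutor;
    }
    queuedTasks.clear();
  }

  // (4) _statusUpdate finally runs, but the task has already moved
  //     into launchedTasks, where the agent then terminates it.
  void _statusUpdate(const std::string& taskId)
  {
    queuedTasks.erase(taskId);   // No-op: ___run already emptied it.
    launchedTasks.erase(taskId); // Moved to terminatedTasks in Mesos.
  }

  int main()
  {
    queuedTasks.emplace("task-1", "sleep 100");
    killTask("task-1");      // (2) TASK_KILLED is sent...
    ___run();                // (3) ...yet the task is delivered.
    _statusUpdate("task-1"); // (4)
    assert(tasksDeliveredToExecutor == 1); // Killed and launched.
  }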

The fix applied here is to synchronously transition queued tasks to
a terminal state when statusUpdate is called. This is safe because,
for queued tasks, we do not need to retrieve the container status
(the task never reached the container).
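
The same sketch, with a hypothetical statusUpdate that mirrors the
change below, shows why the synchronous transition closes the race:
once the kill path erases a still-queued task before returning,
___run finds nothing to deliver.

  #include <cassert>
  #include <map>
  #include <string>

  std::map<std::string, std::string> queuedTasks;
  std::map<std::string, std::string> launchedTasks;

  // statusUpdate now transitions a still-queued task synchronously;
  // only tasks that reached the executor take the asynchronous
  // container-status path.
  void statusUpdate(const std::string& taskId)
  {
    if (queuedTasks.count(taskId) > 0) {
      // The task never reached a container, so there is no container
      // status to retrieve; remove the task before returning.
      queuedTasks.erase(taskId);
    } else {
      // Launched task: the container status would be fetched
      // asynchronously before continuing (elided in this sketch).
    }
  }

  void ___run()
  {
    for (const auto& [taskId, task] : queuedTasks) {
      launchedTasks.emplace(taskId, task); // Deliver to the executor.
    }
    queuedTasks.clear();
  }

  int main()
  {
    queuedTasks.emplace("task-1", "sleep 100");
    statusUpdate("task-1"); // Kill path: task leaves queuedTasks here.
    ___run();               // Nothing left to deliver.
    assert(launchedTasks.count("task-1") == 0);
  }

Keeping the transition synchronous restores the invariant that
Slave::___run relies on: a killed task is never still observable in
Executor::queuedTasks afterwards.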

Review: https://reviews.apache.org/r/61639


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0b9c3ded
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0b9c3ded
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0b9c3ded

Branch: refs/heads/master
Commit: 0b9c3dedb04e9bf2c3d1f1663cf9cd4f47cb674b
Parents: f961c9d
Author: Benjamin Mahler <bm...@apache.org>
Authored: Thu Aug 10 18:34:15 2017 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Wed Aug 23 15:09:07 2017 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp | 82 ++++++++++++++++++++++++++++--------------------
 src/slave/slave.hpp |  2 +-
 2 files changed, 49 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/0b9c3ded/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 50d2a10..1521d5d 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -4516,26 +4516,36 @@ void Slave::statusUpdate(StatusUpdate update, const Option<UPID>& pid)
 
   metrics.valid_status_updates++;
 
-  // Before sending update, we need to retrieve the container status.
-  //
-  // NOTE: If the executor sets the ContainerID inside the
-  // ContainerStatus, that indicates that the Task this status update
-  // is associated with is tied to that container (could be nested).
-  // Therefore, we need to get the status of that container, instead
-  // of the top level executor container.
-  ContainerID containerId = executor->containerId;
-  if (update.status().has_container_status() &&
-      update.status().container_status().has_container_id()) {
-    containerId = update.status().container_status().container_id();
-  }
-
-  containerizer->status(containerId)
-    .onAny(defer(self(),
-                 &Slave::_statusUpdate,
-                 update,
-                 pid,
-                 executor->id,
-                 lambda::_1));
+  // Before sending the update, we need to retrieve the container
+  // status if the task reached the executor. For tasks that are
+  // still queued, we do not need to send the container status, and
+  // we must synchronously transition the task to ensure that it is
+  // removed from the queued tasks before the run task path continues.
+  if (executor->queuedTasks.contains(status.task_id())) {
+    CHECK(protobuf::isTerminalState(status.state()))
+        << "Queued tasks can only be transitioned to terminal states";
+
+    _statusUpdate(update, pid, executor->id, None());
+  } else {
+    // NOTE: If the executor sets the ContainerID inside the
+    // ContainerStatus, that indicates that the Task this status update
+    // is associated with is tied to that container (could be nested).
+    // Therefore, we need to get the status of that container, instead
+    // of the top level executor container.
+    ContainerID containerId = executor->containerId;
+    if (update.status().has_container_status() &&
+        update.status().container_status().has_container_id()) {
+      containerId = update.status().container_status().container_id();
+    }
+
+    containerizer->status(containerId)
+      .onAny(defer(self(),
+                   &Slave::_statusUpdate,
+                   update,
+                   pid,
+                   executor->id,
+                   lambda::_1));
+  }
 }
 
 
@@ -4543,26 +4553,26 @@ void Slave::_statusUpdate(
     StatusUpdate update,
     const Option<process::UPID>& pid,
     const ExecutorID& executorId,
-    const Future<ContainerStatus>& future)
+    const Option<Future<ContainerStatus>>& containerStatus)
 {
-  ContainerStatus* containerStatus =
-    update.mutable_status()->mutable_container_status();
-
   // There can be cases where a container is already removed from the
   // containerizer before the `status` call is dispatched to the
   // containerizer, leading to the failure of the returned `Future`.
   // In such a case, we should simply not update the `ContainerStatus`
   // with the returned `Future`, but continue processing the
   // `StatusUpdate`.
-  if (future.isReady()) {
-    containerStatus->MergeFrom(future.get());
+  if (containerStatus.isSome() && containerStatus->isReady()) {
+    ContainerStatus* status =
+      update.mutable_status()->mutable_container_status();
+
+    status->MergeFrom(containerStatus->get());
 
     // Fill in the container IP address with the IP from the agent
     // PID, if not already filled in.
     //
     // TODO(karya): Fill in the IP address by looking up the executor PID.
-    if (containerStatus->network_infos().size() == 0) {
-      NetworkInfo* networkInfo = containerStatus->add_network_infos();
+    if (status->network_infos().size() == 0) {
+      NetworkInfo* networkInfo = status->add_network_infos();
       NetworkInfo::IPAddress* ipAddress = networkInfo->add_ip_addresses();
 
       // Set up IPv4 address.
@@ -5385,18 +5395,22 @@ void Slave::executorTerminated(
       // the status update manager should have already cleaned up all the
       // status update streams for a framework that is terminating.
       if (framework->state != Framework::TERMINATING) {
-        // Transition all live launched tasks.
-        foreachvalue (Task* task, executor->launchedTasks) {
+        // Transition all live launched tasks. Note that terminal
+        // status updates remove tasks from the map within the loop.
+        foreach (const TaskID& taskId, executor->launchedTasks.keys()) {
+          Task* task = executor->launchedTasks.at(taskId);
+
           if (!protobuf::isTerminalState(task->state())) {
             sendExecutorTerminatedStatusUpdate(
-                task->task_id(), termination, frameworkId, executor);
+                taskId, termination, frameworkId, executor);
           }
         }
 
-        // Transition all queued tasks.
-        foreachvalue (const TaskInfo& task, executor->queuedTasks) {
+        // Transition all queued tasks. Note that terminal status
+        // updates remove tasks from the map within the loop.
+        foreach (const TaskID& taskId, executor->queuedTasks.keys()) {
           sendExecutorTerminatedStatusUpdate(
-              task.task_id(), termination, frameworkId, executor);
+              taskId, termination, frameworkId, executor);
         }
       }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/0b9c3ded/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 0e07a1a..ca9f3da 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -252,7 +252,7 @@ public:
       StatusUpdate update,
       const Option<process::UPID>& pid,
       const ExecutorID& executorId,
-      const process::Future<ContainerStatus>& future);
+      const Option<process::Future<ContainerStatus>>& containerStatus);
 
   // Continue handling the status update after optionally updating the
   // container's resources.