You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2018/02/14 10:49:48 UTC

[4/8] mesos git commit: Fixed a bug where executor info lingers on the master if the executor fails to launch.

Fixed a bug where executor info lingers on the master if the executor fails to launch.

The master relies on the `ExitedExecutorMessage` from the agent to
remove executor entries. However, this message is not sent if the
executor was never actually launched (due to a transient error),
leaving the executor info on the master and the executor's resources
claimed.
See MESOS-1720.

This patch fixes this issue by sending the `ExitedExecutorMessage`
from the agent if the executor is never launched.

Review: https://reviews.apache.org/r/65449/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b5350fec
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b5350fec
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b5350fec

Branch: refs/heads/master
Commit: b5350fecc8604bdddb45303d9363aff4ca60cfcc
Parents: 0321b85
Author: Meng Zhu <mz...@mesosphere.io>
Authored: Tue Feb 13 22:45:07 2018 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Wed Feb 14 02:31:28 2018 -0800

----------------------------------------------------------------------
 src/slave/slave.cpp       | 276 +++++++++++++++++++++++++++++++++++++----
 src/slave/slave.hpp       |  10 +-
 src/tests/mock_slave.cpp  |   8 +-
 src/tests/mock_slave.hpp  |   8 +-
 src/tests/slave_tests.cpp |  35 ++++--
 5 files changed, 292 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b5350fec/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index f96c97e..c5ec62c 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1937,7 +1937,13 @@ void Slave::runTask(
 
   const ExecutorInfo executorInfo = getExecutorInfo(frameworkInfo, task);
 
-  run(frameworkInfo, executorInfo, task, None(), resourceVersionUuids, pid);
+  run(frameworkInfo,
+      executorInfo,
+      task,
+      None(),
+      resourceVersionUuids,
+      pid,
+      launchExecutor);
 }
 
 
@@ -1947,7 +1953,8 @@ void Slave::run(
     Option<TaskInfo> task,
     Option<TaskGroupInfo> taskGroup,
     const vector<ResourceVersionUUID>& resourceVersionUuids,
-    const UPID& pid)
+    const UPID& pid,
+    const Option<bool>& launchExecutor)
 {
   CHECK_NE(task.isSome(), taskGroup.isSome())
     << "Either task or task group should be set but not both";
@@ -2032,6 +2039,10 @@ void Slave::run(
     LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
                  << " because the agent is " << state;
 
+    // We do not send `ExitedExecutorMessage` here because the disconnected
+    // agent is expected to (eventually) reregister and reconcile the executor
+    // states with the master.
+
     // TODO(vinod): Consider sending a TASK_LOST here.
     // Currently it is tricky because 'statusUpdate()'
     // ignores updates for unknown frameworks.
@@ -2152,7 +2163,8 @@ void Slave::run(
                  executorInfo,
                  task,
                  taskGroup,
-                 resourceVersionUuids));
+                 resourceVersionUuids,
+                 launchExecutor));
 }
 
 
@@ -2162,7 +2174,8 @@ void Slave::_run(
     const ExecutorInfo& executorInfo,
     const Option<TaskInfo>& task,
     const Option<TaskGroupInfo>& taskGroup,
-    const std::vector<ResourceVersionUUID>& resourceVersionUuids)
+    const std::vector<ResourceVersionUUID>& resourceVersionUuids,
+    const Option<bool>& launchExecutor)
 {
   // TODO(anindya_sinha): Consider refactoring the initial steps common
   // to `_run()` and `__run()`.
@@ -2184,6 +2197,14 @@ void Slave::_run(
     LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
                  << " because the framework " << frameworkId
                  << " does not exist";
+
+    if (launchExecutor.isSome() && launchExecutor.get()) {
+      // Master expects new executor to be launched for this task(s) launch.
+      // To keep the master executor entries updated, the agent needs to send
+      // 'ExitedExecutorMessage' even though no executor launched.
+      sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+    }
+
     return;
   }
 
@@ -2204,6 +2225,13 @@ void Slave::_run(
       removeFramework(framework);
     }
 
+    if (launchExecutor.isSome() && launchExecutor.get()) {
+      // Master expects new executor to be launched for this task(s) launch.
+      // To keep the master executor entries updated, the agent needs to send
+      // 'ExitedExecutorMessage' even though no executor launched.
+      sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+    }
+
     return;
   }
 
@@ -2228,6 +2256,14 @@ void Slave::_run(
     LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
                  << " of framework " << frameworkId
                  << " because it has been killed in the meantime";
+
+    if (launchExecutor.isSome() && launchExecutor.get()) {
+      // Master expects new executor to be launched for this task(s) launch.
+      // To keep the master executor entries updated, the agent needs to send
+      // 'ExitedExecutorMessage' even though no executor launched.
+      sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+    }
+
     return;
   }
 
@@ -2272,6 +2308,13 @@ void Slave::_run(
       removeFramework(framework);
     }
 
+    if (launchExecutor.isSome() && launchExecutor.get()) {
+      // Master expects new executor to be launched for this task(s) launch.
+      // To keep the master executor entries updated, the agent needs to send
+      // 'ExitedExecutorMessage' even though no executor launched.
+      sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+    }
+
     return;
   }
 
@@ -2295,7 +2338,8 @@ void Slave::_run(
                  executorInfo,
                  task,
                  taskGroup,
-                 resourceVersionUuids));
+                 resourceVersionUuids,
+                 launchExecutor));
 }
 
 
@@ -2305,7 +2349,8 @@ void Slave::__run(
     const ExecutorInfo& executorInfo,
     const Option<TaskInfo>& task,
     const Option<TaskGroupInfo>& taskGroup,
-    const vector<ResourceVersionUUID>& resourceVersionUuids)
+    const vector<ResourceVersionUUID>& resourceVersionUuids,
+    const Option<bool>& launchExecutor)
 {
   CHECK_NE(task.isSome(), taskGroup.isSome())
     << "Either task or task group should be set but not both";
@@ -2325,6 +2370,14 @@ void Slave::__run(
     LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
                  << " because the framework " << frameworkId
                  << " does not exist";
+
+    if (launchExecutor.isSome() && launchExecutor.get()) {
+      // Master expects new executor to be launched for this task(s) launch.
+      // To keep the master executor entries updated, the agent needs to send
+      // 'ExitedExecutorMessage' even though no executor launched.
+      sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+    }
+
     return;
   }
 
@@ -2347,6 +2400,13 @@ void Slave::__run(
       removeFramework(framework);
     }
 
+    if (launchExecutor.isSome() && launchExecutor.get()) {
+      // Master expects new executor to be launched for this task(s) launch.
+      // To keep the master executor entries updated, the agent needs to send
+      // 'ExitedExecutorMessage' even though no executor launched.
+      sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+    }
+
     return;
   }
 
@@ -2371,6 +2431,14 @@ void Slave::__run(
     LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
                  << " of framework " << frameworkId
                  << " because it has been killed in the meantime";
+
+    if (launchExecutor.isSome() && launchExecutor.get()) {
+      // Master expects new executor to be launched for this task(s) launch.
+      // To keep the master executor entries updated, the agent needs to send
+      // 'ExitedExecutorMessage' even though no executor launched.
+      sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+    }
+
     return;
   }
 
@@ -2447,6 +2515,13 @@ void Slave::__run(
       removeFramework(framework);
     }
 
+    if (launchExecutor.isSome() && launchExecutor.get()) {
+      // Master expects new executor to be launched for this task(s) launch.
+      // To keep the master executor entries updated, the agent needs to send
+      // 'ExitedExecutorMessage' even though no executor launched.
+      sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+    }
+
     return;
   }
 
@@ -2527,6 +2602,13 @@ void Slave::__run(
       removeFramework(framework);
     }
 
+    if (launchExecutor.isSome() && launchExecutor.get()) {
+      // Master expects new executor to be launched for this task(s) launch.
+      // To keep the master executor entries updated, the agent needs to send
+      // 'ExitedExecutorMessage' even though no executor launched.
+      sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+    }
+
     return;
   }
 
@@ -2594,6 +2676,13 @@ void Slave::__run(
       removeFramework(framework);
     }
 
+    if (launchExecutor.isSome() && launchExecutor.get()) {
+      // Master expects new executor to be launched for this task(s) launch.
+      // To keep the master executor entries updated, the agent needs to send
+      // 'ExitedExecutorMessage' even though no executor launched.
+      sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+    }
+
     return;
   }
 
@@ -2647,6 +2736,13 @@ void Slave::__run(
       removeFramework(framework);
     }
 
+    if (launchExecutor.isSome() && launchExecutor.get()) {
+      // Master expects new executor to be launched for this task(s) launch.
+      // To keep the master executor entries updated, the agent needs to send
+      // 'ExitedExecutorMessage' even though no executor launched.
+      sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+    }
+
     return;
   }
 
@@ -2666,8 +2762,8 @@ void Slave::__run(
       removeFramework(framework);
     }
 
-    // We don't send a TASK_LOST here because the slave is
-    // terminating.
+    // We don't send TASK_LOST or ExitedExecutorMessage here because the slave
+    // is terminating.
     return;
   }
 
@@ -2676,29 +2772,159 @@ void Slave::__run(
   LOG(INFO) << "Launching " << taskOrTaskGroup(task, taskGroup)
             << " for framework " << frameworkId;
 
-  // Either send the task/task group to an executor or start a new executor
-  // and queue it until the executor has started.
-  Executor* executor = framework->getExecutor(executorId);
-
-  if (executor == nullptr) {
-    executor = framework->addExecutor(executorInfo);
+  auto doLaunchExecutor = [&]() {
+    Executor* executor = framework->addExecutor(executorInfo);
 
     if (secretGenerator) {
       generateSecret(framework->id(), executor->id, executor->containerId)
         .onAny(defer(
-            self(),
-            &Self::launchExecutor,
-            lambda::_1,
-            frameworkId,
-            executorId,
-            taskGroup.isNone() ? task.get() : Option<TaskInfo>::none()));
+              self(),
+              &Self::launchExecutor,
+              lambda::_1,
+              frameworkId,
+              executorId,
+              taskGroup.isNone() ? task.get() : Option<TaskInfo>::none()));
     } else {
-      launchExecutor(
+      Slave::launchExecutor(
           None(),
           frameworkId,
           executorId,
           taskGroup.isNone() ? task.get() : Option<TaskInfo>::none());
     }
+
+    return executor;
+  };
+
+  Executor* executor = framework->getExecutor(executorId);
+
+  if (launchExecutor.isNone()) {
+    // This is the legacy case where the master did not set the
+    // `launch_executor` flag. Executor will be launched if there is none.
+    if (executor == nullptr) {
+      executor = doLaunchExecutor();
+    }
+  } else {
+    if (taskGroup.isNone() && task.get().has_command()) {
+      // We are dealing with command task; a new command executor will be
+      // launched.
+      CHECK(executor == nullptr);
+      executor = doLaunchExecutor();
+    } else {
+      // Master set the `launch_executor` flag and this is not a command task.
+      if (launchExecutor.get()) {
+        // Master requests launching a new executor.
+        if (executor == nullptr) {
+          executor = doLaunchExecutor();
+        } else {
+          // Master requests launching executor but an executor still exists
+          // on the agent. In this case we will drop tasks. This could happen if
+          // the executor is already terminated on the agent (and agent has sent
+          // out the `ExitedExecutorMessage` and it was received by the master)
+          // but the agent is still waiting for all the status updates to be
+          // acked before removing the executor struct.
+
+          // We report TASK_DROPPED to the framework because the task was
+          // never launched. For non-partition-aware frameworks, we report
+          // TASK_LOST for backward compatibility.
+          mesos::TaskState taskState = TASK_DROPPED;
+          if (!protobuf::frameworkHasCapability(
+              frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) {
+            taskState = TASK_LOST;
+          }
+
+          foreach (const TaskInfo& _task, tasks) {
+            const StatusUpdate update = protobuf::createStatusUpdate(
+                frameworkId,
+                info.id(),
+                _task.task_id(),
+                taskState,
+                TaskStatus::SOURCE_SLAVE,
+                id::UUID::random(),
+                "Master wants to launch executor, but there already exits one",
+                TaskStatus::REASON_EXECUTOR_TERMINATED,
+                executorId);
+
+            statusUpdate(update, UPID());
+          }
+
+          // Master expects new executor to be launched for this task(s) launch.
+          // To keep the master executor entries updated, the agent needs to
+          // send 'ExitedExecutorMessage' even though no executor launched.
+          if (executor->state == Executor::TERMINATED) {
+            sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+          } else {
+            // This could happen if the following sequence of events happens:
+            //
+            //  (1) Master sends `runTaskMessage` to agent with
+            //      `launch_executor = true`;
+            //
+            //  (2) Before the agent got the `runTaskMessage`, it reconnects and
+            //      reconciles with the master. Master then removes the executor
+            //      entry it asked the agent to launch in step (1);
+            //
+            //  (3) Agent got the `runTaskMessage` sent in step (1), launches
+            //      the task and the executor (that the master does not know
+            //      about).
+            //
+            //  (4) Master now sends another `runTaskMessage` for the same
+            //      executor id with `launch_executor = true`.
+            //
+            // The agent ends up with a lingering executor that the master does
+            // not know about. We will shutdown the executor.
+            //
+            // TODO(mzhu): This could be avoided if the agent can
+            // tell whether the master's message was sent before or after the
+            // reconnection and discard the message in the former case.
+            //
+            // TODO(mzhu): Master needs to do proper executor reconciliation
+            // with the agent to prevent this from happening.
+            _shutdownExecutor(framework, executor);
+          }
+
+          return;
+        }
+      } else {
+        // Master does not want to launch executor.
+        if (executor == nullptr) {
+          // Master wants no new executor launched and there is none running on
+          // the agent. This could happen if the task expects some previous
+          // tasks to launch the executor. However, the earlier task got killed
+          // or dropped hence did not launch the executor but the master doesn't
+          // know about it yet because the `ExitedExecutorMessage` is still in
+          // flight. In this case, we will drop the task.
+          //
+          // We report TASK_DROPPED to the framework because the task was
+          // never launched. For non-partition-aware frameworks, we report
+          // TASK_LOST for backward compatibility.
+          mesos::TaskState taskState = TASK_DROPPED;
+          if (!protobuf::frameworkHasCapability(
+              frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) {
+            taskState = TASK_LOST;
+          }
+
+          foreach (const TaskInfo& _task, tasks) {
+            const StatusUpdate update = protobuf::createStatusUpdate(
+                frameworkId,
+                info.id(),
+                _task.task_id(),
+                taskState,
+                TaskStatus::SOURCE_SLAVE,
+                id::UUID::random(),
+                "No executor is expected to launch and there is none running",
+                TaskStatus::REASON_EXECUTOR_TERMINATED,
+                executorId);
+
+            statusUpdate(update, UPID());
+          }
+
+          // We do not send `ExitedExecutorMessage` here because the expectation
+          // is that there is already one in flight to master. If the message
+          // gets dropped, we will hopefully reconcile with the master later.
+
+          return;
+        }
+      }
+    }
   }
 
   CHECK_NOTNULL(executor);
@@ -3311,20 +3537,22 @@ void Slave::runTaskGroup(
     return;
   }
 
+  // TODO(mzhu): Consider doing a `CHECK` here since this shouldn't be possible.
   if (taskGroupInfo.tasks().empty()) {
     LOG(ERROR) << "Ignoring run task group message from " << from
                << " for framework " << frameworkInfo.id()
                << " because it has no tasks";
+
     return;
   }
 
-  run(
-      frameworkInfo,
+  run(frameworkInfo,
       executorInfo,
       None(),
       taskGroupInfo,
       resourceVersionUuids,
-      UPID());
+      UPID(),
+      launchExecutor);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/b5350fec/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 7840ce8..9cd3597 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -166,7 +166,8 @@ public:
       Option<TaskInfo> task,
       Option<TaskGroupInfo> taskGroup,
       const std::vector<ResourceVersionUUID>& resourceVersionUuids,
-      const process::UPID& pid);
+      const process::UPID& pid,
+      const Option<bool>& launchExecutor);
 
   // Made 'virtual' for Slave mocking.
   virtual void _run(
@@ -175,7 +176,8 @@ public:
       const ExecutorInfo& executorInfo,
       const Option<TaskInfo>& task,
       const Option<TaskGroupInfo>& taskGroup,
-      const std::vector<ResourceVersionUUID>& resourceVersionUuids);
+      const std::vector<ResourceVersionUUID>& resourceVersionUuids,
+      const Option<bool>& launchExecutor);
 
   // This is called when the resource limits of the container have
   // been updated for the given tasks and task groups. If the update is
@@ -414,7 +416,9 @@ public:
       const ExecutorInfo& executorInfo,
       const Option<TaskInfo>& task,
       const Option<TaskGroupInfo>& taskGroup,
-      const std::vector<ResourceVersionUUID>& resourceVersionUuids);
+      const std::vector<ResourceVersionUUID>& resourceVersionUuids,
+      const Option<bool>& launchExecutor);
+
 
   process::Future<Secret> generateSecret(
       const FrameworkID& frameworkId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/b5350fec/src/tests/mock_slave.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mock_slave.cpp b/src/tests/mock_slave.cpp
index cf383bb..9b06b9c 100644
--- a/src/tests/mock_slave.cpp
+++ b/src/tests/mock_slave.cpp
@@ -128,7 +128,7 @@ MockSlave::MockSlave(
     .WillRepeatedly(Invoke(this, &MockSlave::unmocked____run));
   EXPECT_CALL(*this, runTask(_, _, _, _, _, _, _))
     .WillRepeatedly(Invoke(this, &MockSlave::unmocked_runTask));
-  EXPECT_CALL(*this, _run(_, _, _, _, _, _))
+  EXPECT_CALL(*this, _run(_, _, _, _, _, _, _))
     .WillRepeatedly(Invoke(this, &MockSlave::unmocked__run));
   EXPECT_CALL(*this, runTaskGroup(_, _, _, _, _, _))
     .WillRepeatedly(Invoke(this, &MockSlave::unmocked_runTaskGroup));
@@ -195,7 +195,8 @@ void MockSlave::unmocked__run(
     const ExecutorInfo& executorInfo,
     const Option<TaskInfo>& taskInfo,
     const Option<TaskGroupInfo>& taskGroup,
-    const std::vector<ResourceVersionUUID>& resourceVersionUuids)
+    const std::vector<ResourceVersionUUID>& resourceVersionUuids,
+    const Option<bool>& launchExecutor)
 {
   slave::Slave::_run(
       unschedules,
@@ -203,7 +204,8 @@ void MockSlave::unmocked__run(
       executorInfo,
       taskInfo,
       taskGroup,
-      resourceVersionUuids);
+      resourceVersionUuids,
+      launchExecutor);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/b5350fec/src/tests/mock_slave.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mock_slave.hpp b/src/tests/mock_slave.hpp
index 39a8651..9ed3e55 100644
--- a/src/tests/mock_slave.hpp
+++ b/src/tests/mock_slave.hpp
@@ -135,13 +135,14 @@ public:
       const std::vector<ResourceVersionUUID>& resourceVersionUuids,
       const Option<bool>& launchExecutor);
 
-  MOCK_METHOD6(_run, void(
+  MOCK_METHOD7(_run, void(
       const process::Future<std::list<bool>>& unschedules,
       const FrameworkInfo& frameworkInfo,
       const ExecutorInfo& executorInfo,
       const Option<TaskInfo>& task,
       const Option<TaskGroupInfo>& taskGroup,
-      const std::vector<ResourceVersionUUID>& resourceVersionUuids));
+      const std::vector<ResourceVersionUUID>& resourceVersionUuids,
+      const Option<bool>& launchExecutor));
 
   void unmocked__run(
       const process::Future<std::list<bool>>& unschedules,
@@ -149,7 +150,8 @@ public:
       const ExecutorInfo& executorInfo,
       const Option<TaskInfo>& task,
       const Option<TaskGroupInfo>& taskGroup,
-      const std::vector<ResourceVersionUUID>& resourceVersionUuids);
+      const std::vector<ResourceVersionUUID>& resourceVersionUuids,
+      const Option<bool>& launchExecutor);
 
   MOCK_METHOD6(runTaskGroup, void(
       const process::UPID& from,

http://git-wip-us.apache.org/repos/asf/mesos/blob/b5350fec/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 3e82991..55cb579 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -1835,7 +1835,7 @@ TEST_F(SlaveTest, GetStateTaskGroupPending)
   // unmocked `_run()` method. Instead, we want to do nothing so that tasks
   // remain in the framework's 'pending' list.
   Future<Nothing> _run;
-  EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _))
+  EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _, _))
     .WillOnce(FutureSatisfy(&_run));
 
   // The executor should not be launched.
@@ -4124,18 +4124,20 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
   Option<TaskGroupInfo> taskGroup;
   Option<TaskInfo> task_;
   vector<ResourceVersionUUID> resourceVersionUuids;
+  Option<bool> launchExecutor;
   // Skip what Slave::_run() normally does, save its arguments for
   // later, tie reaching the critical moment when to kill the task to
   // a future.
   Future<Nothing> _run;
-  EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _))
+  EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _, _))
     .WillOnce(DoAll(FutureSatisfy(&_run),
                     SaveArg<0>(&unschedules),
                     SaveArg<1>(&frameworkInfo),
                     SaveArg<2>(&executorInfo),
                     SaveArg<3>(&task_),
                     SaveArg<4>(&taskGroup),
-                    SaveArg<5>(&resourceVersionUuids)));
+                    SaveArg<5>(&resourceVersionUuids),
+                    SaveArg<6>(&launchExecutor)));
 
   driver.launchTasks(offers.get()[0].id(), {task});
 
@@ -4167,7 +4169,8 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
       executorInfo,
       task_,
       taskGroup,
-      resourceVersionUuids);
+      resourceVersionUuids,
+      launchExecutor);
 
   AWAIT_READY(status);
   EXPECT_EQ(TASK_KILLED, status->state());
@@ -4252,23 +4255,26 @@ TEST_F(SlaveTest, KillMultiplePendingTasks)
   Option<TaskGroupInfo> taskGroup1, taskGroup2;
   Option<TaskInfo> task_1, task_2;
   vector<ResourceVersionUUID> resourceVersionUuids1, resourceVersionUuids2;
+  Option<bool> launchExecutor1, launchExecutor2;
 
   Future<Nothing> _run1, _run2;
-  EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _))
+  EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _, _))
     .WillOnce(DoAll(FutureSatisfy(&_run1),
                     SaveArg<0>(&unschedules1),
                     SaveArg<1>(&frameworkInfo1),
                     SaveArg<2>(&executorInfo1),
                     SaveArg<3>(&task_1),
                     SaveArg<4>(&taskGroup1),
-                    SaveArg<5>(&resourceVersionUuids1)))
+                    SaveArg<5>(&resourceVersionUuids1),
+                    SaveArg<6>(&launchExecutor1)))
     .WillOnce(DoAll(FutureSatisfy(&_run2),
                     SaveArg<0>(&unschedules2),
                     SaveArg<1>(&frameworkInfo2),
                     SaveArg<2>(&executorInfo2),
                     SaveArg<3>(&task_2),
                     SaveArg<4>(&taskGroup2),
-                    SaveArg<5>(&resourceVersionUuids2)));
+                    SaveArg<5>(&resourceVersionUuids2),
+                    SaveArg<6>(&launchExecutor2)));
 
   driver.launchTasks(offers.get()[0].id(), {task1, task2});
 
@@ -4310,7 +4316,8 @@ TEST_F(SlaveTest, KillMultiplePendingTasks)
       executorInfo1,
       task_1,
       taskGroup1,
-      resourceVersionUuids1);
+      resourceVersionUuids1,
+      launchExecutor1);
 
   slave.get()->mock()->unmocked__run(
       unschedules2,
@@ -4318,7 +4325,8 @@ TEST_F(SlaveTest, KillMultiplePendingTasks)
       executorInfo2,
       task_2,
       taskGroup2,
-      resourceVersionUuids2);
+      resourceVersionUuids2,
+      launchExecutor2);
 
   Clock::settle();
 
@@ -7679,19 +7687,21 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts)
   Option<TaskGroupInfo> taskGroup_;
   Option<TaskInfo> task_;
   vector<ResourceVersionUUID> resourceVersionUuids;
+  Option<bool> launchExecutor;
 
   // Skip what `Slave::_run()` normally does, save its arguments for
   // later, till reaching the critical moment when to kill the task
   // in the future.
   Future<Nothing> _run;
-  EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _))
+  EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _, _))
     .WillOnce(DoAll(FutureSatisfy(&_run),
                     SaveArg<0>(&unschedules),
                     SaveArg<1>(&frameworkInfo),
                     SaveArg<2>(&executorInfo_),
                     SaveArg<3>(&task_),
                     SaveArg<4>(&taskGroup_),
-                    SaveArg<5>(&resourceVersionUuids)));
+                    SaveArg<5>(&resourceVersionUuids),
+                    SaveArg<6>(&launchExecutor)));
 
   const v1::Offer& offer = offers->offers(0);
   const SlaveID slaveId = devolve(offer.agent_id());
@@ -7757,7 +7767,8 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts)
       executorInfo_,
       task_,
       taskGroup_,
-      resourceVersionUuids);
+      resourceVersionUuids,
+      launchExecutor);
 
   AWAIT_READY(update1);
   AWAIT_READY(update2);