You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2018/04/07 07:28:10 UTC

[1/4] mesos git commit: Refactored agent task launch for better composition [1/2].

Repository: mesos
Updated Branches:
  refs/heads/1.5.x d2289d169 -> 3f8b19a92


Refactored agent task launch for better composition [1/2].

This helps to encapsulate a task launch into a single
future which will come in handy when enforcing the task
launch order.

This patch also consolidated the error handling code
in the task launch path.

Affected tests are also updated.

Review: https://reviews.apache.org/r/66126/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/10c3a316
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/10c3a316
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/10c3a316

Branch: refs/heads/1.5.x
Commit: 10c3a31646035fabb2e3b4ad8f2708d777cf7419
Parents: d2289d1
Author: Greg Mann <gr...@gmail.com>
Authored: Thu Apr 5 11:17:35 2018 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Thu Apr 5 11:50:24 2018 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp       | 190 +++++++++++++++++++++--------------------
 src/slave/slave.hpp       |   9 +-
 src/tests/mock_slave.cpp  |   8 +-
 src/tests/mock_slave.hpp  |   6 +-
 src/tests/slave_tests.cpp | 146 ++++++++++++++++---------------
 5 files changed, 186 insertions(+), 173 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/10c3a316/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index c0501f8..927d384 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2152,22 +2152,88 @@ void Slave::run(
     }
   }
 
+  auto onUnscheduleGCFailure =
+    [=](const Future<list<bool>>& unschedules) -> Future<list<bool>> {
+      LOG(ERROR) << "Failed to unschedule directories scheduled for gc: "
+                 << unschedules.failure();
+
+      Framework* _framework = getFramework(frameworkId);
+      if (_framework == nullptr) {
+        const string error =
+          "Cannot handle unschedule GC failure for " +
+          taskOrTaskGroup(task, taskGroup) + " because the framework " +
+          stringify(frameworkId) + " does not exist";
+
+        LOG(WARNING) << error;
+
+        return Failure(error);
+      }
+
+      // We report TASK_DROPPED to the framework because the task was
+      // never launched. For non-partition-aware frameworks, we report
+      // TASK_LOST for backward compatibility.
+      mesos::TaskState taskState = TASK_DROPPED;
+      if (!protobuf::frameworkHasCapability(
+              frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) {
+        taskState = TASK_LOST;
+      }
+
+      foreach (const TaskInfo& _task, tasks) {
+        _framework->removePendingTask(_task.task_id());
+
+        const StatusUpdate update = protobuf::createStatusUpdate(
+            frameworkId,
+            info.id(),
+            _task.task_id(),
+            taskState,
+            TaskStatus::SOURCE_SLAVE,
+            id::UUID::random(),
+            "Could not launch the task because we failed to unschedule"
+            " directories scheduled for gc",
+            TaskStatus::REASON_GC_ERROR);
+
+        // TODO(vinod): Ensure that the task status update manager
+        // reliably delivers this update. Currently, we don't guarantee
+        // this because removal of the framework causes the status
+        // update manager to stop retrying for its un-acked updates.
+        statusUpdate(update, UPID());
+      }
+
+      if (_framework->idle()) {
+        removeFramework(_framework);
+      }
+
+      return unschedules;
+  };
+
   // Run the task after the unschedules are done.
   collect(unschedules)
-    .onAny(defer(self(),
-                 &Self::_run,
-                 lambda::_1,
-                 frameworkInfo,
-                 executorInfo,
-                 task,
-                 taskGroup,
-                 resourceVersionUuids,
-                 launchExecutor));
+    .repair(defer(self(), onUnscheduleGCFailure))
+    .then(defer(
+        self(),
+        &Self::_run,
+        frameworkInfo,
+        executorInfo,
+        task,
+        taskGroup,
+        resourceVersionUuids,
+        launchExecutor))
+    .recover(defer(self(),
+      [=](const Future<Nothing>& future) -> Future<Nothing> {
+        if (launchExecutor.isSome() && launchExecutor.get()) {
+          // Master expects new executor to be launched for this task launch.
+          // To keep the master executor entries updated, the agent needs to
+          // send 'ExitedExecutorMessage' even though no executor launched.
+          sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+        }
+
+        return future;
+      }
+    ));
 }
 
 
-void Slave::_run(
-    const Future<list<bool>>& unschedules,
+Future<Nothing> Slave::_run(
     const FrameworkInfo& frameworkInfo,
     const ExecutorInfo& executorInfo,
     const Option<TaskInfo>& task,
@@ -2192,26 +2258,24 @@ void Slave::_run(
   const FrameworkID& frameworkId = frameworkInfo.id();
   Framework* framework = getFramework(frameworkId);
   if (framework == nullptr) {
-    LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
-                 << " because the framework " << frameworkId
-                 << " does not exist";
+    const string error =
+      "Ignoring running " + taskOrTaskGroup(task, taskGroup) +
+      " because the framework " + stringify(frameworkId) + " does not exist";
 
-    if (launchExecutor.isSome() && launchExecutor.get()) {
-      // Master expects new executor to be launched for this task(s) launch.
-      // To keep the master executor entries updated, the agent needs to send
-      // 'ExitedExecutorMessage' even though no executor launched.
-      sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
-    }
+    LOG(WARNING) << error;
 
-    return;
+    return Failure(error);
   }
 
   // We don't send a status update here because a terminating
   // framework cannot send acknowledgements.
   if (framework->state == Framework::TERMINATING) {
-    LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
-                 << " of framework " << frameworkId
-                 << " because the framework is terminating";
+    const string error = "Ignoring running " +
+                         taskOrTaskGroup(task, taskGroup) + " of framework " +
+                         stringify(frameworkId) +
+                         " because the framework is terminating";
+
+    LOG(WARNING) << error;
 
     // Although we cannot send a status update in this case, we remove
     // the affected tasks from the pending tasks.
@@ -2223,14 +2287,7 @@ void Slave::_run(
       removeFramework(framework);
     }
 
-    if (launchExecutor.isSome() && launchExecutor.get()) {
-      // Master expects new executor to be launched for this task(s) launch.
-      // To keep the master executor entries updated, the agent needs to send
-      // 'ExitedExecutorMessage' even though no executor launched.
-      sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
-    }
-
-    return;
+    return Failure(error);
   }
 
   // Ignore the launch if killed in the interim. The invariant here
@@ -2251,69 +2308,14 @@ void Slave::_run(
     << " was killed partially";
 
   if (allRemoved) {
-    LOG(WARNING) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
-                 << " of framework " << frameworkId
-                 << " because it has been killed in the meantime";
+    const string error = "Ignoring running " +
+                         taskOrTaskGroup(task, taskGroup) + " of framework " +
+                         stringify(frameworkId) +
+                         " because it has been killed in the meantime";
 
-    if (launchExecutor.isSome() && launchExecutor.get()) {
-      // Master expects new executor to be launched for this task(s) launch.
-      // To keep the master executor entries updated, the agent needs to send
-      // 'ExitedExecutorMessage' even though no executor launched.
-      sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
-    }
+    LOG(WARNING) << error;
 
-    return;
-  }
-
-  CHECK(!unschedules.isDiscarded());
-
-  if (!unschedules.isReady()) {
-    LOG(ERROR) << "Failed to unschedule directories scheduled for gc: "
-               << (unschedules.isFailed() ?
-                   unschedules.failure() : "future discarded");
-
-    // We report TASK_DROPPED to the framework because the task was
-    // never launched. For non-partition-aware frameworks, we report
-    // TASK_LOST for backward compatibility.
-    mesos::TaskState taskState = TASK_DROPPED;
-    if (!protobuf::frameworkHasCapability(
-            frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) {
-      taskState = TASK_LOST;
-    }
-
-    foreach (const TaskInfo& _task, tasks) {
-      framework->removePendingTask(_task.task_id());
-
-      const StatusUpdate update = protobuf::createStatusUpdate(
-          frameworkId,
-          info.id(),
-          _task.task_id(),
-          taskState,
-          TaskStatus::SOURCE_SLAVE,
-          id::UUID::random(),
-          "Could not launch the task because we failed to unschedule"
-          " directories scheduled for gc",
-          TaskStatus::REASON_GC_ERROR);
-
-      // TODO(vinod): Ensure that the task status update manager
-      // reliably delivers this update. Currently, we don't guarantee
-      // this because removal of the framework causes the status
-      // update manager to stop retrying for its un-acked updates.
-      statusUpdate(update, UPID());
-    }
-
-    if (framework->idle()) {
-      removeFramework(framework);
-    }
-
-    if (launchExecutor.isSome() && launchExecutor.get()) {
-      // Master expects new executor to be launched for this task(s) launch.
-      // To keep the master executor entries updated, the agent needs to send
-      // 'ExitedExecutorMessage' even though no executor launched.
-      sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
-    }
-
-    return;
+    return Failure(error);
   }
 
   // Authorize the task or tasks (as in a task group) to ensure that the
@@ -2338,6 +2340,8 @@ void Slave::_run(
                  taskGroup,
                  resourceVersionUuids,
                  launchExecutor));
+
+  return Nothing();
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/10c3a316/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 75e8ccd..28bbcc4 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -170,8 +170,13 @@ public:
       const Option<bool>& launchExecutor);
 
   // Made 'virtual' for Slave mocking.
-  virtual void _run(
-      const process::Future<std::list<bool>>& unschedules,
+  //
+  // This function returns a future so that we can encapsulate a task(group)
+  // launch operation (from agent receiving the run message to the completion
+  // of `_run()`) into a single future. This includes all the asynchronous
+  // steps (currently two: unschedule GC and task authorization) prior to the
+  // executor launch.
+  virtual process::Future<Nothing> _run(
       const FrameworkInfo& frameworkInfo,
       const ExecutorInfo& executorInfo,
       const Option<TaskInfo>& task,

http://git-wip-us.apache.org/repos/asf/mesos/blob/10c3a316/src/tests/mock_slave.cpp
----------------------------------------------------------------------
diff --git a/src/tests/mock_slave.cpp b/src/tests/mock_slave.cpp
index f73a45f..5e72e82 100644
--- a/src/tests/mock_slave.cpp
+++ b/src/tests/mock_slave.cpp
@@ -120,7 +120,7 @@ MockSlave::MockSlave(
   // Set up default behaviors, calling the original methods.
   EXPECT_CALL(*this, runTask(_, _, _, _, _, _, _))
     .WillRepeatedly(Invoke(this, &MockSlave::unmocked_runTask));
-  EXPECT_CALL(*this, _run(_, _, _, _, _, _, _))
+  EXPECT_CALL(*this, _run(_, _, _, _, _, _))
     .WillRepeatedly(Invoke(this, &MockSlave::unmocked__run));
   EXPECT_CALL(*this, runTaskGroup(_, _, _, _, _, _))
     .WillRepeatedly(Invoke(this, &MockSlave::unmocked_runTaskGroup));
@@ -161,8 +161,7 @@ void MockSlave::unmocked_runTask(
 }
 
 
-void MockSlave::unmocked__run(
-    const Future<list<bool>>& unschedules,
+Future<Nothing> MockSlave::unmocked__run(
     const FrameworkInfo& frameworkInfo,
     const ExecutorInfo& executorInfo,
     const Option<TaskInfo>& taskInfo,
@@ -170,8 +169,7 @@ void MockSlave::unmocked__run(
     const std::vector<ResourceVersionUUID>& resourceVersionUuids,
     const Option<bool>& launchExecutor)
 {
-  slave::Slave::_run(
-      unschedules,
+  return slave::Slave::_run(
       frameworkInfo,
       executorInfo,
       taskInfo,

http://git-wip-us.apache.org/repos/asf/mesos/blob/10c3a316/src/tests/mock_slave.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mock_slave.hpp b/src/tests/mock_slave.hpp
index 42f7d55..600789f 100644
--- a/src/tests/mock_slave.hpp
+++ b/src/tests/mock_slave.hpp
@@ -119,8 +119,7 @@ public:
       const std::vector<ResourceVersionUUID>& resourceVersionUuids,
       const Option<bool>& launchExecutor);
 
-  MOCK_METHOD7(_run, void(
-      const process::Future<std::list<bool>>& unschedules,
+  MOCK_METHOD6(_run, process::Future<Nothing>(
       const FrameworkInfo& frameworkInfo,
       const ExecutorInfo& executorInfo,
       const Option<TaskInfo>& task,
@@ -128,8 +127,7 @@ public:
       const std::vector<ResourceVersionUUID>& resourceVersionUuids,
       const Option<bool>& launchExecutor));
 
-  void unmocked__run(
-      const process::Future<std::list<bool>>& unschedules,
+  process::Future<Nothing> unmocked__run(
       const FrameworkInfo& frameworkInfo,
       const ExecutorInfo& executorInfo,
       const Option<TaskInfo>& task,

http://git-wip-us.apache.org/repos/asf/mesos/blob/10c3a316/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 95990c4..95a61cb 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -1834,8 +1834,9 @@ TEST_F(SlaveTest, GetStateTaskGroupPending)
   // unmocked `_run()` method. Instead, we want to do nothing so that tasks
   // remain in the framework's 'pending' list.
   Future<Nothing> _run;
-  EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _, _))
-    .WillOnce(FutureSatisfy(&_run));
+  EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _))
+    .WillOnce(DoAll(FutureSatisfy(&_run),
+                    Return(Nothing())));
 
   // The executor should not be launched.
   EXPECT_CALL(*executor, connected(_))
@@ -4114,7 +4115,6 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
     .WillOnce(Invoke(slave.get()->mock(), &MockSlave::unmocked_runTask));
 
   // Saved arguments from Slave::_run().
-  Future<list<bool>> unschedules;
   FrameworkInfo frameworkInfo;
   ExecutorInfo executorInfo;
   Option<TaskGroupInfo> taskGroup;
@@ -4125,15 +4125,15 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
   // later, tie reaching the critical moment when to kill the task to
   // a future.
   Future<Nothing> _run;
-  EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _, _))
+  EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _))
     .WillOnce(DoAll(FutureSatisfy(&_run),
-                    SaveArg<0>(&unschedules),
-                    SaveArg<1>(&frameworkInfo),
-                    SaveArg<2>(&executorInfo),
-                    SaveArg<3>(&task_),
-                    SaveArg<4>(&taskGroup),
-                    SaveArg<5>(&resourceVersionUuids),
-                    SaveArg<6>(&launchExecutor)));
+                    SaveArg<0>(&frameworkInfo),
+                    SaveArg<1>(&executorInfo),
+                    SaveArg<2>(&task_),
+                    SaveArg<3>(&taskGroup),
+                    SaveArg<4>(&resourceVersionUuids),
+                    SaveArg<5>(&launchExecutor),
+                    Return(Nothing())));
 
   driver.launchTasks(offers.get()[0].id(), {task});
 
@@ -4159,18 +4159,23 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
   // since there remain no more tasks.
   AWAIT_READY(removeFramework);
 
-  slave.get()->mock()->unmocked__run(
-      unschedules,
-      frameworkInfo,
-      executorInfo,
-      task_,
-      taskGroup,
-      resourceVersionUuids,
-      launchExecutor);
+  Future<Nothing> unmocked__run = process::dispatch(slave.get()->pid, [=] {
+    slave.get()->mock()->unmocked__run(
+        frameworkInfo,
+        executorInfo,
+        task_,
+        taskGroup,
+        resourceVersionUuids,
+        launchExecutor);
+
+    return Nothing();
+  });
 
   AWAIT_READY(status);
   EXPECT_EQ(TASK_KILLED, status->state());
 
+  AWAIT(unmocked__run);
+
   driver.stop();
   driver.join();
 }
@@ -4245,7 +4250,6 @@ TEST_F(SlaveTest, KillMultiplePendingTasks)
   // Skip what Slave::_run() normally does, save its arguments for
   // later, tie reaching the critical moment when to kill the task to
   // a future.
-  Future<list<bool>> unschedules1, unschedules2;
   FrameworkInfo frameworkInfo1, frameworkInfo2;
   ExecutorInfo executorInfo1, executorInfo2;
   Option<TaskGroupInfo> taskGroup1, taskGroup2;
@@ -4254,23 +4258,23 @@ TEST_F(SlaveTest, KillMultiplePendingTasks)
   Option<bool> launchExecutor1, launchExecutor2;
 
   Future<Nothing> _run1, _run2;
-  EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _, _))
+  EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _))
     .WillOnce(DoAll(FutureSatisfy(&_run1),
-                    SaveArg<0>(&unschedules1),
-                    SaveArg<1>(&frameworkInfo1),
-                    SaveArg<2>(&executorInfo1),
-                    SaveArg<3>(&task_1),
-                    SaveArg<4>(&taskGroup1),
-                    SaveArg<5>(&resourceVersionUuids1),
-                    SaveArg<6>(&launchExecutor1)))
+                    SaveArg<0>(&frameworkInfo1),
+                    SaveArg<1>(&executorInfo1),
+                    SaveArg<2>(&task_1),
+                    SaveArg<3>(&taskGroup1),
+                    SaveArg<4>(&resourceVersionUuids1),
+                    SaveArg<5>(&launchExecutor1),
+                    Return(Nothing())))
     .WillOnce(DoAll(FutureSatisfy(&_run2),
-                    SaveArg<0>(&unschedules2),
-                    SaveArg<1>(&frameworkInfo2),
-                    SaveArg<2>(&executorInfo2),
-                    SaveArg<3>(&task_2),
-                    SaveArg<4>(&taskGroup2),
-                    SaveArg<5>(&resourceVersionUuids2),
-                    SaveArg<6>(&launchExecutor2)));
+                    SaveArg<0>(&frameworkInfo2),
+                    SaveArg<1>(&executorInfo2),
+                    SaveArg<2>(&task_2),
+                    SaveArg<3>(&taskGroup2),
+                    SaveArg<4>(&resourceVersionUuids2),
+                    SaveArg<5>(&launchExecutor2),
+                    Return(Nothing())));
 
   driver.launchTasks(offers.get()[0].id(), {task1, task2});
 
@@ -4306,23 +4310,25 @@ TEST_F(SlaveTest, KillMultiplePendingTasks)
   AWAIT_READY(removeFramework);
 
   // The `__run` continuations should have no effect.
-  slave.get()->mock()->unmocked__run(
-      unschedules1,
-      frameworkInfo1,
-      executorInfo1,
-      task_1,
-      taskGroup1,
-      resourceVersionUuids1,
-      launchExecutor1);
-
-  slave.get()->mock()->unmocked__run(
-      unschedules2,
-      frameworkInfo2,
-      executorInfo2,
-      task_2,
-      taskGroup2,
-      resourceVersionUuids2,
-      launchExecutor2);
+  process::dispatch(slave.get()->pid, [=] {
+    slave.get()->mock()->unmocked__run(
+        frameworkInfo1,
+        executorInfo1,
+        task_1,
+        taskGroup1,
+        resourceVersionUuids1,
+        launchExecutor1);
+  });
+
+  process::dispatch(slave.get()->pid, [=] {
+    slave.get()->mock()->unmocked__run(
+        frameworkInfo2,
+        executorInfo2,
+        task_2,
+        taskGroup2,
+        resourceVersionUuids2,
+        launchExecutor2);
+  });
 
   Clock::settle();
 
@@ -7200,7 +7206,6 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts)
                      &MockSlave::unmocked_runTaskGroup));
 
   // Saved arguments from `Slave::_run()`.
-  Future<list<bool>> unschedules;
   FrameworkInfo frameworkInfo;
   ExecutorInfo executorInfo_;
   Option<TaskGroupInfo> taskGroup_;
@@ -7212,15 +7217,15 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts)
   // later, till reaching the critical moment when to kill the task
   // in the future.
   Future<Nothing> _run;
-  EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _, _))
+  EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _))
     .WillOnce(DoAll(FutureSatisfy(&_run),
-                    SaveArg<0>(&unschedules),
-                    SaveArg<1>(&frameworkInfo),
-                    SaveArg<2>(&executorInfo_),
-                    SaveArg<3>(&task_),
-                    SaveArg<4>(&taskGroup_),
-                    SaveArg<5>(&resourceVersionUuids),
-                    SaveArg<6>(&launchExecutor)));
+                    SaveArg<0>(&frameworkInfo),
+                    SaveArg<1>(&executorInfo_),
+                    SaveArg<2>(&task_),
+                    SaveArg<3>(&taskGroup_),
+                    SaveArg<4>(&resourceVersionUuids),
+                    SaveArg<5>(&launchExecutor),
+                    Return(Nothing())));
 
   const v1::Offer& offer = offers->offers(0);
   const SlaveID slaveId = devolve(offer.agent_id());
@@ -7280,14 +7285,17 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts)
 
   AWAIT_READY(removeFramework);
 
-  slave.get()->mock()->unmocked__run(
-      unschedules,
-      frameworkInfo,
-      executorInfo_,
-      task_,
-      taskGroup_,
-      resourceVersionUuids,
-      launchExecutor);
+  Future<Nothing> unmocked__run = process::dispatch(slave.get()->pid, [=] {
+    slave.get()->mock()->unmocked__run(
+        frameworkInfo,
+        executorInfo_,
+        task_,
+        taskGroup_,
+        resourceVersionUuids,
+        launchExecutor);
+
+    return Nothing();
+  });
 
   AWAIT_READY(update1);
   AWAIT_READY(update2);


[2/4] mesos git commit: Refactored agent task launch for better composition [2/2].

Posted by gr...@apache.org.
Refactored agent task launch for better composition [2/2].

This helps to encapsulate a task launch into a single
future which will come in handy when enforcing the task
launch order.

Affected tests are also updated.

Review: https://reviews.apache.org/r/66143/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a9cb8d8c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a9cb8d8c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a9cb8d8c

Branch: refs/heads/1.5.x
Commit: a9cb8d8c138b1dbdbf630439e33ddd3c652bee9f
Parents: 10c3a31
Author: Meng Zhu <mz...@mesosphere.io>
Authored: Wed Apr 4 16:36:42 2018 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Fri Apr 6 23:42:32 2018 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp       | 224 ++++++++++++++++++++++-------------------
 src/slave/slave.hpp       |   2 -
 src/tests/slave_tests.cpp |  58 ++++++-----
 3 files changed, 155 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a9cb8d8c/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 927d384..2044b6e 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2206,8 +2206,9 @@ void Slave::run(
       return unschedules;
   };
 
-  // Run the task after the unschedules are done.
-  collect(unschedules)
+  // Handle any unschedule GC failure. If unschedule GC succeeds, trigger
+  // the next continuations.
+  Future<Nothing> taskLaunch = collect(unschedules)
     .repair(defer(self(), onUnscheduleGCFailure))
     .then(defer(
         self(),
@@ -2217,19 +2218,29 @@ void Slave::run(
         task,
         taskGroup,
         resourceVersionUuids,
-        launchExecutor))
-    .recover(defer(self(),
-      [=](const Future<Nothing>& future) -> Future<Nothing> {
-        if (launchExecutor.isSome() && launchExecutor.get()) {
-          // Master expects new executor to be launched for this task launch.
-          // To keep the master executor entries updated, the agent needs to
-          // send 'ExitedExecutorMessage' even though no executor launched.
-          sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
-        }
+        launchExecutor));
 
-        return future;
+  taskLaunch
+    .onReady(defer(
+        self(),
+        &Self::__run,
+        frameworkInfo,
+        executorInfo,
+        task,
+        taskGroup,
+        resourceVersionUuids,
+        launchExecutor))
+    .onFailed(defer(self(), [=](const string& failure) {
+      if (launchExecutor.isSome() && launchExecutor.get()) {
+        // Master expects new executor to be launched for this task launch.
+        // To keep the master executor entries updated, the agent needs to send
+        // 'ExitedExecutorMessage' even though no executor launched.
+        sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
       }
-    ));
+    }));
+
+  // TODO(mzhu): Consolidate error handling code in `__run` here with
+  // then/recover pattern.
 }
 
 
@@ -2330,23 +2341,107 @@ Future<Nothing> Slave::_run(
     authorizations.push_back(authorizeTask(_task, frameworkInfo));
   }
 
-  collect(authorizations)
-    .onAny(defer(self(),
-                 &Self::__run,
-                 lambda::_1,
-                 frameworkInfo,
-                 executorInfo,
-                 task,
-                 taskGroup,
-                 resourceVersionUuids,
-                 launchExecutor));
+  auto onTaskAuthorizationFailure =
+    [=](const string& error, Framework* _framework) {
+      CHECK_NOTNULL(_framework);
 
-  return Nothing();
+      // For failed authorization, we send a TASK_ERROR status update
+      // for all tasks.
+      const TaskStatus::Reason reason = task.isSome()
+        ? TaskStatus::REASON_TASK_UNAUTHORIZED
+        : TaskStatus::REASON_TASK_GROUP_UNAUTHORIZED;
+
+      LOG(ERROR) << "Authorization failed for "
+                 << taskOrTaskGroup(task, taskGroup) << " of framework "
+                 << frameworkId << ": " << error;
+
+      foreach (const TaskInfo& _task, tasks) {
+        _framework->removePendingTask(_task.task_id());
+
+        const StatusUpdate update = protobuf::createStatusUpdate(
+            frameworkId,
+            info.id(),
+            _task.task_id(),
+            TASK_ERROR,
+            TaskStatus::SOURCE_SLAVE,
+            id::UUID::random(),
+            error,
+            reason);
+
+        statusUpdate(update, UPID());
+      }
+
+      if (_framework->idle()) {
+        removeFramework(_framework);
+      }
+  };
+
+  return collect(authorizations)
+    .recover(defer(self(),
+      [=](const Future<list<bool>>& future) -> Future<list<bool>> {
+        CHECK(future.isFailed());
+
+        Framework* _framework = getFramework(frameworkId);
+        if (_framework == nullptr) {
+          const string error =
+            "Authorization failed for " + taskOrTaskGroup(task, taskGroup) +
+            " because the framework " + stringify(frameworkId) +
+            " does not exist";
+
+            LOG(WARNING) << error;
+
+          return Failure(error);
+        }
+
+        const string error =
+          "Failed to authorize " + taskOrTaskGroup(task, taskGroup) +
+          ": " + future.failure();
+
+        onTaskAuthorizationFailure(error, _framework);
+
+        return future;
+      }
+    ))
+    .then(defer(self(),
+      [=](const Future<list<bool>>& future) -> Future<Nothing> {
+        Framework* _framework = getFramework(frameworkId);
+        if (_framework == nullptr) {
+          const string error =
+            "Ignoring running " + taskOrTaskGroup(task, taskGroup) +
+            " because the framework " + stringify(frameworkId) +
+            " does not exist";
+
+            LOG(WARNING) << error;
+
+          return Failure(error);
+        }
+
+        list<bool> authorizations = future.get();
+
+        foreach (const TaskInfo& _task, tasks) {
+          bool authorized = authorizations.front();
+          authorizations.pop_front();
+
+          // If authorization for this task fails, we fail all tasks (in case
+          // of a task group) with this specific error.
+          if (!authorized) {
+            const string error =
+              "Framework " + stringify(frameworkId) +
+              " is not authorized to launch task " + stringify(_task);
+
+            onTaskAuthorizationFailure(error, _framework);
+
+            return Failure(error);
+          }
+        }
+
+        return Nothing();
+      }
+    ));
 }
 
 
 void Slave::__run(
-    const Future<list<bool>>& future,
     const FrameworkInfo& frameworkInfo,
     const ExecutorInfo& executorInfo,
     const Option<TaskInfo>& task,
@@ -2448,85 +2543,6 @@ void Slave::__run(
     CHECK(framework->removePendingTask(_task.task_id()));
   }
 
-  CHECK(!future.isDiscarded());
-
-  // Validate that the task (or tasks in case of task group) are authorized
-  // to be run on this agent.
-  Option<Error> error = None();
-  if (!future.isReady()) {
-    error = Error("Failed to authorize " + taskOrTaskGroup(task, taskGroup) +
-                  ": " + future.failure());
-  }
-
-  if (error.isNone()) {
-    list<bool> authorizations = future.get();
-
-    foreach (const TaskInfo& _task, tasks) {
-      bool authorized = authorizations.front();
-      authorizations.pop_front();
-
-      // If authorization for this task fails, we fail all tasks (in case of
-      // a task group) with this specific error.
-      if (!authorized) {
-        string user = frameworkInfo.user();
-
-        if (_task.has_command() && _task.command().has_user()) {
-          user = _task.command().user();
-        } else if (executorInfo.has_command() &&
-                   executorInfo.command().has_user()) {
-          user = executorInfo.command().user();
-        }
-
-        error = Error("Task '" + stringify(_task.task_id()) + "'"
-                      " is not authorized to launch as"
-                      " user '" + user + "'");
-
-        break;
-      }
-    }
-  }
-
-  // For failed authorization, we send a TASK_ERROR status update for
-  // all tasks.
-  if (error.isSome()) {
-    const TaskStatus::Reason reason = task.isSome()
-      ? TaskStatus::REASON_TASK_UNAUTHORIZED
-      : TaskStatus::REASON_TASK_GROUP_UNAUTHORIZED;
-
-    LOG(ERROR) << "Ignoring running " << taskOrTaskGroup(task, taskGroup)
-               << " of framework " << frameworkId
-               << ": " << error->message;
-
-    foreach (const TaskInfo& _task, tasks) {
-      const StatusUpdate update = protobuf::createStatusUpdate(
-          frameworkId,
-          info.id(),
-          _task.task_id(),
-          TASK_ERROR,
-          TaskStatus::SOURCE_SLAVE,
-          id::UUID::random(),
-          error->message,
-          reason);
-
-      statusUpdate(update, UPID());
-    }
-
-    // Refer to the comment after 'framework->removePendingTask' above
-    // for why we need this.
-    if (framework->idle()) {
-      removeFramework(framework);
-    }
-
-    if (launchExecutor.isSome() && launchExecutor.get()) {
-      // Master expects new executor to be launched for this task(s) launch.
-      // To keep the master executor entries updated, the agent needs to send
-      // 'ExitedExecutorMessage' even though no executor launched.
-      sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
-    }
-
-    return;
-  }
-
   // Check task invariants.
   //
   // TODO(bbannier): Instead of copy-pasting identical code to deal

http://git-wip-us.apache.org/repos/asf/mesos/blob/a9cb8d8c/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 28bbcc4..11cbbc6 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -393,7 +393,6 @@ public:
   virtual void exited(const process::UPID& pid);
 
   void __run(
-      const process::Future<std::list<bool>>& future,
       const FrameworkInfo& frameworkInfo,
       const ExecutorInfo& executorInfo,
       const Option<TaskInfo>& task,
@@ -401,7 +400,6 @@ public:
       const std::vector<ResourceVersionUUID>& resourceVersionUuids,
       const Option<bool>& launchExecutor);
 
-
   // This is called when the resource limits of the container have
   // been updated for the given tasks and task groups. If the update is
   // successful, we flush the given tasks to the executor by sending

http://git-wip-us.apache.org/repos/asf/mesos/blob/a9cb8d8c/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 95a61cb..b8669a0 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -1831,12 +1831,14 @@ TEST_F(SlaveTest, GetStateTaskGroupPending)
   const SlaveID slaveId = devolve(offer.agent_id());
 
   // Override the default expectation, which forwards calls to the agent's
-  // unmocked `_run()` method. Instead, we want to do nothing so that tasks
-  // remain in the framework's 'pending' list.
+  // unmocked `_run()` method. Instead, we return a pending future to pause
+  // the original continuation so that tasks remain in the framework's
+  // 'pending' list.
+  Promise<Nothing> promise;
   Future<Nothing> _run;
   EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _))
     .WillOnce(DoAll(FutureSatisfy(&_run),
-                    Return(Nothing())));
+                    Return(promise.future())));
 
   // The executor should not be launched.
   EXPECT_CALL(*executor, connected(_))
@@ -4121,9 +4123,11 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
   Option<TaskInfo> task_;
   vector<ResourceVersionUUID> resourceVersionUuids;
   Option<bool> launchExecutor;
+
   // Skip what Slave::_run() normally does, save its arguments for
-  // later, tie reaching the critical moment when to kill the task to
-  // a future.
+  // later, return a pending future to pause the original continuation,
+  // so that we can control when the task is killed.
+  Promise<Nothing> promise;
   Future<Nothing> _run;
   EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _))
     .WillOnce(DoAll(FutureSatisfy(&_run),
@@ -4133,7 +4137,7 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
                     SaveArg<3>(&taskGroup),
                     SaveArg<4>(&resourceVersionUuids),
                     SaveArg<5>(&launchExecutor),
-                    Return(Nothing())));
+                    Return(promise.future())));
 
   driver.launchTasks(offers.get()[0].id(), {task});
 
@@ -4160,17 +4164,18 @@ TEST_F(SlaveTest, KillTaskBetweenRunTaskParts)
   AWAIT_READY(removeFramework);
 
   Future<Nothing> unmocked__run = process::dispatch(slave.get()->pid, [=] {
-    slave.get()->mock()->unmocked__run(
+    return slave.get()->mock()->unmocked__run(
         frameworkInfo,
         executorInfo,
         task_,
         taskGroup,
         resourceVersionUuids,
         launchExecutor);
-
-    return Nothing();
   });
 
+  // Resume the original continuation once `unmocked__run` is complete.
+  promise.associate(unmocked__run);
+
   AWAIT_READY(status);
   EXPECT_EQ(TASK_KILLED, status->state());
 
@@ -4248,8 +4253,8 @@ TEST_F(SlaveTest, KillMultiplePendingTasks)
     .WillOnce(Invoke(slave.get()->mock(), &MockSlave::unmocked_runTask));
 
   // Skip what Slave::_run() normally does, save its arguments for
-  // later, tie reaching the critical moment when to kill the task to
-  // a future.
+  // later, return a pending future to pause the original continuation,
+  // so that we can control when the task is killed.
   FrameworkInfo frameworkInfo1, frameworkInfo2;
   ExecutorInfo executorInfo1, executorInfo2;
   Option<TaskGroupInfo> taskGroup1, taskGroup2;
@@ -4257,6 +4262,7 @@ TEST_F(SlaveTest, KillMultiplePendingTasks)
   vector<ResourceVersionUUID> resourceVersionUuids1, resourceVersionUuids2;
   Option<bool> launchExecutor1, launchExecutor2;
 
+  Promise<Nothing> promise1, promise2;
   Future<Nothing> _run1, _run2;
   EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _))
     .WillOnce(DoAll(FutureSatisfy(&_run1),
@@ -4266,7 +4272,7 @@ TEST_F(SlaveTest, KillMultiplePendingTasks)
                     SaveArg<3>(&taskGroup1),
                     SaveArg<4>(&resourceVersionUuids1),
                     SaveArg<5>(&launchExecutor1),
-                    Return(Nothing())))
+                    Return(promise1.future())))
     .WillOnce(DoAll(FutureSatisfy(&_run2),
                     SaveArg<0>(&frameworkInfo2),
                     SaveArg<1>(&executorInfo2),
@@ -4274,7 +4280,7 @@ TEST_F(SlaveTest, KillMultiplePendingTasks)
                     SaveArg<3>(&taskGroup2),
                     SaveArg<4>(&resourceVersionUuids2),
                     SaveArg<5>(&launchExecutor2),
-                    Return(Nothing())));
+                    Return(promise2.future())));
 
   driver.launchTasks(offers.get()[0].id(), {task1, task2});
 
@@ -4310,8 +4316,8 @@ TEST_F(SlaveTest, KillMultiplePendingTasks)
   AWAIT_READY(removeFramework);
 
   // The `__run` continuations should have no effect.
-  process::dispatch(slave.get()->pid, [=] {
-    slave.get()->mock()->unmocked__run(
+  Future<Nothing> unmocked__run1 = process::dispatch(slave.get()->pid, [=] {
+    return slave.get()->mock()->unmocked__run(
         frameworkInfo1,
         executorInfo1,
         task_1,
@@ -4320,8 +4326,8 @@ TEST_F(SlaveTest, KillMultiplePendingTasks)
         launchExecutor1);
   });
 
-  process::dispatch(slave.get()->pid, [=] {
-    slave.get()->mock()->unmocked__run(
+  Future<Nothing> unmocked__run2 = process::dispatch(slave.get()->pid, [=] {
+    return slave.get()->mock()->unmocked__run(
         frameworkInfo2,
         executorInfo2,
         task_2,
@@ -4330,6 +4336,10 @@ TEST_F(SlaveTest, KillMultiplePendingTasks)
         launchExecutor2);
   });
 
+  // Resume the original continuation once unmocked__run is complete.
+  promise1.associate(unmocked__run1);
+  promise2.associate(unmocked__run2);
+
   Clock::settle();
 
   driver.stop();
@@ -7214,8 +7224,9 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts)
   Option<bool> launchExecutor;
 
   // Skip what `Slave::_run()` normally does, save its arguments for
-  // later, till reaching the critical moment when to kill the task
-  // in the future.
+  // later, return a pending future to pause the original continuation,
+  // till reaching the critical moment when to kill the task in the future.
+  Promise<Nothing> promise;
   Future<Nothing> _run;
   EXPECT_CALL(*slave.get()->mock(), _run(_, _, _, _, _, _))
     .WillOnce(DoAll(FutureSatisfy(&_run),
@@ -7225,7 +7236,7 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts)
                     SaveArg<3>(&taskGroup_),
                     SaveArg<4>(&resourceVersionUuids),
                     SaveArg<5>(&launchExecutor),
-                    Return(Nothing())));
+                    Return(promise.future())));
 
   const v1::Offer& offer = offers->offers(0);
   const SlaveID slaveId = devolve(offer.agent_id());
@@ -7286,17 +7297,18 @@ TEST_F(SlaveTest, KillTaskGroupBetweenRunTaskParts)
   AWAIT_READY(removeFramework);
 
   Future<Nothing> unmocked__run = process::dispatch(slave.get()->pid, [=] {
-    slave.get()->mock()->unmocked__run(
+    return slave.get()->mock()->unmocked__run(
         frameworkInfo,
         executorInfo_,
         task_,
         taskGroup_,
         resourceVersionUuids,
         launchExecutor);
-
-    return Nothing();
   });
 
+  // Resume the original continuation once `unmocked__run` is complete.
+  promise.associate(unmocked__run);
+
   AWAIT_READY(update1);
   AWAIT_READY(update2);
 


[3/4] mesos git commit: Fixed a potential race in `Sequence`.

Posted by gr...@apache.org.
Fixed a potential race in `Sequence`.

Adding item to sequence is realized by dispatching
`add()` to the sequence actor. However, this could
race with the sequence actor termination.

This patch fixes this by enqueueing the terminate
message at the end of the message queue.

Also removed the clock settle in the test `DiscardAll`.
As the processing of the messages are now guaranteed
to happen before the actor termination.

Also added comments to clarify the onDiscard propagation.

Review: https://reviews.apache.org/r/66322/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2d075e3e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2d075e3e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2d075e3e

Branch: refs/heads/1.5.x
Commit: 2d075e3ee17d6220e73dd37969d12e9bda537346
Parents: a9cb8d8
Author: Meng Zhu <mz...@mesosphere.io>
Authored: Wed Apr 4 16:36:50 2018 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Fri Apr 6 23:42:39 2018 -0700

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/sequence.hpp | 15 ++++++++++++++-
 3rdparty/libprocess/src/tests/sequence_tests.cpp |  6 ------
 2 files changed, 14 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2d075e3e/3rdparty/libprocess/include/process/sequence.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/sequence.hpp b/3rdparty/libprocess/include/process/sequence.hpp
index b4d7593..24712b1 100644
--- a/3rdparty/libprocess/include/process/sequence.hpp
+++ b/3rdparty/libprocess/include/process/sequence.hpp
@@ -113,6 +113,15 @@ public:
     // discarded. We use weak futures here to avoid cyclic dependencies.
 
     // Discard the future associated with this notifier.
+    //
+    // NOTE: When we discard the notifier future, any `onDiscard()` callbacks
+    // registered on `promise->future` will be invoked, but `onDiscard`
+    // callbacks registered on the future returned by `add()` will NOT be
+    // invoked. This is because currently discards do not propagate through
+    // `dispatch()`. In other words, users should be careful when registering
+    // `onDiscard` callbacks on the returned future.
+    //
+    // TODO(*): Propagate `onDiscard` through `dispatch`.
     notifier->future().onDiscard(
         lambda::bind(
             &internal::discard<T>,
@@ -175,7 +184,11 @@ inline Sequence::Sequence(const std::string& id)
 
 inline Sequence::~Sequence()
 {
-  process::terminate(process);
+  // We set `inject` to false so that the terminate message is added to the
+  // end of the sequence actor message queue. This guarantees that all `add()`
+  // calls which happened before the sequence destruction are processed.
+  // See MESOS-8741.
+  process::terminate(process, false);
   process::wait(process);
   delete process;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/2d075e3e/3rdparty/libprocess/src/tests/sequence_tests.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/tests/sequence_tests.cpp b/3rdparty/libprocess/src/tests/sequence_tests.cpp
index 43911b6..3c80a1d 100644
--- a/3rdparty/libprocess/src/tests/sequence_tests.cpp
+++ b/3rdparty/libprocess/src/tests/sequence_tests.cpp
@@ -195,12 +195,6 @@ TEST(SequenceTest, DiscardAll)
   EXPECT_CALL(process, func3())
     .Times(0);
 
-  // Flush the event queue to make sure that all callbacks have been
-  // added to the sequence.
-  Clock::pause();
-  Clock::settle();
-  Clock::resume();
-
   // This should cancel all pending callbacks.
   sequence.reset();
 


[4/4] mesos git commit: Enforced task launch order on the agent.

Posted by gr...@apache.org.
Enforced task launch order on the agent.

Up until now, Mesos does not guarantee in-order
task launch on the agent. There are two asynchronous
steps (unschedule GC and task authorization) in the
agent task launch path. Depending on the CPU scheduling
order, a later task launch may finish these two steps earlier
than its predecessors and get to the launch executor stage
earlier, resulting in out-of-order task delivery.

This patch enforces the task delivery order by sequencing
task launch after the two asynchronous steps, specifically
right before entering `__run()`.

Review: https://reviews.apache.org/r/66144/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3f8b19a9
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3f8b19a9
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3f8b19a9

Branch: refs/heads/1.5.x
Commit: 3f8b19a92b7f28e6efcf1cd9397b380c995e9948
Parents: 2d075e3
Author: Meng Zhu <mz...@mesosphere.io>
Authored: Wed Apr 4 16:36:52 2018 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Fri Apr 6 23:42:39 2018 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp | 156 +++++++++++++++++++++++++++++++++++------------
 src/slave/slave.hpp |  14 +++++
 2 files changed, 132 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3f8b19a9/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 2044b6e..0d89915 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2206,10 +2206,12 @@ void Slave::run(
       return unschedules;
   };
 
-  // Handle any unschedule GC failure. If unschedule GC succeeds, trigger
-  // the next continuations.
+  // `taskLaunch` encapsulates each task's launch steps from this point
+  // to the end of `_run` (the completion of task authorization).
   Future<Nothing> taskLaunch = collect(unschedules)
+    // Handle the failure iff unschedule GC fails.
     .repair(defer(self(), onUnscheduleGCFailure))
+    // If unschedule GC succeeds, trigger the next continuation.
     .then(defer(
         self(),
         &Self::_run,
@@ -2220,27 +2222,80 @@ void Slave::run(
         resourceVersionUuids,
         launchExecutor));
 
-  taskLaunch
-    .onReady(defer(
-        self(),
-        &Self::__run,
-        frameworkInfo,
-        executorInfo,
-        task,
-        taskGroup,
-        resourceVersionUuids,
-        launchExecutor))
-    .onFailed(defer(self(), [=](const string& failure) {
-      if (launchExecutor.isSome() && launchExecutor.get()) {
-        // Master expects new executor to be launched for this task launch.
-        // To keep the master executor entries updated, the agent needs to send
-        // 'ExitedExecutorMessage' even though no executor launched.
-        sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
-      }
+  // Use a sequence to ensure that task launch order is preserved.
+  framework->taskLaunchSequences[executorId]
+    .add<Nothing>([taskLaunch]() -> Future<Nothing> {
+      // We use this sequence only to maintain the task launching order. If the
+      // sequence is deleted, we do not want the resulting discard event to
+      // propagate up the chain, which would prevent the previous `.then()` or
+      // `.repair()` callbacks from being invoked. Thus, we use `undiscardable`
+      // to protect each `taskLaunch`.
+      return undiscardable(taskLaunch);
+    })
+    // We register `onAny` on the future returned by the sequence (referred to
+    // as `seqFuture` below). The following scenarios could happen:
+    //
+    // (1) `seqFuture` becomes ready. This happens when all previous tasks'
+    // `taskLaunch` futures are in non-pending state AND this task's own
+    // `taskLaunch` future is in ready state. The `onReady` call registered
+    // below will be triggered and continue the success path.
+    //
+    // (2) `seqFuture` becomes failed. This happens when all previous tasks'
+    // `taskLaunch` futures are in non-pending state AND this task's own
+    // `taskLaunch` future is in failed state (e.g. due to unschedule GC
+    // failure or some other failure). The `onFailed` call registered below
+    // will be triggered to handle the failure.
+    //
+    // (3) `seqFuture` becomes discarded. This happens when the sequence is
+    // destructed (see declaration of `taskLaunchSequences` on its lifecycle)
+    // while the `seqFuture` is still pending. In this case, we wait until
+    // this task's own `taskLaunch` future becomes non-pending and trigger
+    // callbacks accordingly.
+    //
+    // TODO(mzhu): In case (3), the destruction of the sequence means that the
+    // agent will eventually discover that the executor is absent and drop
+    // the task. While `__run` is capable of handling this case, it is more
+    // optimal to handle the failure earlier here rather than waiting for
+    // the `taskLaunch` transition and directing control to `__run`.
+    .onAny(defer(self(), [=](const Future<Nothing>&) {
+      // We only want to execute the following callbacks once the work performed
+      // in the `taskLaunch` chain is complete. Thus, we add them onto the
+      // `taskLaunch` chain rather than dispatching directly.
+      taskLaunch
+        .onReady(defer(
+            self(),
+            &Self::__run,
+            frameworkInfo,
+            executorInfo,
+            task,
+            taskGroup,
+            resourceVersionUuids,
+            launchExecutor))
+        .onFailed(defer(self(), [=](const string& failure) {
+          Framework* _framework = getFramework(frameworkId);
+          if (_framework == nullptr) {
+            LOG(WARNING) << "Ignoring running "
+                         << taskOrTaskGroup(task, taskGroup)
+                         << " because the framework " << stringify(frameworkId)
+                         << " does not exist";
+          }
+
+          if (launchExecutor.isSome() && launchExecutor.get()) {
+            // Master expects a new executor to be launched for this task(s).
+            // To keep the master executor entries updated, the agent needs to
+            // send `ExitedExecutorMessage` even though no executor launched.
+            sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+
+            // See the declaration of `taskLaunchSequences` regarding its
+            // lifecycle management.
+            if (_framework != nullptr) {
+              _framework->taskLaunchSequences.erase(executorInfo.executor_id());
+            }
+          }
+        }));
     }));
 
-  // TODO(mzhu): Consolidate error handling code in `__run` here with
-  // then/recover pattern.
+  // TODO(mzhu): Consolidate error handling code in `__run` here.
 }
 
 
@@ -2377,10 +2432,8 @@ Future<Nothing> Slave::_run(
   };
 
   return collect(authorizations)
-    .recover(defer(self(),
+    .repair(defer(self(),
       [=](const Future<list<bool>>& future) -> Future<list<bool>> {
-        CHECK(future.isFailed());
-
         Framework* _framework = getFramework(frameworkId);
         if (_framework == nullptr) {
           const string error =
@@ -2469,10 +2522,13 @@ void Slave::__run(
                  << " does not exist";
 
     if (launchExecutor.isSome() && launchExecutor.get()) {
-      // Master expects new executor to be launched for this task(s) launch.
+      // Master expects a new executor to be launched for this task(s).
       // To keep the master executor entries updated, the agent needs to send
-      // 'ExitedExecutorMessage' even though no executor launched.
+      // `ExitedExecutorMessage` even though no executor launched.
       sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+
+      // There is no need to clean up the task launch sequence here since
+      // the framework (along with the sequence) no longer exists.
     }
 
     return;
@@ -2498,10 +2554,14 @@ void Slave::__run(
     }
 
     if (launchExecutor.isSome() && launchExecutor.get()) {
-      // Master expects new executor to be launched for this task(s) launch.
+      // Master expects a new executor to be launched for this task(s).
       // To keep the master executor entries updated, the agent needs to send
-      // 'ExitedExecutorMessage' even though no executor launched.
+      // `ExitedExecutorMessage` even though no executor launched.
       sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+
+      // See the declaration of `taskLaunchSequences` regarding its lifecycle
+      // management.
+      framework->taskLaunchSequences.erase(executorInfo.executor_id());
     }
 
     return;
@@ -2530,10 +2590,14 @@ void Slave::__run(
                  << " because it has been killed in the meantime";
 
     if (launchExecutor.isSome() && launchExecutor.get()) {
-      // Master expects new executor to be launched for this task(s) launch.
+      // Master expects a new executor to be launched for this task(s).
       // To keep the master executor entries updated, the agent needs to send
-      // 'ExitedExecutorMessage' even though no executor launched.
+      // `ExitedExecutorMessage` even though no executor launched.
       sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+
+      // See the declaration of `taskLaunchSequences` regarding its lifecycle
+      // management.
+      framework->taskLaunchSequences.erase(executorInfo.executor_id());
     }
 
     return;
@@ -2621,10 +2685,14 @@ void Slave::__run(
     }
 
     if (launchExecutor.isSome() && launchExecutor.get()) {
-      // Master expects new executor to be launched for this task(s) launch.
+      // Master expects a new executor to be launched for this task(s).
       // To keep the master executor entries updated, the agent needs to send
-      // 'ExitedExecutorMessage' even though no executor launched.
+      // `ExitedExecutorMessage` even though no executor launched.
       sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+
+      // See the declaration of `taskLaunchSequences` regarding its lifecycle
+      // management.
+      framework->taskLaunchSequences.erase(executorInfo.executor_id());
     }
 
     return;
@@ -2695,10 +2763,14 @@ void Slave::__run(
     }
 
     if (launchExecutor.isSome() && launchExecutor.get()) {
-      // Master expects new executor to be launched for this task(s) launch.
+      // Master expects a new executor to be launched for this task(s).
       // To keep the master executor entries updated, the agent needs to send
-      // 'ExitedExecutorMessage' even though no executor launched.
+      // `ExitedExecutorMessage` even though no executor launched.
       sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+
+      // See the declaration of `taskLaunchSequences` regarding its lifecycle
+      // management.
+      framework->taskLaunchSequences.erase(executorInfo.executor_id());
     }
 
     return;
@@ -2755,10 +2827,14 @@ void Slave::__run(
     }
 
     if (launchExecutor.isSome() && launchExecutor.get()) {
-      // Master expects new executor to be launched for this task(s) launch.
+      // Master expects a new executor to be launched for this task(s).
       // To keep the master executor entries updated, the agent needs to send
-      // 'ExitedExecutorMessage' even though no executor launched.
+      // `ExitedExecutorMessage` even though no executor launched.
       sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
+
+      // See the declaration of `taskLaunchSequences` regarding its lifecycle
+      // management.
+      framework->taskLaunchSequences.erase(executorInfo.executor_id());
     }
 
     return;
@@ -2865,9 +2941,9 @@ void Slave::__run(
             statusUpdate(update, UPID());
           }
 
-          // Master expects new executor to be launched for this task(s) launch.
+          // Master expects a new executor to be launched for this task(s).
           // To keep the master executor entries updated, the agent needs to
-          // send 'ExitedExecutorMessage' even though no executor launched.
+          // send `ExitedExecutorMessage` even though no executor launched.
           if (executor->state == Executor::TERMINATED) {
             sendExitedExecutorMessage(frameworkId, executorInfo.executor_id());
           } else {
@@ -8959,6 +9035,10 @@ void Framework::destroyExecutor(const ExecutorID& executorId)
     Executor* executor = executors[executorId];
     executors.erase(executorId);
 
+    // See the declaration of `taskLaunchSequences` regarding its
+    // lifecycle management.
+    taskLaunchSequences.erase(executorId);
+
     // Pass ownership of the executor pointer.
     completedExecutors.push_back(Owned<Executor>(executor));
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/3f8b19a9/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 11cbbc6..ca8cc65 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -53,6 +53,7 @@
 #include <process/process.hpp>
 #include <process/protobuf.hpp>
 #include <process/shared.hpp>
+#include <process/sequence.hpp>
 
 #include <stout/boundedhashmap.hpp>
 #include <stout/bytes.hpp>
@@ -1138,6 +1139,19 @@ public:
   // Executors with pending tasks.
   hashmap<ExecutorID, hashmap<TaskID, TaskInfo>> pendingTasks;
 
+  // Sequences in this map are used to enforce the order of tasks launched on
+  // each executor.
+  //
+  // Note on the lifecycle of the sequence: if the corresponding executor struct
+  // has not been created, we tie the lifecycle of the sequence to the first
+  // task in the sequence (which must have the `launch_executor` flag set to
+  // true modulo MESOS-3870). If the task fails to launch before creating the
+  // executor struct, we will delete the sequence. Once the executor struct is
+  // created, we tie the lifecycle of the sequence to the executor struct.
+  //
+  // TODO(mzhu): Create the executor struct early and put the sequence in it.
+  hashmap<ExecutorID, process::Sequence> taskLaunchSequences;
+
   // Pending task groups. This is needed for correctly sending
   // TASK_KILLED status updates for all tasks in the group if
   // any of the tasks are killed while pending.