Posted to commits@mesos.apache.org by al...@apache.org on 2017/10/12 15:52:57 UTC

[2/4] mesos git commit: Fix unit tests that were broken by the additional TASK_STARTING update.
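
Executors now send an additional TASK_STARTING update before TASK_RUNNING, so
tests that assumed the first status update would be TASK_RUNNING need one more
expectation. Most of the fixes below take the same shape; a minimal sketch,
assuming the usual MockScheduler `sched`, scheduler driver `driver`, and the
`offers`/`task` objects used in these tests:

  // Illustrative sketch only, not part of the diff. Previously a single
  // future captured the first update and asserted TASK_RUNNING.
  Future<TaskStatus> statusStarting;
  Future<TaskStatus> statusRunning;
  EXPECT_CALL(sched, statusUpdate(&driver, _))
    .WillOnce(FutureArg<1>(&statusStarting))
    .WillOnce(FutureArg<1>(&statusRunning));

  driver.launchTasks(offers.get()[0].id(), {task});

  AWAIT_READY(statusStarting);
  EXPECT_EQ(TASK_STARTING, statusStarting->state());

  AWAIT_READY(statusRunning);
  EXPECT_EQ(TASK_RUNNING, statusRunning->state());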

http://git-wip-us.apache.org/repos/asf/mesos/blob/1e1e409b/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index cd98b8f..d262bbe 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -757,7 +757,7 @@ TEST_F(OversubscriptionTest, FixedResourceEstimator)
 
   AWAIT_READY(status);
   EXPECT_EQ(task.task_id(), status->task_id());
-  EXPECT_EQ(TASK_RUNNING, status->state());
+  EXPECT_EQ(TASK_STARTING, status->state());
 
   // Advance the clock for the slave to trigger the calculation of the
   // total oversubscribed resources. As we described above, we don't
@@ -1023,15 +1023,20 @@ TEST_F(OversubscriptionTest, QoSCorrectionKill)
 
   TaskInfo task = createTask(offers.get()[0], "sleep 10");
 
+  Future<TaskStatus> status0;
   Future<TaskStatus> status1;
   Future<TaskStatus> status2;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status0))
     .WillOnce(FutureArg<1>(&status1))
     .WillOnce(FutureArg<1>(&status2))
     .WillRepeatedly(Return());       // Ignore subsequent updates.
 
   driver.launchTasks(offers.get()[0].id(), {task});
 
+  AWAIT_READY(status0);
+  ASSERT_EQ(TASK_STARTING, status0->state());
+
   AWAIT_READY(status1);
   ASSERT_EQ(TASK_RUNNING, status1->state());
 
@@ -1132,15 +1137,20 @@ TEST_F(OversubscriptionTest, QoSCorrectionKillPartitionAware)
 
   TaskInfo task = createTask(offers.get()[0], "sleep 10");
 
+  Future<TaskStatus> status0;
   Future<TaskStatus> status1;
   Future<TaskStatus> status2;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status0))
     .WillOnce(FutureArg<1>(&status1))
     .WillOnce(FutureArg<1>(&status2))
     .WillRepeatedly(Return());       // Ignore subsequent updates.
 
   driver.launchTasks(offers.get()[0].id(), {task});
 
+  AWAIT_READY(status0);
+  ASSERT_EQ(TASK_STARTING, status0->state());
+
   AWAIT_READY(status1);
   ASSERT_EQ(TASK_RUNNING, status1->state());
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/1e1e409b/src/tests/partition_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/partition_tests.cpp b/src/tests/partition_tests.cpp
index 0597bd2..7b11264 100644
--- a/src/tests/partition_tests.cpp
+++ b/src/tests/partition_tests.cpp
@@ -215,22 +215,33 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, ReregisterSlavePartitionAware)
 
   TaskInfo task = createTask(offer, "sleep 60");
 
+  Future<TaskStatus> startingStatus;
   Future<TaskStatus> runningStatus;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&startingStatus))
     .WillOnce(FutureArg<1>(&runningStatus));
 
-  Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
+  Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
+      slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
       slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
 
   driver.launchTasks(offer.id(), {task});
 
+  AWAIT_READY(startingStatus);
+  EXPECT_EQ(TASK_STARTING, startingStatus->state());
+  EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+  AWAIT_READY(statusUpdateAck1);
+
   AWAIT_READY(runningStatus);
   EXPECT_EQ(TASK_RUNNING, runningStatus->state());
   EXPECT_EQ(task.task_id(), runningStatus->task_id());
 
-  const SlaveID& slaveId = runningStatus->slave_id();
+  AWAIT_READY(statusUpdateAck2);
 
-  AWAIT_READY(statusUpdateAck);
+  const SlaveID& slaveId = startingStatus->slave_id();
 
   // Now, induce a partition of the slave by having the master
   // timeout the slave.
@@ -572,8 +583,10 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, ReregisterSlaveNotPartitionAware)
 
   TaskInfo task = createTask(offer, "sleep 60");
 
+  Future<TaskStatus> startingStatus;
   Future<TaskStatus> runningStatus;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&startingStatus))
     .WillOnce(FutureArg<1>(&runningStatus));
 
   Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
@@ -874,8 +887,10 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
   // Launch `task1` using `sched1`.
   TaskInfo task1 = createTask(offer.slave_id(), taskResources, "sleep 60");
 
+  Future<TaskStatus> startingStatus1;
   Future<TaskStatus> runningStatus1;
   EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+    .WillOnce(FutureArg<1>(&startingStatus1))
     .WillOnce(FutureArg<1>(&runningStatus1));
 
   Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
@@ -917,8 +932,10 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
   // Launch the second task.
   TaskInfo task2 = createTask(offer.slave_id(), taskResources, "sleep 60");
 
+  Future<TaskStatus> startingStatus2;
   Future<TaskStatus> runningStatus2;
   EXPECT_CALL(sched2, statusUpdate(&driver2, _))
+    .WillOnce(FutureArg<1>(&startingStatus2))
     .WillOnce(FutureArg<1>(&runningStatus2));
 
   Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
@@ -1136,22 +1153,33 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, PartitionedSlaveOrphanedTask)
   // Launch `task` using `sched`.
   TaskInfo task = createTask(offer, "sleep 60");
 
+  Future<TaskStatus> startingStatus;
   Future<TaskStatus> runningStatus;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&startingStatus))
     .WillOnce(FutureArg<1>(&runningStatus));
 
-  Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
+  Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
+      slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
       slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
 
   driver.launchTasks(offer.id(), {task});
 
+  AWAIT_READY(startingStatus);
+  EXPECT_EQ(TASK_STARTING, startingStatus->state());
+  EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+  AWAIT_READY(statusUpdateAck1);
+
   AWAIT_READY(runningStatus);
   EXPECT_EQ(TASK_RUNNING, runningStatus->state());
   EXPECT_EQ(task.task_id(), runningStatus->task_id());
 
   const SlaveID& slaveId = runningStatus->slave_id();
 
-  AWAIT_READY(statusUpdateAck);
+  AWAIT_READY(statusUpdateAck2);
 
   // Now, induce a partition of the slave by having the master
   // timeout the slave.
@@ -1404,22 +1432,33 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, DisconnectedFramework)
   // Launch `task` using `sched1`.
   TaskInfo task = createTask(offer, "sleep 60");
 
+  Future<TaskStatus> startingStatus;
   Future<TaskStatus> runningStatus;
   EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+    .WillOnce(FutureArg<1>(&startingStatus))
     .WillOnce(FutureArg<1>(&runningStatus));
 
   Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
       slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
 
+  Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
+      slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
   driver1.launchTasks(offer.id(), {task});
 
+  AWAIT_READY(startingStatus);
+  EXPECT_EQ(TASK_STARTING, startingStatus->state());
+  EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+  AWAIT_READY(statusUpdateAck1);
+
   AWAIT_READY(runningStatus);
   EXPECT_EQ(TASK_RUNNING, runningStatus->state());
   EXPECT_EQ(task.task_id(), runningStatus->task_id());
 
   const SlaveID& slaveId = runningStatus->slave_id();
 
-  AWAIT_READY(statusUpdateAck1);
+  AWAIT_READY(statusUpdateAck2);
 
   // Shutdown the master.
   master->reset();
@@ -1573,22 +1612,33 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, SpuriousSlaveReregistration)
   // Launch `task` using `sched`.
   TaskInfo task = createTask(offer, "sleep 60");
 
+  Future<TaskStatus> startingStatus;
   Future<TaskStatus> runningStatus;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&startingStatus))
     .WillOnce(FutureArg<1>(&runningStatus));
 
-  Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
+  Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
+      slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
       slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
 
   driver.launchTasks(offer.id(), {task});
 
+  AWAIT_READY(startingStatus);
+  EXPECT_EQ(TASK_STARTING, startingStatus->state());
+  EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+  AWAIT_READY(statusUpdateAck1);
+
   AWAIT_READY(runningStatus);
   EXPECT_EQ(TASK_RUNNING, runningStatus->state());
   EXPECT_EQ(task.task_id(), runningStatus->task_id());
 
   const SlaveID& slaveId = runningStatus->slave_id();
 
-  AWAIT_READY(statusUpdateAck);
+  AWAIT_READY(statusUpdateAck2);
 
   // Simulate a master loss event at the slave and then cause the
   // slave to reregister with the master. From the master's

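The partition tests above also wait on the status update acknowledgements. Each
FUTURE_DISPATCH future is satisfied by a single matching dispatch, so with a
TASK_STARTING acknowledgement now arriving first, waiting on one future no
longer guarantees that the TASK_RUNNING acknowledgement has been processed. The
hunks therefore install two futures and await them in order; roughly, with
`slave`, `driver`, `task` and the captured statuses named as above:

  // Sketch only, not part of the diff.
  Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
      slave.get()->pid, &Slave::_statusUpdateAcknowledgement);

  Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
      slave.get()->pid, &Slave::_statusUpdateAcknowledgement);

  driver.launchTasks(offer.id(), {task});

  AWAIT_READY(startingStatus);   // TASK_STARTING reached the scheduler.
  AWAIT_READY(statusUpdateAck1); // Its acknowledgement reached the agent.

  AWAIT_READY(runningStatus);    // TASK_RUNNING reached the scheduler.
  AWAIT_READY(statusUpdateAck2); // Its acknowledgement reached the agent.
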
http://git-wip-us.apache.org/repos/asf/mesos/blob/1e1e409b/src/tests/persistent_volume_endpoints_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/persistent_volume_endpoints_tests.cpp b/src/tests/persistent_volume_endpoints_tests.cpp
index 7a24bf4..abc6dbb 100644
--- a/src/tests/persistent_volume_endpoints_tests.cpp
+++ b/src/tests/persistent_volume_endpoints_tests.cpp
@@ -1987,9 +1987,13 @@ TEST_F(PersistentVolumeEndpointsTest, SlavesEndpointFullResources)
   filters.set_refuse_seconds(0);
 
   // Expect a TASK_RUNNING status.
-  EXPECT_CALL(sched, statusUpdate(&driver, _));
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .Times(2);
 
-  Future<Nothing> _statusUpdateAcknowledgement =
+  Future<Nothing> _statusUpdateAcknowledgement1 =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> _statusUpdateAcknowledgement2 =
     FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
 
   // Expect another resource offer.
@@ -1999,7 +2003,8 @@ TEST_F(PersistentVolumeEndpointsTest, SlavesEndpointFullResources)
   driver.acceptOffers({offer.id()}, {LAUNCH({taskInfo})}, filters);
 
   // Wait for TASK_RUNNING update ack.
-  AWAIT_READY(_statusUpdateAcknowledgement);
+  AWAIT_READY(_statusUpdateAcknowledgement1);
+  AWAIT_READY(_statusUpdateAcknowledgement2);
 
   response = process::http::get(
       master.get()->pid,

http://git-wip-us.apache.org/repos/asf/mesos/blob/1e1e409b/src/tests/persistent_volume_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/persistent_volume_tests.cpp b/src/tests/persistent_volume_tests.cpp
index 1b35af4..65d86de 100644
--- a/src/tests/persistent_volume_tests.cpp
+++ b/src/tests/persistent_volume_tests.cpp
@@ -806,9 +806,11 @@ TEST_P(PersistentVolumeTest, AccessPersistentVolume)
       taskResources,
       "echo abc > path1/file");
 
+  Future<TaskStatus> status0;
   Future<TaskStatus> status1;
   Future<TaskStatus> status2;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status0))
     .WillOnce(FutureArg<1>(&status1))
     .WillOnce(FutureArg<1>(&status2));
 
@@ -823,6 +825,10 @@ TEST_P(PersistentVolumeTest, AccessPersistentVolume)
       {CREATE(volume),
        LAUNCH({task})});
 
+  AWAIT_READY(status0);
+  EXPECT_EQ(task.task_id(), status0->task_id());
+  EXPECT_EQ(TASK_STARTING, status0->state());
+
   AWAIT_READY(status1);
   EXPECT_EQ(task.task_id(), status1->task_id());
   EXPECT_EQ(TASK_RUNNING, status1->state());
@@ -982,21 +988,25 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleTasks)
       taskResources2.get() + volume,
       "echo task2 > path1/file2");
 
-  // We should receive a TASK_RUNNING followed by a TASK_FINISHED for
-  // each of the 2 tasks. We do not check for the actual task state
-  // since it's not the primary objective of the test. We instead
-  // verify that the paths are created by the tasks after we receive
-  // enough status updates.
+  // We should receive a TASK_STARTING, followed by a TASK_RUNNING
+  // and a TASK_FINISHED for each of the 2 tasks.
+  // We do not check for the actual task state since it's not the
+  // primary objective of the test. We instead verify that the paths
+  // are created by the tasks after we receive enough status updates.
   Future<TaskStatus> status1;
   Future<TaskStatus> status2;
   Future<TaskStatus> status3;
   Future<TaskStatus> status4;
+  Future<TaskStatus> status5;
+  Future<TaskStatus> status6;
 
   EXPECT_CALL(sched, statusUpdate(&driver, _))
     .WillOnce(FutureArg<1>(&status1))
     .WillOnce(FutureArg<1>(&status2))
     .WillOnce(FutureArg<1>(&status3))
-    .WillOnce(FutureArg<1>(&status4));
+    .WillOnce(FutureArg<1>(&status4))
+    .WillOnce(FutureArg<1>(&status5))
+    .WillOnce(FutureArg<1>(&status6));
 
   driver.acceptOffers(
       {offer.id()},
@@ -1009,6 +1019,8 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleTasks)
   AWAIT_READY(status2);
   AWAIT_READY(status3);
   AWAIT_READY(status4);
+  AWAIT_READY(status5);
+  AWAIT_READY(status6);
 
   const string& volumePath = slave::paths::getPersistentVolumePath(
       slaveFlags.work_dir,
@@ -1256,10 +1268,12 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleFrameworks)
       Resources::parse("cpus:1;mem:128").get() + volume,
       "echo abc > path1/file1 && sleep 1000");
 
-  // We should receive a TASK_RUNNING for the launched task.
+  // We should receive a TASK_STARTING and a TASK_RUNNING for the launched task.
+  Future<TaskStatus> status0;
   Future<TaskStatus> status1;
 
   EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+    .WillOnce(FutureArg<1>(&status0))
     .WillOnce(FutureArg<1>(&status1));
 
   // We use a filter of 0 seconds so the resources will be available
@@ -1273,6 +1287,9 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleFrameworks)
        LAUNCH({task1})},
       filters);
 
+  AWAIT_READY(status0);
+  EXPECT_EQ(TASK_STARTING, status0->state());
+
   AWAIT_READY(status1);
   EXPECT_EQ(TASK_RUNNING, status1->state());
 
@@ -1329,11 +1346,13 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleFrameworks)
       Resources::parse("cpus:1;mem:256").get() + volume,
       "echo abc > path1/file2 && sleep 1000");
 
-  // We should receive a TASK_RUNNING for the launched task.
+  // We should receive a TASK_STARTING and a TASK_RUNNING for the launched task.
   Future<TaskStatus> status2;
+  Future<TaskStatus> status3;
 
   EXPECT_CALL(sched2, statusUpdate(&driver2, _))
-    .WillOnce(FutureArg<1>(&status2));
+    .WillOnce(FutureArg<1>(&status2))
+    .WillOnce(FutureArg<1>(&status3));
 
   driver2.acceptOffers(
       {offer2.id()},
@@ -1341,7 +1360,10 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleFrameworks)
       filters);
 
   AWAIT_READY(status2);
-  EXPECT_EQ(TASK_RUNNING, status2->state());
+  EXPECT_EQ(TASK_STARTING, status2->state());
+
+  AWAIT_READY(status3);
+  EXPECT_EQ(TASK_RUNNING, status3->state());
 
   // Collect metrics based on both frameworks. Note that the `cpus_used` and
   // `mem_used` is updated, but `disk_used` does not change since both tasks
@@ -1432,13 +1454,17 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMasterFailover)
       taskResources.get() + volume,
       "sleep 1000");
 
-  // We should receive a TASK_RUNNING for each of the tasks.
+  // We should receive a TASK_STARTING and a TASK_RUNNING for each of the tasks.
   Future<TaskStatus> status1;
   Future<TaskStatus> status2;
+  Future<TaskStatus> status3;
+  Future<TaskStatus> status4;
 
   EXPECT_CALL(sched, statusUpdate(&driver, _))
     .WillOnce(FutureArg<1>(&status1))
-    .WillOnce(FutureArg<1>(&status2));
+    .WillOnce(FutureArg<1>(&status2))
+    .WillOnce(FutureArg<1>(&status3))
+    .WillOnce(FutureArg<1>(&status4));
 
   Future<CheckpointResourcesMessage> checkpointResources =
     FUTURE_PROTOBUF(CheckpointResourcesMessage(), _, slave.get()->pid);
@@ -1449,11 +1475,14 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMasterFailover)
        LAUNCH({task1, task2})});
 
   AWAIT_READY(checkpointResources);
+
+  // We only check the first and the last status, because the two in between
+  // could arrive in any order.
   AWAIT_READY(status1);
-  EXPECT_EQ(TASK_RUNNING, status1->state());
+  EXPECT_EQ(TASK_STARTING, status1->state());
 
-  AWAIT_READY(status2);
-  EXPECT_EQ(TASK_RUNNING, status2->state());
+  AWAIT_READY(status4);
+  EXPECT_EQ(TASK_RUNNING, status4->state());
 
   // This is to make sure CheckpointResourcesMessage is processed.
   Clock::pause();
@@ -1596,16 +1625,20 @@ TEST_P(PersistentVolumeTest, DestroyPersistentVolumeMultipleTasks)
   EXPECT_CALL(sched, resourceOffers(&driver, _))
     .WillOnce(FutureArg<1>(&offers));
 
-  // We should receive a TASK_RUNNING each of the 2 tasks. We track task
-  // termination by a TASK_FINISHED for the short-lived task.
+  // We should receive a TASK_STARTING and a TASK_RUNNING for each of the 2 tasks.
+  // We track task termination by a TASK_FINISHED for the short-lived task.
   Future<TaskStatus> status1;
   Future<TaskStatus> status2;
   Future<TaskStatus> status3;
+  Future<TaskStatus> status4;
+  Future<TaskStatus> status5;
 
   EXPECT_CALL(sched, statusUpdate(&driver, _))
     .WillOnce(FutureArg<1>(&status1))
     .WillOnce(FutureArg<1>(&status2))
-    .WillOnce(FutureArg<1>(&status3));
+    .WillOnce(FutureArg<1>(&status3))
+    .WillOnce(FutureArg<1>(&status4))
+    .WillOnce(FutureArg<1>(&status5));
 
   driver.acceptOffers(
       {offer.id()},
@@ -1614,18 +1647,23 @@ TEST_P(PersistentVolumeTest, DestroyPersistentVolumeMultipleTasks)
        LAUNCH({task1, task2})},
       filters);
 
-  // Wait for TASK_RUNNING for both the tasks, and TASK_FINISHED for
-  // the short-lived task.
+  // Wait for TASK_STARTING and TASK_RUNNING for both the tasks,
+  // and TASK_FINISHED for the short-lived task.
   AWAIT_READY(status1);
   AWAIT_READY(status2);
   AWAIT_READY(status3);
+  AWAIT_READY(status4);
+  AWAIT_READY(status5);
 
   hashset<TaskID> tasksRunning;
   hashset<TaskID> tasksFinished;
-  vector<Future<TaskStatus>> statuses{status1, status2, status3};
+  vector<Future<TaskStatus>> statuses{
+    status1, status2, status3, status4, status5};
 
   foreach (const Future<TaskStatus>& status, statuses) {
-    if (status->state() == TASK_RUNNING) {
+    if (status->state() == TASK_STARTING) {
+      // Ignore TASK_STARTING updates.
+    } else if (status->state() == TASK_RUNNING) {
       tasksRunning.insert(status->task_id());
     } else {
       tasksFinished.insert(status->task_id());
@@ -1684,15 +1722,15 @@ TEST_P(PersistentVolumeTest, DestroyPersistentVolumeMultipleTasks)
 
   // We kill the long-lived task and wait for TASK_KILLED, so we can
   // DESTROY the persistent volume once the task terminates.
-  Future<TaskStatus> status4;
+  Future<TaskStatus> status6;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&status4));
+    .WillOnce(FutureArg<1>(&status6));
 
   driver.killTask(task1.task_id());
 
-  AWAIT_READY(status4);
-  EXPECT_EQ(task1.task_id(), status4->task_id());
-  EXPECT_EQ(TASK_KILLED, status4->state());
+  AWAIT_READY(status6);
+  EXPECT_EQ(task1.task_id(), status6->task_id());
+  EXPECT_EQ(TASK_KILLED, status6->state());
 
   EXPECT_CALL(sched, resourceOffers(&driver, _))
     .WillOnce(FutureArg<1>(&offers));
@@ -1921,25 +1959,37 @@ TEST_P(PersistentVolumeTest, SlaveRecovery)
       taskResources,
       "while true; do test -d path1; done");
 
+  Future<TaskStatus> status0;
   Future<TaskStatus> status1;
   Future<TaskStatus> status2;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status0))
     .WillOnce(FutureArg<1>(&status1))
     .WillOnce(FutureArg<1>(&status2));
 
-  Future<Nothing> ack =
+  Future<Nothing> ack1 =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> ack2 =
     FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
 
   driver.acceptOffers(
       {offer.id()},
       {CREATE(volume), LAUNCH({task})});
 
+  AWAIT_READY(status0);
+  EXPECT_EQ(task.task_id(), status0->task_id());
+  EXPECT_EQ(TASK_STARTING, status0->state());
+
+  // Wait for the ACK to be checkpointed.
+  AWAIT_READY(ack1);
+
   AWAIT_READY(status1);
   EXPECT_EQ(task.task_id(), status1->task_id());
   EXPECT_EQ(TASK_RUNNING, status1->state());
 
   // Wait for the ACK to be checkpointed.
-  AWAIT_READY(ack);
+  AWAIT_READY(ack2);
 
   // Restart the slave.
   slave.get()->terminate();

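When a single ACCEPT launches two tasks, as in SharedPersistentVolumeMasterFailover
and DestroyPersistentVolumeMultipleTasks above, the TASK_STARTING and
TASK_RUNNING updates of the two tasks can interleave. That is why the former
only asserts on the first and last captured status, and the latter buckets the
captured statuses by state. A state-keyed tally keeps such checks
order-agnostic; a rough sketch, reusing captured futures like
`status1`..`status4` from the tests above:

  // Sketch only, not part of the diff.
  hashset<TaskID> tasksStarting;
  hashset<TaskID> tasksRunning;

  vector<Future<TaskStatus>> statuses{status1, status2, status3, status4};

  foreach (const Future<TaskStatus>& status, statuses) {
    AWAIT_READY(status);

    if (status->state() == TASK_STARTING) {
      tasksStarting.insert(status->task_id());
    } else if (status->state() == TASK_RUNNING) {
      tasksRunning.insert(status->task_id());
    }
  }

  EXPECT_EQ(2u, tasksStarting.size());
  EXPECT_EQ(2u, tasksRunning.size());
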
http://git-wip-us.apache.org/repos/asf/mesos/blob/1e1e409b/src/tests/reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reconciliation_tests.cpp b/src/tests/reconciliation_tests.cpp
index 64a1d3d..8ae2860 100644
--- a/src/tests/reconciliation_tests.cpp
+++ b/src/tests/reconciliation_tests.cpp
@@ -1178,22 +1178,33 @@ TEST_F(ReconciliationTest, PartitionedAgentThenMasterFailover)
   // Launch `task` using `sched`.
   TaskInfo task = createTask(offer, "sleep 60");
 
+  Future<TaskStatus> startingStatus;
   Future<TaskStatus> runningStatus;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&startingStatus))
     .WillOnce(FutureArg<1>(&runningStatus));
 
-  Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
+  Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
+      slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
       slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
 
   driver.launchTasks(offer.id(), {task});
 
+  AWAIT_READY(startingStatus);
+  EXPECT_EQ(TASK_STARTING, startingStatus->state());
+  EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+  AWAIT_READY(statusUpdateAck1);
+
   AWAIT_READY(runningStatus);
   EXPECT_EQ(TASK_RUNNING, runningStatus->state());
   EXPECT_EQ(task.task_id(), runningStatus->task_id());
 
   const SlaveID slaveId = runningStatus->slave_id();
 
-  AWAIT_READY(statusUpdateAck);
+  AWAIT_READY(statusUpdateAck2);
 
   // Now, induce a partition of the slave by having the master
   // timeout the slave.

http://git-wip-us.apache.org/repos/asf/mesos/blob/1e1e409b/src/tests/reservation_endpoints_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reservation_endpoints_tests.cpp b/src/tests/reservation_endpoints_tests.cpp
index 5a6e9a7..409047c 100644
--- a/src/tests/reservation_endpoints_tests.cpp
+++ b/src/tests/reservation_endpoints_tests.cpp
@@ -371,6 +371,7 @@ TEST_F(ReservationEndpointsTest, ReserveAvailableAndOfferedResources)
   MesosSchedulerDriver driver(
       &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
 
+  // Expect one TASK_STARTING and one TASK_RUNNING update.
   EXPECT_CALL(sched, registered(&driver, _, _));
 
   Future<vector<Offer>> offers;
@@ -400,16 +401,21 @@ TEST_F(ReservationEndpointsTest, ReserveAvailableAndOfferedResources)
   // recovers 'offered' resources portion.
   TaskInfo taskInfo = createTask(offer.slave_id(), available, "sleep 1000");
 
-  // Expect a TASK_RUNNING status.
-  EXPECT_CALL(sched, statusUpdate(_, _));
+  // Expect a TASK_STARTING and a TASK_RUNNING status.
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillRepeatedly(Return());
 
-  Future<Nothing> _statusUpdateAcknowledgement =
+  Future<Nothing> _statusUpdateAcknowledgement1 =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> _statusUpdateAcknowledgement2 =
     FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
 
   driver.acceptOffers({offer.id()}, {LAUNCH({taskInfo})});
 
-  // Wait for TASK_RUNNING update ack.
-  AWAIT_READY(_statusUpdateAcknowledgement);
+  // Wait for update acks.
+  AWAIT_READY(_statusUpdateAcknowledgement1);
+  AWAIT_READY(_statusUpdateAcknowledgement2);
 
   // Summon an offer to receive the 'offered' resources.
   EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -547,16 +553,21 @@ TEST_F(ReservationEndpointsTest, UnreserveAvailableAndOfferedResources)
   // recovers 'offered' resources portion.
   TaskInfo taskInfo = createTask(offer.slave_id(), available, "sleep 1000");
 
-  // Expect a TASK_RUNNING status.
-  EXPECT_CALL(sched, statusUpdate(_, _));
+  // Expect a TASK_STARTING and a TASK_RUNNING status.
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillRepeatedly(Return());
 
-  Future<Nothing> _statusUpdateAcknowledgement =
+  Future<Nothing> _statusUpdateAcknowledgement1 =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> _statusUpdateAcknowledgement2 =
     FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
 
   driver.acceptOffers({offer.id()}, {LAUNCH({taskInfo})});
 
-  // Wait for TASK_RUNNING update ack.
-  AWAIT_READY(_statusUpdateAcknowledgement);
+  // Wait for update acks from TASK_STARTING and TASK_RUNNING.
+  AWAIT_READY(_statusUpdateAcknowledgement1);
+  AWAIT_READY(_statusUpdateAcknowledgement2);
 
   // Summon an offer to receive the 'offered' resources.
   EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -1596,9 +1607,11 @@ TEST_F(ReservationEndpointsTest, AgentStateEndpointResources)
 
   Offer offer = offers.get()[0];
 
-  Future<TaskStatus> status;
+  Future<TaskStatus> statusStarting;
+  Future<TaskStatus> statusRunning;
   EXPECT_CALL(sched, statusUpdate(_, _))
-    .WillOnce(FutureArg<1>(&status));
+    .WillOnce(FutureArg<1>(&statusStarting))
+    .WillOnce(FutureArg<1>(&statusRunning));
 
   Resources taskResources = Resources::parse(
       "cpus(role):2;mem(role):512;cpus:2;mem:1024").get();
@@ -1607,8 +1620,11 @@ TEST_F(ReservationEndpointsTest, AgentStateEndpointResources)
 
   driver.acceptOffers({offer.id()}, {LAUNCH({task})});
 
-  AWAIT_READY(status);
-  ASSERT_EQ(TASK_RUNNING, status->state());
+  AWAIT_READY(statusStarting);
+  ASSERT_EQ(TASK_STARTING, statusStarting->state());
+
+  AWAIT_READY(statusRunning);
+  ASSERT_EQ(TASK_RUNNING, statusRunning->state());
 
   Future<Response> response = process::http::get(
       agent.get()->pid,

http://git-wip-us.apache.org/repos/asf/mesos/blob/1e1e409b/src/tests/role_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/role_tests.cpp b/src/tests/role_tests.cpp
index fc4c017..852e2cc 100644
--- a/src/tests/role_tests.cpp
+++ b/src/tests/role_tests.cpp
@@ -986,9 +986,10 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(RoleTest, VolumesInOverlappingHierarchies)
         taskResources,
         "! (ls -Av path | grep -q .)");
 
-    // We expect two status updates for the task.
-    Future<TaskStatus> status1, status2;
+    // We expect three status updates for the task.
+    Future<TaskStatus> status0, status1, status2;
     EXPECT_CALL(sched, statusUpdate(&driver, _))
+      .WillOnce(FutureArg<1>(&status0))
       .WillOnce(FutureArg<1>(&status1))
       .WillOnce(FutureArg<1>(&status2));
 
@@ -997,6 +998,11 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(RoleTest, VolumesInOverlappingHierarchies)
         {offer.id()},
         {RESERVE(reservedDisk), CREATE(volume), LAUNCH({task})});
 
+    AWAIT_READY(status0);
+
+    EXPECT_EQ(task.task_id(), status0->task_id());
+    EXPECT_EQ(TASK_STARTING, status0->state());
+
     AWAIT_READY(status1);
 
     EXPECT_EQ(task.task_id(), status1->task_id());

http://git-wip-us.apache.org/repos/asf/mesos/blob/1e1e409b/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index 4eda96e..6df4d32 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -623,11 +623,15 @@ TEST_P(SchedulerTest, TaskGroupRunning)
   taskGroup.add_tasks()->CopyFrom(task1);
   taskGroup.add_tasks()->CopyFrom(task2);
 
+  Future<Event::Update> startingUpdate1;
+  Future<Event::Update> startingUpdate2;
   Future<Event::Update> runningUpdate1;
   Future<Event::Update> runningUpdate2;
   Future<Event::Update> finishedUpdate1;
   Future<Event::Update> finishedUpdate2;
   EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(FutureArg<1>(&startingUpdate1))
+    .WillOnce(FutureArg<1>(&startingUpdate2))
     .WillOnce(FutureArg<1>(&runningUpdate1))
     .WillOnce(FutureArg<1>(&runningUpdate2))
     .WillOnce(FutureArg<1>(&finishedUpdate1))
@@ -669,14 +673,58 @@ TEST_P(SchedulerTest, TaskGroupRunning)
   EXPECT_EQ(devolve(task2.task_id()),
             runTaskGroupMessage->task_group().tasks(1).task_id());
 
+  AWAIT_READY(startingUpdate1);
+  ASSERT_EQ(v1::TASK_STARTING, startingUpdate1->status().state());
+
+  AWAIT_READY(startingUpdate2);
+  ASSERT_EQ(v1::TASK_STARTING, startingUpdate2->status().state());
+
+  const hashset<v1::TaskID> tasks{task1.task_id(), task2.task_id()};
+
+  // TASK_STARTING updates for the tasks in a
+  // task group can be received in any order.
+  const hashset<v1::TaskID> tasksStarting{
+    startingUpdate1->status().task_id(),
+    startingUpdate2->status().task_id()};
+
+  ASSERT_EQ(tasks, tasksStarting);
+
+  // Acknowledge the TASK_STARTING updates so
+  // that subsequent updates can be received.
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::ACKNOWLEDGE);
+
+    Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+    acknowledge->mutable_task_id()->CopyFrom(
+        startingUpdate1->status().task_id());
+    acknowledge->mutable_agent_id()->CopyFrom(offers->offers(0).agent_id());
+    acknowledge->set_uuid(startingUpdate1->status().uuid());
+
+    mesos.send(call);
+  }
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::ACKNOWLEDGE);
+
+    Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+    acknowledge->mutable_task_id()->CopyFrom(
+        startingUpdate2->status().task_id());
+    acknowledge->mutable_agent_id()->CopyFrom(offers->offers(0).agent_id());
+    acknowledge->set_uuid(startingUpdate2->status().uuid());
+
+    mesos.send(call);
+  }
+
   AWAIT_READY(runningUpdate1);
   ASSERT_EQ(v1::TASK_RUNNING, runningUpdate1->status().state());
 
   AWAIT_READY(runningUpdate2);
   ASSERT_EQ(v1::TASK_RUNNING, runningUpdate2->status().state());
 
-  const hashset<v1::TaskID> tasks{task1.task_id(), task2.task_id()};
-
   // TASK_RUNNING updates for the tasks in a
   // task group can be received in any order.
   const hashset<v1::TaskID> tasksRunning{

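The v1 scheduler API used by TaskGroupRunning has no implicit acknowledgements,
and the agent only forwards the next update for a task once the previous one
has been acknowledged, so the TASK_STARTING updates above must be acknowledged
before the TASK_RUNNING updates can arrive. The two ACKNOWLEDGE blocks could
equally be written as a small local helper; a hypothetical sketch (this lambda
does not exist in the tree), reusing the `mesos`, `frameworkId`, `offers` and
`startingUpdate*` objects from the test:

  // Hypothetical helper, not part of the diff.
  auto acknowledge = [&](const v1::TaskStatus& status) {
    Call call;
    call.mutable_framework_id()->CopyFrom(frameworkId);
    call.set_type(Call::ACKNOWLEDGE);

    Call::Acknowledge* ack = call.mutable_acknowledge();
    ack->mutable_task_id()->CopyFrom(status.task_id());
    ack->mutable_agent_id()->CopyFrom(offers->offers(0).agent_id());
    ack->set_uuid(status.uuid());

    mesos.send(call);
  };

  acknowledge(startingUpdate1->status());
  acknowledge(startingUpdate2->status());
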
http://git-wip-us.apache.org/repos/asf/mesos/blob/1e1e409b/src/tests/slave_authorization_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_authorization_tests.cpp b/src/tests/slave_authorization_tests.cpp
index 4c7d37f..23e9d0b 100644
--- a/src/tests/slave_authorization_tests.cpp
+++ b/src/tests/slave_authorization_tests.cpp
@@ -633,10 +633,12 @@ TYPED_TEST(SlaveAuthorizerTest, AuthorizeRunTaskOnAgent)
   // The first task should fail since the task user `foo` is not an
   // authorized user that can launch a task. However, the second task
   // should succeed.
+  Future<TaskStatus> status0;
   Future<TaskStatus> status1;
   Future<TaskStatus> status2;
 
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status0))
     .WillOnce(FutureArg<1>(&status1))
     .WillOnce(FutureArg<1>(&status2));
 
@@ -644,16 +646,19 @@ TYPED_TEST(SlaveAuthorizerTest, AuthorizeRunTaskOnAgent)
       {offer.id()},
       {LAUNCH({task1, task2})});
 
-  // Wait for TASK_FAILED for 1st task, and TASK_RUNNING for 2nd task.
+  // Wait for TASK_FAILED for 1st task, and TASK_STARTING followed by
+  // TASK_RUNNING for 2nd task.
+  AWAIT_READY(status0);
   AWAIT_READY(status1);
   AWAIT_READY(status2);
 
   // Validate both the statuses. Note that the order of receiving the
-  // status updates for the 2 tasks is not deterministic.
-  hashmap<TaskID, TaskStatus> statuses {
-    {status1->task_id(), status1.get()},
-    {status2->task_id(), status2.get()}
-  };
+  // status updates for the 2 tasks is not deterministic, but we know
+  // that task2's TASK_RUNNING arrives after its TASK_STARTING.
+  hashmap<TaskID, TaskStatus> statuses;
+  statuses[status0->task_id()] = status0.get();
+  statuses[status1->task_id()] = status1.get();
+  statuses[status2->task_id()] = status2.get();
 
   ASSERT_TRUE(statuses.contains(task1.task_id()));
   EXPECT_EQ(TASK_ERROR, statuses.at(task1.task_id()).state());

http://git-wip-us.apache.org/repos/asf/mesos/blob/1e1e409b/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 30d8c23..8f328e9 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -252,7 +252,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSlaveState)
 
   // Capture the update.
   AWAIT_READY(update);
-  EXPECT_EQ(TASK_RUNNING, update->update().status().state());
+  EXPECT_EQ(TASK_STARTING, update->update().status().state());
 
   // Wait for the ACK to be checkpointed.
   AWAIT_READY(_statusUpdateAcknowledgement);
@@ -423,7 +423,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverStatusUpdateManager)
   ASSERT_SOME(slave);
 
   AWAIT_READY(status);
-  EXPECT_EQ(TASK_RUNNING, status->state());
+  EXPECT_EQ(TASK_STARTING, status->state());
 
   driver.stop();
   driver.join();
@@ -514,7 +514,7 @@ TYPED_TEST(SlaveRecoveryTest, DISABLED_ReconnectHTTPExecutor)
 
   // Scheduler should receive the recovered update.
   AWAIT_READY(status);
-  EXPECT_EQ(TASK_RUNNING, status->state());
+  EXPECT_EQ(TASK_STARTING, status->state());
 
   driver.stop();
   driver.join();
@@ -762,13 +762,13 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutor)
   TaskInfo task = createTask(offers.get()[0], "sleep 1000");
 
   // Drop the first update from the executor.
-  Future<StatusUpdateMessage> statusUpdate =
+  Future<StatusUpdateMessage> startingUpdate =
     DROP_PROTOBUF(StatusUpdateMessage(), _, _);
 
   driver.launchTasks(offers.get()[0].id(), {task});
 
   // Stop the slave before the status update is received.
-  AWAIT_READY(statusUpdate);
+  AWAIT_READY(startingUpdate);
 
   slave.get()->terminate();
 
@@ -791,15 +791,15 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutor)
   // Ensure the executor re-registers.
   AWAIT_READY(reregister);
 
-  // Executor should inform about the unacknowledged update.
-  ASSERT_EQ(1, reregister->updates_size());
+  // Executor should inform about the unacknowledged updates.
+  ASSERT_EQ(2, reregister->updates_size());
   const StatusUpdate& update = reregister->updates(0);
   EXPECT_EQ(task.task_id(), update.status().task_id());
-  EXPECT_EQ(TASK_RUNNING, update.status().state());
+  EXPECT_EQ(TASK_STARTING, update.status().state());
 
   // Scheduler should receive the recovered update.
   AWAIT_READY(status);
-  EXPECT_EQ(TASK_RUNNING, status->state());
+  EXPECT_EQ(TASK_STARTING, status->state());
 
   driver.stop();
   driver.join();
@@ -846,7 +846,8 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutorRetry)
 
   Future<TaskStatus> statusUpdate;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&statusUpdate));
+    .WillOnce(FutureArg<1>(&statusUpdate))
+    .WillRepeatedly(Return()); // Ignore subsequent TASK_RUNNING updates.
 
   driver.start();
 
@@ -863,7 +864,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutorRetry)
   driver.launchTasks(offers.get()[0].id(), {task});
 
   AWAIT_READY(statusUpdate);
-  EXPECT_EQ(TASK_RUNNING, statusUpdate->state());
+  EXPECT_EQ(TASK_STARTING, statusUpdate->state());
 
   // Ensure the acknowledgement is checkpointed.
   Clock::settle();
@@ -974,12 +975,19 @@ TYPED_TEST(SlaveRecoveryTest, PingTimeoutDuringRecovery)
 
   TaskInfo task = createTask(offers.get()[0], "sleep 1000");
 
+  Future<TaskStatus> statusUpdate0;
   Future<TaskStatus> statusUpdate1;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusUpdate0))
     .WillOnce(FutureArg<1>(&statusUpdate1));
 
   driver.launchTasks(offers.get()[0].id(), {task});
 
+  AWAIT_READY(statusUpdate0);
+  ASSERT_EQ(TASK_STARTING, statusUpdate0->state());
+
+  driver.acknowledgeStatusUpdate(statusUpdate0.get());
+
   AWAIT_READY(statusUpdate1);
   ASSERT_EQ(TASK_RUNNING, statusUpdate1->state());
 
@@ -1816,7 +1824,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverCompletedExecutor)
   TaskInfo task = createTask(offers1.get()[0], "exit 0");
 
   EXPECT_CALL(sched, statusUpdate(_, _))
-    .Times(2); // TASK_RUNNING and TASK_FINISHED updates.
+    .Times(3); // TASK_STARTING, TASK_RUNNING and TASK_FINISHED updates.
 
   EXPECT_CALL(sched, offerRescinded(_, _))
     .Times(AtMost(1));
@@ -2439,15 +2447,21 @@ TYPED_TEST(SlaveRecoveryTest, KillTask)
 
   TaskInfo task = createTask(offers1.get()[0], "sleep 1000");
 
-  EXPECT_CALL(sched, statusUpdate(_, _));
+  // Expect a TASK_STARTING and a TASK_RUNNING update.
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .Times(2);
 
-  Future<Nothing> ack =
+  Future<Nothing> ack1 =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> ack2 =
     FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
 
   driver.launchTasks(offers1.get()[0].id(), {task});
 
   // Wait for the ACK to be checkpointed.
-  AWAIT_READY(ack);
+  AWAIT_READY(ack1);
+  AWAIT_READY(ack2);
 
   slave.get()->terminate();
 
@@ -3068,9 +3082,10 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlave)
 
   TaskInfo task = createTask(offers1.get()[0], "sleep 1000");
 
-  Future<Nothing> statusUpdate1;
+  Future<Nothing> statusUpdate1, statusUpdate2;
   EXPECT_CALL(sched, statusUpdate(_, _))
-    .WillOnce(FutureSatisfy(&statusUpdate1))
+    .WillOnce(FutureSatisfy(&statusUpdate1)) // TASK_STARTING
+    .WillOnce(FutureSatisfy(&statusUpdate2)) // TASK_RUNNING
     .WillOnce(Return());  // Ignore TASK_FAILED update.
 
   Future<Message> registerExecutor =
@@ -3082,7 +3097,7 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlave)
   AWAIT_READY(registerExecutor);
   UPID executorPid = registerExecutor->from;
 
-  AWAIT_READY(statusUpdate1); // Wait for TASK_RUNNING update.
+  AWAIT_READY(statusUpdate2); // Wait for TASK_RUNNING update.
 
   EXPECT_CALL(sched, offerRescinded(_, _))
     .Times(AtMost(1));
@@ -3186,18 +3201,22 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlaveSIGUSR1)
 
   TaskInfo task = createTask(offers.get()[0], "sleep 1000");
 
-  Future<TaskStatus> status;
+  Future<TaskStatus> statusStarting, statusRunning;
   EXPECT_CALL(sched, statusUpdate(_, _))
-    .WillOnce(FutureArg<1>(&status));
+    .WillOnce(FutureArg<1>(&statusStarting))
+    .WillOnce(FutureArg<1>(&statusRunning));
 
   driver.launchTasks(offers.get()[0].id(), {task});
 
-  AWAIT_READY(status);
-  EXPECT_EQ(TASK_RUNNING, status->state());
+  AWAIT_READY(statusStarting);
+  EXPECT_EQ(TASK_STARTING, statusStarting->state());
 
-  Future<TaskStatus> status2;
+  AWAIT_READY(statusRunning);
+  EXPECT_EQ(TASK_RUNNING, statusRunning->state());
+
+  Future<TaskStatus> statusLost;
   EXPECT_CALL(sched, statusUpdate(_, _))
-    .WillOnce(FutureArg<1>(&status2));
+    .WillOnce(FutureArg<1>(&statusLost));
 
   Future<Nothing> slaveLost;
   EXPECT_CALL(sched, slaveLost(_, _))
@@ -3223,11 +3242,11 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlaveSIGUSR1)
   AWAIT_READY(executorTerminated);
 
   // The master should send a TASK_LOST and slaveLost.
-  AWAIT_READY(status2);
+  AWAIT_READY(statusLost);
 
-  EXPECT_EQ(TASK_LOST, status2->state());
-  EXPECT_EQ(TaskStatus::SOURCE_MASTER, status2->source());
-  EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, status2->reason());
+  EXPECT_EQ(TASK_LOST, statusLost->state());
+  EXPECT_EQ(TaskStatus::SOURCE_MASTER, statusLost->source());
+  EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, statusLost->reason());
 
   AWAIT_READY(slaveLost);
 
@@ -3415,16 +3434,21 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileKillTask)
   SlaveID slaveId = offers1.get()[0].slave_id();
   FrameworkID frameworkId = offers1.get()[0].framework_id();
 
-  // Expecting TASK_RUNNING status.
-  EXPECT_CALL(sched, statusUpdate(_, _));
+  // Expecting TASK_STARTING and TASK_RUNNING statuses.
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .Times(2);
 
-  Future<Nothing> _statusUpdateAcknowledgement =
+  Future<Nothing> _statusUpdateAcknowledgement1 =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> _statusUpdateAcknowledgement2 =
     FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
 
   driver.launchTasks(offers1.get()[0].id(), {task});
 
   // Wait for TASK_RUNNING update to be acknowledged.
-  AWAIT_READY(_statusUpdateAcknowledgement);
+  AWAIT_READY(_statusUpdateAcknowledgement1);
+  AWAIT_READY(_statusUpdateAcknowledgement2);
 
   slave.get()->terminate();
 
@@ -3824,15 +3848,21 @@ TYPED_TEST(SlaveRecoveryTest, SchedulerFailover)
   // Create a long running task.
   TaskInfo task = createTask(offers1.get()[0], "sleep 1000");
 
-  EXPECT_CALL(sched1, statusUpdate(_, _));
+  // Expecting TASK_STARTING and TASK_RUNNING updates.
+  EXPECT_CALL(sched1, statusUpdate(_, _))
+    .Times(2);
 
-  Future<Nothing> _statusUpdateAcknowledgement =
+  Future<Nothing> _statusUpdateAcknowledgement1 =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> _statusUpdateAcknowledgement2 =
     FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
 
   driver1.launchTasks(offers1.get()[0].id(), {task});
 
   // Wait for the ACK to be checkpointed.
-  AWAIT_READY(_statusUpdateAcknowledgement);
+  AWAIT_READY(_statusUpdateAcknowledgement1);
+  AWAIT_READY(_statusUpdateAcknowledgement2);
 
   slave.get()->terminate();
 
@@ -3975,15 +4005,20 @@ TYPED_TEST(SlaveRecoveryTest, MasterFailover)
 
   TaskInfo task = createTask(offers1.get()[0], "sleep 1000");
 
-  EXPECT_CALL(sched, statusUpdate(_, _));
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .Times(2);
 
-  Future<Nothing> _statusUpdateAcknowledgement =
+  Future<Nothing> _statusUpdateAcknowledgement1 =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> _statusUpdateAcknowledgement2 =
     FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
 
   driver.launchTasks(offers1.get()[0].id(), {task});
 
-  // Wait for the ACK to be checkpointed.
-  AWAIT_READY(_statusUpdateAcknowledgement);
+  // Wait for both ACKs to be checkpointed.
+  AWAIT_READY(_statusUpdateAcknowledgement1);
+  AWAIT_READY(_statusUpdateAcknowledgement2);
 
   slave.get()->terminate();
 
@@ -4117,15 +4152,20 @@ TYPED_TEST(SlaveRecoveryTest, MultipleFrameworks)
   // Framework 1 launches a task.
   TaskInfo task1 = createTask(offer1, "sleep 1000");
 
-  EXPECT_CALL(sched1, statusUpdate(_, _));
+  EXPECT_CALL(sched1, statusUpdate(_, _))
+    .Times(2);
 
-  Future<Nothing> _statusUpdateAcknowledgement1 =
+  Future<Nothing> _startingStatusUpdateAcknowledgement1 =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> _runningStatusUpdateAcknowledgement1 =
     FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
 
   driver1.launchTasks(offer1.id(), {task1});
 
   // Wait for the ACK to be checkpointed.
-  AWAIT_READY(_statusUpdateAcknowledgement1);
+  AWAIT_READY(_startingStatusUpdateAcknowledgement1);
+  AWAIT_READY(_runningStatusUpdateAcknowledgement1);
 
   // Framework 2. Enable checkpointing.
   FrameworkInfo frameworkInfo2 = DEFAULT_FRAMEWORK_INFO;
@@ -4150,14 +4190,20 @@ TYPED_TEST(SlaveRecoveryTest, MultipleFrameworks)
   // Framework 2 launches a task.
   TaskInfo task2 = createTask(offers2.get()[0], "sleep 1000");
 
-  EXPECT_CALL(sched2, statusUpdate(_, _));
+  EXPECT_CALL(sched2, statusUpdate(_, _))
+    .Times(2);
 
-  Future<Nothing> _statusUpdateAcknowledgement2 =
+  Future<Nothing> _startingStatusUpdateAcknowledgement2 =
     FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> _runningStatusUpdateAcknowledgement2 =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
   driver2.launchTasks(offers2.get()[0].id(), {task2});
 
   // Wait for the ACK to be checkpointed.
-  AWAIT_READY(_statusUpdateAcknowledgement2);
+  AWAIT_READY(_startingStatusUpdateAcknowledgement2);
+  AWAIT_READY(_runningStatusUpdateAcknowledgement2);
 
   slave.get()->terminate();
 
@@ -4281,15 +4327,20 @@ TYPED_TEST(SlaveRecoveryTest, MultipleSlaves)
   // Launch a long running task in the first slave.
   TaskInfo task1 = createTask(offers1.get()[0], "sleep 1000");
 
-  EXPECT_CALL(sched, statusUpdate(_, _));
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .Times(2);
 
-  Future<Nothing> _statusUpdateAcknowledgement1 =
+  Future<Nothing> _startingStatusUpdateAcknowledgement1 =
+    FUTURE_DISPATCH(slave1.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> _runningStatusUpdateAcknowledgement1 =
     FUTURE_DISPATCH(slave1.get()->pid, &Slave::_statusUpdateAcknowledgement);
 
   driver.launchTasks(offers1.get()[0].id(), {task1});
 
   // Wait for the ACK to be checkpointed.
-  AWAIT_READY(_statusUpdateAcknowledgement1);
+  AWAIT_READY(_startingStatusUpdateAcknowledgement1);
+  AWAIT_READY(_runningStatusUpdateAcknowledgement1);
 
   Future<vector<Offer>> offers2;
   EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -4316,15 +4367,20 @@ TYPED_TEST(SlaveRecoveryTest, MultipleSlaves)
   // Launch a long running task in each slave.
   TaskInfo task2 = createTask(offers2.get()[0], "sleep 1000");
 
-  EXPECT_CALL(sched, statusUpdate(_, _));
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .Times(2);
 
-  Future<Nothing> _statusUpdateAcknowledgement2 =
+  Future<Nothing> _startingStatusUpdateAcknowledgement2 =
+    FUTURE_DISPATCH(slave2.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> _runningStatusUpdateAcknowledgement2 =
     FUTURE_DISPATCH(slave2.get()->pid, &Slave::_statusUpdateAcknowledgement);
 
   driver.launchTasks(offers2.get()[0].id(), {task2});
 
   // Wait for the ACKs to be checkpointed.
-  AWAIT_READY(_statusUpdateAcknowledgement2);
+  AWAIT_READY(_startingStatusUpdateAcknowledgement2);
+  AWAIT_READY(_runningStatusUpdateAcknowledgement2);
 
   slave1.get()->terminate();
   slave2.get()->terminate();

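The recovery tests above use two variants of the same fix. Where a test only
needs the updates to flow and synchronizes on the checkpointed
acknowledgements, the expectation cardinality is simply raised to two; where
the states themselves matter, each update is captured with FutureArg and
asserted. Roughly, assuming the usual `sched` mock:

  // Sketch only, not part of the diff.

  // Variant 1: only the number of updates matters; the test waits on the
  // acknowledgement dispatches instead of on the statuses.
  EXPECT_CALL(sched, statusUpdate(_, _))
    .Times(2);

  // Variant 2: the states matter; capture and assert each update.
  Future<TaskStatus> statusStarting;
  Future<TaskStatus> statusRunning;
  EXPECT_CALL(sched, statusUpdate(_, _))
    .WillOnce(FutureArg<1>(&statusStarting))
    .WillOnce(FutureArg<1>(&statusRunning));
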
http://git-wip-us.apache.org/repos/asf/mesos/blob/1e1e409b/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 6d1e98d..67c8091 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -680,16 +680,22 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(SlaveTest, CommandTaskWithArguments)
 
   task.mutable_command()->MergeFrom(command);
 
+  Future<TaskStatus> statusStarting;
   Future<TaskStatus> statusRunning;
   Future<TaskStatus> statusFinished;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusStarting))
     .WillOnce(FutureArg<1>(&statusRunning))
     .WillOnce(FutureArg<1>(&statusFinished));
 
   driver.launchTasks(offers.get()[0].id(), {task});
 
-  // Scheduler should first receive TASK_RUNNING followed by the
-  // TASK_FINISHED from the executor.
+  // Scheduler should first receive TASK_STARTING, followed by
+  // TASK_RUNNING and TASK_FINISHED from the executor.
+  AWAIT_READY(statusStarting);
+  EXPECT_EQ(TASK_STARTING, statusStarting->state());
+  EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusStarting->source());
+
   AWAIT_READY(statusRunning);
   EXPECT_EQ(TASK_RUNNING, statusRunning->state());
   EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusRunning->source());
@@ -746,12 +752,17 @@ TEST_F(SlaveTest, CommandTaskWithKillPolicy)
   task.mutable_kill_policy()->mutable_grace_period()->set_nanoseconds(
       gracePeriod.ns());
 
+  Future<TaskStatus> statusStarting;
   Future<TaskStatus> statusRunning;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusStarting))
     .WillOnce(FutureArg<1>(&statusRunning));
 
   driver.launchTasks(offer.id(), {task});
 
+  AWAIT_READY(statusStarting);
+  EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
   AWAIT_READY(statusRunning);
   EXPECT_EQ(TASK_RUNNING, statusRunning->state());
 
@@ -2381,15 +2392,21 @@ TEST_F(SlaveTest, StatisticsEndpointRunningExecutor)
       Resources::parse("cpus:1;mem:32").get(),
       SLEEP_COMMAND(1000));
 
-  Future<TaskStatus> status;
+  Future<TaskStatus> statusStarting;
+  Future<TaskStatus> statusRunning;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&status));
+    .WillOnce(FutureArg<1>(&statusStarting))
+    .WillOnce(FutureArg<1>(&statusRunning));
 
   driver.launchTasks(offer.id(), {task});
 
-  AWAIT_READY(status);
-  EXPECT_EQ(task.task_id(), status->task_id());
-  EXPECT_EQ(TASK_RUNNING, status->state());
+  AWAIT_READY(statusStarting);
+  EXPECT_EQ(task.task_id(), statusStarting->task_id());
+  EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
+  AWAIT_READY(statusRunning);
+  EXPECT_EQ(task.task_id(), statusRunning->task_id());
+  EXPECT_EQ(TASK_RUNNING, statusRunning->state());
 
   // Hit the statistics endpoint and expect the response contains the
   // resource statistics for the running container.
@@ -5210,16 +5227,21 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(SlaveTest, ExecutorEnvironmentVariables)
 
   task.mutable_command()->MergeFrom(command);
 
+  Future<TaskStatus> statusStarting;
   Future<TaskStatus> statusRunning;
   Future<TaskStatus> statusFinished;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusStarting))
     .WillOnce(FutureArg<1>(&statusRunning))
     .WillOnce(FutureArg<1>(&statusFinished));
 
   driver.launchTasks(offers.get()[0].id(), {task});
 
-  // Scheduler should first receive TASK_RUNNING followed by the
-  // TASK_FINISHED from the executor.
+  // Scheduler should first receive TASK_STARTING, followed by
+  // TASK_RUNNING and TASK_FINISHED from the executor.
+  AWAIT_READY(statusStarting);
+  EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
   AWAIT_READY(statusRunning);
   EXPECT_EQ(TASK_RUNNING, statusRunning->state());
 
@@ -5565,7 +5587,7 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(SlaveTest, HTTPSchedulerSlaveRestart)
   UPID executorPid = registerExecutorMessage->from;
 
   AWAIT_READY(status);
-  EXPECT_EQ(TASK_RUNNING, status->state());
+  EXPECT_EQ(TASK_STARTING, status->state());
 
   // Restart the slave.
   slave.get()->terminate();
@@ -8179,12 +8201,19 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(SlaveTest, ExecutorReregistrationTimeoutFlag)
 
   TaskInfo task = createTask(offers->front(), "sleep 1000");
 
+  Future<TaskStatus> statusUpdate0;
   Future<TaskStatus> statusUpdate1;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusUpdate0))
     .WillOnce(FutureArg<1>(&statusUpdate1));
 
   driver.launchTasks(offers->front().id(), {task});
 
+  AWAIT_READY(statusUpdate0);
+  ASSERT_EQ(TASK_STARTING, statusUpdate0->state());
+
+  driver.acknowledgeStatusUpdate(statusUpdate0.get());
+
   AWAIT_READY(statusUpdate1);
   ASSERT_EQ(TASK_RUNNING, statusUpdate1->state());
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/1e1e409b/src/tests/teardown_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/teardown_tests.cpp b/src/tests/teardown_tests.cpp
index 5eada4f..392cacf 100644
--- a/src/tests/teardown_tests.cpp
+++ b/src/tests/teardown_tests.cpp
@@ -353,20 +353,31 @@ TEST_F(TeardownTest, RecoveredFrameworkAfterMasterFailover)
 
   TaskInfo task = createTask(offers.get()[0], "sleep 100");
 
+  Future<TaskStatus> startingStatus;
   Future<TaskStatus> runningStatus;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&startingStatus))
     .WillOnce(FutureArg<1>(&runningStatus));
 
-  Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
+  Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
+      slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
       slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
 
   driver.launchTasks(offers.get()[0].id(), {task});
 
+  AWAIT_READY(startingStatus);
+  EXPECT_EQ(TASK_STARTING, startingStatus->state());
+  EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+  AWAIT_READY(statusUpdateAck1);
+
   AWAIT_READY(runningStatus);
   EXPECT_EQ(TASK_RUNNING, runningStatus->state());
   EXPECT_EQ(task.task_id(), runningStatus->task_id());
 
-  AWAIT_READY(statusUpdateAck);
+  AWAIT_READY(statusUpdateAck2);
 
   // Simulate master failover. We leave the scheduler without a master
   // so it does not attempt to re-register.