You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2017/10/18 19:41:54 UTC

[4/6] mesos git commit: Fix unit tests that were broken by the additional TASK_STARTING update.

http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/hook_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/hook_tests.cpp b/src/tests/hook_tests.cpp
index c4fadbb..5428782 100644
--- a/src/tests/hook_tests.cpp
+++ b/src/tests/hook_tests.cpp
@@ -707,9 +707,11 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
                     Invoke(&containerizer,
                            &MockDockerContainerizer::_launch)));
 
+  Future<TaskStatus> statusStarting;
   Future<TaskStatus> statusRunning;
   Future<TaskStatus> statusFinished;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusStarting))
     .WillOnce(FutureArg<1>(&statusRunning))
     .WillOnce(FutureArg<1>(&statusFinished))
     .WillRepeatedly(DoDefault());
@@ -717,6 +719,8 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
   driver.launchTasks(offers.get()[0].id(), {task});
 
   AWAIT_READY_FOR(containerId, Seconds(60));
+  AWAIT_READY_FOR(statusStarting, Seconds(60));
+  EXPECT_EQ(TASK_STARTING, statusStarting->state());
   AWAIT_READY_FOR(statusRunning, Seconds(60));
   EXPECT_EQ(TASK_RUNNING, statusRunning->state());
   AWAIT_READY_FOR(statusFinished, Seconds(60));
@@ -924,9 +928,11 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
                     Invoke(&containerizer,
                            &MockDockerContainerizer::_launch)));
 
+  Future<TaskStatus> statusStarting;
   Future<TaskStatus> statusRunning;
   Future<TaskStatus> statusFinished;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusStarting))
     .WillOnce(FutureArg<1>(&statusRunning))
     .WillOnce(FutureArg<1>(&statusFinished))
     .WillRepeatedly(DoDefault());
@@ -934,6 +940,8 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
   driver.launchTasks(offers.get()[0].id(), tasks);
 
   AWAIT_READY_FOR(containerId, Seconds(60));
+  AWAIT_READY_FOR(statusStarting, Seconds(60));
+  EXPECT_EQ(TASK_STARTING, statusStarting->state());
   AWAIT_READY_FOR(statusRunning, Seconds(60));
   EXPECT_EQ(TASK_RUNNING, statusRunning->state());
   AWAIT_READY_FOR(statusFinished, Seconds(60));
@@ -1039,14 +1047,19 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(HookTest, ROOT_DOCKER_VerifySlavePostFetchHook)
   ContainerInfo::DockerInfo* dockerInfo = containerInfo->mutable_docker();
   dockerInfo->set_image("alpine");
 
+  Future<TaskStatus> statusStarting;
   Future<TaskStatus> statusRunning;
   Future<TaskStatus> statusFinished;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusStarting))
     .WillOnce(FutureArg<1>(&statusRunning))
     .WillOnce(FutureArg<1>(&statusFinished));
 
   driver.launchTasks(offers.get()[0].id(), {task});
 
+  AWAIT_READY_FOR(statusStarting, Seconds(60));
+  EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
   AWAIT_READY_FOR(statusRunning, Seconds(60));
   EXPECT_EQ(TASK_RUNNING, statusRunning->state());
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 5d96457..c6906a7 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -2786,22 +2786,34 @@ TEST_F(MasterTest, UnreachableTaskAfterFailover)
 
   TaskInfo task = createTask(offers.get()[0], "sleep 100");
 
+  Future<TaskStatus> startingStatus;
   Future<TaskStatus> runningStatus;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&runningStatus));
+    .WillOnce(FutureArg<1>(&startingStatus))
+    .WillOnce(FutureArg<1>(&runningStatus))
+    .WillRepeatedly(Return());
+
+  Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
+      slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
 
-  Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
+  Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
       slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
 
   driver.launchTasks(offers.get()[0].id(), {task});
 
+  AWAIT_READY(startingStatus);
+  EXPECT_EQ(TASK_STARTING, startingStatus->state());
+  EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+  const SlaveID slaveId = startingStatus->slave_id();
+
+  AWAIT_READY(statusUpdateAck1);
+
   AWAIT_READY(runningStatus);
   EXPECT_EQ(TASK_RUNNING, runningStatus->state());
   EXPECT_EQ(task.task_id(), runningStatus->task_id());
 
-  const SlaveID slaveId = runningStatus->slave_id();
-
-  AWAIT_READY(statusUpdateAck);
+  AWAIT_READY(statusUpdateAck2);
 
   // Step 4: Simulate master failover. We leave the slave without a
   // master so it does not attempt to re-register.
@@ -6828,20 +6840,31 @@ TEST_F(MasterTest, FailoverAgentReregisterFirst)
 
   TaskInfo task = createTask(offers.get()[0], "sleep 100");
 
+  Future<TaskStatus> startingStatus;
   Future<TaskStatus> runningStatus;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&startingStatus))
     .WillOnce(FutureArg<1>(&runningStatus));
 
-  Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
+  Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
+      slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
       slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
 
   driver.launchTasks(offers.get()[0].id(), {task});
 
+  AWAIT_READY(startingStatus);
+  EXPECT_EQ(TASK_STARTING, startingStatus->state());
+  EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+  AWAIT_READY(statusUpdateAck1);
+
   AWAIT_READY(runningStatus);
   EXPECT_EQ(TASK_RUNNING, runningStatus->state());
   EXPECT_EQ(task.task_id(), runningStatus->task_id());
 
-  AWAIT_READY(statusUpdateAck);
+  AWAIT_READY(statusUpdateAck2);
 
   // Simulate master failover. We leave the scheduler without a master
   // so it does not attempt to re-register yet.
@@ -7048,22 +7071,33 @@ TEST_F(MasterTest, AgentRestartNoReregister)
 
   TaskInfo task = createTask(offer, "sleep 100");
 
+  Future<TaskStatus> startingStatus;
   Future<TaskStatus> runningStatus;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&startingStatus))
     .WillOnce(FutureArg<1>(&runningStatus));
 
-  Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
-      slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+  Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
+    slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
+    slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
 
   driver.launchTasks(offer.id(), {task});
 
+  AWAIT_READY(startingStatus);
+  EXPECT_EQ(TASK_STARTING, startingStatus->state());
+  EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+  AWAIT_READY(statusUpdateAck1);
+
   AWAIT_READY(runningStatus);
   EXPECT_EQ(TASK_RUNNING, runningStatus->state());
   EXPECT_EQ(task.task_id(), runningStatus->task_id());
 
   const SlaveID slaveId = runningStatus->slave_id();
 
-  AWAIT_READY(statusUpdateAck);
+  AWAIT_READY(statusUpdateAck2);
 
   Clock::pause();
 
@@ -7416,12 +7450,18 @@ TEST_F(MasterTest, TaskWithTinyResources)
       Resources::parse("cpus:0.00001;mem:1").get(),
       SLEEP_COMMAND(1000));
 
+  Future<TaskStatus> startingStatus;
   Future<TaskStatus> runningStatus;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&startingStatus))
     .WillOnce(FutureArg<1>(&runningStatus));
 
   driver.launchTasks(offer.id(), {task});
 
+  AWAIT_READY(startingStatus);
+  EXPECT_EQ(TASK_STARTING, startingStatus->state());
+  EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
   AWAIT_READY(runningStatus);
   EXPECT_EQ(TASK_RUNNING, runningStatus->state());
   EXPECT_EQ(task.task_id(), runningStatus->task_id());
@@ -7506,12 +7546,18 @@ TEST_F(MasterTest, MultiRoleSchedulerUnsubscribeFromRole)
 
   TaskInfo task = createTask(offer.slave_id(), resources, "sleep 60");
 
+  Future<TaskStatus> startingStatus;
   Future<TaskStatus> runningStatus;
   EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+    .WillOnce(FutureArg<1>(&startingStatus))
     .WillOnce(FutureArg<1>(&runningStatus));
 
   driver1.launchTasks(offer.id(), {task});
 
+  AWAIT_READY(startingStatus);
+  EXPECT_EQ(TASK_STARTING, startingStatus->state());
+  EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
   AWAIT_READY(runningStatus);
   EXPECT_EQ(TASK_RUNNING, runningStatus->state());
   EXPECT_EQ(task.task_id(), runningStatus->task_id());
@@ -7808,12 +7854,18 @@ TEST_F(MasterTest, AgentDomainDifferentRegion)
     // Check that we can launch a task in a remote region.
     TaskInfo task = createTask(offer, "sleep 60");
 
+    Future<TaskStatus> startingStatus;
     Future<TaskStatus> runningStatus;
     EXPECT_CALL(sched, statusUpdate(&driver, _))
+      .WillOnce(FutureArg<1>(&startingStatus))
       .WillOnce(FutureArg<1>(&runningStatus));
 
     driver.launchTasks(offer.id(), {task});
 
+    AWAIT_READY(startingStatus);
+    EXPECT_EQ(TASK_STARTING, startingStatus->state());
+    EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
     AWAIT_READY(runningStatus);
     EXPECT_EQ(TASK_RUNNING, runningStatus->state());
     EXPECT_EQ(task.task_id(), runningStatus->task_id());
@@ -8409,9 +8461,11 @@ TEST_P(MasterTestPrePostReservationRefinement, LaunchGroup)
   v1::TaskGroupInfo taskGroup;
   taskGroup.add_tasks()->CopyFrom(taskInfo);
 
-  Future<v1::scheduler::Event::Update> update;
+  Future<v1::scheduler::Event::Update> startingUpdate;
+  Future<v1::scheduler::Event::Update> runningUpdate;
   EXPECT_CALL(*scheduler, update(_, _))
-    .WillOnce(FutureArg<1>(&update));
+    .WillOnce(FutureArg<1>(&startingUpdate))
+    .WillOnce(FutureArg<1>(&runningUpdate));
 
   {
     Call call;
@@ -8433,11 +8487,17 @@ TEST_P(MasterTestPrePostReservationRefinement, LaunchGroup)
     mesos.send(call);
   }
 
-  AWAIT_READY(update);
+  AWAIT_READY(startingUpdate);
+
+  EXPECT_EQ(TASK_STARTING, startingUpdate->status().state());
+  EXPECT_EQ(taskInfo.task_id(), startingUpdate->status().task_id());
+  EXPECT_TRUE(startingUpdate->status().has_timestamp());
+
+  AWAIT_READY(runningUpdate);
 
-  EXPECT_EQ(TASK_RUNNING, update->status().state());
-  EXPECT_EQ(taskInfo.task_id(), update->status().task_id());
-  EXPECT_TRUE(update->status().has_timestamp());
+  EXPECT_EQ(TASK_RUNNING, runningUpdate->status().state());
+  EXPECT_EQ(taskInfo.task_id(), runningUpdate->status().task_id());
+  EXPECT_TRUE(runningUpdate->status().has_timestamp());
 
   // Ensure that the task sandbox symbolic link is created.
   EXPECT_TRUE(os::exists(path::join(

http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/master_validation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_validation_tests.cpp b/src/tests/master_validation_tests.cpp
index f00dd9b..7da1be5 100644
--- a/src/tests/master_validation_tests.cpp
+++ b/src/tests/master_validation_tests.cpp
@@ -1889,8 +1889,10 @@ TEST_F(TaskValidationTest, TaskReusesUnreachableTaskID)
   Offer offer1 = offers1.get()[0];
   TaskInfo task1 = createTask(offer1, "sleep 60");
 
+  Future<TaskStatus> startingStatus;
   Future<TaskStatus> runningStatus;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&startingStatus))
     .WillOnce(FutureArg<1>(&runningStatus));
 
   Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
@@ -1898,6 +1900,10 @@ TEST_F(TaskValidationTest, TaskReusesUnreachableTaskID)
 
   driver.launchTasks(offer1.id(), {task1});
 
+  AWAIT_READY(startingStatus);
+  EXPECT_EQ(TASK_STARTING, startingStatus->state());
+  EXPECT_EQ(task1.task_id(), startingStatus->task_id());
+
   AWAIT_READY(runningStatus);
   EXPECT_EQ(TASK_RUNNING, runningStatus->state());
   EXPECT_EQ(task1.task_id(), runningStatus->task_id());

http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index cd98b8f..d262bbe 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -757,7 +757,7 @@ TEST_F(OversubscriptionTest, FixedResourceEstimator)
 
   AWAIT_READY(status);
   EXPECT_EQ(task.task_id(), status->task_id());
-  EXPECT_EQ(TASK_RUNNING, status->state());
+  EXPECT_EQ(TASK_STARTING, status->state());
 
   // Advance the clock for the slave to trigger the calculation of the
   // total oversubscribed resources. As we described above, we don't
@@ -1023,15 +1023,20 @@ TEST_F(OversubscriptionTest, QoSCorrectionKill)
 
   TaskInfo task = createTask(offers.get()[0], "sleep 10");
 
+  Future<TaskStatus> status0;
   Future<TaskStatus> status1;
   Future<TaskStatus> status2;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status0))
     .WillOnce(FutureArg<1>(&status1))
     .WillOnce(FutureArg<1>(&status2))
     .WillRepeatedly(Return());       // Ignore subsequent updates.
 
   driver.launchTasks(offers.get()[0].id(), {task});
 
+  AWAIT_READY(status0);
+  ASSERT_EQ(TASK_STARTING, status0->state());
+
   AWAIT_READY(status1);
   ASSERT_EQ(TASK_RUNNING, status1->state());
 
@@ -1132,15 +1137,20 @@ TEST_F(OversubscriptionTest, QoSCorrectionKillPartitionAware)
 
   TaskInfo task = createTask(offers.get()[0], "sleep 10");
 
+  Future<TaskStatus> status0;
   Future<TaskStatus> status1;
   Future<TaskStatus> status2;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status0))
     .WillOnce(FutureArg<1>(&status1))
     .WillOnce(FutureArg<1>(&status2))
     .WillRepeatedly(Return());       // Ignore subsequent updates.
 
   driver.launchTasks(offers.get()[0].id(), {task});
 
+  AWAIT_READY(status0);
+  ASSERT_EQ(TASK_STARTING, status0->state());
+
   AWAIT_READY(status1);
   ASSERT_EQ(TASK_RUNNING, status1->state());
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/partition_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/partition_tests.cpp b/src/tests/partition_tests.cpp
index 0597bd2..7b11264 100644
--- a/src/tests/partition_tests.cpp
+++ b/src/tests/partition_tests.cpp
@@ -215,22 +215,33 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, ReregisterSlavePartitionAware)
 
   TaskInfo task = createTask(offer, "sleep 60");
 
+  Future<TaskStatus> startingStatus;
   Future<TaskStatus> runningStatus;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&startingStatus))
     .WillOnce(FutureArg<1>(&runningStatus));
 
-  Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
+  Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
+      slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
       slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
 
   driver.launchTasks(offer.id(), {task});
 
+  AWAIT_READY(startingStatus);
+  EXPECT_EQ(TASK_STARTING, startingStatus->state());
+  EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+  AWAIT_READY(statusUpdateAck1);
+
   AWAIT_READY(runningStatus);
   EXPECT_EQ(TASK_RUNNING, runningStatus->state());
   EXPECT_EQ(task.task_id(), runningStatus->task_id());
 
-  const SlaveID& slaveId = runningStatus->slave_id();
+  AWAIT_READY(statusUpdateAck2);
 
-  AWAIT_READY(statusUpdateAck);
+  const SlaveID& slaveId = startingStatus->slave_id();
 
   // Now, induce a partition of the slave by having the master
   // timeout the slave.
@@ -572,8 +583,10 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, ReregisterSlaveNotPartitionAware)
 
   TaskInfo task = createTask(offer, "sleep 60");
 
+  Future<TaskStatus> startingStatus;
   Future<TaskStatus> runningStatus;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&startingStatus))
     .WillOnce(FutureArg<1>(&runningStatus));
 
   Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
@@ -874,8 +887,10 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
   // Launch `task1` using `sched1`.
   TaskInfo task1 = createTask(offer.slave_id(), taskResources, "sleep 60");
 
+  Future<TaskStatus> startingStatus1;
   Future<TaskStatus> runningStatus1;
   EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+    .WillOnce(FutureArg<1>(&startingStatus1))
     .WillOnce(FutureArg<1>(&runningStatus1));
 
   Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
@@ -917,8 +932,10 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
   // Launch the second task.
   TaskInfo task2 = createTask(offer.slave_id(), taskResources, "sleep 60");
 
+  Future<TaskStatus> startingStatus2;
   Future<TaskStatus> runningStatus2;
   EXPECT_CALL(sched2, statusUpdate(&driver2, _))
+    .WillOnce(FutureArg<1>(&startingStatus2))
     .WillOnce(FutureArg<1>(&runningStatus2));
 
   Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
@@ -1136,22 +1153,33 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, PartitionedSlaveOrphanedTask)
   // Launch `task` using `sched`.
   TaskInfo task = createTask(offer, "sleep 60");
 
+  Future<TaskStatus> startingStatus;
   Future<TaskStatus> runningStatus;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&startingStatus))
     .WillOnce(FutureArg<1>(&runningStatus));
 
-  Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
+  Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
+      slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
       slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
 
   driver.launchTasks(offer.id(), {task});
 
+  AWAIT_READY(startingStatus);
+  EXPECT_EQ(TASK_STARTING, startingStatus->state());
+  EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+  AWAIT_READY(statusUpdateAck1);
+
   AWAIT_READY(runningStatus);
   EXPECT_EQ(TASK_RUNNING, runningStatus->state());
   EXPECT_EQ(task.task_id(), runningStatus->task_id());
 
   const SlaveID& slaveId = runningStatus->slave_id();
 
-  AWAIT_READY(statusUpdateAck);
+  AWAIT_READY(statusUpdateAck2);
 
   // Now, induce a partition of the slave by having the master
   // timeout the slave.
@@ -1404,22 +1432,33 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, DisconnectedFramework)
   // Launch `task` using `sched1`.
   TaskInfo task = createTask(offer, "sleep 60");
 
+  Future<TaskStatus> startingStatus;
   Future<TaskStatus> runningStatus;
   EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+    .WillOnce(FutureArg<1>(&startingStatus))
     .WillOnce(FutureArg<1>(&runningStatus));
 
   Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
       slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
 
+  Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
+      slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
   driver1.launchTasks(offer.id(), {task});
 
+  AWAIT_READY(startingStatus);
+  EXPECT_EQ(TASK_STARTING, startingStatus->state());
+  EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+  AWAIT_READY(statusUpdateAck1);
+
   AWAIT_READY(runningStatus);
   EXPECT_EQ(TASK_RUNNING, runningStatus->state());
   EXPECT_EQ(task.task_id(), runningStatus->task_id());
 
   const SlaveID& slaveId = runningStatus->slave_id();
 
-  AWAIT_READY(statusUpdateAck1);
+  AWAIT_READY(statusUpdateAck2);
 
   // Shutdown the master.
   master->reset();
@@ -1573,22 +1612,33 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, SpuriousSlaveReregistration)
   // Launch `task` using `sched`.
   TaskInfo task = createTask(offer, "sleep 60");
 
+  Future<TaskStatus> startingStatus;
   Future<TaskStatus> runningStatus;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&startingStatus))
     .WillOnce(FutureArg<1>(&runningStatus));
 
-  Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
+  Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
+      slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
       slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
 
   driver.launchTasks(offer.id(), {task});
 
+  AWAIT_READY(startingStatus);
+  EXPECT_EQ(TASK_STARTING, startingStatus->state());
+  EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+  AWAIT_READY(statusUpdateAck1);
+
   AWAIT_READY(runningStatus);
   EXPECT_EQ(TASK_RUNNING, runningStatus->state());
   EXPECT_EQ(task.task_id(), runningStatus->task_id());
 
   const SlaveID& slaveId = runningStatus->slave_id();
 
-  AWAIT_READY(statusUpdateAck);
+  AWAIT_READY(statusUpdateAck2);
 
   // Simulate a master loss event at the slave and then cause the
   // slave to reregister with the master. From the master's

http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/persistent_volume_endpoints_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/persistent_volume_endpoints_tests.cpp b/src/tests/persistent_volume_endpoints_tests.cpp
index 444737a..883192d 100644
--- a/src/tests/persistent_volume_endpoints_tests.cpp
+++ b/src/tests/persistent_volume_endpoints_tests.cpp
@@ -1988,16 +1988,15 @@ TEST_F(PersistentVolumeEndpointsTest, SlavesEndpointFullResources)
 
   TaskInfo taskInfo = createTask(offer.slave_id(), taskResources, "sleep 1000");
 
-  // Expect a TASK_RUNNING status.
-  EXPECT_CALL(sched, statusUpdate(&driver, _));
-
-  Future<Nothing> _statusUpdateAcknowledgement =
-    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+  Future<TaskStatus> starting;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&starting))
+    .WillRepeatedly(Return()); // Ignore subsequent updates.
 
   driver.acceptOffers({offer.id()}, {LAUNCH({taskInfo})});
 
-  // Wait for TASK_RUNNING update ack.
-  AWAIT_READY(_statusUpdateAcknowledgement);
+  AWAIT_READY(starting);
+  EXPECT_EQ(TASK_STARTING, starting->state());
 
   // Summon an offer.
   EXPECT_CALL(sched, resourceOffers(&driver, _))

http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/persistent_volume_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/persistent_volume_tests.cpp b/src/tests/persistent_volume_tests.cpp
index 11fe432..acfeac1 100644
--- a/src/tests/persistent_volume_tests.cpp
+++ b/src/tests/persistent_volume_tests.cpp
@@ -806,9 +806,11 @@ TEST_P(PersistentVolumeTest, AccessPersistentVolume)
       taskResources,
       "echo abc > path1/file");
 
+  Future<TaskStatus> status0;
   Future<TaskStatus> status1;
   Future<TaskStatus> status2;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status0))
     .WillOnce(FutureArg<1>(&status1))
     .WillOnce(FutureArg<1>(&status2));
 
@@ -823,6 +825,10 @@ TEST_P(PersistentVolumeTest, AccessPersistentVolume)
       {CREATE(volume),
        LAUNCH({task})});
 
+  AWAIT_READY(status0);
+  EXPECT_EQ(task.task_id(), status0->task_id());
+  EXPECT_EQ(TASK_STARTING, status0->state());
+
   AWAIT_READY(status1);
   EXPECT_EQ(task.task_id(), status1->task_id());
   EXPECT_EQ(TASK_RUNNING, status1->state());
@@ -982,21 +988,25 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleTasks)
       taskResources2.get() + volume,
       "echo task2 > path1/file2");
 
-  // We should receive a TASK_RUNNING followed by a TASK_FINISHED for
-  // each of the 2 tasks. We do not check for the actual task state
-  // since it's not the primary objective of the test. We instead
-  // verify that the paths are created by the tasks after we receive
-  // enough status updates.
+  // We should receive a TASK_STARTING, followed by a TASK_RUNNING
+  // and a TASK_FINISHED for each of the 2 tasks.
+  // We do not check for the actual task state since it's not the
+  // primary objective of the test. We instead verify that the paths
+  // are created by the tasks after we receive enough status updates.
   Future<TaskStatus> status1;
   Future<TaskStatus> status2;
   Future<TaskStatus> status3;
   Future<TaskStatus> status4;
+  Future<TaskStatus> status5;
+  Future<TaskStatus> status6;
 
   EXPECT_CALL(sched, statusUpdate(&driver, _))
     .WillOnce(FutureArg<1>(&status1))
     .WillOnce(FutureArg<1>(&status2))
     .WillOnce(FutureArg<1>(&status3))
-    .WillOnce(FutureArg<1>(&status4));
+    .WillOnce(FutureArg<1>(&status4))
+    .WillOnce(FutureArg<1>(&status5))
+    .WillOnce(FutureArg<1>(&status6));
 
   driver.acceptOffers(
       {offer.id()},
@@ -1009,6 +1019,8 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleTasks)
   AWAIT_READY(status2);
   AWAIT_READY(status3);
   AWAIT_READY(status4);
+  AWAIT_READY(status5);
+  AWAIT_READY(status6);
 
   const string& volumePath = slave::paths::getPersistentVolumePath(
       slaveFlags.work_dir,
@@ -1258,10 +1270,12 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleFrameworks)
       Resources::parse("cpus:1;mem:128").get() + volume,
       "echo abc > path1/file1 && sleep 1000");
 
-  // We should receive a TASK_RUNNING for the launched task.
+  // We should receive a TASK_STARTING and a TASK_RUNNING for the launched task.
+  Future<TaskStatus> status0;
   Future<TaskStatus> status1;
 
   EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+    .WillOnce(FutureArg<1>(&status0))
     .WillOnce(FutureArg<1>(&status1));
 
   // We use a filter of 0 seconds so the resources will be available
@@ -1275,6 +1289,9 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleFrameworks)
        LAUNCH({task1})},
       filters);
 
+  AWAIT_READY(status0);
+  EXPECT_EQ(TASK_STARTING, status0->state());
+
   AWAIT_READY(status1);
   EXPECT_EQ(TASK_RUNNING, status1->state());
 
@@ -1331,11 +1348,13 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleFrameworks)
       Resources::parse("cpus:1;mem:256").get() + volume,
       "echo abc > path1/file2 && sleep 1000");
 
-  // We should receive a TASK_RUNNING for the launched task.
+  // We should receive a TASK_STARTING and a TASK_RUNNING for the launched task.
   Future<TaskStatus> status2;
+  Future<TaskStatus> status3;
 
   EXPECT_CALL(sched2, statusUpdate(&driver2, _))
-    .WillOnce(FutureArg<1>(&status2));
+    .WillOnce(FutureArg<1>(&status2))
+    .WillOnce(FutureArg<1>(&status3));
 
   driver2.acceptOffers(
       {offer2.id()},
@@ -1343,7 +1362,10 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleFrameworks)
       filters);
 
   AWAIT_READY(status2);
-  EXPECT_EQ(TASK_RUNNING, status2->state());
+  EXPECT_EQ(TASK_STARTING, status2->state());
+
+  AWAIT_READY(status3);
+  EXPECT_EQ(TASK_RUNNING, status3->state());
 
   // Collect metrics based on both frameworks. Note that the `cpus_used` and
   // `mem_used` is updated, but `disk_used` does not change since both tasks
@@ -1434,13 +1456,17 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMasterFailover)
       taskResources.get() + volume,
       "sleep 1000");
 
-  // We should receive a TASK_RUNNING for each of the tasks.
+  // We should receive a TASK_STARTING and a TASK_RUNNING for each of the tasks.
   Future<TaskStatus> status1;
   Future<TaskStatus> status2;
+  Future<TaskStatus> status3;
+  Future<TaskStatus> status4;
 
   EXPECT_CALL(sched, statusUpdate(&driver, _))
     .WillOnce(FutureArg<1>(&status1))
-    .WillOnce(FutureArg<1>(&status2));
+    .WillOnce(FutureArg<1>(&status2))
+    .WillOnce(FutureArg<1>(&status3))
+    .WillOnce(FutureArg<1>(&status4));
 
   Future<CheckpointResourcesMessage> checkpointResources =
     FUTURE_PROTOBUF(CheckpointResourcesMessage(), _, slave.get()->pid);
@@ -1451,11 +1477,14 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMasterFailover)
        LAUNCH({task1, task2})});
 
   AWAIT_READY(checkpointResources);
+
+  // We only check the first and the last status, because the two in between
+  // could arrive in any order.
   AWAIT_READY(status1);
-  EXPECT_EQ(TASK_RUNNING, status1->state());
+  EXPECT_EQ(TASK_STARTING, status1->state());
 
-  AWAIT_READY(status2);
-  EXPECT_EQ(TASK_RUNNING, status2->state());
+  AWAIT_READY(status4);
+  EXPECT_EQ(TASK_RUNNING, status4->state());
 
   // This is to make sure CheckpointResourcesMessage is processed.
   Clock::pause();
@@ -1598,16 +1627,20 @@ TEST_P(PersistentVolumeTest, DestroyPersistentVolumeMultipleTasks)
   EXPECT_CALL(sched, resourceOffers(&driver, _))
     .WillOnce(FutureArg<1>(&offers));
 
-  // We should receive a TASK_RUNNING each of the 2 tasks. We track task
-  // termination by a TASK_FINISHED for the short-lived task.
+  // We should receive TASK_STARTING and TASK_RUNNING for each of the 2 tasks.
+  // We track task termination by a TASK_FINISHED for the short-lived task.
   Future<TaskStatus> status1;
   Future<TaskStatus> status2;
   Future<TaskStatus> status3;
+  Future<TaskStatus> status4;
+  Future<TaskStatus> status5;
 
   EXPECT_CALL(sched, statusUpdate(&driver, _))
     .WillOnce(FutureArg<1>(&status1))
     .WillOnce(FutureArg<1>(&status2))
-    .WillOnce(FutureArg<1>(&status3));
+    .WillOnce(FutureArg<1>(&status3))
+    .WillOnce(FutureArg<1>(&status4))
+    .WillOnce(FutureArg<1>(&status5));
 
   driver.acceptOffers(
       {offer.id()},
@@ -1616,18 +1649,23 @@ TEST_P(PersistentVolumeTest, DestroyPersistentVolumeMultipleTasks)
        LAUNCH({task1, task2})},
       filters);
 
-  // Wait for TASK_RUNNING for both the tasks, and TASK_FINISHED for
-  // the short-lived task.
+  // Wait for TASK_STARTING and TASK_RUNNING for both the tasks,
+  // and TASK_FINISHED for the short-lived task.
   AWAIT_READY(status1);
   AWAIT_READY(status2);
   AWAIT_READY(status3);
+  AWAIT_READY(status4);
+  AWAIT_READY(status5);
 
   hashset<TaskID> tasksRunning;
   hashset<TaskID> tasksFinished;
-  vector<Future<TaskStatus>> statuses{status1, status2, status3};
+  vector<Future<TaskStatus>> statuses {
+    status1, status2, status3, status4, status5};
 
   foreach (const Future<TaskStatus>& status, statuses) {
-    if (status->state() == TASK_RUNNING) {
+    if (status->state() == TASK_STARTING) {
+      // ignore
+    } else if (status->state() == TASK_RUNNING) {
       tasksRunning.insert(status->task_id());
     } else {
       tasksFinished.insert(status->task_id());
@@ -1686,15 +1724,15 @@ TEST_P(PersistentVolumeTest, DestroyPersistentVolumeMultipleTasks)
 
   // We kill the long-lived task and wait for TASK_KILLED, so we can
   // DESTROY the persistent volume once the task terminates.
-  Future<TaskStatus> status4;
+  Future<TaskStatus> status6;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&status4));
+    .WillOnce(FutureArg<1>(&status6));
 
   driver.killTask(task1.task_id());
 
-  AWAIT_READY(status4);
-  EXPECT_EQ(task1.task_id(), status4->task_id());
-  EXPECT_EQ(TASK_KILLED, status4->state());
+  AWAIT_READY(status6);
+  EXPECT_EQ(task1.task_id(), status6->task_id());
+  EXPECT_EQ(TASK_KILLED, status6->state());
 
   EXPECT_CALL(sched, resourceOffers(&driver, _))
     .WillOnce(FutureArg<1>(&offers));
@@ -1923,25 +1961,37 @@ TEST_P(PersistentVolumeTest, SlaveRecovery)
       taskResources,
       "while true; do test -d path1; done");
 
+  Future<TaskStatus> status0;
   Future<TaskStatus> status1;
   Future<TaskStatus> status2;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status0))
     .WillOnce(FutureArg<1>(&status1))
     .WillOnce(FutureArg<1>(&status2));
 
-  Future<Nothing> ack =
+  Future<Nothing> ack1 =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> ack2 =
     FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
 
   driver.acceptOffers(
       {offer.id()},
       {CREATE(volume), LAUNCH({task})});
 
+  AWAIT_READY(status0);
+  EXPECT_EQ(task.task_id(), status0->task_id());
+  EXPECT_EQ(TASK_STARTING, status0->state());
+
+  // Wait for the ACK to be checkpointed.
+  AWAIT_READY(ack1);
+
   AWAIT_READY(status1);
   EXPECT_EQ(task.task_id(), status1->task_id());
   EXPECT_EQ(TASK_RUNNING, status1->state());
 
   // Wait for the ACK to be checkpointed.
-  AWAIT_READY(ack);
+  AWAIT_READY(ack2);
 
   // Restart the slave.
   slave.get()->terminate();

http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reconciliation_tests.cpp b/src/tests/reconciliation_tests.cpp
index 64a1d3d..8ae2860 100644
--- a/src/tests/reconciliation_tests.cpp
+++ b/src/tests/reconciliation_tests.cpp
@@ -1178,22 +1178,33 @@ TEST_F(ReconciliationTest, PartitionedAgentThenMasterFailover)
   // Launch `task` using `sched`.
   TaskInfo task = createTask(offer, "sleep 60");
 
+  Future<TaskStatus> startingStatus;
   Future<TaskStatus> runningStatus;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&startingStatus))
     .WillOnce(FutureArg<1>(&runningStatus));
 
-  Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
+  Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
+      slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
       slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
 
   driver.launchTasks(offer.id(), {task});
 
+  AWAIT_READY(startingStatus);
+  EXPECT_EQ(TASK_STARTING, startingStatus->state());
+  EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+  AWAIT_READY(statusUpdateAck1);
+
   AWAIT_READY(runningStatus);
   EXPECT_EQ(TASK_RUNNING, runningStatus->state());
   EXPECT_EQ(task.task_id(), runningStatus->task_id());
 
   const SlaveID slaveId = runningStatus->slave_id();
 
-  AWAIT_READY(statusUpdateAck);
+  AWAIT_READY(statusUpdateAck2);
 
   // Now, induce a partition of the slave by having the master
   // timeout the slave.

http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/reservation_endpoints_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reservation_endpoints_tests.cpp b/src/tests/reservation_endpoints_tests.cpp
index e70dd0d..3645cd8 100644
--- a/src/tests/reservation_endpoints_tests.cpp
+++ b/src/tests/reservation_endpoints_tests.cpp
@@ -378,6 +378,7 @@ TEST_F(ReservationEndpointsTest, ReserveAvailableAndOfferedResources)
   MesosSchedulerDriver driver(
       &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
 
+  // The task launched below yields a TASK_STARTING and a TASK_RUNNING update.
   EXPECT_CALL(sched, registered(&driver, _, _));
 
   Future<vector<Offer>> offers;
@@ -407,16 +408,21 @@ TEST_F(ReservationEndpointsTest, ReserveAvailableAndOfferedResources)
   // recovers 'offered' resources portion.
   TaskInfo taskInfo = createTask(offer.slave_id(), available, "sleep 1000");
 
-  // Expect a TASK_RUNNING status.
-  EXPECT_CALL(sched, statusUpdate(_, _));
+  // Expect a TASK_STARTING and a TASK_RUNNING status.
+  EXPECT_CALL(sched, statusUpdate(_, _)).
+    WillRepeatedly(Return());
 
-  Future<Nothing> _statusUpdateAcknowledgement =
+  Future<Nothing> _statusUpdateAcknowledgement1 =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> _statusUpdateAcknowledgement2 =
     FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
 
   driver.acceptOffers({offer.id()}, {LAUNCH({taskInfo})});
 
-  // Wait for TASK_RUNNING update ack.
-  AWAIT_READY(_statusUpdateAcknowledgement);
+  // Wait for update acks.
+  AWAIT_READY(_statusUpdateAcknowledgement1);
+  AWAIT_READY(_statusUpdateAcknowledgement2);
 
   // Summon an offer to receive the 'offered' resources.
   EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -550,16 +556,21 @@ TEST_F(ReservationEndpointsTest, UnreserveAvailableAndOfferedResources)
   // recovers 'offered' resources portion.
   TaskInfo taskInfo = createTask(offer.slave_id(), available, "sleep 1000");
 
-  // Expect a TASK_RUNNING status.
-  EXPECT_CALL(sched, statusUpdate(_, _));
+  // Expect a TASK_STARTING and a TASK_RUNNING status.
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillRepeatedly(Return());
 
-  Future<Nothing> _statusUpdateAcknowledgement =
+  Future<Nothing> _statusUpdateAcknowledgement1 =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> _statusUpdateAcknowledgement2 =
     FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
 
   driver.acceptOffers({offer.id()}, {LAUNCH({taskInfo})});
 
-  // Wait for TASK_RUNNING update ack.
-  AWAIT_READY(_statusUpdateAcknowledgement);
+  // Wait for update acks from TASK_STARTING and TASK_RUNNING.
+  AWAIT_READY(_statusUpdateAcknowledgement1);
+  AWAIT_READY(_statusUpdateAcknowledgement2);
 
   // Summon an offer to receive the 'offered' resources.
   EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -1575,9 +1586,11 @@ TEST_F(ReservationEndpointsTest, AgentStateEndpointResources)
 
   Offer offer = offers.get()[0];
 
-  Future<TaskStatus> status;
+  Future<TaskStatus> statusStarting;
+  Future<TaskStatus> statusRunning;
   EXPECT_CALL(sched, statusUpdate(_, _))
-    .WillOnce(FutureArg<1>(&status));
+    .WillOnce(FutureArg<1>(&statusStarting))
+    .WillOnce(FutureArg<1>(&statusRunning));
 
   Resources taskResources = Resources::parse(
       "cpus(role):2;mem(role):512;cpus:2;mem:1024").get();
@@ -1586,8 +1599,11 @@ TEST_F(ReservationEndpointsTest, AgentStateEndpointResources)
 
   driver.acceptOffers({offer.id()}, {LAUNCH({task})});
 
-  AWAIT_READY(status);
-  ASSERT_EQ(TASK_RUNNING, status->state());
+  AWAIT_READY(statusStarting);
+  ASSERT_EQ(TASK_STARTING, statusStarting->state());
+
+  AWAIT_READY(statusRunning);
+  ASSERT_EQ(TASK_RUNNING, statusRunning->state());
 
   Future<Response> response = process::http::get(
       agent.get()->pid,

http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/role_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/role_tests.cpp b/src/tests/role_tests.cpp
index 568ea90..084555a 100644
--- a/src/tests/role_tests.cpp
+++ b/src/tests/role_tests.cpp
@@ -979,9 +979,10 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(RoleTest, VolumesInOverlappingHierarchies)
         taskResources,
         "! (ls -Av path | grep -q .)");
 
-    // We expect two status updates for the task.
-    Future<TaskStatus> status1, status2;
+    // We expect three status updates for the task.
+    Future<TaskStatus> status0, status1, status2;
     EXPECT_CALL(sched, statusUpdate(&driver, _))
+      .WillOnce(FutureArg<1>(&status0))
       .WillOnce(FutureArg<1>(&status1))
       .WillOnce(FutureArg<1>(&status2));
 
@@ -990,6 +991,11 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(RoleTest, VolumesInOverlappingHierarchies)
         {offer.id()},
         {RESERVE(reservedDisk), CREATE(volume), LAUNCH({task})});
 
+    AWAIT_READY(status0);
+
+    EXPECT_EQ(task.task_id(), status0->task_id());
+    EXPECT_EQ(TASK_STARTING, status0->state());
+
     AWAIT_READY(status1);
 
     EXPECT_EQ(task.task_id(), status1->task_id());

http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index 4eda96e..6df4d32 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -623,11 +623,15 @@ TEST_P(SchedulerTest, TaskGroupRunning)
   taskGroup.add_tasks()->CopyFrom(task1);
   taskGroup.add_tasks()->CopyFrom(task2);
 
+  Future<Event::Update> startingUpdate1;
+  Future<Event::Update> startingUpdate2;
   Future<Event::Update> runningUpdate1;
   Future<Event::Update> runningUpdate2;
   Future<Event::Update> finishedUpdate1;
   Future<Event::Update> finishedUpdate2;
   EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(FutureArg<1>(&startingUpdate1))
+    .WillOnce(FutureArg<1>(&startingUpdate2))
     .WillOnce(FutureArg<1>(&runningUpdate1))
     .WillOnce(FutureArg<1>(&runningUpdate2))
     .WillOnce(FutureArg<1>(&finishedUpdate1))
@@ -669,14 +673,58 @@ TEST_P(SchedulerTest, TaskGroupRunning)
   EXPECT_EQ(devolve(task2.task_id()),
             runTaskGroupMessage->task_group().tasks(1).task_id());
 
+  AWAIT_READY(startingUpdate1);
+  ASSERT_EQ(v1::TASK_STARTING, startingUpdate1->status().state());
+
+  AWAIT_READY(startingUpdate2);
+  ASSERT_EQ(v1::TASK_STARTING, startingUpdate2->status().state());
+
+  const hashset<v1::TaskID> tasks{task1.task_id(), task2.task_id()};
+
+  // TASK_STARTING updates for the tasks in a
+  // task group can be received in any order.
+  const hashset<v1::TaskID> tasksStarting{
+    startingUpdate1->status().task_id(),
+    startingUpdate2->status().task_id()};
+
+  ASSERT_EQ(tasks, tasksStarting);
+
+  // Acknowledge the TASK_STARTING updates so
+  // that subsequent updates can be received.
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::ACKNOWLEDGE);
+
+    Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+    acknowledge->mutable_task_id()->CopyFrom(
+        startingUpdate1->status().task_id());
+    acknowledge->mutable_agent_id()->CopyFrom(offers->offers(0).agent_id());
+    acknowledge->set_uuid(startingUpdate1->status().uuid());
+
+    mesos.send(call);
+  }
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::ACKNOWLEDGE);
+
+    Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+    acknowledge->mutable_task_id()->CopyFrom(
+        startingUpdate2->status().task_id());
+    acknowledge->mutable_agent_id()->CopyFrom(offers->offers(0).agent_id());
+    acknowledge->set_uuid(startingUpdate2->status().uuid());
+
+    mesos.send(call);
+  }
+
   AWAIT_READY(runningUpdate1);
   ASSERT_EQ(v1::TASK_RUNNING, runningUpdate1->status().state());
 
   AWAIT_READY(runningUpdate2);
   ASSERT_EQ(v1::TASK_RUNNING, runningUpdate2->status().state());
 
-  const hashset<v1::TaskID> tasks{task1.task_id(), task2.task_id()};
-
   // TASK_RUNNING updates for the tasks in a
   // task group can be received in any order.
   const hashset<v1::TaskID> tasksRunning{

http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/slave_authorization_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_authorization_tests.cpp b/src/tests/slave_authorization_tests.cpp
index 868e39e..2dcfd6c 100644
--- a/src/tests/slave_authorization_tests.cpp
+++ b/src/tests/slave_authorization_tests.cpp
@@ -627,10 +627,12 @@ TYPED_TEST(SlaveAuthorizerTest, AuthorizeRunTaskOnAgent)
   // The first task should fail since the task user `foo` is not an
   // authorized user that can launch a task. However, the second task
   // should succeed.
+  Future<TaskStatus> status0;
   Future<TaskStatus> status1;
   Future<TaskStatus> status2;
 
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status0))
     .WillOnce(FutureArg<1>(&status1))
     .WillOnce(FutureArg<1>(&status2));
 
@@ -638,16 +640,19 @@ TYPED_TEST(SlaveAuthorizerTest, AuthorizeRunTaskOnAgent)
       {offer.id()},
       {LAUNCH({task1, task2})});
 
-  // Wait for TASK_FAILED for 1st task, and TASK_RUNNING for 2nd task.
+  // Wait for TASK_FAILED for 1st task, and TASK_STARTING followed by
+  // TASK_RUNNING for 2nd task.
+  AWAIT_READY(status0);
   AWAIT_READY(status1);
   AWAIT_READY(status2);
 
   // Validate both the statuses. Note that the order of receiving the
-  // status updates for the 2 tasks is not deterministic.
-  hashmap<TaskID, TaskStatus> statuses {
-    {status1->task_id(), status1.get()},
-    {status2->task_id(), status2.get()}
-  };
+  // status updates for the 2 tasks is not deterministic, but we know
+  // that task2's TASK_RUNNING arrives after TASK_STARTING.
+  hashmap<TaskID, TaskStatus> statuses;
+  statuses[status0->task_id()] = status0.get();
+  statuses[status1->task_id()] = status1.get();
+  statuses[status2->task_id()] = status2.get();
 
   ASSERT_TRUE(statuses.contains(task1.task_id()));
   EXPECT_EQ(TASK_ERROR, statuses.at(task1.task_id()).state());
@@ -741,7 +746,7 @@ TEST_F(ExecutorAuthorizationTest, RunTaskGroup)
   AWAIT_READY(status);
 
   ASSERT_EQ(task.task_id(), status->task_id());
-  EXPECT_EQ(TASK_RUNNING, status->state());
+  EXPECT_EQ(TASK_STARTING, status->state());
 
   driver.stop();
   driver.join();

http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 30d8c23..c2d9cc8 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -252,7 +252,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSlaveState)
 
   // Capture the update.
   AWAIT_READY(update);
-  EXPECT_EQ(TASK_RUNNING, update->update().status().state());
+  EXPECT_EQ(TASK_STARTING, update->update().status().state());
 
   // Wait for the ACK to be checkpointed.
   AWAIT_READY(_statusUpdateAcknowledgement);
@@ -314,7 +314,9 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSlaveState)
         .info);
 
   // Check status update and ack.
-  ASSERT_EQ(
+  // (the number might be bigger than 1 because we might have
+  // received any number of additional TASK_RUNNING updates)
+  ASSERT_LE(
       1U,
       state
         .frameworks[frameworkId]
@@ -423,7 +425,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverStatusUpdateManager)
   ASSERT_SOME(slave);
 
   AWAIT_READY(status);
-  EXPECT_EQ(TASK_RUNNING, status->state());
+  EXPECT_EQ(TASK_STARTING, status->state());
 
   driver.stop();
   driver.join();
@@ -514,7 +516,7 @@ TYPED_TEST(SlaveRecoveryTest, DISABLED_ReconnectHTTPExecutor)
 
   // Scheduler should receive the recovered update.
   AWAIT_READY(status);
-  EXPECT_EQ(TASK_RUNNING, status->state());
+  EXPECT_EQ(TASK_STARTING, status->state());
 
   driver.stop();
   driver.join();
@@ -762,13 +764,13 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutor)
   TaskInfo task = createTask(offers.get()[0], "sleep 1000");
 
   // Drop the first update from the executor.
-  Future<StatusUpdateMessage> statusUpdate =
+  Future<StatusUpdateMessage> startingUpdate =
     DROP_PROTOBUF(StatusUpdateMessage(), _, _);
 
   driver.launchTasks(offers.get()[0].id(), {task});
 
   // Stop the slave before the status update is received.
-  AWAIT_READY(statusUpdate);
+  AWAIT_READY(startingUpdate);
 
   slave.get()->terminate();
 
@@ -791,15 +793,15 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutor)
   // Ensure the executor re-registers.
   AWAIT_READY(reregister);
 
-  // Executor should inform about the unacknowledged update.
-  ASSERT_EQ(1, reregister->updates_size());
+  // Executor should inform about the unacknowledged updates.
+  ASSERT_LE(1, reregister->updates_size());
   const StatusUpdate& update = reregister->updates(0);
   EXPECT_EQ(task.task_id(), update.status().task_id());
-  EXPECT_EQ(TASK_RUNNING, update.status().state());
+  EXPECT_EQ(TASK_STARTING, update.status().state());
 
   // Scheduler should receive the recovered update.
   AWAIT_READY(status);
-  EXPECT_EQ(TASK_RUNNING, status->state());
+  EXPECT_EQ(TASK_STARTING, status->state());
 
   driver.stop();
   driver.join();
@@ -846,7 +848,8 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutorRetry)
 
   Future<TaskStatus> statusUpdate;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&statusUpdate));
+    .WillOnce(FutureArg<1>(&statusUpdate))
+    .WillRepeatedly(Return()); // Ignore subsequent TASK_RUNNING updates.
 
   driver.start();
 
@@ -863,7 +866,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutorRetry)
   driver.launchTasks(offers.get()[0].id(), {task});
 
   AWAIT_READY(statusUpdate);
-  EXPECT_EQ(TASK_RUNNING, statusUpdate->state());
+  EXPECT_EQ(TASK_STARTING, statusUpdate->state());
 
   // Ensure the acknowledgement is checkpointed.
   Clock::settle();
@@ -974,12 +977,19 @@ TYPED_TEST(SlaveRecoveryTest, PingTimeoutDuringRecovery)
 
   TaskInfo task = createTask(offers.get()[0], "sleep 1000");
 
+  Future<TaskStatus> statusUpdate0;
   Future<TaskStatus> statusUpdate1;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusUpdate0))
     .WillOnce(FutureArg<1>(&statusUpdate1));
 
   driver.launchTasks(offers.get()[0].id(), {task});
 
+  AWAIT_READY(statusUpdate0);
+  ASSERT_EQ(TASK_STARTING, statusUpdate0->state());
+
+  driver.acknowledgeStatusUpdate(statusUpdate0.get());
+
   AWAIT_READY(statusUpdate1);
   ASSERT_EQ(TASK_RUNNING, statusUpdate1->state());
 
@@ -1442,7 +1452,8 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedHTTPExecutor)
 
   Future<vector<Offer>> offers1;
   EXPECT_CALL(sched, resourceOffers(_, _))
-    .WillOnce(FutureArg<1>(&offers1));
+    .WillOnce(FutureArg<1>(&offers1))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
 
   driver.start();
 
@@ -1451,7 +1462,8 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedHTTPExecutor)
 
   TaskInfo task = createTask(offers1.get()[0], "sleep 1000");
 
-  EXPECT_CALL(sched, statusUpdate(_, _));
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillRepeatedly(Return()); // Allow any number of subsequent status updates.
 
   Future<Nothing> ack =
     FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
@@ -1583,7 +1595,8 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedExecutor)
 
   Future<vector<Offer>> offers1;
   EXPECT_CALL(sched, resourceOffers(_, _))
-    .WillOnce(FutureArg<1>(&offers1));
+    .WillOnce(FutureArg<1>(&offers1))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
 
   driver.start();
 
@@ -1595,25 +1608,42 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedExecutor)
   Future<Message> registerExecutor =
     FUTURE_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _);
 
-  EXPECT_CALL(sched, statusUpdate(_, _));
 
-  Future<Nothing> ack =
+  Future<TaskStatus> statusStarting;
+  Future<TaskStatus> statusRunning;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&statusStarting))
+    .WillOnce(FutureArg<1>(&statusRunning))
+    .WillRepeatedly(Return()); // Ignore subsequent status updates.
+
+  Future<Nothing> ackRunning =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+  Future<Nothing> ackStarting =
     FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
 
   driver.launchTasks(offers1.get()[0].id(), {task});
 
+  // Wait for the TASK_STARTING update from the executor.
+  AWAIT_READY(statusStarting);
+  EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
+  // Wait for the TASK_RUNNING update from the executor.
+  AWAIT_READY(statusRunning);
+  EXPECT_EQ(TASK_RUNNING, statusRunning->state());
+
   // Capture the executor pid.
   AWAIT_READY(registerExecutor);
   UPID executorPid = registerExecutor->from;
 
   // Wait for the ACK to be checkpointed.
-  AWAIT_READY(ack);
+  AWAIT_READY(ackStarting);
+  AWAIT_READY(ackRunning);
 
   slave.get()->terminate();
 
-  Future<TaskStatus> status;
+  Future<TaskStatus> statusLost;
   EXPECT_CALL(sched, statusUpdate(_, _))
-    .WillOnce(FutureArg<1>(&status))
+    .WillOnce(FutureArg<1>(&statusLost))
     .WillRepeatedly(Return()); // Ignore subsequent status updates.
 
   // Now shut down the executor, when the slave is down.
@@ -1644,18 +1674,18 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedExecutor)
   Clock::advance(flags.executor_reregistration_timeout);
 
   // Now advance time until the reaper reaps the executor.
-  while (status.isPending()) {
+  while (statusLost.isPending()) {
     Clock::advance(process::MAX_REAP_INTERVAL());
     Clock::settle();
   }
 
   // Scheduler should receive the TASK_LOST update.
-  AWAIT_READY(status);
+  AWAIT_READY(statusLost);
 
-  EXPECT_EQ(TASK_LOST, status->state());
-  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status->source());
+  EXPECT_EQ(TASK_LOST, statusLost->state());
+  EXPECT_EQ(TaskStatus::SOURCE_SLAVE, statusLost->source());
   EXPECT_EQ(TaskStatus::REASON_EXECUTOR_REREGISTRATION_TIMEOUT,
-            status->reason());
+            statusLost->reason());
 
   while (offers2.isPending()) {
     Clock::advance(Seconds(1));
@@ -1816,7 +1846,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverCompletedExecutor)
   TaskInfo task = createTask(offers1.get()[0], "exit 0");
 
   EXPECT_CALL(sched, statusUpdate(_, _))
-    .Times(2); // TASK_RUNNING and TASK_FINISHED updates.
+    .Times(3); // TASK_STARTING, TASK_RUNNING and TASK_FINISHED updates.
 
   EXPECT_CALL(sched, offerRescinded(_, _))
     .Times(AtMost(1));
@@ -2010,15 +2040,22 @@ TYPED_TEST(SlaveRecoveryTest, CleanupExecutor)
 
   TaskInfo task = createTask(offers.get()[0], "sleep 1000");
 
-  EXPECT_CALL(sched, statusUpdate(_, _));
+  // Expect TASK_STARTING and TASK_RUNNING updates
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .Times(2)
+    .WillRepeatedly(Return()); // No-op action for the two expected updates.
 
-  Future<Nothing> ack =
+  Future<Nothing> ackRunning =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> ackStarting =
     FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
 
   driver.launchTasks(offers.get()[0].id(), {task});
 
   // Wait for the ACK to be checkpointed.
-  AWAIT_READY(ack);
+  AWAIT_READY(ackStarting);
+  AWAIT_READY(ackRunning);
 
   slave.get()->terminate();
 
@@ -2129,17 +2166,23 @@ TYPED_TEST(SlaveRecoveryTest, RemoveNonCheckpointingFramework)
         Resources(offer1.resources()) +
         Resources(offer2.resources())));
 
+  Future<Nothing> update0;
   Future<Nothing> update1;
   Future<Nothing> update2;
+  Future<Nothing> update3;
   EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureSatisfy(&update0))
     .WillOnce(FutureSatisfy(&update1))
-    .WillOnce(FutureSatisfy(&update2));
+    .WillOnce(FutureSatisfy(&update2))
+    .WillOnce(FutureSatisfy(&update3));
 
   driver.launchTasks(offers.get()[0].id(), tasks);
 
-  // Wait for TASK_RUNNING updates from the tasks.
+  // Wait for TASK_STARTING and TASK_RUNNING updates from the tasks.
+  AWAIT_READY(update0);
   AWAIT_READY(update1);
   AWAIT_READY(update2);
+  AWAIT_READY(update3);
 
   // The master should generate TASK_LOST updates once the slave is stopped.
   Future<TaskStatus> status1;
@@ -2439,15 +2482,21 @@ TYPED_TEST(SlaveRecoveryTest, KillTask)
 
   TaskInfo task = createTask(offers1.get()[0], "sleep 1000");
 
-  EXPECT_CALL(sched, statusUpdate(_, _));
+  // Expect a TASK_STARTING and a TASK_RUNNING update
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .Times(2);
 
-  Future<Nothing> ack =
+  Future<Nothing> ack1 =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> ack2 =
     FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
 
   driver.launchTasks(offers1.get()[0].id(), {task});
 
   // Wait for the ACK to be checkpointed.
-  AWAIT_READY(ack);
+  AWAIT_READY(ack1);
+  AWAIT_READY(ack2);
 
   slave.get()->terminate();
 
@@ -3068,9 +3117,10 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlave)
 
   TaskInfo task = createTask(offers1.get()[0], "sleep 1000");
 
-  Future<Nothing> statusUpdate1;
+  Future<Nothing> statusUpdate1, statusUpdate2;
   EXPECT_CALL(sched, statusUpdate(_, _))
-    .WillOnce(FutureSatisfy(&statusUpdate1))
+    .WillOnce(FutureSatisfy(&statusUpdate1)) // TASK_STARTING
+    .WillOnce(FutureSatisfy(&statusUpdate2)) // TASK_RUNNING
     .WillOnce(Return());  // Ignore TASK_FAILED update.
 
   Future<Message> registerExecutor =
@@ -3082,7 +3132,7 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlave)
   AWAIT_READY(registerExecutor);
   UPID executorPid = registerExecutor->from;
 
-  AWAIT_READY(statusUpdate1); // Wait for TASK_RUNNING update.
+  AWAIT_READY(statusUpdate2); // Wait for TASK_RUNNING update.
 
   EXPECT_CALL(sched, offerRescinded(_, _))
     .Times(AtMost(1));
@@ -3186,18 +3236,22 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlaveSIGUSR1)
 
   TaskInfo task = createTask(offers.get()[0], "sleep 1000");
 
-  Future<TaskStatus> status;
+  Future<TaskStatus> statusStarting, statusRunning;
   EXPECT_CALL(sched, statusUpdate(_, _))
-    .WillOnce(FutureArg<1>(&status));
+    .WillOnce(FutureArg<1>(&statusStarting))
+    .WillOnce(FutureArg<1>(&statusRunning));
 
   driver.launchTasks(offers.get()[0].id(), {task});
 
-  AWAIT_READY(status);
-  EXPECT_EQ(TASK_RUNNING, status->state());
+  AWAIT_READY(statusStarting);
+  EXPECT_EQ(TASK_STARTING, statusStarting->state());
 
-  Future<TaskStatus> status2;
+  AWAIT_READY(statusRunning);
+  EXPECT_EQ(TASK_RUNNING, statusRunning->state());
+
+  Future<TaskStatus> statusLost;
   EXPECT_CALL(sched, statusUpdate(_, _))
-    .WillOnce(FutureArg<1>(&status2));
+    .WillOnce(FutureArg<1>(&statusLost));
 
   Future<Nothing> slaveLost;
   EXPECT_CALL(sched, slaveLost(_, _))
@@ -3223,11 +3277,11 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlaveSIGUSR1)
   AWAIT_READY(executorTerminated);
 
   // The master should send a TASK_LOST and slaveLost.
-  AWAIT_READY(status2);
+  AWAIT_READY(statusLost);
 
-  EXPECT_EQ(TASK_LOST, status2->state());
-  EXPECT_EQ(TaskStatus::SOURCE_MASTER, status2->source());
-  EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, status2->reason());
+  EXPECT_EQ(TASK_LOST, statusLost->state());
+  EXPECT_EQ(TaskStatus::SOURCE_MASTER, statusLost->source());
+  EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, statusLost->reason());
 
   AWAIT_READY(slaveLost);
 
@@ -3415,16 +3469,21 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileKillTask)
   SlaveID slaveId = offers1.get()[0].slave_id();
   FrameworkID frameworkId = offers1.get()[0].framework_id();
 
-  // Expecting TASK_RUNNING status.
-  EXPECT_CALL(sched, statusUpdate(_, _));
+  // Expecting TASK_STARTING and TASK_RUNNING status.
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .Times(2);
 
-  Future<Nothing> _statusUpdateAcknowledgement =
+  Future<Nothing> _statusUpdateAcknowledgement1 =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> _statusUpdateAcknowledgement2 =
     FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
 
   driver.launchTasks(offers1.get()[0].id(), {task});
 
   // Wait for TASK_RUNNING update to be acknowledged.
-  AWAIT_READY(_statusUpdateAcknowledgement);
+  AWAIT_READY(_statusUpdateAcknowledgement1);
+  AWAIT_READY(_statusUpdateAcknowledgement2);
 
   slave.get()->terminate();
 
@@ -3513,18 +3572,24 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileShutdownFramework)
   // Capture the framework id.
   FrameworkID frameworkId = offers.get()[0].framework_id();
 
-  // Expecting TASK_RUNNING status.
-  EXPECT_CALL(sched, statusUpdate(_, _));
+  // Expecting a TASK_STARTING and a TASK_RUNNING status.
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .Times(2)
+    .WillRepeatedly(Return());
 
-  Future<Nothing> _statusUpdateAcknowledgement =
+  Future<Nothing> _statusUpdateAcknowledgement1 =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> _statusUpdateAcknowledgement2 =
     FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
 
   TaskInfo task = createTask(offers.get()[0], "sleep 1000");
 
   driver.launchTasks(offers.get()[0].id(), {task});
 
-  // Wait for TASK_RUNNING update to be acknowledged.
-  AWAIT_READY(_statusUpdateAcknowledgement);
+  // Wait for the updates to be acknowledged.
+  AWAIT_READY(_statusUpdateAcknowledgement1);
+  AWAIT_READY(_statusUpdateAcknowledgement2);
 
   slave.get()->terminate();
 
@@ -3661,15 +3726,30 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileTasksMissingFromSlave)
   // re-registers by wiping the relevant meta directory.
   TaskInfo task = createTask(offers1.get()[0], "sleep 10");
 
-  EXPECT_CALL(sched, statusUpdate(_, _));
+  Future<TaskStatus> starting;
+  Future<TaskStatus> running;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&starting))
+    .WillOnce(FutureArg<1>(&running))
+    .WillRepeatedly(Return()); // Ignore subsequent updates.
 
-  Future<Nothing> _statusUpdateAcknowledgement =
+  Future<Nothing> startingAck =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> runningAck =
     FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
 
   driver.launchTasks(offers1.get()[0].id(), {task});
 
   // Wait for the ACK to be checkpointed.
-  AWAIT_READY(_statusUpdateAcknowledgement);
+  AWAIT_READY(starting);
+  AWAIT_READY(startingAck);
+
+  AWAIT_READY(running);
+  AWAIT_READY(runningAck);
+
+  EXPECT_EQ(TASK_STARTING, starting->state());
+  EXPECT_EQ(TASK_RUNNING, running->state());
 
   EXPECT_CALL(allocator, deactivateSlave(_));
 
@@ -3824,15 +3904,21 @@ TYPED_TEST(SlaveRecoveryTest, SchedulerFailover)
   // Create a long running task.
   TaskInfo task = createTask(offers1.get()[0], "sleep 1000");
 
-  EXPECT_CALL(sched1, statusUpdate(_, _));
+  // Expecting TASK_STARTING and TASK_RUNNING updates
+  EXPECT_CALL(sched1, statusUpdate(_, _))
+    .Times(2);
 
-  Future<Nothing> _statusUpdateAcknowledgement =
+  Future<Nothing> _statusUpdateAcknowledgement1 =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> _statusUpdateAcknowledgement2 =
     FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
 
   driver1.launchTasks(offers1.get()[0].id(), {task});
 
   // Wait for the ACK to be checkpointed.
-  AWAIT_READY(_statusUpdateAcknowledgement);
+  AWAIT_READY(_statusUpdateAcknowledgement1);
+  AWAIT_READY(_statusUpdateAcknowledgement2);
 
   slave.get()->terminate();
 
@@ -3975,15 +4061,20 @@ TYPED_TEST(SlaveRecoveryTest, MasterFailover)
 
   TaskInfo task = createTask(offers1.get()[0], "sleep 1000");
 
-  EXPECT_CALL(sched, statusUpdate(_, _));
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .Times(2); // TASK_STARTING and TASK_RUNNING
 
-  Future<Nothing> _statusUpdateAcknowledgement =
+  Future<Nothing> _statusUpdateAcknowledgement1 =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> _statusUpdateAcknowledgement2 =
     FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
 
   driver.launchTasks(offers1.get()[0].id(), {task});
 
-  // Wait for the ACK to be checkpointed.
-  AWAIT_READY(_statusUpdateAcknowledgement);
+  // Wait for both ACKs to be checkpointed.
+  AWAIT_READY(_statusUpdateAcknowledgement1);
+  AWAIT_READY(_statusUpdateAcknowledgement2);
 
   slave.get()->terminate();
 
@@ -4117,15 +4208,20 @@ TYPED_TEST(SlaveRecoveryTest, MultipleFrameworks)
   // Framework 1 launches a task.
   TaskInfo task1 = createTask(offer1, "sleep 1000");
 
-  EXPECT_CALL(sched1, statusUpdate(_, _));
+  EXPECT_CALL(sched1, statusUpdate(_, _))
+    .Times(2);
 
-  Future<Nothing> _statusUpdateAcknowledgement1 =
+  Future<Nothing> _startingStatusUpdateAcknowledgement1 =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> _runningStatusUpdateAcknowledgement1 =
     FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
 
   driver1.launchTasks(offer1.id(), {task1});
 
   // Wait for the ACK to be checkpointed.
-  AWAIT_READY(_statusUpdateAcknowledgement1);
+  AWAIT_READY(_startingStatusUpdateAcknowledgement1);
+  AWAIT_READY(_runningStatusUpdateAcknowledgement1);
 
   // Framework 2. Enable checkpointing.
   FrameworkInfo frameworkInfo2 = DEFAULT_FRAMEWORK_INFO;
@@ -4150,14 +4246,20 @@ TYPED_TEST(SlaveRecoveryTest, MultipleFrameworks)
   // Framework 2 launches a task.
   TaskInfo task2 = createTask(offers2.get()[0], "sleep 1000");
 
-  EXPECT_CALL(sched2, statusUpdate(_, _));
+  EXPECT_CALL(sched2, statusUpdate(_, _))
+    .Times(2);
 
-  Future<Nothing> _statusUpdateAcknowledgement2 =
+  Future<Nothing> _startingStatusUpdateAcknowledgement2 =
+    FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> _runningStatusUpdateAcknowledgement2 =
     FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
   driver2.launchTasks(offers2.get()[0].id(), {task2});
 
   // Wait for the ACK to be checkpointed.
-  AWAIT_READY(_statusUpdateAcknowledgement2);
+  AWAIT_READY(_startingStatusUpdateAcknowledgement2);
+  AWAIT_READY(_runningStatusUpdateAcknowledgement2);
 
   slave.get()->terminate();
 
@@ -4281,15 +4383,20 @@ TYPED_TEST(SlaveRecoveryTest, MultipleSlaves)
   // Launch a long running task in the first slave.
   TaskInfo task1 = createTask(offers1.get()[0], "sleep 1000");
 
-  EXPECT_CALL(sched, statusUpdate(_, _));
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .Times(2);
 
-  Future<Nothing> _statusUpdateAcknowledgement1 =
+  Future<Nothing> _startingStatusUpdateAcknowledgement1 =
+    FUTURE_DISPATCH(slave1.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> _runningStatusUpdateAcknowledgement1 =
     FUTURE_DISPATCH(slave1.get()->pid, &Slave::_statusUpdateAcknowledgement);
 
   driver.launchTasks(offers1.get()[0].id(), {task1});
 
   // Wait for the ACK to be checkpointed.
-  AWAIT_READY(_statusUpdateAcknowledgement1);
+  AWAIT_READY(_startingStatusUpdateAcknowledgement1);
+  AWAIT_READY(_runningStatusUpdateAcknowledgement1);
 
   Future<vector<Offer>> offers2;
   EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -4316,15 +4423,20 @@ TYPED_TEST(SlaveRecoveryTest, MultipleSlaves)
   // Launch a long running task in each slave.
   TaskInfo task2 = createTask(offers2.get()[0], "sleep 1000");
 
-  EXPECT_CALL(sched, statusUpdate(_, _));
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .Times(2);
 
-  Future<Nothing> _statusUpdateAcknowledgement2 =
+  Future<Nothing> _startingStatusUpdateAcknowledgement2 =
+    FUTURE_DISPATCH(slave2.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> _runningStatusUpdateAcknowledgement2 =
     FUTURE_DISPATCH(slave2.get()->pid, &Slave::_statusUpdateAcknowledgement);
 
   driver.launchTasks(offers2.get()[0].id(), {task2});
 
   // Wait for the ACKs to be checkpointed.
-  AWAIT_READY(_statusUpdateAcknowledgement2);
+  AWAIT_READY(_startingStatusUpdateAcknowledgement2);
+  AWAIT_READY(_runningStatusUpdateAcknowledgement2);
 
   slave1.get()->terminate();
   slave2.get()->terminate();

http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 76a157f..def64b5 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -680,16 +680,22 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(SlaveTest, CommandTaskWithArguments)
 
   task.mutable_command()->MergeFrom(command);
 
+  Future<TaskStatus> statusStarting;
   Future<TaskStatus> statusRunning;
   Future<TaskStatus> statusFinished;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusStarting))
     .WillOnce(FutureArg<1>(&statusRunning))
     .WillOnce(FutureArg<1>(&statusFinished));
 
   driver.launchTasks(offers.get()[0].id(), {task});
 
-  // Scheduler should first receive TASK_RUNNING followed by the
-  // TASK_FINISHED from the executor.
+  // Scheduler should first receive TASK_STARTING, followed by
+  // TASK_RUNNING and TASK_FINISHED from the executor.
+  AWAIT_READY(statusStarting);
+  EXPECT_EQ(TASK_STARTING, statusStarting->state());
+  EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusStarting->source());
+
   AWAIT_READY(statusRunning);
   EXPECT_EQ(TASK_RUNNING, statusRunning->state());
   EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusRunning->source());
@@ -746,12 +752,17 @@ TEST_F(SlaveTest, CommandTaskWithKillPolicy)
   task.mutable_kill_policy()->mutable_grace_period()->set_nanoseconds(
       gracePeriod.ns());
 
+  Future<TaskStatus> statusStarting;
   Future<TaskStatus> statusRunning;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusStarting))
     .WillOnce(FutureArg<1>(&statusRunning));
 
   driver.launchTasks(offer.id(), {task});
 
+  AWAIT_READY(statusStarting);
+  EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
   AWAIT_READY(statusRunning);
   EXPECT_EQ(TASK_RUNNING, statusRunning->state());
 
@@ -1051,16 +1062,22 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
 
   task.mutable_command()->MergeFrom(command);
 
+  Future<TaskStatus> statusStarting;
   Future<TaskStatus> statusRunning;
   Future<TaskStatus> statusFinished;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusStarting))
     .WillOnce(FutureArg<1>(&statusRunning))
     .WillOnce(FutureArg<1>(&statusFinished));
 
   driver.launchTasks(offers.get()[0].id(), {task});
 
-  // Scheduler should first receive TASK_RUNNING followed by the
-  // TASK_FINISHED from the executor.
+  // Scheduler should first receive TASK_STARTING, followed by
+  // TASK_RUNNING and TASK_FINISHED from the executor.
+  AWAIT_READY(statusStarting);
+  EXPECT_EQ(TASK_STARTING, statusStarting->state());
+  EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusStarting->source());
+
   AWAIT_READY(statusRunning);
   EXPECT_EQ(TASK_RUNNING, statusRunning->state());
   EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusRunning->source());
@@ -2381,15 +2398,21 @@ TEST_F(SlaveTest, StatisticsEndpointRunningExecutor)
       Resources::parse("cpus:1;mem:32").get(),
       SLEEP_COMMAND(1000));
 
-  Future<TaskStatus> status;
+  Future<TaskStatus> statusStarting;
+  Future<TaskStatus> statusRunning;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&status));
+    .WillOnce(FutureArg<1>(&statusStarting))
+    .WillOnce(FutureArg<1>(&statusRunning));
 
   driver.launchTasks(offer.id(), {task});
 
-  AWAIT_READY(status);
-  EXPECT_EQ(task.task_id(), status->task_id());
-  EXPECT_EQ(TASK_RUNNING, status->state());
+  AWAIT_READY(statusStarting);
+  EXPECT_EQ(task.task_id(), statusStarting->task_id());
+  EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
+  AWAIT_READY(statusRunning);
+  EXPECT_EQ(task.task_id(), statusRunning->task_id());
+  EXPECT_EQ(TASK_RUNNING, statusRunning->state());
 
   // Hit the statistics endpoint and expect the response contains the
   // resource statistics for the running container.
@@ -5207,16 +5230,21 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(SlaveTest, ExecutorEnvironmentVariables)
 
   task.mutable_command()->MergeFrom(command);
 
+  Future<TaskStatus> statusStarting;
   Future<TaskStatus> statusRunning;
   Future<TaskStatus> statusFinished;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusStarting))
     .WillOnce(FutureArg<1>(&statusRunning))
     .WillOnce(FutureArg<1>(&statusFinished));
 
   driver.launchTasks(offers.get()[0].id(), {task});
 
-  // Scheduler should first receive TASK_RUNNING followed by the
-  // TASK_FINISHED from the executor.
+  // Scheduler should first receive TASK_STARTING, followed by
+  // TASK_RUNNING and TASK_FINISHED from the executor.
+  AWAIT_READY(statusStarting);
+  EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
   AWAIT_READY(statusRunning);
   EXPECT_EQ(TASK_RUNNING, statusRunning->state());
 
@@ -5562,7 +5590,7 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(SlaveTest, HTTPSchedulerSlaveRestart)
   UPID executorPid = registerExecutorMessage->from;
 
   AWAIT_READY(status);
-  EXPECT_EQ(TASK_RUNNING, status->state());
+  EXPECT_EQ(TASK_STARTING, status->state());
 
   // Restart the slave.
   slave.get()->terminate();
@@ -6829,14 +6857,23 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
   AWAIT_READY(offers);
   ASSERT_FALSE(offers->offers().empty());
 
-  Future<v1::scheduler::Event::Update> update;
-
-  EXPECT_CALL(*scheduler, update(_, _))
-    .WillOnce(FutureArg<1>(&update));
-
   const v1::Offer offer = offers->offers(0);
   const v1::AgentID& agentId = offer.agent_id();
 
+  Future<v1::scheduler::Event::Update> updateStarting;
+  Future<v1::scheduler::Event::Update> updateRunning;
+
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(
+      DoAll(
+        FutureArg<1>(&updateStarting),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillOnce(
+      DoAll(
+        FutureArg<1>(&updateRunning),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillRepeatedly(Return()); // Ignore subsequent updates.
+
   v1::Resources resources =
     v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
 
@@ -6878,28 +6915,15 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
     mesos.send(call);
   }
 
-  AWAIT_READY(update);
-
-  ASSERT_EQ(TASK_RUNNING, update->status().state());
-  ASSERT_EQ(taskInfo.task_id(), update->status().task_id());
-
-  Future<Nothing> _statusUpdateAcknowledgement =
-    FUTURE_DISPATCH(slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
-
-  {
-    Call call;
-    call.mutable_framework_id()->CopyFrom(frameworkId);
-    call.set_type(Call::ACKNOWLEDGE);
+  AWAIT_READY(updateStarting);
 
-    Call::Acknowledge* acknowledge = call.mutable_acknowledge();
-    acknowledge->mutable_task_id()->CopyFrom(update->status().task_id());
-    acknowledge->mutable_agent_id()->CopyFrom(offer.agent_id());
-    acknowledge->set_uuid(update->status().uuid());
+  ASSERT_EQ(TASK_STARTING, updateStarting->status().state());
+  ASSERT_EQ(taskInfo.task_id(), updateStarting->status().task_id());
 
-    mesos.send(call);
-  }
+  AWAIT_READY(updateRunning);
 
-  AWAIT_READY(_statusUpdateAcknowledgement);
+  ASSERT_EQ(TASK_RUNNING, updateRunning->status().state());
+  ASSERT_EQ(taskInfo.task_id(), updateRunning->status().task_id());
 
   // Restart the agent.
   slave.get()->terminate();
@@ -8177,12 +8201,19 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(SlaveTest, ExecutorReregistrationTimeoutFlag)
 
   TaskInfo task = createTask(offers->front(), "sleep 1000");
 
+  Future<TaskStatus> statusUpdate0;
   Future<TaskStatus> statusUpdate1;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusUpdate0))
     .WillOnce(FutureArg<1>(&statusUpdate1));
 
   driver.launchTasks(offers->front().id(), {task});
 
+  AWAIT_READY(statusUpdate0);
+  ASSERT_EQ(TASK_STARTING, statusUpdate0->state());
+
+  driver.acknowledgeStatusUpdate(statusUpdate0.get());
+
   AWAIT_READY(statusUpdate1);
   ASSERT_EQ(TASK_RUNNING, statusUpdate1->state());
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/teardown_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/teardown_tests.cpp b/src/tests/teardown_tests.cpp
index 5eada4f..392cacf 100644
--- a/src/tests/teardown_tests.cpp
+++ b/src/tests/teardown_tests.cpp
@@ -353,20 +353,31 @@ TEST_F(TeardownTest, RecoveredFrameworkAfterMasterFailover)
 
   TaskInfo task = createTask(offers.get()[0], "sleep 100");
 
+  Future<TaskStatus> startingStatus;
   Future<TaskStatus> runningStatus;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&startingStatus))
     .WillOnce(FutureArg<1>(&runningStatus));
 
-  Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
+  Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
+      slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+  Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
       slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
 
   driver.launchTasks(offers.get()[0].id(), {task});
 
+  AWAIT_READY(startingStatus);
+  EXPECT_EQ(TASK_STARTING, startingStatus->state());
+  EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+  AWAIT_READY(statusUpdateAck1);
+
   AWAIT_READY(runningStatus);
   EXPECT_EQ(TASK_RUNNING, runningStatus->state());
   EXPECT_EQ(task.task_id(), runningStatus->task_id());
 
-  AWAIT_READY(statusUpdateAck);
+  AWAIT_READY(statusUpdateAck2);
 
   // Simulate master failover. We leave the scheduler without a master
   // so it does not attempt to re-register.