You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2017/10/18 19:41:54 UTC
[4/6] mesos git commit: Fix unit tests that were broken by the
additional TASK_STARTING update.
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/hook_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/hook_tests.cpp b/src/tests/hook_tests.cpp
index c4fadbb..5428782 100644
--- a/src/tests/hook_tests.cpp
+++ b/src/tests/hook_tests.cpp
@@ -707,9 +707,11 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
Invoke(&containerizer,
&MockDockerContainerizer::_launch)));
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished))
.WillRepeatedly(DoDefault());
@@ -717,6 +719,8 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY_FOR(containerId, Seconds(60));
+ AWAIT_READY_FOR(statusStarting, Seconds(60));
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
AWAIT_READY_FOR(statusFinished, Seconds(60));
@@ -924,9 +928,11 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
Invoke(&containerizer,
&MockDockerContainerizer::_launch)));
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished))
.WillRepeatedly(DoDefault());
@@ -934,6 +940,8 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
driver.launchTasks(offers.get()[0].id(), tasks);
AWAIT_READY_FOR(containerId, Seconds(60));
+ AWAIT_READY_FOR(statusStarting, Seconds(60));
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
AWAIT_READY_FOR(statusFinished, Seconds(60));
@@ -1039,14 +1047,19 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(HookTest, ROOT_DOCKER_VerifySlavePostFetchHook)
ContainerInfo::DockerInfo* dockerInfo = containerInfo->mutable_docker();
dockerInfo->set_image("alpine");
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY_FOR(statusStarting, Seconds(60));
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY_FOR(statusRunning, Seconds(60));
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 5d96457..c6906a7 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -2786,22 +2786,34 @@ TEST_F(MasterTest, UnreachableTaskAfterFailover)
TaskInfo task = createTask(offers.get()[0], "sleep 100");
+ Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(FutureArg<1>(&runningStatus));
+ .WillOnce(FutureArg<1>(&startingStatus))
+ .WillOnce(FutureArg<1>(&runningStatus))
+ .WillRepeatedly(Return());
+
+ Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
+ slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
- Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
+ Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(startingStatus);
+ EXPECT_EQ(TASK_STARTING, startingStatus->state());
+ EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+ const SlaveID slaveId = startingStatus->slave_id();
+
+ AWAIT_READY(statusUpdateAck1);
+
AWAIT_READY(runningStatus);
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
EXPECT_EQ(task.task_id(), runningStatus->task_id());
- const SlaveID slaveId = runningStatus->slave_id();
-
- AWAIT_READY(statusUpdateAck);
+ AWAIT_READY(statusUpdateAck2);
// Step 4: Simulate master failover. We leave the slave without a
// master so it does not attempt to re-register.
@@ -6828,20 +6840,31 @@ TEST_F(MasterTest, FailoverAgentReregisterFirst)
TaskInfo task = createTask(offers.get()[0], "sleep 100");
+ Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus));
- Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
+ Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
+ slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(startingStatus);
+ EXPECT_EQ(TASK_STARTING, startingStatus->state());
+ EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+ AWAIT_READY(statusUpdateAck1);
+
AWAIT_READY(runningStatus);
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
EXPECT_EQ(task.task_id(), runningStatus->task_id());
- AWAIT_READY(statusUpdateAck);
+ AWAIT_READY(statusUpdateAck2);
// Simulate master failover. We leave the scheduler without a master
// so it does not attempt to re-register yet.
@@ -7048,22 +7071,33 @@ TEST_F(MasterTest, AgentRestartNoReregister)
TaskInfo task = createTask(offer, "sleep 100");
+ Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus));
- Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
- slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+ Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
+ slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
+ slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(startingStatus);
+ EXPECT_EQ(TASK_STARTING, startingStatus->state());
+ EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+ AWAIT_READY(statusUpdateAck1);
+
AWAIT_READY(runningStatus);
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
EXPECT_EQ(task.task_id(), runningStatus->task_id());
const SlaveID slaveId = runningStatus->slave_id();
- AWAIT_READY(statusUpdateAck);
+ AWAIT_READY(statusUpdateAck2);
Clock::pause();
@@ -7416,12 +7450,18 @@ TEST_F(MasterTest, TaskWithTinyResources)
Resources::parse("cpus:0.00001;mem:1").get(),
SLEEP_COMMAND(1000));
+ Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus));
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(startingStatus);
+ EXPECT_EQ(TASK_STARTING, startingStatus->state());
+ EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
AWAIT_READY(runningStatus);
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
EXPECT_EQ(task.task_id(), runningStatus->task_id());
@@ -7506,12 +7546,18 @@ TEST_F(MasterTest, MultiRoleSchedulerUnsubscribeFromRole)
TaskInfo task = createTask(offer.slave_id(), resources, "sleep 60");
+ Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+ .WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus));
driver1.launchTasks(offer.id(), {task});
+ AWAIT_READY(startingStatus);
+ EXPECT_EQ(TASK_STARTING, startingStatus->state());
+ EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
AWAIT_READY(runningStatus);
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
EXPECT_EQ(task.task_id(), runningStatus->task_id());
@@ -7808,12 +7854,18 @@ TEST_F(MasterTest, AgentDomainDifferentRegion)
// Check that we can launch a task in a remote region.
TaskInfo task = createTask(offer, "sleep 60");
+ Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus));
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(startingStatus);
+ EXPECT_EQ(TASK_STARTING, startingStatus->state());
+ EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
AWAIT_READY(runningStatus);
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
EXPECT_EQ(task.task_id(), runningStatus->task_id());
@@ -8409,9 +8461,11 @@ TEST_P(MasterTestPrePostReservationRefinement, LaunchGroup)
v1::TaskGroupInfo taskGroup;
taskGroup.add_tasks()->CopyFrom(taskInfo);
- Future<v1::scheduler::Event::Update> update;
+ Future<v1::scheduler::Event::Update> startingUpdate;
+ Future<v1::scheduler::Event::Update> runningUpdate;
EXPECT_CALL(*scheduler, update(_, _))
- .WillOnce(FutureArg<1>(&update));
+ .WillOnce(FutureArg<1>(&startingUpdate))
+ .WillOnce(FutureArg<1>(&runningUpdate));
{
Call call;
@@ -8433,11 +8487,17 @@ TEST_P(MasterTestPrePostReservationRefinement, LaunchGroup)
mesos.send(call);
}
- AWAIT_READY(update);
+ AWAIT_READY(startingUpdate);
+
+ EXPECT_EQ(TASK_STARTING, startingUpdate->status().state());
+ EXPECT_EQ(taskInfo.task_id(), startingUpdate->status().task_id());
+ EXPECT_TRUE(startingUpdate->status().has_timestamp());
+
+ AWAIT_READY(runningUpdate);
- EXPECT_EQ(TASK_RUNNING, update->status().state());
- EXPECT_EQ(taskInfo.task_id(), update->status().task_id());
- EXPECT_TRUE(update->status().has_timestamp());
+ EXPECT_EQ(TASK_RUNNING, runningUpdate->status().state());
+ EXPECT_EQ(taskInfo.task_id(), runningUpdate->status().task_id());
+ EXPECT_TRUE(runningUpdate->status().has_timestamp());
// Ensure that the task sandbox symbolic link is created.
EXPECT_TRUE(os::exists(path::join(
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/master_validation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_validation_tests.cpp b/src/tests/master_validation_tests.cpp
index f00dd9b..7da1be5 100644
--- a/src/tests/master_validation_tests.cpp
+++ b/src/tests/master_validation_tests.cpp
@@ -1889,8 +1889,10 @@ TEST_F(TaskValidationTest, TaskReusesUnreachableTaskID)
Offer offer1 = offers1.get()[0];
TaskInfo task1 = createTask(offer1, "sleep 60");
+ Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus));
Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
@@ -1898,6 +1900,10 @@ TEST_F(TaskValidationTest, TaskReusesUnreachableTaskID)
driver.launchTasks(offer1.id(), {task1});
+ AWAIT_READY(startingStatus);
+ EXPECT_EQ(TASK_STARTING, startingStatus->state());
+ EXPECT_EQ(task1.task_id(), startingStatus->task_id());
+
AWAIT_READY(runningStatus);
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
EXPECT_EQ(task1.task_id(), runningStatus->task_id());
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index cd98b8f..d262bbe 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -757,7 +757,7 @@ TEST_F(OversubscriptionTest, FixedResourceEstimator)
AWAIT_READY(status);
EXPECT_EQ(task.task_id(), status->task_id());
- EXPECT_EQ(TASK_RUNNING, status->state());
+ EXPECT_EQ(TASK_STARTING, status->state());
// Advance the clock for the slave to trigger the calculation of the
// total oversubscribed resources. As we described above, we don't
@@ -1023,15 +1023,20 @@ TEST_F(OversubscriptionTest, QoSCorrectionKill)
TaskInfo task = createTask(offers.get()[0], "sleep 10");
+ Future<TaskStatus> status0;
Future<TaskStatus> status1;
Future<TaskStatus> status2;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status0))
.WillOnce(FutureArg<1>(&status1))
.WillOnce(FutureArg<1>(&status2))
.WillRepeatedly(Return()); // Ignore subsequent updates.
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(status0);
+ ASSERT_EQ(TASK_STARTING, status0->state());
+
AWAIT_READY(status1);
ASSERT_EQ(TASK_RUNNING, status1->state());
@@ -1132,15 +1137,20 @@ TEST_F(OversubscriptionTest, QoSCorrectionKillPartitionAware)
TaskInfo task = createTask(offers.get()[0], "sleep 10");
+ Future<TaskStatus> status0;
Future<TaskStatus> status1;
Future<TaskStatus> status2;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status0))
.WillOnce(FutureArg<1>(&status1))
.WillOnce(FutureArg<1>(&status2))
.WillRepeatedly(Return()); // Ignore subsequent updates.
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(status0);
+ ASSERT_EQ(TASK_STARTING, status0->state());
+
AWAIT_READY(status1);
ASSERT_EQ(TASK_RUNNING, status1->state());
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/partition_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/partition_tests.cpp b/src/tests/partition_tests.cpp
index 0597bd2..7b11264 100644
--- a/src/tests/partition_tests.cpp
+++ b/src/tests/partition_tests.cpp
@@ -215,22 +215,33 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, ReregisterSlavePartitionAware)
TaskInfo task = createTask(offer, "sleep 60");
+ Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus));
- Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
+ Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
+ slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(startingStatus);
+ EXPECT_EQ(TASK_STARTING, startingStatus->state());
+ EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+ AWAIT_READY(statusUpdateAck1);
+
AWAIT_READY(runningStatus);
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
EXPECT_EQ(task.task_id(), runningStatus->task_id());
- const SlaveID& slaveId = runningStatus->slave_id();
+ AWAIT_READY(statusUpdateAck2);
- AWAIT_READY(statusUpdateAck);
+ const SlaveID& slaveId = startingStatus->slave_id();
// Now, induce a partition of the slave by having the master
// timeout the slave.
@@ -572,8 +583,10 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, ReregisterSlaveNotPartitionAware)
TaskInfo task = createTask(offer, "sleep 60");
+ Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus));
Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
@@ -874,8 +887,10 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
// Launch `task1` using `sched1`.
TaskInfo task1 = createTask(offer.slave_id(), taskResources, "sleep 60");
+ Future<TaskStatus> startingStatus1;
Future<TaskStatus> runningStatus1;
EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+ .WillOnce(FutureArg<1>(&startingStatus1))
.WillOnce(FutureArg<1>(&runningStatus1));
Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
@@ -917,8 +932,10 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
// Launch the second task.
TaskInfo task2 = createTask(offer.slave_id(), taskResources, "sleep 60");
+ Future<TaskStatus> startingStatus2;
Future<TaskStatus> runningStatus2;
EXPECT_CALL(sched2, statusUpdate(&driver2, _))
+ .WillOnce(FutureArg<1>(&startingStatus2))
.WillOnce(FutureArg<1>(&runningStatus2));
Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
@@ -1136,22 +1153,33 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, PartitionedSlaveOrphanedTask)
// Launch `task` using `sched`.
TaskInfo task = createTask(offer, "sleep 60");
+ Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus));
- Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
+ Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
+ slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(startingStatus);
+ EXPECT_EQ(TASK_STARTING, startingStatus->state());
+ EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+ AWAIT_READY(statusUpdateAck1);
+
AWAIT_READY(runningStatus);
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
EXPECT_EQ(task.task_id(), runningStatus->task_id());
const SlaveID& slaveId = runningStatus->slave_id();
- AWAIT_READY(statusUpdateAck);
+ AWAIT_READY(statusUpdateAck2);
// Now, induce a partition of the slave by having the master
// timeout the slave.
@@ -1404,22 +1432,33 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, DisconnectedFramework)
// Launch `task` using `sched1`.
TaskInfo task = createTask(offer, "sleep 60");
+ Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+ .WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus));
Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+ Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
+ slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
driver1.launchTasks(offer.id(), {task});
+ AWAIT_READY(startingStatus);
+ EXPECT_EQ(TASK_STARTING, startingStatus->state());
+ EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+ AWAIT_READY(statusUpdateAck1);
+
AWAIT_READY(runningStatus);
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
EXPECT_EQ(task.task_id(), runningStatus->task_id());
const SlaveID& slaveId = runningStatus->slave_id();
- AWAIT_READY(statusUpdateAck1);
+ AWAIT_READY(statusUpdateAck2);
// Shutdown the master.
master->reset();
@@ -1573,22 +1612,33 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, SpuriousSlaveReregistration)
// Launch `task` using `sched`.
TaskInfo task = createTask(offer, "sleep 60");
+ Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus));
- Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
+ Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
+ slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(startingStatus);
+ EXPECT_EQ(TASK_STARTING, startingStatus->state());
+ EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+ AWAIT_READY(statusUpdateAck1);
+
AWAIT_READY(runningStatus);
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
EXPECT_EQ(task.task_id(), runningStatus->task_id());
const SlaveID& slaveId = runningStatus->slave_id();
- AWAIT_READY(statusUpdateAck);
+ AWAIT_READY(statusUpdateAck2);
// Simulate a master loss event at the slave and then cause the
// slave to reregister with the master. From the master's
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/persistent_volume_endpoints_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/persistent_volume_endpoints_tests.cpp b/src/tests/persistent_volume_endpoints_tests.cpp
index 444737a..883192d 100644
--- a/src/tests/persistent_volume_endpoints_tests.cpp
+++ b/src/tests/persistent_volume_endpoints_tests.cpp
@@ -1988,16 +1988,15 @@ TEST_F(PersistentVolumeEndpointsTest, SlavesEndpointFullResources)
TaskInfo taskInfo = createTask(offer.slave_id(), taskResources, "sleep 1000");
- // Expect a TASK_RUNNING status.
- EXPECT_CALL(sched, statusUpdate(&driver, _));
-
- Future<Nothing> _statusUpdateAcknowledgement =
- FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+ Future<TaskStatus> starting;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&starting))
+ .WillRepeatedly(Return()); // Ignore subsequent updates.
driver.acceptOffers({offer.id()}, {LAUNCH({taskInfo})});
- // Wait for TASK_RUNNING update ack.
- AWAIT_READY(_statusUpdateAcknowledgement);
+ AWAIT_READY(starting);
+ EXPECT_EQ(TASK_STARTING, starting->state());
// Summon an offer.
EXPECT_CALL(sched, resourceOffers(&driver, _))
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/persistent_volume_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/persistent_volume_tests.cpp b/src/tests/persistent_volume_tests.cpp
index 11fe432..acfeac1 100644
--- a/src/tests/persistent_volume_tests.cpp
+++ b/src/tests/persistent_volume_tests.cpp
@@ -806,9 +806,11 @@ TEST_P(PersistentVolumeTest, AccessPersistentVolume)
taskResources,
"echo abc > path1/file");
+ Future<TaskStatus> status0;
Future<TaskStatus> status1;
Future<TaskStatus> status2;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status0))
.WillOnce(FutureArg<1>(&status1))
.WillOnce(FutureArg<1>(&status2));
@@ -823,6 +825,10 @@ TEST_P(PersistentVolumeTest, AccessPersistentVolume)
{CREATE(volume),
LAUNCH({task})});
+ AWAIT_READY(status0);
+ EXPECT_EQ(task.task_id(), status0->task_id());
+ EXPECT_EQ(TASK_STARTING, status0->state());
+
AWAIT_READY(status1);
EXPECT_EQ(task.task_id(), status1->task_id());
EXPECT_EQ(TASK_RUNNING, status1->state());
@@ -982,21 +988,25 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleTasks)
taskResources2.get() + volume,
"echo task2 > path1/file2");
- // We should receive a TASK_RUNNING followed by a TASK_FINISHED for
- // each of the 2 tasks. We do not check for the actual task state
- // since it's not the primary objective of the test. We instead
- // verify that the paths are created by the tasks after we receive
- // enough status updates.
+ // We should receive a TASK_STARTING, followed by a TASK_RUNNING
+ // and a TASK_FINISHED for each of the 2 tasks.
+ // We do not check for the actual task state since it's not the
+ // primary objective of the test. We instead verify that the paths
+ // are created by the tasks after we receive enough status updates.
Future<TaskStatus> status1;
Future<TaskStatus> status2;
Future<TaskStatus> status3;
Future<TaskStatus> status4;
+ Future<TaskStatus> status5;
+ Future<TaskStatus> status6;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status1))
.WillOnce(FutureArg<1>(&status2))
.WillOnce(FutureArg<1>(&status3))
- .WillOnce(FutureArg<1>(&status4));
+ .WillOnce(FutureArg<1>(&status4))
+ .WillOnce(FutureArg<1>(&status5))
+ .WillOnce(FutureArg<1>(&status6));
driver.acceptOffers(
{offer.id()},
@@ -1009,6 +1019,8 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleTasks)
AWAIT_READY(status2);
AWAIT_READY(status3);
AWAIT_READY(status4);
+ AWAIT_READY(status5);
+ AWAIT_READY(status6);
const string& volumePath = slave::paths::getPersistentVolumePath(
slaveFlags.work_dir,
@@ -1258,10 +1270,12 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleFrameworks)
Resources::parse("cpus:1;mem:128").get() + volume,
"echo abc > path1/file1 && sleep 1000");
- // We should receive a TASK_RUNNING for the launched task.
+ // We should receive a TASK_STARTING and a TASK_RUNNING for the launched task.
+ Future<TaskStatus> status0;
Future<TaskStatus> status1;
EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+ .WillOnce(FutureArg<1>(&status0))
.WillOnce(FutureArg<1>(&status1));
// We use a filter of 0 seconds so the resources will be available
@@ -1275,6 +1289,9 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleFrameworks)
LAUNCH({task1})},
filters);
+ AWAIT_READY(status0);
+ EXPECT_EQ(TASK_STARTING, status0->state());
+
AWAIT_READY(status1);
EXPECT_EQ(TASK_RUNNING, status1->state());
@@ -1331,11 +1348,13 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleFrameworks)
Resources::parse("cpus:1;mem:256").get() + volume,
"echo abc > path1/file2 && sleep 1000");
- // We should receive a TASK_RUNNING for the launched task.
+ // We should receive a TASK_STARTING and a TASK_RUNNING for the launched task.
Future<TaskStatus> status2;
+ Future<TaskStatus> status3;
EXPECT_CALL(sched2, statusUpdate(&driver2, _))
- .WillOnce(FutureArg<1>(&status2));
+ .WillOnce(FutureArg<1>(&status2))
+ .WillOnce(FutureArg<1>(&status3));
driver2.acceptOffers(
{offer2.id()},
@@ -1343,7 +1362,10 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleFrameworks)
filters);
AWAIT_READY(status2);
- EXPECT_EQ(TASK_RUNNING, status2->state());
+ EXPECT_EQ(TASK_STARTING, status2->state());
+
+ AWAIT_READY(status3);
+ EXPECT_EQ(TASK_RUNNING, status3->state());
// Collect metrics based on both frameworks. Note that the `cpus_used` and
// `mem_used` is updated, but `disk_used` does not change since both tasks
@@ -1434,13 +1456,17 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMasterFailover)
taskResources.get() + volume,
"sleep 1000");
- // We should receive a TASK_RUNNING for each of the tasks.
+ // We should receive a TASK_STARTING and a TASK_RUNNING for each of the tasks.
Future<TaskStatus> status1;
Future<TaskStatus> status2;
+ Future<TaskStatus> status3;
+ Future<TaskStatus> status4;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status1))
- .WillOnce(FutureArg<1>(&status2));
+ .WillOnce(FutureArg<1>(&status2))
+ .WillOnce(FutureArg<1>(&status3))
+ .WillOnce(FutureArg<1>(&status4));
Future<CheckpointResourcesMessage> checkpointResources =
FUTURE_PROTOBUF(CheckpointResourcesMessage(), _, slave.get()->pid);
@@ -1451,11 +1477,14 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMasterFailover)
LAUNCH({task1, task2})});
AWAIT_READY(checkpointResources);
+
+ // We only check the first and the last status, because the two in between
+ // could arrive in any order.
AWAIT_READY(status1);
- EXPECT_EQ(TASK_RUNNING, status1->state());
+ EXPECT_EQ(TASK_STARTING, status1->state());
- AWAIT_READY(status2);
- EXPECT_EQ(TASK_RUNNING, status2->state());
+ AWAIT_READY(status4);
+ EXPECT_EQ(TASK_RUNNING, status4->state());
// This is to make sure CheckpointResourcesMessage is processed.
Clock::pause();
@@ -1598,16 +1627,20 @@ TEST_P(PersistentVolumeTest, DestroyPersistentVolumeMultipleTasks)
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers));
- // We should receive a TASK_RUNNING each of the 2 tasks. We track task
- // termination by a TASK_FINISHED for the short-lived task.
+ // We should receive a TASK_STARTING and a TASK_RUNNING for each of the 2 tasks.
+ // We track task termination by a TASK_FINISHED for the short-lived task.
Future<TaskStatus> status1;
Future<TaskStatus> status2;
Future<TaskStatus> status3;
+ Future<TaskStatus> status4;
+ Future<TaskStatus> status5;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status1))
.WillOnce(FutureArg<1>(&status2))
- .WillOnce(FutureArg<1>(&status3));
+ .WillOnce(FutureArg<1>(&status3))
+ .WillOnce(FutureArg<1>(&status4))
+ .WillOnce(FutureArg<1>(&status5));
driver.acceptOffers(
{offer.id()},
@@ -1616,18 +1649,23 @@ TEST_P(PersistentVolumeTest, DestroyPersistentVolumeMultipleTasks)
LAUNCH({task1, task2})},
filters);
- // Wait for TASK_RUNNING for both the tasks, and TASK_FINISHED for
- // the short-lived task.
+ // Wait for TASK_STARTING and TASK_RUNNING for both the tasks,
+ // and TASK_FINISHED for the short-lived task.
AWAIT_READY(status1);
AWAIT_READY(status2);
AWAIT_READY(status3);
+ AWAIT_READY(status4);
+ AWAIT_READY(status5);
hashset<TaskID> tasksRunning;
hashset<TaskID> tasksFinished;
- vector<Future<TaskStatus>> statuses{status1, status2, status3};
+ vector<Future<TaskStatus>> statuses {
+ status1, status2, status3, status4, status5};
foreach (const Future<TaskStatus>& status, statuses) {
- if (status->state() == TASK_RUNNING) {
+ if (status->state() == TASK_STARTING) {
+ // Ignore.
+ } else if (status->state() == TASK_RUNNING) {
tasksRunning.insert(status->task_id());
} else {
tasksFinished.insert(status->task_id());
@@ -1686,15 +1724,15 @@ TEST_P(PersistentVolumeTest, DestroyPersistentVolumeMultipleTasks)
// We kill the long-lived task and wait for TASK_KILLED, so we can
// DESTROY the persistent volume once the task terminates.
- Future<TaskStatus> status4;
+ Future<TaskStatus> status6;
EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(FutureArg<1>(&status4));
+ .WillOnce(FutureArg<1>(&status6));
driver.killTask(task1.task_id());
- AWAIT_READY(status4);
- EXPECT_EQ(task1.task_id(), status4->task_id());
- EXPECT_EQ(TASK_KILLED, status4->state());
+ AWAIT_READY(status6);
+ EXPECT_EQ(task1.task_id(), status6->task_id());
+ EXPECT_EQ(TASK_KILLED, status6->state());
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers));
@@ -1923,25 +1961,37 @@ TEST_P(PersistentVolumeTest, SlaveRecovery)
taskResources,
"while true; do test -d path1; done");
+ Future<TaskStatus> status0;
Future<TaskStatus> status1;
Future<TaskStatus> status2;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status0))
.WillOnce(FutureArg<1>(&status1))
.WillOnce(FutureArg<1>(&status2));
- Future<Nothing> ack =
+ Future<Nothing> ack1 =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> ack2 =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
driver.acceptOffers(
{offer.id()},
{CREATE(volume), LAUNCH({task})});
+ AWAIT_READY(status0);
+ EXPECT_EQ(task.task_id(), status0->task_id());
+ EXPECT_EQ(TASK_STARTING, status0->state());
+
+ // Wait for the ACK to be checkpointed.
+ AWAIT_READY(ack1);
+
AWAIT_READY(status1);
EXPECT_EQ(task.task_id(), status1->task_id());
EXPECT_EQ(TASK_RUNNING, status1->state());
// Wait for the ACK to be checkpointed.
- AWAIT_READY(ack);
+ AWAIT_READY(ack2);
// Restart the slave.
slave.get()->terminate();
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reconciliation_tests.cpp b/src/tests/reconciliation_tests.cpp
index 64a1d3d..8ae2860 100644
--- a/src/tests/reconciliation_tests.cpp
+++ b/src/tests/reconciliation_tests.cpp
@@ -1178,22 +1178,33 @@ TEST_F(ReconciliationTest, PartitionedAgentThenMasterFailover)
// Launch `task` using `sched`.
TaskInfo task = createTask(offer, "sleep 60");
+ Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus));
- Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
+ Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
+ slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(startingStatus);
+ EXPECT_EQ(TASK_STARTING, startingStatus->state());
+ EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+ AWAIT_READY(statusUpdateAck1);
+
AWAIT_READY(runningStatus);
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
EXPECT_EQ(task.task_id(), runningStatus->task_id());
const SlaveID slaveId = runningStatus->slave_id();
- AWAIT_READY(statusUpdateAck);
+ AWAIT_READY(statusUpdateAck2);
// Now, induce a partition of the slave by having the master
// timeout the slave.
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/reservation_endpoints_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reservation_endpoints_tests.cpp b/src/tests/reservation_endpoints_tests.cpp
index e70dd0d..3645cd8 100644
--- a/src/tests/reservation_endpoints_tests.cpp
+++ b/src/tests/reservation_endpoints_tests.cpp
@@ -378,6 +378,7 @@ TEST_F(ReservationEndpointsTest, ReserveAvailableAndOfferedResources)
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+ // Expect one TASK_STARTING and one TASK_RUNNING update
EXPECT_CALL(sched, registered(&driver, _, _));
Future<vector<Offer>> offers;
@@ -407,16 +408,21 @@ TEST_F(ReservationEndpointsTest, ReserveAvailableAndOfferedResources)
// recovers 'offered' resources portion.
TaskInfo taskInfo = createTask(offer.slave_id(), available, "sleep 1000");
- // Expect a TASK_RUNNING status.
- EXPECT_CALL(sched, statusUpdate(_, _));
+ // Expect a TASK_STARTING and a TASK_RUNNING status.
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillRepeatedly(Return());
- Future<Nothing> _statusUpdateAcknowledgement =
+ Future<Nothing> _statusUpdateAcknowledgement1 =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> _statusUpdateAcknowledgement2 =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
driver.acceptOffers({offer.id()}, {LAUNCH({taskInfo})});
- // Wait for TASK_RUNNING update ack.
- AWAIT_READY(_statusUpdateAcknowledgement);
+ // Wait for update acks.
+ AWAIT_READY(_statusUpdateAcknowledgement1);
+ AWAIT_READY(_statusUpdateAcknowledgement2);
// Summon an offer to receive the 'offered' resources.
EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -550,16 +556,21 @@ TEST_F(ReservationEndpointsTest, UnreserveAvailableAndOfferedResources)
// recovers 'offered' resources portion.
TaskInfo taskInfo = createTask(offer.slave_id(), available, "sleep 1000");
- // Expect a TASK_RUNNING status.
- EXPECT_CALL(sched, statusUpdate(_, _));
+ // Expect a TASK_STARTING and a TASK_RUNNING status.
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillRepeatedly(Return());
- Future<Nothing> _statusUpdateAcknowledgement =
+ Future<Nothing> _statusUpdateAcknowledgement1 =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> _statusUpdateAcknowledgement2 =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
driver.acceptOffers({offer.id()}, {LAUNCH({taskInfo})});
- // Wait for TASK_RUNNING update ack.
- AWAIT_READY(_statusUpdateAcknowledgement);
+ // Wait for update acks from TASK_STARTING and TASK_RUNNING.
+ AWAIT_READY(_statusUpdateAcknowledgement1);
+ AWAIT_READY(_statusUpdateAcknowledgement2);
// Summon an offer to receive the 'offered' resources.
EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -1575,9 +1586,11 @@ TEST_F(ReservationEndpointsTest, AgentStateEndpointResources)
Offer offer = offers.get()[0];
- Future<TaskStatus> status;
+ Future<TaskStatus> statusStarting;
+ Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(_, _))
- .WillOnce(FutureArg<1>(&status));
+ .WillOnce(FutureArg<1>(&statusStarting))
+ .WillOnce(FutureArg<1>(&statusRunning));
Resources taskResources = Resources::parse(
"cpus(role):2;mem(role):512;cpus:2;mem:1024").get();
@@ -1586,8 +1599,11 @@ TEST_F(ReservationEndpointsTest, AgentStateEndpointResources)
driver.acceptOffers({offer.id()}, {LAUNCH({task})});
- AWAIT_READY(status);
- ASSERT_EQ(TASK_RUNNING, status->state());
+ AWAIT_READY(statusStarting);
+ ASSERT_EQ(TASK_STARTING, statusStarting->state());
+
+ AWAIT_READY(statusRunning);
+ ASSERT_EQ(TASK_RUNNING, statusRunning->state());
Future<Response> response = process::http::get(
agent.get()->pid,
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/role_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/role_tests.cpp b/src/tests/role_tests.cpp
index 568ea90..084555a 100644
--- a/src/tests/role_tests.cpp
+++ b/src/tests/role_tests.cpp
@@ -979,9 +979,10 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(RoleTest, VolumesInOverlappingHierarchies)
taskResources,
"! (ls -Av path | grep -q .)");
- // We expect two status updates for the task.
- Future<TaskStatus> status1, status2;
+ // We expect three status updates for the task.
+ Future<TaskStatus> status0, status1, status2;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status0))
.WillOnce(FutureArg<1>(&status1))
.WillOnce(FutureArg<1>(&status2));
@@ -990,6 +991,11 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(RoleTest, VolumesInOverlappingHierarchies)
{offer.id()},
{RESERVE(reservedDisk), CREATE(volume), LAUNCH({task})});
+ AWAIT_READY(status0);
+
+ EXPECT_EQ(task.task_id(), status0->task_id());
+ EXPECT_EQ(TASK_STARTING, status0->state());
+
AWAIT_READY(status1);
EXPECT_EQ(task.task_id(), status1->task_id());
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index 4eda96e..6df4d32 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -623,11 +623,15 @@ TEST_P(SchedulerTest, TaskGroupRunning)
taskGroup.add_tasks()->CopyFrom(task1);
taskGroup.add_tasks()->CopyFrom(task2);
+ Future<Event::Update> startingUpdate1;
+ Future<Event::Update> startingUpdate2;
Future<Event::Update> runningUpdate1;
Future<Event::Update> runningUpdate2;
Future<Event::Update> finishedUpdate1;
Future<Event::Update> finishedUpdate2;
EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(FutureArg<1>(&startingUpdate1))
+ .WillOnce(FutureArg<1>(&startingUpdate2))
.WillOnce(FutureArg<1>(&runningUpdate1))
.WillOnce(FutureArg<1>(&runningUpdate2))
.WillOnce(FutureArg<1>(&finishedUpdate1))
@@ -669,14 +673,58 @@ TEST_P(SchedulerTest, TaskGroupRunning)
EXPECT_EQ(devolve(task2.task_id()),
runTaskGroupMessage->task_group().tasks(1).task_id());
+ AWAIT_READY(startingUpdate1);
+ ASSERT_EQ(v1::TASK_STARTING, startingUpdate1->status().state());
+
+ AWAIT_READY(startingUpdate2);
+ ASSERT_EQ(v1::TASK_STARTING, startingUpdate2->status().state());
+
+ const hashset<v1::TaskID> tasks{task1.task_id(), task2.task_id()};
+
+ // TASK_STARTING updates for the tasks in a
+ // task group can be received in any order.
+ const hashset<v1::TaskID> tasksStarting{
+ startingUpdate1->status().task_id(),
+ startingUpdate2->status().task_id()};
+
+ ASSERT_EQ(tasks, tasksStarting);
+
+ // Acknowledge the TASK_STARTING updates so
+ // that subsequent updates can be received.
+ {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::ACKNOWLEDGE);
+
+ Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+ acknowledge->mutable_task_id()->CopyFrom(
+ startingUpdate1->status().task_id());
+ acknowledge->mutable_agent_id()->CopyFrom(offers->offers(0).agent_id());
+ acknowledge->set_uuid(startingUpdate1->status().uuid());
+
+ mesos.send(call);
+ }
+
+ {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::ACKNOWLEDGE);
+
+ Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+ acknowledge->mutable_task_id()->CopyFrom(
+ startingUpdate2->status().task_id());
+ acknowledge->mutable_agent_id()->CopyFrom(offers->offers(0).agent_id());
+ acknowledge->set_uuid(startingUpdate2->status().uuid());
+
+ mesos.send(call);
+ }
+
AWAIT_READY(runningUpdate1);
ASSERT_EQ(v1::TASK_RUNNING, runningUpdate1->status().state());
AWAIT_READY(runningUpdate2);
ASSERT_EQ(v1::TASK_RUNNING, runningUpdate2->status().state());
- const hashset<v1::TaskID> tasks{task1.task_id(), task2.task_id()};
-
// TASK_RUNNING updates for the tasks in a
// task group can be received in any order.
const hashset<v1::TaskID> tasksRunning{
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/slave_authorization_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_authorization_tests.cpp b/src/tests/slave_authorization_tests.cpp
index 868e39e..2dcfd6c 100644
--- a/src/tests/slave_authorization_tests.cpp
+++ b/src/tests/slave_authorization_tests.cpp
@@ -627,10 +627,12 @@ TYPED_TEST(SlaveAuthorizerTest, AuthorizeRunTaskOnAgent)
// The first task should fail since the task user `foo` is not an
// authorized user that can launch a task. However, the second task
// should succeed.
+ Future<TaskStatus> status0;
Future<TaskStatus> status1;
Future<TaskStatus> status2;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status0))
.WillOnce(FutureArg<1>(&status1))
.WillOnce(FutureArg<1>(&status2));
@@ -638,16 +640,19 @@ TYPED_TEST(SlaveAuthorizerTest, AuthorizeRunTaskOnAgent)
{offer.id()},
{LAUNCH({task1, task2})});
- // Wait for TASK_FAILED for 1st task, and TASK_RUNNING for 2nd task.
+ // Wait for TASK_FAILED for 1st task, and TASK_STARTING followed by
+ // TASK_RUNNING for 2nd task.
+ AWAIT_READY(status0);
AWAIT_READY(status1);
AWAIT_READY(status2);
// Validate both the statuses. Note that the order of receiving the
- // status updates for the 2 tasks is not deterministic.
- hashmap<TaskID, TaskStatus> statuses {
- {status1->task_id(), status1.get()},
- {status2->task_id(), status2.get()}
- };
+ // status updates for the 2 tasks is not deterministic, but we know
+ // that task2's TASK_RUNNING arrives after TASK_STARTING.
+ hashmap<TaskID, TaskStatus> statuses;
+ statuses[status0->task_id()] = status0.get();
+ statuses[status1->task_id()] = status1.get();
+ statuses[status2->task_id()] = status2.get();
ASSERT_TRUE(statuses.contains(task1.task_id()));
EXPECT_EQ(TASK_ERROR, statuses.at(task1.task_id()).state());
@@ -741,7 +746,7 @@ TEST_F(ExecutorAuthorizationTest, RunTaskGroup)
AWAIT_READY(status);
ASSERT_EQ(task.task_id(), status->task_id());
- EXPECT_EQ(TASK_RUNNING, status->state());
+ EXPECT_EQ(TASK_STARTING, status->state());
driver.stop();
driver.join();
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 30d8c23..c2d9cc8 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -252,7 +252,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSlaveState)
// Capture the update.
AWAIT_READY(update);
- EXPECT_EQ(TASK_RUNNING, update->update().status().state());
+ EXPECT_EQ(TASK_STARTING, update->update().status().state());
// Wait for the ACK to be checkpointed.
AWAIT_READY(_statusUpdateAcknowledgement);
@@ -314,7 +314,9 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSlaveState)
.info);
// Check status update and ack.
- ASSERT_EQ(
+ // (The number might be greater than 1 because we might have
+ // received additional TASK_RUNNING updates.)
+ ASSERT_LE(
1U,
state
.frameworks[frameworkId]
@@ -423,7 +425,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverStatusUpdateManager)
ASSERT_SOME(slave);
AWAIT_READY(status);
- EXPECT_EQ(TASK_RUNNING, status->state());
+ EXPECT_EQ(TASK_STARTING, status->state());
driver.stop();
driver.join();
@@ -514,7 +516,7 @@ TYPED_TEST(SlaveRecoveryTest, DISABLED_ReconnectHTTPExecutor)
// Scheduler should receive the recovered update.
AWAIT_READY(status);
- EXPECT_EQ(TASK_RUNNING, status->state());
+ EXPECT_EQ(TASK_STARTING, status->state());
driver.stop();
driver.join();
@@ -762,13 +764,13 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutor)
TaskInfo task = createTask(offers.get()[0], "sleep 1000");
// Drop the first update from the executor.
- Future<StatusUpdateMessage> statusUpdate =
+ Future<StatusUpdateMessage> startingUpdate =
DROP_PROTOBUF(StatusUpdateMessage(), _, _);
driver.launchTasks(offers.get()[0].id(), {task});
// Stop the slave before the status update is received.
- AWAIT_READY(statusUpdate);
+ AWAIT_READY(startingUpdate);
slave.get()->terminate();
@@ -791,15 +793,15 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutor)
// Ensure the executor re-registers.
AWAIT_READY(reregister);
- // Executor should inform about the unacknowledged update.
- ASSERT_EQ(1, reregister->updates_size());
+ // Executor should inform about the unacknowledged updates.
+ ASSERT_LE(1, reregister->updates_size());
const StatusUpdate& update = reregister->updates(0);
EXPECT_EQ(task.task_id(), update.status().task_id());
- EXPECT_EQ(TASK_RUNNING, update.status().state());
+ EXPECT_EQ(TASK_STARTING, update.status().state());
// Scheduler should receive the recovered update.
AWAIT_READY(status);
- EXPECT_EQ(TASK_RUNNING, status->state());
+ EXPECT_EQ(TASK_STARTING, status->state());
driver.stop();
driver.join();
@@ -846,7 +848,8 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutorRetry)
Future<TaskStatus> statusUpdate;
EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(FutureArg<1>(&statusUpdate));
+ .WillOnce(FutureArg<1>(&statusUpdate))
+ .WillRepeatedly(Return()); // Ignore subsequent TASK_RUNNING updates.
driver.start();
@@ -863,7 +866,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutorRetry)
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY(statusUpdate);
- EXPECT_EQ(TASK_RUNNING, statusUpdate->state());
+ EXPECT_EQ(TASK_STARTING, statusUpdate->state());
// Ensure the acknowledgement is checkpointed.
Clock::settle();
@@ -974,12 +977,19 @@ TYPED_TEST(SlaveRecoveryTest, PingTimeoutDuringRecovery)
TaskInfo task = createTask(offers.get()[0], "sleep 1000");
+ Future<TaskStatus> statusUpdate0;
Future<TaskStatus> statusUpdate1;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusUpdate0))
.WillOnce(FutureArg<1>(&statusUpdate1));
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(statusUpdate0);
+ ASSERT_EQ(TASK_STARTING, statusUpdate0->state());
+
+ driver.acknowledgeStatusUpdate(statusUpdate0.get());
+
AWAIT_READY(statusUpdate1);
ASSERT_EQ(TASK_RUNNING, statusUpdate1->state());
@@ -1442,7 +1452,8 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedHTTPExecutor)
Future<vector<Offer>> offers1;
EXPECT_CALL(sched, resourceOffers(_, _))
- .WillOnce(FutureArg<1>(&offers1));
+ .WillOnce(FutureArg<1>(&offers1))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
@@ -1451,7 +1462,8 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedHTTPExecutor)
TaskInfo task = createTask(offers1.get()[0], "sleep 1000");
- EXPECT_CALL(sched, statusUpdate(_, _));
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillRepeatedly(Return()); // Allow any number of subsequent status updates.
Future<Nothing> ack =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
@@ -1583,7 +1595,8 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedExecutor)
Future<vector<Offer>> offers1;
EXPECT_CALL(sched, resourceOffers(_, _))
- .WillOnce(FutureArg<1>(&offers1));
+ .WillOnce(FutureArg<1>(&offers1))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
driver.start();
@@ -1595,25 +1608,42 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedExecutor)
Future<Message> registerExecutor =
FUTURE_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _);
- EXPECT_CALL(sched, statusUpdate(_, _));
- Future<Nothing> ack =
+ Future<TaskStatus> statusStarting;
+ Future<TaskStatus> statusRunning;
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
+ .WillOnce(FutureArg<1>(&statusRunning))
+ .WillRepeatedly(Return()); // Ignore subsequent status updates.
+
+ Future<Nothing> ackRunning =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+ Future<Nothing> ackStarting =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offers1.get()[0].id(), {task});
+ // Wait for the TASK_STARTING update from the executor
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
+ // Wait for the TASK_RUNNING update from the executor
+ AWAIT_READY(statusRunning);
+ EXPECT_EQ(TASK_RUNNING, statusRunning->state());
+
// Capture the executor pid.
AWAIT_READY(registerExecutor);
UPID executorPid = registerExecutor->from;
// Wait for the ACK to be checkpointed.
- AWAIT_READY(ack);
+ AWAIT_READY(ackStarting);
+ AWAIT_READY(ackRunning);
slave.get()->terminate();
- Future<TaskStatus> status;
+ Future<TaskStatus> statusLost;
EXPECT_CALL(sched, statusUpdate(_, _))
- .WillOnce(FutureArg<1>(&status))
+ .WillOnce(FutureArg<1>(&statusLost))
.WillRepeatedly(Return()); // Ignore subsequent status updates.
// Now shut down the executor, when the slave is down.
@@ -1644,18 +1674,18 @@ TYPED_TEST(SlaveRecoveryTest, RecoverTerminatedExecutor)
Clock::advance(flags.executor_reregistration_timeout);
// Now advance time until the reaper reaps the executor.
- while (status.isPending()) {
+ while (statusLost.isPending()) {
Clock::advance(process::MAX_REAP_INTERVAL());
Clock::settle();
}
// Scheduler should receive the TASK_LOST update.
- AWAIT_READY(status);
+ AWAIT_READY(statusLost);
- EXPECT_EQ(TASK_LOST, status->state());
- EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status->source());
+ EXPECT_EQ(TASK_LOST, statusLost->state());
+ EXPECT_EQ(TaskStatus::SOURCE_SLAVE, statusLost->source());
EXPECT_EQ(TaskStatus::REASON_EXECUTOR_REREGISTRATION_TIMEOUT,
- status->reason());
+ statusLost->reason());
while (offers2.isPending()) {
Clock::advance(Seconds(1));
@@ -1816,7 +1846,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverCompletedExecutor)
TaskInfo task = createTask(offers1.get()[0], "exit 0");
EXPECT_CALL(sched, statusUpdate(_, _))
- .Times(2); // TASK_RUNNING and TASK_FINISHED updates.
+ .Times(3); // TASK_STARTING, TASK_RUNNING and TASK_FINISHED updates.
EXPECT_CALL(sched, offerRescinded(_, _))
.Times(AtMost(1));
@@ -2010,15 +2040,22 @@ TYPED_TEST(SlaveRecoveryTest, CleanupExecutor)
TaskInfo task = createTask(offers.get()[0], "sleep 1000");
- EXPECT_CALL(sched, statusUpdate(_, _));
+ // Expect TASK_STARTING and TASK_RUNNING updates
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .Times(2)
+ .WillRepeatedly(Return()); // Ignore subsequent updates
- Future<Nothing> ack =
+ Future<Nothing> ackRunning =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> ackStarting =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offers.get()[0].id(), {task});
// Wait for the ACK to be checkpointed.
- AWAIT_READY(ack);
+ AWAIT_READY(ackStarting);
+ AWAIT_READY(ackRunning);
slave.get()->terminate();
@@ -2129,17 +2166,23 @@ TYPED_TEST(SlaveRecoveryTest, RemoveNonCheckpointingFramework)
Resources(offer1.resources()) +
Resources(offer2.resources())));
+ Future<Nothing> update0;
Future<Nothing> update1;
Future<Nothing> update2;
+ Future<Nothing> update3;
EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillOnce(FutureSatisfy(&update0))
.WillOnce(FutureSatisfy(&update1))
- .WillOnce(FutureSatisfy(&update2));
+ .WillOnce(FutureSatisfy(&update2))
+ .WillOnce(FutureSatisfy(&update3));
driver.launchTasks(offers.get()[0].id(), tasks);
- // Wait for TASK_RUNNING updates from the tasks.
+ // Wait for TASK_STARTING and TASK_RUNNING updates from the tasks.
+ AWAIT_READY(update0);
AWAIT_READY(update1);
AWAIT_READY(update2);
+ AWAIT_READY(update3);
// The master should generate TASK_LOST updates once the slave is stopped.
Future<TaskStatus> status1;
@@ -2439,15 +2482,21 @@ TYPED_TEST(SlaveRecoveryTest, KillTask)
TaskInfo task = createTask(offers1.get()[0], "sleep 1000");
- EXPECT_CALL(sched, statusUpdate(_, _));
+ // Expect a TASK_STARTING and a TASK_RUNNING update
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .Times(2);
- Future<Nothing> ack =
+ Future<Nothing> ack1 =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> ack2 =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offers1.get()[0].id(), {task});
// Wait for the ACK to be checkpointed.
- AWAIT_READY(ack);
+ AWAIT_READY(ack1);
+ AWAIT_READY(ack2);
slave.get()->terminate();
@@ -3068,9 +3117,10 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlave)
TaskInfo task = createTask(offers1.get()[0], "sleep 1000");
- Future<Nothing> statusUpdate1;
+ Future<Nothing> statusUpdate1, statusUpdate2;
EXPECT_CALL(sched, statusUpdate(_, _))
- .WillOnce(FutureSatisfy(&statusUpdate1))
+ .WillOnce(FutureSatisfy(&statusUpdate1)) // TASK_STARTING
+ .WillOnce(FutureSatisfy(&statusUpdate2)) // TASK_RUNNING
.WillOnce(Return()); // Ignore TASK_FAILED update.
Future<Message> registerExecutor =
@@ -3082,7 +3132,7 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlave)
AWAIT_READY(registerExecutor);
UPID executorPid = registerExecutor->from;
- AWAIT_READY(statusUpdate1); // Wait for TASK_RUNNING update.
+ AWAIT_READY(statusUpdate2); // Wait for TASK_RUNNING update.
EXPECT_CALL(sched, offerRescinded(_, _))
.Times(AtMost(1));
@@ -3186,18 +3236,22 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlaveSIGUSR1)
TaskInfo task = createTask(offers.get()[0], "sleep 1000");
- Future<TaskStatus> status;
+ Future<TaskStatus> statusStarting, statusRunning;
EXPECT_CALL(sched, statusUpdate(_, _))
- .WillOnce(FutureArg<1>(&status));
+ .WillOnce(FutureArg<1>(&statusStarting))
+ .WillOnce(FutureArg<1>(&statusRunning));
driver.launchTasks(offers.get()[0].id(), {task});
- AWAIT_READY(status);
- EXPECT_EQ(TASK_RUNNING, status->state());
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
- Future<TaskStatus> status2;
+ AWAIT_READY(statusRunning);
+ EXPECT_EQ(TASK_RUNNING, statusRunning->state());
+
+ Future<TaskStatus> statusLost;
EXPECT_CALL(sched, statusUpdate(_, _))
- .WillOnce(FutureArg<1>(&status2));
+ .WillOnce(FutureArg<1>(&statusLost));
Future<Nothing> slaveLost;
EXPECT_CALL(sched, slaveLost(_, _))
@@ -3223,11 +3277,11 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlaveSIGUSR1)
AWAIT_READY(executorTerminated);
// The master should send a TASK_LOST and slaveLost.
- AWAIT_READY(status2);
+ AWAIT_READY(statusLost);
- EXPECT_EQ(TASK_LOST, status2->state());
- EXPECT_EQ(TaskStatus::SOURCE_MASTER, status2->source());
- EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, status2->reason());
+ EXPECT_EQ(TASK_LOST, statusLost->state());
+ EXPECT_EQ(TaskStatus::SOURCE_MASTER, statusLost->source());
+ EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, statusLost->reason());
AWAIT_READY(slaveLost);
@@ -3415,16 +3469,21 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileKillTask)
SlaveID slaveId = offers1.get()[0].slave_id();
FrameworkID frameworkId = offers1.get()[0].framework_id();
- // Expecting TASK_RUNNING status.
- EXPECT_CALL(sched, statusUpdate(_, _));
+ // Expecting TASK_STARTING and TASK_RUNNING status.
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .Times(2);
- Future<Nothing> _statusUpdateAcknowledgement =
+ Future<Nothing> _statusUpdateAcknowledgement1 =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> _statusUpdateAcknowledgement2 =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offers1.get()[0].id(), {task});
// Wait for TASK_RUNNING update to be acknowledged.
- AWAIT_READY(_statusUpdateAcknowledgement);
+ AWAIT_READY(_statusUpdateAcknowledgement1);
+ AWAIT_READY(_statusUpdateAcknowledgement2);
slave.get()->terminate();
@@ -3513,18 +3572,24 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileShutdownFramework)
// Capture the framework id.
FrameworkID frameworkId = offers.get()[0].framework_id();
- // Expecting TASK_RUNNING status.
- EXPECT_CALL(sched, statusUpdate(_, _));
+ // Expecting a TASK_STARTING and a TASK_RUNNING status.
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .Times(2)
+ .WillRepeatedly(Return());
- Future<Nothing> _statusUpdateAcknowledgement =
+ Future<Nothing> _statusUpdateAcknowledgement1 =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> _statusUpdateAcknowledgement2 =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
TaskInfo task = createTask(offers.get()[0], "sleep 1000");
driver.launchTasks(offers.get()[0].id(), {task});
- // Wait for TASK_RUNNING update to be acknowledged.
- AWAIT_READY(_statusUpdateAcknowledgement);
+ // Wait for the updates to be acknowledged.
+ AWAIT_READY(_statusUpdateAcknowledgement1);
+ AWAIT_READY(_statusUpdateAcknowledgement2);
slave.get()->terminate();
@@ -3661,15 +3726,30 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileTasksMissingFromSlave)
// re-registers by wiping the relevant meta directory.
TaskInfo task = createTask(offers1.get()[0], "sleep 10");
- EXPECT_CALL(sched, statusUpdate(_, _));
+ Future<TaskStatus> starting;
+ Future<TaskStatus> running;
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillOnce(FutureArg<1>(&starting))
+ .WillOnce(FutureArg<1>(&running))
+ .WillRepeatedly(Return()); // Ignore subsequent updates.
- Future<Nothing> _statusUpdateAcknowledgement =
+ Future<Nothing> startingAck =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> runningAck =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offers1.get()[0].id(), {task});
// Wait for the ACK to be checkpointed.
- AWAIT_READY(_statusUpdateAcknowledgement);
+ AWAIT_READY(starting);
+ AWAIT_READY(startingAck);
+
+ AWAIT_READY(running);
+ AWAIT_READY(runningAck);
+
+ EXPECT_EQ(TASK_STARTING, starting->state());
+ EXPECT_EQ(TASK_RUNNING, running->state());
EXPECT_CALL(allocator, deactivateSlave(_));
@@ -3824,15 +3904,21 @@ TYPED_TEST(SlaveRecoveryTest, SchedulerFailover)
// Create a long running task.
TaskInfo task = createTask(offers1.get()[0], "sleep 1000");
- EXPECT_CALL(sched1, statusUpdate(_, _));
+ // Expecting TASK_STARTING and TASK_RUNNING updates
+ EXPECT_CALL(sched1, statusUpdate(_, _))
+ .Times(2);
- Future<Nothing> _statusUpdateAcknowledgement =
+ Future<Nothing> _statusUpdateAcknowledgement1 =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> _statusUpdateAcknowledgement2 =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
driver1.launchTasks(offers1.get()[0].id(), {task});
// Wait for the ACK to be checkpointed.
- AWAIT_READY(_statusUpdateAcknowledgement);
+ AWAIT_READY(_statusUpdateAcknowledgement1);
+ AWAIT_READY(_statusUpdateAcknowledgement2);
slave.get()->terminate();
@@ -3975,15 +4061,20 @@ TYPED_TEST(SlaveRecoveryTest, MasterFailover)
TaskInfo task = createTask(offers1.get()[0], "sleep 1000");
- EXPECT_CALL(sched, statusUpdate(_, _));
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .Times(2); // TASK_STARTING and TASK_RUNNING
- Future<Nothing> _statusUpdateAcknowledgement =
+ Future<Nothing> _statusUpdateAcknowledgement1 =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> _statusUpdateAcknowledgement2 =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offers1.get()[0].id(), {task});
- // Wait for the ACK to be checkpointed.
- AWAIT_READY(_statusUpdateAcknowledgement);
+ // Wait for both ACKs to be checkpointed.
+ AWAIT_READY(_statusUpdateAcknowledgement1);
+ AWAIT_READY(_statusUpdateAcknowledgement2);
slave.get()->terminate();
@@ -4117,15 +4208,20 @@ TYPED_TEST(SlaveRecoveryTest, MultipleFrameworks)
// Framework 1 launches a task.
TaskInfo task1 = createTask(offer1, "sleep 1000");
- EXPECT_CALL(sched1, statusUpdate(_, _));
+ EXPECT_CALL(sched1, statusUpdate(_, _))
+ .Times(2);
- Future<Nothing> _statusUpdateAcknowledgement1 =
+ Future<Nothing> _startingStatusUpdateAcknowledgement1 =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> _runningStatusUpdateAcknowledgement1 =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
driver1.launchTasks(offer1.id(), {task1});
// Wait for the ACK to be checkpointed.
- AWAIT_READY(_statusUpdateAcknowledgement1);
+ AWAIT_READY(_startingStatusUpdateAcknowledgement1);
+ AWAIT_READY(_runningStatusUpdateAcknowledgement1);
// Framework 2. Enable checkpointing.
FrameworkInfo frameworkInfo2 = DEFAULT_FRAMEWORK_INFO;
@@ -4150,14 +4246,20 @@ TYPED_TEST(SlaveRecoveryTest, MultipleFrameworks)
// Framework 2 launches a task.
TaskInfo task2 = createTask(offers2.get()[0], "sleep 1000");
- EXPECT_CALL(sched2, statusUpdate(_, _));
+ EXPECT_CALL(sched2, statusUpdate(_, _))
+ .Times(2);
- Future<Nothing> _statusUpdateAcknowledgement2 =
+ Future<Nothing> _startingStatusUpdateAcknowledgement2 =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> _runningStatusUpdateAcknowledgement2 =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
driver2.launchTasks(offers2.get()[0].id(), {task2});
// Wait for the ACK to be checkpointed.
- AWAIT_READY(_statusUpdateAcknowledgement2);
+ AWAIT_READY(_startingStatusUpdateAcknowledgement2);
+ AWAIT_READY(_runningStatusUpdateAcknowledgement2);
slave.get()->terminate();
@@ -4281,15 +4383,20 @@ TYPED_TEST(SlaveRecoveryTest, MultipleSlaves)
// Launch a long running task in the first slave.
TaskInfo task1 = createTask(offers1.get()[0], "sleep 1000");
- EXPECT_CALL(sched, statusUpdate(_, _));
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .Times(2);
- Future<Nothing> _statusUpdateAcknowledgement1 =
+ Future<Nothing> _startingStatusUpdateAcknowledgement1 =
+ FUTURE_DISPATCH(slave1.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> _runningStatusUpdateAcknowledgement1 =
FUTURE_DISPATCH(slave1.get()->pid, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offers1.get()[0].id(), {task1});
// Wait for the ACK to be checkpointed.
- AWAIT_READY(_statusUpdateAcknowledgement1);
+ AWAIT_READY(_startingStatusUpdateAcknowledgement1);
+ AWAIT_READY(_runningStatusUpdateAcknowledgement1);
Future<vector<Offer>> offers2;
EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -4316,15 +4423,20 @@ TYPED_TEST(SlaveRecoveryTest, MultipleSlaves)
// Launch a long running task in each slave.
TaskInfo task2 = createTask(offers2.get()[0], "sleep 1000");
- EXPECT_CALL(sched, statusUpdate(_, _));
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .Times(2);
- Future<Nothing> _statusUpdateAcknowledgement2 =
+ Future<Nothing> _startingStatusUpdateAcknowledgement2 =
+ FUTURE_DISPATCH(slave2.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> _runningStatusUpdateAcknowledgement2 =
FUTURE_DISPATCH(slave2.get()->pid, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offers2.get()[0].id(), {task2});
// Wait for the ACKs to be checkpointed.
- AWAIT_READY(_statusUpdateAcknowledgement2);
+ AWAIT_READY(_startingStatusUpdateAcknowledgement2);
+ AWAIT_READY(_runningStatusUpdateAcknowledgement2);
slave1.get()->terminate();
slave2.get()->terminate();
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 76a157f..def64b5 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -680,16 +680,22 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(SlaveTest, CommandTaskWithArguments)
task.mutable_command()->MergeFrom(command);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offers.get()[0].id(), {task});
- // Scheduler should first receive TASK_RUNNING followed by the
- // TASK_FINISHED from the executor.
+ // Scheduler should first receive TASK_STARTING, followed by
+ // TASK_RUNNING and TASK_FINISHED from the executor.
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+ EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusStarting->source());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusRunning->source());
@@ -746,12 +752,17 @@ TEST_F(SlaveTest, CommandTaskWithKillPolicy)
task.mutable_kill_policy()->mutable_grace_period()->set_nanoseconds(
gracePeriod.ns());
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning));
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -1051,16 +1062,22 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
task.mutable_command()->MergeFrom(command);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offers.get()[0].id(), {task});
- // Scheduler should first receive TASK_RUNNING followed by the
- // TASK_FINISHED from the executor.
+ // Scheduler should first receive TASK_STARTING followed by
+ // TASK_RUNNING and TASK_FINISHED from the executor.
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+ EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusStarting->source());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusRunning->source());
@@ -2381,15 +2398,21 @@ TEST_F(SlaveTest, StatisticsEndpointRunningExecutor)
Resources::parse("cpus:1;mem:32").get(),
SLEEP_COMMAND(1000));
- Future<TaskStatus> status;
+ Future<TaskStatus> statusStarting;
+ Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(FutureArg<1>(&status));
+ .WillOnce(FutureArg<1>(&statusStarting))
+ .WillOnce(FutureArg<1>(&statusRunning));
driver.launchTasks(offer.id(), {task});
- AWAIT_READY(status);
- EXPECT_EQ(task.task_id(), status->task_id());
- EXPECT_EQ(TASK_RUNNING, status->state());
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(task.task_id(), statusStarting->task_id());
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
+ AWAIT_READY(statusRunning);
+ EXPECT_EQ(task.task_id(), statusRunning->task_id());
+ EXPECT_EQ(TASK_RUNNING, statusRunning->state());
// Hit the statistics endpoint and expect the response contains the
// resource statistics for the running container.
@@ -5207,16 +5230,21 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(SlaveTest, ExecutorEnvironmentVariables)
task.mutable_command()->MergeFrom(command);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offers.get()[0].id(), {task});
- // Scheduler should first receive TASK_RUNNING followed by the
- // TASK_FINISHED from the executor.
+ // Scheduler should first receive TASK_STARTING, followed by
+ // TASK_RUNNING and TASK_FINISHED from the executor.
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -5562,7 +5590,7 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(SlaveTest, HTTPSchedulerSlaveRestart)
UPID executorPid = registerExecutorMessage->from;
AWAIT_READY(status);
- EXPECT_EQ(TASK_RUNNING, status->state());
+ EXPECT_EQ(TASK_STARTING, status->state());
// Restart the slave.
slave.get()->terminate();
@@ -6829,14 +6857,23 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
AWAIT_READY(offers);
ASSERT_FALSE(offers->offers().empty());
- Future<v1::scheduler::Event::Update> update;
-
- EXPECT_CALL(*scheduler, update(_, _))
- .WillOnce(FutureArg<1>(&update));
-
const v1::Offer offer = offers->offers(0);
const v1::AgentID& agentId = offer.agent_id();
+ Future<v1::scheduler::Event::Update> updateStarting;
+ Future<v1::scheduler::Event::Update> updateRunning;
+
+ EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(
+ DoAll(
+ FutureArg<1>(&updateStarting),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+ .WillOnce(
+ DoAll(
+ FutureArg<1>(&updateRunning),
+ v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+ .WillRepeatedly(Return()); // Ignore subsequent updates.
+
v1::Resources resources =
v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
@@ -6878,28 +6915,15 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
mesos.send(call);
}
- AWAIT_READY(update);
-
- ASSERT_EQ(TASK_RUNNING, update->status().state());
- ASSERT_EQ(taskInfo.task_id(), update->status().task_id());
-
- Future<Nothing> _statusUpdateAcknowledgement =
- FUTURE_DISPATCH(slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
-
- {
- Call call;
- call.mutable_framework_id()->CopyFrom(frameworkId);
- call.set_type(Call::ACKNOWLEDGE);
+ AWAIT_READY(updateStarting);
- Call::Acknowledge* acknowledge = call.mutable_acknowledge();
- acknowledge->mutable_task_id()->CopyFrom(update->status().task_id());
- acknowledge->mutable_agent_id()->CopyFrom(offer.agent_id());
- acknowledge->set_uuid(update->status().uuid());
+ ASSERT_EQ(TASK_STARTING, updateStarting->status().state());
+ ASSERT_EQ(taskInfo.task_id(), updateStarting->status().task_id());
- mesos.send(call);
- }
+ AWAIT_READY(updateRunning);
- AWAIT_READY(_statusUpdateAcknowledgement);
+ ASSERT_EQ(TASK_RUNNING, updateRunning->status().state());
+ ASSERT_EQ(taskInfo.task_id(), updateRunning->status().task_id());
// Restart the agent.
slave.get()->terminate();
@@ -8177,12 +8201,19 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(SlaveTest, ExecutorReregistrationTimeoutFlag)
TaskInfo task = createTask(offers->front(), "sleep 1000");
+ Future<TaskStatus> statusUpdate0;
Future<TaskStatus> statusUpdate1;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusUpdate0))
.WillOnce(FutureArg<1>(&statusUpdate1));
driver.launchTasks(offers->front().id(), {task});
+ AWAIT_READY(statusUpdate0);
+ ASSERT_EQ(TASK_STARTING, statusUpdate0->state());
+
+ driver.acknowledgeStatusUpdate(statusUpdate0.get());
+
AWAIT_READY(statusUpdate1);
ASSERT_EQ(TASK_RUNNING, statusUpdate1->state());
http://git-wip-us.apache.org/repos/asf/mesos/blob/37053061/src/tests/teardown_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/teardown_tests.cpp b/src/tests/teardown_tests.cpp
index 5eada4f..392cacf 100644
--- a/src/tests/teardown_tests.cpp
+++ b/src/tests/teardown_tests.cpp
@@ -353,20 +353,31 @@ TEST_F(TeardownTest, RecoveredFrameworkAfterMasterFailover)
TaskInfo task = createTask(offers.get()[0], "sleep 100");
+ Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus));
- Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
+ Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
+ slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(startingStatus);
+ EXPECT_EQ(TASK_STARTING, startingStatus->state());
+ EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+ AWAIT_READY(statusUpdateAck1);
+
AWAIT_READY(runningStatus);
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
EXPECT_EQ(task.task_id(), runningStatus->task_id());
- AWAIT_READY(statusUpdateAck);
+ AWAIT_READY(statusUpdateAck2);
// Simulate master failover. We leave the scheduler without a master
// so it does not attempt to re-register.