You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2017/10/12 15:52:57 UTC
[2/4] mesos git commit: Fix unit tests that were broken by the
additional TASK_STARTING update.
http://git-wip-us.apache.org/repos/asf/mesos/blob/1e1e409b/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index cd98b8f..d262bbe 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -757,7 +757,7 @@ TEST_F(OversubscriptionTest, FixedResourceEstimator)
AWAIT_READY(status);
EXPECT_EQ(task.task_id(), status->task_id());
- EXPECT_EQ(TASK_RUNNING, status->state());
+ EXPECT_EQ(TASK_STARTING, status->state());
// Advance the clock for the slave to trigger the calculation of the
// total oversubscribed resources. As we described above, we don't
@@ -1023,15 +1023,20 @@ TEST_F(OversubscriptionTest, QoSCorrectionKill)
TaskInfo task = createTask(offers.get()[0], "sleep 10");
+ Future<TaskStatus> status0;
Future<TaskStatus> status1;
Future<TaskStatus> status2;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status0))
.WillOnce(FutureArg<1>(&status1))
.WillOnce(FutureArg<1>(&status2))
.WillRepeatedly(Return()); // Ignore subsequent updates.
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(status0);
+ ASSERT_EQ(TASK_STARTING, status0->state());
+
AWAIT_READY(status1);
ASSERT_EQ(TASK_RUNNING, status1->state());
@@ -1132,15 +1137,20 @@ TEST_F(OversubscriptionTest, QoSCorrectionKillPartitionAware)
TaskInfo task = createTask(offers.get()[0], "sleep 10");
+ Future<TaskStatus> status0;
Future<TaskStatus> status1;
Future<TaskStatus> status2;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status0))
.WillOnce(FutureArg<1>(&status1))
.WillOnce(FutureArg<1>(&status2))
.WillRepeatedly(Return()); // Ignore subsequent updates.
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(status0);
+ ASSERT_EQ(TASK_STARTING, status0->state());
+
AWAIT_READY(status1);
ASSERT_EQ(TASK_RUNNING, status1->state());
http://git-wip-us.apache.org/repos/asf/mesos/blob/1e1e409b/src/tests/partition_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/partition_tests.cpp b/src/tests/partition_tests.cpp
index 0597bd2..7b11264 100644
--- a/src/tests/partition_tests.cpp
+++ b/src/tests/partition_tests.cpp
@@ -215,22 +215,33 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, ReregisterSlavePartitionAware)
TaskInfo task = createTask(offer, "sleep 60");
+ Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus));
- Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
+ Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
+ slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(startingStatus);
+ EXPECT_EQ(TASK_STARTING, startingStatus->state());
+ EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+ AWAIT_READY(statusUpdateAck1);
+
AWAIT_READY(runningStatus);
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
EXPECT_EQ(task.task_id(), runningStatus->task_id());
- const SlaveID& slaveId = runningStatus->slave_id();
+ AWAIT_READY(statusUpdateAck2);
- AWAIT_READY(statusUpdateAck);
+ const SlaveID& slaveId = startingStatus->slave_id();
// Now, induce a partition of the slave by having the master
// timeout the slave.
@@ -572,8 +583,10 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, ReregisterSlaveNotPartitionAware)
TaskInfo task = createTask(offer, "sleep 60");
+ Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus));
Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
@@ -874,8 +887,10 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
// Launch `task1` using `sched1`.
TaskInfo task1 = createTask(offer.slave_id(), taskResources, "sleep 60");
+ Future<TaskStatus> startingStatus1;
Future<TaskStatus> runningStatus1;
EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+ .WillOnce(FutureArg<1>(&startingStatus1))
.WillOnce(FutureArg<1>(&runningStatus1));
Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
@@ -917,8 +932,10 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(
// Launch the second task.
TaskInfo task2 = createTask(offer.slave_id(), taskResources, "sleep 60");
+ Future<TaskStatus> startingStatus2;
Future<TaskStatus> runningStatus2;
EXPECT_CALL(sched2, statusUpdate(&driver2, _))
+ .WillOnce(FutureArg<1>(&startingStatus2))
.WillOnce(FutureArg<1>(&runningStatus2));
Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
@@ -1136,22 +1153,33 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, PartitionedSlaveOrphanedTask)
// Launch `task` using `sched`.
TaskInfo task = createTask(offer, "sleep 60");
+ Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus));
- Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
+ Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
+ slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(startingStatus);
+ EXPECT_EQ(TASK_STARTING, startingStatus->state());
+ EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+ AWAIT_READY(statusUpdateAck1);
+
AWAIT_READY(runningStatus);
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
EXPECT_EQ(task.task_id(), runningStatus->task_id());
const SlaveID& slaveId = runningStatus->slave_id();
- AWAIT_READY(statusUpdateAck);
+ AWAIT_READY(statusUpdateAck2);
// Now, induce a partition of the slave by having the master
// timeout the slave.
@@ -1404,22 +1432,33 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, DisconnectedFramework)
// Launch `task` using `sched1`.
TaskInfo task = createTask(offer, "sleep 60");
+ Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+ .WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus));
Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+ Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
+ slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
driver1.launchTasks(offer.id(), {task});
+ AWAIT_READY(startingStatus);
+ EXPECT_EQ(TASK_STARTING, startingStatus->state());
+ EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+ AWAIT_READY(statusUpdateAck1);
+
AWAIT_READY(runningStatus);
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
EXPECT_EQ(task.task_id(), runningStatus->task_id());
const SlaveID& slaveId = runningStatus->slave_id();
- AWAIT_READY(statusUpdateAck1);
+ AWAIT_READY(statusUpdateAck2);
// Shutdown the master.
master->reset();
@@ -1573,22 +1612,33 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, SpuriousSlaveReregistration)
// Launch `task` using `sched`.
TaskInfo task = createTask(offer, "sleep 60");
+ Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus));
- Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
+ Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
+ slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(startingStatus);
+ EXPECT_EQ(TASK_STARTING, startingStatus->state());
+ EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+ AWAIT_READY(statusUpdateAck1);
+
AWAIT_READY(runningStatus);
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
EXPECT_EQ(task.task_id(), runningStatus->task_id());
const SlaveID& slaveId = runningStatus->slave_id();
- AWAIT_READY(statusUpdateAck);
+ AWAIT_READY(statusUpdateAck2);
// Simulate a master loss event at the slave and then cause the
// slave to reregister with the master. From the master's
http://git-wip-us.apache.org/repos/asf/mesos/blob/1e1e409b/src/tests/persistent_volume_endpoints_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/persistent_volume_endpoints_tests.cpp b/src/tests/persistent_volume_endpoints_tests.cpp
index 7a24bf4..abc6dbb 100644
--- a/src/tests/persistent_volume_endpoints_tests.cpp
+++ b/src/tests/persistent_volume_endpoints_tests.cpp
@@ -1987,9 +1987,13 @@ TEST_F(PersistentVolumeEndpointsTest, SlavesEndpointFullResources)
filters.set_refuse_seconds(0);
// Expect a TASK_STARTING and a TASK_RUNNING status.
- EXPECT_CALL(sched, statusUpdate(&driver, _));
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .Times(2);
- Future<Nothing> _statusUpdateAcknowledgement =
+ Future<Nothing> _statusUpdateAcknowledgement1 =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> _statusUpdateAcknowledgement2 =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
// Expect another resource offer.
@@ -1999,7 +2003,8 @@ TEST_F(PersistentVolumeEndpointsTest, SlavesEndpointFullResources)
driver.acceptOffers({offer.id()}, {LAUNCH({taskInfo})}, filters);
// Wait for the TASK_STARTING and TASK_RUNNING update acks.
- AWAIT_READY(_statusUpdateAcknowledgement);
+ AWAIT_READY(_statusUpdateAcknowledgement1);
+ AWAIT_READY(_statusUpdateAcknowledgement2);
response = process::http::get(
master.get()->pid,
http://git-wip-us.apache.org/repos/asf/mesos/blob/1e1e409b/src/tests/persistent_volume_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/persistent_volume_tests.cpp b/src/tests/persistent_volume_tests.cpp
index 1b35af4..65d86de 100644
--- a/src/tests/persistent_volume_tests.cpp
+++ b/src/tests/persistent_volume_tests.cpp
@@ -806,9 +806,11 @@ TEST_P(PersistentVolumeTest, AccessPersistentVolume)
taskResources,
"echo abc > path1/file");
+ Future<TaskStatus> status0;
Future<TaskStatus> status1;
Future<TaskStatus> status2;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status0))
.WillOnce(FutureArg<1>(&status1))
.WillOnce(FutureArg<1>(&status2));
@@ -823,6 +825,10 @@ TEST_P(PersistentVolumeTest, AccessPersistentVolume)
{CREATE(volume),
LAUNCH({task})});
+ AWAIT_READY(status0);
+ EXPECT_EQ(task.task_id(), status0->task_id());
+ EXPECT_EQ(TASK_STARTING, status0->state());
+
AWAIT_READY(status1);
EXPECT_EQ(task.task_id(), status1->task_id());
EXPECT_EQ(TASK_RUNNING, status1->state());
@@ -982,21 +988,25 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleTasks)
taskResources2.get() + volume,
"echo task2 > path1/file2");
- // We should receive a TASK_RUNNING followed by a TASK_FINISHED for
- // each of the 2 tasks. We do not check for the actual task state
- // since it's not the primary objective of the test. We instead
- // verify that the paths are created by the tasks after we receive
- // enough status updates.
+ // We should receive a TASK_STARTING, followed by a TASK_RUNNING
+ // and a TASK_FINISHED for each of the 2 tasks.
+ // We do not check for the actual task state since it's not the
+ // primary objective of the test. We instead verify that the paths
+ // are created by the tasks after we receive enough status updates.
Future<TaskStatus> status1;
Future<TaskStatus> status2;
Future<TaskStatus> status3;
Future<TaskStatus> status4;
+ Future<TaskStatus> status5;
+ Future<TaskStatus> status6;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status1))
.WillOnce(FutureArg<1>(&status2))
.WillOnce(FutureArg<1>(&status3))
- .WillOnce(FutureArg<1>(&status4));
+ .WillOnce(FutureArg<1>(&status4))
+ .WillOnce(FutureArg<1>(&status5))
+ .WillOnce(FutureArg<1>(&status6));
driver.acceptOffers(
{offer.id()},
@@ -1009,6 +1019,8 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleTasks)
AWAIT_READY(status2);
AWAIT_READY(status3);
AWAIT_READY(status4);
+ AWAIT_READY(status5);
+ AWAIT_READY(status6);
const string& volumePath = slave::paths::getPersistentVolumePath(
slaveFlags.work_dir,
@@ -1256,10 +1268,12 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleFrameworks)
Resources::parse("cpus:1;mem:128").get() + volume,
"echo abc > path1/file1 && sleep 1000");
- // We should receive a TASK_RUNNING for the launched task.
+ // We should receive a TASK_STARTING and a TASK_RUNNING for the launched task.
+ Future<TaskStatus> status0;
Future<TaskStatus> status1;
EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+ .WillOnce(FutureArg<1>(&status0))
.WillOnce(FutureArg<1>(&status1));
// We use a filter of 0 seconds so the resources will be available
@@ -1273,6 +1287,9 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleFrameworks)
LAUNCH({task1})},
filters);
+ AWAIT_READY(status0);
+ EXPECT_EQ(TASK_STARTING, status0->state());
+
AWAIT_READY(status1);
EXPECT_EQ(TASK_RUNNING, status1->state());
@@ -1329,11 +1346,13 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleFrameworks)
Resources::parse("cpus:1;mem:256").get() + volume,
"echo abc > path1/file2 && sleep 1000");
- // We should receive a TASK_RUNNING for the launched task.
+ // We should receive a TASK_STARTING and a TASK_RUNNING for the launched task.
Future<TaskStatus> status2;
+ Future<TaskStatus> status3;
EXPECT_CALL(sched2, statusUpdate(&driver2, _))
- .WillOnce(FutureArg<1>(&status2));
+ .WillOnce(FutureArg<1>(&status2))
+ .WillOnce(FutureArg<1>(&status3));
driver2.acceptOffers(
{offer2.id()},
@@ -1341,7 +1360,10 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMultipleFrameworks)
filters);
AWAIT_READY(status2);
- EXPECT_EQ(TASK_RUNNING, status2->state());
+ EXPECT_EQ(TASK_STARTING, status2->state());
+
+ AWAIT_READY(status3);
+ EXPECT_EQ(TASK_RUNNING, status3->state());
// Collect metrics based on both frameworks. Note that the `cpus_used` and
// `mem_used` is updated, but `disk_used` does not change since both tasks
@@ -1432,13 +1454,17 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMasterFailover)
taskResources.get() + volume,
"sleep 1000");
- // We should receive a TASK_RUNNING for each of the tasks.
+ // We should receive a TASK_STARTING and a TASK_RUNNING for each of the tasks.
Future<TaskStatus> status1;
Future<TaskStatus> status2;
+ Future<TaskStatus> status3;
+ Future<TaskStatus> status4;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status1))
- .WillOnce(FutureArg<1>(&status2));
+ .WillOnce(FutureArg<1>(&status2))
+ .WillOnce(FutureArg<1>(&status3))
+ .WillOnce(FutureArg<1>(&status4));
Future<CheckpointResourcesMessage> checkpointResources =
FUTURE_PROTOBUF(CheckpointResourcesMessage(), _, slave.get()->pid);
@@ -1449,11 +1475,14 @@ TEST_P(PersistentVolumeTest, SharedPersistentVolumeMasterFailover)
LAUNCH({task1, task2})});
AWAIT_READY(checkpointResources);
+
+ // We only check the first and the last status, because the two in between
+ // could arrive in any order.
AWAIT_READY(status1);
- EXPECT_EQ(TASK_RUNNING, status1->state());
+ EXPECT_EQ(TASK_STARTING, status1->state());
- AWAIT_READY(status2);
- EXPECT_EQ(TASK_RUNNING, status2->state());
+ AWAIT_READY(status4);
+ EXPECT_EQ(TASK_RUNNING, status4->state());
// This is to make sure CheckpointResourcesMessage is processed.
Clock::pause();
@@ -1596,16 +1625,20 @@ TEST_P(PersistentVolumeTest, DestroyPersistentVolumeMultipleTasks)
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers));
- // We should receive a TASK_RUNNING each of the 2 tasks. We track task
- // termination by a TASK_FINISHED for the short-lived task.
+ // We should receive a TASK_STARTING and a TASK_RUNNING for each of the 2 tasks.
+ // We track task termination by a TASK_FINISHED for the short-lived task.
Future<TaskStatus> status1;
Future<TaskStatus> status2;
Future<TaskStatus> status3;
+ Future<TaskStatus> status4;
+ Future<TaskStatus> status5;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status1))
.WillOnce(FutureArg<1>(&status2))
- .WillOnce(FutureArg<1>(&status3));
+ .WillOnce(FutureArg<1>(&status3))
+ .WillOnce(FutureArg<1>(&status4))
+ .WillOnce(FutureArg<1>(&status5));
driver.acceptOffers(
{offer.id()},
@@ -1614,18 +1647,23 @@ TEST_P(PersistentVolumeTest, DestroyPersistentVolumeMultipleTasks)
LAUNCH({task1, task2})},
filters);
- // Wait for TASK_RUNNING for both the tasks, and TASK_FINISHED for
- // the short-lived task.
+ // Wait for TASK_STARTING and TASK_RUNNING for both the tasks,
+ // and TASK_FINISHED for the short-lived task.
AWAIT_READY(status1);
AWAIT_READY(status2);
AWAIT_READY(status3);
+ AWAIT_READY(status4);
+ AWAIT_READY(status5);
hashset<TaskID> tasksRunning;
hashset<TaskID> tasksFinished;
- vector<Future<TaskStatus>> statuses{status1, status2, status3};
+ vector<Future<TaskStatus>> statuses {
+ status1, status2, status3, status4, status5};
foreach (const Future<TaskStatus>& status, statuses) {
- if (status->state() == TASK_RUNNING) {
+ if (status->state() == TASK_STARTING) {
+ // Ignore TASK_STARTING updates.
+ } else if (status->state() == TASK_RUNNING) {
tasksRunning.insert(status->task_id());
} else {
tasksFinished.insert(status->task_id());
@@ -1684,15 +1722,15 @@ TEST_P(PersistentVolumeTest, DestroyPersistentVolumeMultipleTasks)
// We kill the long-lived task and wait for TASK_KILLED, so we can
// DESTROY the persistent volume once the task terminates.
- Future<TaskStatus> status4;
+ Future<TaskStatus> status6;
EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(FutureArg<1>(&status4));
+ .WillOnce(FutureArg<1>(&status6));
driver.killTask(task1.task_id());
- AWAIT_READY(status4);
- EXPECT_EQ(task1.task_id(), status4->task_id());
- EXPECT_EQ(TASK_KILLED, status4->state());
+ AWAIT_READY(status6);
+ EXPECT_EQ(task1.task_id(), status6->task_id());
+ EXPECT_EQ(TASK_KILLED, status6->state());
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers));
@@ -1921,25 +1959,37 @@ TEST_P(PersistentVolumeTest, SlaveRecovery)
taskResources,
"while true; do test -d path1; done");
+ Future<TaskStatus> status0;
Future<TaskStatus> status1;
Future<TaskStatus> status2;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status0))
.WillOnce(FutureArg<1>(&status1))
.WillOnce(FutureArg<1>(&status2));
- Future<Nothing> ack =
+ Future<Nothing> ack1 =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> ack2 =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
driver.acceptOffers(
{offer.id()},
{CREATE(volume), LAUNCH({task})});
+ AWAIT_READY(status0);
+ EXPECT_EQ(task.task_id(), status0->task_id());
+ EXPECT_EQ(TASK_STARTING, status0->state());
+
+ // Wait for the ACK to be checkpointed.
+ AWAIT_READY(ack1);
+
AWAIT_READY(status1);
EXPECT_EQ(task.task_id(), status1->task_id());
EXPECT_EQ(TASK_RUNNING, status1->state());
// Wait for the ACK to be checkpointed.
- AWAIT_READY(ack);
+ AWAIT_READY(ack2);
// Restart the slave.
slave.get()->terminate();
http://git-wip-us.apache.org/repos/asf/mesos/blob/1e1e409b/src/tests/reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reconciliation_tests.cpp b/src/tests/reconciliation_tests.cpp
index 64a1d3d..8ae2860 100644
--- a/src/tests/reconciliation_tests.cpp
+++ b/src/tests/reconciliation_tests.cpp
@@ -1178,22 +1178,33 @@ TEST_F(ReconciliationTest, PartitionedAgentThenMasterFailover)
// Launch `task` using `sched`.
TaskInfo task = createTask(offer, "sleep 60");
+ Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus));
- Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
+ Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
+ slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(startingStatus);
+ EXPECT_EQ(TASK_STARTING, startingStatus->state());
+ EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+ AWAIT_READY(statusUpdateAck1);
+
AWAIT_READY(runningStatus);
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
EXPECT_EQ(task.task_id(), runningStatus->task_id());
const SlaveID slaveId = runningStatus->slave_id();
- AWAIT_READY(statusUpdateAck);
+ AWAIT_READY(statusUpdateAck2);
// Now, induce a partition of the slave by having the master
// timeout the slave.
http://git-wip-us.apache.org/repos/asf/mesos/blob/1e1e409b/src/tests/reservation_endpoints_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reservation_endpoints_tests.cpp b/src/tests/reservation_endpoints_tests.cpp
index 5a6e9a7..409047c 100644
--- a/src/tests/reservation_endpoints_tests.cpp
+++ b/src/tests/reservation_endpoints_tests.cpp
@@ -371,6 +371,7 @@ TEST_F(ReservationEndpointsTest, ReserveAvailableAndOfferedResources)
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+ // Expect one TASK_STARTING and one TASK_RUNNING update.
EXPECT_CALL(sched, registered(&driver, _, _));
Future<vector<Offer>> offers;
@@ -400,16 +401,21 @@ TEST_F(ReservationEndpointsTest, ReserveAvailableAndOfferedResources)
// recovers 'offered' resources portion.
TaskInfo taskInfo = createTask(offer.slave_id(), available, "sleep 1000");
- // Expect a TASK_RUNNING status.
- EXPECT_CALL(sched, statusUpdate(_, _));
+ // Expect a TASK_STARTING and a TASK_RUNNING status.
+ EXPECT_CALL(sched, statusUpdate(_, _)).
+ WillRepeatedly(Return());
- Future<Nothing> _statusUpdateAcknowledgement =
+ Future<Nothing> _statusUpdateAcknowledgement1 =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> _statusUpdateAcknowledgement2 =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
driver.acceptOffers({offer.id()}, {LAUNCH({taskInfo})});
- // Wait for TASK_RUNNING update ack.
- AWAIT_READY(_statusUpdateAcknowledgement);
+ // Wait for update acks.
+ AWAIT_READY(_statusUpdateAcknowledgement1);
+ AWAIT_READY(_statusUpdateAcknowledgement2);
// Summon an offer to receive the 'offered' resources.
EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -547,16 +553,21 @@ TEST_F(ReservationEndpointsTest, UnreserveAvailableAndOfferedResources)
// recovers 'offered' resources portion.
TaskInfo taskInfo = createTask(offer.slave_id(), available, "sleep 1000");
- // Expect a TASK_RUNNING status.
- EXPECT_CALL(sched, statusUpdate(_, _));
+ // Expect a TASK_STARTING and a TASK_RUNNING status.
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillRepeatedly(Return());
- Future<Nothing> _statusUpdateAcknowledgement =
+ Future<Nothing> _statusUpdateAcknowledgement1 =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> _statusUpdateAcknowledgement2 =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
driver.acceptOffers({offer.id()}, {LAUNCH({taskInfo})});
- // Wait for TASK_RUNNING update ack.
- AWAIT_READY(_statusUpdateAcknowledgement);
+ // Wait for update acks from TASK_STARTING and TASK_RUNNING.
+ AWAIT_READY(_statusUpdateAcknowledgement1);
+ AWAIT_READY(_statusUpdateAcknowledgement2);
// Summon an offer to receive the 'offered' resources.
EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -1596,9 +1607,11 @@ TEST_F(ReservationEndpointsTest, AgentStateEndpointResources)
Offer offer = offers.get()[0];
- Future<TaskStatus> status;
+ Future<TaskStatus> statusStarting;
+ Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(_, _))
- .WillOnce(FutureArg<1>(&status));
+ .WillOnce(FutureArg<1>(&statusStarting))
+ .WillOnce(FutureArg<1>(&statusRunning));
Resources taskResources = Resources::parse(
"cpus(role):2;mem(role):512;cpus:2;mem:1024").get();
@@ -1607,8 +1620,11 @@ TEST_F(ReservationEndpointsTest, AgentStateEndpointResources)
driver.acceptOffers({offer.id()}, {LAUNCH({task})});
- AWAIT_READY(status);
- ASSERT_EQ(TASK_RUNNING, status->state());
+ AWAIT_READY(statusStarting);
+ ASSERT_EQ(TASK_STARTING, statusStarting->state());
+
+ AWAIT_READY(statusRunning);
+ ASSERT_EQ(TASK_RUNNING, statusRunning->state());
Future<Response> response = process::http::get(
agent.get()->pid,
http://git-wip-us.apache.org/repos/asf/mesos/blob/1e1e409b/src/tests/role_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/role_tests.cpp b/src/tests/role_tests.cpp
index fc4c017..852e2cc 100644
--- a/src/tests/role_tests.cpp
+++ b/src/tests/role_tests.cpp
@@ -986,9 +986,10 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(RoleTest, VolumesInOverlappingHierarchies)
taskResources,
"! (ls -Av path | grep -q .)");
- // We expect two status updates for the task.
- Future<TaskStatus> status1, status2;
+ // We expect three status updates for the task.
+ Future<TaskStatus> status0, status1, status2;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status0))
.WillOnce(FutureArg<1>(&status1))
.WillOnce(FutureArg<1>(&status2));
@@ -997,6 +998,11 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(RoleTest, VolumesInOverlappingHierarchies)
{offer.id()},
{RESERVE(reservedDisk), CREATE(volume), LAUNCH({task})});
+ AWAIT_READY(status0);
+
+ EXPECT_EQ(task.task_id(), status0->task_id());
+ EXPECT_EQ(TASK_STARTING, status0->state());
+
AWAIT_READY(status1);
EXPECT_EQ(task.task_id(), status1->task_id());
http://git-wip-us.apache.org/repos/asf/mesos/blob/1e1e409b/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index 4eda96e..6df4d32 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -623,11 +623,15 @@ TEST_P(SchedulerTest, TaskGroupRunning)
taskGroup.add_tasks()->CopyFrom(task1);
taskGroup.add_tasks()->CopyFrom(task2);
+ Future<Event::Update> startingUpdate1;
+ Future<Event::Update> startingUpdate2;
Future<Event::Update> runningUpdate1;
Future<Event::Update> runningUpdate2;
Future<Event::Update> finishedUpdate1;
Future<Event::Update> finishedUpdate2;
EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(FutureArg<1>(&startingUpdate1))
+ .WillOnce(FutureArg<1>(&startingUpdate2))
.WillOnce(FutureArg<1>(&runningUpdate1))
.WillOnce(FutureArg<1>(&runningUpdate2))
.WillOnce(FutureArg<1>(&finishedUpdate1))
@@ -669,14 +673,58 @@ TEST_P(SchedulerTest, TaskGroupRunning)
EXPECT_EQ(devolve(task2.task_id()),
runTaskGroupMessage->task_group().tasks(1).task_id());
+ AWAIT_READY(startingUpdate1);
+ ASSERT_EQ(v1::TASK_STARTING, startingUpdate1->status().state());
+
+ AWAIT_READY(startingUpdate2);
+ ASSERT_EQ(v1::TASK_STARTING, startingUpdate2->status().state());
+
+ const hashset<v1::TaskID> tasks{task1.task_id(), task2.task_id()};
+
+ // TASK_STARTING updates for the tasks in a
+ // task group can be received in any order.
+ const hashset<v1::TaskID> tasksStarting{
+ startingUpdate1->status().task_id(),
+ startingUpdate2->status().task_id()};
+
+ ASSERT_EQ(tasks, tasksStarting);
+
+ // Acknowledge the TASK_STARTING updates so
+ // that subsequent updates can be received.
+ {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::ACKNOWLEDGE);
+
+ Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+ acknowledge->mutable_task_id()->CopyFrom(
+ startingUpdate1->status().task_id());
+ acknowledge->mutable_agent_id()->CopyFrom(offers->offers(0).agent_id());
+ acknowledge->set_uuid(startingUpdate1->status().uuid());
+
+ mesos.send(call);
+ }
+
+ {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::ACKNOWLEDGE);
+
+ Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+ acknowledge->mutable_task_id()->CopyFrom(
+ startingUpdate2->status().task_id());
+ acknowledge->mutable_agent_id()->CopyFrom(offers->offers(0).agent_id());
+ acknowledge->set_uuid(startingUpdate2->status().uuid());
+
+ mesos.send(call);
+ }
+
AWAIT_READY(runningUpdate1);
ASSERT_EQ(v1::TASK_RUNNING, runningUpdate1->status().state());
AWAIT_READY(runningUpdate2);
ASSERT_EQ(v1::TASK_RUNNING, runningUpdate2->status().state());
- const hashset<v1::TaskID> tasks{task1.task_id(), task2.task_id()};
-
// TASK_RUNNING updates for the tasks in a
// task group can be received in any order.
const hashset<v1::TaskID> tasksRunning{
http://git-wip-us.apache.org/repos/asf/mesos/blob/1e1e409b/src/tests/slave_authorization_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_authorization_tests.cpp b/src/tests/slave_authorization_tests.cpp
index 4c7d37f..23e9d0b 100644
--- a/src/tests/slave_authorization_tests.cpp
+++ b/src/tests/slave_authorization_tests.cpp
@@ -633,10 +633,12 @@ TYPED_TEST(SlaveAuthorizerTest, AuthorizeRunTaskOnAgent)
// The first task should fail since the task user `foo` is not an
// authorized user that can launch a task. However, the second task
// should succeed.
+ Future<TaskStatus> status0;
Future<TaskStatus> status1;
Future<TaskStatus> status2;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status0))
.WillOnce(FutureArg<1>(&status1))
.WillOnce(FutureArg<1>(&status2));
@@ -644,16 +646,19 @@ TYPED_TEST(SlaveAuthorizerTest, AuthorizeRunTaskOnAgent)
{offer.id()},
{LAUNCH({task1, task2})});
- // Wait for TASK_FAILED for 1st task, and TASK_RUNNING for 2nd task.
+ // Wait for TASK_FAILED for 1st task, and TASK_STARTING followed by
+ // TASK_RUNNING for 2nd task.
+ AWAIT_READY(status0);
AWAIT_READY(status1);
AWAIT_READY(status2);
// Validate both the statuses. Note that the order of receiving the
- // status updates for the 2 tasks is not deterministic.
- hashmap<TaskID, TaskStatus> statuses {
- {status1->task_id(), status1.get()},
- {status2->task_id(), status2.get()}
- };
+ // status updates for the 2 tasks is not deterministic, but we know
+ // that task2's TASK_RUNNING arrives after TASK_STARTING.
+ hashmap<TaskID, TaskStatus> statuses;
+ statuses[status0->task_id()] = status0.get();
+ statuses[status1->task_id()] = status1.get();
+ statuses[status2->task_id()] = status2.get();
ASSERT_TRUE(statuses.contains(task1.task_id()));
EXPECT_EQ(TASK_ERROR, statuses.at(task1.task_id()).state());
http://git-wip-us.apache.org/repos/asf/mesos/blob/1e1e409b/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 30d8c23..8f328e9 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -252,7 +252,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverSlaveState)
// Capture the update.
AWAIT_READY(update);
- EXPECT_EQ(TASK_RUNNING, update->update().status().state());
+ EXPECT_EQ(TASK_STARTING, update->update().status().state());
// Wait for the ACK to be checkpointed.
AWAIT_READY(_statusUpdateAcknowledgement);
@@ -423,7 +423,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverStatusUpdateManager)
ASSERT_SOME(slave);
AWAIT_READY(status);
- EXPECT_EQ(TASK_RUNNING, status->state());
+ EXPECT_EQ(TASK_STARTING, status->state());
driver.stop();
driver.join();
@@ -514,7 +514,7 @@ TYPED_TEST(SlaveRecoveryTest, DISABLED_ReconnectHTTPExecutor)
// Scheduler should receive the recovered update.
AWAIT_READY(status);
- EXPECT_EQ(TASK_RUNNING, status->state());
+ EXPECT_EQ(TASK_STARTING, status->state());
driver.stop();
driver.join();
@@ -762,13 +762,13 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutor)
TaskInfo task = createTask(offers.get()[0], "sleep 1000");
// Drop the first update from the executor.
- Future<StatusUpdateMessage> statusUpdate =
+ Future<StatusUpdateMessage> startingUpdate =
DROP_PROTOBUF(StatusUpdateMessage(), _, _);
driver.launchTasks(offers.get()[0].id(), {task});
// Stop the slave before the status update is received.
- AWAIT_READY(statusUpdate);
+ AWAIT_READY(startingUpdate);
slave.get()->terminate();
@@ -791,15 +791,15 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutor)
// Ensure the executor re-registers.
AWAIT_READY(reregister);
- // Executor should inform about the unacknowledged update.
- ASSERT_EQ(1, reregister->updates_size());
+ // Executor should inform about the unacknowledged updates.
+ ASSERT_EQ(2, reregister->updates_size());
const StatusUpdate& update = reregister->updates(0);
EXPECT_EQ(task.task_id(), update.status().task_id());
- EXPECT_EQ(TASK_RUNNING, update.status().state());
+ EXPECT_EQ(TASK_STARTING, update.status().state());
// Scheduler should receive the recovered update.
AWAIT_READY(status);
- EXPECT_EQ(TASK_RUNNING, status->state());
+ EXPECT_EQ(TASK_STARTING, status->state());
driver.stop();
driver.join();
@@ -846,7 +846,8 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutorRetry)
Future<TaskStatus> statusUpdate;
EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(FutureArg<1>(&statusUpdate));
+ .WillOnce(FutureArg<1>(&statusUpdate))
+ .WillRepeatedly(Return()); // Ignore subsequent TASK_RUNNING updates.
driver.start();
@@ -863,7 +864,7 @@ TYPED_TEST(SlaveRecoveryTest, ReconnectExecutorRetry)
driver.launchTasks(offers.get()[0].id(), {task});
AWAIT_READY(statusUpdate);
- EXPECT_EQ(TASK_RUNNING, statusUpdate->state());
+ EXPECT_EQ(TASK_STARTING, statusUpdate->state());
// Ensure the acknowledgement is checkpointed.
Clock::settle();
@@ -974,12 +975,19 @@ TYPED_TEST(SlaveRecoveryTest, PingTimeoutDuringRecovery)
TaskInfo task = createTask(offers.get()[0], "sleep 1000");
+ Future<TaskStatus> statusUpdate0;
Future<TaskStatus> statusUpdate1;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusUpdate0))
.WillOnce(FutureArg<1>(&statusUpdate1));
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(statusUpdate0);
+ ASSERT_EQ(TASK_STARTING, statusUpdate0->state());
+
+ driver.acknowledgeStatusUpdate(statusUpdate0.get());
+
AWAIT_READY(statusUpdate1);
ASSERT_EQ(TASK_RUNNING, statusUpdate1->state());
@@ -1816,7 +1824,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverCompletedExecutor)
TaskInfo task = createTask(offers1.get()[0], "exit 0");
EXPECT_CALL(sched, statusUpdate(_, _))
- .Times(2); // TASK_RUNNING and TASK_FINISHED updates.
+ .Times(3); // TASK_STARTING, TASK_RUNNING and TASK_FINISHED updates.
EXPECT_CALL(sched, offerRescinded(_, _))
.Times(AtMost(1));
@@ -2439,15 +2447,21 @@ TYPED_TEST(SlaveRecoveryTest, KillTask)
TaskInfo task = createTask(offers1.get()[0], "sleep 1000");
- EXPECT_CALL(sched, statusUpdate(_, _));
+ // Expect a TASK_STARTING and a TASK_RUNNING update.
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .Times(2);
- Future<Nothing> ack =
+ Future<Nothing> ack1 =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> ack2 =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offers1.get()[0].id(), {task});
// Wait for the ACK to be checkpointed.
- AWAIT_READY(ack);
+ AWAIT_READY(ack1);
+ AWAIT_READY(ack2);
slave.get()->terminate();
@@ -3068,9 +3082,10 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlave)
TaskInfo task = createTask(offers1.get()[0], "sleep 1000");
- Future<Nothing> statusUpdate1;
+ Future<Nothing> statusUpdate1, statusUpdate2;
EXPECT_CALL(sched, statusUpdate(_, _))
- .WillOnce(FutureSatisfy(&statusUpdate1))
+ .WillOnce(FutureSatisfy(&statusUpdate1)) // TASK_STARTING
+ .WillOnce(FutureSatisfy(&statusUpdate2)) // TASK_RUNNING
.WillOnce(Return()); // Ignore TASK_FAILED update.
Future<Message> registerExecutor =
@@ -3082,7 +3097,7 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlave)
AWAIT_READY(registerExecutor);
UPID executorPid = registerExecutor->from;
- AWAIT_READY(statusUpdate1); // Wait for TASK_RUNNING update.
+ AWAIT_READY(statusUpdate2); // Wait for TASK_RUNNING update.
EXPECT_CALL(sched, offerRescinded(_, _))
.Times(AtMost(1));
@@ -3186,18 +3201,22 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlaveSIGUSR1)
TaskInfo task = createTask(offers.get()[0], "sleep 1000");
- Future<TaskStatus> status;
+ Future<TaskStatus> statusStarting, statusRunning;
EXPECT_CALL(sched, statusUpdate(_, _))
- .WillOnce(FutureArg<1>(&status));
+ .WillOnce(FutureArg<1>(&statusStarting))
+ .WillOnce(FutureArg<1>(&statusRunning));
driver.launchTasks(offers.get()[0].id(), {task});
- AWAIT_READY(status);
- EXPECT_EQ(TASK_RUNNING, status->state());
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
- Future<TaskStatus> status2;
+ AWAIT_READY(statusRunning);
+ EXPECT_EQ(TASK_RUNNING, statusRunning->state());
+
+ Future<TaskStatus> statusLost;
EXPECT_CALL(sched, statusUpdate(_, _))
- .WillOnce(FutureArg<1>(&status2));
+ .WillOnce(FutureArg<1>(&statusLost));
Future<Nothing> slaveLost;
EXPECT_CALL(sched, slaveLost(_, _))
@@ -3223,11 +3242,11 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlaveSIGUSR1)
AWAIT_READY(executorTerminated);
// The master should send a TASK_LOST and slaveLost.
- AWAIT_READY(status2);
+ AWAIT_READY(statusLost);
- EXPECT_EQ(TASK_LOST, status2->state());
- EXPECT_EQ(TaskStatus::SOURCE_MASTER, status2->source());
- EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, status2->reason());
+ EXPECT_EQ(TASK_LOST, statusLost->state());
+ EXPECT_EQ(TaskStatus::SOURCE_MASTER, statusLost->source());
+ EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, statusLost->reason());
AWAIT_READY(slaveLost);
@@ -3415,16 +3434,21 @@ TYPED_TEST(SlaveRecoveryTest, ReconcileKillTask)
SlaveID slaveId = offers1.get()[0].slave_id();
FrameworkID frameworkId = offers1.get()[0].framework_id();
- // Expecting TASK_RUNNING status.
- EXPECT_CALL(sched, statusUpdate(_, _));
+ // Expecting TASK_STARTING and TASK_RUNNING status.
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .Times(2);
- Future<Nothing> _statusUpdateAcknowledgement =
+ Future<Nothing> _statusUpdateAcknowledgement1 =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> _statusUpdateAcknowledgement2 =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offers1.get()[0].id(), {task});
// Wait for TASK_RUNNING update to be acknowledged.
- AWAIT_READY(_statusUpdateAcknowledgement);
+ AWAIT_READY(_statusUpdateAcknowledgement1);
+ AWAIT_READY(_statusUpdateAcknowledgement2);
slave.get()->terminate();
@@ -3824,15 +3848,21 @@ TYPED_TEST(SlaveRecoveryTest, SchedulerFailover)
// Create a long running task.
TaskInfo task = createTask(offers1.get()[0], "sleep 1000");
- EXPECT_CALL(sched1, statusUpdate(_, _));
+ // Expecting TASK_STARTING and TASK_RUNNING updates.
+ EXPECT_CALL(sched1, statusUpdate(_, _))
+ .Times(2);
- Future<Nothing> _statusUpdateAcknowledgement =
+ Future<Nothing> _statusUpdateAcknowledgement1 =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> _statusUpdateAcknowledgement2 =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
driver1.launchTasks(offers1.get()[0].id(), {task});
// Wait for the ACK to be checkpointed.
- AWAIT_READY(_statusUpdateAcknowledgement);
+ AWAIT_READY(_statusUpdateAcknowledgement1);
+ AWAIT_READY(_statusUpdateAcknowledgement2);
slave.get()->terminate();
@@ -3975,15 +4005,20 @@ TYPED_TEST(SlaveRecoveryTest, MasterFailover)
TaskInfo task = createTask(offers1.get()[0], "sleep 1000");
- EXPECT_CALL(sched, statusUpdate(_, _));
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .Times(2);
- Future<Nothing> _statusUpdateAcknowledgement =
+ Future<Nothing> _statusUpdateAcknowledgement1 =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> _statusUpdateAcknowledgement2 =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offers1.get()[0].id(), {task});
- // Wait for the ACK to be checkpointed.
- AWAIT_READY(_statusUpdateAcknowledgement);
+ // Wait for both ACKs to be checkpointed.
+ AWAIT_READY(_statusUpdateAcknowledgement1);
+ AWAIT_READY(_statusUpdateAcknowledgement2);
slave.get()->terminate();
@@ -4117,15 +4152,20 @@ TYPED_TEST(SlaveRecoveryTest, MultipleFrameworks)
// Framework 1 launches a task.
TaskInfo task1 = createTask(offer1, "sleep 1000");
- EXPECT_CALL(sched1, statusUpdate(_, _));
+ EXPECT_CALL(sched1, statusUpdate(_, _))
+ .Times(2);
- Future<Nothing> _statusUpdateAcknowledgement1 =
+ Future<Nothing> _startingStatusUpdateAcknowledgement1 =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> _runningStatusUpdateAcknowledgement1 =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
driver1.launchTasks(offer1.id(), {task1});
// Wait for the ACK to be checkpointed.
- AWAIT_READY(_statusUpdateAcknowledgement1);
+ AWAIT_READY(_startingStatusUpdateAcknowledgement1);
+ AWAIT_READY(_runningStatusUpdateAcknowledgement1);
// Framework 2. Enable checkpointing.
FrameworkInfo frameworkInfo2 = DEFAULT_FRAMEWORK_INFO;
@@ -4150,14 +4190,20 @@ TYPED_TEST(SlaveRecoveryTest, MultipleFrameworks)
// Framework 2 launches a task.
TaskInfo task2 = createTask(offers2.get()[0], "sleep 1000");
- EXPECT_CALL(sched2, statusUpdate(_, _));
+ EXPECT_CALL(sched2, statusUpdate(_, _))
+ .Times(2);
- Future<Nothing> _statusUpdateAcknowledgement2 =
+ Future<Nothing> _startingStatusUpdateAcknowledgement2 =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> _runningStatusUpdateAcknowledgement2 =
+ FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
driver2.launchTasks(offers2.get()[0].id(), {task2});
// Wait for the ACK to be checkpointed.
- AWAIT_READY(_statusUpdateAcknowledgement2);
+ AWAIT_READY(_startingStatusUpdateAcknowledgement2);
+ AWAIT_READY(_runningStatusUpdateAcknowledgement2);
slave.get()->terminate();
@@ -4281,15 +4327,20 @@ TYPED_TEST(SlaveRecoveryTest, MultipleSlaves)
// Launch a long running task in the first slave.
TaskInfo task1 = createTask(offers1.get()[0], "sleep 1000");
- EXPECT_CALL(sched, statusUpdate(_, _));
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .Times(2);
- Future<Nothing> _statusUpdateAcknowledgement1 =
+ Future<Nothing> _startingStatusUpdateAcknowledgement1 =
+ FUTURE_DISPATCH(slave1.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> _runningStatusUpdateAcknowledgement1 =
FUTURE_DISPATCH(slave1.get()->pid, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offers1.get()[0].id(), {task1});
// Wait for the ACK to be checkpointed.
- AWAIT_READY(_statusUpdateAcknowledgement1);
+ AWAIT_READY(_startingStatusUpdateAcknowledgement1);
+ AWAIT_READY(_runningStatusUpdateAcknowledgement1);
Future<vector<Offer>> offers2;
EXPECT_CALL(sched, resourceOffers(&driver, _))
@@ -4316,15 +4367,20 @@ TYPED_TEST(SlaveRecoveryTest, MultipleSlaves)
// Launch a long running task in each slave.
TaskInfo task2 = createTask(offers2.get()[0], "sleep 1000");
- EXPECT_CALL(sched, statusUpdate(_, _));
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .Times(2);
- Future<Nothing> _statusUpdateAcknowledgement2 =
+ Future<Nothing> _startingStatusUpdateAcknowledgement2 =
+ FUTURE_DISPATCH(slave2.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> _runningStatusUpdateAcknowledgement2 =
FUTURE_DISPATCH(slave2.get()->pid, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offers2.get()[0].id(), {task2});
// Wait for the ACKs to be checkpointed.
- AWAIT_READY(_statusUpdateAcknowledgement2);
+ AWAIT_READY(_startingStatusUpdateAcknowledgement2);
+ AWAIT_READY(_runningStatusUpdateAcknowledgement2);
slave1.get()->terminate();
slave2.get()->terminate();
http://git-wip-us.apache.org/repos/asf/mesos/blob/1e1e409b/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 6d1e98d..67c8091 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -680,16 +680,22 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(SlaveTest, CommandTaskWithArguments)
task.mutable_command()->MergeFrom(command);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offers.get()[0].id(), {task});
- // Scheduler should first receive TASK_RUNNING followed by the
- // TASK_FINISHED from the executor.
+ // Scheduler should first receive TASK_STARTING, followed by
+ // TASK_RUNNING and TASK_FINISHED from the executor.
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+ EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusStarting->source());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
EXPECT_EQ(TaskStatus::SOURCE_EXECUTOR, statusRunning->source());
@@ -746,12 +752,17 @@ TEST_F(SlaveTest, CommandTaskWithKillPolicy)
task.mutable_kill_policy()->mutable_grace_period()->set_nanoseconds(
gracePeriod.ns());
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning));
driver.launchTasks(offer.id(), {task});
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -2381,15 +2392,21 @@ TEST_F(SlaveTest, StatisticsEndpointRunningExecutor)
Resources::parse("cpus:1;mem:32").get(),
SLEEP_COMMAND(1000));
- Future<TaskStatus> status;
+ Future<TaskStatus> statusStarting;
+ Future<TaskStatus> statusRunning;
EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(FutureArg<1>(&status));
+ .WillOnce(FutureArg<1>(&statusStarting))
+ .WillOnce(FutureArg<1>(&statusRunning));
driver.launchTasks(offer.id(), {task});
- AWAIT_READY(status);
- EXPECT_EQ(task.task_id(), status->task_id());
- EXPECT_EQ(TASK_RUNNING, status->state());
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(task.task_id(), statusStarting->task_id());
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
+ AWAIT_READY(statusRunning);
+ EXPECT_EQ(task.task_id(), statusRunning->task_id());
+ EXPECT_EQ(TASK_RUNNING, statusRunning->state());
// Hit the statistics endpoint and expect the response contains the
// resource statistics for the running container.
@@ -5210,16 +5227,21 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(SlaveTest, ExecutorEnvironmentVariables)
task.mutable_command()->MergeFrom(command);
+ Future<TaskStatus> statusStarting;
Future<TaskStatus> statusRunning;
Future<TaskStatus> statusFinished;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusStarting))
.WillOnce(FutureArg<1>(&statusRunning))
.WillOnce(FutureArg<1>(&statusFinished));
driver.launchTasks(offers.get()[0].id(), {task});
- // Scheduler should first receive TASK_RUNNING followed by the
- // TASK_FINISHED from the executor.
+ // Scheduler should first receive TASK_STARTING, followed by
+ // TASK_RUNNING and TASK_FINISHED from the executor.
+ AWAIT_READY(statusStarting);
+ EXPECT_EQ(TASK_STARTING, statusStarting->state());
+
AWAIT_READY(statusRunning);
EXPECT_EQ(TASK_RUNNING, statusRunning->state());
@@ -5565,7 +5587,7 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(SlaveTest, HTTPSchedulerSlaveRestart)
UPID executorPid = registerExecutorMessage->from;
AWAIT_READY(status);
- EXPECT_EQ(TASK_RUNNING, status->state());
+ EXPECT_EQ(TASK_STARTING, status->state());
// Restart the slave.
slave.get()->terminate();
@@ -8179,12 +8201,19 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(SlaveTest, ExecutorReregistrationTimeoutFlag)
TaskInfo task = createTask(offers->front(), "sleep 1000");
+ Future<TaskStatus> statusUpdate0;
Future<TaskStatus> statusUpdate1;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&statusUpdate0))
.WillOnce(FutureArg<1>(&statusUpdate1));
driver.launchTasks(offers->front().id(), {task});
+ AWAIT_READY(statusUpdate0);
+ ASSERT_EQ(TASK_STARTING, statusUpdate0->state());
+
+ driver.acknowledgeStatusUpdate(statusUpdate0.get());
+
AWAIT_READY(statusUpdate1);
ASSERT_EQ(TASK_RUNNING, statusUpdate1->state());
http://git-wip-us.apache.org/repos/asf/mesos/blob/1e1e409b/src/tests/teardown_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/teardown_tests.cpp b/src/tests/teardown_tests.cpp
index 5eada4f..392cacf 100644
--- a/src/tests/teardown_tests.cpp
+++ b/src/tests/teardown_tests.cpp
@@ -353,20 +353,31 @@ TEST_F(TeardownTest, RecoveredFrameworkAfterMasterFailover)
TaskInfo task = createTask(offers.get()[0], "sleep 100");
+ Future<TaskStatus> startingStatus;
Future<TaskStatus> runningStatus;
EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&startingStatus))
.WillOnce(FutureArg<1>(&runningStatus));
- Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
+ Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
+ slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+ Future<Nothing> statusUpdateAck2 = FUTURE_DISPATCH(
slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
driver.launchTasks(offers.get()[0].id(), {task});
+ AWAIT_READY(startingStatus);
+ EXPECT_EQ(TASK_STARTING, startingStatus->state());
+ EXPECT_EQ(task.task_id(), startingStatus->task_id());
+
+ AWAIT_READY(statusUpdateAck1);
+
AWAIT_READY(runningStatus);
EXPECT_EQ(TASK_RUNNING, runningStatus->state());
EXPECT_EQ(task.task_id(), runningStatus->task_id());
- AWAIT_READY(statusUpdateAck);
+ AWAIT_READY(statusUpdateAck2);
// Simulate master failover. We leave the scheduler without a master
// so it does not attempt to re-register.